Просмотр исходного кода

Adjusts scaling logic and comments.

Alliballibaba 2 месяцев назад
Родитель
Сommit
83e0c0806c
3 измененных файлов с 18 добавлено и 26 удалено
  1. 13 11
      scaling.go
  2. 2 14
      scaling_test.go
  3. 3 1
      worker.go

+ 13 - 11
scaling.go

@@ -56,10 +56,11 @@ func initAutoScaling(numThreads int, maxThreads int) {
 
 func drainAutoScaling() {
 	scalingMu.Lock()
+	logger.Debug("shutting down autoscalin", zap.Int("num scaled threads", len(autoScaledThreads)))
 	scalingMu.Unlock()
 }
 
-// turn the first inactive/reserved thread into a regular thread
+// AddRegularThread adds one regular PHP thread at runtime if max_threads are not yet reached
 func AddRegularThread() (int, error) {
 	scalingMu.Lock()
 	defer scalingMu.Unlock()
@@ -77,7 +78,7 @@ func addRegularThread() (*phpThread, error) {
 	return thread, nil
 }
 
-// remove the last regular thread
+// RemoveRegularThread removes one regular PHP thread at runtime, won't remove the last thread
 func RemoveRegularThread() (int, error) {
 	scalingMu.Lock()
 	defer scalingMu.Unlock()
@@ -97,7 +98,7 @@ func removeRegularThread() error {
 	return nil
 }
 
-// turn the first inactive/reserved thread into a worker thread
+// AddWorkerThread adds one PHP worker thread at runtime if max_threads are not yet reached
 func AddWorkerThread(workerFileName string) (int, error) {
 	worker, ok := workers[workerFileName]
 	if !ok {
@@ -119,7 +120,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) {
 	return thread, nil
 }
 
-// remove the last worker thread
+// RemoveWorkerThread removes one PHP worker thread at runtime, won't remove the last thread
 func RemoveWorkerThread(workerFileName string) (int, error) {
 	worker, ok := workers[workerFileName]
 	if !ok {
@@ -254,14 +255,15 @@ func deactivateThreads() {
 			continue
 		}
 
+		// TODO: reactivate thread-shutdown once there's a way around leaky PECL extensions like #1299
 		// if threads are already inactive, shut them down
-		if thread.state.is(stateInactive) && waitTime > maxThreadIdleTime.Milliseconds() {
-			logger.Debug("auto-stopping thread", zap.Int("threadIndex", thread.threadIndex))
-			thread.shutdown()
-			stoppedThreadCount++
-			autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
-			continue
-		}
+		//if thread.state.is(stateInactive) && waitTime > maxThreadIdleTime.Milliseconds() {
+		//	logger.Debug("auto-stopping thread", zap.Int("threadIndex", thread.threadIndex))
+		//	thread.shutdown()
+		//	stoppedThreadCount++
+		//	autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
+		//	continue
+		//}
 	}
 }
 

+ 2 - 14
scaling_test.go

@@ -22,17 +22,11 @@ func TestScaleARegularThreadUpAndDown(t *testing.T) {
 	assert.Equal(t, stateReady, autoScaledThread.state.get())
 	assert.IsType(t, &regularThread{}, autoScaledThread.handler)
 
-	// on the first down-scale, the thread will be marked as inactive
+	// on down-scale, the thread will be marked as inactive
 	setLongWaitTime(autoScaledThread)
 	deactivateThreads()
 	assert.IsType(t, &inactiveThread{}, autoScaledThread.handler)
 
-	// on the second down-scale, the thread will be removed
-	autoScaledThread.state.waitFor(stateInactive)
-	setLongWaitTime(autoScaledThread)
-	deactivateThreads()
-	assert.Equal(t, stateReserved, autoScaledThread.state.get())
-
 	Shutdown()
 }
 
@@ -51,17 +45,11 @@ func TestScaleAWorkerThreadUpAndDown(t *testing.T) {
 	scaleWorkerThread(workers[workerPath])
 	assert.Equal(t, stateReady, autoScaledThread.state.get())
 
-	// on the first down-scale, the thread will be marked as inactive
+	// on down-scale, the thread will be marked as inactive
 	setLongWaitTime(autoScaledThread)
 	deactivateThreads()
 	assert.IsType(t, &inactiveThread{}, autoScaledThread.handler)
 
-	// on the second down-scale, the thread will be removed
-	autoScaledThread.state.waitFor(stateInactive)
-	setLongWaitTime(autoScaledThread)
-	deactivateThreads()
-	assert.Equal(t, stateReserved, autoScaledThread.state.get())
-
 	Shutdown()
 }
 

+ 3 - 1
worker.go

@@ -90,6 +90,7 @@ func drainWorkers() {
 	watcher.DrainWatcher()
 }
 
+// RestartWorkers attempts to restart all worker threads gracefully
 func RestartWorkers() {
 	// disallow scaling threads while restarting workers
 	scalingMu.Lock()
@@ -124,9 +125,10 @@ func RestartWorkers() {
 	}
 }
 
+// WorkerFileNames returns the list of worker file names
 func WorkerFileNames() []string {
 	workerNames := make([]string, 0, len(workers))
-	for fileName, _ := range workers {
+	for fileName := range workers {
 		workerNames = append(workerNames, fileName)
 	}
 	return workerNames