Browse Source

fix(metrics): handle the case where the worker is already assigned to a thread (#1171)

Rob Landers 3 months ago
parent
commit
08e99fc85f
3 changed files with 50 additions and 29 deletions
  1. +4 -4
      caddy/watcher_test.go
  2. +4 -2
      php_thread.go
  3. +42 -23
      worker.go

+ 4 - 4
caddy/watcher_test.go

@@ -15,7 +15,7 @@ func TestWorkerWithInactiveWatcher(t *testing.T) {
 		{
 			skip_install_trust
 			admin localhost:2999
-			http_port 9080
+			http_port `+testPort+`
 
 			frankenphp {
 				worker {
@@ -26,13 +26,13 @@ func TestWorkerWithInactiveWatcher(t *testing.T) {
 			}
 		}
 
-		localhost:9080 {
+		localhost:`+testPort+` {
 			root ../testdata
 			rewrite worker-with-watcher.php
 			php
 		}
 		`, "caddyfile")
 
-	tester.AssertGetResponse("http://localhost:9080", http.StatusOK, "requests:1")
-	tester.AssertGetResponse("http://localhost:9080", http.StatusOK, "requests:2")
+	tester.AssertGetResponse("http://localhost:"+testPort, http.StatusOK, "requests:1")
+	tester.AssertGetResponse("http://localhost:"+testPort, http.StatusOK, "requests:2")
 }

+ 4 - 2
php_thread.go

@@ -6,6 +6,7 @@ import "C"
 import (
 	"net/http"
 	"runtime"
+	"sync"
 	"unsafe"
 )
 
@@ -19,6 +20,7 @@ type phpThread struct {
 	worker            *worker
 	requestChan       chan *http.Request
 	knownVariableKeys map[string]*C.zend_string
+	readiedOnce       sync.Once
 }
 
 func initPHPThreads(numThreads int) {
@@ -28,7 +30,7 @@ func initPHPThreads(numThreads int) {
 	}
 }
 
-func (thread phpThread) getActiveRequest() *http.Request {
+func (thread *phpThread) getActiveRequest() *http.Request {
 	if thread.workerRequest != nil {
 		return thread.workerRequest
 	}
@@ -46,5 +48,5 @@ func (thread *phpThread) pinString(s string) *C.char {
 
 // C strings must be null-terminated
 func (thread *phpThread) pinCString(s string) *C.char {
-	return thread.pinString(s+"\x00")
+	return thread.pinString(s + "\x00")
 }

+ 42 - 23
worker.go

@@ -9,7 +9,6 @@ import (
 	"net/http"
 	"path/filepath"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/dunglas/frankenphp/internal/watcher"
@@ -24,6 +23,7 @@ type worker struct {
 	requestChan chan *http.Request
 	threads     []*phpThread
 	threadMutex sync.RWMutex
+	ready       chan struct{}
 }
 
 const maxWorkerErrorBackoff = 1 * time.Second
@@ -32,18 +32,15 @@ const maxWorkerConsecutiveFailures = 6
 
 var (
 	watcherIsEnabled bool
-	workersReadyWG   sync.WaitGroup
 	workerShutdownWG sync.WaitGroup
-	workersAreReady  atomic.Bool
-	workersAreDone   atomic.Bool
 	workersDone      chan interface{}
 	workers          = make(map[string]*worker)
 )
 
 func initWorkers(opt []workerOpt) error {
 	workersDone = make(chan interface{})
-	workersAreReady.Store(false)
-	workersAreDone.Store(false)
+
+	ready := sync.WaitGroup{}
 
 	for _, o := range opt {
 		worker, err := newWorker(o)
@@ -51,14 +48,19 @@ func initWorkers(opt []workerOpt) error {
 		if err != nil {
 			return err
 		}
-		workersReadyWG.Add(worker.num)
 		for i := 0; i < worker.num; i++ {
 			go worker.startNewWorkerThread()
 		}
+		ready.Add(1)
+		go func() {
+			for i := 0; i < worker.num; i++ {
+				<-worker.ready
+			}
+			ready.Done()
+		}()
 	}
 
-	workersReadyWG.Wait()
-	workersAreReady.Store(true)
+	ready.Wait()
 
 	return nil
 }
@@ -80,7 +82,13 @@ func newWorker(o workerOpt) (*worker, error) {
 	}
 
 	o.env["FRANKENPHP_WORKER\x00"] = "1"
-	w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)}
+	w := &worker{
+		fileName:    absFileName,
+		num:         o.num,
+		env:         o.env,
+		requestChan: make(chan *http.Request),
+		ready:       make(chan struct{}, o.num),
+	}
 	workers[absFileName] = w
 
 	return w, nil
@@ -145,8 +153,20 @@ func (worker *worker) startNewWorkerThread() {
 		fc := r.Context().Value(contextKey).(*FrankenPHPContext)
 
 		// if we are done, exit the loop that restarts the worker script
-		if workersAreDone.Load() {
-			break
+		select {
+		case _, ok := <-workersDone:
+			if !ok {
+				metrics.StopWorker(worker.fileName, StopReasonShutdown)
+
+				if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
+					c.Write(zap.String("worker", worker.fileName))
+				}
+
+				return
+			}
+			// continue on since the channel is still open
+		default:
+			// continue on since the channel is still open
 		}
 
 		// on exit status 0 we just run the worker script again
@@ -184,12 +204,7 @@ func (worker *worker) startNewWorkerThread() {
 		metrics.StopWorker(worker.fileName, StopReasonCrash)
 	}
 
-	metrics.StopWorker(worker.fileName, StopReasonShutdown)
-
-	// TODO: check if the termination is expected
-	if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
-		c.Write(zap.String("worker", worker.fileName))
-	}
+	// unreachable
 }
 
 func (worker *worker) handleRequest(r *http.Request) {
@@ -210,7 +225,6 @@ func (worker *worker) handleRequest(r *http.Request) {
 }
 
 func stopWorkers() {
-	workersAreDone.Store(true)
 	close(workersDone)
 }
 
@@ -253,15 +267,11 @@ func restartWorkers(workerOpts []workerOpt) {
 
 func assignThreadToWorker(thread *phpThread) {
 	fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
-	metrics.ReadyWorker(fc.scriptFilename)
 	worker, ok := workers[fc.scriptFilename]
 	if !ok {
 		panic("worker not found for script: " + fc.scriptFilename)
 	}
 	thread.worker = worker
-	if !workersAreReady.Load() {
-		workersReadyWG.Done()
-	}
 	thread.requestChan = make(chan *http.Request)
 	worker.threadMutex.Lock()
 	worker.threads = append(worker.threads, thread)
@@ -276,6 +286,15 @@ func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
 	if thread.worker == nil {
 		assignThreadToWorker(thread)
 	}
+	thread.readiedOnce.Do(func() {
+		// inform metrics that the worker is ready
+		metrics.ReadyWorker(thread.worker.fileName)
+	})
+
+	select {
+	case thread.worker.ready <- struct{}{}:
+	default:
+	}
 
 	if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
 		c.Write(zap.String("worker", thread.worker.fileName))