123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- package frankenphp
- // #include <stdlib.h>
- // #include "frankenphp.h"
- import "C"
- import (
- "fmt"
- "github.com/dunglas/frankenphp/internal/fastabs"
- "net/http"
- "path/filepath"
- "sync"
- "sync/atomic"
- "time"
- "github.com/dunglas/frankenphp/internal/watcher"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
- )
- type worker struct {
- fileName string
- num int
- env PreparedEnv
- requestChan chan *http.Request
- threads []*phpThread
- threadMutex sync.RWMutex
- }
- const maxWorkerErrorBackoff = 1 * time.Second
- const minWorkerErrorBackoff = 100 * time.Millisecond
- const maxWorkerConsecutiveFailures = 6
- var (
- watcherIsEnabled bool
- workersReadyWG sync.WaitGroup
- workerShutdownWG sync.WaitGroup
- workersAreReady atomic.Bool
- workersAreDone atomic.Bool
- workersDone chan interface{}
- workers = make(map[string]*worker)
- )
- func initWorkers(opt []workerOpt) error {
- workersDone = make(chan interface{})
- workersAreReady.Store(false)
- workersAreDone.Store(false)
- for _, o := range opt {
- worker, err := newWorker(o)
- worker.threads = make([]*phpThread, 0, o.num)
- if err != nil {
- return err
- }
- workersReadyWG.Add(worker.num)
- for i := 0; i < worker.num; i++ {
- go worker.startNewWorkerThread()
- }
- }
- workersReadyWG.Wait()
- workersAreReady.Store(true)
- return nil
- }
- func newWorker(o workerOpt) (*worker, error) {
- absFileName, err := fastabs.FastAbs(o.fileName)
- if err != nil {
- return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
- }
- // if the worker already exists, return it,
- // it's necessary since we don't want to destroy the channels when restarting on file changes
- if w, ok := workers[absFileName]; ok {
- return w, nil
- }
- if o.env == nil {
- o.env = make(PreparedEnv, 1)
- }
- o.env["FRANKENPHP_WORKER\x00"] = "1"
- w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)}
- workers[absFileName] = w
- return w, nil
- }
- func (worker *worker) startNewWorkerThread() {
- workerShutdownWG.Add(1)
- defer workerShutdownWG.Done()
- backoff := minWorkerErrorBackoff
- failureCount := 0
- backingOffLock := sync.RWMutex{}
- for {
- // if the worker can stay up longer than backoff*2, it is probably an application error
- upFunc := sync.Once{}
- go func() {
- backingOffLock.RLock()
- wait := backoff * 2
- backingOffLock.RUnlock()
- time.Sleep(wait)
- upFunc.Do(func() {
- backingOffLock.Lock()
- defer backingOffLock.Unlock()
- // if we come back to a stable state, reset the failure count
- if backoff == minWorkerErrorBackoff {
- failureCount = 0
- }
- // earn back the backoff over time
- if failureCount > 0 {
- backoff = max(backoff/2, 100*time.Millisecond)
- }
- })
- }()
- metrics.StartWorker(worker.fileName)
- // Create main dummy request
- r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
- if err != nil {
- panic(err)
- }
- r, err = NewRequestWithContext(
- r,
- WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
- WithRequestPreparedEnv(worker.env),
- )
- if err != nil {
- panic(err)
- }
- if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
- c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num))
- }
- if err := ServeHTTP(nil, r); err != nil {
- panic(err)
- }
- fc := r.Context().Value(contextKey).(*FrankenPHPContext)
- // if we are done, exit the loop that restarts the worker script
- if workersAreDone.Load() {
- break
- }
- // on exit status 0 we just run the worker script again
- if fc.exitStatus == 0 {
- // TODO: make the max restart configurable
- if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil {
- c.Write(zap.String("worker", worker.fileName))
- }
- metrics.StopWorker(worker.fileName, StopReasonRestart)
- continue
- }
- // on exit status 1 we log the error and apply an exponential backoff when restarting
- upFunc.Do(func() {
- backingOffLock.Lock()
- defer backingOffLock.Unlock()
- // if we end up here, the worker has not been up for backoff*2
- // this is probably due to a syntax error or another fatal error
- if failureCount >= maxWorkerConsecutiveFailures {
- if !watcherIsEnabled {
- panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
- }
- logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", failureCount))
- }
- failureCount += 1
- })
- backingOffLock.RLock()
- wait := backoff
- backingOffLock.RUnlock()
- time.Sleep(wait)
- backingOffLock.Lock()
- backoff *= 2
- backoff = min(backoff, maxWorkerErrorBackoff)
- backingOffLock.Unlock()
- metrics.StopWorker(worker.fileName, StopReasonCrash)
- }
- metrics.StopWorker(worker.fileName, StopReasonShutdown)
- // TODO: check if the termination is expected
- if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
- c.Write(zap.String("worker", worker.fileName))
- }
- }
- func (worker *worker) handleRequest(r *http.Request) {
- worker.threadMutex.RLock()
- // dispatch requests to all worker threads in order
- for _, thread := range worker.threads {
- select {
- case thread.requestChan <- r:
- worker.threadMutex.RUnlock()
- return
- default:
- }
- }
- worker.threadMutex.RUnlock()
- // if no thread was available, fan the request out to all threads
- // TODO: theoretically there could be autoscaling of threads here
- worker.requestChan <- r
- }
- func stopWorkers() {
- workersAreDone.Store(true)
- close(workersDone)
- }
- func drainWorkers() {
- watcher.DrainWatcher()
- watcherIsEnabled = false
- stopWorkers()
- workerShutdownWG.Wait()
- workers = make(map[string]*worker)
- }
- func restartWorkersOnFileChanges(workerOpts []workerOpt) error {
- var directoriesToWatch []string
- for _, w := range workerOpts {
- directoriesToWatch = append(directoriesToWatch, w.watch...)
- }
- watcherIsEnabled = len(directoriesToWatch) > 0
- if !watcherIsEnabled {
- return nil
- }
- restartWorkers := func() {
- restartWorkers(workerOpts)
- }
- if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
- return err
- }
- return nil
- }
- func restartWorkers(workerOpts []workerOpt) {
- stopWorkers()
- workerShutdownWG.Wait()
- if err := initWorkers(workerOpts); err != nil {
- logger.Error("failed to restart workers when watching files")
- panic(err)
- }
- logger.Info("workers restarted successfully")
- }
- func assignThreadToWorker(thread *phpThread) {
- fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
- metrics.ReadyWorker(fc.scriptFilename)
- worker, ok := workers[fc.scriptFilename]
- if !ok {
- panic("worker not found for script: " + fc.scriptFilename)
- }
- thread.worker = worker
- if !workersAreReady.Load() {
- workersReadyWG.Done()
- }
- thread.requestChan = make(chan *http.Request)
- worker.threadMutex.Lock()
- worker.threads = append(worker.threads, thread)
- worker.threadMutex.Unlock()
- }
- //export go_frankenphp_worker_handle_request_start
- func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
- thread := phpThreads[threadIndex]
- // we assign a worker to the thread if it doesn't have one already
- if thread.worker == nil {
- assignThreadToWorker(thread)
- }
- if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
- c.Write(zap.String("worker", thread.worker.fileName))
- }
- var r *http.Request
- select {
- case <-workersDone:
- if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
- c.Write(zap.String("worker", thread.worker.fileName))
- }
- thread.worker = nil
- executePHPFunction("opcache_reset")
- return C.bool(false)
- case r = <-thread.worker.requestChan:
- case r = <-thread.requestChan:
- }
- thread.workerRequest = r
- if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
- c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
- }
- if err := updateServerContext(thread, r, false, true); err != nil {
- // Unexpected error
- if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
- c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
- }
- fc := r.Context().Value(contextKey).(*FrankenPHPContext)
- rejectRequest(fc.responseWriter, err.Error())
- maybeCloseContext(fc)
- thread.workerRequest = nil
- thread.Unpin()
- return go_frankenphp_worker_handle_request_start(threadIndex)
- }
- return C.bool(true)
- }
- //export go_frankenphp_finish_request
- func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) {
- thread := phpThreads[threadIndex]
- r := thread.getActiveRequest()
- fc := r.Context().Value(contextKey).(*FrankenPHPContext)
- if isWorkerRequest {
- thread.workerRequest = nil
- }
- maybeCloseContext(fc)
- if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
- var fields []zap.Field
- if isWorkerRequest {
- fields = append(fields, zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
- } else {
- fields = append(fields, zap.String("url", r.RequestURI))
- }
- c.Write(fields...)
- }
- if isWorkerRequest {
- thread.Unpin()
- }
- }
|