123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- package frankenphp
- // #include "frankenphp.h"
- import "C"
- import (
- "fmt"
- "github.com/dunglas/frankenphp/internal/fastabs"
- "net/http"
- "path/filepath"
- "sync"
- "time"
- "github.com/dunglas/frankenphp/internal/watcher"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
- )
- type worker struct {
- fileName string
- num int
- env PreparedEnv
- requestChan chan *http.Request
- threads []*phpThread
- threadMutex sync.RWMutex
- }
- var (
- workers map[string]*worker
- workersDone chan interface{}
- watcherIsEnabled bool
- )
- func initWorkers(opt []workerOpt) error {
- workers = make(map[string]*worker, len(opt))
- workersDone = make(chan interface{})
- directoriesToWatch := getDirectoriesToWatch(opt)
- watcherIsEnabled = len(directoriesToWatch) > 0
- for _, o := range opt {
- worker, err := newWorker(o)
- worker.threads = make([]*phpThread, 0, o.num)
- if err != nil {
- return err
- }
- for i := 0; i < worker.num; i++ {
- worker.startNewThread()
- }
- }
- if len(directoriesToWatch) == 0 {
- return nil
- }
- if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
- return err
- }
- return nil
- }
- func newWorker(o workerOpt) (*worker, error) {
- absFileName, err := fastabs.FastAbs(o.fileName)
- if err != nil {
- return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
- }
- if o.env == nil {
- o.env = make(PreparedEnv, 1)
- }
- o.env["FRANKENPHP_WORKER\x00"] = "1"
- w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)}
- workers[absFileName] = w
- return w, nil
- }
- func stopWorkers() {
- close(workersDone)
- }
- func drainWorkers() {
- watcher.DrainWatcher()
- stopWorkers()
- }
- func restartWorkers() {
- restart := sync.WaitGroup{}
- restart.Add(1)
- ready := sync.WaitGroup{}
- for _, worker := range workers {
- worker.threadMutex.RLock()
- ready.Add(len(worker.threads))
- for _, thread := range worker.threads {
- thread.state.set(stateRestarting)
- go func(thread *phpThread) {
- thread.state.waitForAndYield(&restart, stateReady)
- ready.Done()
- }(thread)
- }
- worker.threadMutex.RUnlock()
- }
- stopWorkers()
- ready.Wait()
- workersDone = make(chan interface{})
- restart.Done()
- }
- func getDirectoriesToWatch(workerOpts []workerOpt) []string {
- directoriesToWatch := []string{}
- for _, w := range workerOpts {
- directoriesToWatch = append(directoriesToWatch, w.watch...)
- }
- return directoriesToWatch
- }
- func (worker *worker) startNewThread() {
- getInactivePHPThread().setActive(
- // onStartup => right before the thread is ready
- func(thread *phpThread) {
- thread.worker = worker
- thread.scriptName = worker.fileName
- thread.requestChan = make(chan *http.Request)
- thread.backoff = newExponentialBackoff()
- worker.threadMutex.Lock()
- worker.threads = append(worker.threads, thread)
- worker.threadMutex.Unlock()
- metrics.ReadyWorker(worker.fileName)
- },
- // beforeScriptExecution => set up the worker with a fake request
- func(thread *phpThread) {
- worker.beforeScript(thread)
- },
- // afterScriptExecution => tear down the worker
- func(thread *phpThread, exitStatus int) {
- worker.afterScript(thread, exitStatus)
- },
- // onShutdown => after the thread is done
- func(thread *phpThread) {
- thread.worker = nil
- thread.backoff = nil
- },
- )
- }
- func (worker *worker) beforeScript(thread *phpThread) {
- // if we are restarting due to file watching, set the state back to ready
- if thread.state.is(stateRestarting) {
- thread.state.set(stateReady)
- }
- thread.backoff.reset()
- metrics.StartWorker(worker.fileName)
- // Create a dummy request to set up the worker
- r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
- if err != nil {
- panic(err)
- }
- r, err = NewRequestWithContext(
- r,
- WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
- WithRequestPreparedEnv(worker.env),
- )
- if err != nil {
- panic(err)
- }
- if err := updateServerContext(thread, r, true, false); err != nil {
- panic(err)
- }
- thread.mainRequest = r
- if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
- c.Write(zap.String("worker", worker.fileName), zap.Int("thread", thread.threadIndex))
- }
- }
- func (worker *worker) afterScript(thread *phpThread, exitStatus int) {
- fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
- fc.exitStatus = exitStatus
- defer func() {
- maybeCloseContext(fc)
- thread.mainRequest = nil
- }()
- // on exit status 0 we just run the worker script again
- if fc.exitStatus == 0 {
- // TODO: make the max restart configurable
- metrics.StopWorker(worker.fileName, StopReasonRestart)
- if c := logger.Check(zapcore.DebugLevel, "restarting"); c != nil {
- c.Write(zap.String("worker", worker.fileName))
- }
- return
- }
- // on exit status 1 we apply an exponential backoff when restarting
- metrics.StopWorker(worker.fileName, StopReasonCrash)
- thread.backoff.trigger(func(failureCount int) {
- // if we end up here, the worker has not been up for backoff*2
- // this is probably due to a syntax error or another fatal error
- if !watcherIsEnabled {
- panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
- }
- logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", failureCount))
- })
- }
- func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
- metrics.StartWorkerRequest(fc.scriptFilename)
- // dispatch requests to all worker threads in order
- worker.threadMutex.RLock()
- for _, thread := range worker.threads {
- select {
- case thread.requestChan <- r:
- worker.threadMutex.RUnlock()
- <-fc.done
- metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
- return
- default:
- }
- }
- worker.threadMutex.RUnlock()
- // if no thread was available, fan the request out to all threads
- // TODO: theoretically there could be autoscaling of threads here
- worker.requestChan <- r
- <-fc.done
- metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
- }
- //export go_frankenphp_worker_handle_request_start
- func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
- thread := phpThreads[threadIndex]
- if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
- c.Write(zap.String("worker", thread.worker.fileName))
- }
- var r *http.Request
- select {
- case <-workersDone:
- if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
- c.Write(zap.String("worker", thread.worker.fileName))
- }
- // execute opcache_reset if the restart was triggered by the watcher
- if watcherIsEnabled && thread.state.is(stateRestarting) {
- C.frankenphp_reset_opcache()
- }
- return C.bool(false)
- case r = <-thread.requestChan:
- case r = <-thread.worker.requestChan:
- }
- thread.workerRequest = r
- if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
- c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
- }
- if err := updateServerContext(thread, r, false, true); err != nil {
- // Unexpected error
- if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
- c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
- }
- fc := r.Context().Value(contextKey).(*FrankenPHPContext)
- rejectRequest(fc.responseWriter, err.Error())
- maybeCloseContext(fc)
- thread.workerRequest = nil
- thread.Unpin()
- return go_frankenphp_worker_handle_request_start(threadIndex)
- }
- return C.bool(true)
- }
- //export go_frankenphp_finish_worker_request
- func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) {
- thread := phpThreads[threadIndex]
- r := thread.getActiveRequest()
- fc := r.Context().Value(contextKey).(*FrankenPHPContext)
- maybeCloseContext(fc)
- thread.workerRequest = nil
- thread.Unpin()
- if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
- c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
- }
- }
- // when frankenphp_finish_request() is directly called from PHP
- //
- //export go_frankenphp_finish_php_request
- func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) {
- r := phpThreads[threadIndex].getActiveRequest()
- fc := r.Context().Value(contextKey).(*FrankenPHPContext)
- maybeCloseContext(fc)
- if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
- c.Write(zap.String("url", r.RequestURI))
- }
- }
|