123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- package frankenphp
- // #include "frankenphp.h"
- import "C"
- import (
- "net/http"
- "path/filepath"
- "time"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
- )
- // representation of a thread assigned to a worker script
- // executes the PHP worker script in a loop
- // implements the threadHandler interface
- type workerThread struct {
- state *threadState
- thread *phpThread
- worker *worker
- fakeRequest *http.Request
- workerRequest *http.Request
- backoff *exponentialBackoff
- inRequest bool // true if the worker is currently handling a request
- }
- func convertToWorkerThread(thread *phpThread, worker *worker) {
- thread.setHandler(&workerThread{
- state: thread.state,
- thread: thread,
- worker: worker,
- backoff: &exponentialBackoff{
- maxBackoff: 1 * time.Second,
- minBackoff: 100 * time.Millisecond,
- maxConsecutiveFailures: 6,
- },
- })
- worker.attachThread(thread)
- }
- // beforeScriptExecution returns the name of the script or an empty string on shutdown
- func (handler *workerThread) beforeScriptExecution() string {
- switch handler.state.get() {
- case stateTransitionRequested:
- handler.worker.detachThread(handler.thread)
- return handler.thread.transitionToNewHandler()
- case stateRestarting:
- handler.state.set(stateYielding)
- handler.state.waitFor(stateReady, stateShuttingDown)
- return handler.beforeScriptExecution()
- case stateReady, stateTransitionComplete:
- setupWorkerScript(handler, handler.worker)
- return handler.worker.fileName
- case stateShuttingDown:
- handler.worker.detachThread(handler.thread)
- // signal to stop
- return ""
- }
- panic("unexpected state: " + handler.state.name())
- }
- func (handler *workerThread) afterScriptExecution(exitStatus int) {
- tearDownWorkerScript(handler, exitStatus)
- }
- func (handler *workerThread) getActiveRequest() *http.Request {
- if handler.workerRequest != nil {
- return handler.workerRequest
- }
- return handler.fakeRequest
- }
- func (handler *workerThread) name() string {
- return "Worker PHP Thread - " + handler.worker.fileName
- }
- func setupWorkerScript(handler *workerThread, worker *worker) {
- handler.backoff.wait()
- metrics.StartWorker(worker.fileName)
- // Create a dummy request to set up the worker
- r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
- if err != nil {
- panic(err)
- }
- r, err = NewRequestWithContext(
- r,
- WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
- WithRequestPreparedEnv(worker.env),
- )
- if err != nil {
- panic(err)
- }
- if err := updateServerContext(handler.thread, r, true, false); err != nil {
- panic(err)
- }
- handler.setFakeRequest(r)
- if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
- c.Write(zap.String("worker", worker.fileName), zap.Int("thread", handler.thread.threadIndex))
- }
- }
- func tearDownWorkerScript(handler *workerThread, exitStatus int) {
- // if the worker request is not nil, the script might have crashed
- // make sure to close the worker request context
- if handler.workerRequest != nil {
- fc := handler.workerRequest.Context().Value(contextKey).(*FrankenPHPContext)
- maybeCloseContext(fc)
- handler.setWorkerRequest(nil)
- }
- fc := handler.fakeRequest.Context().Value(contextKey).(*FrankenPHPContext)
- fc.exitStatus = exitStatus
- handler.setFakeRequest(nil)
- // on exit status 0 we just run the worker script again
- worker := handler.worker
- if fc.exitStatus == 0 {
- // TODO: make the max restart configurable
- metrics.StopWorker(worker.fileName, StopReasonRestart)
- handler.backoff.recordSuccess()
- if c := logger.Check(zapcore.DebugLevel, "restarting"); c != nil {
- c.Write(zap.String("worker", worker.fileName))
- }
- return
- }
- // TODO: error status
- // on exit status 1 we apply an exponential backoff when restarting
- metrics.StopWorker(worker.fileName, StopReasonCrash)
- if !handler.inRequest && handler.backoff.recordFailure() {
- if !watcherIsEnabled {
- logger.Panic("too many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
- }
- logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
- }
- }
- // waitForWorkerRequest is called during frankenphp_handle_request in the php worker script.
- func (handler *workerThread) waitForWorkerRequest() bool {
- // unpin any memory left over from previous requests
- handler.thread.Unpin()
- handler.state.markAsWaiting(true)
- if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
- c.Write(zap.String("worker", handler.worker.fileName))
- }
- if handler.state.compareAndSwap(stateTransitionComplete, stateReady) {
- metrics.ReadyWorker(handler.worker.fileName)
- }
- var r *http.Request
- select {
- case <-handler.thread.drainChan:
- if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
- c.Write(zap.String("worker", handler.worker.fileName))
- }
- // flush the opcache when restarting due to watcher or admin api
- // note: this is done right before frankenphp_handle_request() returns 'false'
- if handler.state.is(stateRestarting) {
- C.frankenphp_reset_opcache()
- }
- return false
- case r = <-handler.thread.requestChan:
- case r = <-handler.worker.requestChan:
- }
- handler.setWorkerRequest(r)
- handler.state.markAsWaiting(false)
- if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
- c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI))
- }
- handler.inRequest = true
- if err := updateServerContext(handler.thread, r, false, true); err != nil {
- // Unexpected error or invalid request
- if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
- c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
- }
- fc := r.Context().Value(contextKey).(*FrankenPHPContext)
- rejectRequest(fc.responseWriter, err.Error())
- maybeCloseContext(fc)
- handler.workerRequest = nil
- return handler.waitForWorkerRequest()
- }
- return true
- }
- func (handler *workerThread) setWorkerRequest(r *http.Request) {
- handler.thread.requestMu.Lock()
- handler.workerRequest = r
- handler.thread.requestMu.Unlock()
- }
- func (handler *workerThread) setFakeRequest(r *http.Request) {
- handler.thread.requestMu.Lock()
- handler.fakeRequest = r
- handler.thread.requestMu.Unlock()
- }
- // go_frankenphp_worker_handle_request_start is called at the start of every php request served.
- //
- //export go_frankenphp_worker_handle_request_start
- func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
- handler := phpThreads[threadIndex].handler.(*workerThread)
- return C.bool(handler.waitForWorkerRequest())
- }
- // go_frankenphp_finish_worker_request is called at the end of every php request served.
- //
- //export go_frankenphp_finish_worker_request
- func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) {
- thread := phpThreads[threadIndex]
- r := thread.getActiveRequest()
- fc := r.Context().Value(contextKey).(*FrankenPHPContext)
- maybeCloseContext(fc)
- thread.handler.(*workerThread).workerRequest = nil
- if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
- c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
- }
- }
- // when frankenphp_finish_request() is directly called from PHP
- //
- //export go_frankenphp_finish_php_request
- func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) {
- r := phpThreads[threadIndex].getActiveRequest()
- fc := r.Context().Value(contextKey).(*FrankenPHPContext)
- maybeCloseContext(fc)
- if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
- c.Write(zap.String("url", r.RequestURI))
- }
- }
|