123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- package frankenphp
- // #include "frankenphp.h"
- import "C"
- import (
- "fmt"
- "github.com/dunglas/frankenphp/internal/fastabs"
- "net/http"
- "sync"
- "time"
- "github.com/dunglas/frankenphp/internal/watcher"
- )
- // represents a worker script and can have many threads assigned to it
- type worker struct {
- fileName string
- num int
- env PreparedEnv
- requestChan chan *http.Request
- threads []*phpThread
- threadMutex sync.RWMutex
- }
- var (
- workers map[string]*worker
- watcherIsEnabled bool
- )
- func initWorkers(opt []workerOpt) error {
- workers = make(map[string]*worker, len(opt))
- workersReady := sync.WaitGroup{}
- directoriesToWatch := getDirectoriesToWatch(opt)
- watcherIsEnabled = len(directoriesToWatch) > 0
- for _, o := range opt {
- worker, err := newWorker(o)
- worker.threads = make([]*phpThread, 0, o.num)
- workersReady.Add(o.num)
- if err != nil {
- return err
- }
- for i := 0; i < worker.num; i++ {
- thread := getInactivePHPThread()
- convertToWorkerThread(thread, worker)
- go func() {
- thread.state.waitFor(stateReady)
- workersReady.Done()
- }()
- }
- }
- workersReady.Wait()
- if !watcherIsEnabled {
- return nil
- }
- if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
- return err
- }
- return nil
- }
- func newWorker(o workerOpt) (*worker, error) {
- absFileName, err := fastabs.FastAbs(o.fileName)
- if err != nil {
- return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
- }
- if o.env == nil {
- o.env = make(PreparedEnv, 1)
- }
- o.env["FRANKENPHP_WORKER\x00"] = "1"
- w := &worker{
- fileName: absFileName,
- num: o.num,
- env: o.env,
- requestChan: make(chan *http.Request),
- }
- workers[absFileName] = w
- return w, nil
- }
- func drainWorkers() {
- watcher.DrainWatcher()
- }
- func restartWorkers() {
- ready := sync.WaitGroup{}
- threadsToRestart := make([]*phpThread, 0)
- for _, worker := range workers {
- worker.threadMutex.RLock()
- ready.Add(len(worker.threads))
- for _, thread := range worker.threads {
- if !thread.state.requestSafeStateChange(stateRestarting) {
- // no state change allowed = shutdown
- continue
- }
- close(thread.drainChan)
- threadsToRestart = append(threadsToRestart, thread)
- go func(thread *phpThread) {
- thread.state.waitFor(stateYielding)
- ready.Done()
- }(thread)
- }
- worker.threadMutex.RUnlock()
- }
- ready.Wait()
- for _, thread := range threadsToRestart {
- thread.drainChan = make(chan struct{})
- thread.state.set(stateReady)
- }
- }
- func getDirectoriesToWatch(workerOpts []workerOpt) []string {
- directoriesToWatch := []string{}
- for _, w := range workerOpts {
- directoriesToWatch = append(directoriesToWatch, w.watch...)
- }
- return directoriesToWatch
- }
- func (worker *worker) attachThread(thread *phpThread) {
- worker.threadMutex.Lock()
- worker.threads = append(worker.threads, thread)
- worker.threadMutex.Unlock()
- }
- func (worker *worker) detachThread(thread *phpThread) {
- worker.threadMutex.Lock()
- for i, t := range worker.threads {
- if t == thread {
- worker.threads = append(worker.threads[:i], worker.threads[i+1:]...)
- break
- }
- }
- worker.threadMutex.Unlock()
- }
- func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
- metrics.StartWorkerRequest(fc.scriptFilename)
- // dispatch requests to all worker threads in order
- worker.threadMutex.RLock()
- for _, thread := range worker.threads {
- select {
- case thread.requestChan <- r:
- worker.threadMutex.RUnlock()
- <-fc.done
- metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
- return
- default:
- }
- }
- worker.threadMutex.RUnlock()
- // if no thread was available, fan the request out to all threads
- // TODO: theoretically there could be autoscaling of threads here
- worker.requestChan <- r
- <-fc.done
- metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
- }
|