worker.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package frankenphp
  2. // #include "frankenphp.h"
  3. import "C"
  4. import (
  5. "fmt"
  6. "github.com/dunglas/frankenphp/internal/fastabs"
  7. "net/http"
  8. "sync"
  9. "time"
  10. "github.com/dunglas/frankenphp/internal/watcher"
  11. )
// worker represents a worker script and can have many threads assigned to it.
type worker struct {
	// fileName is the absolute path of the worker script, as resolved by newWorker.
	fileName string
	// num is the number of threads configured for this worker.
	num int
	// env is the prepared environment for the script; newWorker always adds
	// the FRANKENPHP_WORKER marker entry to it.
	env PreparedEnv
	// requestChan is the worker-wide, unbuffered channel handleRequest falls
	// back to when no individual thread accepts a request immediately.
	requestChan chan *http.Request
	// threads holds the threads currently attached to this worker; it is
	// modified by attachThread and detachThread.
	threads []*phpThread
	// threadMutex guards threads.
	threadMutex sync.RWMutex
}
var (
	// workers maps the absolute file name of each worker script to its worker.
	// It is (re)created by initWorkers and filled by newWorker.
	workers map[string]*worker
	// watcherIsEnabled reports whether at least one directory to watch was
	// configured; it is set by initWorkers.
	watcherIsEnabled bool
)
  25. func initWorkers(opt []workerOpt) error {
  26. workers = make(map[string]*worker, len(opt))
  27. workersReady := sync.WaitGroup{}
  28. directoriesToWatch := getDirectoriesToWatch(opt)
  29. watcherIsEnabled = len(directoriesToWatch) > 0
  30. for _, o := range opt {
  31. worker, err := newWorker(o)
  32. worker.threads = make([]*phpThread, 0, o.num)
  33. workersReady.Add(o.num)
  34. if err != nil {
  35. return err
  36. }
  37. for i := 0; i < worker.num; i++ {
  38. thread := getInactivePHPThread()
  39. convertToWorkerThread(thread, worker)
  40. go func() {
  41. thread.state.waitFor(stateReady)
  42. workersReady.Done()
  43. }()
  44. }
  45. }
  46. workersReady.Wait()
  47. if !watcherIsEnabled {
  48. return nil
  49. }
  50. if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
  51. return err
  52. }
  53. return nil
  54. }
  55. func newWorker(o workerOpt) (*worker, error) {
  56. absFileName, err := fastabs.FastAbs(o.fileName)
  57. if err != nil {
  58. return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
  59. }
  60. if o.env == nil {
  61. o.env = make(PreparedEnv, 1)
  62. }
  63. o.env["FRANKENPHP_WORKER\x00"] = "1"
  64. w := &worker{
  65. fileName: absFileName,
  66. num: o.num,
  67. env: o.env,
  68. requestChan: make(chan *http.Request),
  69. }
  70. workers[absFileName] = w
  71. return w, nil
  72. }
// drainWorkers drains the file watcher by delegating to watcher.DrainWatcher.
// It does not touch the worker threads themselves.
func drainWorkers() {
	watcher.DrainWatcher()
}
  76. func restartWorkers() {
  77. ready := sync.WaitGroup{}
  78. threadsToRestart := make([]*phpThread, 0)
  79. for _, worker := range workers {
  80. worker.threadMutex.RLock()
  81. ready.Add(len(worker.threads))
  82. for _, thread := range worker.threads {
  83. if !thread.state.requestSafeStateChange(stateRestarting) {
  84. // no state change allowed = shutdown
  85. continue
  86. }
  87. close(thread.drainChan)
  88. threadsToRestart = append(threadsToRestart, thread)
  89. go func(thread *phpThread) {
  90. thread.state.waitFor(stateYielding)
  91. ready.Done()
  92. }(thread)
  93. }
  94. worker.threadMutex.RUnlock()
  95. }
  96. ready.Wait()
  97. for _, thread := range threadsToRestart {
  98. thread.drainChan = make(chan struct{})
  99. thread.state.set(stateReady)
  100. }
  101. }
  102. func getDirectoriesToWatch(workerOpts []workerOpt) []string {
  103. directoriesToWatch := []string{}
  104. for _, w := range workerOpts {
  105. directoriesToWatch = append(directoriesToWatch, w.watch...)
  106. }
  107. return directoriesToWatch
  108. }
  109. func (worker *worker) attachThread(thread *phpThread) {
  110. worker.threadMutex.Lock()
  111. worker.threads = append(worker.threads, thread)
  112. worker.threadMutex.Unlock()
  113. }
  114. func (worker *worker) detachThread(thread *phpThread) {
  115. worker.threadMutex.Lock()
  116. for i, t := range worker.threads {
  117. if t == thread {
  118. worker.threads = append(worker.threads[:i], worker.threads[i+1:]...)
  119. break
  120. }
  121. }
  122. worker.threadMutex.Unlock()
  123. }
  124. func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
  125. metrics.StartWorkerRequest(fc.scriptFilename)
  126. // dispatch requests to all worker threads in order
  127. worker.threadMutex.RLock()
  128. for _, thread := range worker.threads {
  129. select {
  130. case thread.requestChan <- r:
  131. worker.threadMutex.RUnlock()
  132. <-fc.done
  133. metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
  134. return
  135. default:
  136. }
  137. }
  138. worker.threadMutex.RUnlock()
  139. // if no thread was available, fan the request out to all threads
  140. // TODO: theoretically there could be autoscaling of threads here
  141. worker.requestChan <- r
  142. <-fc.done
  143. metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
  144. }