worker.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package frankenphp
  2. // #include "frankenphp.h"
  3. import "C"
  4. import (
  5. "fmt"
  6. "github.com/dunglas/frankenphp/internal/fastabs"
  7. "net/http"
  8. "sync"
  9. "time"
  10. "github.com/dunglas/frankenphp/internal/watcher"
  11. )
  12. // represents a worker script and can have many threads assigned to it
  13. type worker struct {
  14. fileName string
  15. num int
  16. env PreparedEnv
  17. requestChan chan *http.Request
  18. threads []*phpThread
  19. threadMutex sync.RWMutex
  20. }
  21. var (
  22. workers map[string]*worker
  23. watcherIsEnabled bool
  24. )
  25. func initWorkers(opt []workerOpt) error {
  26. workers = make(map[string]*worker, len(opt))
  27. workersReady := sync.WaitGroup{}
  28. directoriesToWatch := getDirectoriesToWatch(opt)
  29. watcherIsEnabled = len(directoriesToWatch) > 0
  30. for _, o := range opt {
  31. worker, err := newWorker(o)
  32. worker.threads = make([]*phpThread, 0, o.num)
  33. workersReady.Add(o.num)
  34. if err != nil {
  35. return err
  36. }
  37. for i := 0; i < worker.num; i++ {
  38. thread := getInactivePHPThread()
  39. convertToWorkerThread(thread, worker)
  40. go func() {
  41. thread.state.waitFor(stateReady)
  42. workersReady.Done()
  43. }()
  44. }
  45. }
  46. workersReady.Wait()
  47. if !watcherIsEnabled {
  48. return nil
  49. }
  50. watcherIsEnabled = true
  51. if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, getLogger()); err != nil {
  52. return err
  53. }
  54. return nil
  55. }
  56. func newWorker(o workerOpt) (*worker, error) {
  57. absFileName, err := fastabs.FastAbs(o.fileName)
  58. if err != nil {
  59. return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
  60. }
  61. if o.env == nil {
  62. o.env = make(PreparedEnv, 1)
  63. }
  64. o.env["FRANKENPHP_WORKER\x00"] = "1"
  65. w := &worker{
  66. fileName: absFileName,
  67. num: o.num,
  68. env: o.env,
  69. requestChan: make(chan *http.Request),
  70. }
  71. workers[absFileName] = w
  72. return w, nil
  73. }
  74. // EXPERIMENTAL: DrainWorkers finishes all worker scripts before a graceful shutdown
  75. func DrainWorkers() {
  76. _ = drainWorkerThreads()
  77. }
  78. func drainWorkerThreads() []*phpThread {
  79. ready := sync.WaitGroup{}
  80. drainedThreads := make([]*phpThread, 0)
  81. for _, worker := range workers {
  82. worker.threadMutex.RLock()
  83. ready.Add(len(worker.threads))
  84. for _, thread := range worker.threads {
  85. if !thread.state.requestSafeStateChange(stateRestarting) {
  86. // no state change allowed == thread is shutting down
  87. // we'll proceed to restart all other threads anyways
  88. continue
  89. }
  90. close(thread.drainChan)
  91. drainedThreads = append(drainedThreads, thread)
  92. go func(thread *phpThread) {
  93. thread.state.waitFor(stateYielding)
  94. ready.Done()
  95. }(thread)
  96. }
  97. worker.threadMutex.RUnlock()
  98. }
  99. ready.Wait()
  100. return drainedThreads
  101. }
  102. func drainWatcher() {
  103. if watcherIsEnabled {
  104. watcher.DrainWatcher()
  105. }
  106. }
  107. // RestartWorkers attempts to restart all workers gracefully
  108. func RestartWorkers() {
  109. // disallow scaling threads while restarting workers
  110. scalingMu.Lock()
  111. defer scalingMu.Unlock()
  112. threadsToRestart := drainWorkerThreads()
  113. for _, thread := range threadsToRestart {
  114. thread.drainChan = make(chan struct{})
  115. thread.state.set(stateReady)
  116. }
  117. }
  118. func getDirectoriesToWatch(workerOpts []workerOpt) []string {
  119. directoriesToWatch := []string{}
  120. for _, w := range workerOpts {
  121. directoriesToWatch = append(directoriesToWatch, w.watch...)
  122. }
  123. return directoriesToWatch
  124. }
  125. func (worker *worker) attachThread(thread *phpThread) {
  126. worker.threadMutex.Lock()
  127. worker.threads = append(worker.threads, thread)
  128. worker.threadMutex.Unlock()
  129. }
  130. func (worker *worker) detachThread(thread *phpThread) {
  131. worker.threadMutex.Lock()
  132. for i, t := range worker.threads {
  133. if t == thread {
  134. worker.threads = append(worker.threads[:i], worker.threads[i+1:]...)
  135. break
  136. }
  137. }
  138. worker.threadMutex.Unlock()
  139. }
  140. func (worker *worker) countThreads() int {
  141. worker.threadMutex.RLock()
  142. l := len(worker.threads)
  143. worker.threadMutex.RUnlock()
  144. return l
  145. }
  146. func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
  147. metrics.StartWorkerRequest(fc.scriptFilename)
  148. // dispatch requests to all worker threads in order
  149. worker.threadMutex.RLock()
  150. for _, thread := range worker.threads {
  151. select {
  152. case thread.requestChan <- r:
  153. worker.threadMutex.RUnlock()
  154. <-fc.done
  155. metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
  156. return
  157. default:
  158. // thread is busy, continue
  159. }
  160. }
  161. worker.threadMutex.RUnlock()
  162. // if no thread was available, mark the request as queued and apply the scaling strategy
  163. metrics.QueuedWorkerRequest(fc.scriptFilename)
  164. for {
  165. select {
  166. case worker.requestChan <- r:
  167. metrics.DequeuedWorkerRequest(fc.scriptFilename)
  168. <-fc.done
  169. metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
  170. return
  171. case scaleChan <- fc:
  172. // the request has triggered scaling, continue to wait for a thread
  173. }
  174. }
  175. }