worker.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package frankenphp
  2. // #include "frankenphp.h"
  3. import "C"
  4. import (
  5. "fmt"
  6. "github.com/dunglas/frankenphp/internal/fastabs"
  7. "net/http"
  8. "sync"
  9. "time"
  10. "github.com/dunglas/frankenphp/internal/watcher"
  11. )
  12. // represents a worker script and can have many threads assigned to it
  13. type worker struct {
  14. fileName string
  15. num int
  16. env PreparedEnv
  17. requestChan chan *http.Request
  18. threads []*phpThread
  19. threadMutex sync.RWMutex
  20. }
  21. var (
  22. workers map[string]*worker
  23. watcherIsEnabled bool
  24. )
  25. func initWorkers(opt []workerOpt) error {
  26. workers = make(map[string]*worker, len(opt))
  27. workersReady := sync.WaitGroup{}
  28. directoriesToWatch := getDirectoriesToWatch(opt)
  29. watcherIsEnabled = len(directoriesToWatch) > 0
  30. for _, o := range opt {
  31. worker, err := newWorker(o)
  32. worker.threads = make([]*phpThread, 0, o.num)
  33. workersReady.Add(o.num)
  34. if err != nil {
  35. return err
  36. }
  37. for i := 0; i < worker.num; i++ {
  38. thread := getInactivePHPThread()
  39. convertToWorkerThread(thread, worker)
  40. go func() {
  41. thread.state.waitFor(stateReady)
  42. workersReady.Done()
  43. }()
  44. }
  45. }
  46. workersReady.Wait()
  47. if !watcherIsEnabled {
  48. return nil
  49. }
  50. watcherIsEnabled = true
  51. if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, getLogger()); err != nil {
  52. return err
  53. }
  54. return nil
  55. }
  56. func newWorker(o workerOpt) (*worker, error) {
  57. absFileName, err := fastabs.FastAbs(o.fileName)
  58. if err != nil {
  59. return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
  60. }
  61. if o.env == nil {
  62. o.env = make(PreparedEnv, 1)
  63. }
  64. o.env["FRANKENPHP_WORKER\x00"] = "1"
  65. w := &worker{
  66. fileName: absFileName,
  67. num: o.num,
  68. env: o.env,
  69. requestChan: make(chan *http.Request),
  70. }
  71. workers[absFileName] = w
  72. return w, nil
  73. }
  74. func drainWorkers() {
  75. watcher.DrainWatcher()
  76. }
  77. // RestartWorkers attempts to restart all workers gracefully
  78. func RestartWorkers() {
  79. // disallow scaling threads while restarting workers
  80. scalingMu.Lock()
  81. defer scalingMu.Unlock()
  82. ready := sync.WaitGroup{}
  83. threadsToRestart := make([]*phpThread, 0)
  84. for _, worker := range workers {
  85. worker.threadMutex.RLock()
  86. ready.Add(len(worker.threads))
  87. for _, thread := range worker.threads {
  88. if !thread.state.requestSafeStateChange(stateRestarting) {
  89. // no state change allowed == thread is shutting down
  90. // we'll proceed to restart all other threads anyways
  91. continue
  92. }
  93. close(thread.drainChan)
  94. threadsToRestart = append(threadsToRestart, thread)
  95. go func(thread *phpThread) {
  96. thread.state.waitFor(stateYielding)
  97. ready.Done()
  98. }(thread)
  99. }
  100. worker.threadMutex.RUnlock()
  101. }
  102. ready.Wait()
  103. // the first thread should reset the opcache
  104. if len(threadsToRestart) > 0 {
  105. threadsToRestart[0].state.set(stateOpcacheReset)
  106. threadsToRestart[0].state.waitFor(stateYielding)
  107. }
  108. for _, thread := range threadsToRestart {
  109. thread.drainChan = make(chan struct{})
  110. thread.state.set(stateReady)
  111. }
  112. }
  113. func getDirectoriesToWatch(workerOpts []workerOpt) []string {
  114. directoriesToWatch := []string{}
  115. for _, w := range workerOpts {
  116. directoriesToWatch = append(directoriesToWatch, w.watch...)
  117. }
  118. return directoriesToWatch
  119. }
  120. func (worker *worker) attachThread(thread *phpThread) {
  121. worker.threadMutex.Lock()
  122. worker.threads = append(worker.threads, thread)
  123. worker.threadMutex.Unlock()
  124. }
  125. func (worker *worker) detachThread(thread *phpThread) {
  126. worker.threadMutex.Lock()
  127. for i, t := range worker.threads {
  128. if t == thread {
  129. worker.threads = append(worker.threads[:i], worker.threads[i+1:]...)
  130. break
  131. }
  132. }
  133. worker.threadMutex.Unlock()
  134. }
  135. func (worker *worker) countThreads() int {
  136. worker.threadMutex.RLock()
  137. l := len(worker.threads)
  138. worker.threadMutex.RUnlock()
  139. return l
  140. }
  141. func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
  142. metrics.StartWorkerRequest(fc.scriptFilename)
  143. // dispatch requests to all worker threads in order
  144. worker.threadMutex.RLock()
  145. for _, thread := range worker.threads {
  146. select {
  147. case thread.requestChan <- r:
  148. worker.threadMutex.RUnlock()
  149. <-fc.done
  150. metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
  151. return
  152. default:
  153. // thread is busy, continue
  154. }
  155. }
  156. worker.threadMutex.RUnlock()
  157. // if no thread was available, mark the request as queued and apply the scaling strategy
  158. metrics.QueuedWorkerRequest(fc.scriptFilename)
  159. for {
  160. select {
  161. case worker.requestChan <- r:
  162. metrics.DequeuedWorkerRequest(fc.scriptFilename)
  163. <-fc.done
  164. metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
  165. return
  166. case scaleChan <- fc:
  167. // the request has triggered scaling, continue to wait for a thread
  168. }
  169. }
  170. }