worker.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package frankenphp
  2. // #include "frankenphp.h"
  3. import "C"
  4. import (
  5. "fmt"
  6. "github.com/dunglas/frankenphp/internal/fastabs"
  7. "net/http"
  8. "sync"
  9. "time"
  10. "github.com/dunglas/frankenphp/internal/watcher"
  11. )
  12. // represents a worker script and can have many threads assigned to it
  13. type worker struct {
  14. fileName string
  15. num int
  16. env PreparedEnv
  17. requestChan chan *http.Request
  18. threads []*phpThread
  19. threadMutex sync.RWMutex
  20. }
  21. var (
  22. workers map[string]*worker
  23. watcherIsEnabled bool
  24. )
  25. func initWorkers(opt []workerOpt) error {
  26. workers = make(map[string]*worker, len(opt))
  27. workersReady := sync.WaitGroup{}
  28. directoriesToWatch := getDirectoriesToWatch(opt)
  29. watcherIsEnabled = len(directoriesToWatch) > 0
  30. for _, o := range opt {
  31. worker, err := newWorker(o)
  32. worker.threads = make([]*phpThread, 0, o.num)
  33. workersReady.Add(o.num)
  34. if err != nil {
  35. return err
  36. }
  37. for i := 0; i < worker.num; i++ {
  38. thread := getInactivePHPThread()
  39. convertToWorkerThread(thread, worker)
  40. go func() {
  41. thread.state.waitFor(stateReady)
  42. workersReady.Done()
  43. }()
  44. }
  45. }
  46. workersReady.Wait()
  47. if !watcherIsEnabled {
  48. return nil
  49. }
  50. watcherIsEnabled = true
  51. if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, getLogger()); err != nil {
  52. return err
  53. }
  54. return nil
  55. }
  56. func newWorker(o workerOpt) (*worker, error) {
  57. absFileName, err := fastabs.FastAbs(o.fileName)
  58. if err != nil {
  59. return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
  60. }
  61. if o.env == nil {
  62. o.env = make(PreparedEnv, 1)
  63. }
  64. o.env["FRANKENPHP_WORKER\x00"] = "1"
  65. w := &worker{
  66. fileName: absFileName,
  67. num: o.num,
  68. env: o.env,
  69. requestChan: make(chan *http.Request),
  70. }
  71. workers[absFileName] = w
  72. return w, nil
  73. }
  74. func drainWorkers() {
  75. watcher.DrainWatcher()
  76. }
  77. // RestartWorkers attempts to restart all workers gracefully
  78. func RestartWorkers() {
  79. // disallow scaling threads while restarting workers
  80. scalingMu.Lock()
  81. defer scalingMu.Unlock()
  82. ready := sync.WaitGroup{}
  83. threadsToRestart := make([]*phpThread, 0)
  84. for _, worker := range workers {
  85. worker.threadMutex.RLock()
  86. ready.Add(len(worker.threads))
  87. for _, thread := range worker.threads {
  88. if !thread.state.requestSafeStateChange(stateRestarting) {
  89. // no state change allowed == thread is shutting down
  90. // we'll proceed to restart all other threads anyways
  91. continue
  92. }
  93. close(thread.drainChan)
  94. threadsToRestart = append(threadsToRestart, thread)
  95. go func(thread *phpThread) {
  96. thread.state.waitFor(stateYielding)
  97. ready.Done()
  98. }(thread)
  99. }
  100. worker.threadMutex.RUnlock()
  101. }
  102. ready.Wait()
  103. for _, thread := range threadsToRestart {
  104. thread.drainChan = make(chan struct{})
  105. thread.state.set(stateReady)
  106. }
  107. }
  108. func getDirectoriesToWatch(workerOpts []workerOpt) []string {
  109. directoriesToWatch := []string{}
  110. for _, w := range workerOpts {
  111. directoriesToWatch = append(directoriesToWatch, w.watch...)
  112. }
  113. return directoriesToWatch
  114. }
  115. func (worker *worker) attachThread(thread *phpThread) {
  116. worker.threadMutex.Lock()
  117. worker.threads = append(worker.threads, thread)
  118. worker.threadMutex.Unlock()
  119. }
  120. func (worker *worker) detachThread(thread *phpThread) {
  121. worker.threadMutex.Lock()
  122. for i, t := range worker.threads {
  123. if t == thread {
  124. worker.threads = append(worker.threads[:i], worker.threads[i+1:]...)
  125. break
  126. }
  127. }
  128. worker.threadMutex.Unlock()
  129. }
  130. func (worker *worker) countThreads() int {
  131. worker.threadMutex.RLock()
  132. l := len(worker.threads)
  133. worker.threadMutex.RUnlock()
  134. return l
  135. }
  136. func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
  137. metrics.StartWorkerRequest(fc.scriptFilename)
  138. // dispatch requests to all worker threads in order
  139. worker.threadMutex.RLock()
  140. for _, thread := range worker.threads {
  141. select {
  142. case thread.requestChan <- r:
  143. worker.threadMutex.RUnlock()
  144. <-fc.done
  145. metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
  146. return
  147. default:
  148. // thread is busy, continue
  149. }
  150. }
  151. worker.threadMutex.RUnlock()
  152. // if no thread was available, mark the request as queued and apply the scaling strategy
  153. metrics.QueuedWorkerRequest(fc.scriptFilename)
  154. for {
  155. select {
  156. case worker.requestChan <- r:
  157. metrics.DequeuedWorkerRequest(fc.scriptFilename)
  158. <-fc.done
  159. metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
  160. return
  161. case scaleChan <- fc:
  162. // the request has triggered scaling, continue to wait for a thread
  163. }
  164. }
  165. }