worker.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. package frankenphp
  2. // #include "frankenphp.h"
  3. import "C"
  4. import (
  5. "fmt"
  6. "github.com/dunglas/frankenphp/internal/fastabs"
  7. "net/http"
  8. "path/filepath"
  9. "sync"
  10. "time"
  11. "github.com/dunglas/frankenphp/internal/watcher"
  12. "go.uber.org/zap"
  13. "go.uber.org/zap/zapcore"
  14. )
  15. type worker struct {
  16. fileName string
  17. num int
  18. env PreparedEnv
  19. requestChan chan *http.Request
  20. threads []*phpThread
  21. threadMutex sync.RWMutex
  22. }
  23. var (
  24. workers map[string]*worker
  25. workersDone chan interface{}
  26. watcherIsEnabled bool
  27. )
  28. func initWorkers(opt []workerOpt) error {
  29. workers = make(map[string]*worker, len(opt))
  30. workersDone = make(chan interface{})
  31. directoriesToWatch := getDirectoriesToWatch(opt)
  32. watcherIsEnabled = len(directoriesToWatch) > 0
  33. for _, o := range opt {
  34. worker, err := newWorker(o)
  35. worker.threads = make([]*phpThread, 0, o.num)
  36. if err != nil {
  37. return err
  38. }
  39. for i := 0; i < worker.num; i++ {
  40. worker.startNewThread()
  41. }
  42. }
  43. if len(directoriesToWatch) == 0 {
  44. return nil
  45. }
  46. if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
  47. return err
  48. }
  49. return nil
  50. }
  51. func newWorker(o workerOpt) (*worker, error) {
  52. absFileName, err := fastabs.FastAbs(o.fileName)
  53. if err != nil {
  54. return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
  55. }
  56. if o.env == nil {
  57. o.env = make(PreparedEnv, 1)
  58. }
  59. o.env["FRANKENPHP_WORKER\x00"] = "1"
  60. w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)}
  61. workers[absFileName] = w
  62. return w, nil
  63. }
  64. func stopWorkers() {
  65. close(workersDone)
  66. }
  67. func drainWorkers() {
  68. watcher.DrainWatcher()
  69. stopWorkers()
  70. }
  71. func restartWorkers() {
  72. restart := sync.WaitGroup{}
  73. restart.Add(1)
  74. ready := sync.WaitGroup{}
  75. for _, worker := range workers {
  76. worker.threadMutex.RLock()
  77. ready.Add(len(worker.threads))
  78. for _, thread := range worker.threads {
  79. thread.state.set(stateRestarting)
  80. go func(thread *phpThread) {
  81. thread.state.waitForAndYield(&restart, stateReady)
  82. ready.Done()
  83. }(thread)
  84. }
  85. worker.threadMutex.RUnlock()
  86. }
  87. stopWorkers()
  88. ready.Wait()
  89. workersDone = make(chan interface{})
  90. restart.Done()
  91. }
  92. func getDirectoriesToWatch(workerOpts []workerOpt) []string {
  93. directoriesToWatch := []string{}
  94. for _, w := range workerOpts {
  95. directoriesToWatch = append(directoriesToWatch, w.watch...)
  96. }
  97. return directoriesToWatch
  98. }
  99. func (worker *worker) startNewThread() {
  100. getInactivePHPThread().setActive(
  101. // onStartup => right before the thread is ready
  102. func(thread *phpThread) {
  103. thread.worker = worker
  104. thread.scriptName = worker.fileName
  105. thread.requestChan = make(chan *http.Request)
  106. thread.backoff = newExponentialBackoff()
  107. worker.threadMutex.Lock()
  108. worker.threads = append(worker.threads, thread)
  109. worker.threadMutex.Unlock()
  110. metrics.ReadyWorker(worker.fileName)
  111. },
  112. // beforeScriptExecution => set up the worker with a fake request
  113. func(thread *phpThread) {
  114. worker.beforeScript(thread)
  115. },
  116. // afterScriptExecution => tear down the worker
  117. func(thread *phpThread, exitStatus int) {
  118. worker.afterScript(thread, exitStatus)
  119. },
  120. // onShutdown => after the thread is done
  121. func(thread *phpThread) {
  122. thread.worker = nil
  123. thread.backoff = nil
  124. },
  125. )
  126. }
  127. func (worker *worker) beforeScript(thread *phpThread) {
  128. // if we are restarting due to file watching, set the state back to ready
  129. if thread.state.is(stateRestarting) {
  130. thread.state.set(stateReady)
  131. }
  132. thread.backoff.reset()
  133. metrics.StartWorker(worker.fileName)
  134. // Create a dummy request to set up the worker
  135. r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
  136. if err != nil {
  137. panic(err)
  138. }
  139. r, err = NewRequestWithContext(
  140. r,
  141. WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
  142. WithRequestPreparedEnv(worker.env),
  143. )
  144. if err != nil {
  145. panic(err)
  146. }
  147. if err := updateServerContext(thread, r, true, false); err != nil {
  148. panic(err)
  149. }
  150. thread.mainRequest = r
  151. if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
  152. c.Write(zap.String("worker", worker.fileName), zap.Int("thread", thread.threadIndex))
  153. }
  154. }
  155. func (worker *worker) afterScript(thread *phpThread, exitStatus int) {
  156. fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
  157. fc.exitStatus = exitStatus
  158. defer func() {
  159. maybeCloseContext(fc)
  160. thread.mainRequest = nil
  161. }()
  162. // on exit status 0 we just run the worker script again
  163. if fc.exitStatus == 0 {
  164. // TODO: make the max restart configurable
  165. metrics.StopWorker(worker.fileName, StopReasonRestart)
  166. if c := logger.Check(zapcore.DebugLevel, "restarting"); c != nil {
  167. c.Write(zap.String("worker", worker.fileName))
  168. }
  169. return
  170. }
  171. // on exit status 1 we apply an exponential backoff when restarting
  172. metrics.StopWorker(worker.fileName, StopReasonCrash)
  173. thread.backoff.trigger(func(failureCount int) {
  174. // if we end up here, the worker has not been up for backoff*2
  175. // this is probably due to a syntax error or another fatal error
  176. if !watcherIsEnabled {
  177. panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
  178. }
  179. logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", failureCount))
  180. })
  181. }
  182. func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) {
  183. metrics.StartWorkerRequest(fc.scriptFilename)
  184. // dispatch requests to all worker threads in order
  185. worker.threadMutex.RLock()
  186. for _, thread := range worker.threads {
  187. select {
  188. case thread.requestChan <- r:
  189. worker.threadMutex.RUnlock()
  190. <-fc.done
  191. metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
  192. return
  193. default:
  194. }
  195. }
  196. worker.threadMutex.RUnlock()
  197. // if no thread was available, fan the request out to all threads
  198. // TODO: theoretically there could be autoscaling of threads here
  199. worker.requestChan <- r
  200. <-fc.done
  201. metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt))
  202. }
  203. //export go_frankenphp_worker_handle_request_start
  204. func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
  205. thread := phpThreads[threadIndex]
  206. if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
  207. c.Write(zap.String("worker", thread.worker.fileName))
  208. }
  209. var r *http.Request
  210. select {
  211. case <-workersDone:
  212. if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
  213. c.Write(zap.String("worker", thread.worker.fileName))
  214. }
  215. // execute opcache_reset if the restart was triggered by the watcher
  216. if watcherIsEnabled && thread.state.is(stateRestarting) {
  217. C.frankenphp_reset_opcache()
  218. }
  219. return C.bool(false)
  220. case r = <-thread.requestChan:
  221. case r = <-thread.worker.requestChan:
  222. }
  223. thread.workerRequest = r
  224. if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
  225. c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
  226. }
  227. if err := updateServerContext(thread, r, false, true); err != nil {
  228. // Unexpected error
  229. if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
  230. c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
  231. }
  232. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  233. rejectRequest(fc.responseWriter, err.Error())
  234. maybeCloseContext(fc)
  235. thread.workerRequest = nil
  236. thread.Unpin()
  237. return go_frankenphp_worker_handle_request_start(threadIndex)
  238. }
  239. return C.bool(true)
  240. }
  241. //export go_frankenphp_finish_worker_request
  242. func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) {
  243. thread := phpThreads[threadIndex]
  244. r := thread.getActiveRequest()
  245. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  246. maybeCloseContext(fc)
  247. thread.workerRequest = nil
  248. thread.Unpin()
  249. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  250. c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
  251. }
  252. }
  253. // when frankenphp_finish_request() is directly called from PHP
  254. //
  255. //export go_frankenphp_finish_php_request
  256. func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) {
  257. r := phpThreads[threadIndex].getActiveRequest()
  258. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  259. maybeCloseContext(fc)
  260. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  261. c.Write(zap.String("url", r.RequestURI))
  262. }
  263. }