thread-worker.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package frankenphp
  2. // #include "frankenphp.h"
  3. import "C"
  4. import (
  5. "net/http"
  6. "path/filepath"
  7. "time"
  8. "go.uber.org/zap"
  9. "go.uber.org/zap/zapcore"
  10. )
  11. // representation of a thread assigned to a worker script
  12. // executes the PHP worker script in a loop
  13. // implements the threadHandler interface
  14. type workerThread struct {
  15. state *threadState
  16. thread *phpThread
  17. worker *worker
  18. fakeRequest *http.Request
  19. workerRequest *http.Request
  20. backoff *exponentialBackoff
  21. inRequest bool // true if the worker is currently handling a request
  22. }
  23. func convertToWorkerThread(thread *phpThread, worker *worker) {
  24. thread.setHandler(&workerThread{
  25. state: thread.state,
  26. thread: thread,
  27. worker: worker,
  28. backoff: &exponentialBackoff{
  29. maxBackoff: 1 * time.Second,
  30. minBackoff: 100 * time.Millisecond,
  31. maxConsecutiveFailures: 6,
  32. },
  33. })
  34. worker.attachThread(thread)
  35. }
  36. // beforeScriptExecution returns the name of the script or an empty string on shutdown
  37. func (handler *workerThread) beforeScriptExecution() string {
  38. switch handler.state.get() {
  39. case stateTransitionRequested:
  40. handler.worker.detachThread(handler.thread)
  41. return handler.thread.transitionToNewHandler()
  42. case stateRestarting:
  43. return handler.restartGracefully()
  44. case stateReady, stateTransitionComplete:
  45. setupWorkerScript(handler, handler.worker)
  46. return handler.worker.fileName
  47. case stateShuttingDown:
  48. handler.worker.detachThread(handler.thread)
  49. // signal to stop
  50. return ""
  51. }
  52. panic("unexpected state: " + handler.state.name())
  53. }
  54. func (handler *workerThread) afterScriptExecution(exitStatus int) {
  55. tearDownWorkerScript(handler, exitStatus)
  56. }
  57. func (handler *workerThread) getActiveRequest() *http.Request {
  58. if handler.workerRequest != nil {
  59. return handler.workerRequest
  60. }
  61. return handler.fakeRequest
  62. }
  63. func (handler *workerThread) name() string {
  64. return "Worker PHP Thread - " + handler.worker.fileName
  65. }
  66. func setupWorkerScript(handler *workerThread, worker *worker) {
  67. handler.backoff.wait()
  68. metrics.StartWorker(worker.fileName)
  69. // Create a dummy request to set up the worker
  70. r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
  71. if err != nil {
  72. panic(err)
  73. }
  74. r, err = NewRequestWithContext(
  75. r,
  76. WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
  77. WithRequestPreparedEnv(worker.env),
  78. )
  79. if err != nil {
  80. panic(err)
  81. }
  82. if err := updateServerContext(handler.thread, r, true, false); err != nil {
  83. panic(err)
  84. }
  85. handler.setFakeRequest(r)
  86. if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
  87. c.Write(zap.String("worker", worker.fileName), zap.Int("thread", handler.thread.threadIndex))
  88. }
  89. }
  90. func tearDownWorkerScript(handler *workerThread, exitStatus int) {
  91. // if the worker request is not nil, the script might have crashed
  92. // make sure to close the worker request context
  93. if handler.workerRequest != nil {
  94. fc := handler.workerRequest.Context().Value(contextKey).(*FrankenPHPContext)
  95. maybeCloseContext(fc)
  96. handler.setWorkerRequest(nil)
  97. }
  98. fc := handler.fakeRequest.Context().Value(contextKey).(*FrankenPHPContext)
  99. fc.exitStatus = exitStatus
  100. handler.setFakeRequest(nil)
  101. // on exit status 0 we just run the worker script again
  102. worker := handler.worker
  103. if fc.exitStatus == 0 {
  104. // TODO: make the max restart configurable
  105. metrics.StopWorker(worker.fileName, StopReasonRestart)
  106. handler.backoff.recordSuccess()
  107. if c := logger.Check(zapcore.DebugLevel, "restarting"); c != nil {
  108. c.Write(zap.String("worker", worker.fileName))
  109. }
  110. return
  111. }
  112. // TODO: error status
  113. // on exit status 1 we apply an exponential backoff when restarting
  114. metrics.StopWorker(worker.fileName, StopReasonCrash)
  115. if !handler.inRequest && handler.backoff.recordFailure() {
  116. if !watcherIsEnabled {
  117. logger.Panic("too many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
  118. }
  119. logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
  120. }
  121. }
  122. // waitForWorkerRequest is called during frankenphp_handle_request in the php worker script.
  123. func (handler *workerThread) waitForWorkerRequest() bool {
  124. // unpin any memory left over from previous requests
  125. handler.thread.Unpin()
  126. handler.state.markAsWaiting(true)
  127. if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
  128. c.Write(zap.String("worker", handler.worker.fileName))
  129. }
  130. if handler.state.compareAndSwap(stateTransitionComplete, stateReady) {
  131. metrics.ReadyWorker(handler.worker.fileName)
  132. }
  133. var r *http.Request
  134. select {
  135. case <-handler.thread.drainChan:
  136. if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
  137. c.Write(zap.String("worker", handler.worker.fileName))
  138. }
  139. return false
  140. case r = <-handler.thread.requestChan:
  141. case r = <-handler.worker.requestChan:
  142. }
  143. handler.setWorkerRequest(r)
  144. handler.state.markAsWaiting(false)
  145. if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
  146. c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI))
  147. }
  148. handler.inRequest = true
  149. if err := updateServerContext(handler.thread, r, false, true); err != nil {
  150. // Unexpected error or invalid request
  151. if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
  152. c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
  153. }
  154. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  155. rejectRequest(fc.responseWriter, err.Error())
  156. maybeCloseContext(fc)
  157. handler.workerRequest = nil
  158. return handler.waitForWorkerRequest()
  159. }
  160. return true
  161. }
  162. func (handler *workerThread) setWorkerRequest(r *http.Request) {
  163. handler.thread.requestMu.Lock()
  164. handler.workerRequest = r
  165. handler.thread.requestMu.Unlock()
  166. }
  167. func (handler *workerThread) setFakeRequest(r *http.Request) {
  168. handler.thread.requestMu.Lock()
  169. handler.fakeRequest = r
  170. handler.thread.requestMu.Unlock()
  171. }
  172. // When restarting gracefully, all threads wait for each other to finish
  173. // opcache_reset will be called once all threads are yielding
  174. func (handler *workerThread) restartGracefully() string {
  175. handler.state.set(stateYielding)
  176. handler.state.waitFor(stateReady, stateShuttingDown, stateOpcacheReset)
  177. // one thread will be marked to flush the opcache
  178. // this will avoid a race condition in opcache under high concurrency
  179. if handler.state.is(stateOpcacheReset) {
  180. C.frankenphp_reset_opcache()
  181. logger.Debug("opcache reset", zap.Int("threadIndex", handler.thread.threadIndex))
  182. handler.state.set(stateYielding)
  183. handler.state.waitFor(stateReady, stateShuttingDown)
  184. }
  185. return handler.beforeScriptExecution()
  186. }
  187. // go_frankenphp_worker_handle_request_start is called at the start of every php request served.
  188. //
  189. //export go_frankenphp_worker_handle_request_start
  190. func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
  191. handler := phpThreads[threadIndex].handler.(*workerThread)
  192. return C.bool(handler.waitForWorkerRequest())
  193. }
  194. // go_frankenphp_finish_worker_request is called at the end of every php request served.
  195. //
  196. //export go_frankenphp_finish_worker_request
  197. func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) {
  198. thread := phpThreads[threadIndex]
  199. r := thread.getActiveRequest()
  200. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  201. maybeCloseContext(fc)
  202. thread.handler.(*workerThread).workerRequest = nil
  203. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  204. c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
  205. }
  206. }
  207. // when frankenphp_finish_request() is directly called from PHP
  208. //
  209. //export go_frankenphp_finish_php_request
  210. func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) {
  211. r := phpThreads[threadIndex].getActiveRequest()
  212. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  213. maybeCloseContext(fc)
  214. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  215. c.Write(zap.String("url", r.RequestURI))
  216. }
  217. }