thread-worker.go 6.6 KB


  1. package frankenphp
  2. // #include "frankenphp.h"
  3. import "C"
  4. import (
  5. "net/http"
  6. "path/filepath"
  7. "time"
  8. "go.uber.org/zap"
  9. "go.uber.org/zap/zapcore"
  10. )
  11. // representation of a thread assigned to a worker script
  12. // executes the PHP worker script in a loop
  13. // implements the threadHandler interface
  14. type workerThread struct {
  15. state *threadState
  16. thread *phpThread
  17. worker *worker
  18. fakeRequest *http.Request
  19. workerRequest *http.Request
  20. backoff *exponentialBackoff
  21. }
  22. func convertToWorkerThread(thread *phpThread, worker *worker) {
  23. thread.setHandler(&workerThread{
  24. state: thread.state,
  25. thread: thread,
  26. worker: worker,
  27. backoff: &exponentialBackoff{
  28. maxBackoff: 1 * time.Second,
  29. minBackoff: 100 * time.Millisecond,
  30. maxConsecutiveFailures: 6,
  31. },
  32. })
  33. worker.attachThread(thread)
  34. }
  35. // beforeScriptExecution returns the name of the script or an empty string on shutdown
  36. func (handler *workerThread) beforeScriptExecution() string {
  37. switch handler.state.get() {
  38. case stateTransitionRequested:
  39. handler.worker.detachThread(handler.thread)
  40. return handler.thread.transitionToNewHandler()
  41. case stateRestarting:
  42. handler.state.set(stateYielding)
  43. handler.state.waitFor(stateReady, stateShuttingDown)
  44. return handler.beforeScriptExecution()
  45. case stateReady, stateTransitionComplete:
  46. setupWorkerScript(handler, handler.worker)
  47. return handler.worker.fileName
  48. case stateShuttingDown:
  49. // signal to stop
  50. return ""
  51. }
  52. panic("unexpected state: " + handler.state.name())
  53. }
  54. func (handler *workerThread) afterScriptExecution(exitStatus int) {
  55. tearDownWorkerScript(handler, exitStatus)
  56. }
  57. func (handler *workerThread) getActiveRequest() *http.Request {
  58. if handler.workerRequest != nil {
  59. return handler.workerRequest
  60. }
  61. return handler.fakeRequest
  62. }
  63. func setupWorkerScript(handler *workerThread, worker *worker) {
  64. handler.backoff.wait()
  65. metrics.StartWorker(worker.fileName)
  66. // Create a dummy request to set up the worker
  67. r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
  68. if err != nil {
  69. panic(err)
  70. }
  71. r, err = NewRequestWithContext(
  72. r,
  73. WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
  74. WithRequestPreparedEnv(worker.env),
  75. )
  76. if err != nil {
  77. panic(err)
  78. }
  79. if err := updateServerContext(handler.thread, r, true, false); err != nil {
  80. panic(err)
  81. }
  82. handler.fakeRequest = r
  83. if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
  84. c.Write(zap.String("worker", worker.fileName), zap.Int("thread", handler.thread.threadIndex))
  85. }
  86. }
  87. func tearDownWorkerScript(handler *workerThread, exitStatus int) {
  88. // if the worker request is not nil, the script might have crashed
  89. // make sure to close the worker request context
  90. if handler.workerRequest != nil {
  91. fc := handler.workerRequest.Context().Value(contextKey).(*FrankenPHPContext)
  92. maybeCloseContext(fc)
  93. handler.workerRequest = nil
  94. }
  95. fc := handler.fakeRequest.Context().Value(contextKey).(*FrankenPHPContext)
  96. fc.exitStatus = exitStatus
  97. defer func() {
  98. handler.fakeRequest = nil
  99. }()
  100. // on exit status 0 we just run the worker script again
  101. worker := handler.worker
  102. if fc.exitStatus == 0 {
  103. // TODO: make the max restart configurable
  104. metrics.StopWorker(worker.fileName, StopReasonRestart)
  105. handler.backoff.recordSuccess()
  106. if c := logger.Check(zapcore.DebugLevel, "restarting"); c != nil {
  107. c.Write(zap.String("worker", worker.fileName))
  108. }
  109. return
  110. }
  111. // TODO: error status
  112. // on exit status 1 we apply an exponential backoff when restarting
  113. metrics.StopWorker(worker.fileName, StopReasonCrash)
  114. if handler.backoff.recordFailure() {
  115. if !watcherIsEnabled {
  116. logger.Panic("too many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
  117. }
  118. logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
  119. }
  120. }
  121. func (handler *workerThread) waitForWorkerRequest() bool {
  122. // unpin any memory left over from previous requests
  123. handler.thread.Unpin()
  124. if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
  125. c.Write(zap.String("worker", handler.worker.fileName))
  126. }
  127. if handler.state.compareAndSwap(stateTransitionComplete, stateReady) {
  128. metrics.ReadyWorker(handler.worker.fileName)
  129. }
  130. var r *http.Request
  131. select {
  132. case <-handler.thread.drainChan:
  133. if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
  134. c.Write(zap.String("worker", handler.worker.fileName))
  135. }
  136. // execute opcache_reset if the restart was triggered by the watcher
  137. if watcherIsEnabled && handler.state.is(stateRestarting) {
  138. C.frankenphp_reset_opcache()
  139. }
  140. return false
  141. case r = <-handler.thread.requestChan:
  142. case r = <-handler.worker.requestChan:
  143. }
  144. handler.workerRequest = r
  145. if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
  146. c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI))
  147. }
  148. if err := updateServerContext(handler.thread, r, false, true); err != nil {
  149. // Unexpected error or invalid request
  150. if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
  151. c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
  152. }
  153. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  154. rejectRequest(fc.responseWriter, err.Error())
  155. maybeCloseContext(fc)
  156. handler.workerRequest = nil
  157. return handler.waitForWorkerRequest()
  158. }
  159. return true
  160. }
  161. //export go_frankenphp_worker_handle_request_start
  162. func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
  163. handler := phpThreads[threadIndex].handler.(*workerThread)
  164. return C.bool(handler.waitForWorkerRequest())
  165. }
  166. //export go_frankenphp_finish_worker_request
  167. func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) {
  168. thread := phpThreads[threadIndex]
  169. r := thread.getActiveRequest()
  170. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  171. maybeCloseContext(fc)
  172. thread.handler.(*workerThread).workerRequest = nil
  173. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  174. c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
  175. }
  176. }
  177. // when frankenphp_finish_request() is directly called from PHP
  178. //
  179. //export go_frankenphp_finish_php_request
  180. func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) {
  181. r := phpThreads[threadIndex].getActiveRequest()
  182. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  183. maybeCloseContext(fc)
  184. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  185. c.Write(zap.String("url", r.RequestURI))
  186. }
  187. }