thread-worker.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package frankenphp
  2. // #include "frankenphp.h"
  3. import "C"
  4. import (
  5. "net/http"
  6. "path/filepath"
  7. "time"
  8. "go.uber.org/zap"
  9. "go.uber.org/zap/zapcore"
  10. )
  11. // representation of a thread assigned to a worker script
  12. // executes the PHP worker script in a loop
  13. // implements the threadHandler interface
  14. type workerThread struct {
  15. state *threadState
  16. thread *phpThread
  17. worker *worker
  18. fakeRequest *http.Request
  19. workerRequest *http.Request
  20. backoff *exponentialBackoff
  21. }
  22. func convertToWorkerThread(thread *phpThread, worker *worker) {
  23. thread.setHandler(&workerThread{
  24. state: thread.state,
  25. thread: thread,
  26. worker: worker,
  27. backoff: &exponentialBackoff{
  28. maxBackoff: 1 * time.Second,
  29. minBackoff: 100 * time.Millisecond,
  30. maxConsecutiveFailures: 6,
  31. },
  32. })
  33. worker.attachThread(thread)
  34. }
  35. // beforeScriptExecution returns the name of the script or an empty string on shutdown
  36. func (handler *workerThread) beforeScriptExecution() string {
  37. switch handler.state.get() {
  38. case stateTransitionRequested:
  39. handler.worker.detachThread(handler.thread)
  40. return handler.thread.transitionToNewHandler()
  41. case stateRestarting:
  42. handler.state.set(stateYielding)
  43. handler.state.waitFor(stateReady, stateShuttingDown)
  44. return handler.beforeScriptExecution()
  45. case stateReady, stateTransitionComplete:
  46. setupWorkerScript(handler, handler.worker)
  47. return handler.worker.fileName
  48. case stateShuttingDown:
  49. handler.worker.detachThread(handler.thread)
  50. // signal to stop
  51. return ""
  52. }
  53. panic("unexpected state: " + handler.state.name())
  54. }
  55. func (handler *workerThread) afterScriptExecution(exitStatus int) {
  56. tearDownWorkerScript(handler, exitStatus)
  57. }
  58. func (handler *workerThread) getActiveRequest() *http.Request {
  59. if handler.workerRequest != nil {
  60. return handler.workerRequest
  61. }
  62. return handler.fakeRequest
  63. }
  64. func (handler *workerThread) name() string {
  65. return "Worker PHP Thread - " + handler.worker.fileName
  66. }
  67. func setupWorkerScript(handler *workerThread, worker *worker) {
  68. handler.backoff.wait()
  69. metrics.StartWorker(worker.fileName)
  70. // Create a dummy request to set up the worker
  71. r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
  72. if err != nil {
  73. panic(err)
  74. }
  75. r, err = NewRequestWithContext(
  76. r,
  77. WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
  78. WithRequestPreparedEnv(worker.env),
  79. )
  80. if err != nil {
  81. panic(err)
  82. }
  83. if err := updateServerContext(handler.thread, r, true, false); err != nil {
  84. panic(err)
  85. }
  86. handler.fakeRequest = r
  87. if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
  88. c.Write(zap.String("worker", worker.fileName), zap.Int("thread", handler.thread.threadIndex))
  89. }
  90. }
  91. func tearDownWorkerScript(handler *workerThread, exitStatus int) {
  92. // if the worker request is not nil, the script might have crashed
  93. // make sure to close the worker request context
  94. if handler.workerRequest != nil {
  95. fc := handler.workerRequest.Context().Value(contextKey).(*FrankenPHPContext)
  96. maybeCloseContext(fc)
  97. handler.workerRequest = nil
  98. }
  99. fc := handler.fakeRequest.Context().Value(contextKey).(*FrankenPHPContext)
  100. fc.exitStatus = exitStatus
  101. defer func() {
  102. handler.fakeRequest = nil
  103. }()
  104. // on exit status 0 we just run the worker script again
  105. worker := handler.worker
  106. if fc.exitStatus == 0 {
  107. // TODO: make the max restart configurable
  108. metrics.StopWorker(worker.fileName, StopReasonRestart)
  109. handler.backoff.recordSuccess()
  110. if c := logger.Check(zapcore.DebugLevel, "restarting"); c != nil {
  111. c.Write(zap.String("worker", worker.fileName))
  112. }
  113. return
  114. }
  115. // TODO: error status
  116. // on exit status 1 we apply an exponential backoff when restarting
  117. metrics.StopWorker(worker.fileName, StopReasonCrash)
  118. if handler.backoff.recordFailure() {
  119. if !watcherIsEnabled {
  120. logger.Panic("too many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
  121. }
  122. logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", handler.backoff.failureCount))
  123. }
  124. }
  125. func (handler *workerThread) waitForWorkerRequest() bool {
  126. // unpin any memory left over from previous requests
  127. handler.thread.Unpin()
  128. handler.state.markAsWaiting(true)
  129. if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
  130. c.Write(zap.String("worker", handler.worker.fileName))
  131. }
  132. if handler.state.compareAndSwap(stateTransitionComplete, stateReady) {
  133. metrics.ReadyWorker(handler.worker.fileName)
  134. }
  135. var r *http.Request
  136. select {
  137. case <-handler.thread.drainChan:
  138. if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
  139. c.Write(zap.String("worker", handler.worker.fileName))
  140. }
  141. // flush the opcache when restarting due to watcher or admin api
  142. // note: this is done right before frankenphp_handle_request() returns 'false'
  143. if handler.state.is(stateRestarting) {
  144. C.frankenphp_reset_opcache()
  145. }
  146. return false
  147. case r = <-handler.thread.requestChan:
  148. case r = <-handler.worker.requestChan:
  149. }
  150. handler.workerRequest = r
  151. handler.state.markAsWaiting(false)
  152. if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
  153. c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI))
  154. }
  155. if err := updateServerContext(handler.thread, r, false, true); err != nil {
  156. // Unexpected error or invalid request
  157. if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
  158. c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
  159. }
  160. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  161. rejectRequest(fc.responseWriter, err.Error())
  162. maybeCloseContext(fc)
  163. handler.workerRequest = nil
  164. return handler.waitForWorkerRequest()
  165. }
  166. return true
  167. }
  168. //export go_frankenphp_worker_handle_request_start
  169. func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
  170. handler := phpThreads[threadIndex].handler.(*workerThread)
  171. return C.bool(handler.waitForWorkerRequest())
  172. }
  173. //export go_frankenphp_finish_worker_request
  174. func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) {
  175. thread := phpThreads[threadIndex]
  176. r := thread.getActiveRequest()
  177. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  178. maybeCloseContext(fc)
  179. thread.handler.(*workerThread).workerRequest = nil
  180. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  181. c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
  182. }
  183. }
  184. // when frankenphp_finish_request() is directly called from PHP
  185. //
  186. //export go_frankenphp_finish_php_request
  187. func go_frankenphp_finish_php_request(threadIndex C.uintptr_t) {
  188. r := phpThreads[threadIndex].getActiveRequest()
  189. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  190. maybeCloseContext(fc)
  191. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  192. c.Write(zap.String("url", r.RequestURI))
  193. }
  194. }