worker.go 8.1 KB


  1. package frankenphp
  2. // #include <stdlib.h>
  3. // #include "frankenphp.h"
  4. import "C"
  5. import (
  6. "fmt"
  7. "github.com/dunglas/frankenphp/internal/fastabs"
  8. "net/http"
  9. "path/filepath"
  10. "sync"
  11. "time"
  12. "github.com/dunglas/frankenphp/internal/watcher"
  13. "go.uber.org/zap"
  14. "go.uber.org/zap/zapcore"
  15. )
  16. type worker struct {
  17. fileName string
  18. num int
  19. env PreparedEnv
  20. requestChan chan *http.Request
  21. threads []*phpThread
  22. threadMutex sync.RWMutex
  23. ready chan struct{}
  24. }
  25. var (
  26. watcherIsEnabled bool
  27. workerShutdownWG sync.WaitGroup
  28. workersDone chan interface{}
  29. workers = make(map[string]*worker)
  30. )
  31. func initWorkers(opt []workerOpt) error {
  32. workersDone = make(chan interface{})
  33. ready := sync.WaitGroup{}
  34. for _, o := range opt {
  35. worker, err := newWorker(o)
  36. worker.threads = make([]*phpThread, 0, o.num)
  37. if err != nil {
  38. return err
  39. }
  40. for i := 0; i < worker.num; i++ {
  41. go worker.startNewWorkerThread()
  42. }
  43. ready.Add(1)
  44. go func() {
  45. for i := 0; i < worker.num; i++ {
  46. <-worker.ready
  47. }
  48. ready.Done()
  49. }()
  50. }
  51. ready.Wait()
  52. return nil
  53. }
  54. func newWorker(o workerOpt) (*worker, error) {
  55. absFileName, err := fastabs.FastAbs(o.fileName)
  56. if err != nil {
  57. return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
  58. }
  59. // if the worker already exists, return it,
  60. // it's necessary since we don't want to destroy the channels when restarting on file changes
  61. if w, ok := workers[absFileName]; ok {
  62. return w, nil
  63. }
  64. if o.env == nil {
  65. o.env = make(PreparedEnv, 1)
  66. }
  67. o.env["FRANKENPHP_WORKER\x00"] = "1"
  68. w := &worker{
  69. fileName: absFileName,
  70. num: o.num,
  71. env: o.env,
  72. requestChan: make(chan *http.Request),
  73. ready: make(chan struct{}, o.num),
  74. }
  75. workers[absFileName] = w
  76. return w, nil
  77. }
  78. func (worker *worker) startNewWorkerThread() {
  79. workerShutdownWG.Add(1)
  80. defer workerShutdownWG.Done()
  81. backoff := &exponentialBackoff{
  82. maxBackoff: 1 * time.Second,
  83. minBackoff: 100 * time.Millisecond,
  84. maxConsecutiveFailures: 6,
  85. }
  86. for {
  87. // if the worker can stay up longer than backoff*2, it is probably an application error
  88. backoff.wait()
  89. metrics.StartWorker(worker.fileName)
  90. // Create main dummy request
  91. r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
  92. if err != nil {
  93. panic(err)
  94. }
  95. r, err = NewRequestWithContext(
  96. r,
  97. WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
  98. WithRequestPreparedEnv(worker.env),
  99. )
  100. if err != nil {
  101. panic(err)
  102. }
  103. if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
  104. c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num))
  105. }
  106. if err := ServeHTTP(nil, r); err != nil {
  107. panic(err)
  108. }
  109. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  110. // if we are done, exit the loop that restarts the worker script
  111. select {
  112. case _, ok := <-workersDone:
  113. if !ok {
  114. metrics.StopWorker(worker.fileName, StopReasonShutdown)
  115. if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
  116. c.Write(zap.String("worker", worker.fileName))
  117. }
  118. return
  119. }
  120. // continue on since the channel is still open
  121. default:
  122. // continue on since the channel is still open
  123. }
  124. // on exit status 0 we just run the worker script again
  125. if fc.exitStatus == 0 {
  126. // TODO: make the max restart configurable
  127. if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil {
  128. c.Write(zap.String("worker", worker.fileName))
  129. }
  130. metrics.StopWorker(worker.fileName, StopReasonRestart)
  131. backoff.recordSuccess()
  132. continue
  133. }
  134. // on exit status 1 we log the error and apply an exponential backoff when restarting
  135. if backoff.recordFailure() {
  136. if !watcherIsEnabled {
  137. panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
  138. }
  139. logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", backoff.failureCount))
  140. }
  141. metrics.StopWorker(worker.fileName, StopReasonCrash)
  142. }
  143. // unreachable
  144. }
  145. func (worker *worker) handleRequest(r *http.Request) {
  146. worker.threadMutex.RLock()
  147. // dispatch requests to all worker threads in order
  148. for _, thread := range worker.threads {
  149. select {
  150. case thread.requestChan <- r:
  151. worker.threadMutex.RUnlock()
  152. return
  153. default:
  154. }
  155. }
  156. worker.threadMutex.RUnlock()
  157. // if no thread was available, fan the request out to all threads
  158. // TODO: theoretically there could be autoscaling of threads here
  159. worker.requestChan <- r
  160. }
  161. func stopWorkers() {
  162. close(workersDone)
  163. }
  164. func drainWorkers() {
  165. watcher.DrainWatcher()
  166. watcherIsEnabled = false
  167. stopWorkers()
  168. workerShutdownWG.Wait()
  169. workers = make(map[string]*worker)
  170. }
  171. func restartWorkersOnFileChanges(workerOpts []workerOpt) error {
  172. var directoriesToWatch []string
  173. for _, w := range workerOpts {
  174. directoriesToWatch = append(directoriesToWatch, w.watch...)
  175. }
  176. watcherIsEnabled = len(directoriesToWatch) > 0
  177. if !watcherIsEnabled {
  178. return nil
  179. }
  180. restartWorkers := func() {
  181. restartWorkers(workerOpts)
  182. }
  183. if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
  184. return err
  185. }
  186. return nil
  187. }
  188. func restartWorkers(workerOpts []workerOpt) {
  189. stopWorkers()
  190. workerShutdownWG.Wait()
  191. if err := initWorkers(workerOpts); err != nil {
  192. logger.Error("failed to restart workers when watching files")
  193. panic(err)
  194. }
  195. logger.Info("workers restarted successfully")
  196. }
  197. func assignThreadToWorker(thread *phpThread) {
  198. fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
  199. worker, ok := workers[fc.scriptFilename]
  200. if !ok {
  201. panic("worker not found for script: " + fc.scriptFilename)
  202. }
  203. thread.worker = worker
  204. thread.requestChan = make(chan *http.Request)
  205. worker.threadMutex.Lock()
  206. worker.threads = append(worker.threads, thread)
  207. worker.threadMutex.Unlock()
  208. }
  209. //export go_frankenphp_worker_handle_request_start
  210. func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
  211. thread := phpThreads[threadIndex]
  212. // we assign a worker to the thread if it doesn't have one already
  213. if thread.worker == nil {
  214. assignThreadToWorker(thread)
  215. }
  216. thread.readiedOnce.Do(func() {
  217. // inform metrics that the worker is ready
  218. metrics.ReadyWorker(thread.worker.fileName)
  219. })
  220. select {
  221. case thread.worker.ready <- struct{}{}:
  222. default:
  223. }
  224. if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
  225. c.Write(zap.String("worker", thread.worker.fileName))
  226. }
  227. var r *http.Request
  228. select {
  229. case <-workersDone:
  230. if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
  231. c.Write(zap.String("worker", thread.worker.fileName))
  232. }
  233. thread.worker = nil
  234. C.frankenphp_reset_opcache()
  235. return C.bool(false)
  236. case r = <-thread.worker.requestChan:
  237. case r = <-thread.requestChan:
  238. }
  239. thread.workerRequest = r
  240. if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
  241. c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
  242. }
  243. if err := updateServerContext(thread, r, false, true); err != nil {
  244. // Unexpected error
  245. if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
  246. c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
  247. }
  248. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  249. rejectRequest(fc.responseWriter, err.Error())
  250. maybeCloseContext(fc)
  251. thread.workerRequest = nil
  252. thread.Unpin()
  253. return go_frankenphp_worker_handle_request_start(threadIndex)
  254. }
  255. return C.bool(true)
  256. }
  257. //export go_frankenphp_finish_request
  258. func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) {
  259. thread := phpThreads[threadIndex]
  260. r := thread.getActiveRequest()
  261. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  262. if isWorkerRequest {
  263. thread.workerRequest = nil
  264. }
  265. maybeCloseContext(fc)
  266. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  267. var fields []zap.Field
  268. if isWorkerRequest {
  269. fields = append(fields, zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
  270. } else {
  271. fields = append(fields, zap.String("url", r.RequestURI))
  272. }
  273. c.Write(fields...)
  274. }
  275. if isWorkerRequest {
  276. thread.Unpin()
  277. }
  278. }