worker.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package frankenphp
  2. // #include <stdlib.h>
  3. // #include "frankenphp.h"
  4. import "C"
  5. import (
  6. "fmt"
  7. "github.com/dunglas/frankenphp/internal/fastabs"
  8. "net/http"
  9. "path/filepath"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/dunglas/frankenphp/internal/watcher"
  14. "go.uber.org/zap"
  15. "go.uber.org/zap/zapcore"
  16. )
// worker represents a single worker script and the pool of PHP threads
// currently serving it.
type worker struct {
	fileName    string             // absolute path of the worker script
	num         int                // number of threads dedicated to this worker
	env         PreparedEnv        // prepared environment passed to the script on each (re)start
	requestChan chan *http.Request // shared channel: any idle thread of this worker may pick up a request
	threads     []*phpThread       // threads currently assigned to this worker
	threadMutex sync.RWMutex       // guards threads
}
// maxWorkerErrorBackoff is the upper bound of the restart delay applied to a crashing worker.
const maxWorkerErrorBackoff = 1 * time.Second

// minWorkerErrorBackoff is the initial restart delay after a worker crash.
const minWorkerErrorBackoff = 100 * time.Millisecond

// maxWorkerConsecutiveFailures is how many rapid consecutive failures are tolerated
// before panicking (or only warning when the file watcher is enabled).
const maxWorkerConsecutiveFailures = 6
var (
	watcherIsEnabled bool             // true when a file watcher restarts workers on changes
	workersReadyWG   sync.WaitGroup   // counts worker threads that have not signaled readiness yet
	workerShutdownWG sync.WaitGroup   // counts worker threads that have not fully terminated yet
	workersAreReady  atomic.Bool      // set once all worker threads reported ready
	workersAreDone   atomic.Bool      // set when shutdown has been requested
	workersDone      chan interface{} // closed to unblock threads waiting for requests at shutdown
	workers          = make(map[string]*worker) // registry keyed by absolute script path
)
  37. func initWorkers(opt []workerOpt) error {
  38. workersDone = make(chan interface{})
  39. workersAreReady.Store(false)
  40. workersAreDone.Store(false)
  41. for _, o := range opt {
  42. worker, err := newWorker(o)
  43. worker.threads = make([]*phpThread, 0, o.num)
  44. if err != nil {
  45. return err
  46. }
  47. workersReadyWG.Add(worker.num)
  48. for i := 0; i < worker.num; i++ {
  49. go worker.startNewWorkerThread()
  50. }
  51. }
  52. workersReadyWG.Wait()
  53. workersAreReady.Store(true)
  54. return nil
  55. }
  56. func newWorker(o workerOpt) (*worker, error) {
  57. absFileName, err := fastabs.FastAbs(o.fileName)
  58. if err != nil {
  59. return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
  60. }
  61. // if the worker already exists, return it,
  62. // it's necessary since we don't want to destroy the channels when restarting on file changes
  63. if w, ok := workers[absFileName]; ok {
  64. return w, nil
  65. }
  66. if o.env == nil {
  67. o.env = make(PreparedEnv, 1)
  68. }
  69. o.env["FRANKENPHP_WORKER\x00"] = "1"
  70. w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)}
  71. workers[absFileName] = w
  72. return w, nil
  73. }
  74. func (worker *worker) startNewWorkerThread() {
  75. workerShutdownWG.Add(1)
  76. defer workerShutdownWG.Done()
  77. backoff := minWorkerErrorBackoff
  78. failureCount := 0
  79. backingOffLock := sync.RWMutex{}
  80. for {
  81. // if the worker can stay up longer than backoff*2, it is probably an application error
  82. upFunc := sync.Once{}
  83. go func() {
  84. backingOffLock.RLock()
  85. wait := backoff * 2
  86. backingOffLock.RUnlock()
  87. time.Sleep(wait)
  88. upFunc.Do(func() {
  89. backingOffLock.Lock()
  90. defer backingOffLock.Unlock()
  91. // if we come back to a stable state, reset the failure count
  92. if backoff == minWorkerErrorBackoff {
  93. failureCount = 0
  94. }
  95. // earn back the backoff over time
  96. if failureCount > 0 {
  97. backoff = max(backoff/2, 100*time.Millisecond)
  98. }
  99. })
  100. }()
  101. metrics.StartWorker(worker.fileName)
  102. // Create main dummy request
  103. r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
  104. if err != nil {
  105. panic(err)
  106. }
  107. r, err = NewRequestWithContext(
  108. r,
  109. WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
  110. WithRequestPreparedEnv(worker.env),
  111. )
  112. if err != nil {
  113. panic(err)
  114. }
  115. if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
  116. c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num))
  117. }
  118. if err := ServeHTTP(nil, r); err != nil {
  119. panic(err)
  120. }
  121. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  122. // if we are done, exit the loop that restarts the worker script
  123. if workersAreDone.Load() {
  124. break
  125. }
  126. // on exit status 0 we just run the worker script again
  127. if fc.exitStatus == 0 {
  128. // TODO: make the max restart configurable
  129. if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil {
  130. c.Write(zap.String("worker", worker.fileName))
  131. }
  132. metrics.StopWorker(worker.fileName, StopReasonRestart)
  133. continue
  134. }
  135. // on exit status 1 we log the error and apply an exponential backoff when restarting
  136. upFunc.Do(func() {
  137. backingOffLock.Lock()
  138. defer backingOffLock.Unlock()
  139. // if we end up here, the worker has not been up for backoff*2
  140. // this is probably due to a syntax error or another fatal error
  141. if failureCount >= maxWorkerConsecutiveFailures {
  142. if !watcherIsEnabled {
  143. panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
  144. }
  145. logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", failureCount))
  146. }
  147. failureCount += 1
  148. })
  149. backingOffLock.RLock()
  150. wait := backoff
  151. backingOffLock.RUnlock()
  152. time.Sleep(wait)
  153. backingOffLock.Lock()
  154. backoff *= 2
  155. backoff = min(backoff, maxWorkerErrorBackoff)
  156. backingOffLock.Unlock()
  157. metrics.StopWorker(worker.fileName, StopReasonCrash)
  158. }
  159. metrics.StopWorker(worker.fileName, StopReasonShutdown)
  160. // TODO: check if the termination is expected
  161. if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
  162. c.Write(zap.String("worker", worker.fileName))
  163. }
  164. }
  165. func (worker *worker) handleRequest(r *http.Request) {
  166. worker.threadMutex.RLock()
  167. // dispatch requests to all worker threads in order
  168. for _, thread := range worker.threads {
  169. select {
  170. case thread.requestChan <- r:
  171. worker.threadMutex.RUnlock()
  172. return
  173. default:
  174. }
  175. }
  176. worker.threadMutex.RUnlock()
  177. // if no thread was available, fan the request out to all threads
  178. // TODO: theoretically there could be autoscaling of threads here
  179. worker.requestChan <- r
  180. }
// stopWorkers signals all worker threads to shut down: the atomic flag stops
// the restart loops, and closing workersDone unblocks every thread currently
// waiting for a request. The flag is set before the close so threads observing
// the closed channel (or a finished script) see a consistent "done" state.
func stopWorkers() {
	workersAreDone.Store(true)
	close(workersDone)
}
// drainWorkers performs a full worker shutdown: it drains the file watcher
// first (so no restart can race the shutdown), stops all workers, waits for
// every worker thread to terminate, then resets the worker registry.
func drainWorkers() {
	watcher.DrainWatcher()
	watcherIsEnabled = false
	stopWorkers()
	workerShutdownWG.Wait()
	workers = make(map[string]*worker)
}
  192. func restartWorkersOnFileChanges(workerOpts []workerOpt) error {
  193. var directoriesToWatch []string
  194. for _, w := range workerOpts {
  195. directoriesToWatch = append(directoriesToWatch, w.watch...)
  196. }
  197. watcherIsEnabled = len(directoriesToWatch) > 0
  198. if !watcherIsEnabled {
  199. return nil
  200. }
  201. restartWorkers := func() {
  202. restartWorkers(workerOpts)
  203. }
  204. if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
  205. return err
  206. }
  207. return nil
  208. }
  209. func restartWorkers(workerOpts []workerOpt) {
  210. stopWorkers()
  211. workerShutdownWG.Wait()
  212. if err := initWorkers(workerOpts); err != nil {
  213. logger.Error("failed to restart workers when watching files")
  214. panic(err)
  215. }
  216. logger.Info("workers restarted successfully")
  217. }
  218. func assignThreadToWorker(thread *phpThread) {
  219. fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
  220. metrics.ReadyWorker(fc.scriptFilename)
  221. worker, ok := workers[fc.scriptFilename]
  222. if !ok {
  223. panic("worker not found for script: " + fc.scriptFilename)
  224. }
  225. thread.worker = worker
  226. if !workersAreReady.Load() {
  227. workersReadyWG.Done()
  228. }
  229. thread.requestChan = make(chan *http.Request)
  230. worker.threadMutex.Lock()
  231. worker.threads = append(worker.threads, thread)
  232. worker.threadMutex.Unlock()
  233. }
  234. //export go_frankenphp_worker_handle_request_start
  235. func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
  236. thread := phpThreads[threadIndex]
  237. // we assign a worker to the thread if it doesn't have one already
  238. if thread.worker == nil {
  239. assignThreadToWorker(thread)
  240. }
  241. if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
  242. c.Write(zap.String("worker", thread.worker.fileName))
  243. }
  244. var r *http.Request
  245. select {
  246. case <-workersDone:
  247. if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
  248. c.Write(zap.String("worker", thread.worker.fileName))
  249. }
  250. thread.worker = nil
  251. C.frankenphp_reset_opcache()
  252. return C.bool(false)
  253. case r = <-thread.worker.requestChan:
  254. case r = <-thread.requestChan:
  255. }
  256. thread.workerRequest = r
  257. if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
  258. c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
  259. }
  260. if err := updateServerContext(thread, r, false, true); err != nil {
  261. // Unexpected error
  262. if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
  263. c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
  264. }
  265. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  266. rejectRequest(fc.responseWriter, err.Error())
  267. maybeCloseContext(fc)
  268. thread.workerRequest = nil
  269. thread.Unpin()
  270. return go_frankenphp_worker_handle_request_start(threadIndex)
  271. }
  272. return C.bool(true)
  273. }
  274. //export go_frankenphp_finish_request
  275. func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) {
  276. thread := phpThreads[threadIndex]
  277. r := thread.getActiveRequest()
  278. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  279. if isWorkerRequest {
  280. thread.workerRequest = nil
  281. }
  282. maybeCloseContext(fc)
  283. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  284. var fields []zap.Field
  285. if isWorkerRequest {
  286. fields = append(fields, zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
  287. } else {
  288. fields = append(fields, zap.String("url", r.RequestURI))
  289. }
  290. c.Write(fields...)
  291. }
  292. if isWorkerRequest {
  293. thread.Unpin()
  294. }
  295. }