worker.go

package frankenphp

// #include <stdlib.h>
// #include "frankenphp.h"
import "C"

import (
	"fmt"
	"net/http"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/dunglas/frankenphp/internal/watcher"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

type worker struct {
	fileName    string
	num         int
	env         PreparedEnv
	requestChan chan *http.Request
	threads     []*phpThread
	threadMutex sync.RWMutex
}

const maxWorkerErrorBackoff = 1 * time.Second
const minWorkerErrorBackoff = 100 * time.Millisecond
const maxWorkerConsecutiveFailures = 6

var (
	watcherIsEnabled bool
	workersReadyWG   sync.WaitGroup
	workerShutdownWG sync.WaitGroup
	workersAreReady  atomic.Bool
	workersAreDone   atomic.Bool
	workersDone      chan interface{}
	workers          = make(map[string]*worker)
)
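
// initWorkers creates the configured workers and starts the requested number
// of threads per worker, blocking until every worker thread is ready to
// accept requests.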
func initWorkers(opt []workerOpt) error {
	workersDone = make(chan interface{})
	workersAreReady.Store(false)
	workersAreDone.Store(false)

	for _, o := range opt {
		worker, err := newWorker(o)
		if err != nil {
			return err
		}
		worker.threads = make([]*phpThread, 0, o.num)

		workersReadyWG.Add(worker.num)
		for i := 0; i < worker.num; i++ {
			go worker.startNewWorkerThread()
		}
	}
	workersReadyWG.Wait()
	workersAreReady.Store(true)

	return nil
}
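
// newWorker registers a worker for the given script. If a worker already
// exists for the script's absolute path, it is reused so that its channels
// survive restarts triggered by file changes.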
func newWorker(o workerOpt) (*worker, error) {
	absFileName, err := filepath.Abs(o.fileName)
	if err != nil {
		return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
	}

	// if the worker already exists, return it;
	// this is necessary because we don't want to destroy the channels when restarting on file changes
	if w, ok := workers[absFileName]; ok {
		return w, nil
	}

	if o.env == nil {
		o.env = make(PreparedEnv, 1)
	}
	o.env["FRANKENPHP_WORKER\x00"] = "1"

	w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)}
	workers[absFileName] = w

	return w, nil
}
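
// startNewWorkerThread runs the worker script in a loop, restarting it whenever
// it exits. Clean exits (status 0) restart immediately; failures restart with an
// exponential backoff, and too many consecutive failures cause a panic unless
// the file watcher is enabled.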
func (worker *worker) startNewWorkerThread() {
	workerShutdownWG.Add(1)
	defer workerShutdownWG.Done()

	backoff := minWorkerErrorBackoff
	failureCount := 0
	backingOffLock := sync.RWMutex{}

	for {
		// if the worker can stay up longer than backoff*2, it is probably an application error
		upFunc := sync.Once{}
		go func() {
			backingOffLock.RLock()
			wait := backoff * 2
			backingOffLock.RUnlock()
			time.Sleep(wait)
			upFunc.Do(func() {
				backingOffLock.Lock()
				defer backingOffLock.Unlock()
				// if we come back to a stable state, reset the failure count
				if backoff == minWorkerErrorBackoff {
					failureCount = 0
				}
				// earn back the backoff over time
				if failureCount > 0 {
					backoff = max(backoff/2, minWorkerErrorBackoff)
				}
			})
		}()

		metrics.StartWorker(worker.fileName)

		// create the main dummy request
		r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
		if err != nil {
			panic(err)
		}
		r, err = NewRequestWithContext(
			r,
			WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
			WithRequestPreparedEnv(worker.env),
		)
		if err != nil {
			panic(err)
		}

		if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
			c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num))
		}

		if err := ServeHTTP(nil, r); err != nil {
			panic(err)
		}

		fc := r.Context().Value(contextKey).(*FrankenPHPContext)

		// if we are done, exit the loop that restarts the worker script
		if workersAreDone.Load() {
			break
		}

		// on exit status 0 we just run the worker script again
		if fc.exitStatus == 0 {
			// TODO: make the max restart configurable
			if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil {
				c.Write(zap.String("worker", worker.fileName))
			}
			metrics.StopWorker(worker.fileName, StopReasonRestart)

			continue
		}

		// on a non-zero exit status we log the error and apply an exponential backoff before restarting
		upFunc.Do(func() {
			backingOffLock.Lock()
			defer backingOffLock.Unlock()
			// if we end up here, the worker has not been up for backoff*2,
			// which is probably due to a syntax error or another fatal error
			if failureCount >= maxWorkerConsecutiveFailures {
				if !watcherIsEnabled {
					panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
				}
				logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", failureCount))
			}
			failureCount += 1
		})
		backingOffLock.RLock()
		wait := backoff
		backingOffLock.RUnlock()
		time.Sleep(wait)
		backingOffLock.Lock()
		backoff *= 2
		backoff = min(backoff, maxWorkerErrorBackoff)
		backingOffLock.Unlock()

		metrics.StopWorker(worker.fileName, StopReasonCrash)
	}

	metrics.StopWorker(worker.fileName, StopReasonShutdown)

	// TODO: check if the termination is expected
	if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
		c.Write(zap.String("worker", worker.fileName))
	}
}
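
// handleRequest dispatches the request to an idle thread of this worker, or
// blocks on the worker's shared channel until one becomes available.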
func (worker *worker) handleRequest(r *http.Request) {
	worker.threadMutex.RLock()
	// try each worker thread in order with a non-blocking send
	for _, thread := range worker.threads {
		select {
		case thread.requestChan <- r:
			worker.threadMutex.RUnlock()

			return
		default:
			// this thread is busy, try the next one
		}
	}
	worker.threadMutex.RUnlock()

	// if no thread was idle, block on the shared channel until any thread picks the request up
	// TODO: theoretically there could be autoscaling of threads here
	worker.requestChan <- r
}
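
// stopWorkers signals every worker thread to stop restarting its script.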
func stopWorkers() {
	workersAreDone.Store(true)
	close(workersDone)
}
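
// drainWorkers stops the file watcher and the workers, then waits until all
// worker threads have shut down.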
func drainWorkers() {
	watcher.DrainWatcher()
	watcherIsEnabled = false
	stopWorkers()
	workerShutdownWG.Wait()
	workers = make(map[string]*worker)
}
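
// restartWorkersOnFileChanges watches the configured directories and restarts
// all workers whenever a change is detected.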
func restartWorkersOnFileChanges(workerOpts []workerOpt) error {
	var directoriesToWatch []string
	for _, w := range workerOpts {
		directoriesToWatch = append(directoriesToWatch, w.watch...)
	}
	watcherIsEnabled = len(directoriesToWatch) > 0
	if !watcherIsEnabled {
		return nil
	}

	restartWorkersOnChange := func() {
		restartWorkers(workerOpts)
	}
	if err := watcher.InitWatcher(directoriesToWatch, restartWorkersOnChange, getLogger()); err != nil {
		return err
	}

	return nil
}
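
// restartWorkers stops all workers, waits for them to shut down and starts
// them again with the given options.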
func restartWorkers(workerOpts []workerOpt) {
	stopWorkers()
	workerShutdownWG.Wait()
	if err := initWorkers(workerOpts); err != nil {
		logger.Error("failed to restart workers when watching files")
		panic(err)
	}
	logger.Info("workers restarted successfully")
}
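
// assignThreadToWorker binds a PHP thread to the worker owning its script and
// registers the thread so requests can be dispatched to it.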
func assignThreadToWorker(thread *phpThread) {
	fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
	metrics.ReadyWorker(fc.scriptFilename)
	worker, ok := workers[fc.scriptFilename]
	if !ok {
		panic("worker not found for script: " + fc.scriptFilename)
	}
	thread.worker = worker
	if !workersAreReady.Load() {
		workersReadyWG.Done()
	}
	thread.requestChan = make(chan *http.Request)
	worker.threadMutex.Lock()
	worker.threads = append(worker.threads, thread)
	worker.threadMutex.Unlock()
}
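
// go_frankenphp_worker_handle_request_start is called from C when a worker
// thread is ready for a request. It blocks until a request is dispatched to
// the thread or the workers shut down, returning false on shutdown.
//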
//export go_frankenphp_worker_handle_request_start
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
	thread := phpThreads[threadIndex]

	// we assign a worker to the thread if it doesn't have one already
	if thread.worker == nil {
		assignThreadToWorker(thread)
	}

	if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
		c.Write(zap.String("worker", thread.worker.fileName))
	}

	var r *http.Request
	select {
	case <-workersDone:
		if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
			c.Write(zap.String("worker", thread.worker.fileName))
		}
		thread.worker = nil
		executePHPFunction("opcache_reset")

		return C.bool(false)
	case r = <-thread.worker.requestChan:
	case r = <-thread.requestChan:
	}

	thread.workerRequest = r

	if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
		c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
	}

	if err := updateServerContext(thread, r, false, true); err != nil {
		// unexpected error
		if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
			c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
		}
		fc := r.Context().Value(contextKey).(*FrankenPHPContext)
		rejectRequest(fc.responseWriter, err.Error())
		maybeCloseContext(fc)
		thread.workerRequest = nil
		thread.Unpin()

		return go_frankenphp_worker_handle_request_start(threadIndex)
	}

	return C.bool(true)
}
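
// go_frankenphp_finish_request is called from C when a request has been
// handled; it closes the request context and, for worker requests, detaches
// the request from the thread.
//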
//export go_frankenphp_finish_request
func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) {
	thread := phpThreads[threadIndex]
	r := thread.getActiveRequest()
	fc := r.Context().Value(contextKey).(*FrankenPHPContext)

	if isWorkerRequest {
		thread.workerRequest = nil
	}

	maybeCloseContext(fc)

	if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
		var fields []zap.Field
		if isWorkerRequest {
			fields = append(fields, zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
		} else {
			fields = append(fields, zap.String("url", r.RequestURI))
		}
		c.Write(fields...)
	}

	if isWorkerRequest {
		thread.Unpin()
	}
}