worker.go

package frankenphp

// #include <stdlib.h>
// #include "frankenphp.h"
import "C"

import (
	"fmt"
	"net/http"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/dunglas/frankenphp/internal/watcher"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

type worker struct {
	fileName    string
	num         int
	env         PreparedEnv
	requestChan chan *http.Request
}

const maxWorkerErrorBackoff = 1 * time.Second
const minWorkerErrorBackoff = 100 * time.Millisecond
const maxWorkerConsecutiveFailures = 6
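
// with these values, the crash backoff doubles from 100ms up to the 1s cap
// (waits of 100ms, 200ms, 400ms, 800ms, 1s, ...); after 6 consecutive fast
// failures the worker is considered fatally broken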

var (
	watcherIsEnabled bool
	workersReadyWG   sync.WaitGroup
	workerShutdownWG sync.WaitGroup
	workersAreReady  atomic.Bool
	workersAreDone   atomic.Bool
	workersDone      chan interface{}
	workers          = make(map[string]*worker)
)
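
// initWorkers starts the configured workers and blocks until all of their
// threads are ready to handle requests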
func initWorkers(opt []workerOpt) error {
	workersDone = make(chan interface{})
	workersAreReady.Store(false)
	workersAreDone.Store(false)

	for _, o := range opt {
		worker, err := newWorker(o)
		if err != nil {
			return err
		}
		workersReadyWG.Add(worker.num)
		for i := 0; i < worker.num; i++ {
			go worker.startNewWorkerThread()
		}
	}
	workersReadyWG.Wait()
	workersAreReady.Store(true)

	return nil
}
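
// newWorker creates a worker for the given options, or returns the worker
// already registered for the same absolute script path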
func newWorker(o workerOpt) (*worker, error) {
	absFileName, err := filepath.Abs(o.fileName)
	if err != nil {
		return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
	}

	// if the worker already exists, return it
	// it's necessary since we don't want to destroy the channels when restarting on file changes
	if w, ok := workers[absFileName]; ok {
		return w, nil
	}

	if o.env == nil {
		o.env = make(PreparedEnv, 1)
	}
	o.env["FRANKENPHP_WORKER\x00"] = "1"

	w := &worker{fileName: absFileName, num: o.num, env: o.env, requestChan: make(chan *http.Request)}
	workers[absFileName] = w

	return w, nil
}
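
// startNewWorkerThread runs the worker script in a loop, restarting it after
// each normal exit and backing off exponentially after crashes, until
// stopWorkers() is called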
func (worker *worker) startNewWorkerThread() {
	workerShutdownWG.Add(1)
	defer workerShutdownWG.Done()

	backoff := minWorkerErrorBackoff
	failureCount := 0
	backingOffLock := sync.RWMutex{}

	for {
		// if the worker can stay up longer than backoff*2, it is probably an application error
		upFunc := sync.Once{}
		go func() {
			backingOffLock.RLock()
			wait := backoff * 2
			backingOffLock.RUnlock()
			time.Sleep(wait)
			upFunc.Do(func() {
				backingOffLock.Lock()
				defer backingOffLock.Unlock()
				// if we come back to a stable state, reset the failure count
				if backoff == minWorkerErrorBackoff {
					failureCount = 0
				}
				// earn back the backoff over time
				if failureCount > 0 {
					backoff = max(backoff/2, minWorkerErrorBackoff)
				}
			})
		}()
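		// backoff and failureCount are shared with the goroutine above,
		// which is why every access is guarded by backingOffLock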

		metrics.StartWorker(worker.fileName)

		// Create main dummy request
		r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
		if err != nil {
			panic(err)
		}
		r, err = NewRequestWithContext(
			r,
			WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
			WithRequestPreparedEnv(worker.env),
		)
		if err != nil {
			panic(err)
		}

		if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
			c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num))
		}
		if err := ServeHTTP(nil, r); err != nil {
			panic(err)
		}

		fc := r.Context().Value(contextKey).(*FrankenPHPContext)

		// if we are done, exit the loop that restarts the worker script
		if workersAreDone.Load() {
			break
		}

		// on exit status 0 we just run the worker script again
		if fc.exitStatus == 0 {
			// TODO: make the max restart configurable
			if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil {
				c.Write(zap.String("worker", worker.fileName))
			}
			metrics.StopWorker(worker.fileName, StopReasonRestart)

			continue
		}

		// on a non-zero exit status we log the error and apply an exponential backoff when restarting
		upFunc.Do(func() {
			backingOffLock.Lock()
			defer backingOffLock.Unlock()
			// if we end up here, the worker has not been up for backoff*2
			// this is probably due to a syntax error or another fatal error
			if failureCount >= maxWorkerConsecutiveFailures {
				if !watcherIsEnabled {
					panic(fmt.Errorf("worker %q: too many consecutive failures", worker.fileName))
				}
				logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", failureCount))
			}
			failureCount++
		})
		backingOffLock.RLock()
		wait := backoff
		backingOffLock.RUnlock()
		time.Sleep(wait)
		backingOffLock.Lock()
		backoff *= 2
		backoff = min(backoff, maxWorkerErrorBackoff)
		backingOffLock.Unlock()

		metrics.StopWorker(worker.fileName, StopReasonCrash)
	}

	metrics.StopWorker(worker.fileName, StopReasonShutdown)

	// TODO: check if the termination is expected
	if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
		c.Write(zap.String("worker", worker.fileName))
	}
}
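
// stopWorkers signals every worker thread to stop: idle threads exit
// immediately, busy threads exit once their current script returns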
func stopWorkers() {
	workersAreDone.Store(true)
	close(workersDone)
}

func drainWorkers() {
	watcher.DrainWatcher()
	watcherIsEnabled = false
	stopWorkers()
	workerShutdownWG.Wait()
	workers = make(map[string]*worker)
}
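
// restartWorkersOnFileChanges sets up a file watcher on the directories
// configured for the workers and restarts all of them whenever a watched
// file changes; it is a no-op when no directories are watched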
func restartWorkersOnFileChanges(workerOpts []workerOpt) error {
	var directoriesToWatch []string
	for _, w := range workerOpts {
		directoriesToWatch = append(directoriesToWatch, w.watch...)
	}
	watcherIsEnabled = len(directoriesToWatch) > 0
	if !watcherIsEnabled {
		return nil
	}

	// the local name is not yet in scope inside the function literal,
	// so this closure calls the package-level restartWorkers
	restartWorkers := func() {
		restartWorkers(workerOpts)
	}
	if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
		return err
	}

	return nil
}

func restartWorkers(workerOpts []workerOpt) {
	stopWorkers()
	workerShutdownWG.Wait()
	if err := initWorkers(workerOpts); err != nil {
		logger.Error("failed to restart workers when watching files", zap.Error(err))
		panic(err)
	}
	logger.Info("workers restarted successfully")
}
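
// assignThreadToWorker binds a PHP thread to the worker whose script it runs
// and signals readiness while the workers are still starting up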
func assignThreadToWorker(thread *phpThread) {
	fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
	metrics.ReadyWorker(fc.scriptFilename)
	worker, ok := workers[fc.scriptFilename]
	if !ok {
		panic("worker not found for script: " + fc.scriptFilename)
	}
	thread.worker = worker
	if !workersAreReady.Load() {
		workersReadyWG.Done()
	}
	// TODO: we can also store all threads assigned to the worker if needed
}
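
// go_frankenphp_worker_handle_request_start is exported to C and blocks until
// a request is dispatched to this worker thread; it returns false when the
// workers are shutting down or the server context cannot be updated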
//export go_frankenphp_worker_handle_request_start
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
	thread := phpThreads[threadIndex]

	// we assign a worker to the thread if it doesn't have one already
	if thread.worker == nil {
		assignThreadToWorker(thread)
	}

	if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
		c.Write(zap.String("worker", thread.worker.fileName))
	}

	var r *http.Request
	select {
	case <-workersDone:
		if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
			c.Write(zap.String("worker", thread.worker.fileName))
		}
		thread.worker = nil
		executePHPFunction("opcache_reset")

		return C.bool(false)
	case r = <-thread.worker.requestChan:
	}
	thread.workerRequest = r

	if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
		c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
	}

	if err := updateServerContext(r, false, true); err != nil {
		// Unexpected error
		if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
			c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
		}

		return C.bool(false)
	}

	return C.bool(true)
}
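
// go_frankenphp_finish_request is exported to C and detaches the finished
// request from its thread, closing the request context if needed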
//export go_frankenphp_finish_request
func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) {
	thread := phpThreads[threadIndex]
	r := thread.getActiveRequest()
	fc := r.Context().Value(contextKey).(*FrankenPHPContext)

	if isWorkerRequest {
		thread.workerRequest = nil
	}

	maybeCloseContext(fc)

	if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
		var fields []zap.Field
		if isWorkerRequest {
			fields = append(fields, zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
		} else {
			fields = append(fields, zap.String("url", r.RequestURI))
		}
		c.Write(fields...)
	}

	thread.Unpin()
}