worker.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. package frankenphp
  2. // #include <stdlib.h>
  3. // #include "frankenphp.h"
  4. import "C"
  5. import (
  6. "fmt"
  7. "github.com/dunglas/frankenphp/internal/fastabs"
  8. "net/http"
  9. "path/filepath"
  10. "sync"
  11. "time"
  12. "github.com/dunglas/frankenphp/internal/watcher"
  13. "go.uber.org/zap"
  14. "go.uber.org/zap/zapcore"
  15. )
// worker holds the state shared by all threads that run one worker script.
type worker struct {
	// fileName is the absolute path of the worker script; newWorker also uses it as the key in workers.
	fileName string
	// num is how many worker goroutines initWorkers starts for this script.
	num int
	// env is the prepared environment attached to the startup request of the script.
	env PreparedEnv
	// requestChan is the shared channel from which any thread of this worker may receive a request.
	requestChan chan *http.Request
	// threads lists the PHP threads that registered themselves through assignThreadToWorker.
	threads []*phpThread
	// threadMutex guards threads.
	threadMutex sync.RWMutex
	// ready receives a signal when a thread reaches its request loop; initWorkers waits for num of them.
	ready chan struct{}
}
  25. const maxWorkerErrorBackoff = 1 * time.Second
  26. const minWorkerErrorBackoff = 100 * time.Millisecond
  27. const maxWorkerConsecutiveFailures = 6
  28. var (
  29. watcherIsEnabled bool
  30. workerShutdownWG sync.WaitGroup
  31. workersDone chan interface{}
  32. workers = make(map[string]*worker)
  33. )
  34. func initWorkers(opt []workerOpt) error {
  35. workersDone = make(chan interface{})
  36. ready := sync.WaitGroup{}
  37. for _, o := range opt {
  38. worker, err := newWorker(o)
  39. worker.threads = make([]*phpThread, 0, o.num)
  40. if err != nil {
  41. return err
  42. }
  43. for i := 0; i < worker.num; i++ {
  44. go worker.startNewWorkerThread()
  45. }
  46. ready.Add(1)
  47. go func() {
  48. for i := 0; i < worker.num; i++ {
  49. <-worker.ready
  50. }
  51. ready.Done()
  52. }()
  53. }
  54. ready.Wait()
  55. return nil
  56. }
  57. func newWorker(o workerOpt) (*worker, error) {
  58. absFileName, err := fastabs.FastAbs(o.fileName)
  59. if err != nil {
  60. return nil, fmt.Errorf("worker filename is invalid %q: %w", o.fileName, err)
  61. }
  62. // if the worker already exists, return it,
  63. // it's necessary since we don't want to destroy the channels when restarting on file changes
  64. if w, ok := workers[absFileName]; ok {
  65. return w, nil
  66. }
  67. if o.env == nil {
  68. o.env = make(PreparedEnv, 1)
  69. }
  70. o.env["FRANKENPHP_WORKER\x00"] = "1"
  71. w := &worker{
  72. fileName: absFileName,
  73. num: o.num,
  74. env: o.env,
  75. requestChan: make(chan *http.Request),
  76. ready: make(chan struct{}, o.num),
  77. }
  78. workers[absFileName] = w
  79. return w, nil
  80. }
// startNewWorkerThread runs the worker script in a loop, starting it again
// each time ServeHTTP returns, until workersDone has been closed.
//
// The goroutine registers itself in workerShutdownWG for its whole lifetime.
// After a run that ends with exit status 0 the script is restarted
// immediately. After any other exit status the restart is delayed by a
// backoff that starts at minWorkerErrorBackoff, doubles after each failed run
// and is capped at maxWorkerErrorBackoff. Once failureCount has reached
// maxWorkerConsecutiveFailures the function panics, unless the watcher is
// enabled, in which case a warning is logged instead.
//
// It panics if the startup request cannot be built or if ServeHTTP returns an
// error.
func (worker *worker) startNewWorkerThread() {
	workerShutdownWG.Add(1)
	defer workerShutdownWG.Done()

	// backoff and failureCount are shared with the timer goroutine started
	// in each iteration; backingOffLock guards both
	backoff := minWorkerErrorBackoff
	failureCount := 0
	backingOffLock := sync.RWMutex{}

	for {
		// if the worker can stay up longer than backoff*2, it is probably an application error
		// upFunc makes sure that only one of the two outcomes of this run is recorded:
		// "stayed up" (timer goroutine below) or "failed early" (failure path further down)
		upFunc := sync.Once{}
		go func() {
			backingOffLock.RLock()
			wait := backoff * 2
			backingOffLock.RUnlock()
			time.Sleep(wait)
			upFunc.Do(func() {
				backingOffLock.Lock()
				defer backingOffLock.Unlock()
				// if we come back to a stable state, reset the failure count
				if backoff == minWorkerErrorBackoff {
					failureCount = 0
				}

				// earn back the backoff over time
				if failureCount > 0 {
					backoff = max(backoff/2, 100*time.Millisecond)
				}
			})
		}()

		metrics.StartWorker(worker.fileName)

		// Create main dummy request
		// the request targets the script's base name, with the script's directory as document root
		r, err := http.NewRequest(http.MethodGet, filepath.Base(worker.fileName), nil)
		if err != nil {
			panic(err)
		}

		r, err = NewRequestWithContext(
			r,
			WithRequestDocumentRoot(filepath.Dir(worker.fileName), false),
			WithRequestPreparedEnv(worker.env),
		)
		if err != nil {
			panic(err)
		}

		if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
			c.Write(zap.String("worker", worker.fileName), zap.Int("num", worker.num))
		}

		// NOTE(review): presumably this call blocks for as long as the worker script runs — confirm in ServeHTTP
		if err := ServeHTTP(nil, r); err != nil {
			panic(err)
		}

		// the context attached to the request carries the exit status read below
		fc := r.Context().Value(contextKey).(*FrankenPHPContext)

		// if we are done, exit the loop that restarts the worker script
		select {
		case _, ok := <-workersDone:
			// a receive with ok == false means the channel has been closed (see stopWorkers)
			if !ok {
				metrics.StopWorker(worker.fileName, StopReasonShutdown)

				if c := logger.Check(zapcore.DebugLevel, "terminated"); c != nil {
					c.Write(zap.String("worker", worker.fileName))
				}

				return
			}
			// continue on since the channel is still open
		default:
			// continue on since the channel is still open
		}

		// on exit status 0 we just run the worker script again
		if fc.exitStatus == 0 {
			// TODO: make the max restart configurable
			if c := logger.Check(zapcore.InfoLevel, "restarting"); c != nil {
				c.Write(zap.String("worker", worker.fileName))
			}
			metrics.StopWorker(worker.fileName, StopReasonRestart)
			continue
		}

		// on a non-zero exit status we count the failure and apply an exponential backoff when restarting
		upFunc.Do(func() {
			backingOffLock.Lock()
			defer backingOffLock.Unlock()
			// if we end up here, the worker has not been up for backoff*2
			// this is probably due to a syntax error or another fatal error
			if failureCount >= maxWorkerConsecutiveFailures {
				if !watcherIsEnabled {
					panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
				}
				logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", failureCount))
			}
			failureCount += 1
		})

		// wait for the current backoff before restarting the script
		backingOffLock.RLock()
		wait := backoff
		backingOffLock.RUnlock()
		time.Sleep(wait)

		// double the backoff for the next failure, capped at maxWorkerErrorBackoff
		backingOffLock.Lock()
		backoff *= 2
		backoff = min(backoff, maxWorkerErrorBackoff)
		backingOffLock.Unlock()

		metrics.StopWorker(worker.fileName, StopReasonCrash)
	}

	// unreachable
}
  178. func (worker *worker) handleRequest(r *http.Request) {
  179. worker.threadMutex.RLock()
  180. // dispatch requests to all worker threads in order
  181. for _, thread := range worker.threads {
  182. select {
  183. case thread.requestChan <- r:
  184. worker.threadMutex.RUnlock()
  185. return
  186. default:
  187. }
  188. }
  189. worker.threadMutex.RUnlock()
  190. // if no thread was available, fan the request out to all threads
  191. // TODO: theoretically there could be autoscaling of threads here
  192. worker.requestChan <- r
  193. }
// stopWorkers closes workersDone. Worker threads select on that channel and
// stop restarting the script or waiting for requests once it is closed.
// It does not wait for them; callers use workerShutdownWG.Wait() for that.
func stopWorkers() {
	close(workersDone)
}
// drainWorkers shuts the worker subsystem down: it drains the file watcher,
// marks the watcher as disabled, signals all workers to stop, waits until
// every goroutine tracked by workerShutdownWG has returned and finally
// empties the worker registry.
func drainWorkers() {
	watcher.DrainWatcher()
	watcherIsEnabled = false
	stopWorkers()
	workerShutdownWG.Wait()
	// forget all registered workers so that newWorker creates fresh ones afterwards
	workers = make(map[string]*worker)
}
  204. func restartWorkersOnFileChanges(workerOpts []workerOpt) error {
  205. var directoriesToWatch []string
  206. for _, w := range workerOpts {
  207. directoriesToWatch = append(directoriesToWatch, w.watch...)
  208. }
  209. watcherIsEnabled = len(directoriesToWatch) > 0
  210. if !watcherIsEnabled {
  211. return nil
  212. }
  213. restartWorkers := func() {
  214. restartWorkers(workerOpts)
  215. }
  216. if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil {
  217. return err
  218. }
  219. return nil
  220. }
  221. func restartWorkers(workerOpts []workerOpt) {
  222. stopWorkers()
  223. workerShutdownWG.Wait()
  224. if err := initWorkers(workerOpts); err != nil {
  225. logger.Error("failed to restart workers when watching files")
  226. panic(err)
  227. }
  228. logger.Info("workers restarted successfully")
  229. }
  230. func assignThreadToWorker(thread *phpThread) {
  231. fc := thread.mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
  232. worker, ok := workers[fc.scriptFilename]
  233. if !ok {
  234. panic("worker not found for script: " + fc.scriptFilename)
  235. }
  236. thread.worker = worker
  237. thread.requestChan = make(chan *http.Request)
  238. worker.threadMutex.Lock()
  239. worker.threads = append(worker.threads, thread)
  240. worker.threadMutex.Unlock()
  241. }
// go_frankenphp_worker_handle_request_start blocks the PHP thread identified
// by threadIndex until a request is available for its worker or the workers
// are being shut down.
//
// It returns true once a request has been received and the server context
// has been updated for it. It returns false when workersDone is closed; in
// that case the thread is detached from its worker and the opcache is reset.
// A request for which updateServerContext fails is rejected and the function
// waits for the next one.
//
//export go_frankenphp_worker_handle_request_start
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
	thread := phpThreads[threadIndex]

	// we assign a worker to the thread if it doesn't have one already
	if thread.worker == nil {
		assignThreadToWorker(thread)
	}
	thread.readiedOnce.Do(func() {
		// inform metrics that the worker is ready
		metrics.ReadyWorker(thread.worker.fileName)
	})

	// signal readiness without blocking: initWorkers only reads worker.num signals,
	// later ones are dropped once the buffer is full
	select {
	case thread.worker.ready <- struct{}{}:
	default:
	}

	if c := logger.Check(zapcore.DebugLevel, "waiting for request"); c != nil {
		c.Write(zap.String("worker", thread.worker.fileName))
	}

	// wait for shutdown, a request on the worker's shared channel,
	// or a request sent directly to this thread (see handleRequest)
	var r *http.Request
	select {
	case <-workersDone:
		if c := logger.Check(zapcore.DebugLevel, "shutting down"); c != nil {
			c.Write(zap.String("worker", thread.worker.fileName))
		}
		// detach the thread so that it is assigned to a worker again on the next call
		thread.worker = nil
		C.frankenphp_reset_opcache()

		return C.bool(false)
	case r = <-thread.worker.requestChan:
	case r = <-thread.requestChan:
	}

	thread.workerRequest = r

	if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
		c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI))
	}

	if err := updateServerContext(thread, r, false, true); err != nil {
		// Unexpected error
		if c := logger.Check(zapcore.DebugLevel, "unexpected error"); c != nil {
			c.Write(zap.String("worker", thread.worker.fileName), zap.String("url", r.RequestURI), zap.Error(err))
		}
		// reject this request, release what was set up for it and wait for the next one
		fc := r.Context().Value(contextKey).(*FrankenPHPContext)
		rejectRequest(fc.responseWriter, err.Error())
		maybeCloseContext(fc)
		thread.workerRequest = nil
		thread.Unpin()

		return go_frankenphp_worker_handle_request_start(threadIndex)
	}

	return C.bool(true)
}
  290. //export go_frankenphp_finish_request
  291. func go_frankenphp_finish_request(threadIndex C.uintptr_t, isWorkerRequest bool) {
  292. thread := phpThreads[threadIndex]
  293. r := thread.getActiveRequest()
  294. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  295. if isWorkerRequest {
  296. thread.workerRequest = nil
  297. }
  298. maybeCloseContext(fc)
  299. if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
  300. var fields []zap.Field
  301. if isWorkerRequest {
  302. fields = append(fields, zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
  303. } else {
  304. fields = append(fields, zap.String("url", r.RequestURI))
  305. }
  306. c.Write(fields...)
  307. }
  308. if isWorkerRequest {
  309. thread.Unpin()
  310. }
  311. }