worker.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package frankenphp
  2. // #include <stdlib.h>
  3. // #include "frankenphp.h"
  4. import "C"
  5. import (
  6. "context"
  7. "errors"
  8. "fmt"
  9. "net/http"
  10. "path/filepath"
  11. "runtime/cgo"
  12. "sync"
  13. "go.uber.org/zap"
  14. )
  15. var (
  16. workersRequestChans sync.Map // map[fileName]chan *http.Request
  17. workersReadyWG sync.WaitGroup
  18. )
  19. // TODO: start all the worker in parallell to reduce the boot time
  20. func initWorkers(opt []workerOpt) error {
  21. for _, w := range opt {
  22. if err := startWorkers(w.fileName, w.num); err != nil {
  23. return err
  24. }
  25. }
  26. return nil
  27. }
  28. func startWorkers(fileName string, nbWorkers int) error {
  29. absFileName, err := filepath.Abs(fileName)
  30. if err != nil {
  31. return fmt.Errorf("workers %q: %w", fileName, err)
  32. }
  33. if _, ok := workersRequestChans.Load(absFileName); ok {
  34. return fmt.Errorf("workers %q: already started", absFileName)
  35. }
  36. workersRequestChans.Store(absFileName, make(chan *http.Request))
  37. shutdownWG.Add(nbWorkers)
  38. workersReadyWG.Add(nbWorkers)
  39. var (
  40. m sync.RWMutex
  41. errs []error
  42. )
  43. l := getLogger()
  44. for i := 0; i < nbWorkers; i++ {
  45. go func() {
  46. defer shutdownWG.Done()
  47. for {
  48. // Create main dummy request
  49. fc := &FrankenPHPContext{
  50. Env: map[string]string{"SCRIPT_FILENAME": absFileName},
  51. }
  52. r, err := http.NewRequestWithContext(context.WithValue(
  53. context.Background(),
  54. contextKey,
  55. fc,
  56. ), "GET", "", nil)
  57. if err != nil {
  58. // TODO: this should never happen, maybe can we remove this block?
  59. m.Lock()
  60. defer m.Unlock()
  61. errs = append(errs, fmt.Errorf("workers %q: unable to create main worker request: %w", absFileName, err))
  62. return
  63. }
  64. l.Debug("starting", zap.String("worker", absFileName))
  65. if err := ServeHTTP(nil, r); err != nil {
  66. m.Lock()
  67. defer m.Unlock()
  68. errs = append(errs, fmt.Errorf("workers %q: unable to start: %w", absFileName, err))
  69. return
  70. }
  71. if fc.currentWorkerRequest != 0 {
  72. // Terminate the pending HTTP request handled by the worker
  73. maybeCloseContext(fc.currentWorkerRequest.Value().(*http.Request).Context().Value(contextKey).(*FrankenPHPContext))
  74. fc.currentWorkerRequest.Delete()
  75. fc.currentWorkerRequest = 0
  76. }
  77. // TODO: make the max restart configurable
  78. if _, ok := workersRequestChans.Load(absFileName); ok {
  79. workersReadyWG.Add(1)
  80. l.Error("unexpected termination, restarting", zap.String("worker", absFileName))
  81. } else {
  82. break
  83. }
  84. }
  85. // TODO: check if the termination is expected
  86. l.Debug("terminated", zap.String("worker", absFileName))
  87. }()
  88. }
  89. workersReadyWG.Wait()
  90. m.Lock()
  91. defer m.Unlock()
  92. if len(errs) == 0 {
  93. return nil
  94. }
  95. // Wrapping multiple errors will be available in Go 1.20: https://github.com/golang/go/issues/53435
  96. return fmt.Errorf("workers %q: error while starting: %w", fileName, errors.Join(errs...))
  97. }
  98. func stopWorkers() {
  99. workersRequestChans.Range(func(k, v any) bool {
  100. workersRequestChans.Delete(k)
  101. return true
  102. })
  103. }
  104. //export go_frankenphp_worker_ready
  105. func go_frankenphp_worker_ready() {
  106. workersReadyWG.Done()
  107. }
  108. //export go_frankenphp_worker_handle_request_start
  109. func go_frankenphp_worker_handle_request_start(mrh C.uintptr_t) C.uintptr_t {
  110. mainRequest := cgo.Handle(mrh).Value().(*http.Request)
  111. fc := mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
  112. v, ok := workersRequestChans.Load(fc.Env["SCRIPT_FILENAME"])
  113. if !ok {
  114. // Probably shutting down
  115. return 0
  116. }
  117. rc := v.(chan *http.Request)
  118. l := getLogger()
  119. l.Debug("waiting for request", zap.String("worker", fc.Env["SCRIPT_FILENAME"]))
  120. var r *http.Request
  121. select {
  122. case <-done:
  123. l.Debug("shutting down", zap.String("worker", fc.Env["SCRIPT_FILENAME"]))
  124. return 0
  125. case r = <-rc:
  126. }
  127. fc.currentWorkerRequest = cgo.NewHandle(r)
  128. l.Debug("request handling started", zap.String("worker", fc.Env["SCRIPT_FILENAME"]), zap.String("url", r.RequestURI))
  129. if err := updateServerContext(r, false, mrh); err != nil {
  130. // Unexpected error
  131. l.Debug("unexpected error", zap.String("worker", fc.Env["SCRIPT_FILENAME"]), zap.String("url", r.RequestURI), zap.Error(err))
  132. return 0
  133. }
  134. return C.uintptr_t(fc.currentWorkerRequest)
  135. }
  136. //export go_frankenphp_finish_request
  137. func go_frankenphp_finish_request(mrh, rh C.uintptr_t, deleteHandle bool) {
  138. rHandle := cgo.Handle(rh)
  139. r := rHandle.Value().(*http.Request)
  140. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  141. if deleteHandle {
  142. rHandle.Delete()
  143. cgo.Handle(mrh).Value().(*http.Request).Context().Value(contextKey).(*FrankenPHPContext).currentWorkerRequest = 0
  144. }
  145. maybeCloseContext(fc)
  146. var fields []zap.Field
  147. if mrh == 0 {
  148. fields = append(fields, zap.String("worker", fc.Env["SCRIPT_FILENAME"]), zap.String("url", r.RequestURI))
  149. } else {
  150. fields = append(fields, zap.String("url", r.RequestURI))
  151. }
  152. fc.Logger.Debug("request handling finished", fields...)
  153. }