worker.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package frankenphp
  2. // #include <stdlib.h>
  3. // #include "frankenphp.h"
  4. import "C"
  5. import (
  6. "context"
  7. "fmt"
  8. "net/http"
  9. "path/filepath"
  10. "runtime/cgo"
  11. "sync"
  12. "go.uber.org/zap"
  13. )
  // Package-level state shared by the worker goroutines and the cgo callbacks.
  var (
  	// workersRequestChans associates the absolute path of a worker script with
  	// the channel on which that script's workers receive incoming requests.
  	// Effective type: map[fileName]chan *http.Request.
  	workersRequestChans sync.Map

  	// workersReadyWG is incremented by startWorkers for each worker it
  	// launches and decremented by go_frankenphp_worker_ready.
  	workersReadyWG sync.WaitGroup

  	// workersWG is declared here but not referenced by the visible code of
  	// this file. NOTE(review): confirm whether it is still used elsewhere.
  	workersWG sync.WaitGroup
  )
  19. func startWorkers(fileName string, nbWorkers int) error {
  20. absFileName, err := filepath.Abs(fileName)
  21. if err != nil {
  22. return fmt.Errorf("workers %q: %w", fileName, err)
  23. }
  24. if _, ok := workersRequestChans.Load(absFileName); ok {
  25. return fmt.Errorf("workers %q: already started", absFileName)
  26. }
  27. workersRequestChans.Store(absFileName, make(chan *http.Request))
  28. shutdownWG.Add(nbWorkers)
  29. workersReadyWG.Add(nbWorkers)
  30. var (
  31. m sync.Mutex
  32. errors []error
  33. )
  34. l := getLogger()
  35. for i := 0; i < nbWorkers; i++ {
  36. go func() {
  37. defer shutdownWG.Done()
  38. for {
  39. // Create main dummy request
  40. r, err := http.NewRequest("GET", "", nil)
  41. if err != nil {
  42. m.Lock()
  43. defer m.Unlock()
  44. errors = append(errors, fmt.Errorf("workers %q: unable to create main worker request: %w", absFileName, err))
  45. return
  46. }
  47. ctx := context.WithValue(
  48. r.Context(),
  49. contextKey,
  50. &FrankenPHPContext{
  51. Env: map[string]string{"SCRIPT_FILENAME": absFileName},
  52. },
  53. )
  54. l.Debug("starting", zap.String("worker", absFileName))
  55. if err := ServeHTTP(nil, r.WithContext(ctx)); err != nil {
  56. m.Lock()
  57. defer m.Unlock()
  58. errors = append(errors, fmt.Errorf("workers %q: unable to start: %w", absFileName, err))
  59. return
  60. }
  61. // TODO: make the max restart configurable
  62. if _, ok := workersRequestChans.Load(absFileName); ok {
  63. l.Error("unexpected termination, restarting", zap.String("worker", absFileName))
  64. } else {
  65. break
  66. }
  67. }
  68. // TODO: check if the termination is expected
  69. l.Debug("terminated", zap.String("worker", absFileName))
  70. }()
  71. }
  72. workersReadyWG.Wait()
  73. m.Lock()
  74. defer m.Unlock()
  75. if len(errors) == 0 {
  76. return nil
  77. }
  78. // Wrapping multiple errors will be available in Go 1.20: https://github.com/golang/go/issues/53435
  79. return fmt.Errorf("workers %q: error while starting: #%v", fileName, errors)
  80. }
  81. func stopWorkers() {
  82. workersRequestChans.Range(func(k, v any) bool {
  83. workersRequestChans.Delete(k)
  84. close(v.(chan *http.Request))
  85. return true
  86. })
  87. }
  88. //export go_frankenphp_worker_ready
  89. func go_frankenphp_worker_ready() {
  90. workersReadyWG.Done()
  91. }
  92. //export go_frankenphp_worker_handle_request_start
  93. func go_frankenphp_worker_handle_request_start(rh C.uintptr_t) C.uintptr_t {
  94. previousRequest := cgo.Handle(rh).Value().(*http.Request)
  95. previousFc := previousRequest.Context().Value(contextKey).(*FrankenPHPContext)
  96. v, ok := workersRequestChans.Load(previousFc.Env["SCRIPT_FILENAME"])
  97. if !ok {
  98. // Probably shutting down
  99. return 0
  100. }
  101. rc := v.(chan *http.Request)
  102. l := getLogger()
  103. l.Debug("waiting for request", zap.String("worker", previousFc.Env["SCRIPT_FILENAME"]))
  104. r, ok := <-rc
  105. if !ok {
  106. // channel closed, server is shutting down
  107. l.Debug("shutting down", zap.String("worker", previousFc.Env["SCRIPT_FILENAME"]))
  108. return 0
  109. }
  110. l.Debug("request handling started", zap.String("worker", previousFc.Env["SCRIPT_FILENAME"]), zap.String("url", r.RequestURI))
  111. if err := updateServerContext(r); err != nil {
  112. // Unexpected error
  113. l.Debug("unexpected error", zap.String("worker", previousFc.Env["SCRIPT_FILENAME"]), zap.String("url", r.RequestURI), zap.Error(err))
  114. return 0
  115. }
  116. return C.uintptr_t(cgo.NewHandle(r))
  117. }
  118. //export go_frankenphp_worker_handle_request_end
  119. func go_frankenphp_worker_handle_request_end(rh C.uintptr_t) {
  120. rHandle := cgo.Handle(rh)
  121. r := rHandle.Value().(*http.Request)
  122. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  123. cgo.Handle(rh).Delete()
  124. close(fc.done)
  125. fc.Logger.Debug("request handling finished", zap.String("worker", fc.Env["SCRIPT_FILENAME"]), zap.String("url", r.RequestURI))
  126. }