worker.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package frankenphp
  2. // #include <stdlib.h>
  3. // #include "frankenphp.h"
  4. import "C"
  5. import (
  6. "context"
  7. "fmt"
  8. "net/http"
  9. "path/filepath"
  10. "runtime/cgo"
  11. "sync"
  12. "go.uber.org/zap"
  13. )
  // workersRequestChans maps the absolute file name of a worker script to the
  // channel (chan *http.Request) registered for it by startWorkers.
  var workersRequestChans sync.Map

  // workersReadyWG is incremented by startWorkers for every worker it launches
  // (or restarts) and decremented by go_frankenphp_worker_ready.
  var workersReadyWG sync.WaitGroup

  // workersWG is a wait group for workers.
  // NOTE(review): it is not referenced in this part of the file; confirm usage.
  var workersWG sync.WaitGroup
  19. func startWorkers(fileName string, nbWorkers int) error {
  20. absFileName, err := filepath.Abs(fileName)
  21. if err != nil {
  22. return fmt.Errorf("workers %q: %w", fileName, err)
  23. }
  24. if _, ok := workersRequestChans.Load(absFileName); ok {
  25. return fmt.Errorf("workers %q: already started", absFileName)
  26. }
  27. workersRequestChans.Store(absFileName, make(chan *http.Request))
  28. shutdownWG.Add(nbWorkers)
  29. workersReadyWG.Add(nbWorkers)
  30. var (
  31. m sync.Mutex
  32. errors []error
  33. )
  34. l := getLogger()
  35. for i := 0; i < nbWorkers; i++ {
  36. go func() {
  37. defer shutdownWG.Done()
  38. for {
  39. // Create main dummy request
  40. fc := &FrankenPHPContext{
  41. Env: map[string]string{"SCRIPT_FILENAME": absFileName},
  42. }
  43. r, err := http.NewRequestWithContext(context.WithValue(
  44. context.Background(),
  45. contextKey,
  46. fc,
  47. ), "GET", "", nil)
  48. if err != nil {
  49. // TODO: this should never happen, maybe can we remove this block?
  50. m.Lock()
  51. defer m.Unlock()
  52. errors = append(errors, fmt.Errorf("workers %q: unable to create main worker request: %w", absFileName, err))
  53. return
  54. }
  55. l.Debug("starting", zap.String("worker", absFileName))
  56. if err := ServeHTTP(nil, r); err != nil {
  57. m.Lock()
  58. defer m.Unlock()
  59. errors = append(errors, fmt.Errorf("workers %q: unable to start: %w", absFileName, err))
  60. return
  61. }
  62. if fc.currentWorkerRequest != 0 {
  63. // Terminate the pending HTTP request handled by the worker
  64. maybeCloseContext(fc.currentWorkerRequest.Value().(*http.Request).Context().Value(contextKey).(*FrankenPHPContext))
  65. fc.currentWorkerRequest.Delete()
  66. fc.currentWorkerRequest = 0
  67. }
  68. // TODO: make the max restart configurable
  69. if _, ok := workersRequestChans.Load(absFileName); ok {
  70. workersReadyWG.Add(1)
  71. l.Error("unexpected termination, restarting", zap.String("worker", absFileName))
  72. } else {
  73. break
  74. }
  75. }
  76. // TODO: check if the termination is expected
  77. l.Debug("terminated", zap.String("worker", absFileName))
  78. }()
  79. }
  80. workersReadyWG.Wait()
  81. m.Lock()
  82. defer m.Unlock()
  83. if len(errors) == 0 {
  84. return nil
  85. }
  86. // Wrapping multiple errors will be available in Go 1.20: https://github.com/golang/go/issues/53435
  87. return fmt.Errorf("workers %q: error while starting: #%v", fileName, errors)
  88. }
  89. func stopWorkers() {
  90. workersRequestChans.Range(func(k, v any) bool {
  91. workersRequestChans.Delete(k)
  92. return true
  93. })
  94. }
  95. //export go_frankenphp_worker_ready
  96. func go_frankenphp_worker_ready() {
  97. workersReadyWG.Done()
  98. }
  99. //export go_frankenphp_worker_handle_request_start
  100. func go_frankenphp_worker_handle_request_start(mrh C.uintptr_t) C.uintptr_t {
  101. mainRequest := cgo.Handle(mrh).Value().(*http.Request)
  102. fc := mainRequest.Context().Value(contextKey).(*FrankenPHPContext)
  103. v, ok := workersRequestChans.Load(fc.Env["SCRIPT_FILENAME"])
  104. if !ok {
  105. // Probably shutting down
  106. return 0
  107. }
  108. rc := v.(chan *http.Request)
  109. l := getLogger()
  110. l.Debug("waiting for request", zap.String("worker", fc.Env["SCRIPT_FILENAME"]))
  111. var r *http.Request
  112. select {
  113. case <-done:
  114. l.Debug("shutting down", zap.String("worker", fc.Env["SCRIPT_FILENAME"]))
  115. return 0
  116. case r = <-rc:
  117. }
  118. fc.currentWorkerRequest = cgo.NewHandle(r)
  119. l.Debug("request handling started", zap.String("worker", fc.Env["SCRIPT_FILENAME"]), zap.String("url", r.RequestURI))
  120. if err := updateServerContext(r, false, mrh); err != nil {
  121. // Unexpected error
  122. l.Debug("unexpected error", zap.String("worker", fc.Env["SCRIPT_FILENAME"]), zap.String("url", r.RequestURI), zap.Error(err))
  123. return 0
  124. }
  125. return C.uintptr_t(fc.currentWorkerRequest)
  126. }
  127. //export go_frankenphp_finish_request
  128. func go_frankenphp_finish_request(mrh, rh C.uintptr_t, deleteHandle bool) {
  129. rHandle := cgo.Handle(rh)
  130. r := rHandle.Value().(*http.Request)
  131. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  132. if deleteHandle {
  133. rHandle.Delete()
  134. cgo.Handle(mrh).Value().(*http.Request).Context().Value(contextKey).(*FrankenPHPContext).currentWorkerRequest = 0
  135. }
  136. maybeCloseContext(fc)
  137. var fields []zap.Field
  138. if mrh == 0 {
  139. fields = append(fields, zap.String("worker", fc.Env["SCRIPT_FILENAME"]), zap.String("url", r.RequestURI))
  140. } else {
  141. fields = append(fields, zap.String("url", r.RequestURI))
  142. }
  143. fc.Logger.Debug("request handling finished", fields...)
  144. }