threadregular.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package frankenphp
  2. import (
  3. "net/http"
  4. "sync"
  5. )
  6. // representation of a non-worker PHP thread
  7. // executes PHP scripts in a web context
  8. // implements the threadHandler interface
  9. type regularThread struct {
  10. state *threadState
  11. thread *phpThread
  12. activeRequest *http.Request
  13. }
  14. var (
  15. regularThreads []*phpThread
  16. regularThreadMu = &sync.RWMutex{}
  17. regularRequestChan chan *http.Request
  18. )
  19. func convertToRegularThread(thread *phpThread) {
  20. thread.setHandler(&regularThread{
  21. thread: thread,
  22. state: thread.state,
  23. })
  24. attachRegularThread(thread)
  25. }
  26. // beforeScriptExecution returns the name of the script or an empty string on shutdown
  27. func (handler *regularThread) beforeScriptExecution() string {
  28. switch handler.state.get() {
  29. case stateTransitionRequested:
  30. detachRegularThread(handler.thread)
  31. return handler.thread.transitionToNewHandler()
  32. case stateTransitionComplete:
  33. handler.state.set(stateReady)
  34. return handler.waitForRequest()
  35. case stateReady:
  36. return handler.waitForRequest()
  37. case stateShuttingDown:
  38. detachRegularThread(handler.thread)
  39. // signal to stop
  40. return ""
  41. }
  42. panic("unexpected state: " + handler.state.name())
  43. }
  44. // return true if the worker should continue to run
  45. func (handler *regularThread) afterScriptExecution(exitStatus int) {
  46. handler.afterRequest(exitStatus)
  47. }
  48. func (handler *regularThread) getActiveRequest() *http.Request {
  49. return handler.activeRequest
  50. }
  51. func (handler *regularThread) name() string {
  52. return "Regular PHP Thread"
  53. }
  54. func (handler *regularThread) waitForRequest() string {
  55. handler.state.markAsWaiting(true)
  56. var r *http.Request
  57. select {
  58. case <-handler.thread.drainChan:
  59. // go back to beforeScriptExecution
  60. return handler.beforeScriptExecution()
  61. case r = <-regularRequestChan:
  62. }
  63. handler.activeRequest = r
  64. handler.state.markAsWaiting(false)
  65. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  66. if err := updateServerContext(handler.thread, r, true, false); err != nil {
  67. rejectRequest(fc.responseWriter, err.Error())
  68. handler.afterRequest(0)
  69. handler.thread.Unpin()
  70. // go back to beforeScriptExecution
  71. return handler.beforeScriptExecution()
  72. }
  73. // set the scriptFilename that should be executed
  74. return fc.scriptFilename
  75. }
  76. func (handler *regularThread) afterRequest(exitStatus int) {
  77. fc := handler.activeRequest.Context().Value(contextKey).(*FrankenPHPContext)
  78. fc.exitStatus = exitStatus
  79. maybeCloseContext(fc)
  80. handler.activeRequest = nil
  81. }
  82. func handleRequestWithRegularPHPThreads(r *http.Request, fc *FrankenPHPContext) {
  83. metrics.StartRequest()
  84. select {
  85. case regularRequestChan <- r:
  86. // a thread was available to handle the request immediately
  87. <-fc.done
  88. metrics.StopRequest()
  89. return
  90. default:
  91. // no thread was available
  92. }
  93. // if no thread was available, mark the request as queued and fan it out to all threads
  94. metrics.QueuedRequest()
  95. for {
  96. select {
  97. case regularRequestChan <- r:
  98. metrics.DequeuedRequest()
  99. <-fc.done
  100. metrics.StopRequest()
  101. return
  102. case scaleChan <- fc:
  103. // the request has triggered scaling, continue to wait for a thread
  104. }
  105. }
  106. }
  107. func attachRegularThread(thread *phpThread) {
  108. regularThreadMu.Lock()
  109. regularThreads = append(regularThreads, thread)
  110. regularThreadMu.Unlock()
  111. }
  112. func detachRegularThread(thread *phpThread) {
  113. regularThreadMu.Lock()
  114. for i, t := range regularThreads {
  115. if t == thread {
  116. regularThreads = append(regularThreads[:i], regularThreads[i+1:]...)
  117. break
  118. }
  119. }
  120. regularThreadMu.Unlock()
  121. }