thread-regular.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package frankenphp
  2. import (
  3. "net/http"
  4. "sync"
  5. )
  6. // representation of a non-worker PHP thread
  7. // executes PHP scripts in a web context
  8. // implements the threadHandler interface
  9. type regularThread struct {
  10. state *threadState
  11. thread *phpThread
  12. activeRequest *http.Request
  13. }
  14. var (
  15. regularThreads []*phpThread
  16. regularThreadMu = &sync.RWMutex{}
  17. regularRequestChan chan *http.Request
  18. )
  19. func convertToRegularThread(thread *phpThread) {
  20. thread.setHandler(&regularThread{
  21. thread: thread,
  22. state: thread.state,
  23. })
  24. attachRegularThread(thread)
  25. }
  26. // beforeScriptExecution returns the name of the script or an empty string on shutdown
  27. func (handler *regularThread) beforeScriptExecution() string {
  28. switch handler.state.get() {
  29. case stateTransitionRequested:
  30. detachRegularThread(handler.thread)
  31. return handler.thread.transitionToNewHandler()
  32. case stateTransitionComplete:
  33. handler.state.set(stateReady)
  34. return handler.waitForRequest()
  35. case stateReady:
  36. return handler.waitForRequest()
  37. case stateShuttingDown:
  38. detachRegularThread(handler.thread)
  39. // signal to stop
  40. return ""
  41. }
  42. panic("unexpected state: " + handler.state.name())
  43. }
  44. // return true if the worker should continue to run
  45. func (handler *regularThread) afterScriptExecution(exitStatus int) {
  46. handler.afterRequest(exitStatus)
  47. }
  48. func (handler *regularThread) getActiveRequest() *http.Request {
  49. return handler.activeRequest
  50. }
  51. func (handler *regularThread) setActiveRequest(r *http.Request) {
  52. handler.thread.requestMu.Lock()
  53. handler.activeRequest = r
  54. handler.thread.requestMu.Unlock()
  55. }
  56. func (handler *regularThread) name() string {
  57. return "Regular PHP Thread"
  58. }
  59. func (handler *regularThread) waitForRequest() string {
  60. handler.state.markAsWaiting(true)
  61. var r *http.Request
  62. select {
  63. case <-handler.thread.drainChan:
  64. // go back to beforeScriptExecution
  65. return handler.beforeScriptExecution()
  66. case r = <-regularRequestChan:
  67. }
  68. handler.setActiveRequest(r)
  69. handler.state.markAsWaiting(false)
  70. fc := r.Context().Value(contextKey).(*FrankenPHPContext)
  71. if err := updateServerContext(handler.thread, r, true, false); err != nil {
  72. rejectRequest(fc.responseWriter, err.Error())
  73. handler.afterRequest(0)
  74. handler.thread.Unpin()
  75. // go back to beforeScriptExecution
  76. return handler.beforeScriptExecution()
  77. }
  78. // set the scriptFilename that should be executed
  79. return fc.scriptFilename
  80. }
  81. func (handler *regularThread) afterRequest(exitStatus int) {
  82. fc := handler.activeRequest.Context().Value(contextKey).(*FrankenPHPContext)
  83. fc.exitStatus = exitStatus
  84. maybeCloseContext(fc)
  85. handler.setActiveRequest(nil)
  86. }
  87. func handleRequestWithRegularPHPThreads(r *http.Request, fc *FrankenPHPContext) {
  88. metrics.StartRequest()
  89. select {
  90. case regularRequestChan <- r:
  91. // a thread was available to handle the request immediately
  92. <-fc.done
  93. metrics.StopRequest()
  94. return
  95. default:
  96. // no thread was available
  97. }
  98. // if no thread was available, mark the request as queued and fan it out to all threads
  99. metrics.QueuedRequest()
  100. for {
  101. select {
  102. case regularRequestChan <- r:
  103. metrics.DequeuedRequest()
  104. <-fc.done
  105. metrics.StopRequest()
  106. return
  107. case scaleChan <- fc:
  108. // the request has triggered scaling, continue to wait for a thread
  109. }
  110. }
  111. }
  112. func attachRegularThread(thread *phpThread) {
  113. regularThreadMu.Lock()
  114. regularThreads = append(regularThreads, thread)
  115. regularThreadMu.Unlock()
  116. }
  117. func detachRegularThread(thread *phpThread) {
  118. regularThreadMu.Lock()
  119. for i, t := range regularThreads {
  120. if t == thread {
  121. regularThreads = append(regularThreads[:i], regularThreads[i+1:]...)
  122. break
  123. }
  124. }
  125. regularThreadMu.Unlock()
  126. }