state.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package frankenphp
  2. import (
  3. "slices"
  4. "sync"
  5. "time"
  6. )
  7. type stateID uint8
  8. const (
  9. // livecycle states of a thread
  10. stateReserved stateID = iota
  11. stateBooting
  12. stateShuttingDown
  13. stateDone
  14. // these states are 'stable' and safe to transition from at any time
  15. stateInactive
  16. stateReady
  17. // states necessary for restarting workers
  18. stateRestarting
  19. stateYielding
  20. stateOpcacheReset
  21. // states necessary for transitioning between different handlers
  22. stateTransitionRequested
  23. stateTransitionInProgress
  24. stateTransitionComplete
  25. )
  26. var stateNames = map[stateID]string{
  27. stateReserved: "reserved",
  28. stateBooting: "booting",
  29. stateInactive: "inactive",
  30. stateReady: "ready",
  31. stateShuttingDown: "shutting down",
  32. stateDone: "done",
  33. stateRestarting: "restarting",
  34. stateYielding: "yielding",
  35. stateTransitionRequested: "transition requested",
  36. stateTransitionInProgress: "transition in progress",
  37. stateTransitionComplete: "transition complete",
  38. stateOpcacheReset: "opcache reset",
  39. }
  40. type threadState struct {
  41. currentState stateID
  42. mu sync.RWMutex
  43. subscribers []stateSubscriber
  44. // how long threads have been waiting in stable states
  45. waitingSince time.Time
  46. isWaiting bool
  47. }
  48. type stateSubscriber struct {
  49. states []stateID
  50. ch chan struct{}
  51. }
  52. func newThreadState() *threadState {
  53. return &threadState{
  54. currentState: stateReserved,
  55. subscribers: []stateSubscriber{},
  56. mu: sync.RWMutex{},
  57. }
  58. }
  59. func (ts *threadState) is(state stateID) bool {
  60. ts.mu.RLock()
  61. ok := ts.currentState == state
  62. ts.mu.RUnlock()
  63. return ok
  64. }
  65. func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool {
  66. ts.mu.Lock()
  67. ok := ts.currentState == compareTo
  68. if ok {
  69. ts.currentState = swapTo
  70. ts.notifySubscribers(swapTo)
  71. }
  72. ts.mu.Unlock()
  73. return ok
  74. }
  75. func (ts *threadState) name() string {
  76. return stateNames[ts.get()]
  77. }
  78. func (ts *threadState) get() stateID {
  79. ts.mu.RLock()
  80. id := ts.currentState
  81. ts.mu.RUnlock()
  82. return id
  83. }
  84. func (ts *threadState) set(nextState stateID) {
  85. ts.mu.Lock()
  86. ts.currentState = nextState
  87. ts.notifySubscribers(nextState)
  88. ts.mu.Unlock()
  89. }
  90. func (ts *threadState) notifySubscribers(nextState stateID) {
  91. if len(ts.subscribers) == 0 {
  92. return
  93. }
  94. newSubscribers := []stateSubscriber{}
  95. // notify subscribers to the state change
  96. for _, sub := range ts.subscribers {
  97. if !slices.Contains(sub.states, nextState) {
  98. newSubscribers = append(newSubscribers, sub)
  99. continue
  100. }
  101. close(sub.ch)
  102. }
  103. ts.subscribers = newSubscribers
  104. }
  105. // block until the thread reaches a certain state
  106. func (ts *threadState) waitFor(states ...stateID) {
  107. ts.mu.Lock()
  108. if slices.Contains(states, ts.currentState) {
  109. ts.mu.Unlock()
  110. return
  111. }
  112. sub := stateSubscriber{
  113. states: states,
  114. ch: make(chan struct{}),
  115. }
  116. ts.subscribers = append(ts.subscribers, sub)
  117. ts.mu.Unlock()
  118. <-sub.ch
  119. }
  120. // safely request a state change from a different goroutine
  121. func (ts *threadState) requestSafeStateChange(nextState stateID) bool {
  122. ts.mu.Lock()
  123. switch ts.currentState {
  124. // disallow state changes if shutting down or done
  125. case stateShuttingDown, stateDone, stateReserved:
  126. ts.mu.Unlock()
  127. return false
  128. // ready and inactive are safe states to transition from
  129. case stateReady, stateInactive:
  130. ts.currentState = nextState
  131. ts.notifySubscribers(nextState)
  132. ts.mu.Unlock()
  133. return true
  134. }
  135. ts.mu.Unlock()
  136. // wait for the state to change to a safe state
  137. ts.waitFor(stateReady, stateInactive, stateShuttingDown)
  138. return ts.requestSafeStateChange(nextState)
  139. }
  140. // the thread reached a stable state and is waiting for requests or shutdown
  141. func (ts *threadState) markAsWaiting(isWaiting bool) {
  142. ts.mu.Lock()
  143. if isWaiting {
  144. ts.isWaiting = true
  145. ts.waitingSince = time.Now()
  146. } else {
  147. ts.isWaiting = false
  148. }
  149. ts.mu.Unlock()
  150. }
  151. // the time since the thread is waiting in a stable state in ms
  152. func (ts *threadState) waitTime() int64 {
  153. ts.mu.RLock()
  154. waitTime := int64(0)
  155. if ts.isWaiting {
  156. waitTime = time.Now().UnixMilli() - ts.waitingSince.UnixMilli()
  157. }
  158. ts.mu.RUnlock()
  159. return waitTime
  160. }