state.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package frankenphp
  2. import (
  3. "slices"
  4. "sync"
  5. "time"
  6. )
  7. type stateID uint8
  8. const (
  9. // livecycle states of a thread
  10. stateReserved stateID = iota
  11. stateBooting
  12. stateBootRequested
  13. stateShuttingDown
  14. stateDone
  15. // these states are 'stable' and safe to transition from at any time
  16. stateInactive
  17. stateReady
  18. // states necessary for restarting workers
  19. stateRestarting
  20. stateYielding
  21. // states necessary for transitioning between different handlers
  22. stateTransitionRequested
  23. stateTransitionInProgress
  24. stateTransitionComplete
  25. )
  26. var stateNames = map[stateID]string{
  27. stateReserved: "reserved",
  28. stateBooting: "booting",
  29. stateInactive: "inactive",
  30. stateReady: "ready",
  31. stateShuttingDown: "shutting down",
  32. stateDone: "done",
  33. stateRestarting: "restarting",
  34. stateYielding: "yielding",
  35. stateTransitionRequested: "transition requested",
  36. stateTransitionInProgress: "transition in progress",
  37. stateTransitionComplete: "transition complete",
  38. }
  39. type threadState struct {
  40. currentState stateID
  41. mu sync.RWMutex
  42. subscribers []stateSubscriber
  43. // how long threads have been waiting in stable states
  44. waitingSince time.Time
  45. isWaiting bool
  46. }
  47. type stateSubscriber struct {
  48. states []stateID
  49. ch chan struct{}
  50. }
  51. func newThreadState() *threadState {
  52. return &threadState{
  53. currentState: stateReserved,
  54. subscribers: []stateSubscriber{},
  55. mu: sync.RWMutex{},
  56. }
  57. }
  58. func (ts *threadState) is(state stateID) bool {
  59. ts.mu.RLock()
  60. ok := ts.currentState == state
  61. ts.mu.RUnlock()
  62. return ok
  63. }
  64. func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool {
  65. ts.mu.Lock()
  66. ok := ts.currentState == compareTo
  67. if ok {
  68. ts.currentState = swapTo
  69. ts.notifySubscribers(swapTo)
  70. }
  71. ts.mu.Unlock()
  72. return ok
  73. }
  74. func (ts *threadState) name() string {
  75. return stateNames[ts.get()]
  76. }
  77. func (ts *threadState) get() stateID {
  78. ts.mu.RLock()
  79. id := ts.currentState
  80. ts.mu.RUnlock()
  81. return id
  82. }
  83. func (ts *threadState) set(nextState stateID) {
  84. ts.mu.Lock()
  85. ts.currentState = nextState
  86. ts.notifySubscribers(nextState)
  87. ts.mu.Unlock()
  88. }
  89. func (ts *threadState) notifySubscribers(nextState stateID) {
  90. if len(ts.subscribers) == 0 {
  91. return
  92. }
  93. newSubscribers := []stateSubscriber{}
  94. // notify subscribers to the state change
  95. for _, sub := range ts.subscribers {
  96. if !slices.Contains(sub.states, nextState) {
  97. newSubscribers = append(newSubscribers, sub)
  98. continue
  99. }
  100. close(sub.ch)
  101. }
  102. ts.subscribers = newSubscribers
  103. }
  104. // block until the thread reaches a certain state
  105. func (ts *threadState) waitFor(states ...stateID) {
  106. ts.mu.Lock()
  107. if slices.Contains(states, ts.currentState) {
  108. ts.mu.Unlock()
  109. return
  110. }
  111. sub := stateSubscriber{
  112. states: states,
  113. ch: make(chan struct{}),
  114. }
  115. ts.subscribers = append(ts.subscribers, sub)
  116. ts.mu.Unlock()
  117. <-sub.ch
  118. }
  119. // safely request a state change from a different goroutine
  120. func (ts *threadState) requestSafeStateChange(nextState stateID) bool {
  121. ts.mu.Lock()
  122. switch ts.currentState {
  123. // disallow state changes if shutting down or done
  124. case stateShuttingDown, stateDone, stateReserved:
  125. ts.mu.Unlock()
  126. return false
  127. // ready and inactive are safe states to transition from
  128. case stateReady, stateInactive:
  129. ts.currentState = nextState
  130. ts.notifySubscribers(nextState)
  131. ts.mu.Unlock()
  132. return true
  133. }
  134. ts.mu.Unlock()
  135. // wait for the state to change to a safe state
  136. ts.waitFor(stateReady, stateInactive, stateShuttingDown)
  137. return ts.requestSafeStateChange(nextState)
  138. }
  139. // markAsWaiting hints that the thread reached a stable state and is waiting for requests or shutdown
  140. func (ts *threadState) markAsWaiting(isWaiting bool) {
  141. ts.mu.Lock()
  142. if isWaiting {
  143. ts.isWaiting = true
  144. ts.waitingSince = time.Now()
  145. } else {
  146. ts.isWaiting = false
  147. }
  148. ts.mu.Unlock()
  149. }
  150. // waitTime returns the time since the thread is waiting in a stable state in ms
  151. func (ts *threadState) waitTime() int64 {
  152. ts.mu.RLock()
  153. waitTime := int64(0)
  154. if ts.isWaiting {
  155. waitTime = time.Now().UnixMilli() - ts.waitingSince.UnixMilli()
  156. }
  157. ts.mu.RUnlock()
  158. return waitTime
  159. }