thread_state.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package frankenphp
  2. import (
  3. "slices"
  4. "sync"
  5. )
  // threadState identifies which state a thread is currently in.
  type threadState int

  // The known thread states. iota numbers them consecutively from zero, so the
  // order of this list fixes the numeric value of every constant.
  //
  // NOTE(review): what each state means in practice is decided by the code that
  // calls set; that code is not visible in this file.
  const (
  	stateBooting threadState = iota
  	stateInactive
  	stateActive
  	stateReady
  	stateWorking
  	stateShuttingDown
  	stateDone
  	stateRestarting
  )
  17. type threadStateHandler struct {
  18. currentState threadState
  19. mu sync.RWMutex
  20. subscribers []stateSubscriber
  21. }
  22. type stateSubscriber struct {
  23. states []threadState
  24. ch chan struct{}
  25. yieldFor *sync.WaitGroup
  26. }
  27. func (h *threadStateHandler) is(state threadState) bool {
  28. h.mu.RLock()
  29. defer h.mu.RUnlock()
  30. return h.currentState == state
  31. }
  32. func (h *threadStateHandler) set(nextState threadState) {
  33. h.mu.Lock()
  34. defer h.mu.Unlock()
  35. if h.currentState == nextState {
  36. // TODO: do we return here or inform subscribers?
  37. // TODO: should we ever reach here?
  38. return
  39. }
  40. h.currentState = nextState
  41. if len(h.subscribers) == 0 {
  42. return
  43. }
  44. newSubscribers := []stateSubscriber{}
  45. // TODO: do we even need multiple subscribers?
  46. // notify subscribers to the state change
  47. for _, sub := range h.subscribers {
  48. if !slices.Contains(sub.states, nextState) {
  49. newSubscribers = append(newSubscribers, sub)
  50. continue
  51. }
  52. close(sub.ch)
  53. // yield for the subscriber
  54. if sub.yieldFor != nil {
  55. defer sub.yieldFor.Wait()
  56. }
  57. }
  58. h.subscribers = newSubscribers
  59. }
  60. // wait for the thread to reach a certain state
  61. func (h *threadStateHandler) waitFor(states ...threadState) {
  62. h.waitForStates(states, nil)
  63. }
  64. // make the thread yield to a WaitGroup once it reaches the state
  65. // this makes sure all threads are in sync both ways
  66. func (h *threadStateHandler) waitForAndYield(yieldFor *sync.WaitGroup, states ...threadState) {
  67. h.waitForStates(states, yieldFor)
  68. }
  69. // subscribe to a state and wait until the thread reaches it
  70. func (h *threadStateHandler) waitForStates(states []threadState, yieldFor *sync.WaitGroup) {
  71. h.mu.Lock()
  72. if slices.Contains(states, h.currentState) {
  73. h.mu.Unlock()
  74. return
  75. }
  76. sub := stateSubscriber{
  77. states: states,
  78. ch: make(chan struct{}),
  79. yieldFor: yieldFor,
  80. }
  81. h.subscribers = append(h.subscribers, sub)
  82. h.mu.Unlock()
  83. <-sub.ch
  84. }