123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- package frankenphp
- import (
- "slices"
- "strconv"
- "sync"
- )
// stateID identifies one of the states a thread can be in.
type stateID uint8

// The numeric value of each state follows its declaration order, starting
// at zero with stateBooting.
const (
	// Lifecycle states of a thread.
	stateBooting stateID = iota
	stateShuttingDown
	stateDone

	// States that are safe to transition from at any time.
	stateInactive
	stateReady

	// States needed for restarting workers.
	stateRestarting
	stateYielding

	// States needed for transitioning between different handlers.
	stateTransitionRequested
	stateTransitionInProgress
	stateTransitionComplete
)
- type threadState struct {
- currentState stateID
- mu sync.RWMutex
- subscribers []stateSubscriber
- }
- type stateSubscriber struct {
- states []stateID
- ch chan struct{}
- }
- func newThreadState() *threadState {
- return &threadState{
- currentState: stateBooting,
- subscribers: []stateSubscriber{},
- mu: sync.RWMutex{},
- }
- }
- func (ts *threadState) is(state stateID) bool {
- ts.mu.RLock()
- ok := ts.currentState == state
- ts.mu.RUnlock()
- return ok
- }
- func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool {
- ts.mu.Lock()
- ok := ts.currentState == compareTo
- if ok {
- ts.currentState = swapTo
- ts.notifySubscribers(swapTo)
- }
- ts.mu.Unlock()
- return ok
- }
- func (ts *threadState) name() string {
- // TODO: return the actual name for logging/metrics
- return "state:" + strconv.Itoa(int(ts.get()))
- }
- func (ts *threadState) get() stateID {
- ts.mu.RLock()
- id := ts.currentState
- ts.mu.RUnlock()
- return id
- }
- func (ts *threadState) set(nextState stateID) {
- ts.mu.Lock()
- ts.currentState = nextState
- ts.notifySubscribers(nextState)
- ts.mu.Unlock()
- }
- func (ts *threadState) notifySubscribers(nextState stateID) {
- if len(ts.subscribers) == 0 {
- return
- }
- newSubscribers := []stateSubscriber{}
- // notify subscribers to the state change
- for _, sub := range ts.subscribers {
- if !slices.Contains(sub.states, nextState) {
- newSubscribers = append(newSubscribers, sub)
- continue
- }
- close(sub.ch)
- }
- ts.subscribers = newSubscribers
- }
- // block until the thread reaches a certain state
- func (ts *threadState) waitFor(states ...stateID) {
- ts.mu.Lock()
- if slices.Contains(states, ts.currentState) {
- ts.mu.Unlock()
- return
- }
- sub := stateSubscriber{
- states: states,
- ch: make(chan struct{}),
- }
- ts.subscribers = append(ts.subscribers, sub)
- ts.mu.Unlock()
- <-sub.ch
- }
- // safely request a state change from a different goroutine
- func (ts *threadState) requestSafeStateChange(nextState stateID) bool {
- ts.mu.Lock()
- switch ts.currentState {
- // disallow state changes if shutting down
- case stateShuttingDown, stateDone:
- ts.mu.Unlock()
- return false
- // ready and inactive are safe states to transition from
- case stateReady, stateInactive:
- ts.currentState = nextState
- ts.notifySubscribers(nextState)
- ts.mu.Unlock()
- return true
- }
- ts.mu.Unlock()
- // wait for the state to change to a safe state
- ts.waitFor(stateReady, stateInactive, stateShuttingDown)
- return ts.requestSafeStateChange(nextState)
- }
|