123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- package frankenphp
- import (
- "slices"
- "sync"
- "time"
- )
- type stateID uint8
- const (
- // livecycle states of a thread
- stateReserved stateID = iota
- stateBooting
- stateShuttingDown
- stateDone
- // these states are 'stable' and safe to transition from at any time
- stateInactive
- stateReady
- // states necessary for restarting workers
- stateRestarting
- stateYielding
- // states necessary for transitioning between different handlers
- stateTransitionRequested
- stateTransitionInProgress
- stateTransitionComplete
- )
- var stateNames = map[stateID]string{
- stateReserved: "reserved",
- stateBooting: "booting",
- stateInactive: "inactive",
- stateReady: "ready",
- stateShuttingDown: "shutting down",
- stateDone: "done",
- stateRestarting: "restarting",
- stateYielding: "yielding",
- stateTransitionRequested: "transition requested",
- stateTransitionInProgress: "transition in progress",
- stateTransitionComplete: "transition complete",
- }
- type threadState struct {
- currentState stateID
- mu sync.RWMutex
- subscribers []stateSubscriber
- // how long threads have been waiting in stable states
- waitingSince time.Time
- isWaiting bool
- }
- type stateSubscriber struct {
- states []stateID
- ch chan struct{}
- }
- func newThreadState() *threadState {
- return &threadState{
- currentState: stateReserved,
- subscribers: []stateSubscriber{},
- mu: sync.RWMutex{},
- }
- }
- func (ts *threadState) is(state stateID) bool {
- ts.mu.RLock()
- ok := ts.currentState == state
- ts.mu.RUnlock()
- return ok
- }
- func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool {
- ts.mu.Lock()
- ok := ts.currentState == compareTo
- if ok {
- ts.currentState = swapTo
- ts.notifySubscribers(swapTo)
- }
- ts.mu.Unlock()
- return ok
- }
- func (ts *threadState) name() string {
- return stateNames[ts.get()]
- }
- func (ts *threadState) get() stateID {
- ts.mu.RLock()
- id := ts.currentState
- ts.mu.RUnlock()
- return id
- }
- func (ts *threadState) set(nextState stateID) {
- ts.mu.Lock()
- ts.currentState = nextState
- ts.notifySubscribers(nextState)
- ts.mu.Unlock()
- }
- func (ts *threadState) notifySubscribers(nextState stateID) {
- if len(ts.subscribers) == 0 {
- return
- }
- newSubscribers := []stateSubscriber{}
- // notify subscribers to the state change
- for _, sub := range ts.subscribers {
- if !slices.Contains(sub.states, nextState) {
- newSubscribers = append(newSubscribers, sub)
- continue
- }
- close(sub.ch)
- }
- ts.subscribers = newSubscribers
- }
- // block until the thread reaches a certain state
- func (ts *threadState) waitFor(states ...stateID) {
- ts.mu.Lock()
- if slices.Contains(states, ts.currentState) {
- ts.mu.Unlock()
- return
- }
- sub := stateSubscriber{
- states: states,
- ch: make(chan struct{}),
- }
- ts.subscribers = append(ts.subscribers, sub)
- ts.mu.Unlock()
- <-sub.ch
- }
- // safely request a state change from a different goroutine
- func (ts *threadState) requestSafeStateChange(nextState stateID) bool {
- ts.mu.Lock()
- switch ts.currentState {
- // disallow state changes if shutting down or done
- case stateShuttingDown, stateDone, stateReserved:
- ts.mu.Unlock()
- return false
- // ready and inactive are safe states to transition from
- case stateReady, stateInactive:
- ts.currentState = nextState
- ts.notifySubscribers(nextState)
- ts.mu.Unlock()
- return true
- }
- ts.mu.Unlock()
- // wait for the state to change to a safe state
- ts.waitFor(stateReady, stateInactive, stateShuttingDown)
- return ts.requestSafeStateChange(nextState)
- }
- // the thread reached a stable state and is waiting for requests or shutdown
- func (ts *threadState) markAsWaiting(isWaiting bool) {
- ts.mu.Lock()
- if isWaiting {
- ts.isWaiting = true
- ts.waitingSince = time.Now()
- } else {
- ts.isWaiting = false
- }
- ts.mu.Unlock()
- }
- // the time since the thread is waiting in a stable state in ms
- func (ts *threadState) waitTime() int64 {
- ts.mu.RLock()
- waitTime := int64(0)
- if ts.isWaiting {
- waitTime = time.Now().UnixMilli() - ts.waitingSince.UnixMilli()
- }
- ts.mu.RUnlock()
- return waitTime
- }
|