123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690 |
- package puddle
- import (
- "context"
- "errors"
- "sync"
- "sync/atomic"
- "time"
- "github.com/jackc/puddle/v2/internal/genstack"
- "golang.org/x/sync/semaphore"
- )
- const (
- resourceStatusConstructing = 0
- resourceStatusIdle = iota
- resourceStatusAcquired = iota
- resourceStatusHijacked = iota
- )
- // ErrClosedPool occurs on an attempt to acquire a connection from a closed pool
- // or a pool that is closed while the acquire is waiting.
- var ErrClosedPool = errors.New("closed pool")
- // ErrNotAvailable occurs on an attempt to acquire a resource from a pool
- // that is at maximum capacity and has no available resources.
- var ErrNotAvailable = errors.New("resource not available")
- // Constructor is a function called by the pool to construct a resource.
- type Constructor[T any] func(ctx context.Context) (res T, err error)
- // Destructor is a function called by the pool to destroy a resource.
- type Destructor[T any] func(res T)
- // Resource is the resource handle returned by acquiring from the pool.
- type Resource[T any] struct {
- value T
- pool *Pool[T]
- creationTime time.Time
- lastUsedNano int64
- poolResetCount int
- status byte
- }
- // Value returns the resource value.
- func (res *Resource[T]) Value() T {
- if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
- panic("tried to access resource that is not acquired or hijacked")
- }
- return res.value
- }
- // Release returns the resource to the pool. res must not be subsequently used.
- func (res *Resource[T]) Release() {
- if res.status != resourceStatusAcquired {
- panic("tried to release resource that is not acquired")
- }
- res.pool.releaseAcquiredResource(res, nanotime())
- }
- // ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime
- // will not change. res must not be subsequently used.
- func (res *Resource[T]) ReleaseUnused() {
- if res.status != resourceStatusAcquired {
- panic("tried to release resource that is not acquired")
- }
- res.pool.releaseAcquiredResource(res, res.lastUsedNano)
- }
- // Destroy returns the resource to the pool for destruction. res must not be
- // subsequently used.
- func (res *Resource[T]) Destroy() {
- if res.status != resourceStatusAcquired {
- panic("tried to destroy resource that is not acquired")
- }
- go res.pool.destroyAcquiredResource(res)
- }
- // Hijack assumes ownership of the resource from the pool. Caller is responsible
- // for cleanup of resource value.
- func (res *Resource[T]) Hijack() {
- if res.status != resourceStatusAcquired {
- panic("tried to hijack resource that is not acquired")
- }
- res.pool.hijackAcquiredResource(res)
- }
- // CreationTime returns when the resource was created by the pool.
- func (res *Resource[T]) CreationTime() time.Time {
- if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
- panic("tried to access resource that is not acquired or hijacked")
- }
- return res.creationTime
- }
- // LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time
- // (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with
- // other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead.
- func (res *Resource[T]) LastUsedNanotime() int64 {
- if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
- panic("tried to access resource that is not acquired or hijacked")
- }
- return res.lastUsedNano
- }
- // IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting
- // LastUsedNanotime to the current nanotime.
- func (res *Resource[T]) IdleDuration() time.Duration {
- if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
- panic("tried to access resource that is not acquired or hijacked")
- }
- return time.Duration(nanotime() - res.lastUsedNano)
- }
- // Pool is a concurrency-safe resource pool.
- type Pool[T any] struct {
- // mux is the pool internal lock. Any modification of shared state of
- // the pool (but Acquires of acquireSem) must be performed only by
- // holder of the lock. Long running operations are not allowed when mux
- // is held.
- mux sync.Mutex
- // acquireSem provides an allowance to acquire a resource.
- //
- // Releases are allowed only when caller holds mux. Acquires have to
- // happen before mux is locked (doesn't apply to semaphore.TryAcquire in
- // AcquireAllIdle).
- acquireSem *semaphore.Weighted
- destructWG sync.WaitGroup
- allResources resList[T]
- idleResources *genstack.GenStack[*Resource[T]]
- constructor Constructor[T]
- destructor Destructor[T]
- maxSize int32
- acquireCount int64
- acquireDuration time.Duration
- emptyAcquireCount int64
- canceledAcquireCount atomic.Int64
- resetCount int
- baseAcquireCtx context.Context
- cancelBaseAcquireCtx context.CancelFunc
- closed bool
- }
- type Config[T any] struct {
- Constructor Constructor[T]
- Destructor Destructor[T]
- MaxSize int32
- }
- // NewPool creates a new pool. Panics if maxSize is less than 1.
- func NewPool[T any](config *Config[T]) (*Pool[T], error) {
- if config.MaxSize < 1 {
- return nil, errors.New("MaxSize must be >= 1")
- }
- baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background())
- return &Pool[T]{
- acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
- idleResources: genstack.NewGenStack[*Resource[T]](),
- maxSize: config.MaxSize,
- constructor: config.Constructor,
- destructor: config.Destructor,
- baseAcquireCtx: baseAcquireCtx,
- cancelBaseAcquireCtx: cancelBaseAcquireCtx,
- }, nil
- }
- // Close destroys all resources in the pool and rejects future Acquire calls.
- // Blocks until all resources are returned to pool and destroyed.
- func (p *Pool[T]) Close() {
- defer p.destructWG.Wait()
- p.mux.Lock()
- defer p.mux.Unlock()
- if p.closed {
- return
- }
- p.closed = true
- p.cancelBaseAcquireCtx()
- for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
- p.allResources.remove(res)
- go p.destructResourceValue(res.value)
- }
- }
- // Stat is a snapshot of Pool statistics.
- type Stat struct {
- constructingResources int32
- acquiredResources int32
- idleResources int32
- maxResources int32
- acquireCount int64
- acquireDuration time.Duration
- emptyAcquireCount int64
- canceledAcquireCount int64
- }
- // TotalResources returns the total number of resources currently in the pool.
- // The value is the sum of ConstructingResources, AcquiredResources, and
- // IdleResources.
- func (s *Stat) TotalResources() int32 {
- return s.constructingResources + s.acquiredResources + s.idleResources
- }
- // ConstructingResources returns the number of resources with construction in progress in
- // the pool.
- func (s *Stat) ConstructingResources() int32 {
- return s.constructingResources
- }
- // AcquiredResources returns the number of currently acquired resources in the pool.
- func (s *Stat) AcquiredResources() int32 {
- return s.acquiredResources
- }
- // IdleResources returns the number of currently idle resources in the pool.
- func (s *Stat) IdleResources() int32 {
- return s.idleResources
- }
- // MaxResources returns the maximum size of the pool.
- func (s *Stat) MaxResources() int32 {
- return s.maxResources
- }
- // AcquireCount returns the cumulative count of successful acquires from the pool.
- func (s *Stat) AcquireCount() int64 {
- return s.acquireCount
- }
- // AcquireDuration returns the total duration of all successful acquires from
- // the pool.
- func (s *Stat) AcquireDuration() time.Duration {
- return s.acquireDuration
- }
- // EmptyAcquireCount returns the cumulative count of successful acquires from the pool
- // that waited for a resource to be released or constructed because the pool was
- // empty.
- func (s *Stat) EmptyAcquireCount() int64 {
- return s.emptyAcquireCount
- }
- // CanceledAcquireCount returns the cumulative count of acquires from the pool
- // that were canceled by a context.
- func (s *Stat) CanceledAcquireCount() int64 {
- return s.canceledAcquireCount
- }
- // Stat returns the current pool statistics.
- func (p *Pool[T]) Stat() *Stat {
- p.mux.Lock()
- defer p.mux.Unlock()
- s := &Stat{
- maxResources: p.maxSize,
- acquireCount: p.acquireCount,
- emptyAcquireCount: p.emptyAcquireCount,
- canceledAcquireCount: p.canceledAcquireCount.Load(),
- acquireDuration: p.acquireDuration,
- }
- for _, res := range p.allResources {
- switch res.status {
- case resourceStatusConstructing:
- s.constructingResources += 1
- case resourceStatusIdle:
- s.idleResources += 1
- case resourceStatusAcquired:
- s.acquiredResources += 1
- }
- }
- return s
- }
- // tryAcquireIdleResource checks if there is any idle resource. If there is
- // some, this method removes it from idle list and returns it. If the idle pool
- // is empty, this method returns nil and doesn't modify the idleResources slice.
- //
- // WARNING: Caller of this method must hold the pool mutex!
- func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
- res, ok := p.idleResources.Pop()
- if !ok {
- return nil
- }
- res.status = resourceStatusAcquired
- return res
- }
- // createNewResource creates a new resource and inserts it into list of pool
- // resources.
- //
- // WARNING: Caller of this method must hold the pool mutex!
- func (p *Pool[T]) createNewResource() *Resource[T] {
- res := &Resource[T]{
- pool: p,
- creationTime: time.Now(),
- lastUsedNano: nanotime(),
- poolResetCount: p.resetCount,
- status: resourceStatusConstructing,
- }
- p.allResources.append(res)
- p.destructWG.Add(1)
- return res
- }
- // Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will
- // create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be
- // used to cancel the Acquire.
- //
- // If Acquire creates a new resource the resource constructor function will receive a context that delegates Value() to
- // ctx. Canceling ctx will cause Acquire to return immediately but it will not cancel the resource creation. This avoids
- // the problem of it being impossible to create resources when the time to create a resource is greater than any one
- // caller of Acquire is willing to wait.
- func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
- select {
- case <-ctx.Done():
- p.canceledAcquireCount.Add(1)
- return nil, ctx.Err()
- default:
- }
- return p.acquire(ctx)
- }
- // acquire is a continuation of Acquire function that doesn't check context
- // validity.
- //
- // This function exists solely only for benchmarking purposes.
- func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
- startNano := nanotime()
- var waitedForLock bool
- if !p.acquireSem.TryAcquire(1) {
- waitedForLock = true
- err := p.acquireSem.Acquire(ctx, 1)
- if err != nil {
- p.canceledAcquireCount.Add(1)
- return nil, err
- }
- }
- p.mux.Lock()
- if p.closed {
- p.acquireSem.Release(1)
- p.mux.Unlock()
- return nil, ErrClosedPool
- }
- // If a resource is available in the pool.
- if res := p.tryAcquireIdleResource(); res != nil {
- if waitedForLock {
- p.emptyAcquireCount += 1
- }
- p.acquireCount += 1
- p.acquireDuration += time.Duration(nanotime() - startNano)
- p.mux.Unlock()
- return res, nil
- }
- if len(p.allResources) >= int(p.maxSize) {
- // Unreachable code.
- panic("bug: semaphore allowed more acquires than pool allows")
- }
- // The resource is not idle, but there is enough space to create one.
- res := p.createNewResource()
- p.mux.Unlock()
- res, err := p.initResourceValue(ctx, res)
- if err != nil {
- return nil, err
- }
- p.mux.Lock()
- defer p.mux.Unlock()
- p.emptyAcquireCount += 1
- p.acquireCount += 1
- p.acquireDuration += time.Duration(nanotime() - startNano)
- return res, nil
- }
- func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) {
- // Create the resource in a goroutine to immediately return from Acquire
- // if ctx is canceled without also canceling the constructor.
- //
- // See:
- // - https://github.com/jackc/pgx/issues/1287
- // - https://github.com/jackc/pgx/issues/1259
- constructErrChan := make(chan error)
- go func() {
- constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx)
- value, err := p.constructor(constructorCtx)
- if err != nil {
- p.mux.Lock()
- p.allResources.remove(res)
- p.destructWG.Done()
- // The resource won't be acquired because its
- // construction failed. We have to allow someone else to
- // take that resouce.
- p.acquireSem.Release(1)
- p.mux.Unlock()
- select {
- case constructErrChan <- err:
- case <-ctx.Done():
- // The caller is cancelled, so no-one awaits the
- // error. This branch avoid goroutine leak.
- }
- return
- }
- // The resource is already in p.allResources where it might be read. So we need to acquire the lock to update its
- // status.
- p.mux.Lock()
- res.value = value
- res.status = resourceStatusAcquired
- p.mux.Unlock()
- // This select works because the channel is unbuffered.
- select {
- case constructErrChan <- nil:
- case <-ctx.Done():
- p.releaseAcquiredResource(res, res.lastUsedNano)
- }
- }()
- select {
- case <-ctx.Done():
- p.canceledAcquireCount.Add(1)
- return nil, ctx.Err()
- case err := <-constructErrChan:
- if err != nil {
- return nil, err
- }
- return res, nil
- }
- }
- // TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no
- // resources are available but the pool has room to grow, a resource will be created in the background. ctx is only
- // used to cancel the background creation.
- func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
- if !p.acquireSem.TryAcquire(1) {
- return nil, ErrNotAvailable
- }
- p.mux.Lock()
- defer p.mux.Unlock()
- if p.closed {
- p.acquireSem.Release(1)
- return nil, ErrClosedPool
- }
- // If a resource is available now
- if res := p.tryAcquireIdleResource(); res != nil {
- p.acquireCount += 1
- return res, nil
- }
- if len(p.allResources) >= int(p.maxSize) {
- // Unreachable code.
- panic("bug: semaphore allowed more acquires than pool allows")
- }
- res := p.createNewResource()
- go func() {
- value, err := p.constructor(ctx)
- p.mux.Lock()
- defer p.mux.Unlock()
- // We have to create the resource and only then release the
- // semaphore - For the time being there is no resource that
- // someone could acquire.
- defer p.acquireSem.Release(1)
- if err != nil {
- p.allResources.remove(res)
- p.destructWG.Done()
- return
- }
- res.value = value
- res.status = resourceStatusIdle
- p.idleResources.Push(res)
- }()
- return nil, ErrNotAvailable
- }
- // acquireSemAll tries to acquire num free tokens from sem. This function is
- // guaranteed to acquire at least the lowest number of tokens that has been
- // available in the semaphore during runtime of this function.
- //
- // For the time being, semaphore doesn't allow to acquire all tokens atomically
- // (see https://github.com/golang/sync/pull/19). We simulate this by trying all
- // powers of 2 that are less or equal to num.
- //
- // For example, let's immagine we have 19 free tokens in the semaphore which in
- // total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then if
- // num is 24, the log2Uint(24) is 4 and we try to acquire 16, 8, 4, 2 and 1
- // tokens. Out of those, the acquire of 16, 2 and 1 tokens will succeed.
- //
- // Naturally, Acquires and Releases of the semaphore might take place
- // concurrently. For this reason, it's not guaranteed that absolutely all free
- // tokens in the semaphore will be acquired. But it's guaranteed that at least
- // the minimal number of tokens that has been present over the whole process
- // will be acquired. This is sufficient for the use-case we have in this
- // package.
- //
- // TODO: Replace this with acquireSem.TryAcquireAll() if it gets to
- // upstream. https://github.com/golang/sync/pull/19
- func acquireSemAll(sem *semaphore.Weighted, num int) int {
- if sem.TryAcquire(int64(num)) {
- return num
- }
- var acquired int
- for i := int(log2Int(num)); i >= 0; i-- {
- val := 1 << i
- if sem.TryAcquire(int64(val)) {
- acquired += val
- }
- }
- return acquired
- }
- // AcquireAllIdle acquires all currently idle resources. Its intended use is for
- // health check and keep-alive functionality. It does not update pool
- // statistics.
- func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
- p.mux.Lock()
- defer p.mux.Unlock()
- if p.closed {
- return nil
- }
- numIdle := p.idleResources.Len()
- if numIdle == 0 {
- return nil
- }
- // In acquireSemAll we use only TryAcquire and not Acquire. Because
- // TryAcquire cannot block, the fact that we hold mutex locked and try
- // to acquire semaphore cannot result in dead-lock.
- //
- // Because the mutex is locked, no parallel Release can run. This
- // implies that the number of tokens can only decrease because some
- // Acquire/TryAcquire call can consume the semaphore token. Consequently
- // acquired is always less or equal to numIdle. Moreover if acquired <
- // numIdle, then there are some parallel Acquire/TryAcquire calls that
- // will take the remaining idle connections.
- acquired := acquireSemAll(p.acquireSem, numIdle)
- idle := make([]*Resource[T], acquired)
- for i := range idle {
- res, _ := p.idleResources.Pop()
- res.status = resourceStatusAcquired
- idle[i] = res
- }
- // We have to bump the generation to ensure that Acquire/TryAcquire
- // calls running in parallel (those which caused acquired < numIdle)
- // will consume old connections and not freshly released connections
- // instead.
- p.idleResources.NextGen()
- return idle
- }
- // CreateResource constructs a new resource without acquiring it.
- // It goes straight in the IdlePool. It does not check against maxSize.
- // It can be useful to maintain warm resources under little load.
- func (p *Pool[T]) CreateResource(ctx context.Context) error {
- p.mux.Lock()
- if p.closed {
- p.mux.Unlock()
- return ErrClosedPool
- }
- p.destructWG.Add(1)
- p.mux.Unlock()
- value, err := p.constructor(ctx)
- if err != nil {
- p.destructWG.Done()
- return err
- }
- res := &Resource[T]{
- pool: p,
- creationTime: time.Now(),
- status: resourceStatusIdle,
- value: value,
- lastUsedNano: nanotime(),
- poolResetCount: p.resetCount,
- }
- p.mux.Lock()
- defer p.mux.Unlock()
- // If closed while constructing resource then destroy it and return an error
- if p.closed {
- go p.destructResourceValue(res.value)
- return ErrClosedPool
- }
- p.allResources.append(res)
- p.idleResources.Push(res)
- return nil
- }
- // Reset destroys all resources, but leaves the pool open. It is intended for use when an error is detected that would
- // disrupt all resources (such as a network interruption or a server state change).
- //
- // It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned
- // to the pool.
- func (p *Pool[T]) Reset() {
- p.mux.Lock()
- defer p.mux.Unlock()
- p.resetCount++
- for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
- p.allResources.remove(res)
- go p.destructResourceValue(res.value)
- }
- }
- // releaseAcquiredResource returns res to the the pool.
- func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
- p.mux.Lock()
- defer p.mux.Unlock()
- defer p.acquireSem.Release(1)
- if p.closed || res.poolResetCount != p.resetCount {
- p.allResources.remove(res)
- go p.destructResourceValue(res.value)
- } else {
- res.lastUsedNano = lastUsedNano
- res.status = resourceStatusIdle
- p.idleResources.Push(res)
- }
- }
- // Remove removes res from the pool and closes it. If res is not part of the
- // pool Remove will panic.
- func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
- p.destructResourceValue(res.value)
- p.mux.Lock()
- defer p.mux.Unlock()
- defer p.acquireSem.Release(1)
- p.allResources.remove(res)
- }
- func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
- p.mux.Lock()
- defer p.mux.Unlock()
- defer p.acquireSem.Release(1)
- p.allResources.remove(res)
- res.status = resourceStatusHijacked
- p.destructWG.Done() // not responsible for destructing hijacked resources
- }
- func (p *Pool[T]) destructResourceValue(value T) {
- p.destructor(value)
- p.destructWG.Done()
- }
|