123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- package resource_pool
- import (
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- )
- type idleHandle struct {
- handle interface{}
- keepUntil *time.Time
- }
- type TooManyHandles struct {
- location string
- }
- func (t TooManyHandles) Error() string {
- return fmt.Sprintf("Too many handles to %s", t.location)
- }
- type OpenHandleError struct {
- location string
- err error
- }
- func (o OpenHandleError) Error() string {
- return fmt.Sprintf("Failed to open resource handle: %s (%v)", o.location, o.err)
- }
- // A resource pool implementation where all handles are associated to the
- // same resource location.
- type simpleResourcePool struct {
- options Options
- numActive *int32 // atomic counter
- activeHighWaterMark *int32 // atomic / monotonically increasing value
- openTokens Semaphore
- mutex sync.Mutex
- location string // guard by mutex
- idleHandles []*idleHandle // guarded by mutex
- isLameDuck bool // guarded by mutex
- }
- // This returns a SimpleResourcePool, where all handles are associated to a
- // single resource location.
- func NewSimpleResourcePool(options Options) ResourcePool {
- numActive := new(int32)
- atomic.StoreInt32(numActive, 0)
- activeHighWaterMark := new(int32)
- atomic.StoreInt32(activeHighWaterMark, 0)
- var tokens Semaphore
- if options.OpenMaxConcurrency > 0 {
- tokens = NewBoundedSemaphore(uint(options.OpenMaxConcurrency))
- }
- return &simpleResourcePool{
- location: "",
- options: options,
- numActive: numActive,
- activeHighWaterMark: activeHighWaterMark,
- openTokens: tokens,
- mutex: sync.Mutex{},
- idleHandles: make([]*idleHandle, 0, 0),
- isLameDuck: false,
- }
- }
- // See ResourcePool for documentation.
- func (p *simpleResourcePool) NumActive() int32 {
- return atomic.LoadInt32(p.numActive)
- }
- // See ResourcePool for documentation.
- func (p *simpleResourcePool) ActiveHighWaterMark() int32 {
- return atomic.LoadInt32(p.activeHighWaterMark)
- }
- // See ResourcePool for documentation.
- func (p *simpleResourcePool) NumIdle() int {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- return len(p.idleHandles)
- }
- // SimpleResourcePool can only register a single (network, address) entry.
- // Register should be call before any Get calls.
- func (p *simpleResourcePool) Register(resourceLocation string) error {
- if resourceLocation == "" {
- return errors.New("Invalid resource location")
- }
- p.mutex.Lock()
- defer p.mutex.Unlock()
- if p.isLameDuck {
- return fmt.Errorf(
- "cannot register %s to lame duck resource pool",
- resourceLocation)
- }
- if p.location == "" {
- p.location = resourceLocation
- return nil
- }
- return errors.New("SimpleResourcePool can only register one location")
- }
- // SimpleResourcePool will enter lame duck mode upon calling Unregister.
- func (p *simpleResourcePool) Unregister(resourceLocation string) error {
- p.EnterLameDuckMode()
- return nil
- }
- func (p *simpleResourcePool) ListRegistered() []string {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- if p.location != "" {
- return []string{p.location}
- }
- return []string{}
- }
- func (p *simpleResourcePool) getLocation() (string, error) {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- if p.location == "" {
- return "", fmt.Errorf(
- "resource location is not set for SimpleResourcePool")
- }
- if p.isLameDuck {
- return "", fmt.Errorf(
- "lame duck resource pool cannot return handles to %s",
- p.location)
- }
- return p.location, nil
- }
- // This gets an active resource from the resource pool. Note that the
- // resourceLocation argument is ignored (The handles are associated to the
- // resource location provided by the first Register call).
- func (p *simpleResourcePool) Get(unused string) (ManagedHandle, error) {
- activeCount := atomic.AddInt32(p.numActive, 1)
- if p.options.MaxActiveHandles > 0 &&
- activeCount > p.options.MaxActiveHandles {
- atomic.AddInt32(p.numActive, -1)
- return nil, TooManyHandles{p.location}
- }
- highest := atomic.LoadInt32(p.activeHighWaterMark)
- for activeCount > highest &&
- !atomic.CompareAndSwapInt32(
- p.activeHighWaterMark,
- highest,
- activeCount) {
- highest = atomic.LoadInt32(p.activeHighWaterMark)
- }
- if h := p.getIdleHandle(); h != nil {
- return h, nil
- }
- location, err := p.getLocation()
- if err != nil {
- atomic.AddInt32(p.numActive, -1)
- return nil, err
- }
- if p.openTokens != nil {
- // Current implementation does not wait for tokens to become available.
- // If that causes availability hits, we could increase the wait,
- // similar to simple_pool.go.
- if p.openTokens.TryAcquire(0) {
- defer p.openTokens.Release()
- } else {
- // We could not immediately acquire a token.
- // Instead of waiting
- atomic.AddInt32(p.numActive, -1)
- return nil, OpenHandleError{
- p.location, errors.New("Open Error: reached OpenMaxConcurrency")}
- }
- }
- handle, err := p.options.Open(location)
- if err != nil {
- atomic.AddInt32(p.numActive, -1)
- return nil, OpenHandleError{p.location, err}
- }
- return NewManagedHandle(p.location, handle, p, p.options), nil
- }
- // See ResourcePool for documentation.
- func (p *simpleResourcePool) Release(handle ManagedHandle) error {
- if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p {
- return errors.New(
- "Resource pool cannot take control of a handle owned " +
- "by another resource pool")
- }
- h := handle.ReleaseUnderlyingHandle()
- if h != nil {
- // We can unref either before or after queuing the idle handle.
- // The advantage of unref-ing before queuing is that there is
- // a higher chance of successful Get when number of active handles
- // is close to the limit (but potentially more handle creation).
- // The advantage of queuing before unref-ing is that there's a
- // higher chance of reusing handle (but potentially more Get failures).
- atomic.AddInt32(p.numActive, -1)
- p.queueIdleHandles(h)
- }
- return nil
- }
- // See ResourcePool for documentation.
- func (p *simpleResourcePool) Discard(handle ManagedHandle) error {
- if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p {
- return errors.New(
- "Resource pool cannot take control of a handle owned " +
- "by another resource pool")
- }
- h := handle.ReleaseUnderlyingHandle()
- if h != nil {
- atomic.AddInt32(p.numActive, -1)
- if err := p.options.Close(h); err != nil {
- return fmt.Errorf("failed to close resource handle: %v", err)
- }
- }
- return nil
- }
- // See ResourcePool for documentation.
- func (p *simpleResourcePool) EnterLameDuckMode() {
- p.mutex.Lock()
- toClose := p.idleHandles
- p.isLameDuck = true
- p.idleHandles = []*idleHandle{}
- p.mutex.Unlock()
- p.closeHandles(toClose)
- }
- // This returns an idle resource, if there is one.
- func (p *simpleResourcePool) getIdleHandle() ManagedHandle {
- var toClose []*idleHandle
- defer func() {
- // NOTE: Must keep the closure around to late bind the toClose slice.
- p.closeHandles(toClose)
- }()
- now := p.options.getCurrentTime()
- p.mutex.Lock()
- defer p.mutex.Unlock()
- var i int
- for i = 0; i < len(p.idleHandles); i++ {
- idle := p.idleHandles[i]
- if idle.keepUntil == nil || now.Before(*idle.keepUntil) {
- break
- }
- }
- if i > 0 {
- toClose = p.idleHandles[0:i]
- }
- if i < len(p.idleHandles) {
- idle := p.idleHandles[i]
- p.idleHandles = p.idleHandles[i+1:]
- return NewManagedHandle(p.location, idle.handle, p, p.options)
- }
- if len(p.idleHandles) > 0 {
- p.idleHandles = []*idleHandle{}
- }
- return nil
- }
- // This adds an idle resource to the pool.
- func (p *simpleResourcePool) queueIdleHandles(handle interface{}) {
- var toClose []*idleHandle
- defer func() {
- // NOTE: Must keep the closure around to late bind the toClose slice.
- p.closeHandles(toClose)
- }()
- now := p.options.getCurrentTime()
- var keepUntil *time.Time
- if p.options.MaxIdleTime != nil {
- // NOTE: Assign to temp variable first to work around compiler bug
- x := now.Add(*p.options.MaxIdleTime)
- keepUntil = &x
- }
- p.mutex.Lock()
- defer p.mutex.Unlock()
- if p.isLameDuck {
- toClose = []*idleHandle{
- {handle: handle},
- }
- return
- }
- p.idleHandles = append(
- p.idleHandles,
- &idleHandle{
- handle: handle,
- keepUntil: keepUntil,
- })
- nIdleHandles := uint32(len(p.idleHandles))
- if nIdleHandles > p.options.MaxIdleHandles {
- handlesToClose := nIdleHandles - p.options.MaxIdleHandles
- toClose = p.idleHandles[0:handlesToClose]
- p.idleHandles = p.idleHandles[handlesToClose:nIdleHandles]
- }
- }
- // Closes resources, at this point it is assumed that this resources
- // are no longer referenced from the main idleHandles slice.
- func (p *simpleResourcePool) closeHandles(handles []*idleHandle) {
- for _, handle := range handles {
- _ = p.options.Close(handle.handle)
- }
- }
|