package puddle

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"
	"time"

	"github.com/jackc/puddle/v2/internal/genstack"
	"golang.org/x/sync/semaphore"
)

const (
	resourceStatusConstructing = iota
	resourceStatusIdle
	resourceStatusAcquired
	resourceStatusHijacked
)

// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool
// or a pool that is closed while the acquire is waiting.
var ErrClosedPool = errors.New("closed pool")

// ErrNotAvailable occurs on an attempt to acquire a resource from a pool
// that is at maximum capacity and has no available resources.
var ErrNotAvailable = errors.New("resource not available")

// Constructor is a function called by the pool to construct a resource.
type Constructor[T any] func(ctx context.Context) (res T, err error)

// Destructor is a function called by the pool to destroy a resource.
type Destructor[T any] func(res T)

// Resource is the resource handle returned by acquiring from the pool.
type Resource[T any] struct {
	value          T
	pool           *Pool[T]
	creationTime   time.Time
	lastUsedNano   int64
	poolResetCount int
	status         byte
}

// Value returns the resource value.
func (res *Resource[T]) Value() T {
	if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
		panic("tried to access resource that is not acquired or hijacked")
	}
	return res.value
}

// Release returns the resource to the pool. res must not be subsequently used.
func (res *Resource[T]) Release() {
	if res.status != resourceStatusAcquired {
		panic("tried to release resource that is not acquired")
	}
	res.pool.releaseAcquiredResource(res, nanotime())
}

// ReleaseUnused returns the resource to the pool without updating when it was
// last used, i.e. LastUsedNanotime will not change. res must not be
// subsequently used.
func (res *Resource[T]) ReleaseUnused() {
	if res.status != resourceStatusAcquired {
		panic("tried to release resource that is not acquired")
	}
	res.pool.releaseAcquiredResource(res, res.lastUsedNano)
}

// Destroy returns the resource to the pool for destruction. res must not be
// subsequently used.
func (res *Resource[T]) Destroy() {
	if res.status != resourceStatusAcquired {
		panic("tried to destroy resource that is not acquired")
	}
	go res.pool.destroyAcquiredResource(res)
}

// Hijack assumes ownership of the resource from the pool. The caller is
// responsible for cleanup of the resource value.
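//
// A minimal sketch of taking ownership of a pooled connection (a pool of
// net.Conn and the surrounding code are illustrative assumptions, not part of
// this package):
//
//	res, err := pool.Acquire(ctx)
//	if err != nil {
//		return err
//	}
//	res.Hijack()
//	conn := res.Value() // still allowed after Hijack
//	defer conn.Close()  // the pool will no longer destroy conn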
func (res *Resource[T]) Hijack() {
	if res.status != resourceStatusAcquired {
		panic("tried to hijack resource that is not acquired")
	}
	res.pool.hijackAcquiredResource(res)
}

// CreationTime returns when the resource was created by the pool.
func (res *Resource[T]) CreationTime() time.Time {
	if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
		panic("tried to access resource that is not acquired or hijacked")
	}
	return res.creationTime
}

// LastUsedNanotime returns when Release was last called on the resource,
// measured in nanoseconds from an arbitrary time (a monotonic time). Returns
// the creation time if Release has never been called. This is only useful to
// compare with other calls to LastUsedNanotime. In almost all cases,
// IdleDuration should be used instead.
func (res *Resource[T]) LastUsedNanotime() int64 {
	if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
		panic("tried to access resource that is not acquired or hijacked")
	}
	return res.lastUsedNano
}

// IdleDuration returns the duration since Release was last called on the
// resource. This is equivalent to subtracting LastUsedNanotime from the
// current nanotime.
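//
// A brief sketch of using it to decide whether an acquired resource has been
// idle too long (maxIdle is a hypothetical threshold chosen by the caller):
//
//	if res.IdleDuration() > maxIdle {
//		res.Destroy()
//	} else {
//		res.ReleaseUnused()
//	}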
func (res *Resource[T]) IdleDuration() time.Duration {
	if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
		panic("tried to access resource that is not acquired or hijacked")
	}
	return time.Duration(nanotime() - res.lastUsedNano)
}

// Pool is a concurrency-safe resource pool.
type Pool[T any] struct {
	// mux is the pool's internal lock. Any modification of shared state of
	// the pool (except Acquires of acquireSem) must be performed only by
	// the holder of the lock. Long-running operations are not allowed when
	// mux is held.
	mux sync.Mutex
	// acquireSem provides an allowance to acquire a resource.
	//
	// Releases are allowed only when the caller holds mux. Acquires have to
	// happen before mux is locked (this doesn't apply to semaphore.TryAcquire
	// in AcquireAllIdle).
	acquireSem *semaphore.Weighted
	destructWG sync.WaitGroup

	allResources  resList[T]
	idleResources *genstack.GenStack[*Resource[T]]

	constructor Constructor[T]
	destructor  Destructor[T]
	maxSize     int32

	acquireCount         int64
	acquireDuration      time.Duration
	emptyAcquireCount    int64
	canceledAcquireCount atomic.Int64

	resetCount int

	baseAcquireCtx       context.Context
	cancelBaseAcquireCtx context.CancelFunc
	closed               bool
}

// Config holds the configuration used by NewPool to create a pool.
type Config[T any] struct {
	Constructor Constructor[T]
	Destructor  Destructor[T]
	MaxSize     int32
}

// NewPool creates a new pool. Returns an error if MaxSize is less than 1.
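//
// A minimal construction sketch from a caller's point of view (conn, openConn,
// and closeConn are hypothetical stand-ins for a real resource type and its
// lifecycle functions):
//
//	pool, err := puddle.NewPool(&puddle.Config[*conn]{
//		Constructor: func(ctx context.Context) (*conn, error) { return openConn(ctx) },
//		Destructor:  func(c *conn) { closeConn(c) },
//		MaxSize:     10,
//	})
//	if err != nil {
//		return err
//	}
//	defer pool.Close()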
func NewPool[T any](config *Config[T]) (*Pool[T], error) {
	if config.MaxSize < 1 {
		return nil, errors.New("MaxSize must be >= 1")
	}

	baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background())

	return &Pool[T]{
		acquireSem:           semaphore.NewWeighted(int64(config.MaxSize)),
		idleResources:        genstack.NewGenStack[*Resource[T]](),
		maxSize:              config.MaxSize,
		constructor:          config.Constructor,
		destructor:           config.Destructor,
		baseAcquireCtx:       baseAcquireCtx,
		cancelBaseAcquireCtx: cancelBaseAcquireCtx,
	}, nil
}

// Close destroys all resources in the pool and rejects future Acquire calls.
// Blocks until all resources are returned to the pool and destroyed.
func (p *Pool[T]) Close() {
	defer p.destructWG.Wait()

	p.mux.Lock()
	defer p.mux.Unlock()

	if p.closed {
		return
	}
	p.closed = true
	p.cancelBaseAcquireCtx()

	for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
		p.allResources.remove(res)
		go p.destructResourceValue(res.value)
	}
}

// Stat is a snapshot of Pool statistics.
type Stat struct {
	constructingResources int32
	acquiredResources     int32
	idleResources         int32
	maxResources          int32
	acquireCount          int64
	acquireDuration       time.Duration
	emptyAcquireCount     int64
	canceledAcquireCount  int64
}

// TotalResources returns the total number of resources currently in the pool.
// The value is the sum of ConstructingResources, AcquiredResources, and
// IdleResources.
func (s *Stat) TotalResources() int32 {
	return s.constructingResources + s.acquiredResources + s.idleResources
}

// ConstructingResources returns the number of resources with construction in
// progress in the pool.
func (s *Stat) ConstructingResources() int32 {
	return s.constructingResources
}

// AcquiredResources returns the number of currently acquired resources in the
// pool.
func (s *Stat) AcquiredResources() int32 {
	return s.acquiredResources
}

// IdleResources returns the number of currently idle resources in the pool.
func (s *Stat) IdleResources() int32 {
	return s.idleResources
}

// MaxResources returns the maximum size of the pool.
func (s *Stat) MaxResources() int32 {
	return s.maxResources
}

// AcquireCount returns the cumulative count of successful acquires from the
// pool.
func (s *Stat) AcquireCount() int64 {
	return s.acquireCount
}

// AcquireDuration returns the total duration of all successful acquires from
// the pool.
func (s *Stat) AcquireDuration() time.Duration {
	return s.acquireDuration
}

// EmptyAcquireCount returns the cumulative count of successful acquires from
// the pool that waited for a resource to be released or constructed because
// the pool was empty.
func (s *Stat) EmptyAcquireCount() int64 {
	return s.emptyAcquireCount
}

// CanceledAcquireCount returns the cumulative count of acquires from the pool
// that were canceled by a context.
func (s *Stat) CanceledAcquireCount() int64 {
	return s.canceledAcquireCount
}

// Stat returns the current pool statistics.
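//
// For example, a caller might periodically log a snapshot (logf is a
// hypothetical logging helper):
//
//	s := pool.Stat()
//	logf("pool: %d acquired, %d idle, %d max, %d total acquires",
//		s.AcquiredResources(), s.IdleResources(), s.MaxResources(), s.AcquireCount())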
func (p *Pool[T]) Stat() *Stat {
	p.mux.Lock()
	defer p.mux.Unlock()

	s := &Stat{
		maxResources:         p.maxSize,
		acquireCount:         p.acquireCount,
		emptyAcquireCount:    p.emptyAcquireCount,
		canceledAcquireCount: p.canceledAcquireCount.Load(),
		acquireDuration:      p.acquireDuration,
	}

	for _, res := range p.allResources {
		switch res.status {
		case resourceStatusConstructing:
			s.constructingResources += 1
		case resourceStatusIdle:
			s.idleResources += 1
		case resourceStatusAcquired:
			s.acquiredResources += 1
		}
	}

	return s
}

// tryAcquireIdleResource checks whether there is any idle resource. If there
// is one, this method removes it from the idle list and returns it. If the
// idle pool is empty, this method returns nil and doesn't modify the
// idleResources stack.
//
// WARNING: Caller of this method must hold the pool mutex!
func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
	res, ok := p.idleResources.Pop()
	if !ok {
		return nil
	}

	res.status = resourceStatusAcquired
	return res
}

// createNewResource creates a new resource and inserts it into the list of
// pool resources.
//
// WARNING: Caller of this method must hold the pool mutex!
func (p *Pool[T]) createNewResource() *Resource[T] {
	res := &Resource[T]{
		pool:           p,
		creationTime:   time.Now(),
		lastUsedNano:   nanotime(),
		poolResetCount: p.resetCount,
		status:         resourceStatusConstructing,
	}

	p.allResources.append(res)
	p.destructWG.Add(1)

	return res
}

// Acquire gets a resource from the pool. If no resources are available and the
// pool is not at maximum capacity, it will create a new resource. If the pool
// is at maximum capacity, it will block until a resource is available. ctx can
// be used to cancel the Acquire.
//
// If Acquire creates a new resource, the resource constructor function will
// receive a context that delegates Value() to ctx. Canceling ctx will cause
// Acquire to return immediately, but it will not cancel the resource creation.
// This avoids the problem of it being impossible to create resources when the
// time to create a resource is greater than any one caller of Acquire is
// willing to wait.
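//
// A typical acquire/release cycle looks like the following sketch (use is a
// hypothetical caller function operating on the resource value):
//
//	res, err := pool.Acquire(ctx)
//	if err != nil {
//		return err
//	}
//	defer res.Release()
//	return use(ctx, res.Value())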
func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
	select {
	case <-ctx.Done():
		p.canceledAcquireCount.Add(1)
		return nil, ctx.Err()
	default:
	}

	return p.acquire(ctx)
}

// acquire is a continuation of the Acquire function that doesn't check context
// validity.
//
// This function exists solely for benchmarking purposes.
func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
	startNano := nanotime()

	var waitedForLock bool
	if !p.acquireSem.TryAcquire(1) {
		waitedForLock = true
		err := p.acquireSem.Acquire(ctx, 1)
		if err != nil {
			p.canceledAcquireCount.Add(1)
			return nil, err
		}
	}

	p.mux.Lock()
	if p.closed {
		p.acquireSem.Release(1)
		p.mux.Unlock()
		return nil, ErrClosedPool
	}

	// If a resource is available in the pool.
	if res := p.tryAcquireIdleResource(); res != nil {
		if waitedForLock {
			p.emptyAcquireCount += 1
		}
		p.acquireCount += 1
		p.acquireDuration += time.Duration(nanotime() - startNano)
		p.mux.Unlock()
		return res, nil
	}

	if len(p.allResources) >= int(p.maxSize) {
		// Unreachable code.
		panic("bug: semaphore allowed more acquires than pool allows")
	}

	// There is no idle resource, but there is enough space to create one.
	res := p.createNewResource()
	p.mux.Unlock()

	res, err := p.initResourceValue(ctx, res)
	if err != nil {
		return nil, err
	}

	p.mux.Lock()
	defer p.mux.Unlock()

	p.emptyAcquireCount += 1
	p.acquireCount += 1
	p.acquireDuration += time.Duration(nanotime() - startNano)

	return res, nil
}

func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) {
	// Create the resource in a goroutine to immediately return from Acquire
	// if ctx is canceled without also canceling the constructor.
	//
	// See:
	// - https://github.com/jackc/pgx/issues/1287
	// - https://github.com/jackc/pgx/issues/1259
	constructErrChan := make(chan error)
	go func() {
		constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx)
		value, err := p.constructor(constructorCtx)
		if err != nil {
			p.mux.Lock()
			p.allResources.remove(res)
			p.destructWG.Done()

			// The resource won't be acquired because its
			// construction failed. We have to allow someone else to
			// take that resource.
			p.acquireSem.Release(1)
			p.mux.Unlock()

			select {
			case constructErrChan <- err:
			case <-ctx.Done():
				// The caller is canceled, so no one awaits the
				// error. This branch avoids a goroutine leak.
			}
			return
		}

		// The resource is already in p.allResources where it might be read.
		// So we need to acquire the lock to update its status.
		p.mux.Lock()
		res.value = value
		res.status = resourceStatusAcquired
		p.mux.Unlock()

		// This select works because the channel is unbuffered.
		select {
		case constructErrChan <- nil:
		case <-ctx.Done():
			p.releaseAcquiredResource(res, res.lastUsedNano)
		}
	}()

	select {
	case <-ctx.Done():
		p.canceledAcquireCount.Add(1)
		return nil, ctx.Err()
	case err := <-constructErrChan:
		if err != nil {
			return nil, err
		}
		return res, nil
	}
}

// TryAcquire gets a resource from the pool if one is immediately available. If
// not, it returns ErrNotAvailable. If no resources are available but the pool
// has room to grow, a resource will be created in the background. ctx is only
// used to cancel the background creation.
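//
// A sketch of a non-blocking fast path that falls back to a blocking Acquire
// (purely illustrative):
//
//	res, err := pool.TryAcquire(ctx)
//	if errors.Is(err, puddle.ErrNotAvailable) {
//		res, err = pool.Acquire(ctx) // wait instead of failing fast
//	}
//	if err != nil {
//		return err
//	}
//	defer res.Release()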
func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
	if !p.acquireSem.TryAcquire(1) {
		return nil, ErrNotAvailable
	}

	p.mux.Lock()
	defer p.mux.Unlock()

	if p.closed {
		p.acquireSem.Release(1)
		return nil, ErrClosedPool
	}

	// If a resource is available now.
	if res := p.tryAcquireIdleResource(); res != nil {
		p.acquireCount += 1
		return res, nil
	}

	if len(p.allResources) >= int(p.maxSize) {
		// Unreachable code.
		panic("bug: semaphore allowed more acquires than pool allows")
	}

	res := p.createNewResource()
	go func() {
		value, err := p.constructor(ctx)

		p.mux.Lock()
		defer p.mux.Unlock()
		// We have to create the resource and only then release the
		// semaphore; until the resource exists there is nothing that
		// anyone could acquire.
		defer p.acquireSem.Release(1)

		if err != nil {
			p.allResources.remove(res)
			p.destructWG.Done()
			return
		}

		res.value = value
		res.status = resourceStatusIdle
		p.idleResources.Push(res)
	}()
	return nil, ErrNotAvailable
}

// acquireSemAll tries to acquire num free tokens from sem. This function is
// guaranteed to acquire at least the lowest number of tokens that has been
// available in the semaphore during the runtime of this function.
//
// For the time being, the semaphore doesn't allow acquiring all tokens
// atomically (see https://github.com/golang/sync/pull/19). We simulate this by
// trying all powers of 2 that are less than or equal to num.
//
// For example, let's imagine we have 19 free tokens in the semaphore which in
// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then if
// num is 24, log2Int(24) is 4 and we try to acquire 16, 8, 4, 2 and 1
// tokens. Out of those, the acquires of 16, 2 and 1 tokens will succeed.
//
// Naturally, Acquires and Releases of the semaphore might take place
// concurrently. For this reason, it's not guaranteed that absolutely all free
// tokens in the semaphore will be acquired. But it's guaranteed that at least
// the minimal number of tokens that has been present over the whole process
// will be acquired. This is sufficient for the use case we have in this
// package.
//
// TODO: Replace this with acquireSem.TryAcquireAll() if it gets merged
// upstream. https://github.com/golang/sync/pull/19
func acquireSemAll(sem *semaphore.Weighted, num int) int {
	if sem.TryAcquire(int64(num)) {
		return num
	}

	var acquired int
	for i := int(log2Int(num)); i >= 0; i-- {
		val := 1 << i
		if sem.TryAcquire(int64(val)) {
			acquired += val
		}
	}

	return acquired
}

// AcquireAllIdle acquires all currently idle resources. Its intended use is for
// health check and keep-alive functionality. It does not update pool
// statistics.
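//
// A keep-alive sketch (ping is a hypothetical health check supplied by the
// caller):
//
//	for _, res := range pool.AcquireAllIdle() {
//		if err := ping(ctx, res.Value()); err != nil {
//			res.Destroy()
//		} else {
//			res.ReleaseUnused()
//		}
//	}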
func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
	p.mux.Lock()
	defer p.mux.Unlock()
	if p.closed {
		return nil
	}

	numIdle := p.idleResources.Len()
	if numIdle == 0 {
		return nil
	}

	// In acquireSemAll we use only TryAcquire and not Acquire. Because
	// TryAcquire cannot block, the fact that we hold the mutex locked and
	// try to acquire the semaphore cannot result in a deadlock.
	//
	// Because the mutex is locked, no parallel Release can run. This
	// implies that the number of tokens can only decrease because some
	// Acquire/TryAcquire call can consume a semaphore token. Consequently
	// acquired is always less than or equal to numIdle. Moreover if
	// acquired < numIdle, then there are some parallel Acquire/TryAcquire
	// calls that will take the remaining idle connections.
	acquired := acquireSemAll(p.acquireSem, numIdle)

	idle := make([]*Resource[T], acquired)
	for i := range idle {
		res, _ := p.idleResources.Pop()
		res.status = resourceStatusAcquired
		idle[i] = res
	}

	// We have to bump the generation to ensure that Acquire/TryAcquire
	// calls running in parallel (those which caused acquired < numIdle)
	// will consume old connections and not freshly released connections.
	p.idleResources.NextGen()

	return idle
}

// CreateResource constructs a new resource without acquiring it. It goes
// straight into the idle pool. It does not check against maxSize. It can be
// useful for maintaining warm resources under little load.
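//
// A warm-up sketch that pre-creates a few idle resources at startup (the count
// of 3 is arbitrary):
//
//	for i := 0; i < 3; i++ {
//		if err := pool.CreateResource(ctx); err != nil {
//			return err
//		}
//	}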
func (p *Pool[T]) CreateResource(ctx context.Context) error {
	p.mux.Lock()
	if p.closed {
		p.mux.Unlock()
		return ErrClosedPool
	}
	p.destructWG.Add(1)
	p.mux.Unlock()

	value, err := p.constructor(ctx)
	if err != nil {
		p.destructWG.Done()
		return err
	}

	res := &Resource[T]{
		pool:           p,
		creationTime:   time.Now(),
		status:         resourceStatusIdle,
		value:          value,
		lastUsedNano:   nanotime(),
		poolResetCount: p.resetCount,
	}

	p.mux.Lock()
	defer p.mux.Unlock()

	// If the pool was closed while the resource was being constructed,
	// destroy it and return an error.
	if p.closed {
		go p.destructResourceValue(res.value)
		return ErrClosedPool
	}

	p.allResources.append(res)
	p.idleResources.Push(res)

	return nil
}

// Reset destroys all resources, but leaves the pool open. It is intended for
// use when an error is detected that would disrupt all resources (such as a
// network interruption or a server state change).
//
// It is safe to reset a pool while resources are checked out. Those resources
// will be destroyed when they are returned to the pool.
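//
// A sketch of resetting after a suspected server restart (isConnectionError is
// a hypothetical error-classification helper):
//
//	if isConnectionError(err) {
//		pool.Reset() // idle resources are destroyed now; checked-out ones on release
//	}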
func (p *Pool[T]) Reset() {
	p.mux.Lock()
	defer p.mux.Unlock()

	p.resetCount++

	for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
		p.allResources.remove(res)
		go p.destructResourceValue(res.value)
	}
}

// releaseAcquiredResource returns res to the pool.
func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
	p.mux.Lock()
	defer p.mux.Unlock()
	defer p.acquireSem.Release(1)

	if p.closed || res.poolResetCount != p.resetCount {
		p.allResources.remove(res)
		go p.destructResourceValue(res.value)
	} else {
		res.lastUsedNano = lastUsedNano
		res.status = resourceStatusIdle
		p.idleResources.Push(res)
	}
}

// destroyAcquiredResource destroys res and removes it from the pool.
func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
	p.destructResourceValue(res.value)

	p.mux.Lock()
	defer p.mux.Unlock()
	defer p.acquireSem.Release(1)

	p.allResources.remove(res)
}

// hijackAcquiredResource removes res from the pool without destroying it; the
// caller becomes responsible for the resource value.
func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
	p.mux.Lock()
	defer p.mux.Unlock()
	defer p.acquireSem.Release(1)

	p.allResources.remove(res)
	res.status = resourceStatusHijacked
	p.destructWG.Done() // not responsible for destructing hijacked resources
}

// destructResourceValue calls the pool destructor on value and marks one
// pending destruction as done.
func (p *Pool[T]) destructResourceValue(value T) {
	p.destructor(value)
	p.destructWG.Done()
}