123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- package resource_pool
- import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- )
// Semaphore is a counting semaphore: callers take permits with Acquire or
// TryAcquire and hand them back with Release.
type Semaphore interface {
	// Acquire decrements the semaphore counter by one, blocking until a
	// permit is available.
	Acquire()

	// TryAcquire behaves like Acquire but waits for at most timeout. It
	// reports true if a permit was taken and false if the wait timed out.
	TryAcquire(timeout time.Duration) bool

	// Release increments the semaphore counter by one.
	Release()
}
// boundedSemaphore is a simple counting Semaphore built on a buffered
// channel.
type boundedSemaphore struct {
	// slots carries one empty-struct token per free slot; its capacity is
	// the upper bound of the semaphore.
	slots chan struct{}
}
- // Create a bounded semaphore. The count parameter must be a positive number.
- // NOTE: The bounded semaphore will panic if the user tries to Release
- // beyond the specified count.
- func NewBoundedSemaphore(count uint) Semaphore {
- sem := &boundedSemaphore{
- slots: make(chan struct{}, int(count)),
- }
- for i := 0; i < cap(sem.slots); i++ {
- sem.slots <- struct{}{}
- }
- return sem
- }
- // Acquire returns on successful acquisition.
- func (sem *boundedSemaphore) Acquire() {
- <-sem.slots
- }
- // TryAcquire returns true if it acquires a resource slot within the
- // timeout, false otherwise.
- func (sem *boundedSemaphore) TryAcquire(timeout time.Duration) bool {
- if timeout > 0 {
- // Wait until we get a slot or timeout expires.
- tm := time.NewTimer(timeout)
- defer tm.Stop()
- select {
- case <-sem.slots:
- return true
- case <-tm.C:
- // Timeout expired. In very rare cases this might happen even if
- // there is a slot available, e.g. GC pause after we create the timer
- // and select randomly picked this one out of the two available channels.
- // We should do one final immediate check below.
- }
- }
- // Return true if we have a slot available immediately and false otherwise.
- select {
- case <-sem.slots:
- return true
- default:
- return false
- }
- }
- // Release the acquired semaphore. You must not release more than you
- // have acquired.
- func (sem *boundedSemaphore) Release() {
- select {
- case sem.slots <- struct{}{}:
- default:
- // slots is buffered. If a send blocks, it indicates a programming
- // error.
- panic(fmt.Errorf("too many releases for boundedSemaphore"))
- }
- }
- // This returns an unbound counting semaphore with the specified initial count.
- // The semaphore counter can be arbitrary large (i.e., Release can be called
- // unlimited amount of times).
- //
- // NOTE: In general, users should use bounded semaphore since it is more
- // efficient than unbounded semaphore.
- func NewUnboundedSemaphore(initialCount int) Semaphore {
- res := &unboundedSemaphore{
- counter: int64(initialCount),
- }
- res.cond.L = &res.lock
- return res
- }
// unboundedSemaphore is a counting Semaphore with no upper limit on its
// counter, built from a mutex and a condition variable.
type unboundedSemaphore struct {
	// lock guards counter.
	lock sync.Mutex
	// cond is where goroutines wait for counter to become positive. Its L
	// field is pointed at lock by NewUnboundedSemaphore.
	cond sync.Cond
	// counter is the number of permits currently available.
	counter int64
}
- func (s *unboundedSemaphore) Release() {
- s.lock.Lock()
- s.counter += 1
- if s.counter > 0 {
- // Not broadcasting here since it's unlike we can satisfy all waiting
- // goroutines. Instead, we will Signal again if there are left over
- // quota after Acquire, in case of lost wakeups.
- s.cond.Signal()
- }
- s.lock.Unlock()
- }
- func (s *unboundedSemaphore) Acquire() {
- s.lock.Lock()
- for s.counter < 1 {
- s.cond.Wait()
- }
- s.counter -= 1
- if s.counter > 0 {
- s.cond.Signal()
- }
- s.lock.Unlock()
- }
- func (s *unboundedSemaphore) TryAcquire(timeout time.Duration) bool {
- done := make(chan bool, 1)
- // Gate used to communicate between the threads and decide what the result
- // is. If the main thread decides, we have timed out, otherwise we succeed.
- decided := new(int32)
- atomic.StoreInt32(decided, 0)
- go func() {
- s.Acquire()
- if atomic.SwapInt32(decided, 1) == 0 {
- // Acquire won the race
- done <- true
- } else {
- // If we already decided the result, and this thread did not win
- s.Release()
- }
- }()
- select {
- case <-done:
- return true
- case <-time.After(timeout):
- if atomic.SwapInt32(decided, 1) == 1 {
- // The other thread already decided the result
- return true
- }
- return false
- }
- }
|