multi_resource_pool.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package resource_pool
  2. import (
  3. "fmt"
  4. "sync"
  5. "errors"
  6. )
  7. // A resource pool implementation that manages multiple resource location
  8. // entries. The handles to each resource location entry acts independently.
  9. // For example "tcp localhost:11211" could act as memcache
  10. // shard 0 and "tcp localhost:11212" could act as memcache shard 1.
  11. type multiResourcePool struct {
  12. options Options
  13. createPool func(Options) ResourcePool
  14. rwMutex sync.RWMutex
  15. isLameDuck bool // guarded by rwMutex
  16. // NOTE: the locationPools is guarded by rwMutex, but the pool entries
  17. // are not.
  18. locationPools map[string]ResourcePool
  19. }
  20. // This returns a MultiResourcePool, which manages multiple
  21. // resource location entries. The handles to each resource location
  22. // entry acts independently.
  23. //
  24. // When createPool is nil, NewSimpleResourcePool is used as default.
  25. func NewMultiResourcePool(
  26. options Options,
  27. createPool func(Options) ResourcePool) ResourcePool {
  28. if createPool == nil {
  29. createPool = NewSimpleResourcePool
  30. }
  31. return &multiResourcePool{
  32. options: options,
  33. createPool: createPool,
  34. rwMutex: sync.RWMutex{},
  35. isLameDuck: false,
  36. locationPools: make(map[string]ResourcePool),
  37. }
  38. }
  39. // See ResourcePool for documentation.
  40. func (p *multiResourcePool) NumActive() int32 {
  41. total := int32(0)
  42. p.rwMutex.RLock()
  43. defer p.rwMutex.RUnlock()
  44. for _, pool := range p.locationPools {
  45. total += pool.NumActive()
  46. }
  47. return total
  48. }
  49. // See ResourcePool for documentation.
  50. func (p *multiResourcePool) ActiveHighWaterMark() int32 {
  51. high := int32(0)
  52. p.rwMutex.RLock()
  53. defer p.rwMutex.RUnlock()
  54. for _, pool := range p.locationPools {
  55. val := pool.ActiveHighWaterMark()
  56. if val > high {
  57. high = val
  58. }
  59. }
  60. return high
  61. }
  62. // See ResourcePool for documentation.
  63. func (p *multiResourcePool) NumIdle() int {
  64. total := 0
  65. p.rwMutex.RLock()
  66. defer p.rwMutex.RUnlock()
  67. for _, pool := range p.locationPools {
  68. total += pool.NumIdle()
  69. }
  70. return total
  71. }
  72. // See ResourcePool for documentation.
  73. func (p *multiResourcePool) Register(resourceLocation string) error {
  74. if resourceLocation == "" {
  75. return errors.New("Registering invalid resource location")
  76. }
  77. p.rwMutex.Lock()
  78. defer p.rwMutex.Unlock()
  79. if p.isLameDuck {
  80. return fmt.Errorf(
  81. "Cannot register %s to lame duck resource pool",
  82. resourceLocation)
  83. }
  84. if _, inMap := p.locationPools[resourceLocation]; inMap {
  85. return nil
  86. }
  87. pool := p.createPool(p.options)
  88. if err := pool.Register(resourceLocation); err != nil {
  89. return err
  90. }
  91. p.locationPools[resourceLocation] = pool
  92. return nil
  93. }
  94. // See ResourcePool for documentation.
  95. func (p *multiResourcePool) Unregister(resourceLocation string) error {
  96. p.rwMutex.Lock()
  97. defer p.rwMutex.Unlock()
  98. if pool, inMap := p.locationPools[resourceLocation]; inMap {
  99. _ = pool.Unregister("")
  100. pool.EnterLameDuckMode()
  101. delete(p.locationPools, resourceLocation)
  102. }
  103. return nil
  104. }
  105. func (p *multiResourcePool) ListRegistered() []string {
  106. p.rwMutex.RLock()
  107. defer p.rwMutex.RUnlock()
  108. result := make([]string, 0, len(p.locationPools))
  109. for key, _ := range p.locationPools {
  110. result = append(result, key)
  111. }
  112. return result
  113. }
  114. // See ResourcePool for documentation.
  115. func (p *multiResourcePool) Get(
  116. resourceLocation string) (ManagedHandle, error) {
  117. pool := p.getPool(resourceLocation)
  118. if pool == nil {
  119. return nil, fmt.Errorf(
  120. "%s is not registered in the resource pool",
  121. resourceLocation)
  122. }
  123. return pool.Get(resourceLocation)
  124. }
  125. // See ResourcePool for documentation.
  126. func (p *multiResourcePool) Release(handle ManagedHandle) error {
  127. pool := p.getPool(handle.ResourceLocation())
  128. if pool == nil {
  129. return errors.New(
  130. "Resource pool cannot take control of a handle owned " +
  131. "by another resource pool")
  132. }
  133. return pool.Release(handle)
  134. }
  135. // See ResourcePool for documentation.
  136. func (p *multiResourcePool) Discard(handle ManagedHandle) error {
  137. pool := p.getPool(handle.ResourceLocation())
  138. if pool == nil {
  139. return errors.New(
  140. "Resource pool cannot take control of a handle owned " +
  141. "by another resource pool")
  142. }
  143. return pool.Discard(handle)
  144. }
  145. // See ResourcePool for documentation.
  146. func (p *multiResourcePool) EnterLameDuckMode() {
  147. p.rwMutex.Lock()
  148. defer p.rwMutex.Unlock()
  149. p.isLameDuck = true
  150. for _, pool := range p.locationPools {
  151. pool.EnterLameDuckMode()
  152. }
  153. }
  154. func (p *multiResourcePool) getPool(resourceLocation string) ResourcePool {
  155. p.rwMutex.RLock()
  156. defer p.rwMutex.RUnlock()
  157. if pool, inMap := p.locationPools[resourceLocation]; inMap {
  158. return pool
  159. }
  160. return nil
  161. }