pool.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package util
  2. import (
  3. "errors"
  4. "sync"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. )
  8. var (
  9. TimeoutErr = errors.New("timeout")
  10. )
  11. // A bufferedChan implemented by a buffered channel
  12. type ResourcePool struct {
  13. sync.Mutex
  14. bufferedChan chan interface{}
  15. poolSizeLimit int
  16. inuse int
  17. newFn func() (interface{}, error)
  18. }
  19. func NewResourcePool(poolSizeLimit int, newFn func() (interface{}, error)) *ResourcePool {
  20. p := &ResourcePool{
  21. poolSizeLimit: poolSizeLimit,
  22. newFn: newFn,
  23. bufferedChan: make(chan interface{}, poolSizeLimit),
  24. }
  25. return p
  26. }
  27. func (p *ResourcePool) Size() int {
  28. p.Lock()
  29. defer p.Unlock()
  30. return len(p.bufferedChan) + p.inuse
  31. }
  32. func (p *ResourcePool) Free() int {
  33. p.Lock()
  34. defer p.Unlock()
  35. return p.poolSizeLimit - p.inuse
  36. }
  37. func (p *ResourcePool) Get(timeout time.Duration) (interface{}, error) {
  38. d, err := p.get(timeout)
  39. if err != nil {
  40. return nil, err
  41. }
  42. if d == nil && p.newFn != nil {
  43. var err error
  44. d, err = p.newFn()
  45. if err != nil {
  46. return nil, err
  47. }
  48. }
  49. p.Lock()
  50. defer p.Unlock()
  51. p.inuse++
  52. return d, nil
  53. }
  54. func (p *ResourcePool) Release(v interface{}) {
  55. p.Lock()
  56. defer p.Unlock()
  57. if p.inuse == 0 {
  58. glog.V(0).Infof("released too many times?")
  59. return
  60. }
  61. p.bufferedChan <- v
  62. p.inuse--
  63. }
  64. func (p *ResourcePool) get(timeout time.Duration) (interface{}, error) {
  65. select {
  66. case v := <-p.bufferedChan:
  67. return v, nil
  68. default:
  69. }
  70. if p.Free() > 0 {
  71. d, err := p.newFn()
  72. if err != nil {
  73. return nil, err
  74. }
  75. return d, nil
  76. }
  77. // wait for an freed item
  78. select {
  79. case v := <-p.bufferedChan:
  80. return v, nil
  81. case <-time.After(timeout):
  82. }
  83. return nil, TimeoutErr
  84. }