limiter.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package util
  2. import (
  3. "math/rand"
  4. "reflect"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. // initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
  9. // LimitedConcurrentExecutor object
  10. type LimitedConcurrentExecutor struct {
  11. limit int
  12. tokenChan chan int
  13. }
  14. func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
  15. // allocate a limiter instance
  16. c := &LimitedConcurrentExecutor{
  17. limit: limit,
  18. tokenChan: make(chan int, limit),
  19. }
  20. // allocate the tokenChan:
  21. for i := 0; i < c.limit; i++ {
  22. c.tokenChan <- i
  23. }
  24. return c
  25. }
  26. // Execute adds a function to the execution queue.
  27. // if num of go routines allocated by this instance is < limit
  28. // launch a new go routine to execute job
  29. // else wait until a go routine becomes available
  30. func (c *LimitedConcurrentExecutor) Execute(job func()) {
  31. token := <-c.tokenChan
  32. go func() {
  33. defer func() {
  34. c.tokenChan <- token
  35. }()
  36. // run the job
  37. job()
  38. }()
  39. }
  40. // a different implementation, but somehow more "conservative"
  41. type OperationRequest func()
  42. type LimitedOutOfOrderProcessor struct {
  43. processorSlots uint32
  44. processors []chan OperationRequest
  45. processorLimit int32
  46. processorLimitCond *sync.Cond
  47. currentProcessor int32
  48. }
  49. func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
  50. processorSlots := uint32(32)
  51. c = &LimitedOutOfOrderProcessor{
  52. processorSlots: processorSlots,
  53. processors: make([]chan OperationRequest, processorSlots),
  54. processorLimit: limit,
  55. processorLimitCond: sync.NewCond(new(sync.Mutex)),
  56. }
  57. for i := 0; i < int(processorSlots); i++ {
  58. c.processors[i] = make(chan OperationRequest)
  59. }
  60. cases := make([]reflect.SelectCase, processorSlots)
  61. for i, ch := range c.processors {
  62. cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
  63. }
  64. go func() {
  65. for {
  66. _, value, ok := reflect.Select(cases)
  67. if !ok {
  68. continue
  69. }
  70. request := value.Interface().(OperationRequest)
  71. if c.processorLimit > 0 {
  72. c.processorLimitCond.L.Lock()
  73. for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit {
  74. c.processorLimitCond.Wait()
  75. }
  76. atomic.AddInt32(&c.currentProcessor, 1)
  77. c.processorLimitCond.L.Unlock()
  78. }
  79. go func() {
  80. if c.processorLimit > 0 {
  81. defer atomic.AddInt32(&c.currentProcessor, -1)
  82. defer c.processorLimitCond.Signal()
  83. }
  84. request()
  85. }()
  86. }
  87. }()
  88. return c
  89. }
  90. func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
  91. index := rand.Uint32() % c.processorSlots
  92. c.processors[index] <- request
  93. }