limited_async_pool.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package util
  2. // initial version comes from https://hackernoon.com/asyncawait-in-golang-an-introductory-guide-ol1e34sg
  3. import (
  4. "container/list"
  5. "context"
  6. "sync"
  7. )
  8. type Future interface {
  9. Await() interface{}
  10. }
  11. type future struct {
  12. await func(ctx context.Context) interface{}
  13. }
  14. func (f future) Await() interface{} {
  15. return f.await(context.Background())
  16. }
  17. type LimitedAsyncExecutor struct {
  18. executor *LimitedConcurrentExecutor
  19. futureList *list.List
  20. futureListCond *sync.Cond
  21. }
  22. func NewLimitedAsyncExecutor(limit int) *LimitedAsyncExecutor {
  23. return &LimitedAsyncExecutor{
  24. executor: NewLimitedConcurrentExecutor(limit),
  25. futureList: list.New(),
  26. futureListCond: sync.NewCond(&sync.Mutex{}),
  27. }
  28. }
  29. func (ae *LimitedAsyncExecutor) Execute(job func() interface{}) {
  30. var result interface{}
  31. c := make(chan struct{})
  32. ae.executor.Execute(func() {
  33. defer close(c)
  34. result = job()
  35. })
  36. f := future{await: func(ctx context.Context) interface{} {
  37. select {
  38. case <-ctx.Done():
  39. return ctx.Err()
  40. case <-c:
  41. return result
  42. }
  43. }}
  44. ae.futureListCond.L.Lock()
  45. ae.futureList.PushBack(f)
  46. ae.futureListCond.Signal()
  47. ae.futureListCond.L.Unlock()
  48. }
  49. func (ae *LimitedAsyncExecutor) NextFuture() Future {
  50. ae.futureListCond.L.Lock()
  51. for ae.futureList.Len() == 0 {
  52. ae.futureListCond.Wait()
  53. }
  54. f := ae.futureList.Remove(ae.futureList.Front())
  55. ae.futureListCond.L.Unlock()
  56. return f.(Future)
  57. }