queue_unbounded.go 777 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package util
  2. import "sync"
  3. type UnboundedQueue struct {
  4. outbound []string
  5. outboundLock sync.RWMutex
  6. inbound []string
  7. inboundLock sync.RWMutex
  8. }
  9. func NewUnboundedQueue() *UnboundedQueue {
  10. q := &UnboundedQueue{}
  11. return q
  12. }
  13. func (q *UnboundedQueue) EnQueue(items ...string) {
  14. q.inboundLock.Lock()
  15. defer q.inboundLock.Unlock()
  16. q.inbound = append(q.inbound, items...)
  17. }
  18. func (q *UnboundedQueue) Consume(fn func([]string)) {
  19. q.outboundLock.Lock()
  20. defer q.outboundLock.Unlock()
  21. if len(q.outbound) == 0 {
  22. q.inboundLock.Lock()
  23. inboundLen := len(q.inbound)
  24. if inboundLen > 0 {
  25. t := q.outbound
  26. q.outbound = q.inbound
  27. q.inbound = t
  28. }
  29. q.inboundLock.Unlock()
  30. }
  31. if len(q.outbound) > 0 {
  32. fn(q.outbound)
  33. q.outbound = q.outbound[:0]
  34. }
  35. }