queue_for_actor.h 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. #pragma once
  2. #include <util/generic/vector.h>
  3. #include <util/system/yassert.h>
  4. #include <util/thread/lfstack.h>
  5. #include <util/thread/singleton.h>
  6. // TODO: include from correct directory
  7. #include "temp_tls_vector.h"
  8. namespace NActor {
  9. namespace NPrivate {
  10. struct TTagForTl {};
  11. }
  12. template <typename T>
  13. class TQueueForActor {
  14. private:
  15. TLockFreeStack<T> Queue;
  16. public:
  17. ~TQueueForActor() {
  18. Y_ABORT_UNLESS(Queue.IsEmpty());
  19. }
  20. bool IsEmpty() {
  21. return Queue.IsEmpty();
  22. }
  23. void Enqueue(const T& value) {
  24. Queue.Enqueue(value);
  25. }
  26. template <typename TCollection>
  27. void EnqueueAll(const TCollection& all) {
  28. Queue.EnqueueAll(all);
  29. }
  30. void Clear() {
  31. TVector<T> tmp;
  32. Queue.DequeueAll(&tmp);
  33. }
  34. template <typename TFunc>
  35. void DequeueAll(const TFunc& func
  36. // TODO: , std::enable_if_t<TFunctionParamCount<TFunc>::Value == 1>* = 0
  37. ) {
  38. TTempTlsVector<T> temp;
  39. Queue.DequeueAllSingleConsumer(temp.GetVector());
  40. for (typename TVector<T>::reverse_iterator i = temp.GetVector()->rbegin(); i != temp.GetVector()->rend(); ++i) {
  41. func(*i);
  42. }
  43. temp.Clear();
  44. if (temp.Capacity() * sizeof(T) > 64 * 1024) {
  45. temp.Shrink();
  46. }
  47. }
  48. template <typename TFunc>
  49. void DequeueAllLikelyEmpty(const TFunc& func) {
  50. if (Y_LIKELY(IsEmpty())) {
  51. return;
  52. }
  53. DequeueAll(func);
  54. }
  55. };
  56. }