Queue.h 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. #ifndef QUEUE_H
  2. #define QUEUE_H
  #include "ml-private.h"
  #include "Mutex.h"
  #include <pthread.h>
  #include <atomic>
  #include <condition_variable>
  #include <mutex>
  #include <queue>
  #include <utility>
  8. template<typename T>
  9. class Queue {
  10. public:
  11. Queue(void) : Q(), M() {
  12. pthread_cond_init(&CV, nullptr);
  13. Exit = false;
  14. }
  15. ~Queue() {
  16. pthread_cond_destroy(&CV);
  17. }
  18. void push(T t) {
  19. std::lock_guard<Mutex> L(M);
  20. Q.push(t);
  21. pthread_cond_signal(&CV);
  22. }
  23. std::pair<T, size_t> pop(void) {
  24. std::lock_guard<Mutex> L(M);
  25. while (Q.empty()) {
  26. pthread_cond_wait(&CV, M.inner());
  27. if (Exit) {
  28. // This should happen only when we are destroying a host.
  29. // Callers should use a flag dedicated to checking if we
  30. // are about to delete the host or exit the agent. The original
  31. // implementation would call pthread_exit which would cause
  32. // the queue's mutex to be destroyed twice (and fail on the
  33. // 2nd time)
  34. return { T(), 0 };
  35. }
  36. }
  37. T V = Q.front();
  38. size_t Size = Q.size();
  39. Q.pop();
  40. return { V, Size };
  41. }
  42. void signal() {
  43. std::lock_guard<Mutex> L(M);
  44. Exit = true;
  45. pthread_cond_signal(&CV);
  46. }
  47. private:
  48. std::queue<T> Q;
  49. Mutex M;
  50. pthread_cond_t CV;
  51. std::atomic<bool> Exit;
  52. };
  53. #endif /* QUEUE_H */