yt_poller.h 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. #pragma once
  2. #include <yt/cpp/mapreduce/common/fwd.h>
  3. #include <yt/cpp/mapreduce/http/context.h>
  4. #include <yt/cpp/mapreduce/http/requests.h>
  5. #include <yt/cpp/mapreduce/interface/client.h>
  6. #include <util/generic/list.h>
  7. #include <util/system/mutex.h>
  8. #include <util/system/thread.h>
  9. #include <util/system/condvar.h>
  10. namespace NYT {
  11. namespace NDetail {
  12. ////////////////////////////////////////////////////////////////////////////////
  13. class IYtPollerItem
  14. : public TThrRefBase
  15. {
  16. public:
  17. enum EStatus {
  18. PollContinue,
  19. PollBreak,
  20. };
  21. public:
  22. virtual ~IYtPollerItem() = default;
  23. virtual void PrepareRequest(IRawBatchRequest* batchRequest) = 0;
  24. // Should return PollContinue if poller should continue polling this item.
  25. // Should return PollBreak if poller should stop polling this item.
  26. virtual EStatus OnRequestExecuted() = 0;
  27. virtual void OnItemDiscarded() = 0;
  28. };
  29. using IYtPollerItemPtr = ::TIntrusivePtr<IYtPollerItem>;
  30. ////////////////////////////////////////////////////////////////////////////////
  31. class TYtPoller
  32. : public TThrRefBase
  33. {
  34. public:
  35. TYtPoller(TClientContext context, const IClientRetryPolicyPtr& retryPolicy);
  36. ~TYtPoller();
  37. void Watch(IYtPollerItemPtr item);
  38. void Stop();
  39. private:
  40. void DiscardQueuedItems();
  41. void WatchLoop();
  42. static void* WatchLoopProc(void*);
  43. private:
  44. struct TItem;
  45. const TClientContext Context_;
  46. const IClientRetryPolicyPtr ClientRetryPolicy_;
  47. TList<IYtPollerItemPtr> InProgress_;
  48. TList<IYtPollerItemPtr> Pending_;
  49. TThread WaiterThread_;
  50. TMutex Lock_;
  51. TCondVar HasData_;
  52. bool IsRunning_ = true;
  53. };
  54. ////////////////////////////////////////////////////////////////////////////////
  55. } // namespace NDetail
  56. } // namespace NYT