// yt_poller.h (1.8 KB)
  1. #pragma once
  2. #include <yt/cpp/mapreduce/common/fwd.h>
  3. #include <yt/cpp/mapreduce/http/context.h>
  4. #include <yt/cpp/mapreduce/http/requests.h>
  5. #include <yt/cpp/mapreduce/interface/client.h>
  6. #include <util/generic/list.h>
  7. #include <util/system/mutex.h>
  8. #include <util/system/thread.h>
  9. #include <util/system/condvar.h>
  10. namespace NYT {
  11. namespace NDetail {
  12. ////////////////////////////////////////////////////////////////////////////////
  13. class IYtPollerItem
  14. : public TThrRefBase
  15. {
  16. public:
  17. enum EStatus {
  18. PollContinue,
  19. PollBreak,
  20. };
  21. public:
  22. virtual ~IYtPollerItem() = default;
  23. virtual void PrepareRequest(IRawBatchRequest* batchRequest) = 0;
  24. // Should return PollContinue if poller should continue polling this item.
  25. // Should return PollBreak if poller should stop polling this item.
  26. virtual EStatus OnRequestExecuted() = 0;
  27. virtual void OnItemDiscarded() = 0;
  28. };
  29. using IYtPollerItemPtr = ::TIntrusivePtr<IYtPollerItem>;
  30. ////////////////////////////////////////////////////////////////////////////////
  31. class TYtPoller
  32. : public TThrRefBase
  33. {
  34. public:
  35. TYtPoller(
  36. IRawClientPtr rawClient,
  37. const TConfigPtr& config,
  38. const IClientRetryPolicyPtr& retryPolicy);
  39. ~TYtPoller();
  40. void Watch(IYtPollerItemPtr item);
  41. void Stop();
  42. private:
  43. void DiscardQueuedItems();
  44. void WatchLoop();
  45. static void* WatchLoopProc(void*);
  46. private:
  47. struct TItem;
  48. const IRawClientPtr RawClient_;
  49. const TConfigPtr Config_;
  50. const IClientRetryPolicyPtr ClientRetryPolicy_;
  51. TList<IYtPollerItemPtr> InProgress_;
  52. TList<IYtPollerItemPtr> Pending_;
  53. TThread WaiterThread_;
  54. TMutex Lock_;
  55. TCondVar HasData_;
  56. bool IsRunning_ = true;
  57. };
  58. ////////////////////////////////////////////////////////////////////////////////
  59. } // namespace NDetail
  60. } // namespace NYT