yt_poller.h 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. #pragma once
  2. #include <yt/cpp/mapreduce/common/fwd.h>
  3. #include <yt/cpp/mapreduce/http/context.h>
  4. #include <yt/cpp/mapreduce/http/requests.h>
  5. #include <yt/cpp/mapreduce/interface/client.h>
  6. #include <util/generic/list.h>
  7. #include <util/system/mutex.h>
  8. #include <util/system/thread.h>
  9. #include <util/system/condvar.h>
  10. namespace NYT {
  11. namespace NDetail {
  12. namespace NRawClient {
  13. class TRawBatchRequest;
  14. }
  15. ////////////////////////////////////////////////////////////////////////////////
  16. class IYtPollerItem
  17. : public TThrRefBase
  18. {
  19. public:
  20. enum EStatus {
  21. PollContinue,
  22. PollBreak,
  23. };
  24. public:
  25. virtual ~IYtPollerItem() = default;
  26. virtual void PrepareRequest(NRawClient::TRawBatchRequest* batchRequest) = 0;
  27. // Should return PollContinue if poller should continue polling this item.
  28. // Should return PollBreak if poller should stop polling this item.
  29. virtual EStatus OnRequestExecuted() = 0;
  30. virtual void OnItemDiscarded() = 0;
  31. };
  32. using IYtPollerItemPtr = ::TIntrusivePtr<IYtPollerItem>;
  33. ////////////////////////////////////////////////////////////////////////////////
  34. class TYtPoller
  35. : public TThrRefBase
  36. {
  37. public:
  38. TYtPoller(TClientContext context, const IClientRetryPolicyPtr& retryPolicy);
  39. ~TYtPoller();
  40. void Watch(IYtPollerItemPtr item);
  41. void Stop();
  42. private:
  43. void DiscardQueuedItems();
  44. void WatchLoop();
  45. static void* WatchLoopProc(void*);
  46. private:
  47. struct TItem;
  48. const TClientContext Context_;
  49. const IClientRetryPolicyPtr ClientRetryPolicy_;
  50. TList<IYtPollerItemPtr> InProgress_;
  51. TList<IYtPollerItemPtr> Pending_;
  52. TThread WaiterThread_;
  53. TMutex Lock_;
  54. TCondVar HasData_;
  55. bool IsRunning_ = true;
  56. };
  57. ////////////////////////////////////////////////////////////////////////////////
  58. } // namespace NDetail
  59. } // namespace NYT