yt_poller.cpp 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #include "yt_poller.h"
  2. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  3. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  4. #include <yt/cpp/mapreduce/common/debug_metrics.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  7. #include <yt/cpp/mapreduce/http/retry_request.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  10. namespace NYT {
  11. namespace NDetail {
  12. using namespace NRawClient;
  13. ////////////////////////////////////////////////////////////////////////////////
  14. TYtPoller::TYtPoller(
  15. TClientContext context,
  16. const IClientRetryPolicyPtr& retryPolicy)
  17. : Context_(std::move(context))
  18. , ClientRetryPolicy_(retryPolicy)
  19. , WaiterThread_(&TYtPoller::WatchLoopProc, this)
  20. {
  21. WaiterThread_.Start();
  22. }
  23. TYtPoller::~TYtPoller()
  24. {
  25. Stop();
  26. }
  27. void TYtPoller::Watch(IYtPollerItemPtr item)
  28. {
  29. auto g = Guard(Lock_);
  30. Pending_.emplace_back(std::move(item));
  31. HasData_.Signal();
  32. }
  33. void TYtPoller::Stop()
  34. {
  35. {
  36. auto g = Guard(Lock_);
  37. if (!IsRunning_) {
  38. return;
  39. }
  40. IsRunning_ = false;
  41. HasData_.Signal();
  42. }
  43. WaiterThread_.Join();
  44. }
  45. void TYtPoller::DiscardQueuedItems()
  46. {
  47. for (auto& item : Pending_) {
  48. item->OnItemDiscarded();
  49. }
  50. for (auto& item : InProgress_) {
  51. item->OnItemDiscarded();
  52. }
  53. }
  54. void TYtPoller::WatchLoop()
  55. {
  56. TInstant nextRequest = TInstant::Zero();
  57. while (true) {
  58. {
  59. auto g = Guard(Lock_);
  60. if (IsRunning_ && Pending_.empty() && InProgress_.empty()) {
  61. TWaitProxy::Get()->WaitCondVar(HasData_, Lock_);
  62. }
  63. if (!IsRunning_) {
  64. DiscardQueuedItems();
  65. return;
  66. }
  67. {
  68. auto ug = Unguard(Lock_); // allow adding new items into Pending_
  69. TWaitProxy::Get()->SleepUntil(nextRequest);
  70. nextRequest = TInstant::Now() + Context_.Config->WaitLockPollInterval;
  71. }
  72. if (!Pending_.empty()) {
  73. InProgress_.splice(InProgress_.end(), Pending_);
  74. }
  75. Y_VERIFY(!InProgress_.empty());
  76. }
  77. TRawBatchRequest rawBatchRequest(Context_.Config);
  78. for (auto& item : InProgress_) {
  79. item->PrepareRequest(&rawBatchRequest);
  80. }
  81. try {
  82. ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, rawBatchRequest);
  83. } catch (const std::exception& ex) {
  84. YT_LOG_ERROR("Exception while executing batch request: %v", ex.what());
  85. }
  86. for (auto it = InProgress_.begin(); it != InProgress_.end();) {
  87. auto& item = *it;
  88. IYtPollerItem::EStatus status = item->OnRequestExecuted();
  89. if (status == IYtPollerItem::PollBreak) {
  90. it = InProgress_.erase(it);
  91. } else {
  92. ++it;
  93. }
  94. }
  95. IncDebugMetric(TStringBuf("yt_poller_top_loop_repeat_count"));
  96. }
  97. }
  98. void* TYtPoller::WatchLoopProc(void* data)
  99. {
  100. static_cast<TYtPoller*>(data)->WatchLoop();
  101. return nullptr;
  102. }
  103. ////////////////////////////////////////////////////////////////////////////////
  104. } // namespace NDetail
  105. } // namespace NYT