yt_poller.cpp 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. #include "yt_poller.h"
  2. #include <yt/cpp/mapreduce/http_client/raw_batch_request.h>
  3. #include <yt/cpp/mapreduce/common/debug_metrics.h>
  4. #include <yt/cpp/mapreduce/common/retry_lib.h>
  5. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  6. #include <yt/cpp/mapreduce/http/retry_request.h>
  7. #include <yt/cpp/mapreduce/interface/config.h>
  8. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  9. namespace NYT {
  10. namespace NDetail {
  11. using namespace NRawClient;
  12. ////////////////////////////////////////////////////////////////////////////////
  13. TYtPoller::TYtPoller(
  14. TClientContext context,
  15. const IClientRetryPolicyPtr& retryPolicy)
  16. : Context_(std::move(context))
  17. , ClientRetryPolicy_(retryPolicy)
  18. , WaiterThread_(&TYtPoller::WatchLoopProc, this)
  19. {
  20. WaiterThread_.Start();
  21. }
  22. TYtPoller::~TYtPoller()
  23. {
  24. Stop();
  25. }
  26. void TYtPoller::Watch(IYtPollerItemPtr item)
  27. {
  28. auto g = Guard(Lock_);
  29. Pending_.emplace_back(std::move(item));
  30. HasData_.Signal();
  31. }
  32. void TYtPoller::Stop()
  33. {
  34. {
  35. auto g = Guard(Lock_);
  36. if (!IsRunning_) {
  37. return;
  38. }
  39. IsRunning_ = false;
  40. HasData_.Signal();
  41. }
  42. WaiterThread_.Join();
  43. }
  44. void TYtPoller::DiscardQueuedItems()
  45. {
  46. for (auto& item : Pending_) {
  47. item->OnItemDiscarded();
  48. }
  49. for (auto& item : InProgress_) {
  50. item->OnItemDiscarded();
  51. }
  52. }
  53. void TYtPoller::WatchLoop()
  54. {
  55. TInstant nextRequest = TInstant::Zero();
  56. while (true) {
  57. {
  58. auto g = Guard(Lock_);
  59. if (IsRunning_ && Pending_.empty() && InProgress_.empty()) {
  60. TWaitProxy::Get()->WaitCondVar(HasData_, Lock_);
  61. }
  62. if (!IsRunning_) {
  63. DiscardQueuedItems();
  64. return;
  65. }
  66. {
  67. auto ug = Unguard(Lock_); // allow adding new items into Pending_
  68. TWaitProxy::Get()->SleepUntil(nextRequest);
  69. nextRequest = TInstant::Now() + Context_.Config->WaitLockPollInterval;
  70. }
  71. if (!Pending_.empty()) {
  72. InProgress_.splice(InProgress_.end(), Pending_);
  73. }
  74. Y_ABORT_UNLESS(!InProgress_.empty());
  75. }
  76. THttpRawBatchRequest rawBatchRequest(Context_, ClientRetryPolicy_->CreatePolicyForGenericRequest());
  77. for (auto& item : InProgress_) {
  78. item->PrepareRequest(&rawBatchRequest);
  79. }
  80. try {
  81. rawBatchRequest.ExecuteBatch();
  82. } catch (const std::exception& ex) {
  83. YT_LOG_ERROR("Exception while executing batch request: %v", ex.what());
  84. }
  85. for (auto it = InProgress_.begin(); it != InProgress_.end();) {
  86. auto& item = *it;
  87. IYtPollerItem::EStatus status = item->OnRequestExecuted();
  88. if (status == IYtPollerItem::PollBreak) {
  89. it = InProgress_.erase(it);
  90. } else {
  91. ++it;
  92. }
  93. }
  94. IncDebugMetric(TStringBuf("yt_poller_top_loop_repeat_count"));
  95. }
  96. }
  97. void* TYtPoller::WatchLoopProc(void* data)
  98. {
  99. static_cast<TYtPoller*>(data)->WatchLoop();
  100. return nullptr;
  101. }
  102. ////////////////////////////////////////////////////////////////////////////////
  103. } // namespace NDetail
  104. } // namespace NYT