123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- #include "yt_poller.h"
- #include <yt/cpp/mapreduce/http_client/raw_batch_request.h>
- #include <yt/cpp/mapreduce/common/debug_metrics.h>
- #include <yt/cpp/mapreduce/common/retry_lib.h>
- #include <yt/cpp/mapreduce/common/wait_proxy.h>
- #include <yt/cpp/mapreduce/http/retry_request.h>
- #include <yt/cpp/mapreduce/interface/config.h>
- #include <yt/cpp/mapreduce/interface/raw_client.h>
- #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
- namespace NYT {
- namespace NDetail {
- using namespace NRawClient;
- ////////////////////////////////////////////////////////////////////////////////
- TYtPoller::TYtPoller(
- IRawClientPtr rawClient,
- const TConfigPtr& config,
- const IClientRetryPolicyPtr& retryPolicy)
- : RawClient_(std::move(rawClient))
- , Config_(config)
- , ClientRetryPolicy_(retryPolicy)
- , WaiterThread_(&TYtPoller::WatchLoopProc, this)
- {
- WaiterThread_.Start();
- }
- TYtPoller::~TYtPoller()
- {
- Stop();
- }
- void TYtPoller::Watch(IYtPollerItemPtr item)
- {
- auto g = Guard(Lock_);
- Pending_.emplace_back(std::move(item));
- HasData_.Signal();
- }
- void TYtPoller::Stop()
- {
- {
- auto g = Guard(Lock_);
- if (!IsRunning_) {
- return;
- }
- IsRunning_ = false;
- HasData_.Signal();
- }
- WaiterThread_.Join();
- }
- void TYtPoller::DiscardQueuedItems()
- {
- for (auto& item : Pending_) {
- item->OnItemDiscarded();
- }
- for (auto& item : InProgress_) {
- item->OnItemDiscarded();
- }
- }
- void TYtPoller::WatchLoop()
- {
- TInstant nextRequest = TInstant::Zero();
- while (true) {
- {
- auto g = Guard(Lock_);
- if (IsRunning_ && Pending_.empty() && InProgress_.empty()) {
- TWaitProxy::Get()->WaitCondVar(HasData_, Lock_);
- }
- if (!IsRunning_) {
- DiscardQueuedItems();
- return;
- }
- {
- auto ug = Unguard(Lock_); // allow adding new items into Pending_
- TWaitProxy::Get()->SleepUntil(nextRequest);
- nextRequest = TInstant::Now() + Config_->WaitLockPollInterval;
- }
- if (!Pending_.empty()) {
- InProgress_.splice(InProgress_.end(), Pending_);
- }
- Y_ABORT_UNLESS(!InProgress_.empty());
- }
- auto rawBatchRequest = RawClient_->CreateRawBatchRequest();
- for (auto& item : InProgress_) {
- item->PrepareRequest(rawBatchRequest.Get());
- }
- try {
- rawBatchRequest->ExecuteBatch();
- } catch (const std::exception& ex) {
- YT_LOG_ERROR("Exception while executing batch request: %v", ex.what());
- }
- for (auto it = InProgress_.begin(); it != InProgress_.end();) {
- auto& item = *it;
- IYtPollerItem::EStatus status = item->OnRequestExecuted();
- if (status == IYtPollerItem::PollBreak) {
- it = InProgress_.erase(it);
- } else {
- ++it;
- }
- }
- IncDebugMetric(TStringBuf("yt_poller_top_loop_repeat_count"));
- }
- }
- void* TYtPoller::WatchLoopProc(void* data)
- {
- static_cast<TYtPoller*>(data)->WatchLoop();
- return nullptr;
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NDetail
- } // namespace NYT
|