transaction_pinger.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. #include "transaction_pinger.h"
  2. #include "transaction.h"
  3. #include <yt/cpp/mapreduce/interface/config.h>
  4. #include <yt/cpp/mapreduce/interface/error_codes.h>
  5. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  6. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  7. #include <yt/cpp/mapreduce/common/retry_lib.h>
  8. #include <yt/cpp/mapreduce/http/requests.h>
  9. #include <yt/cpp/mapreduce/http/retry_request.h>
  10. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  11. #include <yt/yt/core/concurrency/periodic_executor.h>
  12. #include <yt/yt/core/concurrency/poller.h>
  13. #include <yt/yt/core/concurrency/scheduler_api.h>
  14. #include <yt/yt/core/concurrency/thread_pool_poller.h>
  15. #include <yt/yt/core/concurrency/thread_pool.h>
  16. #include <yt/yt/core/http/client.h>
  17. #include <yt/yt/core/http/http.h>
  18. #include <library/cpp/yson/node/node_io.h>
  19. #include <library/cpp/yt/threading/spin_lock.h>
  20. #include <library/cpp/yt/assert/assert.h>
  21. #include <util/datetime/base.h>
  22. #include <util/random/random.h>
  23. namespace NYT {
  24. ////////////////////////////////////////////////////////////////////////////////
  25. namespace {
  26. ////////////////////////////////////////////////////////////////////////////////
  27. void CheckError(const TString& requestId, NHttp::IResponsePtr response)
  28. {
  29. TErrorResponse errorResponse(static_cast<int>(response->GetStatusCode()), requestId);
  30. if (const auto* ytError = response->GetHeaders()->Find("X-YT-Error")) {
  31. errorResponse.ParseFromJsonError(*ytError);
  32. }
  33. if (errorResponse.IsOk()) {
  34. return;
  35. }
  36. YT_LOG_ERROR("RSP %v - HTTP %v - %v",
  37. requestId,
  38. response->GetStatusCode(),
  39. errorResponse.AsStrBuf());
  40. ythrow errorResponse;
  41. ////////////////////////////////////////////////////////////////////////////////
  42. } // namespace
  43. void PingTx(NHttp::IClientPtr httpClient, const TPingableTransaction& tx)
  44. {
  45. auto url = TString::Join("http://", tx.GetContext().ServerName, "/api/", tx.GetContext().Config->ApiVersion, "/ping_tx");
  46. auto headers = New<NHttp::THeaders>();
  47. auto requestId = CreateGuidAsString();
  48. headers->Add("Host", url);
  49. headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
  50. const auto& token = tx.GetContext().Token;
  51. if (!token.empty()) {
  52. headers->Add("Authorization", "OAuth " + token);
  53. }
  54. headers->Add("Transfer-Encoding", "chunked");
  55. headers->Add("X-YT-Correlation-Id", requestId);
  56. headers->Add("X-YT-Header-Format", "<format=text>yson");
  57. headers->Add("Content-Encoding", "identity");
  58. headers->Add("Accept-Encoding", "identity");
  59. TNode node;
  60. node["transaction_id"] = GetGuidAsString(tx.GetId());
  61. auto strParams = NodeToYsonString(node);
  62. YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; Method POST %v; X-YT-Parameters (sent in body): %v)",
  63. requestId,
  64. tx.GetContext().ServerName,
  65. url,
  66. strParams
  67. );
  68. auto response = NConcurrency::WaitFor(httpClient->Post(url, TSharedRef::FromString(strParams), headers)).ValueOrThrow();
  69. CheckError(requestId, response);
  70. YT_LOG_DEBUG("RSP %v - received response %v bytes. (%v)",
  71. requestId,
  72. response->ReadAll().size(),
  73. strParams);
  74. }
  75. } // namespace
  76. ////////////////////////////////////////////////////////////////////////////////
  77. class TSharedTransactionPinger
  78. : public ITransactionPinger
  79. {
  80. public:
  81. TSharedTransactionPinger(NHttp::IClientPtr httpClient, int poolThreadCount)
  82. : PingerPool_(NConcurrency::CreateThreadPool(
  83. poolThreadCount, "tx_pinger_pool"))
  84. , HttpClient_(std::move(httpClient))
  85. { }
  86. ~TSharedTransactionPinger() override
  87. {
  88. PingerPool_->Shutdown();
  89. }
  90. ITransactionPingerPtr GetChildTxPinger() override
  91. {
  92. return this;
  93. }
  94. void RegisterTransaction(const TPingableTransaction& pingableTx) override
  95. {
  96. auto [minPingInterval, maxPingInterval] = pingableTx.GetPingInterval();
  97. auto pingInterval = (minPingInterval + maxPingInterval) / 2;
  98. double jitter = (maxPingInterval - pingInterval) / pingInterval;
  99. auto opts = NConcurrency::TPeriodicExecutorOptions{pingInterval, pingInterval, jitter};
  100. auto periodic = std::make_shared<NConcurrency::TPeriodicExecutorPtr>(nullptr);
  101. // Have to use weak_ptr in order to break reference cycle
  102. // This weak_ptr holds pointer to periodic, which will contain this lambda
  103. // Also we consider that lifetime of this lambda is no longer than lifetime of pingableTx
  104. // because every pingableTx have to call RemoveTransaction before it is destroyed
  105. auto pingRoutine = BIND([this, &pingableTx, periodic = std::weak_ptr{periodic}] {
  106. auto strong_ptr = periodic.lock();
  107. YT_VERIFY(strong_ptr);
  108. DoPingTransaction(pingableTx, *strong_ptr);
  109. });
  110. *periodic = New<NConcurrency::TPeriodicExecutor>(PingerPool_->GetInvoker(), pingRoutine, opts);
  111. (*periodic)->Start();
  112. auto guard = Guard(SpinLock_);
  113. YT_VERIFY(!Transactions_.contains(pingableTx.GetId()));
  114. Transactions_[pingableTx.GetId()] = std::move(periodic);
  115. }
  116. bool HasTransaction(const TPingableTransaction& pingableTx) override
  117. {
  118. auto guard = Guard(SpinLock_);
  119. return Transactions_.contains(pingableTx.GetId());
  120. }
  121. void RemoveTransaction(const TPingableTransaction& pingableTx) override
  122. {
  123. std::shared_ptr<NConcurrency::TPeriodicExecutorPtr> periodic;
  124. {
  125. auto guard = Guard(SpinLock_);
  126. auto it = Transactions_.find(pingableTx.GetId());
  127. YT_VERIFY(it != Transactions_.end());
  128. periodic = std::move(it->second);
  129. Transactions_.erase(it);
  130. }
  131. NConcurrency::WaitUntilSet((*periodic)->Stop());
  132. }
  133. private:
  134. void DoPingTransaction(const TPingableTransaction& pingableTx,
  135. NConcurrency::TPeriodicExecutorPtr periodic)
  136. {
  137. try {
  138. PingTx(HttpClient_, pingableTx);
  139. } catch (const std::exception& e) {
  140. if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) {
  141. if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
  142. YT_UNUSED_FUTURE(periodic->Stop());
  143. } else if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
  144. periodic->ScheduleOutOfBand();
  145. }
  146. }
  147. }
  148. }
  149. private:
  150. YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
  151. THashMap<TTransactionId, std::shared_ptr<NConcurrency::TPeriodicExecutorPtr>> Transactions_;
  152. NConcurrency::IThreadPoolPtr PingerPool_;
  153. NHttp::IClientPtr HttpClient_;
  154. };
  155. ////////////////////////////////////////////////////////////////////////////////
  156. class TThreadPerTransactionPinger
  157. : public ITransactionPinger
  158. {
  159. public:
  160. ~TThreadPerTransactionPinger() override
  161. {
  162. if (Running_) {
  163. RemoveTransaction(*PingableTx_);
  164. }
  165. }
  166. ITransactionPingerPtr GetChildTxPinger() override
  167. {
  168. return MakeIntrusive<TThreadPerTransactionPinger>();
  169. }
  170. void RegisterTransaction(const TPingableTransaction& pingableTx) override
  171. {
  172. YT_VERIFY(!Running_);
  173. YT_VERIFY(PingableTx_ == nullptr);
  174. PingableTx_ = &pingableTx;
  175. Running_ = true;
  176. PingerThread_ = MakeHolder<TThread>(
  177. TThread::TParams{Pinger, this}.SetName("pingable_tx"));
  178. PingerThread_->Start();
  179. }
  180. bool HasTransaction(const TPingableTransaction& pingableTx) override
  181. {
  182. return PingableTx_ == &pingableTx && Running_;
  183. }
  184. void RemoveTransaction(const TPingableTransaction& pingableTx) override
  185. {
  186. YT_VERIFY(HasTransaction(pingableTx));
  187. Running_ = false;
  188. if (PingerThread_) {
  189. PingerThread_->Join();
  190. }
  191. }
  192. private:
  193. static void* Pinger(void* opaque)
  194. {
  195. static_cast<TThreadPerTransactionPinger*>(opaque)->Pinger();
  196. return nullptr;
  197. }
  198. void Pinger()
  199. {
  200. auto [minPingInterval, maxPingInterval] = PingableTx_->GetPingInterval();
  201. while (Running_) {
  202. TDuration waitTime = minPingInterval + (maxPingInterval - minPingInterval) * RandomNumber<float>();
  203. try {
  204. auto noRetryPolicy = MakeIntrusive<TAttemptLimitedRetryPolicy>(1u, PingableTx_->GetContext().Config);
  205. NDetail::NRawClient::PingTx(noRetryPolicy, PingableTx_->GetContext(), PingableTx_->GetId());
  206. } catch (const std::exception& e) {
  207. if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) {
  208. if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
  209. break;
  210. } else if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
  211. waitTime = TDuration::MilliSeconds(0);
  212. }
  213. }
  214. // Else do nothing, going to retry this error.
  215. }
  216. TInstant t = Now();
  217. while (Running_ && Now() - t < waitTime) {
  218. NDetail::TWaitProxy::Get()->Sleep(TDuration::MilliSeconds(100));
  219. }
  220. }
  221. }
  222. private:
  223. const TPingableTransaction* PingableTx_ = nullptr;
  224. std::atomic<bool> Running_ = false;
  225. THolder<TThread> PingerThread_;
  226. };
  227. ////////////////////////////////////////////////////////////////////////////////
  228. ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config)
  229. {
  230. if (config->UseAsyncTxPinger) {
  231. YT_LOG_DEBUG("Using async transaction pinger");
  232. auto httpClientConfig = NYT::New<NHttp::TClientConfig>();
  233. httpClientConfig->MaxIdleConnections = 16;
  234. auto httpPoller = NConcurrency::CreateThreadPoolPoller(
  235. config->AsyncHttpClientThreads,
  236. "tx_http_client_poller");
  237. auto httpClient = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller));
  238. return MakeIntrusive<TSharedTransactionPinger>(
  239. std::move(httpClient),
  240. config->AsyncTxPingerPoolThreads);
  241. } else {
  242. return MakeIntrusive<TThreadPerTransactionPinger>();
  243. }
  244. }
  245. ////////////////////////////////////////////////////////////////////////////////
  246. } // namespace NYT