transaction_pinger.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. #include "transaction_pinger.h"
  2. #include "transaction.h"
  3. #include <yt/cpp/mapreduce/interface/config.h>
  4. #include <yt/cpp/mapreduce/interface/error_codes.h>
  5. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  6. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  7. #include <yt/cpp/mapreduce/common/retry_lib.h>
  8. #include <yt/cpp/mapreduce/http/requests.h>
  9. #include <yt/cpp/mapreduce/http/retry_request.h>
  10. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  11. #if defined(__x86_64__) || defined(__arm64__)
  12. #include <yt/yt/core/concurrency/periodic_executor.h>
  13. #include <yt/yt/core/concurrency/poller.h>
  14. #include <yt/yt/core/concurrency/scheduler_api.h>
  15. #include <yt/yt/core/concurrency/thread_pool_poller.h>
  16. #include <yt/yt/core/concurrency/thread_pool.h>
  17. #include <yt/yt/core/http/client.h>
  18. #include <yt/yt/core/http/http.h>
  19. #endif // defined(__x86_64__) || defined(__arm64__)
  20. #include <library/cpp/yson/node/node_io.h>
  21. #include <library/cpp/yt/threading/spin_lock.h>
  22. #include <library/cpp/yt/assert/assert.h>
  23. #include <util/datetime/base.h>
  24. #include <util/random/random.h>
  25. namespace NYT {
  26. ////////////////////////////////////////////////////////////////////////////////
  27. #if defined(__x86_64__) || defined(__arm64__)
  28. namespace {
  29. ////////////////////////////////////////////////////////////////////////////////
  30. void CheckError(const TString& requestId, NHttp::IResponsePtr response)
  31. {
  32. TErrorResponse errorResponse(static_cast<int>(response->GetStatusCode()), requestId);
  33. if (const auto* ytError = response->GetHeaders()->Find("X-YT-Error")) {
  34. errorResponse.ParseFromJsonError(*ytError);
  35. }
  36. if (errorResponse.IsOk()) {
  37. return;
  38. }
  39. YT_LOG_ERROR("RSP %v - HTTP %v - %v",
  40. requestId,
  41. response->GetStatusCode(),
  42. errorResponse.AsStrBuf());
  43. ythrow errorResponse;
  44. ////////////////////////////////////////////////////////////////////////////////
  45. } // namespace
  46. void PingTx(NHttp::IClientPtr httpClient, const TPingableTransaction& tx)
  47. {
  48. auto url = TString::Join("http://", tx.GetContext().ServerName, "/api/", tx.GetContext().Config->ApiVersion, "/ping_tx");
  49. auto headers = New<NHttp::THeaders>();
  50. auto requestId = CreateGuidAsString();
  51. headers->Add("Host", url);
  52. headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
  53. const auto& token = tx.GetContext().Token;
  54. if (!token.empty()) {
  55. headers->Add("Authorization", "OAuth " + token);
  56. }
  57. headers->Add("Transfer-Encoding", "chunked");
  58. headers->Add("X-YT-Correlation-Id", requestId);
  59. headers->Add("X-YT-Header-Format", "<format=text>yson");
  60. headers->Add("Content-Encoding", "identity");
  61. headers->Add("Accept-Encoding", "identity");
  62. TNode node;
  63. node["transaction_id"] = GetGuidAsString(tx.GetId());
  64. auto strParams = NodeToYsonString(node);
  65. YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; Method POST %v; X-YT-Parameters (sent in body): %v)",
  66. requestId,
  67. tx.GetContext().ServerName,
  68. url,
  69. strParams
  70. );
  71. auto response = NConcurrency::WaitFor(httpClient->Post(url, TSharedRef::FromString(strParams), headers)).ValueOrThrow();
  72. CheckError(requestId, response);
  73. YT_LOG_DEBUG("RSP %v - received response %v bytes. (%v)",
  74. requestId,
  75. response->ReadAll().size(),
  76. strParams);
  77. }
  78. } // namespace
  79. ////////////////////////////////////////////////////////////////////////////////
  80. class TSharedTransactionPinger
  81. : public ITransactionPinger
  82. {
  83. public:
  84. TSharedTransactionPinger(NHttp::IClientPtr httpClient, int poolThreadCount)
  85. : PingerPool_(NConcurrency::CreateThreadPool(
  86. poolThreadCount, "tx_pinger_pool"))
  87. , HttpClient_(std::move(httpClient))
  88. { }
  89. ~TSharedTransactionPinger() override
  90. {
  91. PingerPool_->Shutdown();
  92. }
  93. ITransactionPingerPtr GetChildTxPinger() override
  94. {
  95. return this;
  96. }
  97. void RegisterTransaction(const TPingableTransaction& pingableTx) override
  98. {
  99. auto [minPingInterval, maxPingInterval] = pingableTx.GetPingInterval();
  100. auto pingInterval = (minPingInterval + maxPingInterval) / 2;
  101. double jitter = (maxPingInterval - pingInterval) / pingInterval;
  102. auto opts = NConcurrency::TPeriodicExecutorOptions{pingInterval, pingInterval, jitter};
  103. auto periodic = std::make_shared<NConcurrency::TPeriodicExecutorPtr>(nullptr);
  104. // Have to use weak_ptr in order to break reference cycle
  105. // This weak_ptr holds pointer to periodic, which will contain this lambda
  106. // Also we consider that lifetime of this lambda is no longer than lifetime of pingableTx
  107. // because every pingableTx have to call RemoveTransaction before it is destroyed
  108. auto pingRoutine = BIND([this, &pingableTx, periodic = std::weak_ptr{periodic}] {
  109. auto strong_ptr = periodic.lock();
  110. YT_VERIFY(strong_ptr);
  111. DoPingTransaction(pingableTx, *strong_ptr);
  112. });
  113. *periodic = New<NConcurrency::TPeriodicExecutor>(PingerPool_->GetInvoker(), pingRoutine, opts);
  114. (*periodic)->Start();
  115. auto guard = Guard(SpinLock_);
  116. YT_VERIFY(!Transactions_.contains(pingableTx.GetId()));
  117. Transactions_[pingableTx.GetId()] = std::move(periodic);
  118. }
  119. bool HasTransaction(const TPingableTransaction& pingableTx) override
  120. {
  121. auto guard = Guard(SpinLock_);
  122. return Transactions_.contains(pingableTx.GetId());
  123. }
  124. void RemoveTransaction(const TPingableTransaction& pingableTx) override
  125. {
  126. std::shared_ptr<NConcurrency::TPeriodicExecutorPtr> periodic;
  127. {
  128. auto guard = Guard(SpinLock_);
  129. auto it = Transactions_.find(pingableTx.GetId());
  130. YT_VERIFY(it != Transactions_.end());
  131. periodic = std::move(it->second);
  132. Transactions_.erase(it);
  133. }
  134. NConcurrency::WaitUntilSet((*periodic)->Stop());
  135. }
  136. private:
  137. void DoPingTransaction(const TPingableTransaction& pingableTx,
  138. NConcurrency::TPeriodicExecutorPtr periodic)
  139. {
  140. try {
  141. PingTx(HttpClient_, pingableTx);
  142. } catch (const std::exception& e) {
  143. if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) {
  144. if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
  145. YT_UNUSED_FUTURE(periodic->Stop());
  146. } else if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
  147. periodic->ScheduleOutOfBand();
  148. }
  149. }
  150. }
  151. }
  152. private:
  153. YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
  154. THashMap<TTransactionId, std::shared_ptr<NConcurrency::TPeriodicExecutorPtr>> Transactions_;
  155. NConcurrency::IThreadPoolPtr PingerPool_;
  156. NHttp::IClientPtr HttpClient_;
  157. };
  158. #endif // defined(__x86_64__) || defined(__arm64__)
  159. ////////////////////////////////////////////////////////////////////////////////
  160. class TThreadPerTransactionPinger
  161. : public ITransactionPinger
  162. {
  163. public:
  164. ~TThreadPerTransactionPinger() override
  165. {
  166. if (Running_) {
  167. RemoveTransaction(*PingableTx_);
  168. }
  169. }
  170. ITransactionPingerPtr GetChildTxPinger() override
  171. {
  172. return MakeIntrusive<TThreadPerTransactionPinger>();
  173. }
  174. void RegisterTransaction(const TPingableTransaction& pingableTx) override
  175. {
  176. YT_VERIFY(!Running_);
  177. YT_VERIFY(PingableTx_ == nullptr);
  178. PingableTx_ = &pingableTx;
  179. Running_ = true;
  180. PingerThread_ = MakeHolder<TThread>(
  181. TThread::TParams{Pinger, this}.SetName("pingable_tx"));
  182. PingerThread_->Start();
  183. }
  184. bool HasTransaction(const TPingableTransaction& pingableTx) override
  185. {
  186. return PingableTx_ == &pingableTx && Running_;
  187. }
  188. void RemoveTransaction(const TPingableTransaction& pingableTx) override
  189. {
  190. YT_VERIFY(HasTransaction(pingableTx));
  191. Running_ = false;
  192. if (PingerThread_) {
  193. PingerThread_->Join();
  194. }
  195. }
  196. private:
  197. static void* Pinger(void* opaque)
  198. {
  199. static_cast<TThreadPerTransactionPinger*>(opaque)->Pinger();
  200. return nullptr;
  201. }
  202. void Pinger()
  203. {
  204. auto [minPingInterval, maxPingInterval] = PingableTx_->GetPingInterval();
  205. while (Running_) {
  206. TDuration waitTime = minPingInterval + (maxPingInterval - minPingInterval) * RandomNumber<float>();
  207. try {
  208. auto noRetryPolicy = MakeIntrusive<TAttemptLimitedRetryPolicy>(1u, PingableTx_->GetContext().Config);
  209. NDetail::NRawClient::PingTx(noRetryPolicy, PingableTx_->GetContext(), PingableTx_->GetId());
  210. } catch (const std::exception& e) {
  211. if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) {
  212. if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
  213. break;
  214. } else if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
  215. waitTime = TDuration::MilliSeconds(0);
  216. }
  217. }
  218. // Else do nothing, going to retry this error.
  219. }
  220. TInstant t = Now();
  221. while (Running_ && Now() - t < waitTime) {
  222. NDetail::TWaitProxy::Get()->Sleep(TDuration::MilliSeconds(100));
  223. }
  224. }
  225. }
  226. private:
  227. const TPingableTransaction* PingableTx_ = nullptr;
  228. std::atomic<bool> Running_ = false;
  229. THolder<TThread> PingerThread_;
  230. };
  231. ////////////////////////////////////////////////////////////////////////////////
  232. ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config)
  233. {
  234. if (config->UseAsyncTxPinger) {
  235. // TODO(aleexfi): Remove it after YT-17689
  236. #if defined(__x86_64__) || defined(__arm64__)
  237. YT_LOG_DEBUG("Using async transaction pinger");
  238. auto httpClientConfig = NYT::New<NHttp::TClientConfig>();
  239. httpClientConfig->MaxIdleConnections = 16;
  240. auto httpPoller = NConcurrency::CreateThreadPoolPoller(
  241. config->AsyncHttpClientThreads,
  242. "tx_http_client_poller");
  243. auto httpClient = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller));
  244. return MakeIntrusive<TSharedTransactionPinger>(
  245. std::move(httpClient),
  246. config->AsyncTxPingerPoolThreads);
  247. #else
  248. YT_LOG_WARNING("Async transaction pinger is not supported on your platform. Fallback to TThreadPerTransactionPinger...");
  249. #endif // defined(__x86_64__) || defined(__arm64__)
  250. }
  251. return MakeIntrusive<TThreadPerTransactionPinger>();
  252. }
  253. ////////////////////////////////////////////////////////////////////////////////
  254. } // namespace NYT