transaction_pinger.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. #include "transaction_pinger.h"
  2. #include "transaction.h"
  3. #include <yt/cpp/mapreduce/interface/config.h>
  4. #include <yt/cpp/mapreduce/interface/error_codes.h>
  5. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  6. #include <yt/cpp/mapreduce/interface/tvm.h>
  7. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  8. #include <yt/cpp/mapreduce/common/retry_lib.h>
  9. #include <yt/cpp/mapreduce/http/requests.h>
  10. #include <yt/cpp/mapreduce/http/retry_request.h>
  11. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  12. #include <yt/yt/core/concurrency/periodic_executor.h>
  13. #include <yt/yt/core/concurrency/poller.h>
  14. #include <yt/yt/core/concurrency/scheduler_api.h>
  15. #include <yt/yt/core/concurrency/thread_pool_poller.h>
  16. #include <yt/yt/core/concurrency/thread_pool.h>
  17. #include <yt/yt/core/http/client.h>
  18. #include <yt/yt/core/http/http.h>
  19. #include <library/cpp/yson/node/node_io.h>
  20. #include <library/cpp/yt/threading/spin_lock.h>
  21. #include <library/cpp/yt/assert/assert.h>
  22. #include <util/datetime/base.h>
  23. #include <util/random/random.h>
  24. namespace NYT {
  25. ////////////////////////////////////////////////////////////////////////////////
  26. namespace {
  27. ////////////////////////////////////////////////////////////////////////////////
  28. void CheckError(const TString& requestId, NHttp::IResponsePtr response)
  29. {
  30. TErrorResponse errorResponse(static_cast<int>(response->GetStatusCode()), requestId);
  31. if (const auto* ytError = response->GetHeaders()->Find("X-YT-Error")) {
  32. errorResponse.ParseFromJsonError(*ytError);
  33. }
  34. if (errorResponse.IsOk()) {
  35. return;
  36. }
  37. YT_LOG_ERROR("RSP %v - HTTP %v - %v",
  38. requestId,
  39. response->GetStatusCode(),
  40. errorResponse.AsStrBuf());
  41. ythrow errorResponse;
  42. ////////////////////////////////////////////////////////////////////////////////
  43. } // namespace
  44. void PingTx(NHttp::IClientPtr httpClient, const TPingableTransaction& tx)
  45. {
  46. auto url = TString::Join("http://", tx.GetContext().ServerName, "/api/", tx.GetContext().Config->ApiVersion, "/ping_tx");
  47. auto headers = New<NHttp::THeaders>();
  48. auto requestId = CreateGuidAsString();
  49. headers->Add("Host", url);
  50. headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
  51. if (const auto& serviceTicketAuth = tx.GetContext().ServiceTicketAuth) {
  52. const auto serviceTicket = serviceTicketAuth->Ptr->IssueServiceTicket();
  53. headers->Add("X-Ya-Service-Ticket", serviceTicket);
  54. } else if (const auto& token = tx.GetContext().Token; !token.empty()) {
  55. headers->Add("Authorization", "OAuth " + token);
  56. }
  57. headers->Add("Transfer-Encoding", "chunked");
  58. headers->Add("X-YT-Correlation-Id", requestId);
  59. headers->Add("X-YT-Header-Format", "<format=text>yson");
  60. headers->Add("Content-Encoding", "identity");
  61. headers->Add("Accept-Encoding", "identity");
  62. TNode node;
  63. node["transaction_id"] = GetGuidAsString(tx.GetId());
  64. auto strParams = NodeToYsonString(node);
  65. YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; Method POST %v; X-YT-Parameters (sent in body): %v)",
  66. requestId,
  67. tx.GetContext().ServerName,
  68. url,
  69. strParams
  70. );
  71. auto response = NConcurrency::WaitFor(httpClient->Post(url, TSharedRef::FromString(strParams), headers)).ValueOrThrow();
  72. CheckError(requestId, response);
  73. YT_LOG_DEBUG("RSP %v - received response %v bytes. (%v)",
  74. requestId,
  75. response->ReadAll().size(),
  76. strParams);
  77. }
  78. } // namespace
  79. ////////////////////////////////////////////////////////////////////////////////
  80. class TSharedTransactionPinger
  81. : public ITransactionPinger
  82. {
  83. public:
  84. TSharedTransactionPinger(NHttp::IClientPtr httpClient, int poolThreadCount)
  85. : PingerPool_(NConcurrency::CreateThreadPool(
  86. poolThreadCount, "tx_pinger_pool"))
  87. , HttpClient_(std::move(httpClient))
  88. { }
  89. ~TSharedTransactionPinger() override
  90. {
  91. PingerPool_->Shutdown();
  92. }
  93. ITransactionPingerPtr GetChildTxPinger() override
  94. {
  95. return this;
  96. }
  97. void RegisterTransaction(const TPingableTransaction& pingableTx) override
  98. {
  99. auto [minPingInterval, maxPingInterval] = pingableTx.GetPingInterval();
  100. auto pingInterval = (minPingInterval + maxPingInterval) / 2;
  101. double jitter = (maxPingInterval - pingInterval) / pingInterval;
  102. auto opts = NConcurrency::TPeriodicExecutorOptions{pingInterval, pingInterval, jitter};
  103. auto periodic = std::make_shared<NConcurrency::TPeriodicExecutorPtr>(nullptr);
  104. // Have to use weak_ptr in order to break reference cycle
  105. // This weak_ptr holds pointer to periodic, which will contain this lambda
  106. // Also we consider that lifetime of this lambda is no longer than lifetime of pingableTx
  107. // because every pingableTx have to call RemoveTransaction before it is destroyed
  108. auto pingRoutine = BIND([this, &pingableTx, periodic = std::weak_ptr{periodic}] {
  109. auto strong_ptr = periodic.lock();
  110. YT_VERIFY(strong_ptr);
  111. DoPingTransaction(pingableTx, *strong_ptr);
  112. });
  113. *periodic = New<NConcurrency::TPeriodicExecutor>(PingerPool_->GetInvoker(), pingRoutine, opts);
  114. (*periodic)->Start();
  115. auto guard = Guard(SpinLock_);
  116. YT_VERIFY(!Transactions_.contains(pingableTx.GetId()));
  117. Transactions_[pingableTx.GetId()] = std::move(periodic);
  118. }
  119. bool HasTransaction(const TPingableTransaction& pingableTx) override
  120. {
  121. auto guard = Guard(SpinLock_);
  122. return Transactions_.contains(pingableTx.GetId());
  123. }
  124. void RemoveTransaction(const TPingableTransaction& pingableTx) override
  125. {
  126. std::shared_ptr<NConcurrency::TPeriodicExecutorPtr> periodic;
  127. {
  128. auto guard = Guard(SpinLock_);
  129. auto it = Transactions_.find(pingableTx.GetId());
  130. YT_VERIFY(it != Transactions_.end());
  131. periodic = std::move(it->second);
  132. Transactions_.erase(it);
  133. }
  134. NConcurrency::WaitUntilSet((*periodic)->Stop());
  135. }
  136. private:
  137. void DoPingTransaction(const TPingableTransaction& pingableTx,
  138. NConcurrency::TPeriodicExecutorPtr periodic)
  139. {
  140. try {
  141. PingTx(HttpClient_, pingableTx);
  142. } catch (const std::exception& e) {
  143. if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) {
  144. if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
  145. YT_UNUSED_FUTURE(periodic->Stop());
  146. } else if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
  147. periodic->ScheduleOutOfBand();
  148. }
  149. }
  150. }
  151. }
  152. private:
  153. YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
  154. THashMap<TTransactionId, std::shared_ptr<NConcurrency::TPeriodicExecutorPtr>> Transactions_;
  155. NConcurrency::IThreadPoolPtr PingerPool_;
  156. NHttp::IClientPtr HttpClient_;
  157. };
  158. ////////////////////////////////////////////////////////////////////////////////
  159. class TThreadPerTransactionPinger
  160. : public ITransactionPinger
  161. {
  162. public:
  163. ~TThreadPerTransactionPinger() override
  164. {
  165. if (Running_) {
  166. RemoveTransaction(*PingableTx_);
  167. }
  168. }
  169. ITransactionPingerPtr GetChildTxPinger() override
  170. {
  171. return MakeIntrusive<TThreadPerTransactionPinger>();
  172. }
  173. void RegisterTransaction(const TPingableTransaction& pingableTx) override
  174. {
  175. YT_VERIFY(!Running_);
  176. YT_VERIFY(PingableTx_ == nullptr);
  177. PingableTx_ = &pingableTx;
  178. Running_ = true;
  179. PingerThread_ = MakeHolder<TThread>(
  180. TThread::TParams{Pinger, this}.SetName("pingable_tx"));
  181. PingerThread_->Start();
  182. }
  183. bool HasTransaction(const TPingableTransaction& pingableTx) override
  184. {
  185. return PingableTx_ == &pingableTx && Running_;
  186. }
  187. void RemoveTransaction(const TPingableTransaction& pingableTx) override
  188. {
  189. YT_VERIFY(HasTransaction(pingableTx));
  190. Running_ = false;
  191. if (PingerThread_) {
  192. PingerThread_->Join();
  193. }
  194. }
  195. private:
  196. static void* Pinger(void* opaque)
  197. {
  198. static_cast<TThreadPerTransactionPinger*>(opaque)->Pinger();
  199. return nullptr;
  200. }
  201. void Pinger()
  202. {
  203. auto [minPingInterval, maxPingInterval] = PingableTx_->GetPingInterval();
  204. while (Running_) {
  205. TDuration waitTime = minPingInterval + (maxPingInterval - minPingInterval) * RandomNumber<float>();
  206. try {
  207. auto noRetryPolicy = MakeIntrusive<TAttemptLimitedRetryPolicy>(1u, PingableTx_->GetContext().Config);
  208. NDetail::NRawClient::PingTx(noRetryPolicy, PingableTx_->GetContext(), PingableTx_->GetId());
  209. } catch (const std::exception& e) {
  210. if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) {
  211. if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
  212. break;
  213. } else if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
  214. waitTime = TDuration::MilliSeconds(0);
  215. }
  216. }
  217. // Else do nothing, going to retry this error.
  218. }
  219. TInstant t = Now();
  220. while (Running_ && Now() - t < waitTime) {
  221. NDetail::TWaitProxy::Get()->Sleep(TDuration::MilliSeconds(100));
  222. }
  223. }
  224. }
  225. private:
  226. const TPingableTransaction* PingableTx_ = nullptr;
  227. std::atomic<bool> Running_ = false;
  228. THolder<TThread> PingerThread_;
  229. };
  230. ////////////////////////////////////////////////////////////////////////////////
  231. ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config)
  232. {
  233. YT_LOG_DEBUG("Using async transaction pinger");
  234. auto httpClientConfig = NYT::New<NHttp::TClientConfig>();
  235. httpClientConfig->MaxIdleConnections = 16;
  236. auto httpPoller = NConcurrency::CreateThreadPoolPoller(
  237. config->AsyncHttpClientThreads,
  238. "tx_http_client_poller");
  239. auto httpClient = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller));
  240. return MakeIntrusive<TSharedTransactionPinger>(
  241. std::move(httpClient),
  242. config->AsyncTxPingerPoolThreads);
  243. }
  244. ////////////////////////////////////////////////////////////////////////////////
  245. } // namespace NYT