transaction_pinger.cpp 10 KB


  1. #include "transaction_pinger.h"
  2. #include "transaction.h"
  3. #include <yt/cpp/mapreduce/interface/config.h>
  4. #include <yt/cpp/mapreduce/interface/error_codes.h>
  5. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  6. #include <yt/cpp/mapreduce/interface/tvm.h>
  7. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  8. #include <yt/cpp/mapreduce/common/retry_lib.h>
  9. #include <yt/cpp/mapreduce/http/requests.h>
  10. #include <yt/cpp/mapreduce/http/retry_request.h>
  11. #include <yt/yt/core/concurrency/periodic_executor.h>
  12. #include <yt/yt/core/concurrency/poller.h>
  13. #include <yt/yt/core/concurrency/scheduler_api.h>
  14. #include <yt/yt/core/concurrency/thread_pool_poller.h>
  15. #include <yt/yt/core/concurrency/thread_pool.h>
  16. #include <yt/yt/core/http/client.h>
  17. #include <yt/yt/core/http/http.h>
  18. #include <library/cpp/yson/node/node_io.h>
  19. #include <library/cpp/yt/threading/spin_lock.h>
  20. #include <library/cpp/yt/assert/assert.h>
  21. #include <util/datetime/base.h>
  22. #include <util/random/random.h>
  23. namespace NYT {
  24. ////////////////////////////////////////////////////////////////////////////////
  25. namespace {
  26. ////////////////////////////////////////////////////////////////////////////////
  27. void CheckError(const TString& requestId, NHttp::IResponsePtr response)
  28. {
  29. if (const auto* ytError = response->GetHeaders()->Find("X-YT-Error")) {
  30. TYtError error;
  31. error.ParseFrom(*ytError);
  32. TErrorResponse errorResponse(std::move(error), requestId);
  33. if (errorResponse.IsOk()) {
  34. return;
  35. }
  36. YT_LOG_ERROR("RSP %v - HTTP %v - %v",
  37. requestId,
  38. response->GetStatusCode(),
  39. errorResponse.AsStrBuf());
  40. ythrow errorResponse;
  41. }
  42. }
  43. void PingTx(NHttp::IClientPtr httpClient, const TPingableTransaction& tx)
  44. {
  45. auto url = TString::Join("http://", tx.GetContext().ServerName, "/api/", tx.GetContext().Config->ApiVersion, "/ping_tx");
  46. auto headers = New<NHttp::THeaders>();
  47. auto requestId = CreateGuidAsString();
  48. headers->Add("Host", url);
  49. headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
  50. if (const auto& serviceTicketAuth = tx.GetContext().ServiceTicketAuth) {
  51. const auto serviceTicket = serviceTicketAuth->Ptr->IssueServiceTicket();
  52. headers->Add("X-Ya-Service-Ticket", serviceTicket);
  53. } else if (const auto& token = tx.GetContext().Token; !token.empty()) {
  54. headers->Add("Authorization", "OAuth " + token);
  55. }
  56. headers->Add("Transfer-Encoding", "chunked");
  57. headers->Add("X-YT-Correlation-Id", requestId);
  58. headers->Add("X-YT-Header-Format", "<format=text>yson");
  59. headers->Add("Content-Encoding", "identity");
  60. headers->Add("Accept-Encoding", "identity");
  61. TNode node;
  62. node["transaction_id"] = GetGuidAsString(tx.GetId());
  63. auto strParams = NodeToYsonString(node);
  64. YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; Method POST %v; X-YT-Parameters (sent in body): %v)",
  65. requestId,
  66. tx.GetContext().ServerName,
  67. url,
  68. strParams
  69. );
  70. auto response = NConcurrency::WaitFor(httpClient->Post(url, TSharedRef::FromString(strParams), headers)).ValueOrThrow();
  71. CheckError(requestId, response);
  72. YT_LOG_DEBUG("RSP %v - received response %v bytes. (%v)",
  73. requestId,
  74. response->ReadAll().size(),
  75. strParams);
  76. }
  77. } // namespace
  78. ////////////////////////////////////////////////////////////////////////////////
  79. class TSharedTransactionPinger
  80. : public ITransactionPinger
  81. {
  82. public:
  83. TSharedTransactionPinger(NHttp::IClientPtr httpClient, int poolThreadCount)
  84. : PingerPool_(NConcurrency::CreateThreadPool(
  85. poolThreadCount, "tx_pinger_pool"))
  86. , HttpClient_(std::move(httpClient))
  87. { }
  88. ~TSharedTransactionPinger() override
  89. {
  90. PingerPool_->Shutdown();
  91. }
  92. ITransactionPingerPtr GetChildTxPinger() override
  93. {
  94. return this;
  95. }
  96. void RegisterTransaction(const TPingableTransaction& pingableTx) override
  97. {
  98. auto [minPingInterval, maxPingInterval] = pingableTx.GetPingInterval();
  99. auto pingInterval = (minPingInterval + maxPingInterval) / 2;
  100. double jitter = (maxPingInterval - pingInterval) / pingInterval;
  101. auto opts = NConcurrency::TPeriodicExecutorOptions{pingInterval, pingInterval, jitter};
  102. auto periodic = std::make_shared<NConcurrency::TPeriodicExecutorPtr>(nullptr);
  103. // Have to use weak_ptr in order to break reference cycle
  104. // This weak_ptr holds pointer to periodic, which will contain this lambda
  105. // Also we consider that lifetime of this lambda is no longer than lifetime of pingableTx
  106. // because every pingableTx have to call RemoveTransaction before it is destroyed
  107. auto pingRoutine = BIND([this, &pingableTx, periodic = std::weak_ptr{periodic}] {
  108. auto strong_ptr = periodic.lock();
  109. YT_VERIFY(strong_ptr);
  110. DoPingTransaction(pingableTx, *strong_ptr);
  111. });
  112. *periodic = New<NConcurrency::TPeriodicExecutor>(PingerPool_->GetInvoker(), pingRoutine, opts);
  113. (*periodic)->Start();
  114. auto guard = Guard(SpinLock_);
  115. YT_VERIFY(!Transactions_.contains(pingableTx.GetId()));
  116. Transactions_[pingableTx.GetId()] = std::move(periodic);
  117. }
  118. bool HasTransaction(const TPingableTransaction& pingableTx) override
  119. {
  120. auto guard = Guard(SpinLock_);
  121. return Transactions_.contains(pingableTx.GetId());
  122. }
  123. void RemoveTransaction(const TPingableTransaction& pingableTx) override
  124. {
  125. std::shared_ptr<NConcurrency::TPeriodicExecutorPtr> periodic;
  126. {
  127. auto guard = Guard(SpinLock_);
  128. auto it = Transactions_.find(pingableTx.GetId());
  129. YT_VERIFY(it != Transactions_.end());
  130. periodic = std::move(it->second);
  131. Transactions_.erase(it);
  132. }
  133. NConcurrency::WaitUntilSet((*periodic)->Stop());
  134. }
  135. private:
  136. void DoPingTransaction(const TPingableTransaction& pingableTx,
  137. NConcurrency::TPeriodicExecutorPtr periodic)
  138. {
  139. try {
  140. PingTx(HttpClient_, pingableTx);
  141. } catch (const std::exception& e) {
  142. if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) {
  143. if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
  144. YT_UNUSED_FUTURE(periodic->Stop());
  145. } else if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
  146. periodic->ScheduleOutOfBand();
  147. }
  148. }
  149. }
  150. }
  151. private:
  152. YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
  153. THashMap<TTransactionId, std::shared_ptr<NConcurrency::TPeriodicExecutorPtr>> Transactions_;
  154. NConcurrency::IThreadPoolPtr PingerPool_;
  155. NHttp::IClientPtr HttpClient_;
  156. };
  157. ////////////////////////////////////////////////////////////////////////////////
  158. class TThreadPerTransactionPinger
  159. : public ITransactionPinger
  160. {
  161. public:
  162. ~TThreadPerTransactionPinger() override
  163. {
  164. if (Running_) {
  165. RemoveTransaction(*PingableTx_);
  166. }
  167. }
  168. ITransactionPingerPtr GetChildTxPinger() override
  169. {
  170. return MakeIntrusive<TThreadPerTransactionPinger>();
  171. }
  172. void RegisterTransaction(const TPingableTransaction& pingableTx) override
  173. {
  174. YT_VERIFY(!Running_);
  175. YT_VERIFY(PingableTx_ == nullptr);
  176. PingableTx_ = &pingableTx;
  177. Running_ = true;
  178. PingerThread_ = std::make_unique<TThread>(
  179. TThread::TParams{Pinger, this}.SetName("pingable_tx"));
  180. PingerThread_->Start();
  181. }
  182. bool HasTransaction(const TPingableTransaction& pingableTx) override
  183. {
  184. return PingableTx_ == &pingableTx && Running_;
  185. }
  186. void RemoveTransaction(const TPingableTransaction& pingableTx) override
  187. {
  188. YT_VERIFY(HasTransaction(pingableTx));
  189. Running_ = false;
  190. if (PingerThread_) {
  191. PingerThread_->Join();
  192. }
  193. }
  194. private:
  195. static void* Pinger(void* opaque)
  196. {
  197. static_cast<TThreadPerTransactionPinger*>(opaque)->Pinger();
  198. return nullptr;
  199. }
  200. void Pinger()
  201. {
  202. auto [minPingInterval, maxPingInterval] = PingableTx_->GetPingInterval();
  203. while (Running_) {
  204. TDuration waitTime = minPingInterval + (maxPingInterval - minPingInterval) * RandomNumber<float>();
  205. try {
  206. PingableTx_->Ping();
  207. } catch (const std::exception& e) {
  208. if (auto* errorResponse = dynamic_cast<const TErrorResponse*>(&e)) {
  209. if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NTransactionClient::NoSuchTransaction)) {
  210. break;
  211. } else if (errorResponse->GetError().ContainsErrorCode(NYT::NClusterErrorCodes::Timeout)) {
  212. waitTime = TDuration::MilliSeconds(0);
  213. }
  214. }
  215. // Else do nothing, going to retry this error.
  216. }
  217. TInstant t = Now();
  218. while (Running_ && Now() - t < waitTime) {
  219. NDetail::TWaitProxy::Get()->Sleep(TDuration::MilliSeconds(100));
  220. }
  221. }
  222. }
  223. private:
  224. const TPingableTransaction* PingableTx_ = nullptr;
  225. std::atomic<bool> Running_ = false;
  226. std::unique_ptr<TThread> PingerThread_;
  227. };
  228. ////////////////////////////////////////////////////////////////////////////////
  229. ITransactionPingerPtr CreateTransactionPinger(const TConfigPtr& config)
  230. {
  231. YT_LOG_DEBUG("Using async transaction pinger");
  232. auto httpClientConfig = NYT::New<NHttp::TClientConfig>();
  233. httpClientConfig->MaxIdleConnections = 16;
  234. auto httpPoller = NConcurrency::CreateThreadPoolPoller(
  235. config->AsyncHttpClientThreads,
  236. "tx_http_client_poller");
  237. auto httpClient = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller));
  238. return MakeIntrusive<TSharedTransactionPinger>(
  239. std::move(httpClient),
  240. config->AsyncTxPingerPoolThreads);
  241. }
  242. ////////////////////////////////////////////////////////////////////////////////
  243. } // namespace NYT