http_client.cpp 18 KB


  1. #include "http_client.h"
  2. #include "abortable_http_response.h"
  3. #include "core.h"
  4. #include "helpers.h"
  5. #include "http.h"
  6. #include <yt/cpp/mapreduce/interface/config.h>
  7. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  8. #include <yt/yt/core/concurrency/thread_pool_poller.h>
  9. #include <yt/yt/core/http/client.h>
  10. #include <yt/yt/core/http/config.h>
  11. #include <yt/yt/core/http/http.h>
  12. #include <yt/yt/core/https/client.h>
  13. #include <yt/yt/core/https/config.h>
  14. #include <library/cpp/yson/node/node_io.h>
  15. namespace NYT::NHttpClient {
  16. namespace {
  17. TString CreateHost(TStringBuf host, TStringBuf port)
  18. {
  19. if (!port.empty()) {
  20. return Format("%v:%v", host, port);
  21. }
  22. return TString(host);
  23. }
  24. TMaybe<TErrorResponse> GetErrorResponse(const TString& hostName, const TString& requestId, const NHttp::IResponsePtr& response)
  25. {
  26. auto httpCode = response->GetStatusCode();
  27. if (httpCode == NHttp::EStatusCode::OK || httpCode == NHttp::EStatusCode::Accepted) {
  28. return {};
  29. }
  30. TErrorResponse errorResponse(static_cast<int>(httpCode), requestId);
  31. auto logAndSetError = [&] (const TString& rawError) {
  32. YT_LOG_ERROR("RSP %v - HTTP %v - %v",
  33. requestId,
  34. httpCode,
  35. rawError.data());
  36. errorResponse.SetRawError(rawError);
  37. };
  38. switch (httpCode) {
  39. case NHttp::EStatusCode::TooManyRequests:
  40. logAndSetError("request rate limit exceeded");
  41. break;
  42. case NHttp::EStatusCode::InternalServerError:
  43. logAndSetError("internal error in proxy " + hostName);
  44. break;
  45. default: {
  46. TStringStream httpHeaders;
  47. httpHeaders << "HTTP headers (";
  48. for (const auto& [headerName, headerValue] : response->GetHeaders()->Dump()) {
  49. httpHeaders << headerName << ": " << headerValue << "; ";
  50. }
  51. httpHeaders << ")";
  52. auto errorString = Sprintf("RSP %s - HTTP %d - %s",
  53. requestId.data(),
  54. static_cast<int>(httpCode),
  55. httpHeaders.Str().data());
  56. YT_LOG_ERROR("%v",
  57. errorString.data());
  58. if (auto errorHeader = response->GetHeaders()->Find("X-YT-Error")) {
  59. errorResponse.ParseFromJsonError(*errorHeader);
  60. if (errorResponse.IsOk()) {
  61. return Nothing();
  62. }
  63. return errorResponse;
  64. }
  65. errorResponse.SetRawError(
  66. errorString + " - X-YT-Error is missing in headers");
  67. break;
  68. }
  69. }
  70. return errorResponse;
  71. }
  72. void CheckErrorResponse(const TString& hostName, const TString& requestId, const NHttp::IResponsePtr& response)
  73. {
  74. auto errorResponse = GetErrorResponse(hostName, requestId, response);
  75. if (errorResponse) {
  76. throw *errorResponse;
  77. }
  78. }
  79. } // namespace
  80. ////////////////////////////////////////////////////////////////////////////////
  81. class TDefaultHttpResponse
  82. : public IHttpResponse
  83. {
  84. public:
  85. TDefaultHttpResponse(std::unique_ptr<THttpRequest> request)
  86. : Request_(std::move(request))
  87. { }
  88. int GetStatusCode() override
  89. {
  90. return Request_->GetHttpCode();
  91. }
  92. IInputStream* GetResponseStream() override
  93. {
  94. return Request_->GetResponseStream();
  95. }
  96. TString GetResponse() override
  97. {
  98. return Request_->GetResponse();
  99. }
  100. TString GetRequestId() const override
  101. {
  102. return Request_->GetRequestId();
  103. }
  104. private:
  105. std::unique_ptr<THttpRequest> Request_;
  106. };
  107. class TDefaultHttpRequest
  108. : public IHttpRequest
  109. {
  110. public:
  111. TDefaultHttpRequest(std::unique_ptr<THttpRequest> request, IOutputStream* stream)
  112. : Request_(std::move(request))
  113. , Stream_(stream)
  114. { }
  115. IOutputStream* GetStream() override
  116. {
  117. return Stream_;
  118. }
  119. IHttpResponsePtr Finish() override
  120. {
  121. Request_->FinishRequest();
  122. return std::make_unique<TDefaultHttpResponse>(std::move(Request_));
  123. }
  124. private:
  125. std::unique_ptr<THttpRequest> Request_;
  126. IOutputStream* Stream_;
  127. };
  128. class TDefaultHttpClient
  129. : public IHttpClient
  130. {
  131. public:
  132. IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header, TMaybe<TStringBuf> body) override
  133. {
  134. auto request = std::make_unique<THttpRequest>(requestId);
  135. auto urlRef = NHttp::ParseUrl(url);
  136. request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
  137. request->SmallRequest(header, body);
  138. return std::make_unique<TDefaultHttpResponse>(std::move(request));
  139. }
  140. IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& config, const THttpHeader& header) override
  141. {
  142. auto request = std::make_unique<THttpRequest>(requestId);
  143. auto urlRef = NHttp::ParseUrl(url);
  144. request->Connect(CreateHost(urlRef.Host, urlRef.PortStr), config.SocketTimeout);
  145. auto stream = request->StartRequest(header);
  146. return std::make_unique<TDefaultHttpRequest>(std::move(request), stream);
  147. }
  148. };
  149. ////////////////////////////////////////////////////////////////////////////////
  150. struct TCoreRequestContext
  151. {
  152. TString HostName;
  153. TString Url;
  154. TString RequestId;
  155. bool LogResponse;
  156. TInstant StartTime;
  157. TString LoggedAttributes;
  158. };
  159. class TCoreHttpResponse
  160. : public IHttpResponse
  161. {
  162. public:
  163. TCoreHttpResponse(
  164. TCoreRequestContext context,
  165. NHttp::IResponsePtr response)
  166. : Context_(std::move(context))
  167. , Response_(std::move(response))
  168. { }
  169. int GetStatusCode() override
  170. {
  171. return static_cast<int>(Response_->GetStatusCode());
  172. }
  173. IInputStream* GetResponseStream() override
  174. {
  175. if (!Stream_) {
  176. auto stream = std::make_unique<TWrappedStream>(
  177. NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor),
  178. Response_,
  179. Context_.RequestId);
  180. CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_);
  181. if (TConfig::Get()->UseAbortableResponse) {
  182. Y_ABORT_UNLESS(!Context_.Url.empty());
  183. Stream_ = std::make_unique<TAbortableCoreHttpResponse>(std::move(stream), Context_.Url);
  184. } else {
  185. Stream_ = std::move(stream);
  186. }
  187. }
  188. return Stream_.get();
  189. }
  190. TString GetResponse() override
  191. {
  192. auto result = GetResponseStream()->ReadAll();
  193. TStringStream loggedAttributes;
  194. loggedAttributes
  195. << "Time: " << TInstant::Now() - Context_.StartTime << "; "
  196. << "HostName: " << Context_.HostName << "; "
  197. << Context_.LoggedAttributes;
  198. if (Context_.LogResponse) {
  199. constexpr auto sizeLimit = 1 << 7;
  200. YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
  201. Context_.RequestId,
  202. TruncateForLogs(result, sizeLimit),
  203. loggedAttributes.Str());
  204. } else {
  205. YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
  206. Context_.RequestId,
  207. result.size(),
  208. loggedAttributes.Str());
  209. }
  210. return result;
  211. }
  212. TString GetRequestId() const override
  213. {
  214. return Context_.RequestId;
  215. }
  216. private:
  217. class TWrappedStream
  218. : public IInputStream
  219. {
  220. public:
  221. TWrappedStream(std::unique_ptr<IInputStream> underlying, NHttp::IResponsePtr response, TString requestId)
  222. : Underlying_(std::move(underlying))
  223. , Response_(std::move(response))
  224. , RequestId_(std::move(requestId))
  225. { }
  226. protected:
  227. size_t DoRead(void* buf, size_t len) override
  228. {
  229. size_t read = Underlying_->Read(buf, len);
  230. if (read == 0 && len != 0) {
  231. CheckTrailers(Response_->GetTrailers());
  232. }
  233. return read;
  234. }
  235. size_t DoSkip(size_t len) override
  236. {
  237. size_t skipped = Underlying_->Skip(len);
  238. if (skipped == 0 && len != 0) {
  239. CheckTrailers(Response_->GetTrailers());
  240. }
  241. return skipped;
  242. }
  243. private:
  244. void CheckTrailers(const NHttp::THeadersPtr& trailers)
  245. {
  246. if (auto errorResponse = ParseError(trailers)) {
  247. errorResponse->SetIsFromTrailers(true);
  248. YT_LOG_ERROR("RSP %v - %v",
  249. RequestId_,
  250. errorResponse.GetRef().what());
  251. ythrow errorResponse.GetRef();
  252. }
  253. }
  254. TMaybe<TErrorResponse> ParseError(const NHttp::THeadersPtr& headers)
  255. {
  256. if (auto errorHeader = headers->Find("X-YT-Error")) {
  257. TErrorResponse errorResponse(static_cast<int>(Response_->GetStatusCode()), RequestId_);
  258. errorResponse.ParseFromJsonError(*errorHeader);
  259. if (errorResponse.IsOk()) {
  260. return Nothing();
  261. }
  262. return errorResponse;
  263. }
  264. return Nothing();
  265. }
  266. private:
  267. std::unique_ptr<IInputStream> Underlying_;
  268. NHttp::IResponsePtr Response_;
  269. TString RequestId_;
  270. };
  271. private:
  272. TCoreRequestContext Context_;
  273. NHttp::IResponsePtr Response_;
  274. std::unique_ptr<IInputStream> Stream_;
  275. };
  276. class TCoreHttpRequest
  277. : public IHttpRequest
  278. {
  279. public:
  280. TCoreHttpRequest(TCoreRequestContext context, NHttp::IActiveRequestPtr activeRequest)
  281. : Context_(std::move(context))
  282. , ActiveRequest_(std::move(activeRequest))
  283. , Stream_(NConcurrency::CreateBufferedSyncAdapter(ActiveRequest_->GetRequestStream()))
  284. , WrappedStream_(this, Stream_.get())
  285. { }
  286. IOutputStream* GetStream() override
  287. {
  288. return &WrappedStream_;
  289. }
  290. IHttpResponsePtr Finish() override
  291. {
  292. WrappedStream_.Flush();
  293. auto response = ActiveRequest_->Finish().Get().ValueOrThrow();
  294. return std::make_unique<TCoreHttpResponse>(std::move(Context_), std::move(response));
  295. }
  296. IHttpResponsePtr FinishWithError()
  297. {
  298. auto response = ActiveRequest_->GetResponse();
  299. return std::make_unique<TCoreHttpResponse>(std::move(Context_), std::move(response));
  300. }
  301. private:
  302. class TWrappedStream
  303. : public IOutputStream
  304. {
  305. public:
  306. TWrappedStream(TCoreHttpRequest* httpRequest, IOutputStream* underlying)
  307. : HttpRequest_(httpRequest)
  308. , Underlying_(underlying)
  309. { }
  310. private:
  311. void DoWrite(const void* buf, size_t len) override
  312. {
  313. WrapWriteFunc([&] {
  314. Underlying_->Write(buf, len);
  315. });
  316. }
  317. void DoWriteV(const TPart* parts, size_t count) override
  318. {
  319. WrapWriteFunc([&] {
  320. Underlying_->Write(parts, count);
  321. });
  322. }
  323. void DoWriteC(char ch) override
  324. {
  325. WrapWriteFunc([&] {
  326. Underlying_->Write(ch);
  327. });
  328. }
  329. void DoFlush() override
  330. {
  331. WrapWriteFunc([&] {
  332. Underlying_->Flush();
  333. });
  334. }
  335. void DoFinish() override
  336. {
  337. WrapWriteFunc([&] {
  338. Underlying_->Finish();
  339. });
  340. }
  341. void WrapWriteFunc(std::function<void()> func)
  342. {
  343. CheckErrorState();
  344. try {
  345. func();
  346. } catch (const std::exception&) {
  347. HandleWriteException();
  348. }
  349. }
  350. // In many cases http proxy stops reading request and resets connection
  351. // if error has happend. This function tries to read error response
  352. // in such cases.
  353. void HandleWriteException() {
  354. Y_ABORT_UNLESS(WriteError_ == nullptr);
  355. WriteError_ = std::current_exception();
  356. Y_ABORT_UNLESS(WriteError_ != nullptr);
  357. try {
  358. HttpRequest_->FinishWithError()->GetResponseStream();
  359. } catch (const TErrorResponse &) {
  360. throw;
  361. } catch (...) {
  362. }
  363. std::rethrow_exception(WriteError_);
  364. }
  365. void CheckErrorState()
  366. {
  367. if (WriteError_) {
  368. std::rethrow_exception(WriteError_);
  369. }
  370. }
  371. private:
  372. TCoreHttpRequest* const HttpRequest_;
  373. IOutputStream* Underlying_;
  374. std::exception_ptr WriteError_;
  375. };
  376. private:
  377. TCoreRequestContext Context_;
  378. NHttp::IActiveRequestPtr ActiveRequest_;
  379. std::unique_ptr<IOutputStream> Stream_;
  380. TWrappedStream WrappedStream_;
  381. };
  382. class TCoreHttpClient
  383. : public IHttpClient
  384. {
  385. public:
  386. TCoreHttpClient(bool useTLS, const TConfigPtr& config)
  387. : Poller_(NConcurrency::CreateThreadPoolPoller(1, "http_poller")) // TODO(nadya73): YT-18363: move threads count to config
  388. {
  389. if (useTLS) {
  390. auto httpsConfig = NYT::New<NYT::NHttps::TClientConfig>();
  391. httpsConfig->MaxIdleConnections = config->ConnectionPoolSize;
  392. Client_ = NHttps::CreateClient(httpsConfig, Poller_);
  393. } else {
  394. auto httpConfig = NYT::New<NYT::NHttp::TClientConfig>();
  395. httpConfig->MaxIdleConnections = config->ConnectionPoolSize;
  396. Client_ = NHttp::CreateClient(httpConfig, Poller_);
  397. }
  398. }
  399. IHttpResponsePtr Request(const TString& url, const TString& requestId, const THttpConfig& /*config*/, const THttpHeader& header, TMaybe<TStringBuf> body) override
  400. {
  401. TCoreRequestContext context = CreateContext(url, requestId, header);
  402. // TODO(nadya73): YT-18363: pass socket timeouts from THttpConfig
  403. NHttp::IResponsePtr response;
  404. auto logRequest = [&](bool includeParameters) {
  405. LogRequest(header, url, includeParameters, requestId, context.HostName);
  406. context.LoggedAttributes = GetLoggedAttributes(header, url, includeParameters, 128);
  407. };
  408. if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) {
  409. const auto& parameters = header.GetParameters();
  410. auto parametersStr = NodeToYsonString(parameters);
  411. bool includeParameters = false;
  412. auto headers = header.GetHeader(context.HostName, requestId, includeParameters).Get();
  413. logRequest(includeParameters);
  414. auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers);
  415. activeRequest->GetRequestStream()->Write(TSharedRef::FromString(parametersStr)).Get().ThrowOnError();
  416. response = activeRequest->Finish().Get().ValueOrThrow();
  417. } else {
  418. auto bodyRef = TSharedRef::FromString(TString(body ? *body : ""));
  419. bool includeParameters = true;
  420. auto headers = header.GetHeader(context.HostName, requestId, includeParameters).Get();
  421. logRequest(includeParameters);
  422. if (header.GetMethod() == "GET") {
  423. response = RequestImpl(header.GetMethod(), url, headers, bodyRef);
  424. } else {
  425. auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers);
  426. auto request = std::make_unique<TCoreHttpRequest>(std::move(context), std::move(activeRequest));
  427. if (body) {
  428. request->GetStream()->Write(*body);
  429. }
  430. return request->Finish();
  431. }
  432. }
  433. return std::make_unique<TCoreHttpResponse>(std::move(context), std::move(response));
  434. }
  435. IHttpRequestPtr StartRequest(const TString& url, const TString& requestId, const THttpConfig& /*config*/, const THttpHeader& header) override
  436. {
  437. TCoreRequestContext context = CreateContext(url, requestId, header);
  438. LogRequest(header, url, true, requestId, context.HostName);
  439. context.LoggedAttributes = GetLoggedAttributes(header, url, true, 128);
  440. auto headers = header.GetHeader(context.HostName, requestId, true).Get();
  441. auto activeRequest = StartRequestImpl(header.GetMethod(), url, headers);
  442. return std::make_unique<TCoreHttpRequest>(std::move(context), std::move(activeRequest));
  443. }
  444. private:
  445. TCoreRequestContext CreateContext(const TString& url, const TString& requestId, const THttpHeader& header)
  446. {
  447. TCoreRequestContext context;
  448. context.Url = url;
  449. context.RequestId = requestId;
  450. auto urlRef = NHttp::ParseUrl(url);
  451. context.HostName = CreateHost(urlRef.Host, urlRef.PortStr);
  452. context.LogResponse = false;
  453. auto outputFormat = header.GetOutputFormat();
  454. if (outputFormat && outputFormat->IsTextYson()) {
  455. context.LogResponse = true;
  456. }
  457. context.StartTime = TInstant::Now();
  458. return context;
  459. }
  460. NHttp::IResponsePtr RequestImpl(const TString& method, const TString& url, const NHttp::THeadersPtr& headers, const TSharedRef& body)
  461. {
  462. if (method == "GET") {
  463. return Client_->Get(url, headers).Get().ValueOrThrow();
  464. } else if (method == "POST") {
  465. return Client_->Post(url, body, headers).Get().ValueOrThrow();
  466. } else if (method == "PUT") {
  467. return Client_->Put(url, body, headers).Get().ValueOrThrow();
  468. } else {
  469. YT_LOG_FATAL("Unsupported http method (Method: %v, Url: %v)",
  470. method,
  471. url);
  472. }
  473. }
  474. NHttp::IActiveRequestPtr StartRequestImpl(const TString& method, const TString& url, const NHttp::THeadersPtr& headers)
  475. {
  476. if (method == "POST") {
  477. return Client_->StartPost(url, headers).Get().ValueOrThrow();
  478. } else if (method == "PUT") {
  479. return Client_->StartPut(url, headers).Get().ValueOrThrow();
  480. } else {
  481. YT_LOG_FATAL("Unsupported http method (Method: %v, Url: %v)",
  482. method,
  483. url);
  484. }
  485. }
  486. NConcurrency::IThreadPoolPollerPtr Poller_;
  487. NHttp::IClientPtr Client_;
  488. };
  489. ////////////////////////////////////////////////////////////////////////////////
  490. IHttpClientPtr CreateDefaultHttpClient()
  491. {
  492. return std::make_shared<TDefaultHttpClient>();
  493. }
  494. IHttpClientPtr CreateCoreHttpClient(bool useTLS, const TConfigPtr& config)
  495. {
  496. return std::make_shared<TCoreHttpClient>(useTLS, config);
  497. }
  498. ////////////////////////////////////////////////////////////////////////////////
  499. } // namespace NYT::NHttpClient