http.cpp 27 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033
  1. #include "http.h"
  2. #include "abortable_http_response.h"
  3. #include "core.h"
  4. #include "helpers.h"
  5. #include <yt/cpp/mapreduce/common/helpers.h>
  6. #include <yt/cpp/mapreduce/common/retry_lib.h>
  7. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <yt/cpp/mapreduce/interface/errors.h>
  10. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  11. #include <yt/yt/core/http/http.h>
  12. #include <library/cpp/json/json_writer.h>
  13. #include <library/cpp/string_utils/base64/base64.h>
  14. #include <library/cpp/string_utils/quote/quote.h>
  15. #include <util/generic/singleton.h>
  16. #include <util/generic/algorithm.h>
  17. #include <util/stream/mem.h>
  18. #include <util/string/builder.h>
  19. #include <util/string/cast.h>
  20. #include <util/string/escape.h>
  21. #include <util/string/printf.h>
  22. #include <util/system/byteorder.h>
  23. #include <util/system/getpid.h>
  24. #include <exception>
  25. namespace NYT {
  26. ////////////////////////////////////////////////////////////////////////////////
  27. class THttpRequest::TRequestStream
  28. : public IOutputStream
  29. {
  30. public:
  31. TRequestStream(THttpRequest* httpRequest, const TSocket& s)
  32. : HttpRequest_(httpRequest)
  33. , SocketOutput_(s)
  34. , HttpOutput_(static_cast<IOutputStream*>(&SocketOutput_))
  35. {
  36. HttpOutput_.EnableKeepAlive(true);
  37. }
  38. private:
  39. void DoWrite(const void* buf, size_t len) override
  40. {
  41. WrapWriteFunc([&] {
  42. HttpOutput_.Write(buf, len);
  43. });
  44. }
  45. void DoWriteV(const TPart* parts, size_t count) override
  46. {
  47. WrapWriteFunc([&] {
  48. HttpOutput_.Write(parts, count);
  49. });
  50. }
  51. void DoWriteC(char ch) override
  52. {
  53. WrapWriteFunc([&] {
  54. HttpOutput_.Write(ch);
  55. });
  56. }
  57. void DoFlush() override
  58. {
  59. WrapWriteFunc([&] {
  60. HttpOutput_.Flush();
  61. });
  62. }
  63. void DoFinish() override
  64. {
  65. WrapWriteFunc([&] {
  66. HttpOutput_.Finish();
  67. });
  68. }
  69. void WrapWriteFunc(std::function<void()> func)
  70. {
  71. CheckErrorState();
  72. try {
  73. func();
  74. } catch (const std::exception&) {
  75. HandleWriteException();
  76. }
  77. }
  78. // In many cases http proxy stops reading request and resets connection
  79. // if error has happend. This function tries to read error response
  80. // in such cases.
  81. void HandleWriteException() {
  82. Y_ABORT_UNLESS(WriteError_ == nullptr);
  83. WriteError_ = std::current_exception();
  84. Y_ABORT_UNLESS(WriteError_ != nullptr);
  85. try {
  86. HttpRequest_->GetResponseStream();
  87. } catch (const TErrorResponse &) {
  88. throw;
  89. } catch (...) {
  90. }
  91. std::rethrow_exception(WriteError_);
  92. }
  93. void CheckErrorState()
  94. {
  95. if (WriteError_) {
  96. std::rethrow_exception(WriteError_);
  97. }
  98. }
  99. private:
  100. THttpRequest* const HttpRequest_;
  101. TSocketOutput SocketOutput_;
  102. THttpOutput HttpOutput_;
  103. std::exception_ptr WriteError_;
  104. };
  105. ////////////////////////////////////////////////////////////////////////////////
  106. THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi)
  107. : Method(method)
  108. , Command(command)
  109. , IsApi(isApi)
  110. { }
  111. void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite)
  112. {
  113. auto it = Parameters.find(key);
  114. if (it == Parameters.end()) {
  115. Parameters.emplace(key, std::move(value));
  116. } else {
  117. if (overwrite) {
  118. it->second = std::move(value);
  119. } else {
  120. ythrow yexception() << "Duplicate key: " << key;
  121. }
  122. }
  123. }
  124. void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite)
  125. {
  126. for (const auto& p : newParameters.AsMap()) {
  127. AddParameter(p.first, p.second, overwrite);
  128. }
  129. }
  130. void THttpHeader::RemoveParameter(const TString& key)
  131. {
  132. Parameters.erase(key);
  133. }
  134. TNode THttpHeader::GetParameters() const
  135. {
  136. return Parameters;
  137. }
  138. void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite)
  139. {
  140. if (transactionId) {
  141. AddParameter("transaction_id", GetGuidAsString(transactionId), overwrite);
  142. } else {
  143. RemoveParameter("transaction_id");
  144. }
  145. }
  146. void THttpHeader::AddPath(const TString& path, bool overwrite)
  147. {
  148. AddParameter("path", path, overwrite);
  149. }
  150. void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite)
  151. {
  152. AddParameter("operation_id", GetGuidAsString(operationId), overwrite);
  153. }
  154. void THttpHeader::AddMutationId()
  155. {
  156. TGUID guid;
  157. // Some users use `fork()' with yt wrapper
  158. // (actually they use python + multiprocessing)
  159. // and CreateGuid is not resistant to `fork()', so spice it a little bit.
  160. //
  161. // Check IGNIETFERRO-610
  162. CreateGuid(&guid);
  163. guid.dw[2] = GetPID() ^ MicroSeconds();
  164. AddParameter("mutation_id", GetGuidAsString(guid), true);
  165. }
  166. bool THttpHeader::HasMutationId() const
  167. {
  168. return Parameters.contains("mutation_id");
  169. }
  170. void THttpHeader::SetToken(const TString& token)
  171. {
  172. Token = token;
  173. }
  174. void THttpHeader::SetProxyAddress(const TString& proxyAddress)
  175. {
  176. ProxyAddress = proxyAddress;
  177. }
  178. void THttpHeader::SetHostPort(const TString& hostPort)
  179. {
  180. HostPort = hostPort;
  181. }
  182. void THttpHeader::SetImpersonationUser(const TString& impersonationUser)
  183. {
  184. ImpersonationUser = impersonationUser;
  185. }
  186. void THttpHeader::SetServiceTicket(const TString& ticket)
  187. {
  188. ServiceTicket = ticket;
  189. }
  190. void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format)
  191. {
  192. InputFormat = format;
  193. }
  194. void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format)
  195. {
  196. OutputFormat = format;
  197. }
  198. TMaybe<TFormat> THttpHeader::GetOutputFormat() const
  199. {
  200. return OutputFormat;
  201. }
  202. void THttpHeader::SetRequestCompression(const TString& compression)
  203. {
  204. RequestCompression = compression;
  205. }
  206. void THttpHeader::SetResponseCompression(const TString& compression)
  207. {
  208. ResponseCompression = compression;
  209. }
  210. TString THttpHeader::GetCommand() const
  211. {
  212. return Command;
  213. }
  214. TString THttpHeader::GetUrl(bool needProxy) const
  215. {
  216. TStringStream url;
  217. if (needProxy && !ProxyAddress.empty()) {
  218. url << ProxyAddress << "/";
  219. return url.Str();
  220. }
  221. if (!ProxyAddress.empty()) {
  222. url << HostPort;
  223. }
  224. if (IsApi) {
  225. url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command;
  226. } else {
  227. url << "/" << Command;
  228. }
  229. return url.Str();
  230. }
  231. bool THttpHeader::ShouldAcceptFraming() const
  232. {
  233. return TConfig::Get()->CommandsWithFraming.contains(Command);
  234. }
  235. TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const
  236. {
  237. TStringStream result;
  238. result << Method << " " << GetUrl() << " HTTP/1.1\r\n";
  239. GetHeader(HostPort.Empty() ? hostName : HostPort, requestId, includeParameters).Get()->WriteTo(&result);
  240. if (ShouldAcceptFraming()) {
  241. result << "X-YT-Accept-Framing: 1\r\n";
  242. }
  243. result << "\r\n";
  244. return result.Str();
  245. }
  246. NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const
  247. {
  248. auto headers = New<NHttp::THeaders>();
  249. headers->Add("Host", hostName);
  250. headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
  251. if (!Token.empty()) {
  252. headers->Add("Authorization", "OAuth " + Token);
  253. }
  254. if (!ServiceTicket.empty()) {
  255. headers->Add("X-Ya-Service-Ticket", ServiceTicket);
  256. }
  257. if (!ImpersonationUser.empty()) {
  258. headers->Add("X-Yt-User-Name", ImpersonationUser);
  259. }
  260. if (Method == "PUT" || Method == "POST") {
  261. headers->Add("Transfer-Encoding", "chunked");
  262. }
  263. headers->Add("X-YT-Correlation-Id", requestId);
  264. headers->Add("X-YT-Header-Format", "<format=text>yson");
  265. headers->Add("Content-Encoding", RequestCompression);
  266. headers->Add("Accept-Encoding", ResponseCompression);
  267. auto printYTHeader = [&headers] (const char* headerName, const TString& value) {
  268. static const size_t maxHttpHeaderSize = 64 << 10;
  269. if (!value) {
  270. return;
  271. }
  272. if (value.size() <= maxHttpHeaderSize) {
  273. headers->Add(headerName, value);
  274. return;
  275. }
  276. TString encoded;
  277. Base64Encode(value, encoded);
  278. auto ptr = encoded.data();
  279. auto finish = encoded.data() + encoded.size();
  280. size_t index = 0;
  281. do {
  282. auto end = Min(ptr + maxHttpHeaderSize, finish);
  283. headers->Add(Format("%v%v", headerName, index++), TString(ptr, end));
  284. ptr = end;
  285. } while (ptr != finish);
  286. };
  287. if (InputFormat) {
  288. printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat->Config));
  289. }
  290. if (OutputFormat) {
  291. printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat->Config));
  292. }
  293. if (includeParameters) {
  294. printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters));
  295. }
  296. return NHttp::THeadersPtrWrapper(std::move(headers));
  297. }
  298. const TString& THttpHeader::GetMethod() const
  299. {
  300. return Method;
  301. }
  302. ////////////////////////////////////////////////////////////////////////////////
  303. TAddressCache* TAddressCache::Get()
  304. {
  305. return Singleton<TAddressCache>();
  306. }
  307. bool ContainsAddressOfRequiredVersion(const TAddressCache::TAddressPtr& address)
  308. {
  309. if (!TConfig::Get()->ForceIpV4 && !TConfig::Get()->ForceIpV6) {
  310. return true;
  311. }
  312. for (auto i = address->Begin(); i != address->End(); ++i) {
  313. const auto& addressInfo = *i;
  314. if (TConfig::Get()->ForceIpV4 && addressInfo.ai_family == AF_INET) {
  315. return true;
  316. }
  317. if (TConfig::Get()->ForceIpV6 && addressInfo.ai_family == AF_INET6) {
  318. return true;
  319. }
  320. }
  321. return false;
  322. }
  323. TAddressCache::TAddressPtr TAddressCache::Resolve(const TString& hostName)
  324. {
  325. auto address = FindAddress(hostName);
  326. if (address) {
  327. return address;
  328. }
  329. TString host(hostName);
  330. ui16 port = 80;
  331. auto colon = hostName.find(':');
  332. if (colon != TString::npos) {
  333. port = FromString<ui16>(hostName.substr(colon + 1));
  334. host = hostName.substr(0, colon);
  335. }
  336. auto retryPolicy = CreateDefaultRequestRetryPolicy(TConfig::Get());
  337. auto error = yexception() << "can not resolve address of required version for host " << hostName;
  338. while (true) {
  339. address = new TNetworkAddress(host, port);
  340. if (ContainsAddressOfRequiredVersion(address)) {
  341. break;
  342. }
  343. retryPolicy->NotifyNewAttempt();
  344. YT_LOG_DEBUG("Failed to resolve address of required version for host %v, retrying: %v",
  345. hostName,
  346. retryPolicy->GetAttemptDescription());
  347. if (auto backoffDuration = retryPolicy->OnGenericError(error)) {
  348. NDetail::TWaitProxy::Get()->Sleep(*backoffDuration);
  349. } else {
  350. ythrow error;
  351. }
  352. }
  353. AddAddress(hostName, address);
  354. return address;
  355. }
  356. TAddressCache::TAddressPtr TAddressCache::FindAddress(const TString& hostName) const
  357. {
  358. TCacheEntry entry;
  359. {
  360. TReadGuard guard(Lock_);
  361. auto it = Cache_.find(hostName);
  362. if (it == Cache_.end()) {
  363. return nullptr;
  364. }
  365. entry = it->second;
  366. }
  367. if (TInstant::Now() > entry.ExpirationTime) {
  368. YT_LOG_DEBUG("Address resolution cache entry for host %v is expired, will retry resolution",
  369. hostName);
  370. return nullptr;
  371. }
  372. if (!ContainsAddressOfRequiredVersion(entry.Address)) {
  373. YT_LOG_DEBUG("Address of required version not found for host %v, will retry resolution",
  374. hostName);
  375. return nullptr;
  376. }
  377. return entry.Address;
  378. }
  379. void TAddressCache::AddAddress(TString hostName, TAddressPtr address)
  380. {
  381. auto entry = TCacheEntry{
  382. .Address = std::move(address),
  383. .ExpirationTime = TInstant::Now() + TConfig::Get()->AddressCacheExpirationTimeout,
  384. };
  385. {
  386. TWriteGuard guard(Lock_);
  387. Cache_.emplace(std::move(hostName), std::move(entry));
  388. }
  389. }
  390. ////////////////////////////////////////////////////////////////////////////////
  391. TConnectionPool* TConnectionPool::Get()
  392. {
  393. return Singleton<TConnectionPool>();
  394. }
  395. TConnectionPtr TConnectionPool::Connect(
  396. const TString& hostName,
  397. TDuration socketTimeout)
  398. {
  399. Refresh();
  400. if (socketTimeout == TDuration::Zero()) {
  401. socketTimeout = TConfig::Get()->SocketTimeout;
  402. }
  403. {
  404. auto guard = Guard(Lock_);
  405. auto now = TInstant::Now();
  406. auto range = Connections_.equal_range(hostName);
  407. for (auto it = range.first; it != range.second; ++it) {
  408. auto& connection = it->second;
  409. if (connection->DeadLine < now) {
  410. continue;
  411. }
  412. if (!AtomicCas(&connection->Busy, 1, 0)) {
  413. continue;
  414. }
  415. connection->DeadLine = now + socketTimeout;
  416. connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
  417. return connection;
  418. }
  419. }
  420. TConnectionPtr connection(new TConnection);
  421. auto networkAddress = TAddressCache::Get()->Resolve(hostName);
  422. TSocketHolder socket(DoConnect(networkAddress));
  423. SetNonBlock(socket, false);
  424. connection->Socket.Reset(new TSocket(socket.Release()));
  425. connection->DeadLine = TInstant::Now() + socketTimeout;
  426. connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
  427. {
  428. auto guard = Guard(Lock_);
  429. static ui32 connectionId = 0;
  430. connection->Id = ++connectionId;
  431. Connections_.insert({hostName, connection});
  432. }
  433. YT_LOG_DEBUG("New connection to %v #%v opened",
  434. hostName,
  435. connection->Id);
  436. return connection;
  437. }
  438. void TConnectionPool::Release(TConnectionPtr connection)
  439. {
  440. auto socketTimeout = TConfig::Get()->SocketTimeout;
  441. auto newDeadline = TInstant::Now() + socketTimeout;
  442. {
  443. auto guard = Guard(Lock_);
  444. connection->DeadLine = newDeadline;
  445. }
  446. connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
  447. AtomicSet(connection->Busy, 0);
  448. Refresh();
  449. }
  450. void TConnectionPool::Invalidate(
  451. const TString& hostName,
  452. TConnectionPtr connection)
  453. {
  454. auto guard = Guard(Lock_);
  455. auto range = Connections_.equal_range(hostName);
  456. for (auto it = range.first; it != range.second; ++it) {
  457. if (it->second == connection) {
  458. YT_LOG_DEBUG("Closing connection #%v",
  459. connection->Id);
  460. Connections_.erase(it);
  461. return;
  462. }
  463. }
  464. }
  465. void TConnectionPool::Refresh()
  466. {
  467. auto guard = Guard(Lock_);
  468. // simple, since we don't expect too many connections
  469. using TItem = std::pair<TInstant, TConnectionMap::iterator>;
  470. std::vector<TItem> sortedConnections;
  471. for (auto it = Connections_.begin(); it != Connections_.end(); ++it) {
  472. sortedConnections.emplace_back(it->second->DeadLine, it);
  473. }
  474. std::sort(
  475. sortedConnections.begin(),
  476. sortedConnections.end(),
  477. [] (const TItem& a, const TItem& b) -> bool {
  478. return a.first < b.first;
  479. });
  480. auto removeCount = static_cast<int>(Connections_.size()) - TConfig::Get()->ConnectionPoolSize;
  481. const auto now = TInstant::Now();
  482. for (const auto& item : sortedConnections) {
  483. const auto& mapIterator = item.second;
  484. auto connection = mapIterator->second;
  485. if (AtomicGet(connection->Busy)) {
  486. continue;
  487. }
  488. if (removeCount > 0) {
  489. Connections_.erase(mapIterator);
  490. YT_LOG_DEBUG("Closing connection #%v (too many opened connections)",
  491. connection->Id);
  492. --removeCount;
  493. continue;
  494. }
  495. if (connection->DeadLine < now) {
  496. Connections_.erase(mapIterator);
  497. YT_LOG_DEBUG("Closing connection #%v (timeout)",
  498. connection->Id);
  499. }
  500. }
  501. }
  502. SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address)
  503. {
  504. int lastError = 0;
  505. for (auto i = address->Begin(); i != address->End(); ++i) {
  506. struct addrinfo* info = &*i;
  507. if (TConfig::Get()->ForceIpV4 && info->ai_family != AF_INET) {
  508. continue;
  509. }
  510. if (TConfig::Get()->ForceIpV6 && info->ai_family != AF_INET6) {
  511. continue;
  512. }
  513. TSocketHolder socket(
  514. ::socket(info->ai_family, info->ai_socktype, info->ai_protocol));
  515. if (socket.Closed()) {
  516. lastError = LastSystemError();
  517. continue;
  518. }
  519. SetNonBlock(socket, true);
  520. if (TConfig::Get()->SocketPriority) {
  521. SetSocketPriority(socket, *TConfig::Get()->SocketPriority);
  522. }
  523. if (connect(socket, info->ai_addr, info->ai_addrlen) == 0)
  524. return socket.Release();
  525. int err = LastSystemError();
  526. if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
  527. struct pollfd p = {
  528. socket,
  529. POLLOUT,
  530. 0
  531. };
  532. const ssize_t n = PollD(&p, 1, TInstant::Now() + TConfig::Get()->ConnectTimeout);
  533. if (n < 0) {
  534. ythrow TSystemError(-(int)n) << "can not connect to " << info;
  535. }
  536. CheckedGetSockOpt(socket, SOL_SOCKET, SO_ERROR, err, "socket error");
  537. if (!err)
  538. return socket.Release();
  539. }
  540. lastError = err;
  541. continue;
  542. }
  543. ythrow TSystemError(lastError) << "can not connect to " << *address;
  544. }
  545. ////////////////////////////////////////////////////////////////////////////////
  546. static TMaybe<TString> GetProxyName(const THttpInput& input)
  547. {
  548. if (auto proxyHeader = input.Headers().FindHeader("X-YT-Proxy")) {
  549. return proxyHeader->Value();
  550. }
  551. return Nothing();
  552. }
  553. THttpResponse::THttpResponse(
  554. IInputStream* socketStream,
  555. const TString& requestId,
  556. const TString& hostName)
  557. : HttpInput_(socketStream)
  558. , RequestId_(requestId)
  559. , HostName_(GetProxyName(HttpInput_).GetOrElse(hostName))
  560. , Unframe_(HttpInput_.Headers().HasHeader("X-YT-Framing"))
  561. {
  562. HttpCode_ = ParseHttpRetCode(HttpInput_.FirstLine());
  563. if (HttpCode_ == 200 || HttpCode_ == 202) {
  564. return;
  565. }
  566. ErrorResponse_ = TErrorResponse(HttpCode_, RequestId_);
  567. auto logAndSetError = [&] (const TString& rawError) {
  568. YT_LOG_ERROR("RSP %v - HTTP %v - %v",
  569. RequestId_,
  570. HttpCode_,
  571. rawError.data());
  572. ErrorResponse_->SetRawError(rawError);
  573. };
  574. switch (HttpCode_) {
  575. case 429:
  576. logAndSetError("request rate limit exceeded");
  577. break;
  578. case 500:
  579. logAndSetError(::TStringBuilder() << "internal error in proxy " << HostName_);
  580. break;
  581. default: {
  582. TStringStream httpHeaders;
  583. httpHeaders << "HTTP headers (";
  584. for (const auto& header : HttpInput_.Headers()) {
  585. httpHeaders << header.Name() << ": " << header.Value() << "; ";
  586. }
  587. httpHeaders << ")";
  588. auto errorString = Sprintf("RSP %s - HTTP %d - %s",
  589. RequestId_.data(),
  590. HttpCode_,
  591. httpHeaders.Str().data());
  592. YT_LOG_ERROR("%v",
  593. errorString.data());
  594. if (auto parsedResponse = ParseError(HttpInput_.Headers())) {
  595. ErrorResponse_ = parsedResponse.GetRef();
  596. } else {
  597. ErrorResponse_->SetRawError(
  598. errorString + " - X-YT-Error is missing in headers");
  599. }
  600. break;
  601. }
  602. }
  603. }
  604. const THttpHeaders& THttpResponse::Headers() const
  605. {
  606. return HttpInput_.Headers();
  607. }
  608. void THttpResponse::CheckErrorResponse() const
  609. {
  610. if (ErrorResponse_) {
  611. throw *ErrorResponse_;
  612. }
  613. }
  614. bool THttpResponse::IsExhausted() const
  615. {
  616. return IsExhausted_;
  617. }
  618. int THttpResponse::GetHttpCode() const
  619. {
  620. return HttpCode_;
  621. }
  622. const TString& THttpResponse::GetHostName() const
  623. {
  624. return HostName_;
  625. }
  626. bool THttpResponse::IsKeepAlive() const
  627. {
  628. return HttpInput_.IsKeepAlive();
  629. }
  630. TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers)
  631. {
  632. for (const auto& header : headers) {
  633. if (header.Name() == "X-YT-Error") {
  634. TErrorResponse errorResponse(HttpCode_, RequestId_);
  635. errorResponse.ParseFromJsonError(header.Value());
  636. if (errorResponse.IsOk()) {
  637. return Nothing();
  638. }
  639. return errorResponse;
  640. }
  641. }
  642. return Nothing();
  643. }
  644. size_t THttpResponse::DoRead(void* buf, size_t len)
  645. {
  646. size_t read;
  647. if (Unframe_) {
  648. read = UnframeRead(buf, len);
  649. } else {
  650. read = HttpInput_.Read(buf, len);
  651. }
  652. if (read == 0 && len != 0) {
  653. // THttpInput MUST return defined (but may be empty)
  654. // trailers when it is exhausted.
  655. Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(),
  656. "trailers MUST be defined for exhausted stream");
  657. CheckTrailers(HttpInput_.Trailers().GetRef());
  658. IsExhausted_ = true;
  659. }
  660. return read;
  661. }
  662. size_t THttpResponse::DoSkip(size_t len)
  663. {
  664. size_t skipped;
  665. if (Unframe_) {
  666. skipped = UnframeSkip(len);
  667. } else {
  668. skipped = HttpInput_.Skip(len);
  669. }
  670. if (skipped == 0 && len != 0) {
  671. // THttpInput MUST return defined (but may be empty)
  672. // trailers when it is exhausted.
  673. Y_ABORT_UNLESS(HttpInput_.Trailers().Defined(),
  674. "trailers MUST be defined for exhausted stream");
  675. CheckTrailers(HttpInput_.Trailers().GetRef());
  676. IsExhausted_ = true;
  677. }
  678. return skipped;
  679. }
  680. void THttpResponse::CheckTrailers(const THttpHeaders& trailers)
  681. {
  682. if (auto errorResponse = ParseError(trailers)) {
  683. errorResponse->SetIsFromTrailers(true);
  684. YT_LOG_ERROR("RSP %v - %v",
  685. RequestId_,
  686. errorResponse.GetRef().what());
  687. ythrow errorResponse.GetRef();
  688. }
  689. }
  690. static ui32 ReadDataFrameSize(THttpInput* stream)
  691. {
  692. ui32 littleEndianSize;
  693. auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize));
  694. if (read < sizeof(littleEndianSize)) {
  695. ythrow yexception() << "Bad data frame header: " <<
  696. "expected " << sizeof(littleEndianSize) << " bytes, got " << read;
  697. }
  698. return LittleToHost(littleEndianSize);
  699. }
  700. bool THttpResponse::RefreshFrameIfNecessary()
  701. {
  702. while (RemainingFrameSize_ == 0) {
  703. ui8 frameTypeByte;
  704. auto read = HttpInput_.Read(&frameTypeByte, sizeof(frameTypeByte));
  705. if (read == 0) {
  706. return false;
  707. }
  708. auto frameType = static_cast<EFrameType>(frameTypeByte);
  709. switch (frameType) {
  710. case EFrameType::KeepAlive:
  711. break;
  712. case EFrameType::Data:
  713. RemainingFrameSize_ = ReadDataFrameSize(&HttpInput_);
  714. break;
  715. default:
  716. ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte);
  717. }
  718. }
  719. return true;
  720. }
  721. size_t THttpResponse::UnframeRead(void* buf, size_t len)
  722. {
  723. if (!RefreshFrameIfNecessary()) {
  724. return 0;
  725. }
  726. auto read = HttpInput_.Read(buf, Min(len, RemainingFrameSize_));
  727. RemainingFrameSize_ -= read;
  728. return read;
  729. }
  730. size_t THttpResponse::UnframeSkip(size_t len)
  731. {
  732. if (!RefreshFrameIfNecessary()) {
  733. return 0;
  734. }
  735. auto skipped = HttpInput_.Skip(Min(len, RemainingFrameSize_));
  736. RemainingFrameSize_ -= skipped;
  737. return skipped;
  738. }
  739. ////////////////////////////////////////////////////////////////////////////////
  740. THttpRequest::THttpRequest()
  741. {
  742. RequestId = CreateGuidAsString();
  743. }
  744. THttpRequest::THttpRequest(const TString& requestId)
  745. : RequestId(requestId)
  746. { }
  747. THttpRequest::~THttpRequest()
  748. {
  749. if (!Connection) {
  750. return;
  751. }
  752. if (Input && Input->IsKeepAlive() && Input->IsExhausted()) {
  753. // We should return to the pool only connections where HTTP response was fully read.
  754. // Otherwise next reader might read our remaining data and misinterpret them (YT-6510).
  755. TConnectionPool::Get()->Release(Connection);
  756. } else {
  757. TConnectionPool::Get()->Invalidate(HostName, Connection);
  758. }
  759. }
  760. TString THttpRequest::GetRequestId() const
  761. {
  762. return RequestId;
  763. }
  764. void THttpRequest::Connect(TString hostName, TDuration socketTimeout)
  765. {
  766. HostName = std::move(hostName);
  767. YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool",
  768. RequestId,
  769. HostName);
  770. StartTime_ = TInstant::Now();
  771. Connection = TConnectionPool::Get()->Connect(HostName, socketTimeout);
  772. YT_LOG_DEBUG("REQ %v - connection #%v",
  773. RequestId,
  774. Connection->Id);
  775. }
  776. IOutputStream* THttpRequest::StartRequestImpl(const THttpHeader& header, bool includeParameters)
  777. {
  778. auto strHeader = header.GetHeaderAsString(HostName, RequestId, includeParameters);
  779. Url_ = header.GetUrl(true);
  780. LogRequest(header, Url_, includeParameters, RequestId, HostName);
  781. LoggedAttributes_ = GetLoggedAttributes(header, Url_, includeParameters, 128);
  782. auto outputFormat = header.GetOutputFormat();
  783. if (outputFormat && outputFormat->IsTextYson()) {
  784. LogResponse = true;
  785. }
  786. RequestStream_ = MakeHolder<TRequestStream>(this, *Connection->Socket.Get());
  787. RequestStream_->Write(strHeader.data(), strHeader.size());
  788. return RequestStream_.Get();
  789. }
  790. IOutputStream* THttpRequest::StartRequest(const THttpHeader& header)
  791. {
  792. return StartRequestImpl(header, true);
  793. }
  794. void THttpRequest::FinishRequest()
  795. {
  796. RequestStream_->Flush();
  797. RequestStream_->Finish();
  798. }
  799. void THttpRequest::SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body)
  800. {
  801. if (!body && (header.GetMethod() == "PUT" || header.GetMethod() == "POST")) {
  802. const auto& parameters = header.GetParameters();
  803. auto parametersStr = NodeToYsonString(parameters);
  804. auto* output = StartRequestImpl(header, false);
  805. output->Write(parametersStr);
  806. FinishRequest();
  807. } else {
  808. auto* output = StartRequest(header);
  809. if (body) {
  810. output->Write(*body);
  811. }
  812. FinishRequest();
  813. }
  814. }
  815. THttpResponse* THttpRequest::GetResponseStream()
  816. {
  817. if (!Input) {
  818. SocketInput.Reset(new TSocketInput(*Connection->Socket.Get()));
  819. if (TConfig::Get()->UseAbortableResponse) {
  820. Y_ABORT_UNLESS(!Url_.empty());
  821. Input.Reset(new TAbortableHttpResponse(SocketInput.Get(), RequestId, HostName, Url_));
  822. } else {
  823. Input.Reset(new THttpResponse(SocketInput.Get(), RequestId, HostName));
  824. }
  825. Input->CheckErrorResponse();
  826. }
  827. return Input.Get();
  828. }
  829. TString THttpRequest::GetResponse()
  830. {
  831. TString result = GetResponseStream()->ReadAll();
  832. TStringStream loggedAttributes;
  833. loggedAttributes
  834. << "Time: " << TInstant::Now() - StartTime_ << "; "
  835. << "HostName: " << GetResponseStream()->GetHostName() << "; "
  836. << LoggedAttributes_;
  837. if (LogResponse) {
  838. constexpr auto sizeLimit = 1 << 7;
  839. YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
  840. RequestId,
  841. TruncateForLogs(result, sizeLimit),
  842. loggedAttributes.Str());
  843. } else {
  844. YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
  845. RequestId,
  846. result.size(),
  847. loggedAttributes.Str());
  848. }
  849. return result;
  850. }
  851. int THttpRequest::GetHttpCode() {
  852. return GetResponseStream()->GetHttpCode();
  853. }
  854. void THttpRequest::InvalidateConnection()
  855. {
  856. TConnectionPool::Get()->Invalidate(HostName, Connection);
  857. Connection.Reset();
  858. }
  859. ////////////////////////////////////////////////////////////////////////////////
  860. } // namespace NYT