http.cpp 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109
  1. #include "http.h"
  2. #include "abortable_http_response.h"
  3. #include "core.h"
  4. #include "helpers.h"
  5. #include <yt/cpp/mapreduce/common/helpers.h>
  6. #include <yt/cpp/mapreduce/common/retry_lib.h>
  7. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <yt/cpp/mapreduce/interface/errors.h>
  10. #include <yt/cpp/mapreduce/interface/error_codes.h>
  11. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  12. #include <yt/yt/core/http/http.h>
  13. #include <library/cpp/json/json_writer.h>
  14. #include <library/cpp/string_utils/base64/base64.h>
  15. #include <library/cpp/string_utils/quote/quote.h>
  16. #include <util/generic/singleton.h>
  17. #include <util/generic/algorithm.h>
  18. #include <util/stream/mem.h>
  19. #include <util/string/builder.h>
  20. #include <util/string/cast.h>
  21. #include <util/string/escape.h>
  22. #include <util/string/printf.h>
  23. #include <util/system/byteorder.h>
  24. #include <util/system/getpid.h>
  25. #include <exception>
  26. namespace NYT {
  27. ////////////////////////////////////////////////////////////////////////////////
  28. std::exception_ptr WrapSystemError(
  29. const TRequestContext& context,
  30. const std::exception& ex)
  31. {
  32. if (auto errorResponse = dynamic_cast<const TErrorResponse*>(&ex); errorResponse != nullptr) {
  33. return std::make_exception_ptr(errorResponse);
  34. }
  35. auto message = NYT::Format("Request %qv to %qv failed", context.RequestId, context.HostName + context.Method);
  36. TYtError outer(1, message, {TYtError(NClusterErrorCodes::Generic, ex.what())}, {
  37. {"request_id", context.RequestId},
  38. {"host", context.HostName},
  39. {"method", context.Method},
  40. });
  41. TTransportError errorResponse(std::move(outer));
  42. return std::make_exception_ptr(errorResponse);
  43. }
  44. ////////////////////////////////////////////////////////////////////////////////
  45. class THttpRequest::TRequestStream
  46. : public IOutputStream
  47. {
  48. public:
  49. TRequestStream(THttpRequest* httpRequest, const TSocket& s)
  50. : HttpRequest_(httpRequest)
  51. , SocketOutput_(s)
  52. , HttpOutput_(static_cast<IOutputStream*>(&SocketOutput_))
  53. {
  54. HttpOutput_.EnableKeepAlive(true);
  55. }
  56. private:
  57. void DoWrite(const void* buf, size_t len) override
  58. {
  59. WrapWriteFunc([&] {
  60. HttpOutput_.Write(buf, len);
  61. });
  62. }
  63. void DoWriteV(const TPart* parts, size_t count) override
  64. {
  65. WrapWriteFunc([&] {
  66. HttpOutput_.Write(parts, count);
  67. });
  68. }
  69. void DoWriteC(char ch) override
  70. {
  71. WrapWriteFunc([&] {
  72. HttpOutput_.Write(ch);
  73. });
  74. }
  75. void DoFlush() override
  76. {
  77. WrapWriteFunc([&] {
  78. HttpOutput_.Flush();
  79. });
  80. }
  81. void DoFinish() override
  82. {
  83. WrapWriteFunc([&] {
  84. HttpOutput_.Finish();
  85. });
  86. }
  87. void WrapWriteFunc(std::function<void()> func)
  88. {
  89. CheckErrorState();
  90. try {
  91. func();
  92. } catch (const std::exception& ex) {
  93. HandleWriteException(ex);
  94. }
  95. }
  96. // In many cases http proxy stops reading request and resets connection
  97. // if error has happend. This function tries to read error response
  98. // in such cases.
  99. void HandleWriteException(const std::exception& ex) {
  100. Y_ABORT_UNLESS(WriteError_ == nullptr);
  101. WriteError_ = WrapSystemError(HttpRequest_->Context_, ex);
  102. Y_ABORT_UNLESS(WriteError_ != nullptr);
  103. try {
  104. HttpRequest_->GetResponseStream();
  105. } catch (const TErrorResponse &) {
  106. throw;
  107. } catch (...) {
  108. }
  109. std::rethrow_exception(WriteError_);
  110. }
  111. void CheckErrorState()
  112. {
  113. if (WriteError_) {
  114. std::rethrow_exception(WriteError_);
  115. }
  116. }
  117. private:
  118. THttpRequest* const HttpRequest_;
  119. TSocketOutput SocketOutput_;
  120. THttpOutput HttpOutput_;
  121. std::exception_ptr WriteError_;
  122. };
  123. ////////////////////////////////////////////////////////////////////////////////
  124. THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi)
  125. : Method_(method)
  126. , Command_(command)
  127. , IsApi_(isApi)
  128. { }
  129. void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite)
  130. {
  131. auto it = Parameters_.find(key);
  132. if (it == Parameters_.end()) {
  133. Parameters_.emplace(key, std::move(value));
  134. } else {
  135. if (overwrite) {
  136. it->second = std::move(value);
  137. } else {
  138. ythrow yexception() << "Duplicate key: " << key;
  139. }
  140. }
  141. }
  142. void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite)
  143. {
  144. for (const auto& p : newParameters.AsMap()) {
  145. AddParameter(p.first, p.second, overwrite);
  146. }
  147. }
  148. void THttpHeader::RemoveParameter(const TString& key)
  149. {
  150. Parameters_.erase(key);
  151. }
  152. TNode THttpHeader::GetParameters() const
  153. {
  154. return Parameters_;
  155. }
  156. void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite)
  157. {
  158. if (transactionId) {
  159. AddParameter("transaction_id", GetGuidAsString(transactionId), overwrite);
  160. } else {
  161. RemoveParameter("transaction_id");
  162. }
  163. }
  164. void THttpHeader::AddPath(const TString& path, bool overwrite)
  165. {
  166. AddParameter("path", path, overwrite);
  167. }
  168. void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite)
  169. {
  170. AddParameter("operation_id", GetGuidAsString(operationId), overwrite);
  171. }
  172. void THttpHeader::AddMutationId()
  173. {
  174. TGUID guid;
  175. // Some users use `fork()' with yt wrapper
  176. // (actually they use python + multiprocessing)
  177. // and CreateGuid is not resistant to `fork()', so spice it a little bit.
  178. //
  179. // Check IGNIETFERRO-610
  180. CreateGuid(&guid);
  181. guid.dw[2] = GetPID() ^ MicroSeconds();
  182. AddParameter("mutation_id", GetGuidAsString(guid), true);
  183. }
  184. bool THttpHeader::HasMutationId() const
  185. {
  186. return Parameters_.contains("mutation_id");
  187. }
  188. void THttpHeader::SetToken(const TString& token)
  189. {
  190. Token_ = token;
  191. }
  192. void THttpHeader::SetProxyAddress(const TString& proxyAddress)
  193. {
  194. ProxyAddress_ = proxyAddress;
  195. }
  196. void THttpHeader::SetHostPort(const TString& hostPort)
  197. {
  198. HostPort_ = hostPort;
  199. }
  200. void THttpHeader::SetImpersonationUser(const TString& impersonationUser)
  201. {
  202. ImpersonationUser_ = impersonationUser;
  203. }
  204. void THttpHeader::SetServiceTicket(const TString& ticket)
  205. {
  206. ServiceTicket_ = ticket;
  207. }
  208. void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format)
  209. {
  210. InputFormat_ = format;
  211. }
  212. void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format)
  213. {
  214. OutputFormat_ = format;
  215. }
  216. TMaybe<TFormat> THttpHeader::GetOutputFormat() const
  217. {
  218. return OutputFormat_;
  219. }
  220. void THttpHeader::SetRequestCompression(const TString& compression)
  221. {
  222. RequestCompression_ = compression;
  223. }
  224. void THttpHeader::SetResponseCompression(const TString& compression)
  225. {
  226. ResponseCompression_ = compression;
  227. }
  228. TString THttpHeader::GetCommand() const
  229. {
  230. return Command_;
  231. }
  232. TString THttpHeader::GetUrl(bool needProxy) const
  233. {
  234. TStringStream url;
  235. if (needProxy && !ProxyAddress_.empty()) {
  236. url << ProxyAddress_ << "/";
  237. return url.Str();
  238. }
  239. if (!ProxyAddress_.empty()) {
  240. url << HostPort_;
  241. }
  242. if (IsApi_) {
  243. url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command_;
  244. } else {
  245. url << "/" << Command_;
  246. }
  247. return url.Str();
  248. }
  249. bool THttpHeader::ShouldAcceptFraming() const
  250. {
  251. return TConfig::Get()->CommandsWithFraming.contains(Command_);
  252. }
  253. TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const
  254. {
  255. TStringStream result;
  256. result << Method_ << " " << GetUrl() << " HTTP/1.1\r\n";
  257. GetHeader(HostPort_.empty() ? hostName : HostPort_, requestId, includeParameters).Get()->WriteTo(&result);
  258. if (ShouldAcceptFraming()) {
  259. result << "X-YT-Accept-Framing: 1\r\n";
  260. }
  261. result << "\r\n";
  262. return result.Str();
  263. }
  264. NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const
  265. {
  266. auto headers = New<NHttp::THeaders>();
  267. headers->Add("Host", hostName);
  268. headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
  269. if (!Token_.empty()) {
  270. headers->Add("Authorization", "OAuth " + Token_);
  271. }
  272. if (!ServiceTicket_.empty()) {
  273. headers->Add("X-Ya-Service-Ticket", ServiceTicket_);
  274. }
  275. if (!ImpersonationUser_.empty()) {
  276. headers->Add("X-Yt-User-Name", ImpersonationUser_);
  277. }
  278. if (Method_ == "PUT" || Method_ == "POST") {
  279. headers->Add("Transfer-Encoding", "chunked");
  280. }
  281. headers->Add("X-YT-Correlation-Id", requestId);
  282. headers->Add("X-YT-Header-Format", "<format=text>yson");
  283. headers->Add("Content-Encoding", RequestCompression_);
  284. headers->Add("Accept-Encoding", ResponseCompression_);
  285. auto printYTHeader = [&headers] (const char* headerName, const TString& value) {
  286. static const size_t maxHttpHeaderSize = 64 << 10;
  287. if (!value) {
  288. return;
  289. }
  290. if (value.size() <= maxHttpHeaderSize) {
  291. headers->Add(headerName, value);
  292. return;
  293. }
  294. TString encoded;
  295. Base64Encode(value, encoded);
  296. auto ptr = encoded.data();
  297. auto finish = encoded.data() + encoded.size();
  298. size_t index = 0;
  299. do {
  300. auto end = Min(ptr + maxHttpHeaderSize, finish);
  301. headers->Add(Format("%v%v", headerName, index++), TString(ptr, end));
  302. ptr = end;
  303. } while (ptr != finish);
  304. };
  305. if (InputFormat_) {
  306. printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat_->Config));
  307. }
  308. if (OutputFormat_) {
  309. printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat_->Config));
  310. }
  311. if (includeParameters) {
  312. printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters_));
  313. }
  314. return NHttp::THeadersPtrWrapper(std::move(headers));
  315. }
  316. const TString& THttpHeader::GetMethod() const
  317. {
  318. return Method_;
  319. }
  320. ////////////////////////////////////////////////////////////////////////////////
  321. TAddressCache* TAddressCache::Get()
  322. {
  323. return Singleton<TAddressCache>();
  324. }
  325. bool ContainsAddressOfRequiredVersion(const TAddressCache::TAddressPtr& address)
  326. {
  327. if (!TConfig::Get()->ForceIpV4 && !TConfig::Get()->ForceIpV6) {
  328. return true;
  329. }
  330. for (auto i = address->Begin(); i != address->End(); ++i) {
  331. const auto& addressInfo = *i;
  332. if (TConfig::Get()->ForceIpV4 && addressInfo.ai_family == AF_INET) {
  333. return true;
  334. }
  335. if (TConfig::Get()->ForceIpV6 && addressInfo.ai_family == AF_INET6) {
  336. return true;
  337. }
  338. }
  339. return false;
  340. }
  341. TAddressCache::TAddressPtr TAddressCache::Resolve(const TString& hostName)
  342. {
  343. auto address = FindAddress(hostName);
  344. if (address) {
  345. return address;
  346. }
  347. TString host(hostName);
  348. ui16 port = 80;
  349. auto colon = hostName.find(':');
  350. if (colon != TString::npos) {
  351. port = FromString<ui16>(hostName.substr(colon + 1));
  352. host = hostName.substr(0, colon);
  353. }
  354. auto retryPolicy = CreateDefaultRequestRetryPolicy(TConfig::Get());
  355. auto error = yexception() << "can not resolve address of required version for host " << hostName;
  356. while (true) {
  357. address = new TNetworkAddress(host, port);
  358. if (ContainsAddressOfRequiredVersion(address)) {
  359. break;
  360. }
  361. retryPolicy->NotifyNewAttempt();
  362. YT_LOG_DEBUG("Failed to resolve address of required version for host %v, retrying: %v",
  363. hostName,
  364. retryPolicy->GetAttemptDescription());
  365. if (auto backoffDuration = retryPolicy->OnGenericError(error)) {
  366. NDetail::TWaitProxy::Get()->Sleep(*backoffDuration);
  367. } else {
  368. ythrow error;
  369. }
  370. }
  371. AddAddress(hostName, address);
  372. return address;
  373. }
  374. TAddressCache::TAddressPtr TAddressCache::FindAddress(const TString& hostName) const
  375. {
  376. TCacheEntry entry;
  377. {
  378. TReadGuard guard(Lock_);
  379. auto it = Cache_.find(hostName);
  380. if (it == Cache_.end()) {
  381. return nullptr;
  382. }
  383. entry = it->second;
  384. }
  385. if (TInstant::Now() > entry.ExpirationTime) {
  386. YT_LOG_DEBUG("Address resolution cache entry for host %v is expired, will retry resolution",
  387. hostName);
  388. return nullptr;
  389. }
  390. if (!ContainsAddressOfRequiredVersion(entry.Address)) {
  391. YT_LOG_DEBUG("Address of required version not found for host %v, will retry resolution",
  392. hostName);
  393. return nullptr;
  394. }
  395. return entry.Address;
  396. }
  397. void TAddressCache::AddAddress(TString hostName, TAddressPtr address)
  398. {
  399. auto entry = TCacheEntry{
  400. .Address = std::move(address),
  401. .ExpirationTime = TInstant::Now() + TConfig::Get()->AddressCacheExpirationTimeout,
  402. };
  403. {
  404. TWriteGuard guard(Lock_);
  405. Cache_.emplace(std::move(hostName), std::move(entry));
  406. }
  407. }
  408. ////////////////////////////////////////////////////////////////////////////////
  409. TConnectionPool* TConnectionPool::Get()
  410. {
  411. return Singleton<TConnectionPool>();
  412. }
  413. TConnectionPtr TConnectionPool::Connect(
  414. const TString& hostName,
  415. TDuration socketTimeout)
  416. {
  417. Refresh();
  418. if (socketTimeout == TDuration::Zero()) {
  419. socketTimeout = TConfig::Get()->SocketTimeout;
  420. }
  421. {
  422. auto guard = Guard(Lock_);
  423. auto now = TInstant::Now();
  424. auto range = Connections_.equal_range(hostName);
  425. for (auto it = range.first; it != range.second; ++it) {
  426. auto& connection = it->second;
  427. if (connection->DeadLine < now) {
  428. continue;
  429. }
  430. if (!AtomicCas(&connection->Busy, 1, 0)) {
  431. continue;
  432. }
  433. connection->DeadLine = now + socketTimeout;
  434. connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
  435. return connection;
  436. }
  437. }
  438. TConnectionPtr connection(new TConnection);
  439. auto networkAddress = TAddressCache::Get()->Resolve(hostName);
  440. TSocketHolder socket(DoConnect(networkAddress));
  441. SetNonBlock(socket, false);
  442. connection->Socket.Reset(new TSocket(socket.Release()));
  443. connection->DeadLine = TInstant::Now() + socketTimeout;
  444. connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
  445. {
  446. auto guard = Guard(Lock_);
  447. static ui32 connectionId = 0;
  448. connection->Id = ++connectionId;
  449. Connections_.insert({hostName, connection});
  450. }
  451. YT_LOG_DEBUG("New connection to %v #%v opened",
  452. hostName,
  453. connection->Id);
  454. return connection;
  455. }
  456. void TConnectionPool::Release(TConnectionPtr connection)
  457. {
  458. auto socketTimeout = TConfig::Get()->SocketTimeout;
  459. auto newDeadline = TInstant::Now() + socketTimeout;
  460. {
  461. auto guard = Guard(Lock_);
  462. connection->DeadLine = newDeadline;
  463. }
  464. connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
  465. AtomicSet(connection->Busy, 0);
  466. Refresh();
  467. }
  468. void TConnectionPool::Invalidate(
  469. const TString& hostName,
  470. TConnectionPtr connection)
  471. {
  472. auto guard = Guard(Lock_);
  473. auto range = Connections_.equal_range(hostName);
  474. for (auto it = range.first; it != range.second; ++it) {
  475. if (it->second == connection) {
  476. YT_LOG_DEBUG("Closing connection #%v",
  477. connection->Id);
  478. Connections_.erase(it);
  479. return;
  480. }
  481. }
  482. }
  483. void TConnectionPool::Refresh()
  484. {
  485. auto guard = Guard(Lock_);
  486. // simple, since we don't expect too many connections
  487. using TItem = std::pair<TInstant, TConnectionMap::iterator>;
  488. std::vector<TItem> sortedConnections;
  489. for (auto it = Connections_.begin(); it != Connections_.end(); ++it) {
  490. sortedConnections.emplace_back(it->second->DeadLine, it);
  491. }
  492. std::sort(
  493. sortedConnections.begin(),
  494. sortedConnections.end(),
  495. [] (const TItem& a, const TItem& b) -> bool {
  496. return a.first < b.first;
  497. });
  498. auto removeCount = static_cast<int>(Connections_.size()) - TConfig::Get()->ConnectionPoolSize;
  499. const auto now = TInstant::Now();
  500. for (const auto& item : sortedConnections) {
  501. const auto& mapIterator = item.second;
  502. auto connection = mapIterator->second;
  503. if (AtomicGet(connection->Busy)) {
  504. continue;
  505. }
  506. if (removeCount > 0) {
  507. Connections_.erase(mapIterator);
  508. YT_LOG_DEBUG("Closing connection #%v (too many opened connections)",
  509. connection->Id);
  510. --removeCount;
  511. continue;
  512. }
  513. if (connection->DeadLine < now) {
  514. Connections_.erase(mapIterator);
  515. YT_LOG_DEBUG("Closing connection #%v (timeout)",
  516. connection->Id);
  517. }
  518. }
  519. }
  520. SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address)
  521. {
  522. int lastError = 0;
  523. for (auto i = address->Begin(); i != address->End(); ++i) {
  524. struct addrinfo* info = &*i;
  525. if (TConfig::Get()->ForceIpV4 && info->ai_family != AF_INET) {
  526. continue;
  527. }
  528. if (TConfig::Get()->ForceIpV6 && info->ai_family != AF_INET6) {
  529. continue;
  530. }
  531. TSocketHolder socket(
  532. ::socket(info->ai_family, info->ai_socktype, info->ai_protocol));
  533. if (socket.Closed()) {
  534. lastError = LastSystemError();
  535. continue;
  536. }
  537. SetNonBlock(socket, true);
  538. if (TConfig::Get()->SocketPriority) {
  539. SetSocketPriority(socket, *TConfig::Get()->SocketPriority);
  540. }
  541. if (connect(socket, info->ai_addr, info->ai_addrlen) == 0)
  542. return socket.Release();
  543. int err = LastSystemError();
  544. if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
  545. struct pollfd p = {
  546. socket,
  547. POLLOUT,
  548. 0
  549. };
  550. const ssize_t n = PollD(&p, 1, TInstant::Now() + TConfig::Get()->ConnectTimeout);
  551. if (n < 0) {
  552. ythrow TSystemError(-(int)n) << "can not connect to " << info;
  553. }
  554. CheckedGetSockOpt(socket, SOL_SOCKET, SO_ERROR, err, "socket error");
  555. if (!err)
  556. return socket.Release();
  557. }
  558. lastError = err;
  559. continue;
  560. }
  561. ythrow TSystemError(lastError) << "can not connect to " << *address;
  562. }
  563. ////////////////////////////////////////////////////////////////////////////////
  564. class THttpResponse::THttpInputWrapped
  565. : public IInputStream
  566. {
  567. public:
  568. explicit THttpInputWrapped(TRequestContext context, IInputStream* input)
  569. : Context_(std::move(context))
  570. , HttpInput_(input)
  571. { }
  572. const THttpHeaders& Headers() const noexcept
  573. {
  574. return HttpInput_.Headers();
  575. }
  576. const TString& FirstLine() const noexcept
  577. {
  578. return HttpInput_.FirstLine();
  579. }
  580. bool IsKeepAlive() const noexcept
  581. {
  582. return HttpInput_.IsKeepAlive();
  583. }
  584. const TMaybe<THttpHeaders>& Trailers() const noexcept
  585. {
  586. return HttpInput_.Trailers();
  587. }
  588. private:
  589. size_t DoRead(void* buf, size_t len) override
  590. {
  591. try {
  592. return HttpInput_.Read(buf, len);
  593. } catch (const std::exception& ex) {
  594. auto wrapped = WrapSystemError(Context_, ex);
  595. std::rethrow_exception(wrapped);
  596. }
  597. }
  598. size_t DoSkip(size_t len) override
  599. {
  600. try {
  601. return HttpInput_.Skip(len);
  602. } catch (const std::exception& ex) {
  603. auto wrapped = WrapSystemError(Context_, ex);
  604. std::rethrow_exception(wrapped);
  605. }
  606. }
  607. private:
  608. const TRequestContext Context_;
  609. THttpInput HttpInput_;
  610. };
  611. THttpResponse::THttpResponse(
  612. TRequestContext context,
  613. IInputStream* socketStream)
  614. : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream))
  615. , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing"))
  616. , Context_(std::move(context))
  617. {
  618. if (auto proxyHeader = HttpInput_->Headers().FindHeader("X-YT-Proxy")) {
  619. Context_.HostName = proxyHeader->Value();
  620. }
  621. HttpCode_ = ParseHttpRetCode(HttpInput_->FirstLine());
  622. if (HttpCode_ == 200 || HttpCode_ == 202) {
  623. return;
  624. }
  625. ErrorResponse_ = TErrorResponse(HttpCode_, Context_.RequestId);
  626. auto logAndSetError = [&] (const TString& rawError) {
  627. YT_LOG_ERROR("RSP %v - HTTP %v - %v",
  628. Context_.RequestId,
  629. HttpCode_,
  630. rawError.data());
  631. ErrorResponse_->SetRawError(rawError);
  632. };
  633. switch (HttpCode_) {
  634. case 429:
  635. logAndSetError("request rate limit exceeded");
  636. break;
  637. case 500:
  638. logAndSetError(::TStringBuilder() << "internal error in proxy " << Context_.HostName);
  639. break;
  640. default: {
  641. TStringStream httpHeaders;
  642. httpHeaders << "HTTP headers (";
  643. for (const auto& header : HttpInput_->Headers()) {
  644. httpHeaders << header.Name() << ": " << header.Value() << "; ";
  645. }
  646. httpHeaders << ")";
  647. auto errorString = Sprintf("RSP %s - HTTP %d - %s",
  648. Context_.RequestId.data(),
  649. HttpCode_,
  650. httpHeaders.Str().data());
  651. YT_LOG_ERROR("%v",
  652. errorString.data());
  653. if (auto parsedResponse = ParseError(HttpInput_->Headers())) {
  654. ErrorResponse_ = parsedResponse.GetRef();
  655. } else {
  656. ErrorResponse_->SetRawError(
  657. errorString + " - X-YT-Error is missing in headers");
  658. }
  659. break;
  660. }
  661. }
  662. }
  663. THttpResponse::~THttpResponse()
  664. { }
  665. const THttpHeaders& THttpResponse::Headers() const
  666. {
  667. return HttpInput_->Headers();
  668. }
  669. void THttpResponse::CheckErrorResponse() const
  670. {
  671. if (ErrorResponse_) {
  672. throw *ErrorResponse_;
  673. }
  674. }
  675. bool THttpResponse::IsExhausted() const
  676. {
  677. return IsExhausted_;
  678. }
  679. int THttpResponse::GetHttpCode() const
  680. {
  681. return HttpCode_;
  682. }
  683. const TString& THttpResponse::GetHostName() const
  684. {
  685. return Context_.HostName;
  686. }
  687. bool THttpResponse::IsKeepAlive() const
  688. {
  689. return HttpInput_->IsKeepAlive();
  690. }
  691. TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers)
  692. {
  693. for (const auto& header : headers) {
  694. if (header.Name() == "X-YT-Error") {
  695. TErrorResponse errorResponse(HttpCode_, Context_.RequestId);
  696. errorResponse.ParseFromJsonError(header.Value());
  697. if (errorResponse.IsOk()) {
  698. return Nothing();
  699. }
  700. return errorResponse;
  701. }
  702. }
  703. return Nothing();
  704. }
  705. size_t THttpResponse::DoRead(void* buf, size_t len)
  706. {
  707. size_t read;
  708. if (Unframe_) {
  709. read = UnframeRead(buf, len);
  710. } else {
  711. read = HttpInput_->Read(buf, len);
  712. }
  713. if (read == 0 && len != 0) {
  714. // THttpInput MUST return defined (but may be empty)
  715. // trailers when it is exhausted.
  716. Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(),
  717. "trailers MUST be defined for exhausted stream");
  718. CheckTrailers(HttpInput_->Trailers().GetRef());
  719. IsExhausted_ = true;
  720. }
  721. return read;
  722. }
  723. size_t THttpResponse::DoSkip(size_t len)
  724. {
  725. size_t skipped;
  726. if (Unframe_) {
  727. skipped = UnframeSkip(len);
  728. } else {
  729. skipped = HttpInput_->Skip(len);
  730. }
  731. if (skipped == 0 && len != 0) {
  732. // THttpInput MUST return defined (but may be empty)
  733. // trailers when it is exhausted.
  734. Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(),
  735. "trailers MUST be defined for exhausted stream");
  736. CheckTrailers(HttpInput_->Trailers().GetRef());
  737. IsExhausted_ = true;
  738. }
  739. return skipped;
  740. }
  741. void THttpResponse::CheckTrailers(const THttpHeaders& trailers)
  742. {
  743. if (auto errorResponse = ParseError(trailers)) {
  744. errorResponse->SetIsFromTrailers(true);
  745. YT_LOG_ERROR("RSP %v - %v",
  746. Context_.RequestId,
  747. errorResponse.GetRef().what());
  748. ythrow errorResponse.GetRef();
  749. }
  750. }
  751. static ui32 ReadDataFrameSize(IInputStream* stream)
  752. {
  753. ui32 littleEndianSize;
  754. auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize));
  755. if (read < sizeof(littleEndianSize)) {
  756. ythrow yexception() << "Bad data frame header: " <<
  757. "expected " << sizeof(littleEndianSize) << " bytes, got " << read;
  758. }
  759. return LittleToHost(littleEndianSize);
  760. }
  761. bool THttpResponse::RefreshFrameIfNecessary()
  762. {
  763. while (RemainingFrameSize_ == 0) {
  764. ui8 frameTypeByte;
  765. auto read = HttpInput_->Read(&frameTypeByte, sizeof(frameTypeByte));
  766. if (read == 0) {
  767. return false;
  768. }
  769. auto frameType = static_cast<EFrameType>(frameTypeByte);
  770. switch (frameType) {
  771. case EFrameType::KeepAlive:
  772. break;
  773. case EFrameType::Data:
  774. RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get());
  775. break;
  776. default:
  777. ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte);
  778. }
  779. }
  780. return true;
  781. }
  782. size_t THttpResponse::UnframeRead(void* buf, size_t len)
  783. {
  784. if (!RefreshFrameIfNecessary()) {
  785. return 0;
  786. }
  787. auto read = HttpInput_->Read(buf, Min(len, RemainingFrameSize_));
  788. RemainingFrameSize_ -= read;
  789. return read;
  790. }
  791. size_t THttpResponse::UnframeSkip(size_t len)
  792. {
  793. if (!RefreshFrameIfNecessary()) {
  794. return 0;
  795. }
  796. auto skipped = HttpInput_->Skip(Min(len, RemainingFrameSize_));
  797. RemainingFrameSize_ -= skipped;
  798. return skipped;
  799. }
  800. ////////////////////////////////////////////////////////////////////////////////
  801. THttpRequest::THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout)
  802. : Context_(TRequestContext{
  803. .RequestId = std::move(requestId),
  804. .HostName = std::move(hostName),
  805. .Method = header.GetUrl(/*needProxy=*/ false)
  806. })
  807. , Header_(std::move(header))
  808. , Url_(Header_.GetUrl(true))
  809. , SocketTimeout_(socketTimeout)
  810. { }
  811. THttpRequest::~THttpRequest()
  812. {
  813. if (!Connection_) {
  814. return;
  815. }
  816. if (Input_ && Input_->IsKeepAlive() && Input_->IsExhausted()) {
  817. // We should return to the pool only connections where HTTP response was fully read.
  818. // Otherwise next reader might read our remaining data and misinterpret them (YT-6510).
  819. TConnectionPool::Get()->Release(Connection_);
  820. } else {
  821. TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_);
  822. }
  823. }
  824. TString THttpRequest::GetRequestId() const
  825. {
  826. return Context_.RequestId;
  827. }
  828. IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters)
  829. {
  830. YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool",
  831. Context_.RequestId,
  832. Context_.HostName);
  833. StartTime_ = TInstant::Now();
  834. try {
  835. Connection_ = TConnectionPool::Get()->Connect(Context_.HostName, SocketTimeout_);
  836. } catch (const std::exception& ex) {
  837. auto wrapped = WrapSystemError(Context_, ex);
  838. std::rethrow_exception(wrapped);
  839. }
  840. YT_LOG_DEBUG("REQ %v - connection #%v",
  841. Context_.RequestId,
  842. Connection_->Id);
  843. auto strHeader = Header_.GetHeaderAsString(Context_.HostName, Context_.RequestId, includeParameters);
  844. LogRequest(Header_, Url_, includeParameters, Context_.RequestId, Context_.HostName);
  845. LoggedAttributes_ = GetLoggedAttributes(Header_, Url_, includeParameters, 128);
  846. auto outputFormat = Header_.GetOutputFormat();
  847. if (outputFormat && outputFormat->IsTextYson()) {
  848. LogResponse_ = true;
  849. }
  850. RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get());
  851. RequestStream_->Write(strHeader.data(), strHeader.size());
  852. return RequestStream_.Get();
  853. }
  854. IOutputStream* THttpRequest::StartRequest()
  855. {
  856. return StartRequestImpl(true);
  857. }
  858. void THttpRequest::FinishRequest()
  859. {
  860. RequestStream_->Flush();
  861. RequestStream_->Finish();
  862. }
  863. void THttpRequest::SmallRequest(TMaybe<TStringBuf> body)
  864. {
  865. if (!body && (Header_.GetMethod() == "PUT" || Header_.GetMethod() == "POST")) {
  866. const auto& parameters = Header_.GetParameters();
  867. auto parametersStr = NodeToYsonString(parameters);
  868. auto* output = StartRequestImpl(false);
  869. output->Write(parametersStr);
  870. FinishRequest();
  871. } else {
  872. auto* output = StartRequest();
  873. if (body) {
  874. output->Write(*body);
  875. }
  876. FinishRequest();
  877. }
  878. }
  879. THttpResponse* THttpRequest::GetResponseStream()
  880. {
  881. if (!Input_) {
  882. SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get()));
  883. if (TConfig::Get()->UseAbortableResponse) {
  884. Y_ABORT_UNLESS(!Url_.empty());
  885. Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_));
  886. } else {
  887. Input_.Reset(new THttpResponse(Context_, SocketInput_.Get()));
  888. }
  889. Input_->CheckErrorResponse();
  890. }
  891. return Input_.Get();
  892. }
  893. TString THttpRequest::GetResponse()
  894. {
  895. TString result = GetResponseStream()->ReadAll();
  896. TStringStream loggedAttributes;
  897. loggedAttributes
  898. << "Time: " << TInstant::Now() - StartTime_ << "; "
  899. << "HostName: " << GetResponseStream()->GetHostName() << "; "
  900. << LoggedAttributes_;
  901. if (LogResponse_) {
  902. constexpr auto sizeLimit = 1 << 7;
  903. YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
  904. Context_.RequestId,
  905. TruncateForLogs(result, sizeLimit),
  906. loggedAttributes.Str());
  907. } else {
  908. YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
  909. Context_.RequestId,
  910. result.size(),
  911. loggedAttributes.Str());
  912. }
  913. return result;
  914. }
  915. int THttpRequest::GetHttpCode() {
  916. return GetResponseStream()->GetHttpCode();
  917. }
  918. void THttpRequest::InvalidateConnection()
  919. {
  920. TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_);
  921. Connection_.Reset();
  922. }
  923. ////////////////////////////////////////////////////////////////////////////////
  924. } // namespace NYT