http.cpp 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118
  1. #include "http.h"
  2. #include "abortable_http_response.h"
  3. #include "core.h"
  4. #include "helpers.h"
  5. #include <yt/cpp/mapreduce/common/helpers.h>
  6. #include <yt/cpp/mapreduce/common/retry_lib.h>
  7. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <yt/cpp/mapreduce/interface/errors.h>
  10. #include <yt/cpp/mapreduce/interface/error_codes.h>
  11. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  12. #include <yt/yt/core/http/http.h>
  13. #include <library/cpp/json/json_writer.h>
  14. #include <library/cpp/string_utils/base64/base64.h>
  15. #include <library/cpp/string_utils/quote/quote.h>
  16. #include <util/generic/singleton.h>
  17. #include <util/generic/algorithm.h>
  18. #include <util/stream/mem.h>
  19. #include <util/string/builder.h>
  20. #include <util/string/cast.h>
  21. #include <util/string/escape.h>
  22. #include <util/string/printf.h>
  23. #include <util/system/byteorder.h>
  24. #include <util/system/getpid.h>
  25. #include <exception>
  26. namespace NYT {
  27. ////////////////////////////////////////////////////////////////////////////////
  28. std::exception_ptr WrapSystemError(
  29. const TRequestContext& context,
  30. const std::exception& ex)
  31. {
  32. if (auto errorResponse = dynamic_cast<const TErrorResponse*>(&ex); errorResponse != nullptr) {
  33. return std::make_exception_ptr(errorResponse);
  34. }
  35. auto message = NYT::Format("Request %qv to %qv failed", context.RequestId, context.HostName + context.Method);
  36. TYtError outer(1, message, {TYtError(NClusterErrorCodes::Generic, ex.what())}, {
  37. {"request_id", context.RequestId},
  38. {"host", context.HostName},
  39. {"method", context.Method},
  40. });
  41. TTransportError errorResponse(std::move(outer));
  42. return std::make_exception_ptr(errorResponse);
  43. }
  44. ////////////////////////////////////////////////////////////////////////////////
  45. class THttpRequest::TRequestStream
  46. : public IOutputStream
  47. {
  48. public:
  49. TRequestStream(THttpRequest* httpRequest, const TSocket& s)
  50. : HttpRequest_(httpRequest)
  51. , SocketOutput_(s)
  52. , HttpOutput_(static_cast<IOutputStream*>(&SocketOutput_))
  53. {
  54. HttpOutput_.EnableKeepAlive(true);
  55. }
  56. private:
  57. void DoWrite(const void* buf, size_t len) override
  58. {
  59. WrapWriteFunc([&] {
  60. HttpOutput_.Write(buf, len);
  61. });
  62. }
  63. void DoWriteV(const TPart* parts, size_t count) override
  64. {
  65. WrapWriteFunc([&] {
  66. HttpOutput_.Write(parts, count);
  67. });
  68. }
  69. void DoWriteC(char ch) override
  70. {
  71. WrapWriteFunc([&] {
  72. HttpOutput_.Write(ch);
  73. });
  74. }
  75. void DoFlush() override
  76. {
  77. WrapWriteFunc([&] {
  78. HttpOutput_.Flush();
  79. });
  80. }
  81. void DoFinish() override
  82. {
  83. WrapWriteFunc([&] {
  84. HttpOutput_.Finish();
  85. });
  86. }
  87. void WrapWriteFunc(std::function<void()> func)
  88. {
  89. CheckErrorState();
  90. try {
  91. func();
  92. } catch (const std::exception& ex) {
  93. HandleWriteException(ex);
  94. }
  95. }
  96. // In many cases http proxy stops reading request and resets connection
  97. // if error has happend. This function tries to read error response
  98. // in such cases.
  99. void HandleWriteException(const std::exception& ex) {
  100. Y_ABORT_UNLESS(WriteError_ == nullptr);
  101. WriteError_ = WrapSystemError(HttpRequest_->Context_, ex);
  102. Y_ABORT_UNLESS(WriteError_ != nullptr);
  103. try {
  104. HttpRequest_->GetResponseStream();
  105. } catch (const TErrorResponse &) {
  106. throw;
  107. } catch (...) {
  108. }
  109. std::rethrow_exception(WriteError_);
  110. }
  111. void CheckErrorState()
  112. {
  113. if (WriteError_) {
  114. std::rethrow_exception(WriteError_);
  115. }
  116. }
  117. private:
  118. THttpRequest* const HttpRequest_;
  119. TSocketOutput SocketOutput_;
  120. THttpOutput HttpOutput_;
  121. std::exception_ptr WriteError_;
  122. };
  123. ////////////////////////////////////////////////////////////////////////////////
  124. THttpHeader::THttpHeader(const TString& method, const TString& command, bool isApi)
  125. : Method_(method)
  126. , Command_(command)
  127. , IsApi_(isApi)
  128. { }
  129. void THttpHeader::AddParameter(const TString& key, TNode value, bool overwrite)
  130. {
  131. auto it = Parameters_.find(key);
  132. if (it == Parameters_.end()) {
  133. Parameters_.emplace(key, std::move(value));
  134. } else {
  135. if (overwrite) {
  136. it->second = std::move(value);
  137. } else {
  138. ythrow yexception() << "Duplicate key: " << key;
  139. }
  140. }
  141. }
  142. void THttpHeader::MergeParameters(const TNode& newParameters, bool overwrite)
  143. {
  144. for (const auto& p : newParameters.AsMap()) {
  145. AddParameter(p.first, p.second, overwrite);
  146. }
  147. }
  148. void THttpHeader::RemoveParameter(const TString& key)
  149. {
  150. Parameters_.erase(key);
  151. }
  152. TNode THttpHeader::GetParameters() const
  153. {
  154. return Parameters_;
  155. }
  156. void THttpHeader::AddTransactionId(const TTransactionId& transactionId, bool overwrite)
  157. {
  158. if (transactionId) {
  159. AddParameter("transaction_id", GetGuidAsString(transactionId), overwrite);
  160. } else {
  161. RemoveParameter("transaction_id");
  162. }
  163. }
  164. void THttpHeader::AddPath(const TString& path, bool overwrite)
  165. {
  166. AddParameter("path", path, overwrite);
  167. }
  168. void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite)
  169. {
  170. AddParameter("operation_id", GetGuidAsString(operationId), overwrite);
  171. }
  172. TMutationId THttpHeader::AddMutationId()
  173. {
  174. TGUID guid;
  175. // Some users use `fork()' with yt wrapper
  176. // (actually they use python + multiprocessing)
  177. // and CreateGuid is not resistant to `fork()', so spice it a little bit.
  178. //
  179. // Check IGNIETFERRO-610
  180. CreateGuid(&guid);
  181. guid.dw[2] = GetPID() ^ MicroSeconds();
  182. AddParameter("mutation_id", GetGuidAsString(guid), true);
  183. return guid;
  184. }
  185. bool THttpHeader::HasMutationId() const
  186. {
  187. return Parameters_.contains("mutation_id");
  188. }
  189. void THttpHeader::SetMutationId(TMutationId mutationId)
  190. {
  191. AddParameter("mutation_id", GetGuidAsString(mutationId), /*overwrite*/ true);
  192. }
  193. void THttpHeader::SetToken(const TString& token)
  194. {
  195. Token_ = token;
  196. }
  197. void THttpHeader::SetProxyAddress(const TString& proxyAddress)
  198. {
  199. ProxyAddress_ = proxyAddress;
  200. }
  201. void THttpHeader::SetHostPort(const TString& hostPort)
  202. {
  203. HostPort_ = hostPort;
  204. }
  205. void THttpHeader::SetImpersonationUser(const TString& impersonationUser)
  206. {
  207. ImpersonationUser_ = impersonationUser;
  208. }
  209. void THttpHeader::SetServiceTicket(const TString& ticket)
  210. {
  211. ServiceTicket_ = ticket;
  212. }
  213. void THttpHeader::SetInputFormat(const TMaybe<TFormat>& format)
  214. {
  215. InputFormat_ = format;
  216. }
  217. void THttpHeader::SetOutputFormat(const TMaybe<TFormat>& format)
  218. {
  219. OutputFormat_ = format;
  220. }
  221. TMaybe<TFormat> THttpHeader::GetOutputFormat() const
  222. {
  223. return OutputFormat_;
  224. }
  225. void THttpHeader::SetRequestCompression(const TString& compression)
  226. {
  227. RequestCompression_ = compression;
  228. }
  229. void THttpHeader::SetResponseCompression(const TString& compression)
  230. {
  231. ResponseCompression_ = compression;
  232. }
  233. TString THttpHeader::GetCommand() const
  234. {
  235. return Command_;
  236. }
  237. TString THttpHeader::GetUrl(bool needProxy) const
  238. {
  239. TStringStream url;
  240. if (needProxy && !ProxyAddress_.empty()) {
  241. url << ProxyAddress_ << "/";
  242. return url.Str();
  243. }
  244. if (!ProxyAddress_.empty()) {
  245. url << HostPort_;
  246. }
  247. if (IsApi_) {
  248. url << "/api/" << TConfig::Get()->ApiVersion << "/" << Command_;
  249. } else {
  250. url << "/" << Command_;
  251. }
  252. return url.Str();
  253. }
  254. bool THttpHeader::ShouldAcceptFraming() const
  255. {
  256. return TConfig::Get()->CommandsWithFraming.contains(Command_);
  257. }
  258. TString THttpHeader::GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters) const
  259. {
  260. TStringStream result;
  261. result << Method_ << " " << GetUrl() << " HTTP/1.1\r\n";
  262. GetHeader(HostPort_.empty() ? hostName : HostPort_, requestId, includeParameters).Get()->WriteTo(&result);
  263. if (ShouldAcceptFraming()) {
  264. result << "X-YT-Accept-Framing: 1\r\n";
  265. }
  266. result << "\r\n";
  267. return result.Str();
  268. }
  269. NHttp::THeadersPtrWrapper THttpHeader::GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const
  270. {
  271. auto headers = New<NHttp::THeaders>();
  272. headers->Add("Host", hostName);
  273. headers->Add("User-Agent", TProcessState::Get()->ClientVersion);
  274. if (!Token_.empty()) {
  275. headers->Add("Authorization", "OAuth " + Token_);
  276. }
  277. if (!ServiceTicket_.empty()) {
  278. headers->Add("X-Ya-Service-Ticket", ServiceTicket_);
  279. }
  280. if (!ImpersonationUser_.empty()) {
  281. headers->Add("X-Yt-User-Name", ImpersonationUser_);
  282. }
  283. if (Method_ == "PUT" || Method_ == "POST") {
  284. headers->Add("Transfer-Encoding", "chunked");
  285. }
  286. headers->Add("X-YT-Correlation-Id", requestId);
  287. headers->Add("X-YT-Header-Format", "<format=text>yson");
  288. headers->Add("Content-Encoding", RequestCompression_);
  289. headers->Add("Accept-Encoding", ResponseCompression_);
  290. auto printYTHeader = [&headers] (const char* headerName, const TString& value) {
  291. static const size_t maxHttpHeaderSize = 64 << 10;
  292. if (!value) {
  293. return;
  294. }
  295. if (value.size() <= maxHttpHeaderSize) {
  296. headers->Add(headerName, value);
  297. return;
  298. }
  299. TString encoded;
  300. Base64Encode(value, encoded);
  301. auto ptr = encoded.data();
  302. auto finish = encoded.data() + encoded.size();
  303. size_t index = 0;
  304. do {
  305. auto end = Min(ptr + maxHttpHeaderSize, finish);
  306. headers->Add(Format("%v%v", headerName, index++), TString(ptr, end));
  307. ptr = end;
  308. } while (ptr != finish);
  309. };
  310. if (InputFormat_) {
  311. printYTHeader("X-YT-Input-Format", NodeToYsonString(InputFormat_->Config));
  312. }
  313. if (OutputFormat_) {
  314. printYTHeader("X-YT-Output-Format", NodeToYsonString(OutputFormat_->Config));
  315. }
  316. if (includeParameters) {
  317. printYTHeader("X-YT-Parameters", NodeToYsonString(Parameters_));
  318. }
  319. return NHttp::THeadersPtrWrapper(std::move(headers));
  320. }
  321. const TString& THttpHeader::GetMethod() const
  322. {
  323. return Method_;
  324. }
  325. ////////////////////////////////////////////////////////////////////////////////
  326. TAddressCache* TAddressCache::Get()
  327. {
  328. return Singleton<TAddressCache>();
  329. }
  330. bool ContainsAddressOfRequiredVersion(const TAddressCache::TAddressPtr& address)
  331. {
  332. if (!TConfig::Get()->ForceIpV4 && !TConfig::Get()->ForceIpV6) {
  333. return true;
  334. }
  335. for (auto i = address->Begin(); i != address->End(); ++i) {
  336. const auto& addressInfo = *i;
  337. if (TConfig::Get()->ForceIpV4 && addressInfo.ai_family == AF_INET) {
  338. return true;
  339. }
  340. if (TConfig::Get()->ForceIpV6 && addressInfo.ai_family == AF_INET6) {
  341. return true;
  342. }
  343. }
  344. return false;
  345. }
  346. TAddressCache::TAddressPtr TAddressCache::Resolve(const TString& hostName)
  347. {
  348. auto address = FindAddress(hostName);
  349. if (address) {
  350. return address;
  351. }
  352. TString host(hostName);
  353. ui16 port = 80;
  354. auto colon = hostName.find(':');
  355. if (colon != TString::npos) {
  356. port = FromString<ui16>(hostName.substr(colon + 1));
  357. host = hostName.substr(0, colon);
  358. }
  359. auto retryPolicy = CreateDefaultRequestRetryPolicy(TConfig::Get());
  360. auto error = yexception() << "can not resolve address of required version for host " << hostName;
  361. while (true) {
  362. address = new TNetworkAddress(host, port);
  363. if (ContainsAddressOfRequiredVersion(address)) {
  364. break;
  365. }
  366. retryPolicy->NotifyNewAttempt();
  367. YT_LOG_DEBUG("Failed to resolve address of required version for host %v, retrying: %v",
  368. hostName,
  369. retryPolicy->GetAttemptDescription());
  370. if (auto backoffDuration = retryPolicy->OnGenericError(error)) {
  371. NDetail::TWaitProxy::Get()->Sleep(*backoffDuration);
  372. } else {
  373. ythrow error;
  374. }
  375. }
  376. AddAddress(hostName, address);
  377. return address;
  378. }
  379. TAddressCache::TAddressPtr TAddressCache::FindAddress(const TString& hostName) const
  380. {
  381. TCacheEntry entry;
  382. {
  383. TReadGuard guard(Lock_);
  384. auto it = Cache_.find(hostName);
  385. if (it == Cache_.end()) {
  386. return nullptr;
  387. }
  388. entry = it->second;
  389. }
  390. if (TInstant::Now() > entry.ExpirationTime) {
  391. YT_LOG_DEBUG("Address resolution cache entry for host %v is expired, will retry resolution",
  392. hostName);
  393. return nullptr;
  394. }
  395. if (!ContainsAddressOfRequiredVersion(entry.Address)) {
  396. YT_LOG_DEBUG("Address of required version not found for host %v, will retry resolution",
  397. hostName);
  398. return nullptr;
  399. }
  400. return entry.Address;
  401. }
  402. void TAddressCache::AddAddress(TString hostName, TAddressPtr address)
  403. {
  404. auto entry = TCacheEntry{
  405. .Address = std::move(address),
  406. .ExpirationTime = TInstant::Now() + TConfig::Get()->AddressCacheExpirationTimeout,
  407. };
  408. {
  409. TWriteGuard guard(Lock_);
  410. Cache_.emplace(std::move(hostName), std::move(entry));
  411. }
  412. }
  413. ////////////////////////////////////////////////////////////////////////////////
  414. TConnectionPool* TConnectionPool::Get()
  415. {
  416. return Singleton<TConnectionPool>();
  417. }
  418. TConnectionPtr TConnectionPool::Connect(
  419. const TString& hostName,
  420. TDuration socketTimeout)
  421. {
  422. Refresh();
  423. if (socketTimeout == TDuration::Zero()) {
  424. socketTimeout = TConfig::Get()->SocketTimeout;
  425. }
  426. {
  427. auto guard = Guard(Lock_);
  428. auto now = TInstant::Now();
  429. auto range = Connections_.equal_range(hostName);
  430. for (auto it = range.first; it != range.second; ++it) {
  431. auto& connection = it->second;
  432. if (connection->DeadLine < now) {
  433. continue;
  434. }
  435. if (!AtomicCas(&connection->Busy, 1, 0)) {
  436. continue;
  437. }
  438. connection->DeadLine = now + socketTimeout;
  439. connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
  440. return connection;
  441. }
  442. }
  443. TConnectionPtr connection(new TConnection);
  444. auto networkAddress = TAddressCache::Get()->Resolve(hostName);
  445. TSocketHolder socket(DoConnect(networkAddress));
  446. SetNonBlock(socket, false);
  447. connection->Socket = std::make_unique<TSocket>(socket.Release());
  448. connection->DeadLine = TInstant::Now() + socketTimeout;
  449. connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
  450. {
  451. auto guard = Guard(Lock_);
  452. static ui32 connectionId = 0;
  453. connection->Id = ++connectionId;
  454. Connections_.insert({hostName, connection});
  455. }
  456. YT_LOG_DEBUG("New connection to %v #%v opened",
  457. hostName,
  458. connection->Id);
  459. return connection;
  460. }
  461. void TConnectionPool::Release(TConnectionPtr connection)
  462. {
  463. auto socketTimeout = TConfig::Get()->SocketTimeout;
  464. auto newDeadline = TInstant::Now() + socketTimeout;
  465. {
  466. auto guard = Guard(Lock_);
  467. connection->DeadLine = newDeadline;
  468. }
  469. connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
  470. AtomicSet(connection->Busy, 0);
  471. Refresh();
  472. }
  473. void TConnectionPool::Invalidate(
  474. const TString& hostName,
  475. TConnectionPtr connection)
  476. {
  477. auto guard = Guard(Lock_);
  478. auto range = Connections_.equal_range(hostName);
  479. for (auto it = range.first; it != range.second; ++it) {
  480. if (it->second == connection) {
  481. YT_LOG_DEBUG("Closing connection #%v",
  482. connection->Id);
  483. Connections_.erase(it);
  484. return;
  485. }
  486. }
  487. }
  488. void TConnectionPool::Refresh()
  489. {
  490. auto guard = Guard(Lock_);
  491. // simple, since we don't expect too many connections
  492. using TItem = std::pair<TInstant, TConnectionMap::iterator>;
  493. std::vector<TItem> sortedConnections;
  494. for (auto it = Connections_.begin(); it != Connections_.end(); ++it) {
  495. sortedConnections.emplace_back(it->second->DeadLine, it);
  496. }
  497. std::sort(
  498. sortedConnections.begin(),
  499. sortedConnections.end(),
  500. [] (const TItem& a, const TItem& b) -> bool {
  501. return a.first < b.first;
  502. });
  503. auto removeCount = static_cast<int>(Connections_.size()) - TConfig::Get()->ConnectionPoolSize;
  504. const auto now = TInstant::Now();
  505. for (const auto& item : sortedConnections) {
  506. const auto& mapIterator = item.second;
  507. auto connection = mapIterator->second;
  508. if (AtomicGet(connection->Busy)) {
  509. continue;
  510. }
  511. if (removeCount > 0) {
  512. Connections_.erase(mapIterator);
  513. YT_LOG_DEBUG("Closing connection #%v (too many opened connections)",
  514. connection->Id);
  515. --removeCount;
  516. continue;
  517. }
  518. if (connection->DeadLine < now) {
  519. Connections_.erase(mapIterator);
  520. YT_LOG_DEBUG("Closing connection #%v (timeout)",
  521. connection->Id);
  522. }
  523. }
  524. }
  525. SOCKET TConnectionPool::DoConnect(TAddressCache::TAddressPtr address)
  526. {
  527. int lastError = 0;
  528. for (auto i = address->Begin(); i != address->End(); ++i) {
  529. struct addrinfo* info = &*i;
  530. if (TConfig::Get()->ForceIpV4 && info->ai_family != AF_INET) {
  531. continue;
  532. }
  533. if (TConfig::Get()->ForceIpV6 && info->ai_family != AF_INET6) {
  534. continue;
  535. }
  536. TSocketHolder socket(
  537. ::socket(info->ai_family, info->ai_socktype, info->ai_protocol));
  538. if (socket.Closed()) {
  539. lastError = LastSystemError();
  540. continue;
  541. }
  542. SetNonBlock(socket, true);
  543. if (TConfig::Get()->SocketPriority) {
  544. SetSocketPriority(socket, *TConfig::Get()->SocketPriority);
  545. }
  546. if (connect(socket, info->ai_addr, info->ai_addrlen) == 0)
  547. return socket.Release();
  548. int err = LastSystemError();
  549. if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
  550. struct pollfd p = {
  551. socket,
  552. POLLOUT,
  553. 0
  554. };
  555. const ssize_t n = PollD(&p, 1, TInstant::Now() + TConfig::Get()->ConnectTimeout);
  556. if (n < 0) {
  557. ythrow TSystemError(-(int)n) << "can not connect to " << info;
  558. }
  559. CheckedGetSockOpt(socket, SOL_SOCKET, SO_ERROR, err, "socket error");
  560. if (!err)
  561. return socket.Release();
  562. }
  563. lastError = err;
  564. continue;
  565. }
  566. ythrow TSystemError(lastError) << "can not connect to " << *address;
  567. }
  568. ////////////////////////////////////////////////////////////////////////////////
  569. class THttpResponse::THttpInputWrapped
  570. : public IInputStream
  571. {
  572. public:
  573. explicit THttpInputWrapped(TRequestContext context, IInputStream* input)
  574. : Context_(std::move(context))
  575. , HttpInput_(input)
  576. { }
  577. const THttpHeaders& Headers() const noexcept
  578. {
  579. return HttpInput_.Headers();
  580. }
  581. const TString& FirstLine() const noexcept
  582. {
  583. return HttpInput_.FirstLine();
  584. }
  585. bool IsKeepAlive() const noexcept
  586. {
  587. return HttpInput_.IsKeepAlive();
  588. }
  589. const TMaybe<THttpHeaders>& Trailers() const noexcept
  590. {
  591. return HttpInput_.Trailers();
  592. }
  593. private:
  594. size_t DoRead(void* buf, size_t len) override
  595. {
  596. try {
  597. return HttpInput_.Read(buf, len);
  598. } catch (const std::exception& ex) {
  599. auto wrapped = WrapSystemError(Context_, ex);
  600. std::rethrow_exception(wrapped);
  601. }
  602. }
  603. size_t DoSkip(size_t len) override
  604. {
  605. try {
  606. return HttpInput_.Skip(len);
  607. } catch (const std::exception& ex) {
  608. auto wrapped = WrapSystemError(Context_, ex);
  609. std::rethrow_exception(wrapped);
  610. }
  611. }
  612. private:
  613. const TRequestContext Context_;
  614. THttpInput HttpInput_;
  615. };
  616. THttpResponse::THttpResponse(
  617. TRequestContext context,
  618. IInputStream* socketStream)
  619. : HttpInput_(std::make_unique<THttpInputWrapped>(context, socketStream))
  620. , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing"))
  621. , Context_(std::move(context))
  622. {
  623. if (auto proxyHeader = HttpInput_->Headers().FindHeader("X-YT-Proxy")) {
  624. Context_.HostName = proxyHeader->Value();
  625. }
  626. HttpCode_ = ParseHttpRetCode(HttpInput_->FirstLine());
  627. if (HttpCode_ == 200 || HttpCode_ == 202) {
  628. return;
  629. }
  630. auto logAndSetError = [&] (int code, const TString& rawError) {
  631. YT_LOG_ERROR("RSP %v - HTTP %v - %v",
  632. Context_.RequestId,
  633. HttpCode_,
  634. rawError.data());
  635. ErrorResponse_ = TErrorResponse(TYtError(code, rawError), Context_.RequestId);
  636. };
  637. switch (HttpCode_) {
  638. case 429:
  639. logAndSetError(NClusterErrorCodes::NSecurityClient::RequestQueueSizeLimitExceeded, "request rate limit exceeded");
  640. break;
  641. case 500:
  642. logAndSetError(NClusterErrorCodes::NRpc::Unavailable, ::TStringBuilder() << "internal error in proxy " << Context_.HostName);
  643. break;
  644. case 503:
  645. logAndSetError(NClusterErrorCodes::NBus::TransportError, "service unavailable");
  646. break;
  647. default: {
  648. TStringStream httpHeaders;
  649. httpHeaders << "HTTP headers (";
  650. for (const auto& header : HttpInput_->Headers()) {
  651. httpHeaders << header.Name() << ": " << header.Value() << "; ";
  652. }
  653. httpHeaders << ")";
  654. auto errorString = Sprintf("RSP %s - HTTP %d - %s",
  655. Context_.RequestId.data(),
  656. HttpCode_,
  657. httpHeaders.Str().data());
  658. YT_LOG_ERROR("%v",
  659. errorString.data());
  660. if (auto parsedResponse = ParseError(HttpInput_->Headers())) {
  661. ErrorResponse_ = parsedResponse.GetRef();
  662. } else {
  663. ErrorResponse_ = TErrorResponse(TYtError(errorString + " - X-YT-Error is missing in headers"), Context_.RequestId);
  664. }
  665. break;
  666. }
  667. }
  668. }
  669. THttpResponse::~THttpResponse()
  670. { }
  671. const THttpHeaders& THttpResponse::Headers() const
  672. {
  673. return HttpInput_->Headers();
  674. }
  675. void THttpResponse::CheckErrorResponse() const
  676. {
  677. if (ErrorResponse_) {
  678. throw *ErrorResponse_;
  679. }
  680. }
  681. bool THttpResponse::IsExhausted() const
  682. {
  683. return IsExhausted_;
  684. }
  685. int THttpResponse::GetHttpCode() const
  686. {
  687. return HttpCode_;
  688. }
  689. const TString& THttpResponse::GetHostName() const
  690. {
  691. return Context_.HostName;
  692. }
  693. bool THttpResponse::IsKeepAlive() const
  694. {
  695. return HttpInput_->IsKeepAlive();
  696. }
  697. TMaybe<TErrorResponse> THttpResponse::ParseError(const THttpHeaders& headers)
  698. {
  699. for (const auto& header : headers) {
  700. if (header.Name() == "X-YT-Error") {
  701. TYtError error;
  702. error.ParseFrom(header.Value());
  703. TErrorResponse errorResponse(std::move(error), Context_.RequestId);
  704. if (errorResponse.IsOk()) {
  705. return Nothing();
  706. }
  707. return errorResponse;
  708. }
  709. }
  710. return Nothing();
  711. }
  712. size_t THttpResponse::DoRead(void* buf, size_t len)
  713. {
  714. size_t read;
  715. if (Unframe_) {
  716. read = UnframeRead(buf, len);
  717. } else {
  718. read = HttpInput_->Read(buf, len);
  719. }
  720. if (read == 0 && len != 0) {
  721. // THttpInput MUST return defined (but may be empty)
  722. // trailers when it is exhausted.
  723. Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(),
  724. "trailers MUST be defined for exhausted stream");
  725. CheckTrailers(HttpInput_->Trailers().GetRef());
  726. IsExhausted_ = true;
  727. }
  728. return read;
  729. }
  730. size_t THttpResponse::DoSkip(size_t len)
  731. {
  732. size_t skipped;
  733. if (Unframe_) {
  734. skipped = UnframeSkip(len);
  735. } else {
  736. skipped = HttpInput_->Skip(len);
  737. }
  738. if (skipped == 0 && len != 0) {
  739. // THttpInput MUST return defined (but may be empty)
  740. // trailers when it is exhausted.
  741. Y_ABORT_UNLESS(HttpInput_->Trailers().Defined(),
  742. "trailers MUST be defined for exhausted stream");
  743. CheckTrailers(HttpInput_->Trailers().GetRef());
  744. IsExhausted_ = true;
  745. }
  746. return skipped;
  747. }
  748. void THttpResponse::CheckTrailers(const THttpHeaders& trailers)
  749. {
  750. if (auto errorResponse = ParseError(trailers)) {
  751. errorResponse->SetIsFromTrailers(true);
  752. YT_LOG_ERROR("RSP %v - %v",
  753. Context_.RequestId,
  754. errorResponse.GetRef().what());
  755. ythrow errorResponse.GetRef();
  756. }
  757. }
  758. static ui32 ReadDataFrameSize(IInputStream* stream)
  759. {
  760. ui32 littleEndianSize;
  761. auto read = stream->Load(&littleEndianSize, sizeof(littleEndianSize));
  762. if (read < sizeof(littleEndianSize)) {
  763. ythrow yexception() << "Bad data frame header: " <<
  764. "expected " << sizeof(littleEndianSize) << " bytes, got " << read;
  765. }
  766. return LittleToHost(littleEndianSize);
  767. }
  768. bool THttpResponse::RefreshFrameIfNecessary()
  769. {
  770. while (RemainingFrameSize_ == 0) {
  771. ui8 frameTypeByte;
  772. auto read = HttpInput_->Read(&frameTypeByte, sizeof(frameTypeByte));
  773. if (read == 0) {
  774. return false;
  775. }
  776. auto frameType = static_cast<EFrameType>(frameTypeByte);
  777. switch (frameType) {
  778. case EFrameType::KeepAlive:
  779. break;
  780. case EFrameType::Data:
  781. RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.get());
  782. break;
  783. default:
  784. ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte);
  785. }
  786. }
  787. return true;
  788. }
  789. size_t THttpResponse::UnframeRead(void* buf, size_t len)
  790. {
  791. if (!RefreshFrameIfNecessary()) {
  792. return 0;
  793. }
  794. auto read = HttpInput_->Read(buf, Min(len, RemainingFrameSize_));
  795. RemainingFrameSize_ -= read;
  796. return read;
  797. }
  798. size_t THttpResponse::UnframeSkip(size_t len)
  799. {
  800. if (!RefreshFrameIfNecessary()) {
  801. return 0;
  802. }
  803. auto skipped = HttpInput_->Skip(Min(len, RemainingFrameSize_));
  804. RemainingFrameSize_ -= skipped;
  805. return skipped;
  806. }
  807. ////////////////////////////////////////////////////////////////////////////////
  808. THttpRequest::THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout)
  809. : Context_(TRequestContext{
  810. .RequestId = std::move(requestId),
  811. .HostName = std::move(hostName),
  812. .Method = header.GetUrl(/*needProxy=*/ false)
  813. })
  814. , Header_(std::move(header))
  815. , Url_(Header_.GetUrl(true))
  816. , SocketTimeout_(socketTimeout)
  817. { }
  818. THttpRequest::~THttpRequest()
  819. {
  820. if (!Connection_) {
  821. return;
  822. }
  823. if (Input_ && Input_->IsKeepAlive() && Input_->IsExhausted()) {
  824. // We should return to the pool only connections where HTTP response was fully read.
  825. // Otherwise next reader might read our remaining data and misinterpret them (YT-6510).
  826. TConnectionPool::Get()->Release(Connection_);
  827. } else {
  828. TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_);
  829. }
  830. }
  831. TString THttpRequest::GetRequestId() const
  832. {
  833. return Context_.RequestId;
  834. }
  835. IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters)
  836. {
  837. YT_LOG_DEBUG("REQ %v - requesting connection to %v from connection pool",
  838. Context_.RequestId,
  839. Context_.HostName);
  840. StartTime_ = TInstant::Now();
  841. try {
  842. Connection_ = TConnectionPool::Get()->Connect(Context_.HostName, SocketTimeout_);
  843. } catch (const std::exception& ex) {
  844. auto wrapped = WrapSystemError(Context_, ex);
  845. std::rethrow_exception(wrapped);
  846. }
  847. YT_LOG_DEBUG("REQ %v - connection #%v",
  848. Context_.RequestId,
  849. Connection_->Id);
  850. auto strHeader = Header_.GetHeaderAsString(Context_.HostName, Context_.RequestId, includeParameters);
  851. LogRequest(Header_, Url_, includeParameters, Context_.RequestId, Context_.HostName);
  852. LoggedAttributes_ = GetLoggedAttributes(Header_, Url_, includeParameters, 128);
  853. auto outputFormat = Header_.GetOutputFormat();
  854. if (outputFormat && outputFormat->IsTextYson()) {
  855. LogResponse_ = true;
  856. }
  857. RequestStream_ = std::make_unique<TRequestStream>(this, *Connection_->Socket.get());
  858. RequestStream_->Write(strHeader.data(), strHeader.size());
  859. return RequestStream_.get();
  860. }
  861. IOutputStream* THttpRequest::StartRequest()
  862. {
  863. return StartRequestImpl(true);
  864. }
  865. void THttpRequest::FinishRequest()
  866. {
  867. RequestStream_->Flush();
  868. RequestStream_->Finish();
  869. }
  870. void THttpRequest::SmallRequest(TMaybe<TStringBuf> body)
  871. {
  872. if (!body && (Header_.GetMethod() == "PUT" || Header_.GetMethod() == "POST")) {
  873. const auto& parameters = Header_.GetParameters();
  874. auto parametersStr = NodeToYsonString(parameters);
  875. auto* output = StartRequestImpl(false);
  876. output->Write(parametersStr);
  877. FinishRequest();
  878. } else {
  879. auto* output = StartRequest();
  880. if (body) {
  881. output->Write(*body);
  882. }
  883. FinishRequest();
  884. }
  885. }
  886. THttpResponse* THttpRequest::GetResponseStream()
  887. {
  888. if (!Input_) {
  889. SocketInput_ = std::make_unique<TSocketInput>(*Connection_->Socket.get());
  890. if (TConfig::Get()->UseAbortableResponse) {
  891. Y_ABORT_UNLESS(!Url_.empty());
  892. Input_ = std::make_unique<TAbortableHttpResponse>(Context_, SocketInput_.get(), Url_);
  893. } else {
  894. Input_ = std::make_unique<THttpResponse>(Context_, SocketInput_.get());
  895. }
  896. Input_->CheckErrorResponse();
  897. }
  898. return Input_.get();
  899. }
  900. TString THttpRequest::GetResponse()
  901. {
  902. TString result = GetResponseStream()->ReadAll();
  903. TStringStream loggedAttributes;
  904. loggedAttributes
  905. << "Time: " << TInstant::Now() - StartTime_ << "; "
  906. << "HostName: " << GetResponseStream()->GetHostName() << "; "
  907. << LoggedAttributes_;
  908. if (LogResponse_) {
  909. constexpr auto sizeLimit = 1 << 7;
  910. YT_LOG_DEBUG("RSP %v - received response (Response: '%v'; %v)",
  911. Context_.RequestId,
  912. TruncateForLogs(result, sizeLimit),
  913. loggedAttributes.Str());
  914. } else {
  915. YT_LOG_DEBUG("RSP %v - received response of %v bytes (%v)",
  916. Context_.RequestId,
  917. result.size(),
  918. loggedAttributes.Str());
  919. }
  920. return result;
  921. }
  922. int THttpRequest::GetHttpCode() {
  923. return GetResponseStream()->GetHttpCode();
  924. }
  925. void THttpRequest::InvalidateConnection()
  926. {
  927. TConnectionPool::Get()->Invalidate(Context_.HostName, Connection_);
  928. Connection_.Reset();
  929. }
  930. ////////////////////////////////////////////////////////////////////////////////
  931. } // namespace NYT