http.h 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. #pragma once
  2. #include "fwd.h"
  3. #include <yt/cpp/mapreduce/interface/common.h>
  4. #include <yt/cpp/mapreduce/interface/errors.h>
  5. #include <yt/cpp/mapreduce/interface/format.h>
  6. #include <yt/cpp/mapreduce/interface/io.h>
  7. #include <yt/cpp/mapreduce/interface/node.h>
  8. #include <library/cpp/deprecated/atomic/atomic.h>
  9. #include <library/cpp/http/io/stream.h>
  10. #include <util/generic/hash.h>
  11. #include <util/generic/hash_multi_map.h>
  12. #include <util/generic/strbuf.h>
  13. #include <util/generic/guid.h>
  14. #include <util/network/socket.h>
  15. #include <util/stream/input.h>
  16. #include <util/system/mutex.h>
  17. #include <util/system/rwlock.h>
  18. #include <util/generic/ptr.h>
  19. namespace NYT {
  20. class TNode;
  21. namespace NHttp {
  22. struct THeadersPtrWrapper;
  23. } // NHttp
  24. ///////////////////////////////////////////////////////////////////////////////
  25. enum class EFrameType
  26. {
  27. Data = 0x01,
  28. KeepAlive = 0x02,
  29. };
  30. class THttpHeader
  31. {
  32. public:
  33. THttpHeader(const TString& method, const TString& command, bool isApi = true);
  34. void AddParameter(const TString& key, TNode value, bool overwrite = false);
  35. void RemoveParameter(const TString& key);
  36. void MergeParameters(const TNode& parameters, bool overwrite = false);
  37. TNode GetParameters() const;
  38. void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false);
  39. void AddPath(const TString& path, bool overwrite = false);
  40. void AddOperationId(const TOperationId& operationId, bool overwrite = false);
  41. void AddMutationId();
  42. bool HasMutationId() const;
  43. void SetToken(const TString& token);
  44. void SetProxyAddress(const TString& proxyAddress);
  45. void SetHostPort(const TString& hostPort);
  46. void SetImpersonationUser(const TString& impersonationUser);
  47. void SetServiceTicket(const TString& ticket);
  48. void SetInputFormat(const TMaybe<TFormat>& format);
  49. void SetOutputFormat(const TMaybe<TFormat>& format);
  50. TMaybe<TFormat> GetOutputFormat() const;
  51. void SetRequestCompression(const TString& compression);
  52. void SetResponseCompression(const TString& compression);
  53. TString GetCommand() const;
  54. TString GetUrl(bool needProxy = false) const;
  55. TString GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters = true) const;
  56. NHttp::THeadersPtrWrapper GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const;
  57. const TString& GetMethod() const;
  58. private:
  59. bool ShouldAcceptFraming() const;
  60. private:
  61. const TString Method;
  62. const TString Command;
  63. const bool IsApi;
  64. TNode::TMapType Parameters;
  65. TString ImpersonationUser;
  66. TString Token;
  67. TString ServiceTicket;
  68. TNode Attributes;
  69. TString ProxyAddress;
  70. TString HostPort;
  71. private:
  72. TMaybe<TFormat> InputFormat = TFormat::YsonText();
  73. TMaybe<TFormat> OutputFormat = TFormat::YsonText();
  74. TString RequestCompression = "identity";
  75. TString ResponseCompression = "identity";
  76. };
  77. ////////////////////////////////////////////////////////////////////////////////
  78. class TAddressCache
  79. {
  80. public:
  81. using TAddressPtr = TAtomicSharedPtr<TNetworkAddress>;
  82. static TAddressCache* Get();
  83. TAddressPtr Resolve(const TString& hostName);
  84. private:
  85. struct TCacheEntry {
  86. TAddressPtr Address;
  87. TInstant ExpirationTime;
  88. };
  89. private:
  90. TAddressPtr FindAddress(const TString& hostName) const;
  91. void AddAddress(TString hostName, TAddressPtr address);
  92. private:
  93. TRWMutex Lock_;
  94. THashMap<TString, TCacheEntry> Cache_;
  95. };
  96. ////////////////////////////////////////////////////////////////////////////////
  97. struct TConnection
  98. {
  99. THolder<TSocket> Socket;
  100. TAtomic Busy = 1;
  101. TInstant DeadLine;
  102. ui32 Id;
  103. };
  104. using TConnectionPtr = TAtomicSharedPtr<TConnection>;
  105. class TConnectionPool
  106. {
  107. public:
  108. using TConnectionMap = THashMultiMap<TString, TConnectionPtr>;
  109. static TConnectionPool* Get();
  110. TConnectionPtr Connect(const TString& hostName, TDuration socketTimeout);
  111. void Release(TConnectionPtr connection);
  112. void Invalidate(const TString& hostName, TConnectionPtr connection);
  113. private:
  114. void Refresh();
  115. static SOCKET DoConnect(TAddressCache::TAddressPtr address);
  116. private:
  117. TMutex Lock_;
  118. TConnectionMap Connections_;
  119. };
  120. ////////////////////////////////////////////////////////////////////////////////
  121. //
  122. // Input stream that handles YT-specific header/trailer errors
  123. // and throws TErrorResponse if it finds any.
  124. class THttpResponse
  125. : public IInputStream
  126. {
  127. public:
  128. // 'requestId' and 'hostName' are provided for debug reasons
  129. // (they will appear in some error messages).
  130. THttpResponse(
  131. IInputStream* socketStream,
  132. const TString& requestId,
  133. const TString& hostName);
  134. const THttpHeaders& Headers() const;
  135. void CheckErrorResponse() const;
  136. bool IsExhausted() const;
  137. int GetHttpCode() const;
  138. const TString& GetHostName() const;
  139. bool IsKeepAlive() const;
  140. protected:
  141. size_t DoRead(void* buf, size_t len) override;
  142. size_t DoSkip(size_t len) override;
  143. private:
  144. void CheckTrailers(const THttpHeaders& trailers);
  145. TMaybe<TErrorResponse> ParseError(const THttpHeaders& headers);
  146. size_t UnframeRead(void* buf, size_t len);
  147. size_t UnframeSkip(size_t len);
  148. bool RefreshFrameIfNecessary();
  149. private:
  150. THttpInput HttpInput_;
  151. const TString RequestId_;
  152. const TString HostName_;
  153. int HttpCode_ = 0;
  154. TMaybe<TErrorResponse> ErrorResponse_;
  155. bool IsExhausted_ = false;
  156. const bool Unframe_;
  157. size_t RemainingFrameSize_ = 0;
  158. };
  159. ////////////////////////////////////////////////////////////////////////////////
  160. class THttpRequest
  161. {
  162. public:
  163. THttpRequest();
  164. THttpRequest(const TString& requestId);
  165. ~THttpRequest();
  166. TString GetRequestId() const;
  167. void Connect(TString hostName, TDuration socketTimeout = TDuration::Zero());
  168. IOutputStream* StartRequest(const THttpHeader& header);
  169. void FinishRequest();
  170. void SmallRequest(const THttpHeader& header, TMaybe<TStringBuf> body);
  171. THttpResponse* GetResponseStream();
  172. TString GetResponse();
  173. void InvalidateConnection();
  174. int GetHttpCode();
  175. private:
  176. IOutputStream* StartRequestImpl(const THttpHeader& header, bool includeParameters);
  177. private:
  178. class TRequestStream;
  179. private:
  180. TString HostName;
  181. TString RequestId;
  182. TString Url_;
  183. TInstant StartTime_;
  184. TString LoggedAttributes_;
  185. TConnectionPtr Connection;
  186. THolder<TRequestStream> RequestStream_;
  187. THolder<TSocketInput> SocketInput;
  188. THolder<THttpResponse> Input;
  189. bool LogResponse = false;
  190. };
  191. ////////////////////////////////////////////////////////////////////////////////
  192. } // namespace NYT