http.h 6.7 KB


  1. #pragma once
  2. #include "fwd.h"
  3. #include <yt/cpp/mapreduce/interface/common.h>
  4. #include <yt/cpp/mapreduce/interface/errors.h>
  5. #include <yt/cpp/mapreduce/interface/format.h>
  6. #include <yt/cpp/mapreduce/interface/io.h>
  7. #include <yt/cpp/mapreduce/interface/node.h>
  8. #include <library/cpp/deprecated/atomic/atomic.h>
  9. #include <library/cpp/http/io/stream.h>
  10. #include <util/generic/hash.h>
  11. #include <util/generic/hash_multi_map.h>
  12. #include <util/generic/strbuf.h>
  13. #include <util/generic/guid.h>
  14. #include <util/network/socket.h>
  15. #include <util/stream/input.h>
  16. #include <util/system/mutex.h>
  17. #include <util/system/rwlock.h>
  18. #include <util/generic/ptr.h>
  19. namespace NYT {
  20. class TNode;
  21. namespace NHttp {
  22. struct THeadersPtrWrapper;
  23. } // NHttp
  24. ////////////////////////////////////////////////////////////////////////////////
  25. enum class EFrameType
  26. {
  27. Data = 0x01,
  28. KeepAlive = 0x02,
  29. };
  30. struct TRequestContext
  31. {
  32. TString RequestId;
  33. TString HostName;
  34. TString Method;
  35. };
  36. class THttpHeader
  37. {
  38. public:
  39. THttpHeader(const TString& method, const TString& command, bool isApi = true);
  40. void AddParameter(const TString& key, TNode value, bool overwrite = false);
  41. void RemoveParameter(const TString& key);
  42. void MergeParameters(const TNode& parameters, bool overwrite = false);
  43. TNode GetParameters() const;
  44. void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false);
  45. void AddPath(const TString& path, bool overwrite = false);
  46. void AddOperationId(const TOperationId& operationId, bool overwrite = false);
  47. TMutationId AddMutationId();
  48. bool HasMutationId() const;
  49. void SetMutationId(TMutationId mutationId);
  50. void SetToken(const TString& token);
  51. void SetProxyAddress(const TString& proxyAddress);
  52. void SetHostPort(const TString& hostPort);
  53. void SetImpersonationUser(const TString& impersonationUser);
  54. void SetServiceTicket(const TString& ticket);
  55. void SetInputFormat(const TMaybe<TFormat>& format);
  56. void SetOutputFormat(const TMaybe<TFormat>& format);
  57. TMaybe<TFormat> GetOutputFormat() const;
  58. void SetRequestCompression(const TString& compression);
  59. void SetResponseCompression(const TString& compression);
  60. TString GetCommand() const;
  61. TString GetUrl(bool needProxy = false) const;
  62. TString GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters = true) const;
  63. NHttp::THeadersPtrWrapper GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const;
  64. const TString& GetMethod() const;
  65. private:
  66. bool ShouldAcceptFraming() const;
  67. private:
  68. const TString Method_;
  69. const TString Command_;
  70. const bool IsApi_;
  71. TNode::TMapType Parameters_;
  72. TString ImpersonationUser_;
  73. TString Token_;
  74. TString ServiceTicket_;
  75. TNode Attributes_;
  76. TString ProxyAddress_;
  77. TString HostPort_;
  78. TMaybe<TFormat> InputFormat_ = TFormat::YsonText();
  79. TMaybe<TFormat> OutputFormat_ = TFormat::YsonText();
  80. TString RequestCompression_ = "identity";
  81. TString ResponseCompression_ = "identity";
  82. };
  83. ////////////////////////////////////////////////////////////////////////////////
  84. class TAddressCache
  85. {
  86. public:
  87. using TAddressPtr = TAtomicSharedPtr<TNetworkAddress>;
  88. static TAddressCache* Get();
  89. TAddressPtr Resolve(const TString& hostName);
  90. private:
  91. struct TCacheEntry {
  92. TAddressPtr Address;
  93. TInstant ExpirationTime;
  94. };
  95. private:
  96. TAddressPtr FindAddress(const TString& hostName) const;
  97. void AddAddress(TString hostName, TAddressPtr address);
  98. private:
  99. TRWMutex Lock_;
  100. THashMap<TString, TCacheEntry> Cache_;
  101. };
  102. ////////////////////////////////////////////////////////////////////////////////
  103. struct TConnection
  104. {
  105. std::unique_ptr<TSocket> Socket;
  106. TAtomic Busy = 1;
  107. TInstant DeadLine;
  108. ui32 Id;
  109. };
  110. using TConnectionPtr = TAtomicSharedPtr<TConnection>;
  111. class TConnectionPool
  112. {
  113. public:
  114. using TConnectionMap = THashMultiMap<TString, TConnectionPtr>;
  115. static TConnectionPool* Get();
  116. TConnectionPtr Connect(const TString& hostName, TDuration socketTimeout);
  117. void Release(TConnectionPtr connection);
  118. void Invalidate(const TString& hostName, TConnectionPtr connection);
  119. private:
  120. void Refresh();
  121. static SOCKET DoConnect(TAddressCache::TAddressPtr address);
  122. private:
  123. TMutex Lock_;
  124. TConnectionMap Connections_;
  125. };
  126. ////////////////////////////////////////////////////////////////////////////////
  127. //
  128. // Input stream that handles YT-specific header/trailer errors
  129. // and throws TErrorResponse if it finds any.
  130. class THttpResponse
  131. : public IInputStream
  132. {
  133. public:
  134. // 'requestId' and 'hostName' are provided for debug reasons
  135. // (they will appear in some error messages).
  136. THttpResponse(
  137. TRequestContext context,
  138. IInputStream* socketStream);
  139. ~THttpResponse();
  140. const THttpHeaders& Headers() const;
  141. void CheckErrorResponse() const;
  142. bool IsExhausted() const;
  143. int GetHttpCode() const;
  144. const TString& GetHostName() const;
  145. bool IsKeepAlive() const;
  146. protected:
  147. size_t DoRead(void* buf, size_t len) override;
  148. size_t DoSkip(size_t len) override;
  149. private:
  150. void CheckTrailers(const THttpHeaders& trailers);
  151. TMaybe<TErrorResponse> ParseError(const THttpHeaders& headers);
  152. size_t UnframeRead(void* buf, size_t len);
  153. size_t UnframeSkip(size_t len);
  154. bool RefreshFrameIfNecessary();
  155. private:
  156. class THttpInputWrapped;
  157. private:
  158. std::unique_ptr<THttpInputWrapped> HttpInput_;
  159. const bool Unframe_;
  160. TRequestContext Context_;
  161. int HttpCode_ = 0;
  162. TMaybe<TErrorResponse> ErrorResponse_;
  163. bool IsExhausted_ = false;
  164. size_t RemainingFrameSize_ = 0;
  165. };
  166. ////////////////////////////////////////////////////////////////////////////////
  167. class THttpRequest
  168. {
  169. public:
  170. THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout);
  171. ~THttpRequest();
  172. TString GetRequestId() const;
  173. IOutputStream* StartRequest();
  174. void FinishRequest();
  175. void SmallRequest(TMaybe<TStringBuf> body);
  176. THttpResponse* GetResponseStream();
  177. TString GetResponse();
  178. void InvalidateConnection();
  179. int GetHttpCode();
  180. private:
  181. IOutputStream* StartRequestImpl(bool includeParameters);
  182. private:
  183. class TRequestStream;
  184. private:
  185. const TRequestContext Context_;
  186. const THttpHeader Header_;
  187. const TString Url_;
  188. const TDuration SocketTimeout_;
  189. TInstant StartTime_;
  190. TString LoggedAttributes_;
  191. TConnectionPtr Connection_;
  192. std::unique_ptr<TRequestStream> RequestStream_;
  193. std::unique_ptr<TSocketInput> SocketInput_;
  194. std::unique_ptr<THttpResponse> Input_;
  195. bool LogResponse_ = false;
  196. };
  197. ////////////////////////////////////////////////////////////////////////////////
  198. } // namespace NYT