// http.h
  1. #pragma once
  2. #include "fwd.h"
  3. #include <yt/cpp/mapreduce/interface/common.h>
  4. #include <yt/cpp/mapreduce/interface/errors.h>
  5. #include <yt/cpp/mapreduce/interface/format.h>
  6. #include <yt/cpp/mapreduce/interface/io.h>
  7. #include <yt/cpp/mapreduce/interface/node.h>
  8. #include <library/cpp/deprecated/atomic/atomic.h>
  9. #include <library/cpp/http/io/stream.h>
  10. #include <util/generic/hash.h>
  11. #include <util/generic/hash_multi_map.h>
  12. #include <util/generic/strbuf.h>
  13. #include <util/generic/guid.h>
  14. #include <util/network/socket.h>
  15. #include <util/stream/input.h>
  16. #include <util/system/mutex.h>
  17. #include <util/system/rwlock.h>
  18. #include <util/generic/ptr.h>
  19. namespace NYT {
  // Forward declarations of types that are only named (not used by value
  // through their full definition) in the declarations that follow.
  class TNode;

  namespace NHttp {

  struct THeadersPtrWrapper;

  } // namespace NHttp
  24. ////////////////////////////////////////////////////////////////////////////////
  // Frame type tags. The numeric values are spelled out explicitly and must
  // not be renumbered.
  // NOTE(review): presumably these tag the frames consumed by
  // THttpResponse::UnframeRead/UnframeSkip -- confirm against the implementation.
  enum class EFrameType
  {
      Data = 0x01,
      KeepAlive = 0x02,
  };
  30. struct TRequestContext
  31. {
  32. TString RequestId;
  33. TString HostName;
  34. TString Method;
  35. };
  36. class THttpHeader
  37. {
  38. public:
  39. THttpHeader(const TString& method, const TString& command, bool isApi = true);
  40. void AddParameter(const TString& key, TNode value, bool overwrite = false);
  41. void RemoveParameter(const TString& key);
  42. void MergeParameters(const TNode& parameters, bool overwrite = false);
  43. TNode GetParameters() const;
  44. void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false);
  45. void AddPath(const TString& path, bool overwrite = false);
  46. void AddOperationId(const TOperationId& operationId, bool overwrite = false);
  47. void AddMutationId();
  48. bool HasMutationId() const;
  49. void SetToken(const TString& token);
  50. void SetProxyAddress(const TString& proxyAddress);
  51. void SetHostPort(const TString& hostPort);
  52. void SetImpersonationUser(const TString& impersonationUser);
  53. void SetServiceTicket(const TString& ticket);
  54. void SetInputFormat(const TMaybe<TFormat>& format);
  55. void SetOutputFormat(const TMaybe<TFormat>& format);
  56. TMaybe<TFormat> GetOutputFormat() const;
  57. void SetRequestCompression(const TString& compression);
  58. void SetResponseCompression(const TString& compression);
  59. TString GetCommand() const;
  60. TString GetUrl(bool needProxy = false) const;
  61. TString GetHeaderAsString(const TString& hostName, const TString& requestId, bool includeParameters = true) const;
  62. NHttp::THeadersPtrWrapper GetHeader(const TString& hostName, const TString& requestId, bool includeParameters) const;
  63. const TString& GetMethod() const;
  64. private:
  65. bool ShouldAcceptFraming() const;
  66. private:
  67. const TString Method_;
  68. const TString Command_;
  69. const bool IsApi_;
  70. TNode::TMapType Parameters_;
  71. TString ImpersonationUser_;
  72. TString Token_;
  73. TString ServiceTicket_;
  74. TNode Attributes_;
  75. TString ProxyAddress_;
  76. TString HostPort_;
  77. TMaybe<TFormat> InputFormat_ = TFormat::YsonText();
  78. TMaybe<TFormat> OutputFormat_ = TFormat::YsonText();
  79. TString RequestCompression_ = "identity";
  80. TString ResponseCompression_ = "identity";
  81. };
  82. ////////////////////////////////////////////////////////////////////////////////
  83. class TAddressCache
  84. {
  85. public:
  86. using TAddressPtr = TAtomicSharedPtr<TNetworkAddress>;
  87. static TAddressCache* Get();
  88. TAddressPtr Resolve(const TString& hostName);
  89. private:
  90. struct TCacheEntry {
  91. TAddressPtr Address;
  92. TInstant ExpirationTime;
  93. };
  94. private:
  95. TAddressPtr FindAddress(const TString& hostName) const;
  96. void AddAddress(TString hostName, TAddressPtr address);
  97. private:
  98. TRWMutex Lock_;
  99. THashMap<TString, TCacheEntry> Cache_;
  100. };
  101. ////////////////////////////////////////////////////////////////////////////////
  102. struct TConnection
  103. {
  104. THolder<TSocket> Socket;
  105. TAtomic Busy = 1;
  106. TInstant DeadLine;
  107. ui32 Id;
  108. };
  109. using TConnectionPtr = TAtomicSharedPtr<TConnection>;
  110. class TConnectionPool
  111. {
  112. public:
  113. using TConnectionMap = THashMultiMap<TString, TConnectionPtr>;
  114. static TConnectionPool* Get();
  115. TConnectionPtr Connect(const TString& hostName, TDuration socketTimeout);
  116. void Release(TConnectionPtr connection);
  117. void Invalidate(const TString& hostName, TConnectionPtr connection);
  118. private:
  119. void Refresh();
  120. static SOCKET DoConnect(TAddressCache::TAddressPtr address);
  121. private:
  122. TMutex Lock_;
  123. TConnectionMap Connections_;
  124. };
  125. ////////////////////////////////////////////////////////////////////////////////
  126. //
  127. // Input stream that handles YT-specific header/trailer errors
  128. // and throws TErrorResponse if it finds any.
  129. class THttpResponse
  130. : public IInputStream
  131. {
  132. public:
  133. // 'requestId' and 'hostName' are provided for debug reasons
  134. // (they will appear in some error messages).
  135. THttpResponse(
  136. TRequestContext context,
  137. IInputStream* socketStream);
  138. ~THttpResponse();
  139. const THttpHeaders& Headers() const;
  140. void CheckErrorResponse() const;
  141. bool IsExhausted() const;
  142. int GetHttpCode() const;
  143. const TString& GetHostName() const;
  144. bool IsKeepAlive() const;
  145. protected:
  146. size_t DoRead(void* buf, size_t len) override;
  147. size_t DoSkip(size_t len) override;
  148. private:
  149. void CheckTrailers(const THttpHeaders& trailers);
  150. TMaybe<TErrorResponse> ParseError(const THttpHeaders& headers);
  151. size_t UnframeRead(void* buf, size_t len);
  152. size_t UnframeSkip(size_t len);
  153. bool RefreshFrameIfNecessary();
  154. private:
  155. class THttpInputWrapped;
  156. private:
  157. THolder<THttpInputWrapped> HttpInput_;
  158. const bool Unframe_;
  159. TRequestContext Context_;
  160. int HttpCode_ = 0;
  161. TMaybe<TErrorResponse> ErrorResponse_;
  162. bool IsExhausted_ = false;
  163. size_t RemainingFrameSize_ = 0;
  164. };
  165. ////////////////////////////////////////////////////////////////////////////////
  166. class THttpRequest
  167. {
  168. public:
  169. THttpRequest(TString requestId, TString hostName, THttpHeader header, TDuration socketTimeout);
  170. ~THttpRequest();
  171. TString GetRequestId() const;
  172. IOutputStream* StartRequest();
  173. void FinishRequest();
  174. void SmallRequest(TMaybe<TStringBuf> body);
  175. THttpResponse* GetResponseStream();
  176. TString GetResponse();
  177. void InvalidateConnection();
  178. int GetHttpCode();
  179. private:
  180. IOutputStream* StartRequestImpl(bool includeParameters);
  181. private:
  182. class TRequestStream;
  183. private:
  184. const TRequestContext Context_;
  185. const THttpHeader Header_;
  186. const TString Url_;
  187. const TDuration SocketTimeout_;
  188. TInstant StartTime_;
  189. TString LoggedAttributes_;
  190. TConnectionPtr Connection_;
  191. THolder<TRequestStream> RequestStream_;
  192. THolder<TSocketInput> SocketInput_;
  193. THolder<THttpResponse> Input_;
  194. bool LogResponse_ = false;
  195. };
  196. ////////////////////////////////////////////////////////////////////////////////
  197. } // namespace NYT