abortable_http_response.cpp 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. #include "abortable_http_response.h"
  2. #include <util/system/mutex.h>
  3. #include <util/generic/singleton.h>
  4. #include <util/generic/hash_set.h>
  5. namespace NYT {
  6. ////////////////////////////////////////////////////////////////////////////////
  7. class TAbortableHttpResponseRegistry {
  8. public:
  9. TOutageId StartOutage(TString urlPattern, const TOutageOptions& options)
  10. {
  11. auto g = Guard(Lock_);
  12. auto id = NextId_++;
  13. IdToOutage.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_});
  14. return id;
  15. }
  16. void StopOutage(TOutageId id)
  17. {
  18. auto g = Guard(Lock_);
  19. IdToOutage.erase(id);
  20. }
  21. void Add(IAbortableHttpResponse* response)
  22. {
  23. auto g = Guard(Lock_);
  24. for (auto& [id, entry] : IdToOutage) {
  25. if (entry.Counter > 0 && response->GetUrl().find(entry.Pattern) != TString::npos) {
  26. response->SetLengthLimit(entry.LengthLimit);
  27. entry.Counter -= 1;
  28. }
  29. }
  30. ResponseList_.PushBack(response);
  31. }
  32. void Remove(IAbortableHttpResponse* response)
  33. {
  34. auto g = Guard(Lock_);
  35. response->Unlink();
  36. }
  37. static TAbortableHttpResponseRegistry& Get()
  38. {
  39. return *Singleton<TAbortableHttpResponseRegistry>();
  40. }
  41. int AbortAll(const TString& urlPattern)
  42. {
  43. int result = 0;
  44. for (auto& response : ResponseList_) {
  45. if (!response.IsAborted() && response.GetUrl().find(urlPattern) != TString::npos) {
  46. response.Abort();
  47. ++result;
  48. }
  49. }
  50. return result;
  51. }
  52. private:
  53. struct TOutageEntry
  54. {
  55. TString Pattern;
  56. size_t Counter;
  57. size_t LengthLimit;
  58. };
  59. private:
  60. TOutageId NextId_ = 0;
  61. TIntrusiveList<IAbortableHttpResponse> ResponseList_;
  62. THashMap<TOutageId, TOutageEntry> IdToOutage;
  63. TMutex Lock_;
  64. };
  65. ////////////////////////////////////////////////////////////////////////////////
  66. TAbortableHttpResponse::TOutage::TOutage(
  67. TString urlPattern,
  68. TAbortableHttpResponseRegistry& registry,
  69. const TOutageOptions& options)
  70. : UrlPattern_(std::move(urlPattern))
  71. , Registry_(registry)
  72. , Id_(registry.StartOutage(UrlPattern_, options))
  73. { }
  74. TAbortableHttpResponse::TOutage::~TOutage()
  75. {
  76. Stop();
  77. }
  78. void TAbortableHttpResponse::TOutage::Stop()
  79. {
  80. if (!Stopped_) {
  81. Registry_.StopOutage(Id_);
  82. Stopped_ = true;
  83. }
  84. }
  85. ////////////////////////////////////////////////////////////////////////////////
  86. TAbortableHttpResponseBase::TAbortableHttpResponseBase(const TString& url)
  87. : Url_(url)
  88. {
  89. TAbortableHttpResponseRegistry::Get().Add(this);
  90. }
  91. TAbortableHttpResponseBase::~TAbortableHttpResponseBase()
  92. {
  93. TAbortableHttpResponseRegistry::Get().Remove(this);
  94. }
  95. void TAbortableHttpResponseBase::Abort()
  96. {
  97. Aborted_ = true;
  98. }
  99. void TAbortableHttpResponseBase::SetLengthLimit(size_t limit)
  100. {
  101. LengthLimit_ = limit;
  102. if (LengthLimit_ == 0) {
  103. Abort();
  104. }
  105. }
  106. const TString& TAbortableHttpResponseBase::GetUrl() const
  107. {
  108. return Url_;
  109. }
  110. bool TAbortableHttpResponseBase::IsAborted() const
  111. {
  112. return Aborted_;
  113. }
  114. ////////////////////////////////////////////////////////////////////////////////
  115. TAbortableHttpResponse::TAbortableHttpResponse(
  116. IInputStream* socketStream,
  117. const TString& requestId,
  118. const TString& hostName,
  119. const TString& url)
  120. : THttpResponse(socketStream, requestId, hostName)
  121. , TAbortableHttpResponseBase(url)
  122. {
  123. }
  124. size_t TAbortableHttpResponse::DoRead(void* buf, size_t len)
  125. {
  126. if (Aborted_) {
  127. ythrow TAbortedForTestPurpose() << "response was aborted";
  128. }
  129. len = std::min(len, LengthLimit_);
  130. auto read = THttpResponse::DoRead(buf, len);
  131. LengthLimit_ -= read;
  132. if (LengthLimit_ == 0) {
  133. Abort();
  134. }
  135. return read;
  136. }
  137. size_t TAbortableHttpResponse::DoSkip(size_t len)
  138. {
  139. if (Aborted_) {
  140. ythrow TAbortedForTestPurpose() << "response was aborted";
  141. }
  142. return THttpResponse::DoSkip(len);
  143. }
  144. int TAbortableHttpResponse::AbortAll(const TString& urlPattern)
  145. {
  146. return TAbortableHttpResponseRegistry::Get().AbortAll(urlPattern);
  147. }
  148. TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage(
  149. const TString& urlPattern,
  150. const TOutageOptions& options)
  151. {
  152. return TOutage(urlPattern, TAbortableHttpResponseRegistry::Get(), options);
  153. }
  154. TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage(
  155. const TString& urlPattern,
  156. size_t responseCount)
  157. {
  158. return StartOutage(urlPattern, TOutageOptions().ResponseCount(responseCount));
  159. }
  160. TAbortableCoreHttpResponse::TAbortableCoreHttpResponse(
  161. std::unique_ptr<IInputStream> stream,
  162. const TString& url)
  163. : TAbortableHttpResponseBase(url)
  164. , Stream_(std::move(stream))
  165. {
  166. }
  167. size_t TAbortableCoreHttpResponse::DoRead(void* buf, size_t len)
  168. {
  169. if (Aborted_) {
  170. ythrow TAbortedForTestPurpose() << "response was aborted";
  171. }
  172. len = std::min(len, LengthLimit_);
  173. auto read = Stream_->Read(buf, len);
  174. LengthLimit_ -= read;
  175. if (LengthLimit_ == 0) {
  176. Abort();
  177. }
  178. return read;
  179. }
  180. size_t TAbortableCoreHttpResponse::DoSkip(size_t len)
  181. {
  182. if (Aborted_) {
  183. ythrow TAbortedForTestPurpose() << "response was aborted";
  184. }
  185. return Stream_->Skip(len);
  186. }
  187. ////////////////////////////////////////////////////////////////////////////////
  188. } // namespace NYT