// abortable_http_response.cpp
  1. #include "abortable_http_response.h"
  2. #include <util/system/mutex.h>
  3. #include <util/generic/singleton.h>
  4. #include <util/generic/hash_set.h>
  5. namespace NYT {
  6. ////////////////////////////////////////////////////////////////////////////////
  7. class TAbortableHttpResponseRegistry {
  8. public:
  9. TOutageId StartOutage(TString urlPattern, const TOutageOptions& options)
  10. {
  11. auto g = Guard(Lock_);
  12. auto id = NextId_++;
  13. IdToOutage_.emplace(id, TOutageEntry{std::move(urlPattern), options.ResponseCount_, options.LengthLimit_});
  14. return id;
  15. }
  16. void StopOutage(TOutageId id)
  17. {
  18. auto g = Guard(Lock_);
  19. IdToOutage_.erase(id);
  20. }
  21. void Add(IAbortableHttpResponse* response)
  22. {
  23. auto g = Guard(Lock_);
  24. for (auto& [id, entry] : IdToOutage_) {
  25. if (entry.Counter > 0 && response->GetUrl().find(entry.Pattern) != TString::npos) {
  26. response->SetLengthLimit(entry.LengthLimit);
  27. entry.Counter -= 1;
  28. }
  29. }
  30. ResponseList_.PushBack(response);
  31. }
  32. void Remove(IAbortableHttpResponse* response)
  33. {
  34. auto g = Guard(Lock_);
  35. response->Unlink();
  36. }
  37. static TAbortableHttpResponseRegistry& Get()
  38. {
  39. return *Singleton<TAbortableHttpResponseRegistry>();
  40. }
  41. int AbortAll(const TString& urlPattern)
  42. {
  43. int result = 0;
  44. for (auto& response : ResponseList_) {
  45. if (!response.IsAborted() && response.GetUrl().find(urlPattern) != TString::npos) {
  46. response.Abort();
  47. ++result;
  48. }
  49. }
  50. return result;
  51. }
  52. private:
  53. struct TOutageEntry
  54. {
  55. TString Pattern;
  56. size_t Counter;
  57. size_t LengthLimit;
  58. };
  59. private:
  60. TOutageId NextId_ = 0;
  61. TIntrusiveList<IAbortableHttpResponse> ResponseList_;
  62. THashMap<TOutageId, TOutageEntry> IdToOutage_;
  63. TMutex Lock_;
  64. };
  65. ////////////////////////////////////////////////////////////////////////////////
  66. TAbortableHttpResponse::TOutage::TOutage(
  67. TString urlPattern,
  68. TAbortableHttpResponseRegistry& registry,
  69. const TOutageOptions& options)
  70. : UrlPattern_(std::move(urlPattern))
  71. , Registry_(registry)
  72. , Id_(registry.StartOutage(UrlPattern_, options))
  73. { }
  74. TAbortableHttpResponse::TOutage::~TOutage()
  75. {
  76. Stop();
  77. }
  78. void TAbortableHttpResponse::TOutage::Stop()
  79. {
  80. if (!Stopped_) {
  81. Registry_.StopOutage(Id_);
  82. Stopped_ = true;
  83. }
  84. }
  85. ////////////////////////////////////////////////////////////////////////////////
  86. TAbortableHttpResponseBase::TAbortableHttpResponseBase(const TString& url)
  87. : Url_(url)
  88. {
  89. TAbortableHttpResponseRegistry::Get().Add(this);
  90. }
  91. TAbortableHttpResponseBase::~TAbortableHttpResponseBase()
  92. {
  93. TAbortableHttpResponseRegistry::Get().Remove(this);
  94. }
  95. void TAbortableHttpResponseBase::Abort()
  96. {
  97. Aborted_ = true;
  98. }
  99. void TAbortableHttpResponseBase::SetLengthLimit(size_t limit)
  100. {
  101. LengthLimit_ = limit;
  102. if (LengthLimit_ == 0) {
  103. Abort();
  104. }
  105. }
  106. const TString& TAbortableHttpResponseBase::GetUrl() const
  107. {
  108. return Url_;
  109. }
  110. bool TAbortableHttpResponseBase::IsAborted() const
  111. {
  112. return Aborted_;
  113. }
  114. ////////////////////////////////////////////////////////////////////////////////
  115. TAbortableHttpResponse::TAbortableHttpResponse(
  116. TRequestContext context,
  117. IInputStream* socketStream,
  118. const TString& url)
  119. : THttpResponse(std::move(context), socketStream)
  120. , TAbortableHttpResponseBase(url)
  121. {
  122. }
  123. size_t TAbortableHttpResponse::DoRead(void* buf, size_t len)
  124. {
  125. if (Aborted_) {
  126. ythrow TAbortedForTestPurpose() << "response was aborted";
  127. }
  128. len = std::min(len, LengthLimit_);
  129. auto read = THttpResponse::DoRead(buf, len);
  130. LengthLimit_ -= read;
  131. if (LengthLimit_ == 0) {
  132. Abort();
  133. }
  134. return read;
  135. }
  136. size_t TAbortableHttpResponse::DoSkip(size_t len)
  137. {
  138. if (Aborted_) {
  139. ythrow TAbortedForTestPurpose() << "response was aborted";
  140. }
  141. return THttpResponse::DoSkip(len);
  142. }
  143. int TAbortableHttpResponse::AbortAll(const TString& urlPattern)
  144. {
  145. return TAbortableHttpResponseRegistry::Get().AbortAll(urlPattern);
  146. }
  147. TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage(
  148. const TString& urlPattern,
  149. const TOutageOptions& options)
  150. {
  151. return TOutage(urlPattern, TAbortableHttpResponseRegistry::Get(), options);
  152. }
  153. TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage(
  154. const TString& urlPattern,
  155. size_t responseCount)
  156. {
  157. return StartOutage(urlPattern, TOutageOptions().ResponseCount(responseCount));
  158. }
  159. TAbortableCoreHttpResponse::TAbortableCoreHttpResponse(
  160. std::unique_ptr<IInputStream> stream,
  161. const TString& url)
  162. : TAbortableHttpResponseBase(url)
  163. , Stream_(std::move(stream))
  164. {
  165. }
  166. size_t TAbortableCoreHttpResponse::DoRead(void* buf, size_t len)
  167. {
  168. if (Aborted_) {
  169. ythrow TAbortedForTestPurpose() << "response was aborted";
  170. }
  171. len = std::min(len, LengthLimit_);
  172. auto read = Stream_->Read(buf, len);
  173. LengthLimit_ -= read;
  174. if (LengthLimit_ == 0) {
  175. Abort();
  176. }
  177. return read;
  178. }
  179. size_t TAbortableCoreHttpResponse::DoSkip(size_t len)
  180. {
  181. if (Aborted_) {
  182. ythrow TAbortedForTestPurpose() << "response was aborted";
  183. }
  184. return Stream_->Skip(len);
  185. }
  186. ////////////////////////////////////////////////////////////////////////////////
  187. } // namespace NYT