stream.cpp 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002
  1. #include "stream.h"
  2. #include "compression.h"
  3. #include "chunk.h"
  4. #include <util/stream/buffered.h>
  5. #include <util/stream/length.h>
  6. #include <util/stream/multi.h>
  7. #include <util/stream/null.h>
  8. #include <util/stream/tee.h>
  9. #include <util/system/compat.h>
  10. #include <util/system/yassert.h>
  11. #include <util/network/socket.h>
  12. #include <util/string/cast.h>
  13. #include <util/string/strip.h>
  14. #include <util/generic/string.h>
  15. #include <util/generic/utility.h>
  16. #include <util/generic/hash_set.h>
  17. #include <util/generic/yexception.h>
  18. #define HEADERCMP(header, str) \
  19. case sizeof(str) - 1: \
  20. if (!stricmp((header).Name().data(), str))
  21. namespace {
  22. inline size_t SuggestBufferSize() {
  23. return 8192;
  24. }
  25. inline TStringBuf Trim(const char* b, const char* e) noexcept {
  26. return StripString(TStringBuf(b, e));
  27. }
  28. inline TStringBuf RmSemiColon(const TStringBuf& s) {
  29. return s.Before(';');
  30. }
  31. template <class T, size_t N>
  32. class TStreams: private TNonCopyable {
  33. struct TDelete {
  34. inline void operator()(T* t) noexcept {
  35. delete t;
  36. }
  37. };
  38. typedef T* TPtr;
  39. public:
  40. inline TStreams() noexcept
  41. : Beg_(T_ + N)
  42. {
  43. }
  44. inline ~TStreams() {
  45. TDelete f;
  46. ForEach(f);
  47. }
  48. template <class S>
  49. inline S* Add(S* t) noexcept {
  50. return (S*)AddImpl((T*)t);
  51. }
  52. template <class Functor>
  53. inline void ForEach(Functor& f) {
  54. const TPtr* end = T_ + N;
  55. for (TPtr* cur = Beg_; cur != end; ++cur) {
  56. f(*cur);
  57. }
  58. }
  59. TPtr Top() {
  60. const TPtr* end = T_ + N;
  61. return end == Beg_ ? nullptr : *Beg_;
  62. }
  63. private:
  64. inline T* AddImpl(T* t) noexcept {
  65. Y_ASSERT(Beg_ > T_);
  66. return (*--Beg_ = t);
  67. }
  68. private:
  69. TPtr T_[N];
  70. TPtr* Beg_;
  71. };
  72. template <class TStream>
  73. class TLazy: public IOutputStream {
  74. public:
  75. TLazy(IOutputStream* out, ui16 bs)
  76. : Output_(out)
  77. , BlockSize_(bs)
  78. {
  79. }
  80. void DoWrite(const void* buf, size_t len) override {
  81. ConstructSlave();
  82. Slave_->Write(buf, len);
  83. }
  84. void DoFlush() override {
  85. ConstructSlave();
  86. Slave_->Flush();
  87. }
  88. void DoFinish() override {
  89. ConstructSlave();
  90. Slave_->Finish();
  91. }
  92. private:
  93. inline void ConstructSlave() {
  94. if (!Slave_) {
  95. Slave_.Reset(new TStream(Output_, BlockSize_));
  96. }
  97. }
  98. private:
  99. IOutputStream* Output_;
  100. ui16 BlockSize_;
  101. THolder<IOutputStream> Slave_;
  102. };
  103. }
  104. class THttpInput::TImpl {
  105. typedef THashSet<TString> TAcceptCodings;
  106. public:
  107. inline TImpl(IInputStream* slave)
  108. : Slave_(slave)
  109. , Buffered_(Slave_, SuggestBufferSize())
  110. , ChunkedInput_(nullptr)
  111. , Input_(nullptr)
  112. , FirstLine_(ReadFirstLine(Buffered_))
  113. , Headers_(&Buffered_)
  114. , KeepAlive_(false)
  115. , HasContentLength_(false)
  116. , ContentLength_(0)
  117. , ContentEncoded_(false)
  118. , Expect100Continue_(false)
  119. {
  120. BuildInputChain();
  121. Y_ASSERT(Input_);
  122. }
  123. static TString ReadFirstLine(TBufferedInput& in) {
  124. TString s;
  125. Y_ENSURE_EX(in.ReadLine(s), THttpReadException() << "Failed to get first line");
  126. return s;
  127. }
  128. inline ~TImpl() {
  129. }
  130. inline size_t Read(void* buf, size_t len) {
  131. return Perform(len, [this, buf](size_t toRead) { return Input_->Read(buf, toRead); });
  132. }
  133. inline size_t Skip(size_t len) {
  134. return Perform(len, [this](size_t toSkip) { return Input_->Skip(toSkip); });
  135. }
  136. inline const TString& FirstLine() const noexcept {
  137. return FirstLine_;
  138. }
  139. inline const THttpHeaders& Headers() const noexcept {
  140. return Headers_;
  141. }
  142. inline const TMaybe<THttpHeaders>& Trailers() const noexcept {
  143. return Trailers_;
  144. }
  145. inline bool IsKeepAlive() const noexcept {
  146. return KeepAlive_;
  147. }
  148. inline bool AcceptEncoding(const TString& s) const {
  149. return Codings_.find(to_lower(s)) != Codings_.end();
  150. }
  151. inline bool GetContentLength(ui64& value) const noexcept {
  152. if (HasContentLength_) {
  153. value = ContentLength_;
  154. return true;
  155. }
  156. return false;
  157. }
  158. inline bool ContentEncoded() const noexcept {
  159. return ContentEncoded_;
  160. }
  161. inline bool HasContent() const noexcept {
  162. return HasContentLength_ || ChunkedInput_;
  163. }
  164. inline bool HasExpect100Continue() const noexcept {
  165. return Expect100Continue_;
  166. }
  167. private:
  168. template <class Operation>
  169. inline size_t Perform(size_t len, const Operation& operation) {
  170. size_t processed = operation(len);
  171. if (processed == 0 && len > 0) {
  172. if (!ChunkedInput_) {
  173. Trailers_.ConstructInPlace();
  174. } else {
  175. // Read the header of the trailing chunk. It remains in
  176. // the TChunkedInput stream if the HTTP response is compressed.
  177. char symbol;
  178. if (ChunkedInput_->Read(&symbol, 1) != 0) {
  179. ythrow THttpParseException() << "some data remaining in TChunkedInput";
  180. }
  181. }
  182. }
  183. return processed;
  184. }
  185. struct TParsedHeaders {
  186. bool Chunked = false;
  187. bool KeepAlive = false;
  188. TStringBuf LZipped;
  189. };
  190. struct TTrEnc {
  191. inline void operator()(const TStringBuf& s) {
  192. if (s == TStringBuf("chunked")) {
  193. p->Chunked = true;
  194. }
  195. }
  196. TParsedHeaders* p;
  197. };
  198. struct TAccCoding {
  199. inline void operator()(const TStringBuf& s) {
  200. c->insert(ToString(s));
  201. }
  202. TAcceptCodings* c;
  203. };
  204. template <class Functor>
  205. inline void ForEach(TString in, Functor& f) {
  206. in.to_lower();
  207. const char* b = in.begin();
  208. const char* c = b;
  209. const char* e = in.end();
  210. while (c != e) {
  211. if (*c == ',') {
  212. f(RmSemiColon(Trim(b, c)));
  213. b = c + 1;
  214. }
  215. ++c;
  216. }
  217. if (b != c) {
  218. f(RmSemiColon(Trim(b, c)));
  219. }
  220. }
  221. inline bool IsRequest() const {
  222. // https://datatracker.ietf.org/doc/html/rfc7231#section-4
  223. // more rare methods: https://www.iana.org/assignments/http-methods/http-methods.xhtml
  224. return EqualToOneOf(to_lower(FirstLine().substr(0, FirstLine().find(" "))), "get", "post", "put", "head", "delete", "connect", "options", "trace", "patch");
  225. }
  226. inline void BuildInputChain() {
  227. TParsedHeaders p;
  228. size_t pos = FirstLine_.rfind(' ');
  229. // In HTTP/1.1 Keep-Alive is turned on by default
  230. if (pos != TString::npos && strcmp(FirstLine_.c_str() + pos + 1, "HTTP/1.1") == 0) {
  231. p.KeepAlive = true; //request
  232. } else if (strnicmp(FirstLine_.data(), "HTTP/1.1", 8) == 0) {
  233. p.KeepAlive = true; //reply
  234. }
  235. for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) {
  236. const THttpInputHeader& header = *h;
  237. switch (header.Name().size()) {
  238. HEADERCMP(header, "transfer-encoding") {
  239. TTrEnc f = {&p};
  240. ForEach(header.Value(), f);
  241. }
  242. break;
  243. HEADERCMP(header, "content-encoding") {
  244. p.LZipped = header.Value();
  245. }
  246. break;
  247. HEADERCMP(header, "accept-encoding") {
  248. TAccCoding f = {&Codings_};
  249. ForEach(header.Value(), f);
  250. }
  251. break;
  252. HEADERCMP(header, "content-length") {
  253. HasContentLength_ = true;
  254. ContentLength_ = FromString(header.Value());
  255. }
  256. break;
  257. HEADERCMP(header, "connection") {
  258. // accept header "Connection: Keep-Alive, TE"
  259. if (strnicmp(header.Value().data(), "keep-alive", 10) == 0) {
  260. p.KeepAlive = true;
  261. } else if (stricmp(header.Value().data(), "close") == 0) {
  262. p.KeepAlive = false;
  263. }
  264. }
  265. break;
  266. HEADERCMP(header, "expect") {
  267. auto findContinue = [&](const TStringBuf& s) {
  268. if (strnicmp(s.data(), "100-continue", 13) == 0) {
  269. Expect100Continue_ = true;
  270. }
  271. };
  272. ForEach(header.Value(), findContinue);
  273. }
  274. break;
  275. }
  276. }
  277. if (p.Chunked) {
  278. ChunkedInput_ = Streams_.Add(new TChunkedInput(&Buffered_, &Trailers_));
  279. Input_ = ChunkedInput_;
  280. } else {
  281. // disable buffering
  282. Buffered_.Reset(&Cnull);
  283. Input_ = Streams_.Add(new TMultiInput(&Buffered_, Slave_));
  284. if (IsRequest() || HasContentLength_) {
  285. /*
  286. * TODO - we have other cases
  287. */
  288. Input_ = Streams_.Add(new TLengthLimitedInput(Input_, ContentLength_));
  289. }
  290. }
  291. if (auto decoder = TCompressionCodecFactory::Instance().FindDecoder(p.LZipped)) {
  292. ContentEncoded_ = true;
  293. Input_ = Streams_.Add((*decoder)(Input_).Release());
  294. }
  295. KeepAlive_ = p.KeepAlive;
  296. }
  297. private:
  298. IInputStream* Slave_;
  299. /*
  300. * input helpers
  301. */
  302. TBufferedInput Buffered_;
  303. TStreams<IInputStream, 8> Streams_;
  304. IInputStream* ChunkedInput_;
  305. /*
  306. * final input stream
  307. */
  308. IInputStream* Input_;
  309. TString FirstLine_;
  310. THttpHeaders Headers_;
  311. TMaybe<THttpHeaders> Trailers_;
  312. bool KeepAlive_;
  313. TAcceptCodings Codings_;
  314. bool HasContentLength_;
  315. ui64 ContentLength_;
  316. bool ContentEncoded_;
  317. bool Expect100Continue_;
  318. };
  319. THttpInput::THttpInput(IInputStream* slave)
  320. : Impl_(new TImpl(slave))
  321. {
  322. }
  323. THttpInput::THttpInput(THttpInput&& httpInput) = default;
  324. THttpInput::~THttpInput() {
  325. }
  326. size_t THttpInput::DoRead(void* buf, size_t len) {
  327. return Impl_->Read(buf, len);
  328. }
  329. size_t THttpInput::DoSkip(size_t len) {
  330. return Impl_->Skip(len);
  331. }
  332. const THttpHeaders& THttpInput::Headers() const noexcept {
  333. return Impl_->Headers();
  334. }
  335. const TMaybe<THttpHeaders>& THttpInput::Trailers() const noexcept {
  336. return Impl_->Trailers();
  337. }
  338. const TString& THttpInput::FirstLine() const noexcept {
  339. return Impl_->FirstLine();
  340. }
  341. bool THttpInput::IsKeepAlive() const noexcept {
  342. return Impl_->IsKeepAlive();
  343. }
  344. bool THttpInput::AcceptEncoding(const TString& coding) const {
  345. return Impl_->AcceptEncoding(coding);
  346. }
  347. TString THttpInput::BestCompressionScheme(TArrayRef<const TStringBuf> codings) const {
  348. return NHttp::ChooseBestCompressionScheme(
  349. [this](const TString& coding) {
  350. return AcceptEncoding(coding);
  351. },
  352. codings
  353. );
  354. }
  355. TString THttpInput::BestCompressionScheme() const {
  356. return BestCompressionScheme(TCompressionCodecFactory::Instance().GetBestCodecs());
  357. }
  358. bool THttpInput::GetContentLength(ui64& value) const noexcept {
  359. return Impl_->GetContentLength(value);
  360. }
  361. bool THttpInput::ContentEncoded() const noexcept {
  362. return Impl_->ContentEncoded();
  363. }
  364. bool THttpInput::HasContent() const noexcept {
  365. return Impl_->HasContent();
  366. }
  367. bool THttpInput::HasExpect100Continue() const noexcept {
  368. return Impl_->HasExpect100Continue();
  369. }
  370. class THttpOutput::TImpl {
  371. class TSizeCalculator: public IOutputStream {
  372. public:
  373. inline TSizeCalculator() noexcept {
  374. }
  375. ~TSizeCalculator() override {
  376. }
  377. void DoWrite(const void* /*buf*/, size_t len) override {
  378. Length_ += len;
  379. }
  380. inline size_t Length() const noexcept {
  381. return Length_;
  382. }
  383. private:
  384. size_t Length_ = 0;
  385. };
  386. enum TState {
  387. Begin = 0,
  388. FirstLineSent = 1,
  389. HeadersSent = 2
  390. };
  391. struct TFlush {
  392. inline void operator()(IOutputStream* s) {
  393. s->Flush();
  394. }
  395. };
  396. struct TFinish {
  397. inline void operator()(IOutputStream* s) {
  398. s->Finish();
  399. }
  400. };
  401. public:
  402. inline TImpl(IOutputStream* slave, THttpInput* request)
  403. : Slave_(slave)
  404. , State_(Begin)
  405. , Output_(Slave_)
  406. , Request_(request)
  407. , Version_(1100)
  408. , KeepAliveEnabled_(false)
  409. , BodyEncodingEnabled_(true)
  410. , CompressionHeaderEnabled_(true)
  411. , Finished_(false)
  412. {
  413. }
  414. inline ~TImpl() {
  415. }
  416. inline void SendContinue() {
  417. Output_->Write("HTTP/1.1 100 Continue\r\n\r\n");
  418. Output_->Flush();
  419. }
  420. inline void Write(const void* buf, size_t len) {
  421. if (Finished_) {
  422. ythrow THttpException() << "can not write to finished stream";
  423. }
  424. if (State_ == HeadersSent) {
  425. Output_->Write(buf, len);
  426. return;
  427. }
  428. const char* b = (const char*)buf;
  429. const char* e = b + len;
  430. const char* c = b;
  431. while (c != e) {
  432. if (*c == '\n') {
  433. Line_.append(b, c);
  434. if (!Line_.empty() && Line_.back() == '\r') {
  435. Line_.pop_back();
  436. }
  437. b = c + 1;
  438. Process(Line_);
  439. if (State_ == HeadersSent) {
  440. Output_->Write(b, e - b);
  441. return;
  442. }
  443. Line_.clear();
  444. }
  445. ++c;
  446. }
  447. if (b != c) {
  448. Line_.append(b, c);
  449. }
  450. }
  451. inline void Flush() {
  452. TFlush f;
  453. Streams_.ForEach(f);
  454. Slave_->Flush(); // see SEARCH-1030
  455. }
  456. inline void Finish() {
  457. if (Finished_) {
  458. return;
  459. }
  460. TFinish f;
  461. Streams_.ForEach(f);
  462. Slave_->Finish(); // see SEARCH-1030
  463. Finished_ = true;
  464. }
  465. inline const THttpHeaders& SentHeaders() const noexcept {
  466. return Headers_;
  467. }
  468. inline void EnableCompression(TArrayRef<const TStringBuf> schemas) {
  469. ComprSchemas_ = schemas;
  470. }
  471. inline void EnableKeepAlive(bool enable) {
  472. KeepAliveEnabled_ = enable;
  473. }
  474. inline void EnableBodyEncoding(bool enable) {
  475. BodyEncodingEnabled_ = enable;
  476. }
  477. inline void EnableCompressionHeader(bool enable) {
  478. CompressionHeaderEnabled_ = enable;
  479. }
  480. inline bool IsCompressionEnabled() const noexcept {
  481. return !ComprSchemas_.empty();
  482. }
  483. inline bool IsKeepAliveEnabled() const noexcept {
  484. return KeepAliveEnabled_;
  485. }
  486. inline bool IsBodyEncodingEnabled() const noexcept {
  487. return BodyEncodingEnabled_;
  488. }
  489. inline bool IsCompressionHeaderEnabled() const noexcept {
  490. return CompressionHeaderEnabled_;
  491. }
  492. inline bool CanBeKeepAlive() const noexcept {
  493. return SupportChunkedTransfer() && IsKeepAliveEnabled() && (Request_ ? Request_->IsKeepAlive() : true);
  494. }
  495. inline const TString& FirstLine() const noexcept {
  496. return FirstLine_;
  497. }
  498. inline size_t SentSize() const noexcept {
  499. return SizeCalculator_.Length();
  500. }
  501. private:
  502. static inline bool IsResponse(const TString& s) noexcept {
  503. return strnicmp(s.data(), "HTTP/", 5) == 0;
  504. }
  505. static inline bool IsRequest(const TString& s) noexcept {
  506. return !IsResponse(s);
  507. }
  508. inline bool IsHttpRequest() const noexcept {
  509. return IsRequest(FirstLine_);
  510. }
  511. inline bool HasResponseBody() const noexcept {
  512. if (IsHttpResponse()) {
  513. if (Request_ && Request_->FirstLine().StartsWith(TStringBuf("HEAD")))
  514. return false;
  515. if (FirstLine_.size() > 9 && strncmp(FirstLine_.data() + 9, "204", 3) == 0)
  516. return false;
  517. return true;
  518. }
  519. return false;
  520. }
  521. inline bool IsHttpResponse() const noexcept {
  522. return IsResponse(FirstLine_);
  523. }
  524. inline bool HasRequestBody() const noexcept {
  525. return strnicmp(FirstLine_.data(), "POST", 4) == 0 ||
  526. strnicmp(FirstLine_.data(), "PATCH", 5) == 0 ||
  527. strnicmp(FirstLine_.data(), "PUT", 3) == 0;
  528. }
  529. static inline size_t ParseHttpVersion(const TString& s) {
  530. if (s.empty()) {
  531. ythrow THttpParseException() << "malformed http stream";
  532. }
  533. size_t parsed_version = 0;
  534. if (IsResponse(s)) {
  535. const char* b = s.data() + 5;
  536. while (*b && *b != ' ') {
  537. if (*b != '.') {
  538. parsed_version *= 10;
  539. parsed_version += (*b - '0');
  540. }
  541. ++b;
  542. }
  543. } else {
  544. /*
  545. * s not empty here
  546. */
  547. const char* e = s.end() - 1;
  548. const char* b = s.begin();
  549. size_t mult = 1;
  550. while (e != b && *e != '/') {
  551. if (*e != '.') {
  552. parsed_version += (*e - '0') * mult;
  553. mult *= 10;
  554. }
  555. --e;
  556. }
  557. }
  558. return parsed_version * 100;
  559. }
  560. inline void ParseHttpVersion() {
  561. size_t parsed_version = ParseHttpVersion(FirstLine_);
  562. if (Request_) {
  563. parsed_version = Min(parsed_version, ParseHttpVersion(Request_->FirstLine()));
  564. }
  565. Version_ = parsed_version;
  566. }
  567. inline void Process(const TString& s) {
  568. Y_ASSERT(State_ != HeadersSent);
  569. if (State_ == Begin) {
  570. FirstLine_ = s;
  571. ParseHttpVersion();
  572. State_ = FirstLineSent;
  573. } else {
  574. if (s.empty()) {
  575. BuildOutputStream();
  576. WriteCached();
  577. State_ = HeadersSent;
  578. } else {
  579. AddHeader(THttpInputHeader(s));
  580. }
  581. }
  582. }
  583. inline void WriteCachedImpl(IOutputStream* s) const {
  584. s->Write(FirstLine_.data(), FirstLine_.size());
  585. s->Write("\r\n", 2);
  586. Headers_.OutTo(s);
  587. s->Write("\r\n", 2);
  588. s->Finish();
  589. }
  590. inline void WriteCached() {
  591. size_t buflen = 0;
  592. {
  593. TSizeCalculator out;
  594. WriteCachedImpl(&out);
  595. buflen = out.Length();
  596. }
  597. {
  598. TBufferedOutput out(Slave_, buflen);
  599. WriteCachedImpl(&out);
  600. }
  601. if (IsHttpRequest() && !HasRequestBody()) {
  602. /*
  603. * if this is http request, then send it now
  604. */
  605. Slave_->Flush();
  606. }
  607. }
  608. inline bool SupportChunkedTransfer() const noexcept {
  609. return Version_ >= 1100;
  610. }
  611. inline void BuildOutputStream() {
  612. if (CanBeKeepAlive()) {
  613. AddOrReplaceHeader(THttpInputHeader("Connection", "Keep-Alive"));
  614. } else {
  615. AddOrReplaceHeader(THttpInputHeader("Connection", "Close"));
  616. }
  617. if (IsHttpResponse()) {
  618. if (Request_ && IsCompressionEnabled() && HasResponseBody()) {
  619. TString scheme = Request_->BestCompressionScheme(ComprSchemas_);
  620. if (scheme != "identity") {
  621. AddOrReplaceHeader(THttpInputHeader("Content-Encoding", scheme));
  622. RemoveHeader("Content-Length");
  623. }
  624. }
  625. RebuildStream();
  626. } else {
  627. if (IsCompressionEnabled()) {
  628. AddOrReplaceHeader(THttpInputHeader("Accept-Encoding", BuildAcceptEncoding()));
  629. }
  630. if (HasRequestBody()) {
  631. RebuildStream();
  632. }
  633. }
  634. }
  635. inline TString BuildAcceptEncoding() const {
  636. TString ret;
  637. for (const auto& coding : ComprSchemas_) {
  638. if (ret) {
  639. ret += ", ";
  640. }
  641. ret += coding;
  642. }
  643. return ret;
  644. }
  645. inline void RebuildStream() {
  646. bool keepAlive = false;
  647. const TCompressionCodecFactory::TEncoderConstructor* encoder = nullptr;
  648. bool chunked = false;
  649. bool haveContentLength = false;
  650. for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) {
  651. const THttpInputHeader& header = *h;
  652. const TString hl = to_lower(header.Name());
  653. if (hl == TStringBuf("connection")) {
  654. keepAlive = to_lower(header.Value()) == TStringBuf("keep-alive");
  655. } else if (IsCompressionHeaderEnabled() && hl == TStringBuf("content-encoding")) {
  656. encoder = TCompressionCodecFactory::Instance().FindEncoder(to_lower(header.Value()));
  657. } else if (hl == TStringBuf("transfer-encoding")) {
  658. chunked = to_lower(header.Value()) == TStringBuf("chunked");
  659. } else if (hl == TStringBuf("content-length")) {
  660. haveContentLength = true;
  661. }
  662. }
  663. if (!haveContentLength && !chunked && (IsHttpRequest() || HasResponseBody()) && SupportChunkedTransfer() && (keepAlive || encoder || IsHttpRequest())) {
  664. AddHeader(THttpInputHeader("Transfer-Encoding", "chunked"));
  665. chunked = true;
  666. }
  667. if (IsBodyEncodingEnabled() && chunked) {
  668. Output_ = Streams_.Add(new TChunkedOutput(Output_));
  669. }
  670. Output_ = Streams_.Add(new TTeeOutput(Output_, &SizeCalculator_));
  671. if (IsBodyEncodingEnabled() && encoder) {
  672. Output_ = Streams_.Add((*encoder)(Output_).Release());
  673. }
  674. }
  675. inline void AddHeader(const THttpInputHeader& hdr) {
  676. Headers_.AddHeader(hdr);
  677. }
  678. inline void AddOrReplaceHeader(const THttpInputHeader& hdr) {
  679. Headers_.AddOrReplaceHeader(hdr);
  680. }
  681. inline void RemoveHeader(const TString& hdr) {
  682. Headers_.RemoveHeader(hdr);
  683. }
  684. private:
  685. IOutputStream* Slave_;
  686. TState State_;
  687. IOutputStream* Output_;
  688. TStreams<IOutputStream, 8> Streams_;
  689. TString Line_;
  690. TString FirstLine_;
  691. THttpHeaders Headers_;
  692. THttpInput* Request_;
  693. size_t Version_;
  694. TArrayRef<const TStringBuf> ComprSchemas_;
  695. bool KeepAliveEnabled_;
  696. bool BodyEncodingEnabled_;
  697. bool CompressionHeaderEnabled_;
  698. bool Finished_;
  699. TSizeCalculator SizeCalculator_;
  700. };
  701. THttpOutput::THttpOutput(IOutputStream* slave)
  702. : Impl_(new TImpl(slave, nullptr))
  703. {
  704. }
  705. THttpOutput::THttpOutput(IOutputStream* slave, THttpInput* request)
  706. : Impl_(new TImpl(slave, request))
  707. {
  708. }
  709. THttpOutput::~THttpOutput() {
  710. try {
  711. Finish();
  712. } catch (...) {
  713. }
  714. }
  715. void THttpOutput::DoWrite(const void* buf, size_t len) {
  716. Impl_->Write(buf, len);
  717. }
  718. void THttpOutput::DoFlush() {
  719. Impl_->Flush();
  720. }
  721. void THttpOutput::DoFinish() {
  722. Impl_->Finish();
  723. }
  724. const THttpHeaders& THttpOutput::SentHeaders() const noexcept {
  725. return Impl_->SentHeaders();
  726. }
  727. void THttpOutput::EnableCompression(bool enable) {
  728. if (enable) {
  729. EnableCompression(TCompressionCodecFactory::Instance().GetBestCodecs());
  730. } else {
  731. TArrayRef<TStringBuf> codings;
  732. EnableCompression(codings);
  733. }
  734. }
  735. void THttpOutput::EnableCompression(TArrayRef<const TStringBuf> schemas) {
  736. Impl_->EnableCompression(schemas);
  737. }
  738. void THttpOutput::EnableKeepAlive(bool enable) {
  739. Impl_->EnableKeepAlive(enable);
  740. }
  741. void THttpOutput::EnableBodyEncoding(bool enable) {
  742. Impl_->EnableBodyEncoding(enable);
  743. }
  744. void THttpOutput::EnableCompressionHeader(bool enable) {
  745. Impl_->EnableCompressionHeader(enable);
  746. }
  747. bool THttpOutput::IsKeepAliveEnabled() const noexcept {
  748. return Impl_->IsKeepAliveEnabled();
  749. }
  750. bool THttpOutput::IsBodyEncodingEnabled() const noexcept {
  751. return Impl_->IsBodyEncodingEnabled();
  752. }
  753. bool THttpOutput::IsCompressionEnabled() const noexcept {
  754. return Impl_->IsCompressionEnabled();
  755. }
  756. bool THttpOutput::IsCompressionHeaderEnabled() const noexcept {
  757. return Impl_->IsCompressionHeaderEnabled();
  758. }
  759. bool THttpOutput::CanBeKeepAlive() const noexcept {
  760. return Impl_->CanBeKeepAlive();
  761. }
  762. void THttpOutput::SendContinue() {
  763. Impl_->SendContinue();
  764. }
  765. const TString& THttpOutput::FirstLine() const noexcept {
  766. return Impl_->FirstLine();
  767. }
  768. size_t THttpOutput::SentSize() const noexcept {
  769. return Impl_->SentSize();
  770. }
  771. unsigned ParseHttpRetCode(const TStringBuf& ret) {
  772. const TStringBuf code = StripString(StripString(ret.After(' ')).Before(' '));
  773. return FromString<unsigned>(code.data(), code.size());
  774. }
  775. void SendMinimalHttpRequest(TSocket& s, const TStringBuf& host, const TStringBuf& request, const TStringBuf& agent, const TStringBuf& from) {
  776. TSocketOutput so(s);
  777. THttpOutput output(&so);
  778. output.EnableKeepAlive(false);
  779. output.EnableCompression(false);
  780. const IOutputStream::TPart parts[] = {
  781. IOutputStream::TPart(TStringBuf("GET ")),
  782. IOutputStream::TPart(request),
  783. IOutputStream::TPart(TStringBuf(" HTTP/1.1")),
  784. IOutputStream::TPart::CrLf(),
  785. IOutputStream::TPart(TStringBuf("Host: ")),
  786. IOutputStream::TPart(host),
  787. IOutputStream::TPart::CrLf(),
  788. IOutputStream::TPart(TStringBuf("User-Agent: ")),
  789. IOutputStream::TPart(agent),
  790. IOutputStream::TPart::CrLf(),
  791. IOutputStream::TPart(TStringBuf("From: ")),
  792. IOutputStream::TPart(from),
  793. IOutputStream::TPart::CrLf(),
  794. IOutputStream::TPart::CrLf(),
  795. };
  796. output.Write(parts, sizeof(parts) / sizeof(*parts));
  797. output.Finish();
  798. }
  799. TArrayRef<const TStringBuf> SupportedCodings() {
  800. return TCompressionCodecFactory::Instance().GetBestCodecs();
  801. }