stream.cpp 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005
  1. #include "stream.h"
  2. #include "compression.h"
  3. #include "chunk.h"
  4. #include <util/stream/buffered.h>
  5. #include <util/stream/length.h>
  6. #include <util/stream/multi.h>
  7. #include <util/stream/null.h>
  8. #include <util/stream/tee.h>
  9. #include <util/system/compat.h>
  10. #include <util/system/yassert.h>
  11. #include <util/network/socket.h>
  12. #include <util/string/cast.h>
  13. #include <util/string/strip.h>
  14. #include <util/generic/string.h>
  15. #include <util/generic/utility.h>
  16. #include <util/generic/hash_set.h>
  17. #include <util/generic/yexception.h>
// Helper for use inside `switch (header.Name().size())`.
// Expands to a `case` label for the length of the literal `str`, followed by
// an `if` that compares the header name with `str` via stricmp(). The
// statement written after the macro invocation becomes the body of that `if`.
#define HEADERCMP(header, str) \
    case sizeof(str) - 1:      \
        if (!stricmp((header).Name().data(), str))
  21. namespace {
  22. inline size_t SuggestBufferSize() {
  23. return 8192;
  24. }
  25. inline TStringBuf Trim(const char* b, const char* e) noexcept {
  26. return StripString(TStringBuf(b, e));
  27. }
  28. inline TStringBuf RmSemiColon(const TStringBuf& s) {
  29. return s.Before(';');
  30. }
  31. template <class T, size_t N>
  32. class TStreams: private TNonCopyable {
  33. struct TDelete {
  34. inline void operator()(T* t) noexcept {
  35. delete t;
  36. }
  37. };
  38. typedef T* TPtr;
  39. public:
  40. inline TStreams() noexcept
  41. : Beg_(T_ + N)
  42. {
  43. }
  44. inline ~TStreams() {
  45. TDelete f;
  46. ForEach(f);
  47. }
  48. template <class S>
  49. inline S* Add(S* t) noexcept {
  50. return (S*)AddImpl((T*)t);
  51. }
  52. template <class Functor>
  53. inline void ForEach(Functor& f) {
  54. const TPtr* end = T_ + N;
  55. for (TPtr* cur = Beg_; cur != end; ++cur) {
  56. f(*cur);
  57. }
  58. }
  59. TPtr Top() {
  60. const TPtr* end = T_ + N;
  61. return end == Beg_ ? nullptr : *Beg_;
  62. }
  63. private:
  64. inline T* AddImpl(T* t) noexcept {
  65. Y_ASSERT(Beg_ > T_);
  66. return (*--Beg_ = t);
  67. }
  68. private:
  69. TPtr T_[N];
  70. TPtr* Beg_;
  71. };
  72. template <class TStream>
  73. class TLazy: public IOutputStream {
  74. public:
  75. TLazy(IOutputStream* out, ui16 bs)
  76. : Output_(out)
  77. , BlockSize_(bs)
  78. {
  79. }
  80. void DoWrite(const void* buf, size_t len) override {
  81. ConstructSlave();
  82. Slave_->Write(buf, len);
  83. }
  84. void DoFlush() override {
  85. ConstructSlave();
  86. Slave_->Flush();
  87. }
  88. void DoFinish() override {
  89. ConstructSlave();
  90. Slave_->Finish();
  91. }
  92. private:
  93. inline void ConstructSlave() {
  94. if (!Slave_) {
  95. Slave_.Reset(new TStream(Output_, BlockSize_));
  96. }
  97. }
  98. private:
  99. IOutputStream* Output_;
  100. ui16 BlockSize_;
  101. THolder<IOutputStream> Slave_;
  102. };
  103. }
  104. class THttpInput::TImpl {
  105. typedef THashSet<TString> TAcceptCodings;
  106. public:
  107. inline TImpl(IInputStream* slave)
  108. : Slave_(slave)
  109. , Buffered_(Slave_, SuggestBufferSize())
  110. , ChunkedInput_(nullptr)
  111. , Input_(nullptr)
  112. , FirstLine_(ReadFirstLine(Buffered_))
  113. , Headers_(&Buffered_)
  114. , KeepAlive_(false)
  115. , HasContentLength_(false)
  116. , ContentLength_(0)
  117. , ContentEncoded_(false)
  118. , Expect100Continue_(false)
  119. {
  120. BuildInputChain();
  121. Y_ASSERT(Input_);
  122. }
  123. static TString ReadFirstLine(TBufferedInput& in) {
  124. TString s;
  125. Y_ENSURE_EX(in.ReadLine(s), THttpReadException() << "Failed to get first line");
  126. return s;
  127. }
  128. inline ~TImpl() {
  129. }
  130. inline size_t Read(void* buf, size_t len) {
  131. return Perform(len, [this, buf](size_t toRead) { return Input_->Read(buf, toRead); });
  132. }
  133. inline size_t Skip(size_t len) {
  134. return Perform(len, [this](size_t toSkip) { return Input_->Skip(toSkip); });
  135. }
  136. inline const TString& FirstLine() const noexcept {
  137. return FirstLine_;
  138. }
  139. inline const THttpHeaders& Headers() const noexcept {
  140. return Headers_;
  141. }
  142. inline const TMaybe<THttpHeaders>& Trailers() const noexcept {
  143. return Trailers_;
  144. }
  145. inline bool IsKeepAlive() const noexcept {
  146. return KeepAlive_;
  147. }
  148. inline bool AcceptEncoding(const TString& s) const {
  149. return Codings_.find(to_lower(s)) != Codings_.end();
  150. }
  151. inline bool GetContentLength(ui64& value) const noexcept {
  152. if (HasContentLength_) {
  153. value = ContentLength_;
  154. return true;
  155. }
  156. return false;
  157. }
  158. inline bool ContentEncoded() const noexcept {
  159. return ContentEncoded_;
  160. }
  161. inline bool HasContent() const noexcept {
  162. return HasContentLength_ || ChunkedInput_;
  163. }
  164. inline bool HasExpect100Continue() const noexcept {
  165. return Expect100Continue_;
  166. }
  167. private:
  168. template <class Operation>
  169. inline size_t Perform(size_t len, const Operation& operation) {
  170. size_t processed = operation(len);
  171. if (processed == 0 && len > 0) {
  172. if (!ChunkedInput_) {
  173. Trailers_.ConstructInPlace();
  174. } else {
  175. // Read the header of the trailing chunk. It remains in
  176. // the TChunkedInput stream if the HTTP response is compressed.
  177. char symbol;
  178. if (ChunkedInput_->Read(&symbol, 1) != 0) {
  179. ythrow THttpParseException() << "some data remaining in TChunkedInput";
  180. }
  181. }
  182. }
  183. return processed;
  184. }
  185. struct TParsedHeaders {
  186. bool Chunked = false;
  187. bool KeepAlive = false;
  188. TStringBuf LZipped;
  189. };
  190. struct TTrEnc {
  191. inline void operator()(const TStringBuf& s) {
  192. if (s == TStringBuf("chunked")) {
  193. p->Chunked = true;
  194. }
  195. }
  196. TParsedHeaders* p;
  197. };
  198. struct TAccCoding {
  199. inline void operator()(const TStringBuf& s) {
  200. c->insert(ToString(s));
  201. }
  202. TAcceptCodings* c;
  203. };
  204. template <class Functor>
  205. inline void ForEach(TString in, Functor& f) {
  206. in.to_lower();
  207. const char* b = in.begin();
  208. const char* c = b;
  209. const char* e = in.end();
  210. while (c != e) {
  211. if (*c == ',') {
  212. f(RmSemiColon(Trim(b, c)));
  213. b = c + 1;
  214. }
  215. ++c;
  216. }
  217. if (b != c) {
  218. f(RmSemiColon(Trim(b, c)));
  219. }
  220. }
  221. inline bool IsRequest() const {
  222. return strnicmp(FirstLine().data(), "get", 3) == 0 ||
  223. strnicmp(FirstLine().data(), "post", 4) == 0 ||
  224. strnicmp(FirstLine().data(), "put", 3) == 0 ||
  225. strnicmp(FirstLine().data(), "patch", 5) == 0 ||
  226. strnicmp(FirstLine().data(), "head", 4) == 0 ||
  227. strnicmp(FirstLine().data(), "delete", 6) == 0;
  228. }
  229. inline void BuildInputChain() {
  230. TParsedHeaders p;
  231. size_t pos = FirstLine_.rfind(' ');
  232. // In HTTP/1.1 Keep-Alive is turned on by default
  233. if (pos != TString::npos && strcmp(FirstLine_.c_str() + pos + 1, "HTTP/1.1") == 0) {
  234. p.KeepAlive = true; //request
  235. } else if (strnicmp(FirstLine_.data(), "HTTP/1.1", 8) == 0) {
  236. p.KeepAlive = true; //reply
  237. }
  238. for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) {
  239. const THttpInputHeader& header = *h;
  240. switch (header.Name().size()) {
  241. HEADERCMP(header, "transfer-encoding") {
  242. TTrEnc f = {&p};
  243. ForEach(header.Value(), f);
  244. }
  245. break;
  246. HEADERCMP(header, "content-encoding") {
  247. p.LZipped = header.Value();
  248. }
  249. break;
  250. HEADERCMP(header, "accept-encoding") {
  251. TAccCoding f = {&Codings_};
  252. ForEach(header.Value(), f);
  253. }
  254. break;
  255. HEADERCMP(header, "content-length") {
  256. HasContentLength_ = true;
  257. ContentLength_ = FromString(header.Value());
  258. }
  259. break;
  260. HEADERCMP(header, "connection") {
  261. // accept header "Connection: Keep-Alive, TE"
  262. if (strnicmp(header.Value().data(), "keep-alive", 10) == 0) {
  263. p.KeepAlive = true;
  264. } else if (stricmp(header.Value().data(), "close") == 0) {
  265. p.KeepAlive = false;
  266. }
  267. }
  268. [[fallthrough]];
  269. HEADERCMP(header, "expect") {
  270. auto findContinue = [&](const TStringBuf& s) {
  271. if (strnicmp(s.data(), "100-continue", 13) == 0) {
  272. Expect100Continue_ = true;
  273. }
  274. };
  275. ForEach(header.Value(), findContinue);
  276. }
  277. break;
  278. }
  279. }
  280. if (p.Chunked) {
  281. ChunkedInput_ = Streams_.Add(new TChunkedInput(&Buffered_, &Trailers_));
  282. Input_ = ChunkedInput_;
  283. } else {
  284. // disable buffering
  285. Buffered_.Reset(&Cnull);
  286. Input_ = Streams_.Add(new TMultiInput(&Buffered_, Slave_));
  287. if (IsRequest() || HasContentLength_) {
  288. /*
  289. * TODO - we have other cases
  290. */
  291. Input_ = Streams_.Add(new TLengthLimitedInput(Input_, ContentLength_));
  292. }
  293. }
  294. if (auto decoder = TCompressionCodecFactory::Instance().FindDecoder(p.LZipped)) {
  295. ContentEncoded_ = true;
  296. Input_ = Streams_.Add((*decoder)(Input_).Release());
  297. }
  298. KeepAlive_ = p.KeepAlive;
  299. }
  300. private:
  301. IInputStream* Slave_;
  302. /*
  303. * input helpers
  304. */
  305. TBufferedInput Buffered_;
  306. TStreams<IInputStream, 8> Streams_;
  307. IInputStream* ChunkedInput_;
  308. /*
  309. * final input stream
  310. */
  311. IInputStream* Input_;
  312. TString FirstLine_;
  313. THttpHeaders Headers_;
  314. TMaybe<THttpHeaders> Trailers_;
  315. bool KeepAlive_;
  316. TAcceptCodings Codings_;
  317. bool HasContentLength_;
  318. ui64 ContentLength_;
  319. bool ContentEncoded_;
  320. bool Expect100Continue_;
  321. };
// Creates the implementation object on top of `slave`; TImpl's constructor
// reads the first line and the headers right away.
THttpInput::THttpInput(IInputStream* slave)
    : Impl_(new TImpl(slave))
{
}
// Defaulted move constructor.
THttpInput::THttpInput(THttpInput&& httpInput) = default;
// Defined out of line, in the translation unit where TImpl is defined.
THttpInput::~THttpInput() {
}
// Forwards to TImpl::Read(); returns the number of bytes read.
size_t THttpInput::DoRead(void* buf, size_t len) {
    return Impl_->Read(buf, len);
}
// Forwards to TImpl::Skip(); returns the number of bytes skipped.
size_t THttpInput::DoSkip(size_t len) {
    return Impl_->Skip(len);
}
// Returns the headers held by the implementation object.
const THttpHeaders& THttpInput::Headers() const noexcept {
    return Impl_->Headers();
}
// Returns the trailers held by the implementation object (a TMaybe, so it
// may be empty).
const TMaybe<THttpHeaders>& THttpInput::Trailers() const noexcept {
    return Impl_->Trailers();
}
// Returns the first line of the message as it was read from the stream.
const TString& THttpInput::FirstLine() const noexcept {
    return Impl_->FirstLine();
}
// Forwards to TImpl::IsKeepAlive().
bool THttpInput::IsKeepAlive() const noexcept {
    return Impl_->IsKeepAlive();
}
// Forwards to TImpl::AcceptEncoding().
bool THttpInput::AcceptEncoding(const TString& coding) const {
    return Impl_->AcceptEncoding(coding);
}
  350. TString THttpInput::BestCompressionScheme(TArrayRef<const TStringBuf> codings) const {
  351. return NHttp::ChooseBestCompressionScheme(
  352. [this](const TString& coding) {
  353. return AcceptEncoding(coding);
  354. },
  355. codings
  356. );
  357. }
// Same as the overload above, with the candidate list taken from
// TCompressionCodecFactory::GetBestCodecs().
TString THttpInput::BestCompressionScheme() const {
    return BestCompressionScheme(TCompressionCodecFactory::Instance().GetBestCodecs());
}
// Forwards to TImpl::GetContentLength().
bool THttpInput::GetContentLength(ui64& value) const noexcept {
    return Impl_->GetContentLength(value);
}
// Forwards to TImpl::ContentEncoded().
bool THttpInput::ContentEncoded() const noexcept {
    return Impl_->ContentEncoded();
}
// Forwards to TImpl::HasContent().
bool THttpInput::HasContent() const noexcept {
    return Impl_->HasContent();
}
// Forwards to TImpl::HasExpect100Continue().
bool THttpInput::HasExpect100Continue() const noexcept {
    return Impl_->HasExpect100Continue();
}
// Implementation of THttpOutput.
//
// Data written by the caller is first parsed line by line as the first line
// and the headers of an HTTP message. Once the empty line that ends the
// headers is seen, the output stream chain is built, the (possibly adjusted)
// first line and headers are written to the slave, and all further data is
// passed to the chain as the message body.
class THttpOutput::TImpl {
    // Output stream that discards the data and only sums up its length.
    class TSizeCalculator: public IOutputStream {
    public:
        inline TSizeCalculator() noexcept {
        }

        ~TSizeCalculator() override {
        }

        void DoWrite(const void* /*buf*/, size_t len) override {
            Length_ += len;
        }

        inline size_t Length() const noexcept {
            return Length_;
        }

    private:
        size_t Length_ = 0;
    };

    // Progress of the first-line / headers parsing done in Write().
    enum TState {
        Begin = 0,
        FirstLineSent = 1,
        HeadersSent = 2
    };

    // Functor for TStreams::ForEach(): flushes a stream.
    struct TFlush {
        inline void operator()(IOutputStream* s) {
            s->Flush();
        }
    };

    // Functor for TStreams::ForEach(): finishes a stream.
    struct TFinish {
        inline void operator()(IOutputStream* s) {
            s->Finish();
        }
    };

public:
    // `slave` receives the produced bytes; `request` may be nullptr and, when
    // set, is consulted for keep-alive, HTTP version and accepted encodings.
    inline TImpl(IOutputStream* slave, THttpInput* request)
        : Slave_(slave)
        , State_(Begin)
        , Output_(Slave_)
        , Request_(request)
        , Version_(1100)
        , KeepAliveEnabled_(false)
        , BodyEncodingEnabled_(true)
        , CompressionHeaderEnabled_(true)
        , Finished_(false)
    {
    }

    inline ~TImpl() {
    }

    // Writes an interim "100 Continue" response to the current output and
    // flushes it.
    inline void SendContinue() {
        Output_->Write("HTTP/1.1 100 Continue\r\n\r\n");
        Output_->Flush();
    }

    // Accepts a piece of the HTTP message. Throws THttpException when called
    // after Finish().
    inline void Write(const void* buf, size_t len) {
        if (Finished_) {
            ythrow THttpException() << "can not write to finished stream";
        }

        // Headers are done: everything is body data.
        if (State_ == HeadersSent) {
            Output_->Write(buf, len);
            return;
        }

        const char* b = (const char*)buf;
        const char* e = b + len;
        const char* c = b;

        while (c != e) {
            if (*c == '\n') {
                // Complete the current line (it may have started in an
                // earlier Write() call) and drop a trailing '\r'.
                Line_.append(b, c);
                if (!Line_.empty() && Line_.back() == '\r') {
                    Line_.pop_back();
                }
                b = c + 1;
                Process(Line_);

                // The empty line was just processed: the rest of the buffer
                // belongs to the body.
                if (State_ == HeadersSent) {
                    Output_->Write(b, e - b);
                    return;
                }

                Line_.clear();
            }
            ++c;
        }

        // Keep an incomplete line until the next Write().
        if (b != c) {
            Line_.append(b, c);
        }
    }

    // Flushes every stream of the chain and then the slave.
    inline void Flush() {
        TFlush f;
        Streams_.ForEach(f);
        Slave_->Flush(); // see SEARCH-1030
    }

    // Finishes every stream of the chain and then the slave. Calls after the
    // first one are no-ops.
    inline void Finish() {
        if (Finished_) {
            return;
        }

        TFinish f;
        Streams_.ForEach(f);
        Slave_->Finish(); // see SEARCH-1030

        Finished_ = true;
    }

    inline const THttpHeaders& SentHeaders() const noexcept {
        return Headers_;
    }

    // Stores the array reference as is (no copy of the elements is made here).
    inline void EnableCompression(TArrayRef<const TStringBuf> schemas) {
        ComprSchemas_ = schemas;
    }

    inline void EnableKeepAlive(bool enable) {
        KeepAliveEnabled_ = enable;
    }

    inline void EnableBodyEncoding(bool enable) {
        BodyEncodingEnabled_ = enable;
    }

    inline void EnableCompressionHeader(bool enable) {
        CompressionHeaderEnabled_ = enable;
    }

    inline bool IsCompressionEnabled() const noexcept {
        return !ComprSchemas_.empty();
    }

    inline bool IsKeepAliveEnabled() const noexcept {
        return KeepAliveEnabled_;
    }

    inline bool IsBodyEncodingEnabled() const noexcept {
        return BodyEncodingEnabled_;
    }

    inline bool IsCompressionHeaderEnabled() const noexcept {
        return CompressionHeaderEnabled_;
    }

    // Keep-alive is possible when the negotiated version supports chunked
    // transfer, keep-alive is enabled and the request (if any) is keep-alive.
    inline bool CanBeKeepAlive() const noexcept {
        return SupportChunkedTransfer() && IsKeepAliveEnabled() && (Request_ ? Request_->IsKeepAlive() : true);
    }

    inline const TString& FirstLine() const noexcept {
        return FirstLine_;
    }

    // Number of bytes that passed through the size-counting tee installed by
    // RebuildStream().
    inline size_t SentSize() const noexcept {
        return SizeCalculator_.Length();
    }

private:
    // A first line starting with "HTTP/" (case-insensitively) is a response.
    static inline bool IsResponse(const TString& s) noexcept {
        return strnicmp(s.data(), "HTTP/", 5) == 0;
    }

    static inline bool IsRequest(const TString& s) noexcept {
        return !IsResponse(s);
    }

    inline bool IsHttpRequest() const noexcept {
        return IsRequest(FirstLine_);
    }

    // For a response: false when the request's first line starts with "HEAD"
    // or when the three characters at offset 9 of the first line are "204";
    // true otherwise. Always false for a request.
    inline bool HasResponseBody() const noexcept {
        if (IsHttpResponse()) {
            if (Request_ && Request_->FirstLine().StartsWith(TStringBuf("HEAD")))
                return false;
            if (FirstLine_.size() > 9 && strncmp(FirstLine_.data() + 9, "204", 3) == 0)
                return false;
            return true;
        }
        return false;
    }

    inline bool IsHttpResponse() const noexcept {
        return IsResponse(FirstLine_);
    }

    // True when the first line starts with POST, PATCH or PUT.
    inline bool HasRequestBody() const noexcept {
        return strnicmp(FirstLine_.data(), "POST", 4) == 0 ||
               strnicmp(FirstLine_.data(), "PATCH", 5) == 0 ||
               strnicmp(FirstLine_.data(), "PUT", 3) == 0;
    }

    // Extracts the HTTP version from a first line as a number: the characters
    // of the version with '.' skipped, read as a decimal, times 100
    // (so "1.1" gives 1100). Throws THttpParseException on an empty line.
    // NOTE(review): characters are not checked to be digits.
    static inline size_t ParseHttpVersion(const TString& s) {
        if (s.empty()) {
            ythrow THttpParseException() << "malformed http stream";
        }

        size_t parsed_version = 0;

        if (IsResponse(s)) {
            // Response: the version follows "HTTP/" and ends at the first space.
            const char* b = s.data() + 5;

            while (*b && *b != ' ') {
                if (*b != '.') {
                    parsed_version *= 10;
                    parsed_version += (*b - '0');
                }
                ++b;
            }
        } else {
            /*
             * s not empty here
             */
            // Request: walk backwards from the end of the line to the '/'.
            const char* e = s.end() - 1;
            const char* b = s.begin();
            size_t mult = 1;

            while (e != b && *e != '/') {
                if (*e != '.') {
                    parsed_version += (*e - '0') * mult;
                    mult *= 10;
                }
                --e;
            }
        }

        return parsed_version * 100;
    }

    // Sets Version_ to the version of our first line, limited by the version
    // of the request's first line when a request is attached.
    inline void ParseHttpVersion() {
        size_t parsed_version = ParseHttpVersion(FirstLine_);

        if (Request_) {
            parsed_version = Min(parsed_version, ParseHttpVersion(Request_->FirstLine()));
        }

        Version_ = parsed_version;
    }

    // Handles one complete line of the message head: the first line, a
    // header, or the empty line that ends the headers.
    inline void Process(const TString& s) {
        Y_ASSERT(State_ != HeadersSent);

        if (State_ == Begin) {
            FirstLine_ = s;
            ParseHttpVersion();
            State_ = FirstLineSent;
        } else {
            if (s.empty()) {
                BuildOutputStream();
                WriteCached();
                State_ = HeadersSent;
            } else {
                AddHeader(THttpInputHeader(s));
            }
        }
    }

    // Writes the first line, the headers and the terminating empty line to
    // `s`, then finishes `s`.
    inline void WriteCachedImpl(IOutputStream* s) const {
        s->Write(FirstLine_.data(), FirstLine_.size());
        s->Write("\r\n", 2);
        Headers_.OutTo(s);
        s->Write("\r\n", 2);
        s->Finish();
    }

    // Sends the message head to the slave through a buffer sized exactly for it.
    inline void WriteCached() {
        size_t buflen = 0;

        // First pass: only measure the size of the head.
        {
            TSizeCalculator out;

            WriteCachedImpl(&out);
            buflen = out.Length();
        }

        // Second pass: write it through a buffer of that size.
        {
            TBufferedOutput out(Slave_, buflen);

            WriteCachedImpl(&out);
        }

        if (IsHttpRequest() && !HasRequestBody()) {
            /*
             * if this is http request, then send it now
             */

            Slave_->Flush();
        }
    }

    inline bool SupportChunkedTransfer() const noexcept {
        return Version_ >= 1100;
    }

    // Adjusts the headers (Connection, Content-Encoding / Accept-Encoding)
    // and builds the body output chain where a body is expected.
    inline void BuildOutputStream() {
        if (CanBeKeepAlive()) {
            AddOrReplaceHeader(THttpInputHeader("Connection", "Keep-Alive"));
        } else {
            AddOrReplaceHeader(THttpInputHeader("Connection", "Close"));
        }

        if (IsHttpResponse()) {
            if (Request_ && IsCompressionEnabled() && HasResponseBody()) {
                TString scheme = Request_->BestCompressionScheme(ComprSchemas_);
                if (scheme != "identity") {
                    // The body will be compressed, so a Content-Length given
                    // by the caller is removed.
                    AddOrReplaceHeader(THttpInputHeader("Content-Encoding", scheme));
                    RemoveHeader("Content-Length");
                }
            }

            RebuildStream();
        } else {
            if (IsCompressionEnabled()) {
                AddOrReplaceHeader(THttpInputHeader("Accept-Encoding", BuildAcceptEncoding()));
            }
            if (HasRequestBody()) {
                RebuildStream();
            }
        }
    }

    // Joins the enabled compression schemes with ", ".
    inline TString BuildAcceptEncoding() const {
        TString ret;

        for (const auto& coding : ComprSchemas_) {
            if (ret) {
                ret += ", ";
            }

            ret += coding;
        }

        return ret;
    }

    // Builds the body output chain on top of Output_ according to the
    // headers: optional chunked encoder, then a tee into SizeCalculator_,
    // then an optional content encoder. May add "Transfer-Encoding: chunked".
    inline void RebuildStream() {
        bool keepAlive = false;
        const TCompressionCodecFactory::TEncoderConstructor* encoder = nullptr;
        bool chunked = false;
        bool haveContentLength = false;

        for (THttpHeaders::TConstIterator h = Headers_.Begin(); h != Headers_.End(); ++h) {
            const THttpInputHeader& header = *h;
            const TString hl = to_lower(header.Name());

            if (hl == TStringBuf("connection")) {
                keepAlive = to_lower(header.Value()) == TStringBuf("keep-alive");
            } else if (IsCompressionHeaderEnabled() && hl == TStringBuf("content-encoding")) {
                encoder = TCompressionCodecFactory::Instance().FindEncoder(to_lower(header.Value()));
            } else if (hl == TStringBuf("transfer-encoding")) {
                chunked = to_lower(header.Value()) == TStringBuf("chunked");
            } else if (hl == TStringBuf("content-length")) {
                haveContentLength = true;
            }
        }

        // No explicit length and not chunked yet: switch to chunked transfer
        // when the version allows it and a body of unknown size will follow.
        if (!haveContentLength && !chunked && (IsHttpRequest() || HasResponseBody()) && SupportChunkedTransfer() && (keepAlive || encoder || IsHttpRequest())) {
            AddHeader(THttpInputHeader("Transfer-Encoding", "chunked"));
            chunked = true;
        }

        if (IsBodyEncodingEnabled() && chunked) {
            Output_ = Streams_.Add(new TChunkedOutput(Output_));
        }

        Output_ = Streams_.Add(new TTeeOutput(Output_, &SizeCalculator_));

        if (IsBodyEncodingEnabled() && encoder) {
            Output_ = Streams_.Add((*encoder)(Output_).Release());
        }
    }

    inline void AddHeader(const THttpInputHeader& hdr) {
        Headers_.AddHeader(hdr);
    }

    inline void AddOrReplaceHeader(const THttpInputHeader& hdr) {
        Headers_.AddOrReplaceHeader(hdr);
    }

    inline void RemoveHeader(const TString& hdr) {
        Headers_.RemoveHeader(hdr);
    }

private:
    IOutputStream* Slave_;
    TState State_;
    // Current top of the output chain; starts as Slave_.
    IOutputStream* Output_;
    TStreams<IOutputStream, 8> Streams_;
    // Accumulates the header line currently being received.
    TString Line_;
    TString FirstLine_;
    THttpHeaders Headers_;
    THttpInput* Request_;
    size_t Version_;

    TArrayRef<const TStringBuf> ComprSchemas_;

    bool KeepAliveEnabled_;
    bool BodyEncodingEnabled_;
    bool CompressionHeaderEnabled_;

    bool Finished_;

    TSizeCalculator SizeCalculator_;
};
// Creates an output that is not associated with any request.
THttpOutput::THttpOutput(IOutputStream* slave)
    : Impl_(new TImpl(slave, nullptr))
{
}
// Creates an output associated with `request`; the pointer is passed to the
// implementation object as is.
THttpOutput::THttpOutput(IOutputStream* slave, THttpInput* request)
    : Impl_(new TImpl(slave, request))
{
}
// Finishes the stream on destruction. Any exception thrown by Finish() is
// deliberately swallowed so that the destructor never throws.
THttpOutput::~THttpOutput() {
    try {
        Finish();
    } catch (...) {
    }
}
// Forwards to TImpl::Write().
void THttpOutput::DoWrite(const void* buf, size_t len) {
    Impl_->Write(buf, len);
}
// Forwards to TImpl::Flush().
void THttpOutput::DoFlush() {
    Impl_->Flush();
}
// Forwards to TImpl::Finish().
void THttpOutput::DoFinish() {
    Impl_->Finish();
}
// Returns the header set held by the implementation object.
const THttpHeaders& THttpOutput::SentHeaders() const noexcept {
    return Impl_->SentHeaders();
}
  730. void THttpOutput::EnableCompression(bool enable) {
  731. if (enable) {
  732. EnableCompression(TCompressionCodecFactory::Instance().GetBestCodecs());
  733. } else {
  734. TArrayRef<TStringBuf> codings;
  735. EnableCompression(codings);
  736. }
  737. }
// Forwards to TImpl::EnableCompression(), which stores the array reference.
void THttpOutput::EnableCompression(TArrayRef<const TStringBuf> schemas) {
    Impl_->EnableCompression(schemas);
}
// Forwards to TImpl::EnableKeepAlive().
void THttpOutput::EnableKeepAlive(bool enable) {
    Impl_->EnableKeepAlive(enable);
}
// Forwards to TImpl::EnableBodyEncoding().
void THttpOutput::EnableBodyEncoding(bool enable) {
    Impl_->EnableBodyEncoding(enable);
}
// Forwards to TImpl::EnableCompressionHeader().
void THttpOutput::EnableCompressionHeader(bool enable) {
    Impl_->EnableCompressionHeader(enable);
}
// Forwards to TImpl::IsKeepAliveEnabled().
bool THttpOutput::IsKeepAliveEnabled() const noexcept {
    return Impl_->IsKeepAliveEnabled();
}
// Forwards to TImpl::IsBodyEncodingEnabled().
bool THttpOutput::IsBodyEncodingEnabled() const noexcept {
    return Impl_->IsBodyEncodingEnabled();
}
// Forwards to TImpl::IsCompressionEnabled().
bool THttpOutput::IsCompressionEnabled() const noexcept {
    return Impl_->IsCompressionEnabled();
}
// Forwards to TImpl::IsCompressionHeaderEnabled().
bool THttpOutput::IsCompressionHeaderEnabled() const noexcept {
    return Impl_->IsCompressionHeaderEnabled();
}
// Forwards to TImpl::CanBeKeepAlive().
bool THttpOutput::CanBeKeepAlive() const noexcept {
    return Impl_->CanBeKeepAlive();
}
// Forwards to TImpl::SendContinue(), which writes a "100 Continue" response
// and flushes it.
void THttpOutput::SendContinue() {
    Impl_->SendContinue();
}
// Returns the first line stored by the implementation object.
const TString& THttpOutput::FirstLine() const noexcept {
    return Impl_->FirstLine();
}
// Forwards to TImpl::SentSize().
size_t THttpOutput::SentSize() const noexcept {
    return Impl_->SentSize();
}
  774. unsigned ParseHttpRetCode(const TStringBuf& ret) {
  775. const TStringBuf code = StripString(StripString(ret.After(' ')).Before(' '));
  776. return FromString<unsigned>(code.data(), code.size());
  777. }
  778. void SendMinimalHttpRequest(TSocket& s, const TStringBuf& host, const TStringBuf& request, const TStringBuf& agent, const TStringBuf& from) {
  779. TSocketOutput so(s);
  780. THttpOutput output(&so);
  781. output.EnableKeepAlive(false);
  782. output.EnableCompression(false);
  783. const IOutputStream::TPart parts[] = {
  784. IOutputStream::TPart(TStringBuf("GET ")),
  785. IOutputStream::TPart(request),
  786. IOutputStream::TPart(TStringBuf(" HTTP/1.1")),
  787. IOutputStream::TPart::CrLf(),
  788. IOutputStream::TPart(TStringBuf("Host: ")),
  789. IOutputStream::TPart(host),
  790. IOutputStream::TPart::CrLf(),
  791. IOutputStream::TPart(TStringBuf("User-Agent: ")),
  792. IOutputStream::TPart(agent),
  793. IOutputStream::TPart::CrLf(),
  794. IOutputStream::TPart(TStringBuf("From: ")),
  795. IOutputStream::TPart(from),
  796. IOutputStream::TPart::CrLf(),
  797. IOutputStream::TPart::CrLf(),
  798. };
  799. output.Write(parts, sizeof(parts) / sizeof(*parts));
  800. output.Finish();
  801. }
// Returns the codec list reported by TCompressionCodecFactory::GetBestCodecs().
TArrayRef<const TStringBuf> SupportedCodings() {
    return TCompressionCodecFactory::Instance().GetBestCodecs();
}