chunk.cpp 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. #include "chunk.h"
  2. #include "headers.h"
  3. #include <util/string/cast.h>
  4. #include <util/generic/utility.h>
  5. #include <util/generic/yexception.h>
  6. static inline size_t ParseHex(const TString& s) {
  7. if (s.empty()) {
  8. ythrow yexception() << "can not parse chunk length(empty string)";
  9. }
  10. size_t ret = 0;
  11. for (TString::const_iterator c = s.begin(); c != s.end(); ++c) {
  12. const char ch = *c;
  13. if (ch >= '0' && ch <= '9') {
  14. ret *= 16;
  15. ret += ch - '0';
  16. } else if (ch >= 'a' && ch <= 'f') {
  17. ret *= 16;
  18. ret += 10 + ch - 'a';
  19. } else if (ch >= 'A' && ch <= 'F') {
  20. ret *= 16;
  21. ret += 10 + ch - 'A';
  22. } else if (ch == ';') {
  23. break;
  24. } else if (isspace(ch)) {
  25. continue;
  26. } else {
  27. ythrow yexception() << "can not parse chunk length(" << s.data() << ")";
  28. }
  29. }
  30. return ret;
  31. }
  /*
   * Renders len as lowercase hexadecimal, writing the digits right-to-left
   * so that the last digit lands at buf[-1].
   *
   * buf must point one past the end of the destination area. At least one
   * digit is always produced (0 is rendered as "0"); no terminating zero
   * is written.
   *
   * Returns a pointer to the first (most significant) digit.
   */
  static inline char* ToHex(size_t len, char* buf) {
      static const char digits[] = "0123456789abcdef";

      char* out = buf;

      for (;;) {
          *--out = digits[len & 15];
          len >>= 4;

          if (len == 0) {
              break;
          }
      }

      return out;
  }
  40. class TChunkedInput::TImpl {
  41. public:
  42. inline TImpl(IInputStream* slave, TMaybe<THttpHeaders>* trailers)
  43. : Slave_(slave)
  44. , Trailers_(trailers)
  45. , Pending_(0)
  46. , LastChunkReaded_(false)
  47. {
  48. if (Trailers_) {
  49. Trailers_->Clear();
  50. }
  51. }
  52. inline ~TImpl() {
  53. }
  54. inline size_t Read(void* buf, size_t len) {
  55. return Perform(len, [this, buf](size_t toRead) { return Slave_->Read(buf, toRead); });
  56. }
  57. inline size_t Skip(size_t len) {
  58. return Perform(len, [this](size_t toSkip) { return Slave_->Skip(toSkip); });
  59. }
  60. private:
  61. template <class Operation>
  62. inline size_t Perform(size_t len, const Operation& operation) {
  63. if (!HavePendingData()) {
  64. return 0;
  65. }
  66. const size_t toProcess = Min(Pending_, len);
  67. if (toProcess) {
  68. const size_t processed = operation(toProcess);
  69. if (!processed) {
  70. ythrow yexception() << "malformed http chunk";
  71. }
  72. Pending_ -= processed;
  73. return processed;
  74. }
  75. return 0;
  76. }
  77. inline bool HavePendingData() {
  78. if (LastChunkReaded_) {
  79. return false;
  80. }
  81. if (!Pending_) {
  82. if (!ProceedToNextChunk()) {
  83. return false;
  84. }
  85. }
  86. return true;
  87. }
  88. inline bool ProceedToNextChunk() {
  89. TString len(Slave_->ReadLine());
  90. if (len.empty()) {
  91. /*
  92. * skip crlf from previous chunk
  93. */
  94. len = Slave_->ReadLine();
  95. }
  96. Pending_ = ParseHex(len);
  97. if (Pending_) {
  98. return true;
  99. }
  100. if (Trailers_) {
  101. Trailers_->ConstructInPlace(Slave_);
  102. }
  103. LastChunkReaded_ = true;
  104. return false;
  105. }
  106. private:
  107. IInputStream* Slave_;
  108. TMaybe<THttpHeaders>* Trailers_;
  109. size_t Pending_;
  110. bool LastChunkReaded_;
  111. };
  112. TChunkedInput::TChunkedInput(IInputStream* slave, TMaybe<THttpHeaders>* trailers)
  113. : Impl_(new TImpl(slave, trailers))
  114. {
  115. }
  116. TChunkedInput::~TChunkedInput() {
  117. }
  118. size_t TChunkedInput::DoRead(void* buf, size_t len) {
  119. return Impl_->Read(buf, len);
  120. }
  121. size_t TChunkedInput::DoSkip(size_t len) {
  122. return Impl_->Skip(len);
  123. }
  124. class TChunkedOutput::TImpl {
  125. typedef IOutputStream::TPart TPart;
  126. public:
  127. inline TImpl(IOutputStream* slave)
  128. : Slave_(slave)
  129. {
  130. }
  131. inline ~TImpl() {
  132. }
  133. inline void Write(const void* buf, size_t len) {
  134. const char* ptr = (const char*)buf;
  135. while (len) {
  136. const size_t portion = Min<size_t>(len, 1024 * 16);
  137. WriteImpl(ptr, portion);
  138. ptr += portion;
  139. len -= portion;
  140. }
  141. }
  142. inline void WriteImpl(const void* buf, size_t len) {
  143. char tmp[32];
  144. char* e = tmp + sizeof(tmp);
  145. char* b = ToHex(len, e);
  146. const TPart parts[] = {
  147. TPart(b, e - b),
  148. TPart::CrLf(),
  149. TPart(buf, len),
  150. TPart::CrLf(),
  151. };
  152. Slave_->Write(parts, sizeof(parts) / sizeof(*parts));
  153. }
  154. inline void Flush() {
  155. Slave_->Flush();
  156. }
  157. inline void Finish() {
  158. Slave_->Write("0\r\n\r\n", 5);
  159. Flush();
  160. }
  161. private:
  162. IOutputStream* Slave_;
  163. };
  164. TChunkedOutput::TChunkedOutput(IOutputStream* slave)
  165. : Impl_(new TImpl(slave))
  166. {
  167. }
  168. TChunkedOutput::~TChunkedOutput() {
  169. try {
  170. Finish();
  171. } catch (...) {
  172. }
  173. }
  174. void TChunkedOutput::DoWrite(const void* buf, size_t len) {
  175. if (Impl_.Get()) {
  176. Impl_->Write(buf, len);
  177. } else {
  178. ythrow yexception() << "can not write to finished stream";
  179. }
  180. }
  181. void TChunkedOutput::DoFlush() {
  182. if (Impl_.Get()) {
  183. Impl_->Flush();
  184. }
  185. }
  186. void TChunkedOutput::DoFinish() {
  187. if (Impl_.Get()) {
  188. Impl_->Finish();
  189. Impl_.Destroy();
  190. }
  191. }