123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- #include "chunk.h"
- #include "headers.h"
- #include <util/string/cast.h>
- #include <util/generic/utility.h>
- #include <util/generic/yexception.h>
- static inline size_t ParseHex(const TString& s) {
- if (s.empty()) {
- ythrow yexception() << "can not parse chunk length(empty string)";
- }
- size_t ret = 0;
- for (TString::const_iterator c = s.begin(); c != s.end(); ++c) {
- const char ch = *c;
- if (ch >= '0' && ch <= '9') {
- ret *= 16;
- ret += ch - '0';
- } else if (ch >= 'a' && ch <= 'f') {
- ret *= 16;
- ret += 10 + ch - 'a';
- } else if (ch >= 'A' && ch <= 'F') {
- ret *= 16;
- ret += 10 + ch - 'A';
- } else if (ch == ';') {
- break;
- } else if (isspace(ch)) {
- continue;
- } else {
- ythrow yexception() << "can not parse chunk length(" << s.data() << ")";
- }
- }
- return ret;
- }
/**
 * Writes the lowercase hexadecimal representation of len backwards,
 * ending just before buf.
 *
 * @param len  value to format; zero produces the single digit '0'
 * @param buf  one-past-the-end pointer of the destination area; the caller
 *             must provide enough room before it for all digits
 * @return     pointer to the first (most significant) digit written;
 *             the text occupies [returned pointer, buf) and is not
 *             null-terminated
 */
static inline char* ToHex(size_t len, char* buf) {
    static const char digits[] = "0123456789abcdef";

    char* out = buf;

    for (;;) {
        // Emit the lowest nibble first; moving backwards puts the digits
        // into natural reading order.
        *--out = digits[len & 0xF];
        len >>= 4;

        if (len == 0) {
            break;
        }
    }

    return out;
}
- class TChunkedInput::TImpl {
- public:
- inline TImpl(IInputStream* slave, TMaybe<THttpHeaders>* trailers)
- : Slave_(slave)
- , Trailers_(trailers)
- , Pending_(0)
- , LastChunkReaded_(false)
- {
- if (Trailers_) {
- Trailers_->Clear();
- }
- }
- inline ~TImpl() {
- }
- inline size_t Read(void* buf, size_t len) {
- return Perform(len, [this, buf](size_t toRead) { return Slave_->Read(buf, toRead); });
- }
- inline size_t Skip(size_t len) {
- return Perform(len, [this](size_t toSkip) { return Slave_->Skip(toSkip); });
- }
- private:
- template <class Operation>
- inline size_t Perform(size_t len, const Operation& operation) {
- if (!HavePendingData()) {
- return 0;
- }
- const size_t toProcess = Min(Pending_, len);
- if (toProcess) {
- const size_t processed = operation(toProcess);
- if (!processed) {
- ythrow yexception() << "malformed http chunk";
- }
- Pending_ -= processed;
- return processed;
- }
- return 0;
- }
- inline bool HavePendingData() {
- if (LastChunkReaded_) {
- return false;
- }
- if (!Pending_) {
- if (!ProceedToNextChunk()) {
- return false;
- }
- }
- return true;
- }
- inline bool ProceedToNextChunk() {
- TString len(Slave_->ReadLine());
- if (len.empty()) {
- /*
- * skip crlf from previous chunk
- */
- len = Slave_->ReadLine();
- }
- Pending_ = ParseHex(len);
- if (Pending_) {
- return true;
- }
- if (Trailers_) {
- Trailers_->ConstructInPlace(Slave_);
- }
- LastChunkReaded_ = true;
- return false;
- }
- private:
- IInputStream* Slave_;
- TMaybe<THttpHeaders>* Trailers_;
- size_t Pending_;
- bool LastChunkReaded_;
- };
- TChunkedInput::TChunkedInput(IInputStream* slave, TMaybe<THttpHeaders>* trailers)
- : Impl_(new TImpl(slave, trailers))
- {
- }
- TChunkedInput::~TChunkedInput() {
- }
- size_t TChunkedInput::DoRead(void* buf, size_t len) {
- return Impl_->Read(buf, len);
- }
- size_t TChunkedInput::DoSkip(size_t len) {
- return Impl_->Skip(len);
- }
- class TChunkedOutput::TImpl {
- typedef IOutputStream::TPart TPart;
- public:
- inline TImpl(IOutputStream* slave)
- : Slave_(slave)
- {
- }
- inline ~TImpl() {
- }
- inline void Write(const void* buf, size_t len) {
- const char* ptr = (const char*)buf;
- while (len) {
- const size_t portion = Min<size_t>(len, 1024 * 16);
- WriteImpl(ptr, portion);
- ptr += portion;
- len -= portion;
- }
- }
- inline void WriteImpl(const void* buf, size_t len) {
- char tmp[32];
- char* e = tmp + sizeof(tmp);
- char* b = ToHex(len, e);
- const TPart parts[] = {
- TPart(b, e - b),
- TPart::CrLf(),
- TPart(buf, len),
- TPart::CrLf(),
- };
- Slave_->Write(parts, sizeof(parts) / sizeof(*parts));
- }
- inline void Flush() {
- Slave_->Flush();
- }
- inline void Finish() {
- Slave_->Write("0\r\n\r\n", 5);
- Flush();
- }
- private:
- IOutputStream* Slave_;
- };
- TChunkedOutput::TChunkedOutput(IOutputStream* slave)
- : Impl_(new TImpl(slave))
- {
- }
- TChunkedOutput::~TChunkedOutput() {
- try {
- Finish();
- } catch (...) {
- }
- }
- void TChunkedOutput::DoWrite(const void* buf, size_t len) {
- if (Impl_.Get()) {
- Impl_->Write(buf, len);
- } else {
- ythrow yexception() << "can not write to finished stream";
- }
- }
- void TChunkedOutput::DoFlush() {
- if (Impl_.Get()) {
- Impl_->Flush();
- }
- }
- void TChunkedOutput::DoFinish() {
- if (Impl_.Get()) {
- Impl_->Finish();
- Impl_.Destroy();
- }
- }
|