#include "chunk.h"

#include "headers.h"

// NOTE(review): the targets of the three angle-bracket includes below were
// missing from the reviewed copy of this file. They are restored on the
// assumption that they provide Min() and ythrow/yexception used below --
// confirm against version control.
#include <util/string/cast.h>
#include <util/generic/utility.h>
#include <util/generic/yexception.h>

#include <cctype>

/**
 * Parses the hexadecimal size field of a chunk header line.
 *
 * Hex digits (either case) are accumulated, whitespace characters are
 * skipped, and parsing stops at the first ';' (the start of chunk
 * extensions), so everything after it is ignored.
 *
 * @param s  chunk header line, without the line terminator.
 * @return   the decoded chunk size.
 * @throws yexception  if s is empty, contains a character that is neither a
 *                     hex digit, whitespace nor ';', or encodes a value that
 *                     does not fit into size_t.
 */
static inline size_t ParseHex(const TString& s) {
    if (s.empty()) {
        ythrow yexception() << "can not parse chunk length(empty string)";
    }

    // Non-zero iff one of the four most significant bits is set, i.e. iff
    // multiplying by 16 would wrap around. The length comes from the remote
    // peer, so a silent wrap must not be allowed.
    const size_t overflowMask = ~(~static_cast<size_t>(0) >> 4);

    size_t ret = 0;

    for (const char ch : s) {
        size_t digit = 0;

        if (ch >= '0' && ch <= '9') {
            digit = ch - '0';
        } else if (ch >= 'a' && ch <= 'f') {
            digit = 10 + ch - 'a';
        } else if (ch >= 'A' && ch <= 'F') {
            digit = 10 + ch - 'A';
        } else if (ch == ';') {
            break;
        } else if (isspace(static_cast<unsigned char>(ch))) {
            // The cast is required: passing a negative char to isspace()
            // is undefined behaviour.
            continue;
        } else {
            ythrow yexception() << "can not parse chunk length(" << s.data() << ")";
        }

        if (ret & overflowMask) {
            ythrow yexception() << "chunk length is too big(" << s.data() << ")";
        }

        // Cannot overflow: the low four bits of ret * 16 are zero and digit < 16.
        ret = ret * 16 + digit;
    }

    return ret;
}

/**
 * Writes len as lowercase hexadecimal digits, backwards, ending just before buf.
 *
 * At least one digit is always produced (len == 0 yields "0"). No terminating
 * zero is written.
 *
 * @param len  value to format.
 * @param buf  one-past-the-end pointer of the destination area; the caller
 *             must guarantee enough writable space in front of it.
 * @return     pointer to the first (most significant) digit written.
 */
static inline char* ToHex(size_t len, char* buf) {
    do {
        const size_t val = len % 16;

        *--buf = (val < 10) ? (val + '0') : (val - 10 + 'a');
        len /= 16;
    } while (len);

    return buf;
}

/**
 * Decoder state for a chunked input stream: reads chunk headers from the
 * underlying stream and hands out chunk payload bytes.
 */
class TChunkedInput::TImpl {
public:
    /**
     * @param slave     underlying stream carrying the chunked data; not owned.
     * @param trailers  optional output slot; cleared here and constructed in
     *                  place from the underlying stream once the terminating
     *                  zero-sized chunk has been read. May be null.
     *
     * NOTE(review): the template argument of TMaybe was missing from the
     * reviewed copy of this file; THttpHeaders is assumed (it must match the
     * declaration in chunk.h) -- confirm.
     */
    inline TImpl(IInputStream* slave, TMaybe<THttpHeaders>* trailers)
        : Slave_(slave)
        , Trailers_(trailers)
        , Pending_(0)
        , LastChunkReaded_(false)
    {
        if (Trailers_) {
            Trailers_->Clear();
        }
    }

    inline ~TImpl() = default;

    /// Reads at most len bytes of chunk payload into buf; returns 0 once the last chunk was seen.
    inline size_t Read(void* buf, size_t len) {
        return Perform(len, [this, buf](size_t toRead) { return Slave_->Read(buf, toRead); });
    }

    /// Skips at most len bytes of chunk payload; returns 0 once the last chunk was seen.
    inline size_t Skip(size_t len) {
        return Perform(len, [this](size_t toSkip) { return Slave_->Skip(toSkip); });
    }

private:
    /**
     * Applies operation to at most len bytes of the current chunk.
     *
     * @param len        upper bound requested by the caller.
     * @param operation  callable taking the byte count to process and
     *                   returning the byte count actually processed.
     * @return           bytes processed; 0 if the stream is finished or len is 0.
     * @throws yexception  if operation processes nothing although the current
     *                     chunk still has pending bytes.
     */
    template <class Operation>
    inline size_t Perform(size_t len, const Operation& operation) {
        if (!HavePendingData()) {
            return 0;
        }

        // Never cross the boundary of the current chunk in a single call.
        const size_t toProcess = Min(Pending_, len);

        if (toProcess) {
            const size_t processed = operation(toProcess);

            if (!processed) {
                // The chunk header promised more bytes than the stream delivered.
                ythrow yexception() << "malformed http chunk";
            }

            Pending_ -= processed;

            return processed;
        }

        return 0;
    }

    /// Returns true if payload bytes are available, reading the next chunk header if needed.
    inline bool HavePendingData() {
        if (LastChunkReaded_) {
            return false;
        }

        if (!Pending_) {
            if (!ProceedToNextChunk()) {
                return false;
            }
        }

        return true;
    }

    /**
     * Reads the next chunk header and stores its size in Pending_.
     *
     * @return true if a non-empty chunk follows; false if the zero-sized
     *         chunk was read, in which case the trailers (if requested) are
     *         constructed and the stream is marked as finished.
     */
    inline bool ProceedToNextChunk() {
        TString len(Slave_->ReadLine());

        if (len.empty()) {
            /*
             * skip crlf from previous chunk
             */
            len = Slave_->ReadLine();
        }

        Pending_ = ParseHex(len);

        if (Pending_) {
            return true;
        }

        if (Trailers_) {
            Trailers_->ConstructInPlace(Slave_);
        }

        LastChunkReaded_ = true;

        return false;
    }

private:
    IInputStream* Slave_;            // underlying stream, not owned
    TMaybe<THttpHeaders>* Trailers_; // optional output slot for trailers, may be null
    size_t Pending_;                 // unread payload bytes left in the current chunk
    bool LastChunkReaded_;           // set once the zero-sized chunk has been consumed
};

TChunkedInput::TChunkedInput(IInputStream* slave, TMaybe<THttpHeaders>* trailers)
    : Impl_(new TImpl(slave, trailers))
{
}

// Defined here, where TImpl is a complete type.
TChunkedInput::~TChunkedInput() {
}

size_t TChunkedInput::DoRead(void* buf, size_t len) {
    return Impl_->Read(buf, len);
}

size_t TChunkedInput::DoSkip(size_t len) {
    return Impl_->Skip(len);
}

/**
 * Encoder state for a chunked output stream: frames written data as
 * "<hex size>CRLF<payload>CRLF" chunks on the underlying stream.
 */
class TChunkedOutput::TImpl {
    using TPart = IOutputStream::TPart;

public:
    /// @param slave  underlying stream receiving the encoded chunks; not owned.
    inline TImpl(IOutputStream* slave)
        : Slave_(slave)
    {
    }

    inline ~TImpl() = default;

    /**
     * Writes len bytes from buf, split into chunks of at most 16 KiB each.
     * Nothing is written for len == 0, so no zero-sized (terminating) chunk
     * is emitted by accident.
     */
    inline void Write(const void* buf, size_t len) {
        const size_t maxPortion = 1024 * 16;
        const char* ptr = static_cast<const char*>(buf);

        while (len) {
            const size_t portion = Min(len, maxPortion);

            WriteImpl(ptr, portion);

            ptr += portion;
            len -= portion;
        }
    }

    /// Emits a single chunk: hex size, CRLF, payload, CRLF, passed to the slave in one Write call.
    inline void WriteImpl(const void* buf, size_t len) {
        char tmp[32];
        char* e = tmp + sizeof(tmp);
        char* b = ToHex(len, e); // digits occupy [b, e)

        const TPart parts[] = {
            TPart(b, e - b),
            TPart::CrLf(),
            TPart(buf, len),
            TPart::CrLf(),
        };

        Slave_->Write(parts, sizeof(parts) / sizeof(*parts));
    }

    inline void Flush() {
        Slave_->Flush();
    }

    /// Writes the terminating zero-sized chunk with an empty trailer section and flushes.
    inline void Finish() {
        Slave_->Write("0\r\n\r\n", 5);

        Flush();
    }

private:
    IOutputStream* Slave_; // underlying stream, not owned
};

TChunkedOutput::TChunkedOutput(IOutputStream* slave)
    : Impl_(new TImpl(slave))
{
}

TChunkedOutput::~TChunkedOutput() {
    // Best-effort termination of the stream; a destructor must not throw,
    // so any failure here is deliberately swallowed.
    try {
        Finish();
    } catch (...) {
    }
}

void TChunkedOutput::DoWrite(const void* buf, size_t len) {
    if (Impl_.Get()) {
        Impl_->Write(buf, len);
    } else {
        ythrow yexception() << "can not write to finished stream";
    }
}

void TChunkedOutput::DoFlush() {
    if (Impl_.Get()) {
        Impl_->Flush();
    }
}

void TChunkedOutput::DoFinish() {
    // Impl_ is destroyed after a successful finish, which makes repeated
    // calls no-ops and later writes an error.
    if (Impl_.Get()) {
        Impl_->Finish();
        Impl_.Destroy();
    }
}