123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- #pragma once
- #include <util/system/yassert.h>
- #include <util/system/byteorder.h>
- #include <util/memory/addstorage.h>
- #include <util/generic/buffer.h>
- #include <util/generic/utility.h>
- #include <util/generic/singleton.h>
- #include <util/stream/mem.h>
- #include "error.h"
- static inline ui8 HostToLittle(ui8 t) noexcept {
- return t;
- }
- static inline ui8 LittleToHost(ui8 t) noexcept {
- return t;
- }
- struct TCommonData {
- static const size_t overhead = sizeof(ui16) + sizeof(ui8);
- };
- const size_t SIGNATURE_SIZE = 4;
- template <class TCompressor, class TBase>
- class TCompressorBase: public TAdditionalStorage<TCompressorBase<TCompressor, TBase>>, public TCompressor, public TCommonData {
- public:
- inline TCompressorBase(IOutputStream* slave, ui16 blockSize)
- : Slave_(slave)
- , BlockSize_(blockSize)
- {
- /*
- * save signature
- */
- static_assert(sizeof(TCompressor::signature) - 1 == SIGNATURE_SIZE, "expect sizeof(TCompressor::signature) - 1 == SIGNATURE_SIZE");
- Slave_->Write(TCompressor::signature, sizeof(TCompressor::signature) - 1);
- /*
- * save version
- */
- this->Save((ui32)1);
- /*
- * save block size
- */
- this->Save(BlockSize());
- }
- inline ~TCompressorBase() {
- }
- inline void Write(const char* buf, size_t len) {
- while (len) {
- const ui16 toWrite = (ui16)Min<size_t>(len, this->BlockSize());
- this->WriteBlock(buf, toWrite);
- buf += toWrite;
- len -= toWrite;
- }
- }
- inline void Flush() {
- }
- inline void Finish() {
- this->Flush();
- this->WriteBlock(nullptr, 0);
- }
- template <class T>
- static inline void Save(T t, IOutputStream* out) {
- t = HostToLittle(t);
- out->Write(&t, sizeof(t));
- }
- template <class T>
- inline void Save(T t) {
- Save(t, Slave_);
- }
- private:
- inline void* Block() const noexcept {
- return this->AdditionalData();
- }
- inline ui16 BlockSize() const noexcept {
- return BlockSize_;
- }
- inline void WriteBlock(const void* ptr, ui16 len) {
- Y_ASSERT(len <= this->BlockSize());
- ui8 compressed = false;
- if (len) {
- const size_t out = this->Compress((const char*)ptr, len, (char*)Block(), this->AdditionalDataLength());
- // catch compressor buffer overrun (e.g. SEARCH-2043)
- //Y_ABORT_UNLESS(out <= this->Hint(this->BlockSize()));
- if (out < len || TCompressor::SaveIncompressibleChunks()) {
- compressed = true;
- ptr = Block();
- len = (ui16)out;
- }
- }
- char tmp[overhead];
- TMemoryOutput header(tmp, sizeof(tmp));
- this->Save(len, &header);
- this->Save(compressed, &header);
- using TPart = IOutputStream::TPart;
- if (ptr) {
- const TPart parts[] = {
- TPart(tmp, sizeof(tmp)),
- TPart(ptr, len),
- };
- Slave_->Write(parts, sizeof(parts) / sizeof(*parts));
- } else {
- Slave_->Write(tmp, sizeof(tmp));
- }
- }
- private:
- IOutputStream* Slave_;
- const ui16 BlockSize_;
- };
- template <class T>
- static inline T GLoad(IInputStream* input) {
- T t;
- if (input->Load(&t, sizeof(t)) != sizeof(t)) {
- ythrow TDecompressorError() << "stream error";
- }
- return LittleToHost(t);
- }
- class TDecompressSignature {
- public:
- inline TDecompressSignature(IInputStream* input) {
- if (input->Load(Buffer_, SIGNATURE_SIZE) != SIGNATURE_SIZE) {
- ythrow TDecompressorError() << "can not load stream signature";
- }
- }
- template <class TDecompressor>
- inline bool Check() const {
- static_assert(sizeof(TDecompressor::signature) - 1 == SIGNATURE_SIZE, "expect sizeof(TDecompressor::signature) - 1 == SIGNATURE_SIZE");
- return memcmp(TDecompressor::signature, Buffer_, SIGNATURE_SIZE) == 0;
- }
- private:
- char Buffer_[SIGNATURE_SIZE];
- };
- template <class TDecompressor>
- static inline IInputStream* ConsumeSignature(IInputStream* input) {
- TDecompressSignature sign(input);
- if (!sign.Check<TDecompressor>()) {
- ythrow TDecompressorError() << "incorrect signature";
- }
- return input;
- }
- template <class TDecompressor>
- class TDecompressorBaseImpl: public TDecompressor, public TCommonData {
- public:
- static inline ui32 CheckVer(ui32 v) {
- if (v != 1) {
- ythrow yexception() << TStringBuf("incorrect stream version: ") << v;
- }
- return v;
- }
- inline TDecompressorBaseImpl(IInputStream* slave)
- : Slave_(slave)
- , Input_(nullptr, 0)
- , Eof_(false)
- , Version_(CheckVer(Load<ui32>()))
- , BlockSize_(Load<ui16>())
- , OutBufSize_(TDecompressor::Hint(BlockSize_))
- , Tmp_(2 * OutBufSize_)
- , In_(Tmp_.Data())
- , Out_(In_ + OutBufSize_)
- {
- this->InitFromStream(Slave_);
- }
- inline ~TDecompressorBaseImpl() {
- }
- inline size_t Read(void* buf, size_t len) {
- size_t ret = Input_.Read(buf, len);
- if (ret) {
- return ret;
- }
- if (Eof_) {
- return 0;
- }
- this->FillNextBlock();
- ret = Input_.Read(buf, len);
- if (ret) {
- return ret;
- }
- Eof_ = true;
- return 0;
- }
- inline void FillNextBlock() {
- char tmp[overhead];
- if (Slave_->Load(tmp, sizeof(tmp)) != sizeof(tmp)) {
- ythrow TDecompressorError() << "can not read block header";
- }
- TMemoryInput header(tmp, sizeof(tmp));
- const ui16 len = GLoad<ui16>(&header);
- if (len > Tmp_.Capacity()) {
- ythrow TDecompressorError() << "invalid len inside block header";
- }
- const ui8 compressed = GLoad<ui8>(&header);
- if (compressed > 1) {
- ythrow TDecompressorError() << "broken header";
- }
- if (Slave_->Load(In_, len) != len) {
- ythrow TDecompressorError() << "can not read data";
- }
- if (compressed) {
- const size_t ret = this->Decompress(In_, len, Out_, OutBufSize_);
- Input_.Reset(Out_, ret);
- } else {
- Input_.Reset(In_, len);
- }
- }
- template <class T>
- inline T Load() {
- return GLoad<T>(Slave_);
- }
- protected:
- IInputStream* Slave_;
- TMemoryInput Input_;
- bool Eof_;
- const ui32 Version_;
- const ui16 BlockSize_;
- const size_t OutBufSize_;
- TBuffer Tmp_;
- char* In_;
- char* Out_;
- };
- template <class TDecompressor, class TBase>
- class TDecompressorBase: public TDecompressorBaseImpl<TDecompressor> {
- public:
- inline TDecompressorBase(IInputStream* slave)
- : TDecompressorBaseImpl<TDecompressor>(ConsumeSignature<TDecompressor>(slave))
- {
- }
- inline ~TDecompressorBase() {
- }
- };
- #define DEF_COMPRESSOR_COMMON(rname, name) \
- rname::~rname() { \
- try { \
- Finish(); \
- } catch (...) { \
- } \
- } \
- \
- void rname::DoWrite(const void* buf, size_t len) { \
- if (!Impl_) { \
- ythrow yexception() << "can not write to finalized stream"; \
- } \
- \
- Impl_->Write((const char*)buf, len); \
- } \
- \
- void rname::DoFlush() { \
- if (!Impl_) { \
- ythrow yexception() << "can not flush finalized stream"; \
- } \
- \
- Impl_->Flush(); \
- } \
- \
- void rname::DoFinish() { \
- THolder<TImpl> impl(Impl_.Release()); \
- \
- if (impl) { \
- impl->Finish(); \
- } \
- }
- #define DEF_COMPRESSOR(rname, name) \
- class rname::TImpl: public TCompressorBase<name, TImpl> { \
- public: \
- inline TImpl(IOutputStream* out, ui16 blockSize) \
- : TCompressorBase<name, TImpl>(out, blockSize) { \
- } \
- }; \
- \
- rname::rname(IOutputStream* slave, ui16 blockSize) \
- : Impl_(new (TImpl::Hint(blockSize)) TImpl(slave, blockSize)) { \
- } \
- \
- DEF_COMPRESSOR_COMMON(rname, name)
- #define DEF_DECOMPRESSOR(rname, name) \
- class rname::TImpl: public TDecompressorBase<name, TImpl> { \
- public: \
- inline TImpl(IInputStream* in) \
- : TDecompressorBase<name, TImpl>(in) { \
- } \
- }; \
- \
- rname::rname(IInputStream* slave) \
- : Impl_(new TImpl(slave)) { \
- } \
- \
- rname::~rname() { \
- } \
- \
- size_t rname::DoRead(void* buf, size_t len) { \
- return Impl_->Read(buf, len); \
- }
- template <class T>
- struct TInputHolder {
- static inline T Set(T t) noexcept {
- return t;
- }
- };
- template <class T>
- struct TInputHolder<TAutoPtr<T>> {
- inline T* Set(TAutoPtr<T> v) noexcept {
- V_ = v;
- return V_.Get();
- }
- TAutoPtr<T> V_;
- };
- // Decompressing input streams without signature verification
- template <class TInput, class TDecompressor>
- class TLzDecompressInput: public TInputHolder<TInput>, public IInputStream {
- public:
- inline TLzDecompressInput(TInput in)
- : Impl_(this->Set(in))
- {
- }
- private:
- size_t DoRead(void* buf, size_t len) override {
- return Impl_.Read(buf, len);
- }
- private:
- TDecompressorBaseImpl<TDecompressor> Impl_;
- };
|