#include "yarchive.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include template static inline void ESSave(IOutputStream* out, const T& t_in) { T t = HostToLittle(t_in); out->Write((const void*)&t, sizeof(t)); } static inline void ESSave(IOutputStream* out, const TString& s) { ESSave(out, (ui32) s.size()); out->Write(s.data(), s.size()); } template static inline T ESLoad(IInputStream* in) { T t = T(); if (in->Load(&t, sizeof(t)) != sizeof(t)) { ythrow TSerializeException() << "malformed archive"; } return LittleToHost(t); } template <> inline TString ESLoad(IInputStream* in) { size_t len = ESLoad(in); TString ret; TTempBuf tmp; while (len) { const size_t toread = Min(len, tmp.Size()); const size_t readed = in->Read(tmp.Data(), toread); if (!readed) { ythrow TSerializeException() << "malformed archive"; } ret.append(tmp.Data(), readed); len -= readed; } return ret; } namespace { class TArchiveRecordDescriptor: public TSimpleRefCount { public: inline TArchiveRecordDescriptor(ui64 off, ui64 len, const TString& name) : Off_(off) , Len_(len) , Name_(name) { } inline TArchiveRecordDescriptor(IInputStream* in) : Off_(ESLoad(in)) , Len_(ESLoad(in)) , Name_(ESLoad(in)) { } inline ~TArchiveRecordDescriptor() = default; inline void SaveTo(IOutputStream* out) const { ESSave(out, Off_); ESSave(out, Len_); ESSave(out, Name_); } inline const TString& Name() const noexcept { return Name_; } inline ui64 Length() const noexcept { return Len_; } inline ui64 Offset() const noexcept { return Off_; } private: ui64 Off_; ui64 Len_; TString Name_; }; typedef TIntrusivePtr TArchiveRecordDescriptorRef; } class TArchiveWriter::TImpl { using TDict = THashMap; public: inline TImpl(IOutputStream& out, bool compress) : Off_(0) , Out_(&out) , UseCompression(compress) { } inline ~TImpl() = default; inline void Flush() { Out_->Flush(); } inline void Finish() { TCountingOutput out(Out_); { TZLibCompress compress(&out); ESSave(&compress, (ui32)Dict_.size()); for (const auto& kv : Dict_) { kv.second->SaveTo(&compress); } ESSave(&compress, static_cast(UseCompression)); compress.Finish(); } ESSave(Out_, out.Counter()); Out_->Flush(); } inline void Add(const TString& key, IInputStream* src) { Y_ENSURE(!Dict_.contains(key), "key " << key.data() << " already stored"); TCountingOutput out(Out_); if (UseCompression) { TZLibCompress compress(&out); TransferData(src, &compress); compress.Finish(); } else { size_t skip_size = ArchiveWriterDefaultDataAlignment - Off_ % ArchiveWriterDefaultDataAlignment; if (skip_size == ArchiveWriterDefaultDataAlignment) { skip_size = 0; } while(skip_size > 0) { Out_->Write(char(0)); Off_ += 1; skip_size -= 1; } TransferData(src, &out); out.Finish(); } TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(Off_, out.Counter(), key)); Dict_[key] = descr; Off_ += out.Counter(); } inline void AddSynonym(const TString& existingKey, const TString& newKey) { Y_ENSURE(Dict_.contains(existingKey), "key " << existingKey.data() << " not stored yet"); Y_ENSURE(!Dict_.contains(newKey), "key " << newKey.data() << " already stored"); TArchiveRecordDescriptorRef existingDescr = Dict_[existingKey]; TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(existingDescr->Offset(), existingDescr->Length(), newKey)); Dict_[newKey] = descr; } private: ui64 Off_; IOutputStream* Out_; TDict Dict_; const bool UseCompression; }; TArchiveWriter::TArchiveWriter(IOutputStream* out, bool compress) : Impl_(new TImpl(*out, compress)) { } 
TArchiveWriter::~TArchiveWriter() {
    try {
        Finish();
    } catch (...) {
    }
}

void TArchiveWriter::Flush() {
    if (Impl_.Get()) {
        Impl_->Flush();
    }
}

void TArchiveWriter::Finish() {
    if (Impl_.Get()) {
        Impl_->Finish();
        Impl_.Destroy();
    }
}

void TArchiveWriter::Add(const TString& key, IInputStream* src) {
    Y_ENSURE(Impl_.Get(), "archive already closed");

    Impl_->Add(key, src);
}

void TArchiveWriter::AddSynonym(const TString& existingKey, const TString& newKey) {
    Y_ENSURE(Impl_.Get(), "archive already closed");

    Impl_->AddSynonym(existingKey, newKey);
}

namespace {
    // Keeps the sub-blob alive for as long as the stream that reads from it.
    class TArchiveInputStreamBase {
    public:
        inline TArchiveInputStreamBase(const TBlob& b)
            : Blob_(b)
            , Input_(b.Data(), b.Size())
        {
        }

    protected:
        TBlob Blob_;
        TMemoryInput Input_;
    };

    class TArchiveInputStream: public TArchiveInputStreamBase, public TZLibDecompress {
    public:
        inline TArchiveInputStream(const TBlob& b)
            : TArchiveInputStreamBase(b)
            , TZLibDecompress(&Input_)
        {
        }

        ~TArchiveInputStream() override = default;
    };
}

class TArchiveReader::TImpl {
    typedef THashMap<TString, TArchiveRecordDescriptorRef> TDict;

public:
    inline TImpl(const TBlob& blob)
        : Blob_(blob)
        , UseDecompression(true)
    {
        ReadDict();
    }

    inline ~TImpl() = default;

    inline void ReadDict() {
        Y_ENSURE(Blob_.Size() >= sizeof(ui64), "too small blob");

        // The last ui64 of the blob holds the length of the zlib-compressed
        // dictionary that immediately precedes it.
        const char* end = (const char*)Blob_.End();
        const char* ptr = end - sizeof(ui64);
        ui64 dictlen = 0;
        memcpy(&dictlen, ptr, sizeof(ui64));
        dictlen = LittleToHost(dictlen);

        Y_ENSURE(dictlen <= Blob_.Size() - sizeof(ui64), "bad blob");

        const char* beg = ptr - dictlen;
        TMemoryInput mi(beg, dictlen);
        TZLibDecompress d(&mi);
        const ui32 count = ESLoad<ui32>(&d);

        for (size_t i = 0; i < count; ++i) {
            TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(&d));

            Recs_.push_back(descr);
            Dict_[descr->Name()] = descr;
        }

        Sort(Recs_.begin(), Recs_.end(), [](const auto& lhs, const auto& rhs) -> bool {
            return lhs->Offset() < rhs->Offset();
        });

        try {
            UseDecompression = static_cast<bool>(ESLoad<ui8>(&d));
        } catch (const TSerializeException&) {
            // that's ok - just old format
            UseDecompression = true;
        }
    }

    inline size_t Count() const noexcept {
        return Recs_.size();
    }

    inline TString KeyByIndex(size_t n) const {
        if (n < Count()) {
            return Recs_[n]->Name();
        }

        ythrow yexception() << "incorrect index";
    }

    inline bool Has(const TStringBuf key) const {
        return Dict_.contains(key);
    }

    inline TAutoPtr<IInputStream> ObjectByKey(const TStringBuf key) const {
        TBlob subBlob = BlobByKey(key);

        if (UseDecompression) {
            return new TArchiveInputStream(subBlob);
        } else {
            return new TMemoryInput(subBlob.Data(), subBlob.Length());
        }
    }

    inline TBlob ObjectBlobByKey(const TStringBuf key) const {
        TBlob subBlob = BlobByKey(key);

        if (UseDecompression) {
            TArchiveInputStream st(subBlob);
            return TBlob::FromStream(st);
        } else {
            return subBlob;
        }
    }

    inline TBlob BlobByKey(const TStringBuf key) const {
        const auto it = Dict_.find(key);

        Y_ENSURE(it != Dict_.end(), "key " << key.data() << " not found");

        const size_t off = it->second->Offset();
        const size_t len = it->second->Length();

        /*
         * TODO - overflow check
         */

        return Blob_.SubBlob(off, off + len);
    }

    inline bool Compressed() const {
        return UseDecompression;
    }

private:
    TBlob Blob_;
    TVector<TArchiveRecordDescriptorRef> Recs_;
    TDict Dict_;
    bool UseDecompression;
};

TArchiveReader::TArchiveReader(const TBlob& data)
    : Impl_(new TImpl(data))
{
}

TArchiveReader::~TArchiveReader() {
}

size_t TArchiveReader::Count() const noexcept {
    return Impl_->Count();
}

TString TArchiveReader::KeyByIndex(size_t n) const {
    return Impl_->KeyByIndex(n);
}

bool TArchiveReader::Has(const TStringBuf key) const {
    return Impl_->Has(key);
}
TAutoPtr<IInputStream> TArchiveReader::ObjectByKey(const TStringBuf key) const {
    return Impl_->ObjectByKey(key);
}

TBlob TArchiveReader::ObjectBlobByKey(const TStringBuf key) const {
    return Impl_->ObjectBlobByKey(key);
}

TBlob TArchiveReader::BlobByKey(const TStringBuf key) const {
    return Impl_->BlobByKey(key);
}

bool TArchiveReader::Compressed() const {
    return Impl_->Compressed();
}
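
/*
 * Usage sketch (illustrative only; the buffer types and key names below are
 * arbitrary examples, not part of this file): write two records into an
 * in-memory archive with TArchiveWriter, then look them up via TArchiveReader.
 *
 *     TBuffer data;
 *     TBufferOutput out(data);
 *
 *     TArchiveWriter writer(&out, true);
 *     TMemoryInput first("hello", 5);
 *     TMemoryInput second("world", 5);
 *     writer.Add("/first", &first);
 *     writer.Add("/second", &second);
 *     writer.Finish();
 *
 *     TArchiveReader reader(TBlob::FromBuffer(data));
 *     Y_ENSURE(reader.Has("/first"));
 *     TString value = reader.ObjectByKey("/first")->ReadAll();
 */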