123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398 |
- #include "yarchive.h"
- #include <util/generic/algorithm.h>
- #include <util/generic/hash.h>
- #include <util/generic/utility.h>
- #include <util/generic/vector.h>
- #include <util/generic/yexception.h>
- #include <util/memory/blob.h>
- #include <util/memory/tempbuf.h>
- #include <util/stream/input.h>
- #include <util/stream/length.h>
- #include <util/stream/mem.h>
- #include <util/stream/output.h>
- #include <util/stream/zlib.h>
- #include <util/system/byteorder.h>
- #include <util/ysaveload.h>
- template <class T>
- static inline void ESSave(IOutputStream* out, const T& t_in) {
- T t = HostToLittle(t_in);
- out->Write((const void*)&t, sizeof(t));
- }
- static inline void ESSave(IOutputStream* out, const TString& s) {
- ESSave(out, (ui32) s.size());
- out->Write(s.data(), s.size());
- }
- template <class T>
- static inline T ESLoad(IInputStream* in) {
- T t = T();
- if (in->Load(&t, sizeof(t)) != sizeof(t)) {
- ythrow TSerializeException() << "malformed archive";
- }
- return LittleToHost(t);
- }
- template <>
- inline TString ESLoad<TString>(IInputStream* in) {
- size_t len = ESLoad<ui32>(in);
- TString ret;
- TTempBuf tmp;
- while (len) {
- const size_t toread = Min(len, tmp.Size());
- const size_t readed = in->Read(tmp.Data(), toread);
- if (!readed) {
- ythrow TSerializeException() << "malformed archive";
- }
- ret.append(tmp.Data(), readed);
- len -= readed;
- }
- return ret;
- }
- namespace {
- class TArchiveRecordDescriptor: public TSimpleRefCount<TArchiveRecordDescriptor> {
- public:
- inline TArchiveRecordDescriptor(ui64 off, ui64 len, const TString& name)
- : Off_(off)
- , Len_(len)
- , Name_(name)
- {
- }
- inline TArchiveRecordDescriptor(IInputStream* in)
- : Off_(ESLoad<ui64>(in))
- , Len_(ESLoad<ui64>(in))
- , Name_(ESLoad<TString>(in))
- {
- }
- inline ~TArchiveRecordDescriptor() = default;
- inline void SaveTo(IOutputStream* out) const {
- ESSave(out, Off_);
- ESSave(out, Len_);
- ESSave(out, Name_);
- }
- inline const TString& Name() const noexcept {
- return Name_;
- }
- inline ui64 Length() const noexcept {
- return Len_;
- }
- inline ui64 Offset() const noexcept {
- return Off_;
- }
- private:
- ui64 Off_;
- ui64 Len_;
- TString Name_;
- };
- typedef TIntrusivePtr<TArchiveRecordDescriptor> TArchiveRecordDescriptorRef;
- }
- class TArchiveWriter::TImpl {
- using TDict = THashMap<TString, TArchiveRecordDescriptorRef>;
- public:
- inline TImpl(IOutputStream& out, bool compress)
- : Off_(0)
- , Out_(&out)
- , UseCompression(compress)
- {
- }
- inline ~TImpl() = default;
- inline void Flush() {
- Out_->Flush();
- }
- inline void Finish() {
- TCountingOutput out(Out_);
- {
- TZLibCompress compress(&out);
- ESSave(&compress, (ui32)Dict_.size());
- for (const auto& kv : Dict_) {
- kv.second->SaveTo(&compress);
- }
- ESSave(&compress, static_cast<ui8>(UseCompression));
- compress.Finish();
- }
- ESSave(Out_, out.Counter());
- Out_->Flush();
- }
- inline void Add(const TString& key, IInputStream* src) {
- Y_ENSURE(!Dict_.contains(key), "key " << key.data() << " already stored");
- TCountingOutput out(Out_);
- if (UseCompression) {
- TZLibCompress compress(&out);
- TransferData(src, &compress);
- compress.Finish();
- } else {
- size_t skip_size = ArchiveWriterDefaultDataAlignment - Off_ % ArchiveWriterDefaultDataAlignment;
- if (skip_size == ArchiveWriterDefaultDataAlignment) {
- skip_size = 0;
- }
- while(skip_size > 0) {
- Out_->Write(char(0));
- Off_ += 1;
- skip_size -= 1;
- }
- TransferData(src, &out);
- out.Finish();
- }
- TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(Off_, out.Counter(), key));
- Dict_[key] = descr;
- Off_ += out.Counter();
- }
- inline void AddSynonym(const TString& existingKey, const TString& newKey) {
- Y_ENSURE(Dict_.contains(existingKey), "key " << existingKey.data() << " not stored yet");
- Y_ENSURE(!Dict_.contains(newKey), "key " << newKey.data() << " already stored");
- TArchiveRecordDescriptorRef existingDescr = Dict_[existingKey];
- TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(existingDescr->Offset(), existingDescr->Length(), newKey));
- Dict_[newKey] = descr;
- }
- private:
- ui64 Off_;
- IOutputStream* Out_;
- TDict Dict_;
- const bool UseCompression;
- };
- TArchiveWriter::TArchiveWriter(IOutputStream* out, bool compress)
- : Impl_(new TImpl(*out, compress))
- {
- }
- TArchiveWriter::~TArchiveWriter() {
- try {
- Finish();
- } catch (...) {
- }
- }
- void TArchiveWriter::Flush() {
- if (Impl_.Get()) {
- Impl_->Flush();
- }
- }
- void TArchiveWriter::Finish() {
- if (Impl_.Get()) {
- Impl_->Finish();
- Impl_.Destroy();
- }
- }
- void TArchiveWriter::Add(const TString& key, IInputStream* src) {
- Y_ENSURE(Impl_.Get(), "archive already closed");
- Impl_->Add(key, src);
- }
- void TArchiveWriter::AddSynonym(const TString& existingKey, const TString& newKey) {
- Y_ENSURE(Impl_.Get(), "archive already closed");
- Impl_->AddSynonym(existingKey, newKey);
- }
- namespace {
- class TArchiveInputStreamBase {
- public:
- inline TArchiveInputStreamBase(const TBlob& b)
- : Blob_(b)
- , Input_(b.Data(), b.Size())
- {
- }
- protected:
- TBlob Blob_;
- TMemoryInput Input_;
- };
- class TArchiveInputStream: public TArchiveInputStreamBase, public TZLibDecompress {
- public:
- inline TArchiveInputStream(const TBlob& b)
- : TArchiveInputStreamBase(b)
- , TZLibDecompress(&Input_)
- {
- }
- ~TArchiveInputStream() override = default;
- };
- }
- class TArchiveReader::TImpl {
- typedef THashMap<TString, TArchiveRecordDescriptorRef> TDict;
- public:
- inline TImpl(const TBlob& blob)
- : Blob_(blob)
- , UseDecompression(true)
- {
- ReadDict();
- }
- inline ~TImpl() = default;
- inline void ReadDict() {
- Y_ENSURE(Blob_.Size() >= sizeof(ui64), "too small blob");
- const char* end = (const char*)Blob_.End();
- const char* ptr = end - sizeof(ui64);
- ui64 dictlen = 0;
- memcpy(&dictlen, ptr, sizeof(ui64));
- dictlen = LittleToHost(dictlen);
- Y_ENSURE(dictlen <= Blob_.Size() - sizeof(ui64), "bad blob");
- const char* beg = ptr - dictlen;
- TMemoryInput mi(beg, dictlen);
- TZLibDecompress d(&mi);
- const ui32 count = ESLoad<ui32>(&d);
- for (size_t i = 0; i < count; ++i) {
- TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(&d));
- Recs_.push_back(descr);
- Dict_[descr->Name()] = descr;
- }
- Sort(Recs_.begin(), Recs_.end(), [](const auto& lhs, const auto& rhs) -> bool {
- return lhs->Offset() < rhs->Offset();
- });
- try {
- UseDecompression = static_cast<bool>(ESLoad<ui8>(&d));
- } catch (const TSerializeException&) {
- // that's ok - just old format
- UseDecompression = true;
- }
- }
- inline size_t Count() const noexcept {
- return Recs_.size();
- }
- inline TString KeyByIndex(size_t n) const {
- if (n < Count()) {
- return Recs_[n]->Name();
- }
- ythrow yexception() << "incorrect index";
- }
- inline bool Has(const TStringBuf key) const {
- return Dict_.contains(key);
- }
- inline TAutoPtr<IInputStream> ObjectByKey(const TStringBuf key) const {
- TBlob subBlob = BlobByKey(key);
- if (UseDecompression) {
- return new TArchiveInputStream(subBlob);
- } else {
- return new TMemoryInput(subBlob.Data(), subBlob.Length());
- }
- }
- inline TBlob ObjectBlobByKey(const TStringBuf key) const {
- TBlob subBlob = BlobByKey(key);
- if (UseDecompression) {
- TArchiveInputStream st(subBlob);
- return TBlob::FromStream(st);
- } else {
- return subBlob;
- }
- }
- inline TBlob BlobByKey(const TStringBuf key) const {
- const auto it = Dict_.find(key);
- Y_ENSURE(it != Dict_.end(), "key " << key.data() << " not found");
- const size_t off = it->second->Offset();
- const size_t len = it->second->Length();
- /*
- * TODO - overflow check
- */
- return Blob_.SubBlob(off, off + len);
- }
- inline bool Compressed() const {
- return UseDecompression;
- }
- private:
- TBlob Blob_;
- TVector<TArchiveRecordDescriptorRef> Recs_;
- TDict Dict_;
- bool UseDecompression;
- };
- TArchiveReader::TArchiveReader(const TBlob& data)
- : Impl_(new TImpl(data))
- {
- }
- TArchiveReader::~TArchiveReader() {}
- size_t TArchiveReader::Count() const noexcept {
- return Impl_->Count();
- }
- TString TArchiveReader::KeyByIndex(size_t n) const {
- return Impl_->KeyByIndex(n);
- }
- bool TArchiveReader::Has(const TStringBuf key) const {
- return Impl_->Has(key);
- }
- TAutoPtr<IInputStream> TArchiveReader::ObjectByKey(const TStringBuf key) const {
- return Impl_->ObjectByKey(key);
- }
- TBlob TArchiveReader::ObjectBlobByKey(const TStringBuf key) const {
- return Impl_->ObjectBlobByKey(key);
- }
- TBlob TArchiveReader::BlobByKey(const TStringBuf key) const {
- return Impl_->BlobByKey(key);
- }
- bool TArchiveReader::Compressed() const {
- return Impl_->Compressed();
- }
|