// yarchive.cpp
  1. #include "yarchive.h"
  2. #include <util/generic/algorithm.h>
  3. #include <util/generic/hash.h>
  4. #include <util/generic/utility.h>
  5. #include <util/generic/vector.h>
  6. #include <util/generic/yexception.h>
  7. #include <util/memory/blob.h>
  8. #include <util/memory/tempbuf.h>
  9. #include <util/stream/input.h>
  10. #include <util/stream/length.h>
  11. #include <util/stream/mem.h>
  12. #include <util/stream/output.h>
  13. #include <util/stream/zlib.h>
  14. #include <util/system/byteorder.h>
  15. #include <util/ysaveload.h>
  16. template <class T>
  17. static inline void ESSave(IOutputStream* out, const T& t_in) {
  18. T t = HostToLittle(t_in);
  19. out->Write((const void*)&t, sizeof(t));
  20. }
  21. static inline void ESSave(IOutputStream* out, const TString& s) {
  22. ESSave(out, (ui32) s.size());
  23. out->Write(s.data(), s.size());
  24. }
  25. template <class T>
  26. static inline T ESLoad(IInputStream* in) {
  27. T t = T();
  28. if (in->Load(&t, sizeof(t)) != sizeof(t)) {
  29. ythrow TSerializeException() << "malformed archive";
  30. }
  31. return LittleToHost(t);
  32. }
  33. template <>
  34. inline TString ESLoad<TString>(IInputStream* in) {
  35. size_t len = ESLoad<ui32>(in);
  36. TString ret;
  37. TTempBuf tmp;
  38. while (len) {
  39. const size_t toread = Min(len, tmp.Size());
  40. const size_t readed = in->Read(tmp.Data(), toread);
  41. if (!readed) {
  42. ythrow TSerializeException() << "malformed archive";
  43. }
  44. ret.append(tmp.Data(), readed);
  45. len -= readed;
  46. }
  47. return ret;
  48. }
  49. namespace {
  50. class TArchiveRecordDescriptor: public TSimpleRefCount<TArchiveRecordDescriptor> {
  51. public:
  52. inline TArchiveRecordDescriptor(ui64 off, ui64 len, const TString& name)
  53. : Off_(off)
  54. , Len_(len)
  55. , Name_(name)
  56. {
  57. }
  58. inline TArchiveRecordDescriptor(IInputStream* in)
  59. : Off_(ESLoad<ui64>(in))
  60. , Len_(ESLoad<ui64>(in))
  61. , Name_(ESLoad<TString>(in))
  62. {
  63. }
  64. inline ~TArchiveRecordDescriptor() = default;
  65. inline void SaveTo(IOutputStream* out) const {
  66. ESSave(out, Off_);
  67. ESSave(out, Len_);
  68. ESSave(out, Name_);
  69. }
  70. inline const TString& Name() const noexcept {
  71. return Name_;
  72. }
  73. inline ui64 Length() const noexcept {
  74. return Len_;
  75. }
  76. inline ui64 Offset() const noexcept {
  77. return Off_;
  78. }
  79. private:
  80. ui64 Off_;
  81. ui64 Len_;
  82. TString Name_;
  83. };
  84. typedef TIntrusivePtr<TArchiveRecordDescriptor> TArchiveRecordDescriptorRef;
  85. }
  86. class TArchiveWriter::TImpl {
  87. using TDict = THashMap<TString, TArchiveRecordDescriptorRef>;
  88. public:
  89. inline TImpl(IOutputStream& out, bool compress)
  90. : Off_(0)
  91. , Out_(&out)
  92. , UseCompression(compress)
  93. {
  94. }
  95. inline ~TImpl() = default;
  96. inline void Flush() {
  97. Out_->Flush();
  98. }
  99. inline void Finish() {
  100. TCountingOutput out(Out_);
  101. {
  102. TZLibCompress compress(&out);
  103. ESSave(&compress, (ui32)Dict_.size());
  104. for (const auto& kv : Dict_) {
  105. kv.second->SaveTo(&compress);
  106. }
  107. ESSave(&compress, static_cast<ui8>(UseCompression));
  108. compress.Finish();
  109. }
  110. ESSave(Out_, out.Counter());
  111. Out_->Flush();
  112. }
  113. inline void Add(const TString& key, IInputStream* src) {
  114. Y_ENSURE(!Dict_.contains(key), "key " << key.data() << " already stored");
  115. TCountingOutput out(Out_);
  116. if (UseCompression) {
  117. TZLibCompress compress(&out);
  118. TransferData(src, &compress);
  119. compress.Finish();
  120. } else {
  121. size_t skip_size = ArchiveWriterDefaultDataAlignment - Off_ % ArchiveWriterDefaultDataAlignment;
  122. if (skip_size == ArchiveWriterDefaultDataAlignment) {
  123. skip_size = 0;
  124. }
  125. while(skip_size > 0) {
  126. Out_->Write(char(0));
  127. Off_ += 1;
  128. skip_size -= 1;
  129. }
  130. TransferData(src, &out);
  131. out.Finish();
  132. }
  133. TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(Off_, out.Counter(), key));
  134. Dict_[key] = descr;
  135. Off_ += out.Counter();
  136. }
  137. inline void AddSynonym(const TString& existingKey, const TString& newKey) {
  138. Y_ENSURE(Dict_.contains(existingKey), "key " << existingKey.data() << " not stored yet");
  139. Y_ENSURE(!Dict_.contains(newKey), "key " << newKey.data() << " already stored");
  140. TArchiveRecordDescriptorRef existingDescr = Dict_[existingKey];
  141. TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(existingDescr->Offset(), existingDescr->Length(), newKey));
  142. Dict_[newKey] = descr;
  143. }
  144. private:
  145. ui64 Off_;
  146. IOutputStream* Out_;
  147. TDict Dict_;
  148. const bool UseCompression;
  149. };
  150. TArchiveWriter::TArchiveWriter(IOutputStream* out, bool compress)
  151. : Impl_(new TImpl(*out, compress))
  152. {
  153. }
  154. TArchiveWriter::~TArchiveWriter() {
  155. try {
  156. Finish();
  157. } catch (...) {
  158. }
  159. }
  160. void TArchiveWriter::Flush() {
  161. if (Impl_.Get()) {
  162. Impl_->Flush();
  163. }
  164. }
  165. void TArchiveWriter::Finish() {
  166. if (Impl_.Get()) {
  167. Impl_->Finish();
  168. Impl_.Destroy();
  169. }
  170. }
  171. void TArchiveWriter::Add(const TString& key, IInputStream* src) {
  172. Y_ENSURE(Impl_.Get(), "archive already closed");
  173. Impl_->Add(key, src);
  174. }
  175. void TArchiveWriter::AddSynonym(const TString& existingKey, const TString& newKey) {
  176. Y_ENSURE(Impl_.Get(), "archive already closed");
  177. Impl_->AddSynonym(existingKey, newKey);
  178. }
  179. namespace {
  180. class TArchiveInputStreamBase {
  181. public:
  182. inline TArchiveInputStreamBase(const TBlob& b)
  183. : Blob_(b)
  184. , Input_(b.Data(), b.Size())
  185. {
  186. }
  187. protected:
  188. TBlob Blob_;
  189. TMemoryInput Input_;
  190. };
  191. class TArchiveInputStream: public TArchiveInputStreamBase, public TZLibDecompress {
  192. public:
  193. inline TArchiveInputStream(const TBlob& b)
  194. : TArchiveInputStreamBase(b)
  195. , TZLibDecompress(&Input_)
  196. {
  197. }
  198. ~TArchiveInputStream() override = default;
  199. };
  200. }
  201. class TArchiveReader::TImpl {
  202. typedef THashMap<TString, TArchiveRecordDescriptorRef> TDict;
  203. public:
  204. inline TImpl(const TBlob& blob)
  205. : Blob_(blob)
  206. , UseDecompression(true)
  207. {
  208. ReadDict();
  209. }
  210. inline ~TImpl() = default;
  211. inline void ReadDict() {
  212. Y_ENSURE(Blob_.Size() >= sizeof(ui64), "too small blob");
  213. const char* end = (const char*)Blob_.End();
  214. const char* ptr = end - sizeof(ui64);
  215. ui64 dictlen = 0;
  216. memcpy(&dictlen, ptr, sizeof(ui64));
  217. dictlen = LittleToHost(dictlen);
  218. Y_ENSURE(dictlen <= Blob_.Size() - sizeof(ui64), "bad blob");
  219. const char* beg = ptr - dictlen;
  220. TMemoryInput mi(beg, dictlen);
  221. TZLibDecompress d(&mi);
  222. const ui32 count = ESLoad<ui32>(&d);
  223. for (size_t i = 0; i < count; ++i) {
  224. TArchiveRecordDescriptorRef descr(new TArchiveRecordDescriptor(&d));
  225. Recs_.push_back(descr);
  226. Dict_[descr->Name()] = descr;
  227. }
  228. Sort(Recs_.begin(), Recs_.end(), [](const auto& lhs, const auto& rhs) -> bool {
  229. return lhs->Offset() < rhs->Offset();
  230. });
  231. try {
  232. UseDecompression = static_cast<bool>(ESLoad<ui8>(&d));
  233. } catch (const TSerializeException&) {
  234. // that's ok - just old format
  235. UseDecompression = true;
  236. }
  237. }
  238. inline size_t Count() const noexcept {
  239. return Recs_.size();
  240. }
  241. inline TString KeyByIndex(size_t n) const {
  242. if (n < Count()) {
  243. return Recs_[n]->Name();
  244. }
  245. ythrow yexception() << "incorrect index";
  246. }
  247. inline bool Has(const TStringBuf key) const {
  248. return Dict_.contains(key);
  249. }
  250. inline TAutoPtr<IInputStream> ObjectByKey(const TStringBuf key) const {
  251. TBlob subBlob = BlobByKey(key);
  252. if (UseDecompression) {
  253. return new TArchiveInputStream(subBlob);
  254. } else {
  255. return new TMemoryInput(subBlob.Data(), subBlob.Length());
  256. }
  257. }
  258. inline TBlob ObjectBlobByKey(const TStringBuf key) const {
  259. TBlob subBlob = BlobByKey(key);
  260. if (UseDecompression) {
  261. TArchiveInputStream st(subBlob);
  262. return TBlob::FromStream(st);
  263. } else {
  264. return subBlob;
  265. }
  266. }
  267. inline TBlob BlobByKey(const TStringBuf key) const {
  268. const auto it = Dict_.find(key);
  269. Y_ENSURE(it != Dict_.end(), "key " << key.data() << " not found");
  270. const size_t off = it->second->Offset();
  271. const size_t len = it->second->Length();
  272. /*
  273. * TODO - overflow check
  274. */
  275. return Blob_.SubBlob(off, off + len);
  276. }
  277. inline bool Compressed() const {
  278. return UseDecompression;
  279. }
  280. private:
  281. TBlob Blob_;
  282. TVector<TArchiveRecordDescriptorRef> Recs_;
  283. TDict Dict_;
  284. bool UseDecompression;
  285. };
  286. TArchiveReader::TArchiveReader(const TBlob& data)
  287. : Impl_(new TImpl(data))
  288. {
  289. }
  290. TArchiveReader::~TArchiveReader() {}
  291. size_t TArchiveReader::Count() const noexcept {
  292. return Impl_->Count();
  293. }
  294. TString TArchiveReader::KeyByIndex(size_t n) const {
  295. return Impl_->KeyByIndex(n);
  296. }
  297. bool TArchiveReader::Has(const TStringBuf key) const {
  298. return Impl_->Has(key);
  299. }
  300. TAutoPtr<IInputStream> TArchiveReader::ObjectByKey(const TStringBuf key) const {
  301. return Impl_->ObjectByKey(key);
  302. }
  303. TBlob TArchiveReader::ObjectBlobByKey(const TStringBuf key) const {
  304. return Impl_->ObjectBlobByKey(key);
  305. }
  306. TBlob TArchiveReader::BlobByKey(const TStringBuf key) const {
  307. return Impl_->BlobByKey(key);
  308. }
  309. bool TArchiveReader::Compressed() const {
  310. return Impl_->Compressed();
  311. }