compressor.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. #pragma once
  2. #include <util/system/yassert.h>
  3. #include <util/system/byteorder.h>
  4. #include <util/memory/addstorage.h>
  5. #include <util/generic/buffer.h>
  6. #include <util/generic/utility.h>
  7. #include <util/generic/singleton.h>
  8. #include <util/stream/mem.h>
  9. #include "error.h"
  10. static inline ui8 HostToLittle(ui8 t) noexcept {
  11. return t;
  12. }
  13. static inline ui8 LittleToHost(ui8 t) noexcept {
  14. return t;
  15. }
  16. struct TCommonData {
  17. static const size_t overhead = sizeof(ui16) + sizeof(ui8);
  18. };
  19. const size_t SIGNATURE_SIZE = 4;
  20. template <class TCompressor, class TBase>
  21. class TCompressorBase: public TAdditionalStorage<TCompressorBase<TCompressor, TBase>>, public TCompressor, public TCommonData {
  22. public:
  23. inline TCompressorBase(IOutputStream* slave, ui16 blockSize)
  24. : Slave_(slave)
  25. , BlockSize_(blockSize)
  26. {
  27. /*
  28. * save signature
  29. */
  30. static_assert(sizeof(TCompressor::signature) - 1 == SIGNATURE_SIZE, "expect sizeof(TCompressor::signature) - 1 == SIGNATURE_SIZE");
  31. Slave_->Write(TCompressor::signature, sizeof(TCompressor::signature) - 1);
  32. /*
  33. * save version
  34. */
  35. this->Save((ui32)1);
  36. /*
  37. * save block size
  38. */
  39. this->Save(BlockSize());
  40. }
  41. inline ~TCompressorBase() {
  42. }
  43. inline void Write(const char* buf, size_t len) {
  44. while (len) {
  45. const ui16 toWrite = (ui16)Min<size_t>(len, this->BlockSize());
  46. this->WriteBlock(buf, toWrite);
  47. buf += toWrite;
  48. len -= toWrite;
  49. }
  50. }
  51. inline void Flush() {
  52. }
  53. inline void Finish() {
  54. this->Flush();
  55. this->WriteBlock(nullptr, 0);
  56. }
  57. template <class T>
  58. static inline void Save(T t, IOutputStream* out) {
  59. t = HostToLittle(t);
  60. out->Write(&t, sizeof(t));
  61. }
  62. template <class T>
  63. inline void Save(T t) {
  64. Save(t, Slave_);
  65. }
  66. private:
  67. inline void* Block() const noexcept {
  68. return this->AdditionalData();
  69. }
  70. inline ui16 BlockSize() const noexcept {
  71. return BlockSize_;
  72. }
  73. inline void WriteBlock(const void* ptr, ui16 len) {
  74. Y_ASSERT(len <= this->BlockSize());
  75. ui8 compressed = false;
  76. if (len) {
  77. const size_t out = this->Compress((const char*)ptr, len, (char*)Block(), this->AdditionalDataLength());
  78. // catch compressor buffer overrun (e.g. SEARCH-2043)
  79. //Y_ABORT_UNLESS(out <= this->Hint(this->BlockSize()));
  80. if (out < len || TCompressor::SaveIncompressibleChunks()) {
  81. compressed = true;
  82. ptr = Block();
  83. len = (ui16)out;
  84. }
  85. }
  86. char tmp[overhead];
  87. TMemoryOutput header(tmp, sizeof(tmp));
  88. this->Save(len, &header);
  89. this->Save(compressed, &header);
  90. using TPart = IOutputStream::TPart;
  91. if (ptr) {
  92. const TPart parts[] = {
  93. TPart(tmp, sizeof(tmp)),
  94. TPart(ptr, len),
  95. };
  96. Slave_->Write(parts, sizeof(parts) / sizeof(*parts));
  97. } else {
  98. Slave_->Write(tmp, sizeof(tmp));
  99. }
  100. }
  101. private:
  102. IOutputStream* Slave_;
  103. const ui16 BlockSize_;
  104. };
  105. template <class T>
  106. static inline T GLoad(IInputStream* input) {
  107. T t;
  108. if (input->Load(&t, sizeof(t)) != sizeof(t)) {
  109. ythrow TDecompressorError() << "stream error";
  110. }
  111. return LittleToHost(t);
  112. }
  113. class TDecompressSignature {
  114. public:
  115. inline TDecompressSignature(IInputStream* input) {
  116. if (input->Load(Buffer_, SIGNATURE_SIZE) != SIGNATURE_SIZE) {
  117. ythrow TDecompressorError() << "can not load stream signature";
  118. }
  119. }
  120. template <class TDecompressor>
  121. inline bool Check() const {
  122. static_assert(sizeof(TDecompressor::signature) - 1 == SIGNATURE_SIZE, "expect sizeof(TDecompressor::signature) - 1 == SIGNATURE_SIZE");
  123. return memcmp(TDecompressor::signature, Buffer_, SIGNATURE_SIZE) == 0;
  124. }
  125. private:
  126. char Buffer_[SIGNATURE_SIZE];
  127. };
  128. template <class TDecompressor>
  129. static inline IInputStream* ConsumeSignature(IInputStream* input) {
  130. TDecompressSignature sign(input);
  131. if (!sign.Check<TDecompressor>()) {
  132. ythrow TDecompressorError() << "incorrect signature";
  133. }
  134. return input;
  135. }
  136. template <class TDecompressor>
  137. class TDecompressorBaseImpl: public TDecompressor, public TCommonData {
  138. public:
  139. static inline ui32 CheckVer(ui32 v) {
  140. if (v != 1) {
  141. ythrow yexception() << TStringBuf("incorrect stream version: ") << v;
  142. }
  143. return v;
  144. }
  145. inline TDecompressorBaseImpl(IInputStream* slave)
  146. : Slave_(slave)
  147. , Input_(nullptr, 0)
  148. , Eof_(false)
  149. , Version_(CheckVer(Load<ui32>()))
  150. , BlockSize_(Load<ui16>())
  151. , OutBufSize_(TDecompressor::Hint(BlockSize_))
  152. , Tmp_(2 * OutBufSize_)
  153. , In_(Tmp_.Data())
  154. , Out_(In_ + OutBufSize_)
  155. {
  156. this->InitFromStream(Slave_);
  157. }
  158. inline ~TDecompressorBaseImpl() {
  159. }
  160. inline size_t Read(void* buf, size_t len) {
  161. size_t ret = Input_.Read(buf, len);
  162. if (ret) {
  163. return ret;
  164. }
  165. if (Eof_) {
  166. return 0;
  167. }
  168. this->FillNextBlock();
  169. ret = Input_.Read(buf, len);
  170. if (ret) {
  171. return ret;
  172. }
  173. Eof_ = true;
  174. return 0;
  175. }
  176. inline void FillNextBlock() {
  177. char tmp[overhead];
  178. if (Slave_->Load(tmp, sizeof(tmp)) != sizeof(tmp)) {
  179. ythrow TDecompressorError() << "can not read block header";
  180. }
  181. TMemoryInput header(tmp, sizeof(tmp));
  182. const ui16 len = GLoad<ui16>(&header);
  183. if (len > Tmp_.Capacity()) {
  184. ythrow TDecompressorError() << "invalid len inside block header";
  185. }
  186. const ui8 compressed = GLoad<ui8>(&header);
  187. if (compressed > 1) {
  188. ythrow TDecompressorError() << "broken header";
  189. }
  190. if (Slave_->Load(In_, len) != len) {
  191. ythrow TDecompressorError() << "can not read data";
  192. }
  193. if (compressed) {
  194. const size_t ret = this->Decompress(In_, len, Out_, OutBufSize_);
  195. Input_.Reset(Out_, ret);
  196. } else {
  197. Input_.Reset(In_, len);
  198. }
  199. }
  200. template <class T>
  201. inline T Load() {
  202. return GLoad<T>(Slave_);
  203. }
  204. protected:
  205. IInputStream* Slave_;
  206. TMemoryInput Input_;
  207. bool Eof_;
  208. const ui32 Version_;
  209. const ui16 BlockSize_;
  210. const size_t OutBufSize_;
  211. TBuffer Tmp_;
  212. char* In_;
  213. char* Out_;
  214. };
  215. template <class TDecompressor, class TBase>
  216. class TDecompressorBase: public TDecompressorBaseImpl<TDecompressor> {
  217. public:
  218. inline TDecompressorBase(IInputStream* slave)
  219. : TDecompressorBaseImpl<TDecompressor>(ConsumeSignature<TDecompressor>(slave))
  220. {
  221. }
  222. inline ~TDecompressorBase() {
  223. }
  224. };
  // Defines the members shared by every compressing stream class `rname` that
  // keeps its implementation in a holder member `Impl_` of `rname::TImpl`:
  //   * the destructor, which calls Finish() and swallows any exception;
  //   * DoWrite / DoFlush, which forward to Impl_ and throw once finished;
  //   * DoFinish, which releases Impl_ and calls TImpl::Finish() on it.
  // The `name` argument is not used by this macro.
  #define DEF_COMPRESSOR_COMMON(rname, name)                                    \
      rname::~rname() {                                                         \
          /* Best effort: exceptions must not escape a destructor. */           \
          try {                                                                 \
              Finish();                                                         \
          } catch (...) {                                                       \
          }                                                                     \
      }                                                                         \
                                                                                \
      void rname::DoWrite(const void* data, size_t size) {                      \
          if (!Impl_) {                                                         \
              ythrow yexception() << "can not write to finalized stream";       \
          }                                                                     \
          Impl_->Write(static_cast<const char*>(data), size);                   \
      }                                                                         \
                                                                                \
      void rname::DoFlush() {                                                   \
          if (!Impl_) {                                                         \
              ythrow yexception() << "can not flush finalized stream";          \
          }                                                                     \
          Impl_->Flush();                                                       \
      }                                                                         \
                                                                                \
      void rname::DoFinish() {                                                  \
          /* Release first so Impl_ is empty even if Finish() throws. */        \
          THolder<TImpl> finished(Impl_.Release());                             \
          if (finished) {                                                       \
              finished->Finish();                                               \
          }                                                                     \
      }
  // Defines rname::TImpl (a TCompressorBase for codec `name`), the
  // rname(IOutputStream* slave, ui16 blockSize) constructor, and the members
  // produced by DEF_COMPRESSOR_COMMON. TImpl is allocated with
  // `new (TImpl::Hint(blockSize))`, which presumably sizes the TAdditionalStorage
  // area used as the compression buffer.
  #define DEF_COMPRESSOR(rname, name)                                           \
      class rname::TImpl: public TCompressorBase<name, TImpl> {                 \
      public:                                                                   \
          inline TImpl(IOutputStream* slave, ui16 blockSize)                    \
              : TCompressorBase<name, TImpl>(slave, blockSize)                  \
          {                                                                     \
          }                                                                     \
      };                                                                        \
                                                                                \
      rname::rname(IOutputStream* slave, ui16 blockSize)                        \
          : Impl_(new (TImpl::Hint(blockSize)) TImpl(slave, blockSize))         \
      {                                                                         \
      }                                                                         \
                                                                                \
      DEF_COMPRESSOR_COMMON(rname, name)
  // Defines rname::TImpl (a signature-checking TDecompressorBase for codec
  // `name`), the rname(IInputStream* slave) constructor, an empty destructor,
  // and DoRead(), which forwards to TImpl::Read().
  #define DEF_DECOMPRESSOR(rname, name)                                         \
      class rname::TImpl: public TDecompressorBase<name, TImpl> {               \
      public:                                                                   \
          inline TImpl(IInputStream* slave)                                     \
              : TDecompressorBase<name, TImpl>(slave)                           \
          {                                                                     \
          }                                                                     \
      };                                                                        \
                                                                                \
      rname::rname(IInputStream* slave)                                         \
          : Impl_(new TImpl(slave))                                             \
      {                                                                         \
      }                                                                         \
                                                                                \
      rname::~rname() {                                                         \
      }                                                                         \
                                                                                \
      size_t rname::DoRead(void* data, size_t size) {                           \
          return Impl_->Read(data, size);                                       \
      }
  287. template <class T>
  288. struct TInputHolder {
  289. static inline T Set(T t) noexcept {
  290. return t;
  291. }
  292. };
  293. template <class T>
  294. struct TInputHolder<TAutoPtr<T>> {
  295. inline T* Set(TAutoPtr<T> v) noexcept {
  296. V_ = v;
  297. return V_.Get();
  298. }
  299. TAutoPtr<T> V_;
  300. };
  301. // Decompressing input streams without signature verification
  302. template <class TInput, class TDecompressor>
  303. class TLzDecompressInput: public TInputHolder<TInput>, public IInputStream {
  304. public:
  305. inline TLzDecompressInput(TInput in)
  306. : Impl_(this->Set(in))
  307. {
  308. }
  309. private:
  310. size_t DoRead(void* buf, size_t len) override {
  311. return Impl_.Read(buf, len);
  312. }
  313. private:
  314. TDecompressorBaseImpl<TDecompressor> Impl_;
  315. };