compression.cpp 14 KB


  1. #include "compression.h"
  2. #include <util/generic/buffer.h>
  3. #include <util/generic/cast.h>
  4. #include <util/generic/ptr.h>
  5. #include <util/generic/scope.h>
  6. #include <util/generic/size_literals.h>
  7. #include <util/stream/format.h>
  8. #include <util/stream/output.h>
  9. #include <util/stream/walk.h>
  10. #include <contrib/libs/lz4/lz4.h>
  11. #include <contrib/libs/xxhash/xxhash.h>
  12. #include <zlib.h>
  13. #define ZSTD_STATIC_LINKING_ONLY
  14. #include <contrib/libs/zstd/include/zstd.h>
  15. namespace NMonitoring {
  16. namespace {
  17. ///////////////////////////////////////////////////////////////////////////////
  18. // Frame
  19. ///////////////////////////////////////////////////////////////////////////////
  20. using TCompressedSize = ui32;
  21. using TUncompressedSize = ui32;
  22. using TCheckSum = ui32;
  23. constexpr size_t COMPRESSED_FRAME_SIZE_LIMIT = 512_KB;
  24. constexpr size_t UNCOMPRESSED_FRAME_SIZE_LIMIT = COMPRESSED_FRAME_SIZE_LIMIT;
  25. constexpr size_t FRAME_SIZE_LIMIT = 2_MB;
  26. constexpr size_t DEFAULT_FRAME_LEN = 64_KB;
  27. struct Y_PACKED TFrameHeader {
  28. TCompressedSize CompressedSize;
  29. TUncompressedSize UncompressedSize;
  30. };
  31. struct Y_PACKED TFrameFooter {
  32. TCheckSum CheckSum;
  33. };
  34. ///////////////////////////////////////////////////////////////////////////////
  35. // TBlock
  36. ///////////////////////////////////////////////////////////////////////////////
  37. struct TBlock: public TStringBuf {
  38. template <typename T>
  39. TBlock(T&& t)
  40. : TStringBuf(t.data(), t.size())
  41. {
  42. Y_ENSURE(t.data() != nullptr);
  43. }
  44. char* data() noexcept {
  45. return const_cast<char*>(TStringBuf::data());
  46. }
  47. };
  48. ///////////////////////////////////////////////////////////////////////////////
  49. // XXHASH
  50. ///////////////////////////////////////////////////////////////////////////////
  51. struct TXxHash32 {
  52. static TCheckSum Calc(TBlock in) {
  53. static const ui32 SEED = 0x1337c0de;
  54. return XXH32(in.data(), in.size(), SEED);
  55. }
  56. static bool Check(TBlock in, TCheckSum checksum) {
  57. return Calc(in) == checksum;
  58. }
  59. };
  60. ///////////////////////////////////////////////////////////////////////////////
  61. // Adler32
  62. ///////////////////////////////////////////////////////////////////////////////
  63. struct TAdler32 {
  64. static TCheckSum Calc(TBlock in) {
  65. return adler32(1L, reinterpret_cast<const Bytef*>(in.data()), in.size());
  66. }
  67. static bool Check(TBlock in, TCheckSum checksum) {
  68. return Calc(in) == checksum;
  69. }
  70. };
  71. ///////////////////////////////////////////////////////////////////////////////
  72. // LZ4
  73. ///////////////////////////////////////////////////////////////////////////////
  74. struct TLz4Codec {
  75. static size_t MaxCompressedLength(size_t in) {
  76. int result = LZ4_compressBound(static_cast<int>(in));
  77. Y_ENSURE(result != 0, "lz4 input size is too large");
  78. return result;
  79. }
  80. static size_t Compress(TBlock in, TBlock out) {
  81. int rc = LZ4_compress_default(
  82. in.data(),
  83. out.data(),
  84. SafeIntegerCast<int>(in.size()),
  85. SafeIntegerCast<int>(out.size()));
  86. Y_ENSURE(rc != 0, "lz4 compression failed");
  87. return rc;
  88. }
  89. static void Decompress(TBlock in, TBlock out) {
  90. int rc = LZ4_decompress_safe(
  91. in.data(),
  92. out.data(),
  93. SafeIntegerCast<int>(in.size()),
  94. SafeIntegerCast<int>(out.size()));
  95. Y_ENSURE(rc >= 0, "the lz4 stream is detected malformed");
  96. }
  97. };
  98. ///////////////////////////////////////////////////////////////////////////////
  99. // ZSTD
  100. ///////////////////////////////////////////////////////////////////////////////
  101. struct TZstdCodec {
  102. static const int LEVEL = 11;
  103. static size_t MaxCompressedLength(size_t in) {
  104. return ZSTD_compressBound(in);
  105. }
  106. static size_t Compress(TBlock in, TBlock out) {
  107. size_t rc = ZSTD_compress(out.data(), out.size(), in.data(), in.size(), LEVEL);
  108. if (Y_UNLIKELY(ZSTD_isError(rc))) {
  109. ythrow yexception() << TStringBuf("zstd compression failed: ")
  110. << ZSTD_getErrorName(rc);
  111. }
  112. return rc;
  113. }
  114. static void Decompress(TBlock in, TBlock out) {
  115. size_t rc = ZSTD_decompress(out.data(), out.size(), in.data(), in.size());
  116. if (Y_UNLIKELY(ZSTD_isError(rc))) {
  117. ythrow yexception() << TStringBuf("zstd decompression failed: ")
  118. << ZSTD_getErrorName(rc);
  119. }
  120. Y_ENSURE(rc == out.size(), "zstd decompressed wrong size");
  121. }
  122. };
  123. ///////////////////////////////////////////////////////////////////////////////
  124. // ZLIB
  125. ///////////////////////////////////////////////////////////////////////////////
  126. struct TZlibCodec {
  127. static const int LEVEL = 6;
  128. static size_t MaxCompressedLength(size_t in) {
  129. return compressBound(in);
  130. }
  131. static size_t Compress(TBlock in, TBlock out) {
  132. uLong ret = out.size();
  133. int rc = compress2(
  134. reinterpret_cast<Bytef*>(out.data()),
  135. &ret,
  136. reinterpret_cast<const Bytef*>(in.data()),
  137. in.size(),
  138. LEVEL);
  139. Y_ENSURE(rc == Z_OK, "zlib compression failed");
  140. return ret;
  141. }
  142. static void Decompress(TBlock in, TBlock out) {
  143. uLong ret = out.size();
  144. int rc = uncompress(
  145. reinterpret_cast<Bytef*>(out.data()),
  146. &ret,
  147. reinterpret_cast<const Bytef*>(in.data()),
  148. in.size());
  149. Y_ENSURE(rc == Z_OK, "zlib decompression failed");
  150. Y_ENSURE(ret == out.size(), "zlib decompressed wrong size");
  151. }
  152. };
  //
  // Framed streams use the following frame structure:
  //
  // +-----------------+-------------------+============+------------------+
  // | compressed size | uncompressed size |    data    |    check sum     |
  // +-----------------+-------------------+============+------------------+
  //       4 bytes            4 bytes         var len          4 bytes
  //
  // The encoder terminates a stream with an empty frame written as all zero
  // bytes; the decoder stops at a header whose compressed size is zero.
  //
  161. ///////////////////////////////////////////////////////////////////////////////
  162. // TFramedInputStream
  163. ///////////////////////////////////////////////////////////////////////////////
  164. template <typename TCodecAlg, typename TCheckSumAlg>
  165. class TFramedDecompressStream final: public IWalkInput {
  166. public:
  167. explicit TFramedDecompressStream(IInputStream* in)
  168. : In_(in)
  169. {
  170. }
  171. private:
  172. size_t DoUnboundedNext(const void** ptr) override {
  173. if (!In_) {
  174. return 0;
  175. }
  176. TFrameHeader header;
  177. In_->LoadOrFail(&header, sizeof(header));
  178. if (header.CompressedSize == 0) {
  179. In_ = nullptr;
  180. return 0;
  181. }
  182. Y_ENSURE(header.CompressedSize <= COMPRESSED_FRAME_SIZE_LIMIT, "Compressed frame size is limited to "
  183. << HumanReadableSize(COMPRESSED_FRAME_SIZE_LIMIT, SF_BYTES)
  184. << " but is " << HumanReadableSize(header.CompressedSize, SF_BYTES));
  185. Y_ENSURE(header.UncompressedSize <= UNCOMPRESSED_FRAME_SIZE_LIMIT, "Uncompressed frame size is limited to "
  186. << HumanReadableSize(UNCOMPRESSED_FRAME_SIZE_LIMIT, SF_BYTES)
  187. << " but is " << HumanReadableSize(header.UncompressedSize, SF_BYTES));
  188. Compressed_.Resize(header.CompressedSize);
  189. In_->LoadOrFail(Compressed_.Data(), header.CompressedSize);
  190. TFrameFooter footer;
  191. In_->LoadOrFail(&footer, sizeof(footer));
  192. Y_ENSURE(TCheckSumAlg::Check(Compressed_, footer.CheckSum),
  193. "corrupted stream: check sum mismatch");
  194. Uncompressed_.Resize(header.UncompressedSize);
  195. TCodecAlg::Decompress(Compressed_, Uncompressed_);
  196. *ptr = Uncompressed_.Data();
  197. return Uncompressed_.Size();
  198. }
  199. private:
  200. IInputStream* In_;
  201. TBuffer Compressed_;
  202. TBuffer Uncompressed_;
  203. };
  204. ///////////////////////////////////////////////////////////////////////////////
  205. // TFramedOutputStream
  206. ///////////////////////////////////////////////////////////////////////////////
  207. template <typename TCodecAlg, typename TCheckSumAlg>
  208. class TFramedCompressStream final: public IFramedCompressStream {
  209. public:
  210. explicit TFramedCompressStream(IOutputStream* out)
  211. : Out_(out)
  212. , Uncompressed_(DEFAULT_FRAME_LEN)
  213. {
  214. }
  215. ~TFramedCompressStream() override {
  216. try {
  217. Finish();
  218. } catch (...) {
  219. }
  220. }
  221. private:
  222. void DoWrite(const void* buf, size_t len) override {
  223. const char* in = static_cast<const char*>(buf);
  224. while (len != 0) {
  225. const size_t avail = Uncompressed_.Avail();
  226. if (len < avail) {
  227. Uncompressed_.Append(in, len);
  228. return;
  229. }
  230. Uncompressed_.Append(in, avail);
  231. Y_ASSERT(Uncompressed_.Avail() == 0);
  232. in += avail;
  233. len -= avail;
  234. WriteCompressedFrame();
  235. }
  236. }
  237. void FlushWithoutEmptyFrame() override {
  238. if (Out_ && !Uncompressed_.Empty()) {
  239. WriteCompressedFrame();
  240. }
  241. }
  242. void FinishAndWriteEmptyFrame() override {
  243. if (Out_) {
  244. Y_DEFER {
  245. Out_ = nullptr;
  246. };
  247. if (!Uncompressed_.Empty()) {
  248. WriteCompressedFrame();
  249. }
  250. WriteEmptyFrame();
  251. }
  252. }
  253. void DoFlush() override {
  254. FlushWithoutEmptyFrame();
  255. }
  256. void DoFinish() override {
  257. FinishAndWriteEmptyFrame();
  258. }
  259. void WriteCompressedFrame() {
  260. static const auto framePayload = sizeof(TFrameHeader) + sizeof(TFrameFooter);
  261. const auto maxFrameSize = ui64(TCodecAlg::MaxCompressedLength(Uncompressed_.Size())) + framePayload;
  262. Y_ENSURE(maxFrameSize <= FRAME_SIZE_LIMIT, "Frame size in encoder is limited to "
  263. << HumanReadableSize(FRAME_SIZE_LIMIT, SF_BYTES)
  264. << " but is " << HumanReadableSize(maxFrameSize, SF_BYTES));
  265. Frame_.Resize(maxFrameSize);
  266. // compress
  267. TBlock compressedBlock = Frame_;
  268. compressedBlock.Skip(sizeof(TFrameHeader));
  269. compressedBlock.Trunc(TCodecAlg::Compress(Uncompressed_, compressedBlock));
  270. // add header
  271. auto header = reinterpret_cast<TFrameHeader*>(Frame_.Data());
  272. header->CompressedSize = SafeIntegerCast<TCompressedSize>(compressedBlock.size());
  273. header->UncompressedSize = SafeIntegerCast<TUncompressedSize>(Uncompressed_.Size());
  274. // add footer
  275. auto footer = reinterpret_cast<TFrameFooter*>(
  276. Frame_.Data() + sizeof(TFrameHeader) + header->CompressedSize);
  277. footer->CheckSum = TCheckSumAlg::Calc(compressedBlock);
  278. // write
  279. Out_->Write(Frame_.Data(), header->CompressedSize + framePayload);
  280. Uncompressed_.Clear();
  281. }
  282. void WriteEmptyFrame() {
  283. static const auto framePayload = sizeof(TFrameHeader) + sizeof(TFrameFooter);
  284. char buf[framePayload] = {0};
  285. Out_->Write(buf, sizeof(buf));
  286. }
  287. private:
  288. IOutputStream* Out_;
  289. TBuffer Uncompressed_;
  290. TBuffer Frame_;
  291. };
  292. }
  293. THolder<IInputStream> CompressedInput(IInputStream* in, ECompression alg) {
  294. switch (alg) {
  295. case ECompression::IDENTITY:
  296. return nullptr;
  297. case ECompression::ZLIB:
  298. return MakeHolder<TFramedDecompressStream<TZlibCodec, TAdler32>>(in);
  299. case ECompression::ZSTD:
  300. return MakeHolder<TFramedDecompressStream<TZstdCodec, TXxHash32>>(in);
  301. case ECompression::LZ4:
  302. return MakeHolder<TFramedDecompressStream<TLz4Codec, TXxHash32>>(in);
  303. case ECompression::UNKNOWN:
  304. return nullptr;
  305. }
  306. Y_ABORT("invalid compression algorithm");
  307. }
  308. THolder<IFramedCompressStream> CompressedOutput(IOutputStream* out, ECompression alg) {
  309. switch (alg) {
  310. case ECompression::IDENTITY:
  311. return nullptr;
  312. case ECompression::ZLIB:
  313. return MakeHolder<TFramedCompressStream<TZlibCodec, TAdler32>>(out);
  314. case ECompression::ZSTD:
  315. return MakeHolder<TFramedCompressStream<TZstdCodec, TXxHash32>>(out);
  316. case ECompression::LZ4:
  317. return MakeHolder<TFramedCompressStream<TLz4Codec, TXxHash32>>(out);
  318. case ECompression::UNKNOWN:
  319. return nullptr;
  320. }
  321. Y_ABORT("invalid compression algorithm");
  322. }
  323. }