123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- #include "compression.h"
- #include <util/generic/buffer.h>
- #include <util/generic/cast.h>
- #include <util/generic/ptr.h>
- #include <util/generic/scope.h>
- #include <util/generic/size_literals.h>
- #include <util/stream/format.h>
- #include <util/stream/output.h>
- #include <util/stream/walk.h>
- #include <contrib/libs/lz4/lz4.h>
- #include <contrib/libs/xxhash/xxhash.h>
- #include <zlib.h>
- #define ZSTD_STATIC_LINKING_ONLY
- #include <contrib/libs/zstd/include/zstd.h>
- namespace NMonitoring {
- namespace {
- ///////////////////////////////////////////////////////////////////////////////
- // Frame
- ///////////////////////////////////////////////////////////////////////////////
- using TCompressedSize = ui32;
- using TUncompressedSize = ui32;
- using TCheckSum = ui32;
- constexpr size_t COMPRESSED_FRAME_SIZE_LIMIT = 512_KB;
- constexpr size_t UNCOMPRESSED_FRAME_SIZE_LIMIT = COMPRESSED_FRAME_SIZE_LIMIT;
- constexpr size_t FRAME_SIZE_LIMIT = 2_MB;
- constexpr size_t DEFAULT_FRAME_LEN = 64_KB;
- struct Y_PACKED TFrameHeader {
- TCompressedSize CompressedSize;
- TUncompressedSize UncompressedSize;
- };
- struct Y_PACKED TFrameFooter {
- TCheckSum CheckSum;
- };
- ///////////////////////////////////////////////////////////////////////////////
- // TBlock
- ///////////////////////////////////////////////////////////////////////////////
- struct TBlock: public TStringBuf {
- template <typename T>
- TBlock(T&& t)
- : TStringBuf(t.data(), t.size())
- {
- Y_ENSURE(t.data() != nullptr);
- }
- char* data() noexcept {
- return const_cast<char*>(TStringBuf::data());
- }
- };
- ///////////////////////////////////////////////////////////////////////////////
- // XXHASH
- ///////////////////////////////////////////////////////////////////////////////
- struct TXxHash32 {
- static TCheckSum Calc(TBlock in) {
- static const ui32 SEED = 0x1337c0de;
- return XXH32(in.data(), in.size(), SEED);
- }
- static bool Check(TBlock in, TCheckSum checksum) {
- return Calc(in) == checksum;
- }
- };
- ///////////////////////////////////////////////////////////////////////////////
- // Adler32
- ///////////////////////////////////////////////////////////////////////////////
- struct TAdler32 {
- static TCheckSum Calc(TBlock in) {
- return adler32(1L, reinterpret_cast<const Bytef*>(in.data()), in.size());
- }
- static bool Check(TBlock in, TCheckSum checksum) {
- return Calc(in) == checksum;
- }
- };
- ///////////////////////////////////////////////////////////////////////////////
- // LZ4
- ///////////////////////////////////////////////////////////////////////////////
- struct TLz4Codec {
- static size_t MaxCompressedLength(size_t in) {
- int result = LZ4_compressBound(static_cast<int>(in));
- Y_ENSURE(result != 0, "lz4 input size is too large");
- return result;
- }
- static size_t Compress(TBlock in, TBlock out) {
- int rc = LZ4_compress_default(
- in.data(),
- out.data(),
- SafeIntegerCast<int>(in.size()),
- SafeIntegerCast<int>(out.size()));
- Y_ENSURE(rc != 0, "lz4 compression failed");
- return rc;
- }
- static void Decompress(TBlock in, TBlock out) {
- int rc = LZ4_decompress_safe(
- in.data(),
- out.data(),
- SafeIntegerCast<int>(in.size()),
- SafeIntegerCast<int>(out.size()));
- Y_ENSURE(rc >= 0, "the lz4 stream is detected malformed");
- }
- };
- ///////////////////////////////////////////////////////////////////////////////
- // ZSTD
- ///////////////////////////////////////////////////////////////////////////////
- struct TZstdCodec {
- static const int LEVEL = 11;
- static size_t MaxCompressedLength(size_t in) {
- return ZSTD_compressBound(in);
- }
- static size_t Compress(TBlock in, TBlock out) {
- size_t rc = ZSTD_compress(out.data(), out.size(), in.data(), in.size(), LEVEL);
- if (Y_UNLIKELY(ZSTD_isError(rc))) {
- ythrow yexception() << TStringBuf("zstd compression failed: ")
- << ZSTD_getErrorName(rc);
- }
- return rc;
- }
- static void Decompress(TBlock in, TBlock out) {
- size_t rc = ZSTD_decompress(out.data(), out.size(), in.data(), in.size());
- if (Y_UNLIKELY(ZSTD_isError(rc))) {
- ythrow yexception() << TStringBuf("zstd decompression failed: ")
- << ZSTD_getErrorName(rc);
- }
- Y_ENSURE(rc == out.size(), "zstd decompressed wrong size");
- }
- };
- ///////////////////////////////////////////////////////////////////////////////
- // ZLIB
- ///////////////////////////////////////////////////////////////////////////////
- struct TZlibCodec {
- static const int LEVEL = 6;
- static size_t MaxCompressedLength(size_t in) {
- return compressBound(in);
- }
- static size_t Compress(TBlock in, TBlock out) {
- uLong ret = out.size();
- int rc = compress2(
- reinterpret_cast<Bytef*>(out.data()),
- &ret,
- reinterpret_cast<const Bytef*>(in.data()),
- in.size(),
- LEVEL);
- Y_ENSURE(rc == Z_OK, "zlib compression failed");
- return ret;
- }
- static void Decompress(TBlock in, TBlock out) {
- uLong ret = out.size();
- int rc = uncompress(
- reinterpret_cast<Bytef*>(out.data()),
- &ret,
- reinterpret_cast<const Bytef*>(in.data()),
- in.size());
- Y_ENSURE(rc == Z_OK, "zlib decompression failed");
- Y_ENSURE(ret == out.size(), "zlib decompressed wrong size");
- }
- };
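//
// A framed stream is a sequence of frames, each with the following structure:
//
// +-----------------+-------------------+============+------------------+
// | compressed size | uncompressed size |    data    |    check sum     |
// +-----------------+-------------------+============+------------------+
//       4 bytes            4 bytes         var len         4 bytes
//
// The check sum covers only the compressed data. The size fields and the
// check sum are stored in host byte order. A frame whose compressed size is 0
// terminates the stream.
//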
- ///////////////////////////////////////////////////////////////////////////////
- // TFramedInputStream
- ///////////////////////////////////////////////////////////////////////////////
- template <typename TCodecAlg, typename TCheckSumAlg>
- class TFramedDecompressStream final: public IWalkInput {
- public:
- explicit TFramedDecompressStream(IInputStream* in)
- : In_(in)
- {
- }
- private:
- size_t DoUnboundedNext(const void** ptr) override {
- if (!In_) {
- return 0;
- }
- TFrameHeader header;
- In_->LoadOrFail(&header, sizeof(header));
- if (header.CompressedSize == 0) {
- In_ = nullptr;
- return 0;
- }
- Y_ENSURE(header.CompressedSize <= COMPRESSED_FRAME_SIZE_LIMIT, "Compressed frame size is limited to "
- << HumanReadableSize(COMPRESSED_FRAME_SIZE_LIMIT, SF_BYTES)
- << " but is " << HumanReadableSize(header.CompressedSize, SF_BYTES));
- Y_ENSURE(header.UncompressedSize <= UNCOMPRESSED_FRAME_SIZE_LIMIT, "Uncompressed frame size is limited to "
- << HumanReadableSize(UNCOMPRESSED_FRAME_SIZE_LIMIT, SF_BYTES)
- << " but is " << HumanReadableSize(header.UncompressedSize, SF_BYTES));
- Compressed_.Resize(header.CompressedSize);
- In_->LoadOrFail(Compressed_.Data(), header.CompressedSize);
- TFrameFooter footer;
- In_->LoadOrFail(&footer, sizeof(footer));
- Y_ENSURE(TCheckSumAlg::Check(Compressed_, footer.CheckSum),
- "corrupted stream: check sum mismatch");
- Uncompressed_.Resize(header.UncompressedSize);
- TCodecAlg::Decompress(Compressed_, Uncompressed_);
- *ptr = Uncompressed_.Data();
- return Uncompressed_.Size();
- }
- private:
- IInputStream* In_;
- TBuffer Compressed_;
- TBuffer Uncompressed_;
- };
- ///////////////////////////////////////////////////////////////////////////////
- // TFramedOutputStream
- ///////////////////////////////////////////////////////////////////////////////
- template <typename TCodecAlg, typename TCheckSumAlg>
- class TFramedCompressStream final: public IFramedCompressStream {
- public:
- explicit TFramedCompressStream(IOutputStream* out)
- : Out_(out)
- , Uncompressed_(DEFAULT_FRAME_LEN)
- {
- }
- ~TFramedCompressStream() override {
- try {
- Finish();
- } catch (...) {
- }
- }
- private:
- void DoWrite(const void* buf, size_t len) override {
- const char* in = static_cast<const char*>(buf);
- while (len != 0) {
- const size_t avail = Uncompressed_.Avail();
- if (len < avail) {
- Uncompressed_.Append(in, len);
- return;
- }
- Uncompressed_.Append(in, avail);
- Y_ASSERT(Uncompressed_.Avail() == 0);
- in += avail;
- len -= avail;
- WriteCompressedFrame();
- }
- }
- void FlushWithoutEmptyFrame() override {
- if (Out_ && !Uncompressed_.Empty()) {
- WriteCompressedFrame();
- }
- }
- void FinishAndWriteEmptyFrame() override {
- if (Out_) {
- Y_DEFER {
- Out_ = nullptr;
- };
- if (!Uncompressed_.Empty()) {
- WriteCompressedFrame();
- }
- WriteEmptyFrame();
- }
- }
- void DoFlush() override {
- FlushWithoutEmptyFrame();
- }
- void DoFinish() override {
- FinishAndWriteEmptyFrame();
- }
- void WriteCompressedFrame() {
- static const auto framePayload = sizeof(TFrameHeader) + sizeof(TFrameFooter);
- const auto maxFrameSize = ui64(TCodecAlg::MaxCompressedLength(Uncompressed_.Size())) + framePayload;
- Y_ENSURE(maxFrameSize <= FRAME_SIZE_LIMIT, "Frame size in encoder is limited to "
- << HumanReadableSize(FRAME_SIZE_LIMIT, SF_BYTES)
- << " but is " << HumanReadableSize(maxFrameSize, SF_BYTES));
- Frame_.Resize(maxFrameSize);
- // compress
- TBlock compressedBlock = Frame_;
- compressedBlock.Skip(sizeof(TFrameHeader));
- compressedBlock.Trunc(TCodecAlg::Compress(Uncompressed_, compressedBlock));
- // add header
- auto header = reinterpret_cast<TFrameHeader*>(Frame_.Data());
- header->CompressedSize = SafeIntegerCast<TCompressedSize>(compressedBlock.size());
- header->UncompressedSize = SafeIntegerCast<TUncompressedSize>(Uncompressed_.Size());
- // add footer
- auto footer = reinterpret_cast<TFrameFooter*>(
- Frame_.Data() + sizeof(TFrameHeader) + header->CompressedSize);
- footer->CheckSum = TCheckSumAlg::Calc(compressedBlock);
- // write
- Out_->Write(Frame_.Data(), header->CompressedSize + framePayload);
- Uncompressed_.Clear();
- }
- void WriteEmptyFrame() {
- static const auto framePayload = sizeof(TFrameHeader) + sizeof(TFrameFooter);
- char buf[framePayload] = {0};
- Out_->Write(buf, sizeof(buf));
- }
- private:
- IOutputStream* Out_;
- TBuffer Uncompressed_;
- TBuffer Frame_;
- };
- }
- THolder<IInputStream> CompressedInput(IInputStream* in, ECompression alg) {
- switch (alg) {
- case ECompression::IDENTITY:
- return nullptr;
- case ECompression::ZLIB:
- return MakeHolder<TFramedDecompressStream<TZlibCodec, TAdler32>>(in);
- case ECompression::ZSTD:
- return MakeHolder<TFramedDecompressStream<TZstdCodec, TXxHash32>>(in);
- case ECompression::LZ4:
- return MakeHolder<TFramedDecompressStream<TLz4Codec, TXxHash32>>(in);
- case ECompression::UNKNOWN:
- return nullptr;
- }
- Y_ABORT("invalid compression algorithm");
- }
- THolder<IFramedCompressStream> CompressedOutput(IOutputStream* out, ECompression alg) {
- switch (alg) {
- case ECompression::IDENTITY:
- return nullptr;
- case ECompression::ZLIB:
- return MakeHolder<TFramedCompressStream<TZlibCodec, TAdler32>>(out);
- case ECompression::ZSTD:
- return MakeHolder<TFramedCompressStream<TZstdCodec, TXxHash32>>(out);
- case ECompression::LZ4:
- return MakeHolder<TFramedCompressStream<TLz4Codec, TXxHash32>>(out);
- case ECompression::UNKNOWN:
- return nullptr;
- }
- Y_ABORT("invalid compression algorithm");
- }
- }
|