zstd_compression.cpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #include "zstd_compression.h"
  2. #include "compression.h"
  3. #include <yt/yt/core/misc/finally.h>
  4. #include <contrib/libs/zstd/include/zstd.h>
  5. namespace NYT::NLogging {
  6. ////////////////////////////////////////////////////////////////////////////////
  // Fixed leading part of the sync tag: a zstd skippable frame that is written
  // after each compressed zstd frame. Zstd-aware tools skip over it, while the
  // codec uses it on file opening to locate the end of the last fully written
  // frame. The 64-bit offset that completes the tag is not part of this array;
  // it is written separately.
  constexpr char ZstdSyncTag[] = {
      // Zstd skippable frame magic number.
      '\x50', '\x2a', '\x4d', '\x18',
      // Frame data size (0x18 bytes): 128-bit ID + 64-bit offset.
      '\x18', '\x00', '\x00', '\x00',
      // 128-bit sync tag ID.
      '\xf6', '\x79', '\x9c', '\x4e',
      '\xd1', '\x09', '\x90', '\x7e',
      '\x29', '\x91', '\xd9', '\xe6',
      '\xbe', '\xe4', '\x84', '\x40'
  };
  17. constexpr i64 MaxZstdFrameLength = ZSTD_COMPRESSBOUND(MaxZstdFrameUncompressedLength);
  18. constexpr i64 ZstdSyncTagLength = sizeof(ZstdSyncTag) + sizeof(ui64);
  19. constexpr i64 TailScanLength = MaxZstdFrameLength + 2 * ZstdSyncTagLength;
  20. ////////////////////////////////////////////////////////////////////////////////
  21. static std::optional<i64> FindSyncTag(const char* buf, size_t size, i64 offset)
  22. {
  23. const char* syncTag = nullptr;
  24. TStringBuf data(buf, size);
  25. TStringBuf zstdSyncTagView(ZstdSyncTag, sizeof(ZstdSyncTag));
  26. while (true) {
  27. size_t tagPos = data.find(zstdSyncTagView);
  28. if (tagPos == TStringBuf::npos) {
  29. break;
  30. }
  31. const char* tag = data.data() + tagPos;
  32. data.remove_prefix(tagPos + 1);
  33. if (ZstdSyncTagLength - 1 > data.size()) {
  34. continue;
  35. }
  36. ui64 tagOffset = ReadUnaligned<ui64>(tag + sizeof(ZstdSyncTag));
  37. ui64 tagOffsetExpected = offset + (tag - buf);
  38. if (tagOffset == tagOffsetExpected) {
  39. syncTag = tag;
  40. }
  41. }
  42. if (!syncTag) {
  43. return {};
  44. }
  45. return offset + (syncTag - buf);
  46. }
  47. ////////////////////////////////////////////////////////////////////////////////
  48. class TZstdLogCompressionCodec
  49. : public ILogCompressionCodec
  50. {
  51. public:
  52. explicit TZstdLogCompressionCodec(int compressionLevel)
  53. : CompressionLevel_(compressionLevel)
  54. { }
  55. i64 GetMaxBlockSize() const override
  56. {
  57. return MaxZstdFrameUncompressedLength;
  58. }
  59. void Compress(const TBuffer& input, TBuffer& output) override
  60. {
  61. auto context = ZSTD_createCCtx();
  62. auto contextGuard = Finally([&] {
  63. ZSTD_freeCCtx(context);
  64. });
  65. auto frameLength = ZSTD_COMPRESSBOUND(std::min<i64>(MaxZstdFrameUncompressedLength, input.Size()));
  66. output.Reserve(output.Size() + frameLength + ZstdSyncTagLength);
  67. size_t size = ZSTD_compressCCtx(
  68. context,
  69. output.Data() + output.Size(),
  70. frameLength,
  71. input.Data(),
  72. input.Size(),
  73. CompressionLevel_);
  74. if (ZSTD_isError(size)) {
  75. THROW_ERROR_EXCEPTION("ZSTD_compressCCtx() failed")
  76. << TErrorAttribute("zstd_error", ZSTD_getErrorName(size));
  77. }
  78. output.Advance(size);
  79. }
  80. void AddSyncTag(i64 offset, TBuffer& output) override
  81. {
  82. output.Append(ZstdSyncTag, sizeof(ZstdSyncTag));
  83. output.Append(reinterpret_cast<const char*>(&offset), sizeof(offset));
  84. }
  85. void Repair(TFile* file, i64& outputPosition) override
  86. {
  87. constexpr i64 scanOverlap = ZstdSyncTagLength - 1;
  88. i64 fileSize = file->GetLength();
  89. i64 bufSize = fileSize;
  90. i64 pos = Max<i64>(bufSize - TailScanLength, 0);
  91. bufSize -= pos;
  92. TBuffer buffer;
  93. outputPosition = 0;
  94. while (bufSize >= ZstdSyncTagLength) {
  95. buffer.Resize(0);
  96. buffer.Reserve(bufSize);
  97. size_t sz = file->Pread(buffer.Data(), bufSize, pos);
  98. buffer.Resize(sz);
  99. std::optional<i64> off = FindSyncTag(buffer.Data(), buffer.Size(), pos);
  100. if (off.has_value()) {
  101. outputPosition = *off + ZstdSyncTagLength;
  102. break;
  103. }
  104. i64 newPos = Max<i64>(pos - TailScanLength, 0);
  105. bufSize = Max<i64>(pos + scanOverlap - newPos, 0);
  106. pos = newPos;
  107. }
  108. file->Resize(outputPosition);
  109. }
  110. private:
  111. int CompressionLevel_;
  112. };
  113. DECLARE_REFCOUNTED_TYPE(TZstdLogCompressionCodec)
  114. DEFINE_REFCOUNTED_TYPE(TZstdLogCompressionCodec)
  115. ILogCompressionCodecPtr CreateZstdCompressionCodec(int compressionLevel)
  116. {
  117. return New<TZstdLogCompressionCodec>(compressionLevel);
  118. }
  119. ////////////////////////////////////////////////////////////////////////////////
  120. } // namespace NYT::NLogging