zstd_dict_codec.cpp 8.5 KB


  1. #include "zstd_dict_codec.h"
  2. #include <library/cpp/packers/packers.h>
  3. #include <util/generic/ptr.h>
  4. #include <util/generic/refcount.h>
  5. #include <util/generic/noncopyable.h>
  6. #include <util/string/builder.h>
  7. #include <util/system/src_location.h>
  8. #include <util/ysaveload.h>
  9. #define ZDICT_STATIC_LINKING_ONLY
  10. #include <contrib/libs/zstd/include/zdict.h>
  11. #include <contrib/libs/zstd/include/zstd.h>
  12. #include <contrib/libs/zstd/include/zstd_errors.h>
  13. // See IGNIETFERRO-320 for possible bugs
  14. namespace NCodecs {
  15. class TZStdDictCodec::TImpl: public TAtomicRefCount<TZStdDictCodec::TImpl> {
  16. template <class T, size_t Deleter(T*)>
  17. class TPtrHolder : TMoveOnly {
  18. T* Ptr = nullptr;
  19. public:
  20. TPtrHolder() = default;
  21. TPtrHolder(T* dict)
  22. : Ptr(dict)
  23. {
  24. }
  25. T* Get() {
  26. return Ptr;
  27. }
  28. const T* Get() const {
  29. return Ptr;
  30. }
  31. void Reset(T* dict) {
  32. Dispose();
  33. Ptr = dict;
  34. }
  35. void Dispose() {
  36. if (Ptr) {
  37. Deleter(Ptr);
  38. Ptr = nullptr;
  39. }
  40. }
  41. ~TPtrHolder() {
  42. Dispose();
  43. }
  44. };
  45. using TCDict = TPtrHolder<ZSTD_CDict, ZSTD_freeCDict>;
  46. using TDDict = TPtrHolder<ZSTD_DDict, ZSTD_freeDDict>;
  47. using TCCtx = TPtrHolder<ZSTD_CCtx, ZSTD_freeCCtx>;
  48. using TDCtx = TPtrHolder<ZSTD_DCtx, ZSTD_freeDCtx>;
  49. using TSizePacker = NPackers::TPacker<ui64>;
  50. public:
  51. static const ui32 SampleSize = (1 << 22) * 5;
  52. explicit TImpl(ui32 comprLevel)
  53. : CompressionLevel(comprLevel)
  54. {
  55. const size_t zeroSz = TSizePacker().MeasureLeaf(0);
  56. Zero.Resize(zeroSz);
  57. TSizePacker().PackLeaf(Zero.data(), 0, zeroSz);
  58. }
  59. ui32 GetCompressionLevel() const {
  60. return CompressionLevel;
  61. }
  62. ui8 Encode(TStringBuf in, TBuffer& outbuf) const {
  63. outbuf.Clear();
  64. if (in.empty()) {
  65. return 0;
  66. }
  67. TSizePacker packer;
  68. const char* rawBeg = in.data();
  69. const size_t rawSz = in.size();
  70. const size_t szSz = packer.MeasureLeaf(rawSz);
  71. const size_t maxDatSz = ZSTD_compressBound(rawSz);
  72. outbuf.Resize(szSz + maxDatSz);
  73. packer.PackLeaf(outbuf.data(), rawSz, szSz);
  74. TCCtx ctx{CheckPtr(ZSTD_createCCtx(), __LOCATION__)};
  75. const size_t resSz = CheckSize(ZSTD_compress_usingCDict(
  76. ctx.Get(), outbuf.data() + szSz, maxDatSz, rawBeg, rawSz, CDict.Get()),
  77. __LOCATION__);
  78. if (resSz < rawSz) {
  79. outbuf.Resize(resSz + szSz);
  80. } else {
  81. outbuf.Resize(Zero.size() + rawSz);
  82. memcpy(outbuf.data(), Zero.data(), Zero.size());
  83. memcpy(outbuf.data() + Zero.size(), rawBeg, rawSz);
  84. }
  85. return 0;
  86. }
  87. void Decode(TStringBuf in, TBuffer& outbuf) const {
  88. outbuf.Clear();
  89. if (in.empty()) {
  90. return;
  91. }
  92. TSizePacker packer;
  93. const char* rawBeg = in.data();
  94. size_t rawSz = in.size();
  95. const size_t szSz = packer.SkipLeaf(rawBeg);
  96. ui64 datSz = 0;
  97. packer.UnpackLeaf(rawBeg, datSz);
  98. rawBeg += szSz;
  99. rawSz -= szSz;
  100. if (!datSz) {
  101. outbuf.ReserveExactNeverCallMeInSaneCode(rawSz);
  102. outbuf.Resize(rawSz);
  103. memcpy(outbuf.data(), rawBeg, rawSz);
  104. } else {
  105. // size_t zSz = ZSTD_getDecompressedSize(rawBeg, rawSz);
  106. // Y_ENSURE_EX(datSz == zSz, TCodecException() << datSz << " != " << zSz);
  107. outbuf.ReserveExactNeverCallMeInSaneCode(datSz);
  108. outbuf.Resize(datSz);
  109. TDCtx ctx{CheckPtr(ZSTD_createDCtx(), __LOCATION__)};
  110. CheckSize(ZSTD_decompress_usingDDict(
  111. ctx.Get(), outbuf.data(), outbuf.size(), rawBeg, rawSz, DDict.Get()),
  112. __LOCATION__);
  113. outbuf.Resize(datSz);
  114. }
  115. }
  116. bool Learn(ISequenceReader& in, bool throwOnError) {
  117. TBuffer data;
  118. TVector<size_t> lens;
  119. data.Reserve(2 * SampleSize);
  120. TStringBuf r;
  121. while (in.NextRegion(r)) {
  122. if (!r) {
  123. continue;
  124. }
  125. data.Append(r.data(), r.size());
  126. lens.push_back(r.size());
  127. }
  128. ZDICT_legacy_params_t params;
  129. memset(&params, 0, sizeof(params));
  130. params.zParams.compressionLevel = 1;
  131. params.zParams.notificationLevel = 1;
  132. Dict.Resize(Max<size_t>(1 << 20, data.Size() + 16 * lens.size()));
  133. if (!lens) {
  134. Dict.Reset();
  135. } else {
  136. size_t trainResult = ZDICT_trainFromBuffer_legacy(
  137. Dict.data(), Dict.size(), data.Data(), const_cast<const size_t*>(&lens[0]), lens.size(), params);
  138. if (ZSTD_isError(trainResult)) {
  139. if (!throwOnError) {
  140. return false;
  141. }
  142. CheckSize(trainResult, __LOCATION__);
  143. }
  144. Dict.Resize(trainResult);
  145. Dict.ShrinkToFit();
  146. }
  147. InitContexts();
  148. return true;
  149. }
  150. void Save(IOutputStream* out) const {
  151. ::Save(out, Dict);
  152. }
  153. void Load(IInputStream* in) {
  154. ::Load(in, Dict);
  155. InitContexts();
  156. }
  157. void InitContexts() {
  158. CDict.Reset(CheckPtr(ZSTD_createCDict(Dict.data(), Dict.size(), CompressionLevel), __LOCATION__));
  159. DDict.Reset(CheckPtr(ZSTD_createDDict(Dict.data(), Dict.size()), __LOCATION__));
  160. }
  161. static size_t CheckSize(size_t sz, TSourceLocation loc) {
  162. if (ZSTD_isError(sz)) {
  163. ythrow TCodecException() << loc << " " << ZSTD_getErrorName(sz) << " (code " << (int)ZSTD_getErrorCode(sz) << ")";
  164. }
  165. return sz;
  166. }
  167. template <class T>
  168. static T* CheckPtr(T* t, TSourceLocation loc) {
  169. Y_ENSURE_EX(t, TCodecException() << loc << " "
  170. << "unexpected nullptr");
  171. return t;
  172. }
  173. private:
  174. ui32 CompressionLevel = 1;
  175. TBuffer Zero;
  176. TBuffer Dict;
  177. TCDict CDict;
  178. TDDict DDict;
  179. };
  180. TZStdDictCodec::TZStdDictCodec(ui32 comprLevel)
  181. : Impl(new TImpl(comprLevel))
  182. {
  183. MyTraits.NeedsTraining = true;
  184. MyTraits.SizeOnEncodeMultiplier = 2;
  185. MyTraits.SizeOnDecodeMultiplier = 10;
  186. MyTraits.RecommendedSampleSize = TImpl::SampleSize; // same as for solar
  187. }
  188. TZStdDictCodec::~TZStdDictCodec() {
  189. }
  190. TString TZStdDictCodec::GetName() const {
  191. return TStringBuilder() << MyName() << "-" << Impl->GetCompressionLevel();
  192. }
  193. ui8 TZStdDictCodec::Encode(TStringBuf in, TBuffer& out) const {
  194. return Impl->Encode(in, out);
  195. }
  196. void TZStdDictCodec::Decode(TStringBuf in, TBuffer& out) const {
  197. Impl->Decode(in, out);
  198. }
  199. void TZStdDictCodec::DoLearn(ISequenceReader& in) {
  200. Impl = new TImpl(Impl->GetCompressionLevel());
  201. Impl->Learn(in, true/*throwOnError*/);
  202. }
  203. bool TZStdDictCodec::DoTryToLearn(ISequenceReader& in) {
  204. Impl = new TImpl(Impl->GetCompressionLevel());
  205. return Impl->Learn(in, false/*throwOnError*/);
  206. }
  207. void TZStdDictCodec::Save(IOutputStream* out) const {
  208. Impl->Save(out);
  209. }
  210. void TZStdDictCodec::Load(IInputStream* in) {
  211. Impl->Load(in);
  212. }
  213. TVector<TString> TZStdDictCodec::ListCompressionNames() {
  214. TVector<TString> res;
  215. for (int i = 1; i <= ZSTD_maxCLevel(); ++i) {
  216. res.emplace_back(TStringBuilder() << MyName() << "-" << i);
  217. }
  218. return res;
  219. }
  220. int TZStdDictCodec::ParseCompressionName(TStringBuf name) {
  221. int c = 0;
  222. TryFromString(name.After('-'), c);
  223. Y_ENSURE_EX(name.Before('-') == MyName() && c > 0 && c <= ZSTD_maxCLevel(), TCodecException() << "invald codec name" << name);
  224. return c;
  225. }
  226. }