zstd_dict_codec.cpp 8.4 KB


  1. #include "zstd_dict_codec.h"
  2. #include <library/cpp/packers/packers.h>
  3. #include <util/generic/ptr.h>
  4. #include <util/generic/refcount.h>
  5. #include <util/generic/noncopyable.h>
  6. #include <util/string/builder.h>
  7. #include <util/system/src_location.h>
  8. #include <util/ysaveload.h>
  9. #define ZDICT_STATIC_LINKING_ONLY
  10. #include <contrib/libs/zstd/include/zdict.h>
  11. #include <contrib/libs/zstd/include/zstd.h>
  12. #include <contrib/libs/zstd/include/zstd_errors.h>
  13. // See IGNIETFERRO-320 for possible bugs
  14. namespace NCodecs {
  15. class TZStdDictCodec::TImpl: public TAtomicRefCount<TZStdDictCodec::TImpl> {
  16. template <class T, size_t Deleter(T*)>
  17. class TPtrHolder : TMoveOnly {
  18. T* Ptr = nullptr;
  19. public:
  20. TPtrHolder() = default;
  21. TPtrHolder(T* dict)
  22. : Ptr(dict)
  23. {
  24. }
  25. T* Get() {
  26. return Ptr;
  27. }
  28. const T* Get() const {
  29. return Ptr;
  30. }
  31. void Reset(T* dict) {
  32. Dispose();
  33. Ptr = dict;
  34. }
  35. void Dispose() {
  36. if (Ptr) {
  37. Deleter(Ptr);
  38. Ptr = nullptr;
  39. }
  40. }
  41. ~TPtrHolder() {
  42. Dispose();
  43. }
  44. };
  45. using TCDict = TPtrHolder<ZSTD_CDict, ZSTD_freeCDict>;
  46. using TDDict = TPtrHolder<ZSTD_DDict, ZSTD_freeDDict>;
  47. using TCCtx = TPtrHolder<ZSTD_CCtx, ZSTD_freeCCtx>;
  48. using TDCtx = TPtrHolder<ZSTD_DCtx, ZSTD_freeDCtx>;
  49. using TSizePacker = NPackers::TPacker<ui64>;
  50. public:
  51. static const ui32 SampleSize = (1 << 22) * 5;
  52. explicit TImpl(ui32 comprLevel)
  53. : CompressionLevel(comprLevel)
  54. {
  55. const size_t zeroSz = TSizePacker().MeasureLeaf(0);
  56. Zero.Resize(zeroSz);
  57. TSizePacker().PackLeaf(Zero.data(), 0, zeroSz);
  58. }
  59. ui32 GetCompressionLevel() const {
  60. return CompressionLevel;
  61. }
  62. ui8 Encode(TStringBuf in, TBuffer& outbuf) const {
  63. outbuf.Clear();
  64. if (in.empty()) {
  65. return 0;
  66. }
  67. TSizePacker packer;
  68. const char* rawBeg = in.data();
  69. const size_t rawSz = in.size();
  70. const size_t szSz = packer.MeasureLeaf(rawSz);
  71. const size_t maxDatSz = ZSTD_compressBound(rawSz);
  72. outbuf.Resize(szSz + maxDatSz);
  73. packer.PackLeaf(outbuf.data(), rawSz, szSz);
  74. TCCtx ctx{CheckPtr(ZSTD_createCCtx(), __LOCATION__)};
  75. const size_t resSz = CheckSize(ZSTD_compress_usingCDict(
  76. ctx.Get(), outbuf.data() + szSz, maxDatSz, rawBeg, rawSz, CDict.Get()),
  77. __LOCATION__);
  78. if (resSz < rawSz) {
  79. outbuf.Resize(resSz + szSz);
  80. } else {
  81. outbuf.Resize(Zero.size() + rawSz);
  82. memcpy(outbuf.data(), Zero.data(), Zero.size());
  83. memcpy(outbuf.data() + Zero.size(), rawBeg, rawSz);
  84. }
  85. return 0;
  86. }
  87. void Decode(TStringBuf in, TBuffer& outbuf) const {
  88. outbuf.Clear();
  89. if (in.empty()) {
  90. return;
  91. }
  92. TSizePacker packer;
  93. const char* rawBeg = in.data();
  94. size_t rawSz = in.size();
  95. const size_t szSz = packer.SkipLeaf(rawBeg);
  96. ui64 datSz = 0;
  97. packer.UnpackLeaf(rawBeg, datSz);
  98. rawBeg += szSz;
  99. rawSz -= szSz;
  100. if (!datSz) {
  101. outbuf.Resize(rawSz);
  102. memcpy(outbuf.data(), rawBeg, rawSz);
  103. } else {
  104. // size_t zSz = ZSTD_getDecompressedSize(rawBeg, rawSz);
  105. // Y_ENSURE_EX(datSz == zSz, TCodecException() << datSz << " != " << zSz);
  106. outbuf.Resize(datSz);
  107. TDCtx ctx{CheckPtr(ZSTD_createDCtx(), __LOCATION__)};
  108. CheckSize(ZSTD_decompress_usingDDict(
  109. ctx.Get(), outbuf.data(), outbuf.size(), rawBeg, rawSz, DDict.Get()),
  110. __LOCATION__);
  111. outbuf.Resize(datSz);
  112. }
  113. }
  114. bool Learn(ISequenceReader& in, bool throwOnError) {
  115. TBuffer data;
  116. TVector<size_t> lens;
  117. data.Reserve(2 * SampleSize);
  118. TStringBuf r;
  119. while (in.NextRegion(r)) {
  120. if (!r) {
  121. continue;
  122. }
  123. data.Append(r.data(), r.size());
  124. lens.push_back(r.size());
  125. }
  126. ZDICT_legacy_params_t params;
  127. memset(&params, 0, sizeof(params));
  128. params.zParams.compressionLevel = 1;
  129. params.zParams.notificationLevel = 1;
  130. Dict.Resize(Max<size_t>(1 << 20, data.Size() + 16 * lens.size()));
  131. if (!lens) {
  132. Dict.Reset();
  133. } else {
  134. size_t trainResult = ZDICT_trainFromBuffer_legacy(
  135. Dict.data(), Dict.size(), data.Data(), const_cast<const size_t*>(&lens[0]), lens.size(), params);
  136. if (ZSTD_isError(trainResult)) {
  137. if (!throwOnError) {
  138. return false;
  139. }
  140. CheckSize(trainResult, __LOCATION__);
  141. }
  142. Dict.Resize(trainResult);
  143. Dict.ShrinkToFit();
  144. }
  145. InitContexts();
  146. return true;
  147. }
  148. void Save(IOutputStream* out) const {
  149. ::Save(out, Dict);
  150. }
  151. void Load(IInputStream* in) {
  152. ::Load(in, Dict);
  153. InitContexts();
  154. }
  155. void InitContexts() {
  156. CDict.Reset(CheckPtr(ZSTD_createCDict(Dict.data(), Dict.size(), CompressionLevel), __LOCATION__));
  157. DDict.Reset(CheckPtr(ZSTD_createDDict(Dict.data(), Dict.size()), __LOCATION__));
  158. }
  159. static size_t CheckSize(size_t sz, TSourceLocation loc) {
  160. if (ZSTD_isError(sz)) {
  161. ythrow TCodecException() << loc << " " << ZSTD_getErrorName(sz) << " (code " << (int)ZSTD_getErrorCode(sz) << ")";
  162. }
  163. return sz;
  164. }
  165. template <class T>
  166. static T* CheckPtr(T* t, TSourceLocation loc) {
  167. Y_ENSURE_EX(t, TCodecException() << loc << " "
  168. << "unexpected nullptr");
  169. return t;
  170. }
  171. private:
  172. ui32 CompressionLevel = 1;
  173. TBuffer Zero;
  174. TBuffer Dict;
  175. TCDict CDict;
  176. TDDict DDict;
  177. };
  178. TZStdDictCodec::TZStdDictCodec(ui32 comprLevel)
  179. : Impl(new TImpl(comprLevel))
  180. {
  181. MyTraits.NeedsTraining = true;
  182. MyTraits.SizeOnEncodeMultiplier = 2;
  183. MyTraits.SizeOnDecodeMultiplier = 10;
  184. MyTraits.RecommendedSampleSize = TImpl::SampleSize; // same as for solar
  185. }
  186. TZStdDictCodec::~TZStdDictCodec() {
  187. }
  188. TString TZStdDictCodec::GetName() const {
  189. return TStringBuilder() << MyName() << "-" << Impl->GetCompressionLevel();
  190. }
  191. ui8 TZStdDictCodec::Encode(TStringBuf in, TBuffer& out) const {
  192. return Impl->Encode(in, out);
  193. }
  194. void TZStdDictCodec::Decode(TStringBuf in, TBuffer& out) const {
  195. Impl->Decode(in, out);
  196. }
  197. void TZStdDictCodec::DoLearn(ISequenceReader& in) {
  198. Impl = new TImpl(Impl->GetCompressionLevel());
  199. Impl->Learn(in, true/*throwOnError*/);
  200. }
  201. bool TZStdDictCodec::DoTryToLearn(ISequenceReader& in) {
  202. Impl = new TImpl(Impl->GetCompressionLevel());
  203. return Impl->Learn(in, false/*throwOnError*/);
  204. }
  205. void TZStdDictCodec::Save(IOutputStream* out) const {
  206. Impl->Save(out);
  207. }
  208. void TZStdDictCodec::Load(IInputStream* in) {
  209. Impl->Load(in);
  210. }
  211. TVector<TString> TZStdDictCodec::ListCompressionNames() {
  212. TVector<TString> res;
  213. for (int i = 1; i <= ZSTD_maxCLevel(); ++i) {
  214. res.emplace_back(TStringBuilder() << MyName() << "-" << i);
  215. }
  216. return res;
  217. }
  218. int TZStdDictCodec::ParseCompressionName(TStringBuf name) {
  219. int c = 0;
  220. TryFromString(name.After('-'), c);
  221. Y_ENSURE_EX(name.Before('-') == MyName() && c > 0 && c <= ZSTD_maxCLevel(), TCodecException() << "invald codec name" << name);
  222. return c;
  223. }
  224. }