compression_zstd.c 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "compression_zstd.h"
  3. #ifdef ENABLE_ZSTD
  4. #include <zstd.h>
  5. void rrdpush_compressor_init_zstd(struct compressor_state *state) {
  6. if(!state->initialized) {
  7. state->initialized = true;
  8. state->stream = ZSTD_createCStream();
  9. if(state->level < 1)
  10. state->level = 1;
  11. if(state->level > ZSTD_maxCLevel())
  12. state->level = ZSTD_maxCLevel();
  13. size_t ret = ZSTD_initCStream(state->stream, state->level);
  14. if(ZSTD_isError(ret))
  15. netdata_log_error("STREAM: ZSTD_initCStream() returned error: %s", ZSTD_getErrorName(ret));
  16. // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_compressionLevel, 1);
  17. // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_strategy, ZSTD_fast);
  18. }
  19. }
  20. void rrdpush_compressor_destroy_zstd(struct compressor_state *state) {
  21. if(state->stream) {
  22. ZSTD_freeCStream(state->stream);
  23. state->stream = NULL;
  24. }
  25. }
  26. size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out) {
  27. if(unlikely(!state || !size || !out))
  28. return 0;
  29. ZSTD_inBuffer inBuffer = {
  30. .pos = 0,
  31. .size = size,
  32. .src = data,
  33. };
  34. size_t wanted_size = MAX(ZSTD_compressBound(inBuffer.size - inBuffer.pos), ZSTD_CStreamOutSize());
  35. simple_ring_buffer_make_room(&state->output, wanted_size);
  36. ZSTD_outBuffer outBuffer = {
  37. .pos = 0,
  38. .size = state->output.size,
  39. .dst = (void *)state->output.data,
  40. };
  41. // compress
  42. size_t ret = ZSTD_compressStream(state->stream, &outBuffer, &inBuffer);
  43. // error handling
  44. if(ZSTD_isError(ret)) {
  45. netdata_log_error("STREAM: ZSTD_compressStream() return error: %s", ZSTD_getErrorName(ret));
  46. return 0;
  47. }
  48. if(inBuffer.pos < inBuffer.size) {
  49. netdata_log_error("STREAM: ZSTD_compressStream() left unprocessed input (source payload %zu bytes, consumed %zu bytes)",
  50. inBuffer.size, inBuffer.pos);
  51. return 0;
  52. }
  53. if(outBuffer.pos == 0) {
  54. // ZSTD needs more input to flush the output, so let's flush it manually
  55. ret = ZSTD_flushStream(state->stream, &outBuffer);
  56. if(ZSTD_isError(ret)) {
  57. netdata_log_error("STREAM: ZSTD_flushStream() return error: %s", ZSTD_getErrorName(ret));
  58. return 0;
  59. }
  60. if(outBuffer.pos == 0) {
  61. netdata_log_error("STREAM: ZSTD_compressStream() returned zero compressed bytes "
  62. "(source is %zu bytes, output buffer can fit %zu bytes) "
  63. , size, outBuffer.size);
  64. return 0;
  65. }
  66. }
  67. state->sender_locked.total_compressions++;
  68. state->sender_locked.total_uncompressed += size;
  69. state->sender_locked.total_compressed += outBuffer.pos;
  70. // return values
  71. *out = state->output.data;
  72. return outBuffer.pos;
  73. }
  74. void rrdpush_decompressor_init_zstd(struct decompressor_state *state) {
  75. if(!state->initialized) {
  76. state->initialized = true;
  77. state->stream = ZSTD_createDStream();
  78. size_t ret = ZSTD_initDStream(state->stream);
  79. if(ZSTD_isError(ret))
  80. netdata_log_error("STREAM: ZSTD_initDStream() returned error: %s", ZSTD_getErrorName(ret));
  81. simple_ring_buffer_make_room(&state->output, MAX(COMPRESSION_MAX_CHUNK, ZSTD_DStreamOutSize()));
  82. }
  83. }
  84. void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state) {
  85. if (state->stream) {
  86. ZSTD_freeDStream(state->stream);
  87. state->stream = NULL;
  88. }
  89. }
  90. size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
  91. if (unlikely(!state || !compressed_data || !compressed_size))
  92. return 0;
  93. // The state.output ring buffer is always EMPTY at this point,
  94. // meaning that (state->output.read_pos == state->output.write_pos)
  95. // However, THEY ARE NOT ZERO.
  96. ZSTD_inBuffer inBuffer = {
  97. .pos = 0,
  98. .size = compressed_size,
  99. .src = compressed_data,
  100. };
  101. ZSTD_outBuffer outBuffer = {
  102. .pos = 0,
  103. .dst = (char *)state->output.data,
  104. .size = state->output.size,
  105. };
  106. size_t ret = ZSTD_decompressStream(
  107. state->stream
  108. , &outBuffer
  109. , &inBuffer);
  110. if(ZSTD_isError(ret)) {
  111. netdata_log_error("STREAM: ZSTD_decompressStream() return error: %s", ZSTD_getErrorName(ret));
  112. return 0;
  113. }
  114. if(inBuffer.pos < inBuffer.size)
  115. fatal("RRDPUSH DECOMPRESS: ZSTD ZSTD_decompressStream() decompressed %zu bytes, "
  116. "but %zu bytes of compressed data remain",
  117. inBuffer.pos, inBuffer.size);
  118. size_t decompressed_size = outBuffer.pos;
  119. state->output.read_pos = 0;
  120. state->output.write_pos = outBuffer.pos;
  121. // statistics
  122. state->total_compressed += compressed_size;
  123. state->total_uncompressed += decompressed_size;
  124. state->total_compressions++;
  125. return decompressed_size;
  126. }
  127. #endif // ENABLE_ZSTD