compression_lz4.c 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "compression_lz4.h"
  3. #ifdef ENABLE_LZ4
  4. #include "lz4.h"
  5. // ----------------------------------------------------------------------------
  6. // compress
  7. void rrdpush_compressor_init_lz4(struct compressor_state *state) {
  8. if(!state->initialized) {
  9. state->initialized = true;
  10. state->stream = LZ4_createStream();
  11. // LZ4 needs access to the last 64KB of source data
  12. // so, we keep twice the size of each message
  13. simple_ring_buffer_make_room(&state->input, 65536 + COMPRESSION_MAX_CHUNK * 2);
  14. }
  15. }
  16. void rrdpush_compressor_destroy_lz4(struct compressor_state *state) {
  17. if (state->stream) {
  18. LZ4_freeStream(state->stream);
  19. state->stream = NULL;
  20. }
  21. }
  22. /*
  23. * Compress the given block of data
  24. * Compressed data will remain in the internal buffer until the next invocation
  25. * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
  26. * or 0 in case of error
  27. */
  28. size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out) {
  29. if(unlikely(!state || !size || !out))
  30. return 0;
  31. // we need to keep the last 64K of our previous source data
  32. // as they were in the ring buffer
  33. simple_ring_buffer_make_room(&state->output, LZ4_COMPRESSBOUND(size));
  34. if(state->input.write_pos + size > state->input.size)
  35. // the input buffer cannot fit out data, restart from zero
  36. simple_ring_buffer_reset(&state->input);
  37. simple_ring_buffer_append_data(&state->input, data, size);
  38. long int compressed_data_size = LZ4_compress_fast_continue(
  39. state->stream,
  40. state->input.data + state->input.read_pos,
  41. (char *)state->output.data,
  42. (int)(state->input.write_pos - state->input.read_pos),
  43. (int)state->output.size,
  44. state->level);
  45. if (compressed_data_size <= 0) {
  46. netdata_log_error("STREAM: LZ4_compress_fast_continue() returned %ld "
  47. "(source is %zu bytes, output buffer can fit %zu bytes)",
  48. compressed_data_size, size, state->output.size);
  49. return 0;
  50. }
  51. state->input.read_pos = state->input.write_pos;
  52. state->sender_locked.total_compressions++;
  53. state->sender_locked.total_uncompressed += size;
  54. state->sender_locked.total_compressed += compressed_data_size;
  55. *out = state->output.data;
  56. return compressed_data_size;
  57. }
  58. // ----------------------------------------------------------------------------
  59. // decompress
  60. void rrdpush_decompressor_init_lz4(struct decompressor_state *state) {
  61. if(!state->initialized) {
  62. state->initialized = true;
  63. state->stream = LZ4_createStreamDecode();
  64. simple_ring_buffer_make_room(&state->output, 65536 + COMPRESSION_MAX_CHUNK * 2);
  65. }
  66. }
  67. void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state) {
  68. if (state->stream) {
  69. LZ4_freeStreamDecode(state->stream);
  70. state->stream = NULL;
  71. }
  72. }
  73. /*
  74. * Decompress the compressed data in the internal buffer
  75. * Return the size of uncompressed data or 0 for error
  76. */
  77. size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
  78. if (unlikely(!state || !compressed_data || !compressed_size))
  79. return 0;
  80. // The state.output ring buffer is always EMPTY at this point,
  81. // meaning that (state->output.read_pos == state->output.write_pos)
  82. // However, THEY ARE NOT ZERO.
  83. if (unlikely(state->output.write_pos + COMPRESSION_MAX_CHUNK > state->output.size))
  84. // the input buffer cannot fit out data, restart from zero
  85. simple_ring_buffer_reset(&state->output);
  86. long int decompressed_size = LZ4_decompress_safe_continue(
  87. state->stream
  88. , compressed_data
  89. , (char *)(state->output.data + state->output.write_pos)
  90. , (int)compressed_size
  91. , (int)(state->output.size - state->output.write_pos)
  92. );
  93. if (unlikely(decompressed_size < 0)) {
  94. netdata_log_error("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() returned negative value: %ld "
  95. "(compressed chunk is %zu bytes)"
  96. , decompressed_size, compressed_size);
  97. return 0;
  98. }
  99. if(unlikely(decompressed_size + state->output.write_pos > state->output.size))
  100. fatal("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() overflown the stream_buffer "
  101. "(size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu)"
  102. , state->output.size
  103. , state->output.write_pos
  104. , decompressed_size
  105. , (size_t)(state->output.write_pos + decompressed_size - state->output.size)
  106. );
  107. state->output.write_pos += decompressed_size;
  108. // statistics
  109. state->total_compressed += compressed_size;
  110. state->total_uncompressed += decompressed_size;
  111. state->total_compressions++;
  112. return decompressed_size;
  113. }
  114. #endif // ENABLE_LZ4