compression_brotli.c 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "compression_brotli.h"
  3. #ifdef ENABLE_BROTLI
  4. #include <brotli/encode.h>
  5. #include <brotli/decode.h>
  6. void rrdpush_compressor_init_brotli(struct compressor_state *state) {
  7. if (!state->initialized) {
  8. state->initialized = true;
  9. state->stream = BrotliEncoderCreateInstance(NULL, NULL, NULL);
  10. if (state->level < BROTLI_MIN_QUALITY) {
  11. state->level = BROTLI_MIN_QUALITY;
  12. } else if (state->level > BROTLI_MAX_QUALITY) {
  13. state->level = BROTLI_MAX_QUALITY;
  14. }
  15. BrotliEncoderSetParameter(state->stream, BROTLI_PARAM_QUALITY, state->level);
  16. }
  17. }
  18. void rrdpush_compressor_destroy_brotli(struct compressor_state *state) {
  19. if (state->stream) {
  20. BrotliEncoderDestroyInstance(state->stream);
  21. state->stream = NULL;
  22. }
  23. }
  24. size_t rrdpush_compress_brotli(struct compressor_state *state, const char *data, size_t size, const char **out) {
  25. if (unlikely(!state || !size || !out))
  26. return 0;
  27. simple_ring_buffer_make_room(&state->output, MAX(BrotliEncoderMaxCompressedSize(size), COMPRESSION_MAX_CHUNK));
  28. size_t available_out = state->output.size;
  29. size_t available_in = size;
  30. const uint8_t *next_in = (const uint8_t *)data;
  31. uint8_t *next_out = (uint8_t *)state->output.data;
  32. if (!BrotliEncoderCompressStream(state->stream, BROTLI_OPERATION_FLUSH, &available_in, &next_in, &available_out, &next_out, NULL)) {
  33. netdata_log_error("STREAM: Brotli compression failed.");
  34. return 0;
  35. }
  36. if(available_in != 0) {
  37. netdata_log_error("STREAM: BrotliEncoderCompressStream() did not use all the input buffer, %zu bytes out of %zu remain",
  38. available_in, size);
  39. return 0;
  40. }
  41. size_t compressed_size = state->output.size - available_out;
  42. if(available_out == 0) {
  43. netdata_log_error("STREAM: BrotliEncoderCompressStream() needs a bigger output buffer than the one we provided "
  44. "(output buffer %zu bytes, compressed payload %zu bytes)",
  45. state->output.size, size);
  46. return 0;
  47. }
  48. if(compressed_size == 0) {
  49. netdata_log_error("STREAM: BrotliEncoderCompressStream() did not produce any output from the input provided "
  50. "(input buffer %zu bytes)",
  51. size);
  52. return 0;
  53. }
  54. state->sender_locked.total_compressions++;
  55. state->sender_locked.total_uncompressed += size - available_in;
  56. state->sender_locked.total_compressed += compressed_size;
  57. *out = state->output.data;
  58. return compressed_size;
  59. }
  60. void rrdpush_decompressor_init_brotli(struct decompressor_state *state) {
  61. if (!state->initialized) {
  62. state->initialized = true;
  63. state->stream = BrotliDecoderCreateInstance(NULL, NULL, NULL);
  64. simple_ring_buffer_make_room(&state->output, COMPRESSION_MAX_CHUNK);
  65. }
  66. }
  67. void rrdpush_decompressor_destroy_brotli(struct decompressor_state *state) {
  68. if (state->stream) {
  69. BrotliDecoderDestroyInstance(state->stream);
  70. state->stream = NULL;
  71. }
  72. }
  73. size_t rrdpush_decompress_brotli(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
  74. if (unlikely(!state || !compressed_data || !compressed_size))
  75. return 0;
  76. // The state.output ring buffer is always EMPTY at this point,
  77. // meaning that (state->output.read_pos == state->output.write_pos)
  78. // However, THEY ARE NOT ZERO.
  79. size_t available_out = state->output.size;
  80. size_t available_in = compressed_size;
  81. const uint8_t *next_in = (const uint8_t *)compressed_data;
  82. uint8_t *next_out = (uint8_t *)state->output.data;
  83. if (BrotliDecoderDecompressStream(state->stream, &available_in, &next_in, &available_out, &next_out, NULL) == BROTLI_DECODER_RESULT_ERROR) {
  84. netdata_log_error("STREAM: Brotli decompression failed.");
  85. return 0;
  86. }
  87. if(available_in != 0) {
  88. netdata_log_error("STREAM: BrotliDecoderDecompressStream() did not use all the input buffer, %zu bytes out of %zu remain",
  89. available_in, compressed_size);
  90. return 0;
  91. }
  92. size_t decompressed_size = state->output.size - available_out;
  93. if(available_out == 0) {
  94. netdata_log_error("STREAM: BrotliDecoderDecompressStream() needs a bigger output buffer than the one we provided "
  95. "(output buffer %zu bytes, compressed payload %zu bytes)",
  96. state->output.size, compressed_size);
  97. return 0;
  98. }
  99. if(decompressed_size == 0) {
  100. netdata_log_error("STREAM: BrotliDecoderDecompressStream() did not produce any output from the input provided "
  101. "(input buffer %zu bytes)",
  102. compressed_size);
  103. return 0;
  104. }
  105. state->output.read_pos = 0;
  106. state->output.write_pos = decompressed_size;
  107. state->total_compressed += compressed_size - available_in;
  108. state->total_uncompressed += decompressed_size;
  109. state->total_compressions++;
  110. return decompressed_size;
  111. }
  112. #endif // ENABLE_BROTLI