compression_gzip.c 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "compression_gzip.h"
  3. #include <zlib.h>
  4. void rrdpush_compressor_init_gzip(struct compressor_state *state) {
  5. if (!state->initialized) {
  6. state->initialized = true;
  7. // Initialize deflate stream
  8. z_stream *strm = state->stream = (z_stream *) mallocz(sizeof(z_stream));
  9. strm->zalloc = Z_NULL;
  10. strm->zfree = Z_NULL;
  11. strm->opaque = Z_NULL;
  12. if(state->level < Z_BEST_SPEED)
  13. state->level = Z_BEST_SPEED;
  14. if(state->level > Z_BEST_COMPRESSION)
  15. state->level = Z_BEST_COMPRESSION;
  16. // int r = deflateInit2(strm, Z_BEST_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
  17. int r = deflateInit2(strm, state->level, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
  18. if (r != Z_OK) {
  19. netdata_log_error("Failed to initialize deflate with error: %d", r);
  20. freez(state->stream);
  21. state->initialized = false;
  22. return;
  23. }
  24. }
  25. }
  26. void rrdpush_compressor_destroy_gzip(struct compressor_state *state) {
  27. if (state->stream) {
  28. deflateEnd(state->stream);
  29. freez(state->stream);
  30. state->stream = NULL;
  31. }
  32. }
  33. size_t rrdpush_compress_gzip(struct compressor_state *state, const char *data, size_t size, const char **out) {
  34. if (unlikely(!state || !size || !out))
  35. return 0;
  36. simple_ring_buffer_make_room(&state->output, deflateBound(state->stream, size));
  37. z_stream *strm = state->stream;
  38. strm->avail_in = (uInt)size;
  39. strm->next_in = (Bytef *)data;
  40. strm->avail_out = (uInt)state->output.size;
  41. strm->next_out = (Bytef *)state->output.data;
  42. int ret = deflate(strm, Z_SYNC_FLUSH);
  43. if (ret != Z_OK && ret != Z_STREAM_END) {
  44. netdata_log_error("STREAM: deflate() failed with error %d", ret);
  45. return 0;
  46. }
  47. if(strm->avail_in != 0) {
  48. netdata_log_error("STREAM: deflate() did not use all the input buffer, %u bytes out of %zu remain",
  49. strm->avail_in, size);
  50. return 0;
  51. }
  52. if(strm->avail_out == 0) {
  53. netdata_log_error("STREAM: deflate() needs a bigger output buffer than the one we provided "
  54. "(output buffer %zu bytes, compressed payload %zu bytes)",
  55. state->output.size, size);
  56. return 0;
  57. }
  58. size_t compressed_data_size = state->output.size - strm->avail_out;
  59. if(compressed_data_size == 0) {
  60. netdata_log_error("STREAM: deflate() did not produce any output "
  61. "(output buffer %zu bytes, compressed payload %zu bytes)",
  62. state->output.size, size);
  63. return 0;
  64. }
  65. state->sender_locked.total_compressions++;
  66. state->sender_locked.total_uncompressed += size;
  67. state->sender_locked.total_compressed += compressed_data_size;
  68. *out = state->output.data;
  69. return compressed_data_size;
  70. }
  71. void rrdpush_decompressor_init_gzip(struct decompressor_state *state) {
  72. if (!state->initialized) {
  73. state->initialized = true;
  74. // Initialize inflate stream
  75. z_stream *strm = state->stream = (z_stream *)mallocz(sizeof(z_stream));
  76. strm->zalloc = Z_NULL;
  77. strm->zfree = Z_NULL;
  78. strm->opaque = Z_NULL;
  79. int r = inflateInit2(strm, 15 + 16);
  80. if (r != Z_OK) {
  81. netdata_log_error("Failed to initialize inflateInit2() with error: %d", r);
  82. freez(state->stream);
  83. state->initialized = false;
  84. return;
  85. }
  86. simple_ring_buffer_make_room(&state->output, COMPRESSION_MAX_CHUNK);
  87. }
  88. }
  89. void rrdpush_decompressor_destroy_gzip(struct decompressor_state *state) {
  90. if (state->stream) {
  91. inflateEnd(state->stream);
  92. freez(state->stream);
  93. state->stream = NULL;
  94. }
  95. }
  96. size_t rrdpush_decompress_gzip(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
  97. if (unlikely(!state || !compressed_data || !compressed_size))
  98. return 0;
  99. // The state.output ring buffer is always EMPTY at this point,
  100. // meaning that (state->output.read_pos == state->output.write_pos)
  101. // However, THEY ARE NOT ZERO.
  102. z_stream *strm = state->stream;
  103. strm->avail_in = (uInt)compressed_size;
  104. strm->next_in = (Bytef *)compressed_data;
  105. strm->avail_out = (uInt)state->output.size;
  106. strm->next_out = (Bytef *)state->output.data;
  107. int ret = inflate(strm, Z_SYNC_FLUSH);
  108. if (ret != Z_STREAM_END && ret != Z_OK) {
  109. netdata_log_error("RRDPUSH DECOMPRESS: inflate() failed with error %d", ret);
  110. return 0;
  111. }
  112. if(strm->avail_in != 0) {
  113. netdata_log_error("RRDPUSH DECOMPRESS: inflate() did not use all compressed data we provided "
  114. "(compressed payload %zu bytes, remaining to be uncompressed %u)"
  115. , compressed_size, strm->avail_in);
  116. return 0;
  117. }
  118. if(strm->avail_out == 0) {
  119. netdata_log_error("RRDPUSH DECOMPRESS: inflate() needs a bigger output buffer than the one we provided "
  120. "(compressed payload %zu bytes, output buffer size %zu bytes)"
  121. , compressed_size, state->output.size);
  122. return 0;
  123. }
  124. size_t decompressed_size = state->output.size - strm->avail_out;
  125. state->output.read_pos = 0;
  126. state->output.write_pos = decompressed_size;
  127. state->total_compressed += compressed_size;
  128. state->total_uncompressed += decompressed_size;
  129. state->total_compressions++;
  130. return decompressed_size;
  131. }