compression.h 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdpush.h"
  3. #ifndef NETDATA_RRDPUSH_COMPRESSION_H
  4. #define NETDATA_RRDPUSH_COMPRESSION_H 1
  5. // signature MUST end with a newline
  6. #if COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)
  7. #error "COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)"
  8. #endif
  // A signature is one 32-bit word that carries both fixed marker bits and a
  // 14-bit length. Viewed as an integer:
  //   bits  0-7  : 'z' | 0x80             fixed marker
  //   bits  8-15 : 0x80 | length bits 0-6
  //   bits 16-23 : 0x80 | length bits 7-13
  //   bits 24-31 : '\n'                   fixed terminator
  // On a little-endian host the most significant byte is stored last, so the
  // 4 signature bytes end with '\n'.
  typedef uint32_t rrdpush_signature_t;

  // The fixed bits of every signature (this is also the signature of length 0).
  #define RRDPUSH_COMPRESSION_SIGNATURE \
      ((rrdpush_signature_t)(0x80U | 'z') | (0x80U << 8) | (0x80U << 16) | ((rrdpush_signature_t)'\n' << 24))

  // Selects only the fixed bits, leaving out the 2 x 7 bits that hold the length.
  #define RRDPUSH_COMPRESSION_SIGNATURE_MASK \
      ((rrdpush_signature_t)0xffU | (0x80U << 8) | (0x80U << 16) | (0xffU << 24))

  // Number of bytes a signature occupies.
  #define RRDPUSH_COMPRESSION_SIGNATURE_SIZE sizeof(rrdpush_signature_t)

  /**
   * Build the signature word for a block of compressed data.
   *
   * @param compressed_data_size  length to embed; only its 14 least
   *                              significant bits are stored, anything above
   *                              is silently dropped.
   * @return the fixed signature bits combined with the encoded length.
   */
  static inline rrdpush_signature_t rrdpush_compress_encode_signature(size_t compressed_data_size) {
      // split the length in two groups of 7 bits, one per signature byte
      size_t low7  = compressed_data_size & 0x7f;
      size_t high7 = (compressed_data_size & (0x7f << 7)) << 1;

      // force bit 7 of both length bytes to 1 (each byte is >= 0x80) and move
      // them above the marker byte
      rrdpush_signature_t encoded_length = (low7 | high7 | 0x8080) << 8;

      return RRDPUSH_COMPRESSION_SIGNATURE | encoded_length;
  }
  // Identifiers of the supported compression algorithms.
  // The values are consecutive, starting at 0, so they can index arrays of
  // COMPRESSION_ALGORITHM_MAX elements.
  typedef enum {
      COMPRESSION_ALGORITHM_NONE   = 0,
      COMPRESSION_ALGORITHM_ZSTD   = 1,
      COMPRESSION_ALGORITHM_LZ4    = 2,
      COMPRESSION_ALGORITHM_GZIP   = 3,
      COMPRESSION_ALGORITHM_BROTLI = 4,

      // terminator - keep it last: its value is the number of entries above
      COMPRESSION_ALGORITHM_MAX,
  } compression_algorithm_t;
  26. extern int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX];
  27. // this defines the order the algorithms will be selected by the receiver (parent)
  28. #define RRDPUSH_COMPRESSION_ALGORITHMS_ORDER "zstd lz4 brotli gzip"
  29. // ----------------------------------------------------------------------------
  // A growable byte buffer with independent read and write offsets.
  // The helpers in this header only ever append at write_pos and grow the
  // allocation; none of them wraps around.
  typedef struct simple_ring_buffer {
      const char *data;   // start of the storage (NULL until something is allocated)
      size_t size;        // allocated capacity of data, in bytes
      size_t read_pos;    // offset of the next byte to be read
      size_t write_pos;   // offset where the next byte will be written
  } SIMPLE_RING_BUFFER;
  36. static inline void simple_ring_buffer_reset(SIMPLE_RING_BUFFER *b) {
  37. b->read_pos = b->write_pos = 0;
  38. }
  39. static inline void simple_ring_buffer_make_room(SIMPLE_RING_BUFFER *b, size_t size) {
  40. if(b->write_pos + size > b->size) {
  41. if(!b->size)
  42. b->size = COMPRESSION_MAX_CHUNK;
  43. else
  44. b->size *= 2;
  45. if(b->write_pos + size > b->size)
  46. b->size += size;
  47. b->data = (const char *)reallocz((void *)b->data, b->size);
  48. }
  49. }
  50. static inline void simple_ring_buffer_append_data(SIMPLE_RING_BUFFER *b, const void *data, size_t size) {
  51. simple_ring_buffer_make_room(b, size);
  52. memcpy((void *)(b->data + b->write_pos), data, size);
  53. b->write_pos += size;
  54. }
  55. static inline void simple_ring_buffer_destroy(SIMPLE_RING_BUFFER *b) {
  56. freez((void *)b->data);
  57. b->data = NULL;
  58. b->read_pos = b->write_pos = b->size = 0;
  59. }
  60. // ----------------------------------------------------------------------------
  61. struct compressor_state {
  62. bool initialized;
  63. compression_algorithm_t algorithm;
  64. SIMPLE_RING_BUFFER input;
  65. SIMPLE_RING_BUFFER output;
  66. int level;
  67. void *stream;
  68. struct {
  69. size_t total_compressed;
  70. size_t total_uncompressed;
  71. size_t total_compressions;
  72. } sender_locked;
  73. };
  // Compressor API - implemented outside this header.
  // NOTE(review): the descriptions below are inferred from the signatures
  // only; confirm them against the implementation.

  // set up `state` before first use
  void rrdpush_compressor_init(struct compressor_state *state);

  // release whatever `state` holds
  void rrdpush_compressor_destroy(struct compressor_state *state);

  // compress `size` bytes of `data`; presumably *out is set to the result and
  // its length is returned
  size_t rrdpush_compress(
      struct compressor_state *state,
      const char *data,
      size_t size,
      const char **out);
  77. // ----------------------------------------------------------------------------
  78. struct decompressor_state {
  79. bool initialized;
  80. compression_algorithm_t algorithm;
  81. size_t signature_size;
  82. size_t total_compressed;
  83. size_t total_uncompressed;
  84. size_t total_compressions;
  85. SIMPLE_RING_BUFFER output;
  86. void *stream;
  87. };
  // Decompressor API - implemented outside this header.
  // NOTE(review): the descriptions below are inferred from the signatures
  // only; confirm them against the implementation.

  // set up `state` before first use
  void rrdpush_decompressor_init(struct decompressor_state *state);

  // release whatever `state` holds
  void rrdpush_decompressor_destroy(struct decompressor_state *state);

  // decompress `compressed_size` bytes of `compressed_data`; presumably the
  // result lands in state->output and its length is returned
  size_t rrdpush_decompress(
      struct decompressor_state *state,
      const char *compressed_data,
      size_t compressed_size);
  91. static inline size_t rrdpush_decompress_decode_signature(const char *data, size_t data_size) {
  92. if (unlikely(!data || !data_size))
  93. return 0;
  94. if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
  95. return 0;
  96. rrdpush_signature_t sign = *(rrdpush_signature_t *)data;
  97. if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
  98. return 0;
  99. size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
  100. return length;
  101. }
  102. static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
  103. if(unlikely(state->output.read_pos != state->output.write_pos))
  104. fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
  105. return rrdpush_decompress_decode_signature(header, header_size);
  106. }
  107. static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
  108. if(unlikely(state->output.read_pos > state->output.write_pos))
  109. fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
  110. return state->output.write_pos - state->output.read_pos;
  111. }
  112. static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
  113. if (unlikely(!state || !size || !dst))
  114. return 0;
  115. size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
  116. if(unlikely(!remaining))
  117. return 0;
  118. size_t bytes_to_return = size;
  119. if(bytes_to_return > remaining)
  120. bytes_to_return = remaining;
  121. memcpy(dst, state->output.data + state->output.read_pos, bytes_to_return);
  122. state->output.read_pos += bytes_to_return;
  123. if(unlikely(state->output.read_pos > state->output.write_pos))
  124. fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
  125. return bytes_to_return;
  126. }
  127. // ----------------------------------------------------------------------------
  128. #endif // NETDATA_RRDPUSH_COMPRESSION_H 1