123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- // SPDX-License-Identifier: GPL-3.0-or-later
- #include "compression_lz4.h"
- #ifdef ENABLE_LZ4
- #include "lz4.h"
- // ----------------------------------------------------------------------------
- // compress
- void rrdpush_compressor_init_lz4(struct compressor_state *state) {
- if(!state->initialized) {
- state->initialized = true;
- state->stream = LZ4_createStream();
- // LZ4 needs access to the last 64KB of source data
- // so, we keep twice the size of each message
- simple_ring_buffer_make_room(&state->input, 65536 + COMPRESSION_MAX_CHUNK * 2);
- }
- }
- void rrdpush_compressor_destroy_lz4(struct compressor_state *state) {
- if (state->stream) {
- LZ4_freeStream(state->stream);
- state->stream = NULL;
- }
- }
- /*
- * Compress the given block of data
- * Compressed data will remain in the internal buffer until the next invocation
- * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
- * or 0 in case of error
- */
- size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out) {
- if(unlikely(!state || !size || !out))
- return 0;
- // we need to keep the last 64K of our previous source data
- // as they were in the ring buffer
- simple_ring_buffer_make_room(&state->output, LZ4_COMPRESSBOUND(size));
- if(state->input.write_pos + size > state->input.size)
- // the input buffer cannot fit out data, restart from zero
- simple_ring_buffer_reset(&state->input);
- simple_ring_buffer_append_data(&state->input, data, size);
- long int compressed_data_size = LZ4_compress_fast_continue(
- state->stream,
- state->input.data + state->input.read_pos,
- (char *)state->output.data,
- (int)(state->input.write_pos - state->input.read_pos),
- (int)state->output.size,
- state->level);
- if (compressed_data_size <= 0) {
- netdata_log_error("STREAM: LZ4_compress_fast_continue() returned %ld "
- "(source is %zu bytes, output buffer can fit %zu bytes)",
- compressed_data_size, size, state->output.size);
- return 0;
- }
- state->input.read_pos = state->input.write_pos;
- state->sender_locked.total_compressions++;
- state->sender_locked.total_uncompressed += size;
- state->sender_locked.total_compressed += compressed_data_size;
- *out = state->output.data;
- return compressed_data_size;
- }
- // ----------------------------------------------------------------------------
- // decompress
- void rrdpush_decompressor_init_lz4(struct decompressor_state *state) {
- if(!state->initialized) {
- state->initialized = true;
- state->stream = LZ4_createStreamDecode();
- simple_ring_buffer_make_room(&state->output, 65536 + COMPRESSION_MAX_CHUNK * 2);
- }
- }
- void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state) {
- if (state->stream) {
- LZ4_freeStreamDecode(state->stream);
- state->stream = NULL;
- }
- }
- /*
- * Decompress the compressed data in the internal buffer
- * Return the size of uncompressed data or 0 for error
- */
- size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
- if (unlikely(!state || !compressed_data || !compressed_size))
- return 0;
- // The state.output ring buffer is always EMPTY at this point,
- // meaning that (state->output.read_pos == state->output.write_pos)
- // However, THEY ARE NOT ZERO.
- if (unlikely(state->output.write_pos + COMPRESSION_MAX_CHUNK > state->output.size))
- // the input buffer cannot fit out data, restart from zero
- simple_ring_buffer_reset(&state->output);
- long int decompressed_size = LZ4_decompress_safe_continue(
- state->stream
- , compressed_data
- , (char *)(state->output.data + state->output.write_pos)
- , (int)compressed_size
- , (int)(state->output.size - state->output.write_pos)
- );
- if (unlikely(decompressed_size < 0)) {
- netdata_log_error("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() returned negative value: %ld "
- "(compressed chunk is %zu bytes)"
- , decompressed_size, compressed_size);
- return 0;
- }
- if(unlikely(decompressed_size + state->output.write_pos > state->output.size))
- fatal("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() overflown the stream_buffer "
- "(size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu)"
- , state->output.size
- , state->output.write_pos
- , decompressed_size
- , (size_t)(state->output.write_pos + decompressed_size - state->output.size)
- );
- state->output.write_pos += decompressed_size;
- // statistics
- state->total_compressed += compressed_size;
- state->total_uncompressed += decompressed_size;
- state->total_compressions++;
- return decompressed_size;
- }
- #endif // ENABLE_LZ4
|