compression.c 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "compression.h"
  3. #include "compression_gzip.h"
  4. #ifdef ENABLE_LZ4
  5. #include "compression_lz4.h"
  6. #endif
  7. #ifdef ENABLE_ZSTD
  8. #include "compression_zstd.h"
  9. #endif
  10. #ifdef ENABLE_BROTLI
  11. #include "compression_brotli.h"
  12. #endif
  13. int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX] = {
  14. [COMPRESSION_ALGORITHM_NONE] = 0,
  15. [COMPRESSION_ALGORITHM_ZSTD] = 3, // 1 (faster) - 22 (smaller)
  16. [COMPRESSION_ALGORITHM_LZ4] = 1, // 1 (smaller) - 9 (faster)
  17. [COMPRESSION_ALGORITHM_BROTLI] = 3, // 0 (faster) - 11 (smaller)
  18. [COMPRESSION_ALGORITHM_GZIP] = 1, // 1 (faster) - 9 (smaller)
  19. };
  20. void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order) {
  21. // empty all slots
  22. for(size_t i = 0; i < COMPRESSION_ALGORITHM_MAX ;i++)
  23. rpt->config.compression_priorities[i] = STREAM_CAP_NONE;
  24. char *s = strdupz(order);
  25. char *words[COMPRESSION_ALGORITHM_MAX + 100] = { NULL };
  26. size_t num_words = quoted_strings_splitter_pluginsd(s, words, COMPRESSION_ALGORITHM_MAX + 100);
  27. size_t slot = 0;
  28. STREAM_CAPABILITIES added = STREAM_CAP_NONE;
  29. for(size_t i = 0; i < num_words && slot < COMPRESSION_ALGORITHM_MAX ;i++) {
  30. if((STREAM_CAP_ZSTD_AVAILABLE) && strcasecmp(words[i], "zstd") == 0 && !(added & STREAM_CAP_ZSTD)) {
  31. rpt->config.compression_priorities[slot++] = STREAM_CAP_ZSTD;
  32. added |= STREAM_CAP_ZSTD;
  33. }
  34. else if((STREAM_CAP_LZ4_AVAILABLE) && strcasecmp(words[i], "lz4") == 0 && !(added & STREAM_CAP_LZ4)) {
  35. rpt->config.compression_priorities[slot++] = STREAM_CAP_LZ4;
  36. added |= STREAM_CAP_LZ4;
  37. }
  38. else if((STREAM_CAP_BROTLI_AVAILABLE) && strcasecmp(words[i], "brotli") == 0 && !(added & STREAM_CAP_BROTLI)) {
  39. rpt->config.compression_priorities[slot++] = STREAM_CAP_BROTLI;
  40. added |= STREAM_CAP_BROTLI;
  41. }
  42. else if(strcasecmp(words[i], "gzip") == 0 && !(added & STREAM_CAP_GZIP)) {
  43. rpt->config.compression_priorities[slot++] = STREAM_CAP_GZIP;
  44. added |= STREAM_CAP_GZIP;
  45. }
  46. }
  47. freez(s);
  48. // make sure all participate
  49. if((STREAM_CAP_ZSTD_AVAILABLE) && slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_ZSTD))
  50. rpt->config.compression_priorities[slot++] = STREAM_CAP_ZSTD;
  51. if((STREAM_CAP_LZ4_AVAILABLE) && slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_LZ4))
  52. rpt->config.compression_priorities[slot++] = STREAM_CAP_LZ4;
  53. if((STREAM_CAP_BROTLI_AVAILABLE) && slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_BROTLI))
  54. rpt->config.compression_priorities[slot++] = STREAM_CAP_BROTLI;
  55. if(slot < COMPRESSION_ALGORITHM_MAX && !(added & STREAM_CAP_GZIP))
  56. rpt->config.compression_priorities[slot++] = STREAM_CAP_GZIP;
  57. }
  58. void rrdpush_select_receiver_compression_algorithm(struct receiver_state *rpt) {
  59. if (!rpt->config.rrdpush_compression)
  60. rpt->capabilities &= ~STREAM_CAP_COMPRESSIONS_AVAILABLE;
  61. // select the right compression before sending our capabilities to the child
  62. if(stream_has_more_than_one_capability_of(rpt->capabilities, STREAM_CAP_COMPRESSIONS_AVAILABLE)) {
  63. STREAM_CAPABILITIES compressions = rpt->capabilities & STREAM_CAP_COMPRESSIONS_AVAILABLE;
  64. for(int i = 0; i < COMPRESSION_ALGORITHM_MAX; i++) {
  65. STREAM_CAPABILITIES c = rpt->config.compression_priorities[i];
  66. if(!(c & STREAM_CAP_COMPRESSIONS_AVAILABLE))
  67. continue;
  68. if(compressions & c) {
  69. STREAM_CAPABILITIES exclude = compressions;
  70. exclude &= ~c;
  71. rpt->capabilities &= ~exclude;
  72. break;
  73. }
  74. }
  75. }
  76. }
  77. bool rrdpush_compression_initialize(struct sender_state *s) {
  78. rrdpush_compressor_destroy(&s->compressor);
  79. // IMPORTANT
  80. // KEEP THE SAME ORDER IN DECOMPRESSION
  81. if(stream_has_capability(s, STREAM_CAP_ZSTD))
  82. s->compressor.algorithm = COMPRESSION_ALGORITHM_ZSTD;
  83. else if(stream_has_capability(s, STREAM_CAP_LZ4))
  84. s->compressor.algorithm = COMPRESSION_ALGORITHM_LZ4;
  85. else if(stream_has_capability(s, STREAM_CAP_BROTLI))
  86. s->compressor.algorithm = COMPRESSION_ALGORITHM_BROTLI;
  87. else if(stream_has_capability(s, STREAM_CAP_GZIP))
  88. s->compressor.algorithm = COMPRESSION_ALGORITHM_GZIP;
  89. else
  90. s->compressor.algorithm = COMPRESSION_ALGORITHM_NONE;
  91. if(s->compressor.algorithm != COMPRESSION_ALGORITHM_NONE) {
  92. s->compressor.level = rrdpush_compression_levels[s->compressor.algorithm];
  93. rrdpush_compressor_init(&s->compressor);
  94. return true;
  95. }
  96. return false;
  97. }
  98. bool rrdpush_decompression_initialize(struct receiver_state *rpt) {
  99. rrdpush_decompressor_destroy(&rpt->decompressor);
  100. // IMPORTANT
  101. // KEEP THE SAME ORDER IN COMPRESSION
  102. if(stream_has_capability(rpt, STREAM_CAP_ZSTD))
  103. rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_ZSTD;
  104. else if(stream_has_capability(rpt, STREAM_CAP_LZ4))
  105. rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_LZ4;
  106. else if(stream_has_capability(rpt, STREAM_CAP_BROTLI))
  107. rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_BROTLI;
  108. else if(stream_has_capability(rpt, STREAM_CAP_GZIP))
  109. rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_GZIP;
  110. else
  111. rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_NONE;
  112. if(rpt->decompressor.algorithm != COMPRESSION_ALGORITHM_NONE) {
  113. rrdpush_decompressor_init(&rpt->decompressor);
  114. return true;
  115. }
  116. return false;
  117. }
  118. /*
  119. * In case of stream compression buffer overflow
  120. * Inform the user through the error log file and
  121. * deactivate compression by downgrading the stream protocol.
  122. */
  123. void rrdpush_compression_deactivate(struct sender_state *s) {
  124. switch(s->compressor.algorithm) {
  125. case COMPRESSION_ALGORITHM_MAX:
  126. case COMPRESSION_ALGORITHM_NONE:
  127. netdata_log_error("STREAM_COMPRESSION: compression error on 'host:%s' without any compression enabled. Ignoring error.",
  128. rrdhost_hostname(s->host));
  129. break;
  130. case COMPRESSION_ALGORITHM_GZIP:
  131. netdata_log_error("STREAM_COMPRESSION: GZIP compression error on 'host:%s'. Disabling GZIP for this node.",
  132. rrdhost_hostname(s->host));
  133. s->disabled_capabilities |= STREAM_CAP_GZIP;
  134. break;
  135. case COMPRESSION_ALGORITHM_LZ4:
  136. netdata_log_error("STREAM_COMPRESSION: LZ4 compression error on 'host:%s'. Disabling ZSTD for this node.",
  137. rrdhost_hostname(s->host));
  138. s->disabled_capabilities |= STREAM_CAP_LZ4;
  139. break;
  140. case COMPRESSION_ALGORITHM_ZSTD:
  141. netdata_log_error("STREAM_COMPRESSION: ZSTD compression error on 'host:%s'. Disabling ZSTD for this node.",
  142. rrdhost_hostname(s->host));
  143. s->disabled_capabilities |= STREAM_CAP_ZSTD;
  144. break;
  145. case COMPRESSION_ALGORITHM_BROTLI:
  146. netdata_log_error("STREAM_COMPRESSION: BROTLI compression error on 'host:%s'. Disabling BROTLI for this node.",
  147. rrdhost_hostname(s->host));
  148. s->disabled_capabilities |= STREAM_CAP_BROTLI;
  149. break;
  150. }
  151. }
  152. // ----------------------------------------------------------------------------
  153. // compressor public API
  154. void rrdpush_compressor_init(struct compressor_state *state) {
  155. switch(state->algorithm) {
  156. #ifdef ENABLE_ZSTD
  157. case COMPRESSION_ALGORITHM_ZSTD:
  158. rrdpush_compressor_init_zstd(state);
  159. break;
  160. #endif
  161. #ifdef ENABLE_LZ4
  162. case COMPRESSION_ALGORITHM_LZ4:
  163. rrdpush_compressor_init_lz4(state);
  164. break;
  165. #endif
  166. #ifdef ENABLE_BROTLI
  167. case COMPRESSION_ALGORITHM_BROTLI:
  168. rrdpush_compressor_init_brotli(state);
  169. break;
  170. #endif
  171. default:
  172. case COMPRESSION_ALGORITHM_GZIP:
  173. rrdpush_compressor_init_gzip(state);
  174. break;
  175. }
  176. simple_ring_buffer_reset(&state->input);
  177. simple_ring_buffer_reset(&state->output);
  178. }
  179. void rrdpush_compressor_destroy(struct compressor_state *state) {
  180. switch(state->algorithm) {
  181. #ifdef ENABLE_ZSTD
  182. case COMPRESSION_ALGORITHM_ZSTD:
  183. rrdpush_compressor_destroy_zstd(state);
  184. break;
  185. #endif
  186. #ifdef ENABLE_LZ4
  187. case COMPRESSION_ALGORITHM_LZ4:
  188. rrdpush_compressor_destroy_lz4(state);
  189. break;
  190. #endif
  191. #ifdef ENABLE_BROTLI
  192. case COMPRESSION_ALGORITHM_BROTLI:
  193. rrdpush_compressor_destroy_brotli(state);
  194. break;
  195. #endif
  196. default:
  197. case COMPRESSION_ALGORITHM_GZIP:
  198. rrdpush_compressor_destroy_gzip(state);
  199. break;
  200. }
  201. state->initialized = false;
  202. simple_ring_buffer_destroy(&state->input);
  203. simple_ring_buffer_destroy(&state->output);
  204. }
  205. size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char **out) {
  206. size_t ret = 0;
  207. switch(state->algorithm) {
  208. #ifdef ENABLE_ZSTD
  209. case COMPRESSION_ALGORITHM_ZSTD:
  210. ret = rrdpush_compress_zstd(state, data, size, out);
  211. break;
  212. #endif
  213. #ifdef ENABLE_LZ4
  214. case COMPRESSION_ALGORITHM_LZ4:
  215. ret = rrdpush_compress_lz4(state, data, size, out);
  216. break;
  217. #endif
  218. #ifdef ENABLE_BROTLI
  219. case COMPRESSION_ALGORITHM_BROTLI:
  220. ret = rrdpush_compress_brotli(state, data, size, out);
  221. break;
  222. #endif
  223. default:
  224. case COMPRESSION_ALGORITHM_GZIP:
  225. ret = rrdpush_compress_gzip(state, data, size, out);
  226. break;
  227. }
  228. if(unlikely(ret >= COMPRESSION_MAX_CHUNK)) {
  229. netdata_log_error("RRDPUSH_COMPRESS: compressed data is %zu bytes, which is >= than the max chunk size %d",
  230. ret, COMPRESSION_MAX_CHUNK);
  231. return 0;
  232. }
  233. return ret;
  234. }
  235. // ----------------------------------------------------------------------------
  236. // decompressor public API
  237. void rrdpush_decompressor_destroy(struct decompressor_state *state) {
  238. if(unlikely(!state->initialized))
  239. return;
  240. switch(state->algorithm) {
  241. #ifdef ENABLE_ZSTD
  242. case COMPRESSION_ALGORITHM_ZSTD:
  243. rrdpush_decompressor_destroy_zstd(state);
  244. break;
  245. #endif
  246. #ifdef ENABLE_LZ4
  247. case COMPRESSION_ALGORITHM_LZ4:
  248. rrdpush_decompressor_destroy_lz4(state);
  249. break;
  250. #endif
  251. #ifdef ENABLE_BROTLI
  252. case COMPRESSION_ALGORITHM_BROTLI:
  253. rrdpush_decompressor_destroy_brotli(state);
  254. break;
  255. #endif
  256. default:
  257. case COMPRESSION_ALGORITHM_GZIP:
  258. rrdpush_decompressor_destroy_gzip(state);
  259. break;
  260. }
  261. simple_ring_buffer_destroy(&state->output);
  262. state->initialized = false;
  263. }
  264. void rrdpush_decompressor_init(struct decompressor_state *state) {
  265. switch(state->algorithm) {
  266. #ifdef ENABLE_ZSTD
  267. case COMPRESSION_ALGORITHM_ZSTD:
  268. rrdpush_decompressor_init_zstd(state);
  269. break;
  270. #endif
  271. #ifdef ENABLE_LZ4
  272. case COMPRESSION_ALGORITHM_LZ4:
  273. rrdpush_decompressor_init_lz4(state);
  274. break;
  275. #endif
  276. #ifdef ENABLE_BROTLI
  277. case COMPRESSION_ALGORITHM_BROTLI:
  278. rrdpush_decompressor_init_brotli(state);
  279. break;
  280. #endif
  281. default:
  282. case COMPRESSION_ALGORITHM_GZIP:
  283. rrdpush_decompressor_init_gzip(state);
  284. break;
  285. }
  286. state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
  287. simple_ring_buffer_reset(&state->output);
  288. }
  289. size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
  290. if (unlikely(state->output.read_pos != state->output.write_pos))
  291. fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
  292. size_t ret = 0;
  293. switch(state->algorithm) {
  294. #ifdef ENABLE_ZSTD
  295. case COMPRESSION_ALGORITHM_ZSTD:
  296. ret = rrdpush_decompress_zstd(state, compressed_data, compressed_size);
  297. break;
  298. #endif
  299. #ifdef ENABLE_LZ4
  300. case COMPRESSION_ALGORITHM_LZ4:
  301. ret = rrdpush_decompress_lz4(state, compressed_data, compressed_size);
  302. break;
  303. #endif
  304. #ifdef ENABLE_BROTLI
  305. case COMPRESSION_ALGORITHM_BROTLI:
  306. ret = rrdpush_decompress_brotli(state, compressed_data, compressed_size);
  307. break;
  308. #endif
  309. default:
  310. case COMPRESSION_ALGORITHM_GZIP:
  311. ret = rrdpush_decompress_gzip(state, compressed_data, compressed_size);
  312. break;
  313. }
  314. // for backwards compatibility we cannot check for COMPRESSION_MAX_MSG_SIZE,
  315. // because old children may send this big payloads.
  316. if(unlikely(ret > COMPRESSION_MAX_CHUNK)) {
  317. netdata_log_error("RRDPUSH_DECOMPRESS: decompressed data is %zu bytes, which is bigger than the max msg size %d",
  318. ret, COMPRESSION_MAX_CHUNK);
  319. return 0;
  320. }
  321. return ret;
  322. }
  323. // ----------------------------------------------------------------------------
  324. // unit test
  325. static inline long int my_random (void) {
  326. return random();
  327. }
  328. void unittest_generate_random_name(char *dst, size_t size) {
  329. if(size < 7)
  330. size = 7;
  331. size_t len = 5 + my_random() % (size - 6);
  332. for(size_t i = 0; i < len ; i++) {
  333. if(my_random() % 2 == 0)
  334. dst[i] = 'A' + my_random() % 26;
  335. else
  336. dst[i] = 'a' + my_random() % 26;
  337. }
  338. dst[len] = '\0';
  339. }
  340. void unittest_generate_message(BUFFER *wb, time_t now_s, size_t counter) {
  341. bool with_slots = true;
  342. NUMBER_ENCODING integer_encoding = NUMBER_ENCODING_BASE64;
  343. NUMBER_ENCODING doubles_encoding = NUMBER_ENCODING_BASE64;
  344. time_t update_every = 1;
  345. time_t point_end_time_s = now_s;
  346. time_t wall_clock_time_s = now_s;
  347. size_t chart_slot = counter + 1;
  348. size_t dimensions = 2 + my_random() % 5;
  349. char chart[RRD_ID_LENGTH_MAX + 1] = "name";
  350. unittest_generate_random_name(chart, 5 + my_random() % 30);
  351. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
  352. if(with_slots) {
  353. buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
  354. buffer_print_uint64_encoded(wb, integer_encoding, chart_slot);
  355. }
  356. buffer_fast_strcat(wb, " '", 2);
  357. buffer_strcat(wb, chart);
  358. buffer_fast_strcat(wb, "' ", 2);
  359. buffer_print_uint64_encoded(wb, integer_encoding, update_every);
  360. buffer_fast_strcat(wb, " ", 1);
  361. buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s);
  362. buffer_fast_strcat(wb, " ", 1);
  363. if(point_end_time_s == wall_clock_time_s)
  364. buffer_fast_strcat(wb, "#", 1);
  365. else
  366. buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time_s);
  367. buffer_fast_strcat(wb, "\n", 1);
  368. for(size_t d = 0; d < dimensions ;d++) {
  369. size_t dim_slot = d + 1;
  370. char dim_id[RRD_ID_LENGTH_MAX + 1] = "dimension";
  371. unittest_generate_random_name(dim_id, 10 + my_random() % 20);
  372. int64_t last_collected_value = (my_random() % 2 == 0) ? (int64_t)(counter + d) : (int64_t)my_random();
  373. NETDATA_DOUBLE value = (my_random() % 2 == 0) ? (NETDATA_DOUBLE)my_random() / ((NETDATA_DOUBLE)my_random() + 1) : (NETDATA_DOUBLE)last_collected_value;
  374. SN_FLAGS flags = (my_random() % 1000 == 0) ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS;
  375. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
  376. if(with_slots) {
  377. buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
  378. buffer_print_uint64_encoded(wb, integer_encoding, dim_slot);
  379. }
  380. buffer_fast_strcat(wb, " '", 2);
  381. buffer_strcat(wb, dim_id);
  382. buffer_fast_strcat(wb, "' ", 2);
  383. buffer_print_int64_encoded(wb, integer_encoding, last_collected_value);
  384. buffer_fast_strcat(wb, " ", 1);
  385. if((NETDATA_DOUBLE)last_collected_value == value)
  386. buffer_fast_strcat(wb, "#", 1);
  387. else
  388. buffer_print_netdata_double_encoded(wb, doubles_encoding, value);
  389. buffer_fast_strcat(wb, " ", 1);
  390. buffer_print_sn_flags(wb, flags, true);
  391. buffer_fast_strcat(wb, "\n", 1);
  392. }
  393. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  394. }
  395. int unittest_rrdpush_compression_speed(compression_algorithm_t algorithm, const char *name) {
  396. fprintf(stderr, "\nTesting streaming compression speed with %s\n", name);
  397. struct compressor_state cctx = {
  398. .initialized = false,
  399. .algorithm = algorithm,
  400. };
  401. struct decompressor_state dctx = {
  402. .initialized = false,
  403. .algorithm = algorithm,
  404. };
  405. rrdpush_compressor_init(&cctx);
  406. rrdpush_decompressor_init(&dctx);
  407. int errors = 0;
  408. BUFFER *wb = buffer_create(COMPRESSION_MAX_MSG_SIZE, NULL);
  409. time_t now_s = now_realtime_sec();
  410. usec_t compression_ut = 0;
  411. usec_t decompression_ut = 0;
  412. size_t bytes_compressed = 0;
  413. size_t bytes_uncompressed = 0;
  414. usec_t compression_started_ut = now_monotonic_usec();
  415. usec_t decompression_started_ut = compression_started_ut;
  416. for(int i = 0; i < 10000 ;i++) {
  417. compression_started_ut = now_monotonic_usec();
  418. decompression_ut += compression_started_ut - decompression_started_ut;
  419. buffer_flush(wb);
  420. while(buffer_strlen(wb) < COMPRESSION_MAX_MSG_SIZE - 1024)
  421. unittest_generate_message(wb, now_s, i);
  422. const char *txt = buffer_tostring(wb);
  423. size_t txt_len = buffer_strlen(wb);
  424. bytes_uncompressed += txt_len;
  425. const char *out;
  426. size_t size = rrdpush_compress(&cctx, txt, txt_len, &out);
  427. bytes_compressed += size;
  428. decompression_started_ut = now_monotonic_usec();
  429. compression_ut += decompression_started_ut - compression_started_ut;
  430. if(size == 0) {
  431. fprintf(stderr, "iteration %d: compressed size %zu is zero\n",
  432. i, size);
  433. errors++;
  434. goto cleanup;
  435. }
  436. else if(size >= COMPRESSION_MAX_CHUNK) {
  437. fprintf(stderr, "iteration %d: compressed size %zu exceeds max allowed size\n",
  438. i, size);
  439. errors++;
  440. goto cleanup;
  441. }
  442. else {
  443. size_t dtxt_len = rrdpush_decompress(&dctx, out, size);
  444. char *dtxt = (char *) &dctx.output.data[dctx.output.read_pos];
  445. if(rrdpush_decompressed_bytes_in_buffer(&dctx) != dtxt_len) {
  446. fprintf(stderr, "iteration %d: decompressed size %zu does not rrdpush_decompressed_bytes_in_buffer() %zu\n",
  447. i, dtxt_len, rrdpush_decompressed_bytes_in_buffer(&dctx)
  448. );
  449. errors++;
  450. goto cleanup;
  451. }
  452. if(!dtxt_len) {
  453. fprintf(stderr, "iteration %d: decompressed size is zero\n", i);
  454. errors++;
  455. goto cleanup;
  456. }
  457. else if(dtxt_len != txt_len) {
  458. fprintf(stderr, "iteration %d: decompressed size %zu does not match original size %zu\n",
  459. i, dtxt_len, txt_len
  460. );
  461. errors++;
  462. goto cleanup;
  463. }
  464. else {
  465. if(memcmp(txt, dtxt, txt_len) != 0) {
  466. fprintf(stderr, "iteration %d: decompressed data '%s' do not match original data length %zu\n",
  467. i, dtxt, txt_len);
  468. errors++;
  469. goto cleanup;
  470. }
  471. }
  472. }
  473. // here we are supposed to copy the data and advance the position
  474. dctx.output.read_pos += rrdpush_decompressed_bytes_in_buffer(&dctx);
  475. }
  476. cleanup:
  477. rrdpush_compressor_destroy(&cctx);
  478. rrdpush_decompressor_destroy(&dctx);
  479. if(errors)
  480. fprintf(stderr, "Compression with %s: FAILED (%d errors)\n", name, errors);
  481. else
  482. fprintf(stderr, "Compression with %s: OK "
  483. "(compression %zu usec, decompression %zu usec, bytes raw %zu, compressed %zu, savings ratio %0.2f%%)\n",
  484. name, compression_ut, decompression_ut,
  485. bytes_uncompressed, bytes_compressed,
  486. 100.0 - (double)bytes_compressed * 100.0 / (double)bytes_uncompressed);
  487. return errors;
  488. }
  489. int unittest_rrdpush_compression(compression_algorithm_t algorithm, const char *name) {
  490. fprintf(stderr, "\nTesting streaming compression with %s\n", name);
  491. struct compressor_state cctx = {
  492. .initialized = false,
  493. .algorithm = algorithm,
  494. };
  495. struct decompressor_state dctx = {
  496. .initialized = false,
  497. .algorithm = algorithm,
  498. };
  499. char txt[COMPRESSION_MAX_MSG_SIZE];
  500. rrdpush_compressor_init(&cctx);
  501. rrdpush_decompressor_init(&dctx);
  502. int errors = 0;
  503. memset(txt, '=', COMPRESSION_MAX_MSG_SIZE);
  504. for(int i = 0; i < COMPRESSION_MAX_MSG_SIZE ;i++) {
  505. txt[i] = 'A' + (i % 26);
  506. size_t txt_len = i + 1;
  507. const char *out;
  508. size_t size = rrdpush_compress(&cctx, txt, txt_len, &out);
  509. if(size == 0) {
  510. fprintf(stderr, "iteration %d: compressed size %zu is zero\n",
  511. i, size);
  512. errors++;
  513. goto cleanup;
  514. }
  515. else if(size >= COMPRESSION_MAX_CHUNK) {
  516. fprintf(stderr, "iteration %d: compressed size %zu exceeds max allowed size\n",
  517. i, size);
  518. errors++;
  519. goto cleanup;
  520. }
  521. else {
  522. size_t dtxt_len = rrdpush_decompress(&dctx, out, size);
  523. char *dtxt = (char *) &dctx.output.data[dctx.output.read_pos];
  524. if(rrdpush_decompressed_bytes_in_buffer(&dctx) != dtxt_len) {
  525. fprintf(stderr, "iteration %d: decompressed size %zu does not rrdpush_decompressed_bytes_in_buffer() %zu\n",
  526. i, dtxt_len, rrdpush_decompressed_bytes_in_buffer(&dctx)
  527. );
  528. errors++;
  529. goto cleanup;
  530. }
  531. if(!dtxt_len) {
  532. fprintf(stderr, "iteration %d: decompressed size is zero\n", i);
  533. errors++;
  534. goto cleanup;
  535. }
  536. else if(dtxt_len != txt_len) {
  537. fprintf(stderr, "iteration %d: decompressed size %zu does not match original size %zu\n",
  538. i, dtxt_len, txt_len
  539. );
  540. errors++;
  541. goto cleanup;
  542. }
  543. else {
  544. if(memcmp(txt, dtxt, txt_len) != 0) {
  545. txt[txt_len] = '\0';
  546. dtxt[txt_len + 5] = '\0';
  547. fprintf(stderr, "iteration %d: decompressed data '%s' do not match original data '%s' of length %zu\n",
  548. i, dtxt, txt, txt_len);
  549. errors++;
  550. goto cleanup;
  551. }
  552. }
  553. }
  554. // fill the compressed buffer with garbage
  555. memset((void *)out, 'x', size);
  556. // here we are supposed to copy the data and advance the position
  557. dctx.output.read_pos += rrdpush_decompressed_bytes_in_buffer(&dctx);
  558. }
  559. cleanup:
  560. rrdpush_compressor_destroy(&cctx);
  561. rrdpush_decompressor_destroy(&dctx);
  562. if(errors)
  563. fprintf(stderr, "Compression with %s: FAILED (%d errors)\n", name, errors);
  564. else
  565. fprintf(stderr, "Compression with %s: OK\n", name);
  566. return errors;
  567. }
  568. int unittest_rrdpush_compressions(void) {
  569. int ret = 0;
  570. ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_ZSTD, "ZSTD");
  571. ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_LZ4, "LZ4");
  572. ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_BROTLI, "BROTLI");
  573. ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_GZIP, "GZIP");
  574. ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_ZSTD, "ZSTD");
  575. ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_LZ4, "LZ4");
  576. ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_BROTLI, "BROTLI");
  577. ret += unittest_rrdpush_compression_speed(COMPRESSION_ALGORITHM_GZIP, "GZIP");
  578. return ret;
  579. }