// circular_buffer.c

  1. #include "../libnetdata.h"
  2. struct circular_buffer *cbuffer_new(size_t initial, size_t max, size_t *statistics) {
  3. struct circular_buffer *buf = mallocz(sizeof(struct circular_buffer));
  4. buf->size = initial;
  5. buf->data = mallocz(initial);
  6. buf->write = 0;
  7. buf->read = 0;
  8. buf->max_size = max;
  9. buf->statistics = statistics;
  10. if(buf->statistics)
  11. __atomic_add_fetch(buf->statistics, sizeof(struct circular_buffer) + buf->size, __ATOMIC_RELAXED);
  12. return buf;
  13. }
  14. void cbuffer_free(struct circular_buffer *buf) {
  15. if (unlikely(!buf))
  16. return;
  17. if(buf->statistics)
  18. __atomic_sub_fetch(buf->statistics, sizeof(struct circular_buffer) + buf->size, __ATOMIC_RELAXED);
  19. freez(buf->data);
  20. freez(buf);
  21. }
  22. static int cbuffer_realloc_unsafe(struct circular_buffer *buf) {
  23. // Check that we can grow
  24. if (buf->size >= buf->max_size)
  25. return 1;
  26. size_t old_size = buf->size;
  27. size_t new_size = buf->size * 2;
  28. if (new_size > buf->max_size)
  29. new_size = buf->max_size;
  30. // We know that: size < new_size <= max_size
  31. // For simplicity align the current data at the bottom of the new buffer
  32. char *new_data = mallocz(new_size);
  33. if (buf->read == buf->write)
  34. buf->write = 0; // buffer is empty
  35. else if (buf->read < buf->write) {
  36. memcpy(new_data, buf->data + buf->read, buf->write - buf->read);
  37. buf->write -= buf->read;
  38. } else {
  39. size_t top_part = buf->size - buf->read;
  40. memcpy(new_data, buf->data + buf->read, top_part);
  41. memcpy(new_data + top_part, buf->data, buf->write);
  42. buf->write = top_part + buf->write;
  43. }
  44. buf->read = 0;
  45. // Switch buffers
  46. freez(buf->data);
  47. buf->data = new_data;
  48. buf->size = new_size;
  49. if(buf->statistics)
  50. __atomic_add_fetch(buf->statistics, new_size - old_size, __ATOMIC_RELAXED);
  51. return 0;
  52. }
  53. size_t cbuffer_available_size_unsafe(struct circular_buffer *buf) {
  54. size_t len = (buf->write >= buf->read) ? (buf->write - buf->read) : (buf->size - buf->read + buf->write);
  55. return buf->max_size - len;
  56. }
  57. int cbuffer_add_unsafe(struct circular_buffer *buf, const char *d, size_t d_len) {
  58. size_t len = (buf->write >= buf->read) ? (buf->write - buf->read) : (buf->size - buf->read + buf->write);
  59. while (d_len + len >= buf->size) {
  60. if (cbuffer_realloc_unsafe(buf)) {
  61. return 1;
  62. }
  63. }
  64. // Guarantee: write + d_len cannot hit read
  65. if (buf->write + d_len < buf->size) {
  66. memcpy(buf->data + buf->write, d, d_len);
  67. buf->write += d_len;
  68. }
  69. else {
  70. size_t top_part = buf->size - buf->write;
  71. memcpy(buf->data + buf->write, d, top_part);
  72. memcpy(buf->data, d + top_part, d_len - top_part);
  73. buf->write = d_len - top_part;
  74. }
  75. return 0;
  76. }
  77. // Assume caller does not remove too many bytes (i.e. read will jump over write)
  78. void cbuffer_remove_unsafe(struct circular_buffer *buf, size_t num) {
  79. buf->read += num;
  80. // Assume num < size (i.e. caller cannot remove more bytes than are in the buffer)
  81. if (buf->read >= buf->size)
  82. buf->read -= buf->size;
  83. }
  84. size_t cbuffer_next_unsafe(struct circular_buffer *buf, char **start) {
  85. if (start != NULL)
  86. *start = buf->data + buf->read;
  87. if (buf->read <= buf->write) {
  88. return buf->write - buf->read; // Includes empty case
  89. }
  90. return buf->size - buf->read;
  91. }
  92. void cbuffer_flush(struct circular_buffer*buf) {
  93. buf->write = 0;
  94. buf->read = 0;
  95. }