circular_buffer.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. /** @file circular_buffer.c
  3. * @brief This is the implementation of a circular buffer to be used
  4. * for saving collected logs in memory, until they are stored
  5. * into the database.
  6. */
  7. #include "circular_buffer.h"
  8. #include "helper.h"
  9. #include "parser.h"
/* Pairs one circular-buffer item with the log source (File_info) it was
 * collected from, so that results gathered from multiple buffers can be
 * sorted by timestamp while still remembering their origin. */
struct qsort_item {
    Circ_buff_item_t *cbi;  // the buffered log item (timestamp, text, sizes)
    struct File_info *pfi;  // the log source this item belongs to
};
  14. static int qsort_timestamp (const void *item_a, const void *item_b) {
  15. return ( (int64_t)((struct qsort_item*)item_a)->cbi->timestamp -
  16. (int64_t)((struct qsort_item*)item_b)->cbi->timestamp);
  17. }
  18. static int reverse_qsort_timestamp (const void * item_a, const void * item_b) {
  19. return -qsort_timestamp(item_a, item_b);
  20. }
/**
 * @brief Search circular buffers according to the query_params.
 * @details If multiple buffers are to be searched, the results will be sorted
 * according to timestamps.
 *
 * Note that buff->tail can only be changed through circ_buff_read_done(), and
 * circ_buff_search() and circ_buff_read_done() are mutually exclusive due
 * to uv_mutex_lock() and uv_mutex_unlock() in queries and when writing to DB.
 *
 * @param p_query_params Query parameters to search according to.
 * @param p_file_infos File_info structs to be searched.
 */
void circ_buff_search(logs_query_params_t *const p_query_params, struct File_info *const p_file_infos[]) {
    /* Hold the realloc read-locks of every source for the whole query, so
     * circ_buff_insert() cannot swap out the items arrays underneath us. */
    for(int pfi_off = 0; p_file_infos[pfi_off]; pfi_off++)
        uv_rwlock_rdlock(&p_file_infos[pfi_off]->circ_buff->buff_realloc_rwlock);

    /* Count the sources and the largest per-buffer capacity, to size the
     * worst-case collection array below. */
    int buffs_size = 0,
        buff_max_num_of_items = 0;
    while(p_file_infos[buffs_size]){
        if(p_file_infos[buffs_size]->circ_buff->num_of_items > buff_max_num_of_items)
            buff_max_num_of_items = p_file_infos[buffs_size]->circ_buff->num_of_items;
        buffs_size++;
    }

    /* NOTE(review): runtime-sized VLA on the stack; could grow large with
     * many sources or big buffers — consider a heap allocation. */
    struct qsort_item items[buffs_size * buff_max_num_of_items + 1]; // worst case allocation

    /* Collect pointers to all readable items of all sources. */
    int items_off = 0;
    for(int buff_off = 0; p_file_infos[buff_off]; buff_off++){
        Circ_buff_t *buff = p_file_infos[buff_off]->circ_buff;
        /* TODO: The following 3 operations need to be replaced with a struct
         * to guarantee atomicity. */
        int head = __atomic_load_n(&buff->head, __ATOMIC_SEQ_CST) % buff->num_of_items;
        int tail = __atomic_load_n(&buff->tail, __ATOMIC_SEQ_CST) % buff->num_of_items;
        int full = __atomic_load_n(&buff->full, __ATOMIC_SEQ_CST);
        if ((head == tail) && !full) continue; // Nothing to do if buff is empty
        /* NOTE(review): when the buffer is completely full (head == tail and
         * full != 0), this loop terminates immediately and contributes no
         * items — confirm that is intended. */
        for (int i = tail; i != head; i = (i + 1) % buff->num_of_items){
            items[items_off].cbi = &buff->items[i];
            items[items_off++].pfi = p_file_infos[buff_off];
        }
    }
    /* NULL-terminate the collection; the loops below stop at the sentinel. */
    items[items_off].cbi = NULL;
    items[items_off].pfi = NULL;

    /* Merge-sort the items of all sources chronologically, in the direction
     * requested by the query. */
    if(items[0].cbi)
        qsort(items, items_off, sizeof(items[0]), p_query_params->order_by_asc ? qsort_timestamp : reverse_qsort_timestamp);

    BUFFER *const res_buff = p_query_params->results_buff;
    logs_query_res_hdr_t res_hdr = { // result header
        .timestamp = p_query_params->act_to_ts,
        .text_size = 0,
        .matches = 0,
        .log_source = "",
        .log_type = ""
    };
    for (int i = 0; items[i].cbi; i++) {
        /* If exceeding quota or timeout is reached and new timestamp is different than previous,
         * terminate query but inform caller about act_to_ts to continue from (its next value) in next call. */
        if( (res_buff->len >= p_query_params->quota || terminate_logs_manag_query(p_query_params)) &&
            items[i].cbi->timestamp != res_hdr.timestamp){
            p_query_params->act_to_ts = res_hdr.timestamp;
            break;
        }
        /* Fill the per-item result header from the item and its source. */
        res_hdr.timestamp = items[i].cbi->timestamp;
        res_hdr.text_size = items[i].cbi->text_size;
        strncpyz(res_hdr.log_source, log_src_t_str[items[i].pfi->log_source], sizeof(res_hdr.log_source) - 1);
        strncpyz(res_hdr.log_type, log_src_type_t_str[items[i].pfi->log_type], sizeof(res_hdr.log_type) - 1);
        strncpyz(res_hdr.basename, items[i].pfi->file_basename, sizeof(res_hdr.basename) - 1);
        strncpyz(res_hdr.filename, items[i].pfi->filename, sizeof(res_hdr.filename) - 1);
        strncpyz(res_hdr.chartname, items[i].pfi->chartname, sizeof(res_hdr.chartname) - 1);
        /* Only items inside the requested timestamp window are returned;
         * for descending queries the window bounds are swapped. */
        if (p_query_params->order_by_asc ?
            ( res_hdr.timestamp >= p_query_params->req_from_ts && res_hdr.timestamp <= p_query_params->req_to_ts ) :
            ( res_hdr.timestamp >= p_query_params->req_to_ts && res_hdr.timestamp <= p_query_params->req_from_ts) ){
            /* In case of search_keyword, less than sizeof(res_hdr) + temp_msg.text_size
             * space is required, but go for worst case scenario for now */
            buffer_increase(res_buff, sizeof(res_hdr) + res_hdr.text_size);
            if(!p_query_params->keyword || !*p_query_params->keyword || !strcmp(p_query_params->keyword, " ")){
                /* No keyword filtering: copy the whole item text verbatim
                 * after the (not-yet-written) header.
                 * NOTE: relying on items[i]->cbi->num_lines to get number of log lines
                 * might not be 100% correct, since parsing must have taken place
                 * already to return correct count. Maybe an issue under heavy load. */
                res_hdr.matches = items[i].cbi->num_lines;
                memcpy(&res_buff->buffer[res_buff->len + sizeof(res_hdr)], items[i].cbi->data, res_hdr.text_size);
            }
            else {
                /* Keyword filtering: search_keyword() writes only matching
                 * lines and updates res_hdr.text_size accordingly. */
                res_hdr.matches = search_keyword( items[i].cbi->data, res_hdr.text_size,
                                                  &res_buff->buffer[res_buff->len + sizeof(res_hdr)],
                                                  &res_hdr.text_size, p_query_params->keyword, NULL,
                                                  p_query_params->ignore_case);
                m_assert( (res_hdr.matches > 0 && res_hdr.text_size > 0) ||
                          (res_hdr.matches == 0 && res_hdr.text_size == 0),
                          "res_hdr.matches and res_hdr.text_size must both be > 0 or == 0.");
                if(unlikely(res_hdr.matches < 0))
                    break; /* res_hdr.matches < 0 - error during keyword search */
            }
            /* Commit the record: header followed by text, newline-terminated. */
            if(res_hdr.text_size){
                res_buff->buffer[res_buff->len + sizeof(res_hdr) + res_hdr.text_size - 1] = '\n'; // replace '\0' with '\n'
                memcpy(&res_buff->buffer[res_buff->len], &res_hdr, sizeof(res_hdr));
                res_buff->len += sizeof(res_hdr) + res_hdr.text_size;
                p_query_params->num_lines += res_hdr.matches;
            }
            m_assert(TEST_MS_TIMESTAMP_VALID(res_hdr.timestamp), "res_hdr.timestamp is invalid");
        }
    }
    for(int pfi_off = 0; p_file_infos[pfi_off]; pfi_off++)
        uv_rwlock_rdunlock(&p_file_infos[pfi_off]->circ_buff->buff_realloc_rwlock);
}
/**
 * @brief Query circular buffer if there is space for item insertion.
 * @param buff Circular buffer to query for available space.
 * @param requested_text_space Size of raw (uncompressed) space needed.
 * @note If buff->allow_dropped_logs is 0, then this function will block and
 * it will only return once there is available space as requested. In this
 * case, it will never return 0.
 * @return \p requested_text_space if there is enough space, else 0.
 */
size_t circ_buff_prepare_write(Circ_buff_t *const buff, size_t const requested_text_space){
    /* Calculate how much is the maximum compressed space that will
     * be required on top of the requested space for the raw data. */
    buff->in->text_compressed_size = (size_t) LZ4_compressBound(requested_text_space);
    m_assert(buff->in->text_compressed_size != 0, "requested text compressed space is zero");
    size_t const required_space = requested_text_space + buff->in->text_compressed_size;

    size_t available_text_space = 0;
    size_t total_cached_mem_ex_in;

try_to_acquire_space:
    /* Sum the memory currently held by all circular-buffer items,
     * excluding the input buffer itself (hence "_ex_in"). */
    total_cached_mem_ex_in = 0;
    for (int i = 0; i < buff->num_of_items; i++){
        total_cached_mem_ex_in += buff->items[i].data_max_size;
    }

    /* If the required space is more than the allocated space of the input
     * buffer, then we need to check if the input buffer can be reallocated:
     *
     * a) If the total memory consumption of the circular buffer plus the
     * required space is less than the limit set by "circular buffer max size"
     * for this log source, then the input buffer can be reallocated.
     *
     * b) If the total memory consumption of the circular buffer plus the
     * required space is more than the limit set by "circular buffer max size"
     * for this log source, we will attempt to reclaim some of the circular
     * buffer allocated memory from any empty items.
     *
     * c) If after reclaiming the total memory consumption is still beyond the
     * configuration limit, either 0 will be returned as the available space
     * for raw logs in the input buffer, or the function will block and repeat
     * the same process, until there is available space to be returned, depending
     * of the configuration value of buff->allow_dropped_logs.
     * */
    if(required_space > buff->in->data_max_size) {
        if(likely(total_cached_mem_ex_in + required_space <= buff->total_cached_mem_max)){
            /* Case (a): within budget — just grow the input buffer. */
            buff->in->data_max_size = required_space;
            buff->in->data = reallocz(buff->in->data, buff->in->data_max_size);
            available_text_space = requested_text_space;
        }
        else if(likely(__atomic_load_n(&buff->full, __ATOMIC_SEQ_CST) == 0)){
            /* Case (b): over budget — shrink the currently-unused items
             * (the range outside [tail, head)) down to 1 byte each. */
            int head = __atomic_load_n(&buff->head, __ATOMIC_SEQ_CST) % buff->num_of_items;
            int tail = __atomic_load_n(&buff->tail, __ATOMIC_SEQ_CST) % buff->num_of_items;
            for (int i = (head == tail ? (head + 1) % buff->num_of_items : head);
                 i != tail; i = (i + 1) % buff->num_of_items) {
                m_assert(i <= buff->num_of_items, "i > buff->num_of_items");
                buff->items[i].data_max_size = 1;
                buff->items[i].data = reallocz(buff->items[i].data, buff->items[i].data_max_size);
            }
            /* Re-measure total item memory after reclaiming. */
            total_cached_mem_ex_in = 0;
            for (int i = 0; i < buff->num_of_items; i++){
                total_cached_mem_ex_in += buff->items[i].data_max_size;
            }
            if(total_cached_mem_ex_in + required_space <= buff->total_cached_mem_max){
                buff->in->data_max_size = required_space;
                buff->in->data = reallocz(buff->in->data, buff->in->data_max_size);
                available_text_space = requested_text_space;
            }
            else available_text_space = 0; /* Case (c): still over budget. */
        }
    } else available_text_space = requested_text_space; // input buffer already large enough

    /* Publish the current total memory footprint (items + input buffer). */
    __atomic_store_n(&buff->total_cached_mem, total_cached_mem_ex_in + buff->in->data_max_size, __ATOMIC_RELAXED);

    /* When dropping logs is not allowed, sleep and retry until space is
     * obtained — this is the blocking behavior described in @note above. */
    if(unlikely(!buff->allow_dropped_logs && !available_text_space)){
        sleep_usec(CIRC_BUFF_PREP_WR_RETRY_AFTER_MS * USEC_PER_MS);
        goto try_to_acquire_space;
    }

    m_assert(available_text_space || buff->allow_dropped_logs, "!available_text_space == 0 && !buff->allow_dropped_logs");
    return available_text_space;
}
/**
 * @brief Insert item from temporary input buffer to circular buffer.
 * @param buff Circular buffer to insert the item into
 * @return 0 in case of success or -1 in case there was an error (e.g. buff
 * is out of space).
 * @note NOTE(review): the current implementation always returns 0 — out of
 * space is handled by expanding the buffer, never by returning -1. Confirm
 * whether callers still check for -1.
 */
int circ_buff_insert(Circ_buff_t *const buff){
    // TODO: Probably can be changed to __ATOMIC_RELAXED, but ideally a mutex should be used here.
    int head = __atomic_load_n(&buff->head, __ATOMIC_SEQ_CST) % buff->num_of_items;
    int tail = __atomic_load_n(&buff->tail, __ATOMIC_SEQ_CST) % buff->num_of_items;
    int full = __atomic_load_n(&buff->full, __ATOMIC_SEQ_CST);

    /* If circular buffer does not have any free items, it will be expanded
     * by reallocating the `items` array and adding one more item. */
    if (unlikely(( head == tail ) && full )) {
        debug_log( "buff out of space! will be expanded.");
        /* Writers of the items array must exclude readers (queries hold the
         * read side of this lock for their whole duration). */
        uv_rwlock_wrlock(&buff->buff_realloc_rwlock);
        Circ_buff_item_t *items_new = callocz(buff->num_of_items + 1, sizeof(Circ_buff_item_t));
        /* Copy the old items in logical order (starting from head == tail),
         * so the new array is "unrolled" with tail at index 0. */
        for(int i = 0; i < buff->num_of_items; i++){
            Circ_buff_item_t *item_old = &buff->items[head++ % buff->num_of_items];
            items_new[i] = *item_old;
        }
        freez(buff->items);
        buff->items = items_new;
        /* Rebase the parse cursor to the new, tail-at-0 layout. */
        buff->parse = buff->parse - buff->tail;
        /* The newly added (zeroed) slot is at the old num_of_items index;
         * num_of_items is incremented only after head is set. */
        head = buff->head = buff->num_of_items++;
        buff->tail = buff->read = 0;
        buff->full = 0;
        __atomic_add_fetch(&buff->buff_realloc_cnt, 1, __ATOMIC_RELAXED);
        uv_rwlock_wrunlock(&buff->buff_realloc_rwlock);
    }

    /* Swap the staging input item into the head slot, and hand the head
     * slot's old data allocation back to the input item for reuse. */
    Circ_buff_item_t *cur_item = &buff->items[head];
    char *tmp_data = cur_item->data;
    size_t tmp_data_max_size = cur_item->data_max_size;
    cur_item->status = buff->in->status;
    cur_item->timestamp = buff->in->timestamp;
    cur_item->data = buff->in->data;
    cur_item->text_size = buff->in->text_size;
    cur_item->text_compressed = buff->in->text_compressed;
    cur_item->text_compressed_size = buff->in->text_compressed_size;
    cur_item->data_max_size = buff->in->data_max_size;
    cur_item->num_lines = buff->in->num_lines;

    /* Reset the input item for the next producer write. */
    buff->in->status = CIRC_BUFF_ITEM_STATUS_UNPROCESSED;
    buff->in->timestamp = 0;
    buff->in->data = tmp_data;
    buff->in->text_size = 0;
    /* NOTE(review): the line below is commented out, so buff->in->text_compressed
     * may still point into the data block that was just handed to cur_item —
     * confirm no caller reads it before the next compression. */
    // buff->in->text_compressed = tmp_data;
    buff->in->text_compressed_size = 0;
    buff->in->data_max_size = tmp_data_max_size;
    buff->in->num_lines = 0;

    /* Update running totals and the (integer) compression ratio. */
    __atomic_add_fetch(&buff->text_size_total, cur_item->text_size, __ATOMIC_SEQ_CST);
    if( __atomic_add_fetch(&buff->text_compressed_size_total, cur_item->text_compressed_size, __ATOMIC_SEQ_CST)){
        __atomic_store_n(&buff->compression_ratio,
                         __atomic_load_n(&buff->text_size_total, __ATOMIC_SEQ_CST) /
                         __atomic_load_n(&buff->text_compressed_size_total, __ATOMIC_SEQ_CST),
                         __ATOMIC_SEQ_CST);
    } else __atomic_store_n( &buff->compression_ratio, 0, __ATOMIC_SEQ_CST);

    /* Publish the new head; if it wrapped around onto tail, mark full. */
    if(unlikely(__atomic_add_fetch(&buff->head, 1, __ATOMIC_SEQ_CST) % buff->num_of_items ==
                __atomic_load_n(&buff->tail, __ATOMIC_SEQ_CST) % buff->num_of_items)){
        __atomic_store_n(&buff->full, 1, __ATOMIC_SEQ_CST);
    }

    __atomic_or_fetch(&cur_item->status, CIRC_BUFF_ITEM_STATUS_PARSED | CIRC_BUFF_ITEM_STATUS_STREAMED, __ATOMIC_SEQ_CST);
    return 0;
}
/**
 * @brief Return pointer to next item to be read from the circular buffer.
 * @param buff Circular buffer to get next item from.
 * @return Pointer to the next circular buffer item to be read, or NULL
 * if there are no more items to be read.
 * @note buff->read is a consumer-private cursor (no atomics needed on it);
 * it is only published to buff->tail by circ_buff_read_done().
 */
Circ_buff_item_t *circ_buff_read_item(Circ_buff_t *const buff) {
    Circ_buff_item_t *item = &buff->items[buff->read % buff->num_of_items];
    m_assert(__atomic_load_n(&item->status, __ATOMIC_RELAXED) <= CIRC_BUFF_ITEM_STATUS_DONE, "Invalid status");
    if( /* No more records to be retrieved from the buffer - pay attention that
         * there is no `% buff->num_of_items` operation, as we need to check
         * the case where buff->read is exactly equal to buff->head. */
        (buff->read == (__atomic_load_n(&buff->head, __ATOMIC_SEQ_CST))) ||
        /* Current item either not parsed or streamed */
        (__atomic_load_n(&item->status, __ATOMIC_RELAXED) != CIRC_BUFF_ITEM_STATUS_DONE) ){
        return NULL;
    }
    /* The item is leaving the buffer: subtract its sizes from the running
     * totals and refresh the (integer) compression ratio. */
    __atomic_sub_fetch(&buff->text_size_total, item->text_size, __ATOMIC_SEQ_CST);
    if( __atomic_sub_fetch(&buff->text_compressed_size_total, item->text_compressed_size, __ATOMIC_SEQ_CST)){
        __atomic_store_n(&buff->compression_ratio,
                         __atomic_load_n(&buff->text_size_total, __ATOMIC_SEQ_CST) /
                         __atomic_load_n(&buff->text_compressed_size_total, __ATOMIC_SEQ_CST),
                         __ATOMIC_SEQ_CST);
    } else __atomic_store_n( &buff->compression_ratio, 0, __ATOMIC_SEQ_CST);
    buff->read++;
    return item;
}
/**
 * @brief Complete buffer read process.
 * @details Publishes the consumer's private read cursor (buff->read,
 * advanced by circ_buff_read_item()) as the new tail, releasing the
 * consumed items back to producers.
 * @param buff Circular buffer to complete read process on.
 */
void circ_buff_read_done(Circ_buff_t *const buff){
    /* Even if one item was read, it means buffer cannot be full anymore.
     * `full` is cleared before `tail` is published, so producers never see
     * a stale full flag with the advanced tail. */
    if(__atomic_load_n(&buff->tail, __ATOMIC_RELAXED) != buff->read)
        __atomic_store_n(&buff->full, 0, __ATOMIC_SEQ_CST);
    __atomic_store_n(&buff->tail, buff->read, __ATOMIC_SEQ_CST);
}
  296. /**
  297. * @brief Create a new circular buffer.
  298. * @param num_of_items Number of Circ_buff_item_t items in the buffer.
  299. * @param max_size Maximum memory the circular buffer can occupy.
  300. * @param allow_dropped_logs Maximum memory the circular buffer can occupy.
  301. * @return Pointer to the new circular buffer structure.
  302. */
  303. Circ_buff_t *circ_buff_init(const int num_of_items,
  304. const size_t max_size,
  305. const int allow_dropped_logs ) {
  306. Circ_buff_t *buff = callocz(1, sizeof(Circ_buff_t));
  307. buff->num_of_items = num_of_items;
  308. buff->items = callocz(buff->num_of_items, sizeof(Circ_buff_item_t));
  309. buff->in = callocz(1, sizeof(Circ_buff_item_t));
  310. uv_rwlock_init(&buff->buff_realloc_rwlock);
  311. buff->total_cached_mem_max = max_size;
  312. buff->allow_dropped_logs = allow_dropped_logs;
  313. return buff;
  314. }
  315. /**
  316. * @brief Destroy a circular buffer.
  317. * @param buff Circular buffer to be destroyed.
  318. */
  319. void circ_buff_destroy(Circ_buff_t *buff){
  320. for (int i = 0; i < buff->num_of_items; i++) freez(buff->items[i].data);
  321. freez(buff->items);
  322. freez(buff->in->data);
  323. freez(buff->in);
  324. freez(buff);
  325. };