aclk_query_queue.c 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk_query_queue.h"
  3. #include "aclk_query.h"
  4. #include "aclk_stats.h"
  5. static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER;
  6. #define ACLK_QUEUE_LOCK netdata_mutex_lock(&aclk_query_queue_mutex)
  7. #define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&aclk_query_queue_mutex)
  8. static struct aclk_query_queue {
  9. aclk_query_t head;
  10. int block_push;
  11. } aclk_query_queue = {
  12. .head = NULL,
  13. .block_push = 0
  14. };
  15. static inline int _aclk_queue_query(aclk_query_t query)
  16. {
  17. now_monotonic_high_precision_timeval(&query->created_tv);
  18. query->created = now_realtime_usec();
  19. ACLK_QUEUE_LOCK;
  20. if (aclk_query_queue.block_push) {
  21. ACLK_QUEUE_UNLOCK;
  22. if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES))
  23. netdata_log_error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
  24. aclk_query_free(query);
  25. return 1;
  26. }
  27. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(aclk_query_queue.head, query, prev, next);
  28. ACLK_QUEUE_UNLOCK;
  29. return 0;
  30. }
  31. int aclk_queue_query(aclk_query_t query)
  32. {
  33. int ret = _aclk_queue_query(query);
  34. if (!ret) {
  35. QUERY_THREAD_WAKEUP;
  36. if (aclk_stats_enabled) {
  37. ACLK_STATS_LOCK;
  38. aclk_metrics_per_sample.queries_queued++;
  39. ACLK_STATS_UNLOCK;
  40. }
  41. }
  42. return ret;
  43. }
  44. aclk_query_t aclk_queue_pop(void)
  45. {
  46. aclk_query_t ret;
  47. ACLK_QUEUE_LOCK;
  48. if (aclk_query_queue.block_push) {
  49. ACLK_QUEUE_UNLOCK;
  50. if(service_running(SERVICE_ACLK | ABILITY_DATA_QUERIES))
  51. netdata_log_error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
  52. return NULL;
  53. }
  54. ret = aclk_query_queue.head;
  55. if (!ret) {
  56. ACLK_QUEUE_UNLOCK;
  57. return ret;
  58. }
  59. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(aclk_query_queue.head, ret, prev, next);
  60. ACLK_QUEUE_UNLOCK;
  61. ret->next = NULL;
  62. return ret;
  63. }
  64. void aclk_queue_flush(void)
  65. {
  66. aclk_query_t query = aclk_queue_pop();
  67. while (query) {
  68. aclk_query_free(query);
  69. query = aclk_queue_pop();
  70. }
  71. }
  72. aclk_query_t aclk_query_new(aclk_query_type_t type)
  73. {
  74. aclk_query_t query = callocz(1, sizeof(struct aclk_query));
  75. query->type = type;
  76. return query;
  77. }
  78. void aclk_query_free(aclk_query_t query)
  79. {
  80. switch (query->type) {
  81. case HTTP_API_V2:
  82. freez(query->data.http_api_v2.payload);
  83. if (query->data.http_api_v2.query != query->dedup_id)
  84. freez(query->data.http_api_v2.query);
  85. break;
  86. default:
  87. break;
  88. }
  89. freez(query->dedup_id);
  90. freez(query->callback_topic);
  91. freez(query->msg_id);
  92. freez(query);
  93. }
  94. void aclk_queue_lock(void)
  95. {
  96. ACLK_QUEUE_LOCK;
  97. aclk_query_queue.block_push = 1;
  98. ACLK_QUEUE_UNLOCK;
  99. }
  100. void aclk_queue_unlock(void)
  101. {
  102. ACLK_QUEUE_LOCK;
  103. aclk_query_queue.block_push = 0;
  104. ACLK_QUEUE_UNLOCK;
  105. }