aclk_query_queue.c 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk_query_queue.h"
  3. #include "aclk_query.h"
  4. #include "aclk_stats.h"
  5. static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER;
  6. #define ACLK_QUEUE_LOCK netdata_mutex_lock(&aclk_query_queue_mutex)
  7. #define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&aclk_query_queue_mutex)
  8. static struct aclk_query_queue {
  9. aclk_query_t head;
  10. aclk_query_t tail;
  11. int block_push;
  12. } aclk_query_queue = {
  13. .head = NULL,
  14. .tail = NULL,
  15. .block_push = 0
  16. };
  17. static inline int _aclk_queue_query(aclk_query_t query)
  18. {
  19. query->created = now_realtime_usec();
  20. ACLK_QUEUE_LOCK;
  21. if (aclk_query_queue.block_push) {
  22. ACLK_QUEUE_UNLOCK;
  23. if(!netdata_exit)
  24. error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
  25. aclk_query_free(query);
  26. return 1;
  27. }
  28. if (!aclk_query_queue.head) {
  29. aclk_query_queue.head = query;
  30. aclk_query_queue.tail = query;
  31. ACLK_QUEUE_UNLOCK;
  32. return 0;
  33. }
  34. // TODO deduplication
  35. aclk_query_queue.tail->next = query;
  36. aclk_query_queue.tail = query;
  37. ACLK_QUEUE_UNLOCK;
  38. return 0;
  39. }
  40. int aclk_queue_query(aclk_query_t query)
  41. {
  42. int ret = _aclk_queue_query(query);
  43. if (!ret) {
  44. QUERY_THREAD_WAKEUP;
  45. if (aclk_stats_enabled) {
  46. ACLK_STATS_LOCK;
  47. aclk_metrics_per_sample.queries_queued++;
  48. ACLK_STATS_UNLOCK;
  49. }
  50. }
  51. return ret;
  52. }
  53. aclk_query_t aclk_queue_pop(void)
  54. {
  55. aclk_query_t ret;
  56. ACLK_QUEUE_LOCK;
  57. if (aclk_query_queue.block_push) {
  58. ACLK_QUEUE_UNLOCK;
  59. if(!netdata_exit)
  60. error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
  61. return NULL;
  62. }
  63. ret = aclk_query_queue.head;
  64. if (!ret) {
  65. ACLK_QUEUE_UNLOCK;
  66. return ret;
  67. }
  68. aclk_query_queue.head = ret->next;
  69. if (unlikely(!aclk_query_queue.head))
  70. aclk_query_queue.tail = aclk_query_queue.head;
  71. ACLK_QUEUE_UNLOCK;
  72. ret->next = NULL;
  73. return ret;
  74. }
  75. void aclk_queue_flush(void)
  76. {
  77. aclk_query_t query = aclk_queue_pop();
  78. while (query) {
  79. aclk_query_free(query);
  80. query = aclk_queue_pop();
  81. };
  82. }
  83. aclk_query_t aclk_query_new(aclk_query_type_t type)
  84. {
  85. aclk_query_t query = callocz(1, sizeof(struct aclk_query));
  86. query->type = type;
  87. return query;
  88. }
  89. void aclk_query_free(aclk_query_t query)
  90. {
  91. if (query->type == HTTP_API_V2) {
  92. freez(query->data.http_api_v2.payload);
  93. if (query->data.http_api_v2.query != query->dedup_id)
  94. freez(query->data.http_api_v2.query);
  95. }
  96. if (query->type == CHART_NEW)
  97. freez(query->data.chart_add_del.chart_name);
  98. if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update)
  99. json_object_put(query->data.alarm_update);
  100. freez(query->dedup_id);
  101. freez(query->callback_topic);
  102. freez(query->msg_id);
  103. freez(query);
  104. }
  105. void aclk_queue_lock(void)
  106. {
  107. ACLK_QUEUE_LOCK;
  108. aclk_query_queue.block_push = 1;
  109. ACLK_QUEUE_UNLOCK;
  110. }