aclk_query_queue.c 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk_query_queue.h"
  3. #include "aclk_query.h"
  4. #include "aclk_stats.h"
  5. static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER;
  6. #define ACLK_QUEUE_LOCK netdata_mutex_lock(&aclk_query_queue_mutex)
  7. #define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&aclk_query_queue_mutex)
  8. static struct aclk_query_queue {
  9. aclk_query_t head;
  10. aclk_query_t tail;
  11. int block_push;
  12. } aclk_query_queue = {
  13. .head = NULL,
  14. .tail = NULL,
  15. .block_push = 0
  16. };
  17. static inline int _aclk_queue_query(aclk_query_t query)
  18. {
  19. now_realtime_timeval(&query->created_tv);
  20. query->created = now_realtime_usec();
  21. ACLK_QUEUE_LOCK;
  22. if (aclk_query_queue.block_push) {
  23. ACLK_QUEUE_UNLOCK;
  24. if(!netdata_exit)
  25. error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
  26. aclk_query_free(query);
  27. return 1;
  28. }
  29. if (!aclk_query_queue.head) {
  30. aclk_query_queue.head = query;
  31. aclk_query_queue.tail = query;
  32. ACLK_QUEUE_UNLOCK;
  33. return 0;
  34. }
  35. // TODO deduplication
  36. aclk_query_queue.tail->next = query;
  37. aclk_query_queue.tail = query;
  38. ACLK_QUEUE_UNLOCK;
  39. return 0;
  40. }
  41. int aclk_queue_query(aclk_query_t query)
  42. {
  43. int ret = _aclk_queue_query(query);
  44. if (!ret) {
  45. QUERY_THREAD_WAKEUP;
  46. if (aclk_stats_enabled) {
  47. ACLK_STATS_LOCK;
  48. aclk_metrics_per_sample.queries_queued++;
  49. ACLK_STATS_UNLOCK;
  50. }
  51. }
  52. return ret;
  53. }
  54. aclk_query_t aclk_queue_pop(void)
  55. {
  56. aclk_query_t ret;
  57. ACLK_QUEUE_LOCK;
  58. if (aclk_query_queue.block_push) {
  59. ACLK_QUEUE_UNLOCK;
  60. if(!netdata_exit)
  61. error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
  62. return NULL;
  63. }
  64. ret = aclk_query_queue.head;
  65. if (!ret) {
  66. ACLK_QUEUE_UNLOCK;
  67. return ret;
  68. }
  69. aclk_query_queue.head = ret->next;
  70. if (unlikely(!aclk_query_queue.head))
  71. aclk_query_queue.tail = aclk_query_queue.head;
  72. ACLK_QUEUE_UNLOCK;
  73. ret->next = NULL;
  74. return ret;
  75. }
  76. void aclk_queue_flush(void)
  77. {
  78. aclk_query_t query = aclk_queue_pop();
  79. while (query) {
  80. aclk_query_free(query);
  81. query = aclk_queue_pop();
  82. };
  83. }
  84. aclk_query_t aclk_query_new(aclk_query_type_t type)
  85. {
  86. aclk_query_t query = callocz(1, sizeof(struct aclk_query));
  87. query->type = type;
  88. return query;
  89. }
  90. void aclk_query_free(aclk_query_t query)
  91. {
  92. switch (query->type) {
  93. case HTTP_API_V2:
  94. freez(query->data.http_api_v2.payload);
  95. if (query->data.http_api_v2.query != query->dedup_id)
  96. freez(query->data.http_api_v2.query);
  97. break;
  98. case NODE_STATE_UPDATE:
  99. case REGISTER_NODE:
  100. case CHART_DIMS_UPDATE:
  101. case CHART_CONFIG_UPDATED:
  102. case CHART_RESET:
  103. case RETENTION_UPDATED:
  104. case UPDATE_NODE_INFO:
  105. case ALARM_LOG_HEALTH:
  106. case ALARM_PROVIDE_CFG:
  107. case ALARM_SNAPSHOT:
  108. case UPDATE_NODE_COLLECTORS:
  109. case PROTO_BIN_MESSAGE:
  110. if (!use_mqtt_5)
  111. freez(query->data.bin_payload.payload);
  112. break;
  113. default:
  114. break;
  115. }
  116. freez(query->dedup_id);
  117. freez(query->callback_topic);
  118. freez(query->msg_id);
  119. freez(query);
  120. }
  121. void aclk_queue_lock(void)
  122. {
  123. ACLK_QUEUE_LOCK;
  124. aclk_query_queue.block_push = 1;
  125. ACLK_QUEUE_UNLOCK;
  126. }
  127. void aclk_queue_unlock(void)
  128. {
  129. ACLK_QUEUE_LOCK;
  130. aclk_query_queue.block_push = 0;
  131. ACLK_QUEUE_UNLOCK;
  132. }