aclk_query_queue.c 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk_query_queue.h"
  3. #include "aclk_query.h"
  4. #include "aclk_stats.h"
  5. static netdata_mutex_t aclk_query_queue_mutex = NETDATA_MUTEX_INITIALIZER;
  6. #define ACLK_QUEUE_LOCK netdata_mutex_lock(&aclk_query_queue_mutex)
  7. #define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&aclk_query_queue_mutex)
  8. static struct aclk_query_queue {
  9. aclk_query_t head;
  10. aclk_query_t tail;
  11. int block_push;
  12. } aclk_query_queue = {
  13. .head = NULL,
  14. .tail = NULL,
  15. .block_push = 0
  16. };
  17. static inline int _aclk_queue_query(aclk_query_t query)
  18. {
  19. now_realtime_timeval(&query->created_tv);
  20. query->created = now_realtime_usec();
  21. ACLK_QUEUE_LOCK;
  22. if (aclk_query_queue.block_push) {
  23. ACLK_QUEUE_UNLOCK;
  24. if(!netdata_exit)
  25. error("Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
  26. aclk_query_free(query);
  27. return 1;
  28. }
  29. if (!aclk_query_queue.head) {
  30. aclk_query_queue.head = query;
  31. aclk_query_queue.tail = query;
  32. ACLK_QUEUE_UNLOCK;
  33. return 0;
  34. }
  35. // TODO deduplication
  36. aclk_query_queue.tail->next = query;
  37. aclk_query_queue.tail = query;
  38. ACLK_QUEUE_UNLOCK;
  39. return 0;
  40. }
  41. int aclk_queue_query(aclk_query_t query)
  42. {
  43. int ret = _aclk_queue_query(query);
  44. if (!ret) {
  45. QUERY_THREAD_WAKEUP;
  46. if (aclk_stats_enabled) {
  47. ACLK_STATS_LOCK;
  48. aclk_metrics_per_sample.queries_queued++;
  49. ACLK_STATS_UNLOCK;
  50. }
  51. }
  52. return ret;
  53. }
  54. aclk_query_t aclk_queue_pop(void)
  55. {
  56. aclk_query_t ret;
  57. ACLK_QUEUE_LOCK;
  58. if (aclk_query_queue.block_push) {
  59. ACLK_QUEUE_UNLOCK;
  60. if(!netdata_exit)
  61. error("POP Query Queue is blocked from accepting new requests. This is normally the case when ACLK prepares to shutdown.");
  62. return NULL;
  63. }
  64. ret = aclk_query_queue.head;
  65. if (!ret) {
  66. ACLK_QUEUE_UNLOCK;
  67. return ret;
  68. }
  69. aclk_query_queue.head = ret->next;
  70. if (unlikely(!aclk_query_queue.head))
  71. aclk_query_queue.tail = aclk_query_queue.head;
  72. ACLK_QUEUE_UNLOCK;
  73. ret->next = NULL;
  74. return ret;
  75. }
  76. void aclk_queue_flush(void)
  77. {
  78. aclk_query_t query = aclk_queue_pop();
  79. while (query) {
  80. aclk_query_free(query);
  81. query = aclk_queue_pop();
  82. };
  83. }
  84. aclk_query_t aclk_query_new(aclk_query_type_t type)
  85. {
  86. aclk_query_t query = callocz(1, sizeof(struct aclk_query));
  87. query->type = type;
  88. return query;
  89. }
  90. void aclk_query_free(aclk_query_t query)
  91. {
  92. switch (query->type) {
  93. case HTTP_API_V2:
  94. freez(query->data.http_api_v2.payload);
  95. if (query->data.http_api_v2.query != query->dedup_id)
  96. freez(query->data.http_api_v2.query);
  97. break;
  98. case CHART_NEW:
  99. freez(query->data.chart_add_del.chart_name);
  100. break;
  101. case ALARM_STATE_UPDATE:
  102. if (query->data.alarm_update)
  103. json_object_put(query->data.alarm_update);
  104. break;
  105. case NODE_STATE_UPDATE:
  106. freez((void*)query->data.node_update.claim_id);
  107. freez((void*)query->data.node_update.node_id);
  108. break;
  109. case REGISTER_NODE:
  110. freez((void*)query->data.node_creation.claim_id);
  111. freez((void*)query->data.node_creation.hostname);
  112. freez((void*)query->data.node_creation.machine_guid);
  113. break;
  114. case CHART_DIMS_UPDATE:
  115. case CHART_CONFIG_UPDATED:
  116. case CHART_RESET:
  117. case RETENTION_UPDATED:
  118. case UPDATE_NODE_INFO:
  119. case ALARM_LOG_HEALTH:
  120. case ALARM_PROVIDE_CFG:
  121. case ALARM_SNAPSHOT:
  122. freez(query->data.bin_payload.payload);
  123. break;
  124. default:
  125. break;
  126. }
  127. freez(query->dedup_id);
  128. freez(query->callback_topic);
  129. freez(query->msg_id);
  130. freez(query);
  131. }
  132. void aclk_queue_lock(void)
  133. {
  134. ACLK_QUEUE_LOCK;
  135. aclk_query_queue.block_push = 1;
  136. ACLK_QUEUE_UNLOCK;
  137. }
  138. void aclk_queue_unlock(void)
  139. {
  140. ACLK_QUEUE_LOCK;
  141. aclk_query_queue.block_push = 0;
  142. ACLK_QUEUE_UNLOCK;
  143. }