// sqlite_aclk.h
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifndef NETDATA_SQLITE_ACLK_H
  3. #define NETDATA_SQLITE_ACLK_H
  4. #include "sqlite3.h"
  5. #ifndef ACLK_MAX_CHART_BATCH
  6. #define ACLK_MAX_CHART_BATCH (200)
  7. #endif
  8. #ifndef ACLK_MAX_CHART_BATCH_COUNT
  9. #define ACLK_MAX_CHART_BATCH_COUNT (10)
  10. #endif
  11. #define ACLK_MAX_ALERT_UPDATES (5)
  12. #define ACLK_DATABASE_CLEANUP_FIRST (1200)
  13. #define ACLK_DATABASE_CLEANUP_INTERVAL (3600)
  14. #define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400)
  15. #define ACLK_SYNC_QUERY_SIZE 512
  16. struct aclk_completion {
  17. uv_mutex_t mutex;
  18. uv_cond_t cond;
  19. volatile unsigned completed;
  20. };
  21. static inline void init_aclk_completion(struct aclk_completion *p)
  22. {
  23. p->completed = 0;
  24. fatal_assert(0 == uv_cond_init(&p->cond));
  25. fatal_assert(0 == uv_mutex_init(&p->mutex));
  26. }
  27. static inline void destroy_aclk_completion(struct aclk_completion *p)
  28. {
  29. uv_cond_destroy(&p->cond);
  30. uv_mutex_destroy(&p->mutex);
  31. }
  32. static inline void wait_for_aclk_completion(struct aclk_completion *p)
  33. {
  34. uv_mutex_lock(&p->mutex);
  35. while (0 == p->completed) {
  36. uv_cond_wait(&p->cond, &p->mutex);
  37. }
  38. fatal_assert(1 == p->completed);
  39. uv_mutex_unlock(&p->mutex);
  40. }
  41. static inline void aclk_complete(struct aclk_completion *p)
  42. {
  43. uv_mutex_lock(&p->mutex);
  44. p->completed = 1;
  45. uv_mutex_unlock(&p->mutex);
  46. uv_cond_broadcast(&p->cond);
  47. }
  48. extern uv_mutex_t aclk_async_lock;
  49. static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out)
  50. {
  51. uuid_unparse_lower(*uuid, out);
  52. out[8] = '_';
  53. out[13] = '_';
  54. out[18] = '_';
  55. out[23] = '_';
  56. }
  57. #define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
  58. "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \
  59. "unique(alert_unique_id));"
  60. #define INDEX_ACLK_ALERT "CREATE INDEX IF NOT EXISTS aclk_alert_index_%s ON aclk_alert_%s (alert_unique_id);"
  61. enum aclk_database_opcode {
  62. ACLK_DATABASE_NOOP = 0,
  63. ACLK_DATABASE_ORPHAN_HOST,
  64. ACLK_DATABASE_ALARM_HEALTH_LOG,
  65. ACLK_DATABASE_CLEANUP,
  66. ACLK_DATABASE_DELETE_HOST,
  67. ACLK_DATABASE_NODE_INFO,
  68. ACLK_DATABASE_PUSH_ALERT,
  69. ACLK_DATABASE_PUSH_ALERT_CONFIG,
  70. ACLK_DATABASE_PUSH_ALERT_SNAPSHOT,
  71. ACLK_DATABASE_QUEUE_REMOVED_ALERTS,
  72. ACLK_DATABASE_NODE_COLLECTORS,
  73. ACLK_DATABASE_TIMER,
  74. // leave this last
  75. // we need it to check for worker utilization
  76. ACLK_MAX_ENUMERATIONS_DEFINED
  77. };
  78. struct aclk_database_cmd {
  79. enum aclk_database_opcode opcode;
  80. void *data;
  81. void *data_param;
  82. int count;
  83. struct aclk_completion *completion;
  84. };
  85. #define ACLK_DATABASE_CMD_Q_MAX_SIZE (1024)
  86. struct aclk_database_cmdqueue {
  87. unsigned head, tail;
  88. struct aclk_database_cmd cmd_array[ACLK_DATABASE_CMD_Q_MAX_SIZE];
  89. };
  90. struct aclk_database_worker_config {
  91. uv_thread_t thread;
  92. char uuid_str[GUID_LEN + 1];
  93. char node_id[GUID_LEN + 1];
  94. char host_guid[GUID_LEN + 1];
  95. char *hostname; // hostname to avoid constant lookups
  96. time_t cleanup_after; // Start a cleanup after this timestamp
  97. time_t startup_time; // When the sync thread started
  98. uint64_t alerts_batch_id; // batch id for alerts to use
  99. uint64_t alerts_start_seq_id; // cloud has asked to start streaming from
  100. uint64_t alert_sequence_id; // last alert sequence_id
  101. int pause_alert_updates;
  102. uint32_t chart_payload_count;
  103. uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested
  104. uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message
  105. uv_loop_t *loop;
  106. RRDHOST *host;
  107. uv_async_t async;
  108. /* FIFO command queue */
  109. uv_mutex_t cmd_mutex;
  110. uv_cond_t cmd_cond;
  111. volatile unsigned queue_size;
  112. struct aclk_database_cmdqueue cmd_queue;
  113. int alert_updates;
  114. int node_info_send;
  115. time_t node_collectors_send;
  116. volatile unsigned is_shutting_down;
  117. volatile unsigned is_orphan;
  118. struct aclk_database_worker_config *next;
  119. };
  120. static inline RRDHOST *find_host_by_node_id(char *node_id)
  121. {
  122. uuid_t node_uuid;
  123. if (unlikely(!node_id))
  124. return NULL;
  125. if (uuid_parse(node_id, node_uuid))
  126. return NULL;
  127. rrd_rdlock();
  128. RRDHOST *host, *ret = NULL;
  129. rrdhost_foreach_read(host) {
  130. if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) {
  131. ret = host;
  132. break;
  133. }
  134. }
  135. rrd_unlock();
  136. return ret;
  137. }
  138. extern sqlite3 *db_meta;
  139. int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
  140. void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
  141. void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
  142. void sql_aclk_sync_init(void);
  143. int claimed();
  144. void aclk_sync_exit_all();
  145. struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id);
  146. #endif //NETDATA_SQLITE_ACLK_H