sqlite_aclk.h 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifndef NETDATA_SQLITE_ACLK_H
  3. #define NETDATA_SQLITE_ACLK_H
  4. #include "sqlite3.h"
  5. // TODO: To be added
  6. #include "../../aclk/schema-wrappers/chart_stream.h"
  7. #ifndef ACLK_MAX_CHART_BATCH
  8. #define ACLK_MAX_CHART_BATCH (200)
  9. #endif
  10. #ifndef ACLK_MAX_CHART_BATCH_COUNT
  11. #define ACLK_MAX_CHART_BATCH_COUNT (10)
  12. #endif
  13. #define ACLK_MAX_ALERT_UPDATES (5)
  14. #define ACLK_DATABASE_CLEANUP_FIRST (60)
  15. #define ACLK_DATABASE_ROTATION_DELAY (60)
  16. #define ACLK_DATABASE_CLEANUP_INTERVAL (3600)
  17. #define ACLK_DATABASE_ROTATION_INTERVAL (3600)
  18. #define ACLK_DELETE_ACK_INTERNAL (600)
  19. #define ACLK_DELETE_ACK_ALERTS_INTERNAL (86400)
  20. #define ACLK_SYNC_QUERY_SIZE 512
  21. struct aclk_completion {
  22. uv_mutex_t mutex;
  23. uv_cond_t cond;
  24. volatile unsigned completed;
  25. };
  26. static inline void init_aclk_completion(struct aclk_completion *p)
  27. {
  28. p->completed = 0;
  29. fatal_assert(0 == uv_cond_init(&p->cond));
  30. fatal_assert(0 == uv_mutex_init(&p->mutex));
  31. }
  32. static inline void destroy_aclk_completion(struct aclk_completion *p)
  33. {
  34. uv_cond_destroy(&p->cond);
  35. uv_mutex_destroy(&p->mutex);
  36. }
  37. static inline void wait_for_aclk_completion(struct aclk_completion *p)
  38. {
  39. uv_mutex_lock(&p->mutex);
  40. while (0 == p->completed) {
  41. uv_cond_wait(&p->cond, &p->mutex);
  42. }
  43. fatal_assert(1 == p->completed);
  44. uv_mutex_unlock(&p->mutex);
  45. }
  46. static inline void aclk_complete(struct aclk_completion *p)
  47. {
  48. uv_mutex_lock(&p->mutex);
  49. p->completed = 1;
  50. uv_mutex_unlock(&p->mutex);
  51. uv_cond_broadcast(&p->cond);
  52. }
  53. extern uv_mutex_t aclk_async_lock;
  54. static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out)
  55. {
  56. uuid_unparse_lower(*uuid, out);
  57. out[8] = '_';
  58. out[13] = '_';
  59. out[18] = '_';
  60. out[23] = '_';
  61. }
  62. static inline char *get_str_from_uuid(uuid_t *uuid)
  63. {
  64. char uuid_str[GUID_LEN + 1];
  65. if (unlikely(!uuid)) {
  66. uuid_t zero_uuid;
  67. uuid_clear(zero_uuid);
  68. uuid_unparse_lower(zero_uuid, uuid_str);
  69. }
  70. else
  71. uuid_unparse_lower(*uuid, uuid_str);
  72. return strdupz(uuid_str);
  73. }
  74. #define TABLE_ACLK_CHART "CREATE TABLE IF NOT EXISTS aclk_chart_%s (sequence_id INTEGER PRIMARY KEY, " \
  75. "date_created, date_updated, date_submitted, status, uuid, type, unique_id, " \
  76. "update_count default 1, unique(uuid, status));"
  77. #define TABLE_ACLK_CHART_PAYLOAD "CREATE TABLE IF NOT EXISTS aclk_chart_payload_%s (unique_id BLOB PRIMARY KEY, " \
  78. "uuid, claim_id, type, date_created, payload);"
  79. #define TABLE_ACLK_CHART_LATEST "CREATE TABLE IF NOT EXISTS aclk_chart_latest_%s (uuid BLOB PRIMARY KEY, " \
  80. "unique_id, date_submitted);"
  81. #define TRIGGER_ACLK_CHART_PAYLOAD "CREATE TRIGGER IF NOT EXISTS aclk_tr_chart_payload_%s " \
  82. "after insert on aclk_chart_payload_%s " \
  83. "begin insert into aclk_chart_%s (uuid, unique_id, type, status, date_created) values " \
  84. " (new.uuid, new.unique_id, new.type, 'pending', strftime('%%s')) on conflict(uuid, status) " \
  85. " do update set unique_id = new.unique_id, update_count = update_count + 1; " \
  86. "end;"
  87. #define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
  88. "alert_unique_id, date_created, date_submitted, date_cloud_ack, " \
  89. "unique(alert_unique_id)); " \
  90. "insert into aclk_alert_%s (alert_unique_id, date_created) " \
  91. "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s where new_status <> 0 and new_status <> -2 order by unique_id asc on conflict (alert_unique_id) do nothing;"
  92. #define INDEX_ACLK_CHART "CREATE INDEX IF NOT EXISTS aclk_chart_index_%s ON aclk_chart_%s (unique_id);"
  93. #define INDEX_ACLK_CHART_LATEST "CREATE INDEX IF NOT EXISTS aclk_chart_latest_index_%s ON aclk_chart_latest_%s (unique_id);"
  94. #define INDEX_ACLK_ALERT "CREATE INDEX IF NOT EXISTS aclk_alert_index_%s ON aclk_alert_%s (alert_unique_id);"
  95. enum aclk_database_opcode {
  96. ACLK_DATABASE_NOOP = 0,
  97. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  98. ACLK_DATABASE_ADD_CHART,
  99. ACLK_DATABASE_ADD_DIMENSION,
  100. ACLK_DATABASE_PUSH_CHART,
  101. ACLK_DATABASE_PUSH_CHART_CONFIG,
  102. ACLK_DATABASE_RESET_CHART,
  103. ACLK_DATABASE_CHART_ACK,
  104. ACLK_DATABASE_UPD_RETENTION,
  105. ACLK_DATABASE_DIM_DELETION,
  106. ACLK_DATABASE_ORPHAN_HOST,
  107. #endif
  108. ACLK_DATABASE_ALARM_HEALTH_LOG,
  109. ACLK_DATABASE_CLEANUP,
  110. ACLK_DATABASE_DELETE_HOST,
  111. ACLK_DATABASE_NODE_INFO,
  112. ACLK_DATABASE_PUSH_ALERT,
  113. ACLK_DATABASE_PUSH_ALERT_CONFIG,
  114. ACLK_DATABASE_PUSH_ALERT_SNAPSHOT,
  115. ACLK_DATABASE_QUEUE_REMOVED_ALERTS,
  116. ACLK_DATABASE_TIMER
  117. };
  118. struct aclk_chart_payload_t {
  119. long sequence_id;
  120. long last_sequence_id;
  121. char *payload;
  122. struct aclk_chart_payload_t *next;
  123. };
  124. struct aclk_database_cmd {
  125. enum aclk_database_opcode opcode;
  126. void *data;
  127. void *data_param;
  128. int count;
  129. uint64_t param1;
  130. struct aclk_completion *completion;
  131. };
  132. #define ACLK_DATABASE_CMD_Q_MAX_SIZE (16384)
  133. struct aclk_database_cmdqueue {
  134. unsigned head, tail;
  135. struct aclk_database_cmd cmd_array[ACLK_DATABASE_CMD_Q_MAX_SIZE];
  136. };
  137. struct aclk_database_worker_config {
  138. uv_thread_t thread;
  139. char uuid_str[GUID_LEN + 1];
  140. char node_id[GUID_LEN + 1];
  141. char host_guid[GUID_LEN + 1];
  142. uint64_t chart_sequence_id; // last chart_sequence_id
  143. time_t chart_timestamp; // last chart timestamp
  144. time_t cleanup_after; // Start a cleanup after this timestamp
  145. time_t startup_time; // When the sync thread started
  146. time_t rotation_after;
  147. uint64_t batch_id; // batch id to use
  148. uint64_t alerts_batch_id; // batch id for alerts to use
  149. uint64_t alerts_start_seq_id; // cloud has asked to start streaming from
  150. uint64_t alert_sequence_id; // last alert sequence_id
  151. uint32_t chart_payload_count;
  152. uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested
  153. uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message
  154. uv_loop_t *loop;
  155. RRDHOST *host;
  156. uv_async_t async;
  157. /* FIFO command queue */
  158. uv_mutex_t cmd_mutex;
  159. uv_cond_t cmd_cond;
  160. volatile unsigned queue_size;
  161. struct aclk_database_cmdqueue cmd_queue;
  162. uint32_t retry_count;
  163. int chart_updates;
  164. int alert_updates;
  165. time_t batch_created;
  166. int node_info_send;
  167. int chart_pending;
  168. int chart_reset_count;
  169. volatile unsigned is_shutting_down;
  170. volatile unsigned is_orphan;
  171. struct aclk_database_worker_config *next;
  172. };
  173. static inline RRDHOST *find_host_by_node_id(char *node_id)
  174. {
  175. uuid_t node_uuid;
  176. if (unlikely(!node_id))
  177. return NULL;
  178. if (uuid_parse(node_id, node_uuid))
  179. return NULL;
  180. RRDHOST *host = localhost;
  181. while(host) {
  182. if (host->node_id && !(uuid_compare(*host->node_id, node_uuid)))
  183. return host;
  184. host = host->next;
  185. }
  186. return NULL;
  187. }
  188. extern sqlite3 *db_meta;
  189. extern int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
  190. extern void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
  191. extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
  192. int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd);
  193. void aclk_data_rotated(void);
  194. void sql_aclk_sync_init(void);
  195. void sql_check_aclk_table_list(struct aclk_database_worker_config *wc);
  196. void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
  197. void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
  198. int claimed();
  199. void aclk_sync_exit_all();
  200. struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id);
  201. #endif //NETDATA_SQLITE_ACLK_H