sqlite_aclk.c 26 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_functions.h"
  3. #include "sqlite_aclk.h"
  4. #include "sqlite_aclk_node.h"
  5. void sanity_check(void) {
  6. // make sure the compiler will stop on misconfigurations
  7. BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED);
  8. }
  9. static int sql_check_aclk_table(void *data, int argc, char **argv, char **column)
  10. {
  11. struct aclk_database_worker_config *wc = data;
  12. UNUSED(argc);
  13. UNUSED(column);
  14. debug(D_ACLK_SYNC,"Scheduling aclk sync table check for node %s", (char *) argv[0]);
  15. struct aclk_database_cmd cmd;
  16. memset(&cmd, 0, sizeof(cmd));
  17. cmd.opcode = ACLK_DATABASE_DELETE_HOST;
  18. cmd.data = strdupz((char *) argv[0]);
  19. aclk_database_enq_cmd_noblock(wc, &cmd);
  20. return 0;
  21. }
  22. #define SQL_SELECT_ACLK_ACTIVE_LIST "SELECT REPLACE(SUBSTR(name,19),'_','-') FROM sqlite_schema " \
  23. "WHERE name LIKE 'aclk_chart_latest_%' AND type IN ('table');"
  24. static void sql_check_aclk_table_list(struct aclk_database_worker_config *wc)
  25. {
  26. char *err_msg = NULL;
  27. debug(D_ACLK_SYNC,"Cleaning tables for nodes that do not exist");
  28. int rc = sqlite3_exec_monitored(db_meta, SQL_SELECT_ACLK_ACTIVE_LIST, sql_check_aclk_table, (void *) wc, &err_msg);
  29. if (rc != SQLITE_OK) {
  30. error_report("Query failed when trying to check for obsolete ACLK sync tables, %s", err_msg);
  31. sqlite3_free(err_msg);
  32. }
  33. }
  34. static void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  35. {
  36. UNUSED(cmd);
  37. debug(D_ACLK, "Checking database for %s", wc->host_guid);
  38. BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
  39. buffer_sprintf(sql,"DELETE FROM aclk_alert_%s WHERE date_submitted IS NOT NULL AND "
  40. "CAST(date_cloud_ack AS INT) < unixepoch()-%d;", wc->uuid_str, ACLK_DELETE_ACK_ALERTS_INTERNAL);
  41. db_execute(buffer_tostring(sql));
  42. buffer_free(sql);
  43. }
  44. #define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;"
  45. static int is_host_available(uuid_t *host_id)
  46. {
  47. sqlite3_stmt *res = NULL;
  48. int rc;
  49. if (unlikely(!db_meta)) {
  50. if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
  51. error_report("Database has not been initialized");
  52. return 1;
  53. }
  54. rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_HOST_BY_UUID, -1, &res, 0);
  55. if (unlikely(rc != SQLITE_OK)) {
  56. error_report("Failed to prepare statement to select node instance information for a node");
  57. return 1;
  58. }
  59. rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
  60. if (unlikely(rc != SQLITE_OK)) {
  61. error_report("Failed to bind host_id parameter to select node instance information");
  62. goto failed;
  63. }
  64. rc = sqlite3_step_monitored(res);
  65. failed:
  66. if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
  67. error_report("Failed to finalize the prepared statement when checking host existence");
  68. return (rc == SQLITE_ROW);
  69. }
  70. // OPCODE: ACLK_DATABASE_DELETE_HOST
  71. void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  72. {
  73. UNUSED(wc);
  74. char uuid_str[GUID_LEN + 1];
  75. char host_str[GUID_LEN + 1];
  76. int rc;
  77. uuid_t host_uuid;
  78. char *host_guid = (char *)cmd.data;
  79. if (unlikely(!host_guid))
  80. return;
  81. rc = uuid_parse(host_guid, host_uuid);
  82. freez(host_guid);
  83. if (rc)
  84. return;
  85. uuid_unparse_lower(host_uuid, host_str);
  86. uuid_unparse_lower_fix(&host_uuid, uuid_str);
  87. debug(D_ACLK_SYNC, "Checking if I should delete aclk tables for node %s", host_str);
  88. if (is_host_available(&host_uuid)) {
  89. debug(D_ACLK_SYNC, "Host %s exists, not deleting aclk sync tables", host_str);
  90. return;
  91. }
  92. debug(D_ACLK_SYNC, "Host %s does NOT exist, can delete aclk sync tables", host_str);
  93. sqlite3_stmt *res = NULL;
  94. BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
  95. buffer_sprintf(sql,"SELECT 'drop '||type||' IF EXISTS '||name||';' FROM sqlite_schema " \
  96. "WHERE name LIKE 'aclk_%%_%s' AND type IN ('table', 'trigger', 'index');", uuid_str);
  97. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  98. if (rc != SQLITE_OK) {
  99. error_report("Failed to prepare statement to clean up aclk tables");
  100. goto fail;
  101. }
  102. buffer_flush(sql);
  103. while (sqlite3_step_monitored(res) == SQLITE_ROW)
  104. buffer_strcat(sql, (char *) sqlite3_column_text(res, 0));
  105. rc = sqlite3_finalize(res);
  106. if (unlikely(rc != SQLITE_OK))
  107. error_report("Failed to finalize statement to clean up aclk tables, rc = %d", rc);
  108. db_execute(buffer_tostring(sql));
  109. fail:
  110. buffer_free(sql);
  111. }
  112. uv_mutex_t aclk_async_lock;
  113. struct aclk_database_worker_config *aclk_thread_head = NULL;
  114. int claimed()
  115. {
  116. int rc;
  117. rrdhost_aclk_state_lock(localhost);
  118. rc = (localhost->aclk_state.claimed_id != NULL);
  119. rrdhost_aclk_state_unlock(localhost);
  120. return rc;
  121. }
  122. void aclk_add_worker_thread(struct aclk_database_worker_config *wc)
  123. {
  124. if (unlikely(!wc))
  125. return;
  126. uv_mutex_lock(&aclk_async_lock);
  127. if (unlikely(!wc->host)) {
  128. wc->next = aclk_thread_head;
  129. aclk_thread_head = wc;
  130. }
  131. uv_mutex_unlock(&aclk_async_lock);
  132. }
  133. void aclk_del_worker_thread(struct aclk_database_worker_config *wc)
  134. {
  135. if (unlikely(!wc))
  136. return;
  137. uv_mutex_lock(&aclk_async_lock);
  138. struct aclk_database_worker_config **tmp = &aclk_thread_head;
  139. while (*tmp && (*tmp) != wc)
  140. tmp = &(*tmp)->next;
  141. if (*tmp)
  142. *tmp = wc->next;
  143. uv_mutex_unlock(&aclk_async_lock);
  144. }
  145. int aclk_worker_thread_exists(char *guid)
  146. {
  147. int rc = 0;
  148. uv_mutex_lock(&aclk_async_lock);
  149. struct aclk_database_worker_config *tmp = aclk_thread_head;
  150. while (tmp && !rc) {
  151. rc = strcmp(tmp->uuid_str, guid) == 0;
  152. tmp = tmp->next;
  153. }
  154. uv_mutex_unlock(&aclk_async_lock);
  155. return rc;
  156. }
  157. void aclk_database_init_cmd_queue(struct aclk_database_worker_config *wc)
  158. {
  159. wc->cmd_queue.head = wc->cmd_queue.tail = 0;
  160. wc->queue_size = 0;
  161. fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
  162. fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
  163. }
  164. int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
  165. {
  166. unsigned queue_size;
  167. /* wait for free space in queue */
  168. uv_mutex_lock(&wc->cmd_mutex);
  169. if ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE || wc->is_shutting_down) {
  170. uv_mutex_unlock(&wc->cmd_mutex);
  171. return 1;
  172. }
  173. fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
  174. /* enqueue command */
  175. wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
  176. wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
  177. wc->cmd_queue.tail + 1 : 0;
  178. wc->queue_size = queue_size + 1;
  179. uv_mutex_unlock(&wc->cmd_mutex);
  180. return 0;
  181. }
  182. void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
  183. {
  184. unsigned queue_size;
  185. /* wait for free space in queue */
  186. uv_mutex_lock(&wc->cmd_mutex);
  187. if (wc->is_shutting_down) {
  188. uv_mutex_unlock(&wc->cmd_mutex);
  189. return;
  190. }
  191. while ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
  192. uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
  193. }
  194. fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
  195. /* enqueue command */
  196. wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
  197. wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
  198. wc->cmd_queue.tail + 1 : 0;
  199. wc->queue_size = queue_size + 1;
  200. uv_mutex_unlock(&wc->cmd_mutex);
  201. /* wake up event loop */
  202. int rc = uv_async_send(&wc->async);
  203. if (unlikely(rc))
  204. debug(D_ACLK_SYNC, "Failed to wake up event loop");
  205. }
  206. struct aclk_database_cmd aclk_database_deq_cmd(struct aclk_database_worker_config* wc)
  207. {
  208. struct aclk_database_cmd ret;
  209. unsigned queue_size;
  210. uv_mutex_lock(&wc->cmd_mutex);
  211. queue_size = wc->queue_size;
  212. if (queue_size == 0 || wc->is_shutting_down) {
  213. memset(&ret, 0, sizeof(ret));
  214. ret.opcode = ACLK_DATABASE_NOOP;
  215. ret.completion = NULL;
  216. if (wc->is_shutting_down)
  217. uv_cond_signal(&wc->cmd_cond);
  218. } else {
  219. /* dequeue command */
  220. ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
  221. if (queue_size == 1) {
  222. wc->cmd_queue.head = wc->cmd_queue.tail = 0;
  223. } else {
  224. wc->cmd_queue.head = wc->cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
  225. wc->cmd_queue.head + 1 : 0;
  226. }
  227. wc->queue_size = queue_size - 1;
  228. /* wake up producers */
  229. uv_cond_signal(&wc->cmd_cond);
  230. }
  231. uv_mutex_unlock(&wc->cmd_mutex);
  232. return ret;
  233. }
  234. struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id)
  235. {
  236. if (unlikely(!node_id))
  237. return NULL;
  238. uv_mutex_lock(&aclk_async_lock);
  239. struct aclk_database_worker_config *wc = aclk_thread_head;
  240. while (wc) {
  241. if (!strcmp(wc->node_id, node_id))
  242. break;
  243. wc = wc->next;
  244. }
  245. uv_mutex_unlock(&aclk_async_lock);
  246. return (wc);
  247. }
  248. void aclk_sync_exit_all()
  249. {
  250. rrd_rdlock();
  251. RRDHOST *host;
  252. rrdhost_foreach_read(host) {
  253. struct aclk_database_worker_config *wc = host->dbsync_worker;
  254. if (wc) {
  255. wc->is_shutting_down = 1;
  256. (void) aclk_database_deq_cmd(wc);
  257. uv_cond_signal(&wc->cmd_cond);
  258. }
  259. }
  260. rrd_unlock();
  261. uv_mutex_lock(&aclk_async_lock);
  262. struct aclk_database_worker_config *wc = aclk_thread_head;
  263. while (wc) {
  264. wc->is_shutting_down = 1;
  265. wc = wc->next;
  266. }
  267. uv_mutex_unlock(&aclk_async_lock);
  268. }
  269. enum {
  270. IDX_HOST_ID,
  271. IDX_HOSTNAME,
  272. IDX_REGISTRY,
  273. IDX_UPDATE_EVERY,
  274. IDX_OS,
  275. IDX_TIMEZONE,
  276. IDX_TAGS,
  277. IDX_HOPS,
  278. IDX_MEMORY_MODE,
  279. IDX_ABBREV_TIMEZONE,
  280. IDX_UTC_OFFSET,
  281. IDX_PROGRAM_NAME,
  282. IDX_PROGRAM_VERSION,
  283. IDX_ENTRIES,
  284. IDX_HEALTH_ENABLED,
  285. };
  286. static int create_host_callback(void *data, int argc, char **argv, char **column)
  287. {
  288. UNUSED(data);
  289. UNUSED(argc);
  290. UNUSED(column);
  291. char guid[UUID_STR_LEN];
  292. uuid_unparse_lower(*(uuid_t *)argv[IDX_HOST_ID], guid);
  293. struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
  294. __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
  295. system_info->hops = str2i((const char *) argv[IDX_HOPS]);
  296. sql_build_host_system_info((uuid_t *)argv[IDX_HOST_ID], system_info);
  297. RRDHOST *host = rrdhost_find_or_create(
  298. (const char *) argv[IDX_HOSTNAME]
  299. , (const char *) argv[IDX_REGISTRY]
  300. , guid
  301. , (const char *) argv[IDX_OS]
  302. , (const char *) argv[IDX_TIMEZONE]
  303. , (const char *) argv[IDX_ABBREV_TIMEZONE]
  304. , argv[IDX_UTC_OFFSET] ? str2uint32_t(argv[IDX_UTC_OFFSET]) : 0
  305. , (const char *) argv[IDX_TAGS]
  306. , (const char *) (argv[IDX_PROGRAM_NAME] ? argv[IDX_PROGRAM_NAME] : "unknown")
  307. , (const char *) (argv[IDX_PROGRAM_VERSION] ? argv[IDX_PROGRAM_VERSION] : "unknown")
  308. , argv[3] ? str2i(argv[IDX_UPDATE_EVERY]) : 1
  309. , argv[13] ? str2i(argv[IDX_ENTRIES]) : 0
  310. , default_rrd_memory_mode
  311. , 0 // health
  312. , 0 // rrdpush enabled
  313. , NULL //destination
  314. , NULL // api key
  315. , NULL // send charts matching
  316. , false // rrdpush_enable_replication
  317. , 0 // rrdpush_seconds_to_replicate
  318. , 0 // rrdpush_replication_step
  319. , system_info
  320. , 1
  321. );
  322. if (likely(host))
  323. host->rrdlabels = sql_load_host_labels((uuid_t *)argv[IDX_HOST_ID]);
  324. #ifdef NETDATA_INTERNAL_CHECKS
  325. char node_str[UUID_STR_LEN] = "<none>";
  326. if (likely(host->node_id))
  327. uuid_unparse_lower(*host->node_id, node_str);
  328. internal_error(true, "Adding archived host \"%s\" with GUID \"%s\" node id = \"%s\"", rrdhost_hostname(host), host->machine_guid, node_str);
  329. #endif
  330. return 0;
  331. }
  332. #ifdef ENABLE_ACLK
  333. static int aclk_start_sync_thread(void *data, int argc, char **argv, char **column)
  334. {
  335. char uuid_str[GUID_LEN + 1];
  336. UNUSED(data);
  337. UNUSED(argc);
  338. UNUSED(column);
  339. uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str);
  340. RRDHOST *host = rrdhost_find_by_guid(uuid_str);
  341. if (host == localhost)
  342. return 0;
  343. sql_create_aclk_table(host, (uuid_t *) argv[0], (uuid_t *) argv[1]);
  344. return 0;
  345. }
  346. #endif
  347. void sql_aclk_sync_init(void)
  348. {
  349. char *err_msg = NULL;
  350. int rc;
  351. if (unlikely(!db_meta)) {
  352. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) {
  353. return;
  354. }
  355. error_report("Database has not been initialized");
  356. return;
  357. }
  358. info("Creating archived hosts");
  359. rc = sqlite3_exec_monitored(db_meta, "SELECT host_id, hostname, registry_hostname, update_every, os, "
  360. "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, "
  361. "program_version, entries, health_enabled FROM host WHERE hops >0;",
  362. create_host_callback, NULL, &err_msg);
  363. if (rc != SQLITE_OK) {
  364. error_report("SQLite error when loading archived hosts, rc = %d (%s)", rc, err_msg);
  365. sqlite3_free(err_msg);
  366. }
  367. #ifdef ENABLE_ACLK
  368. fatal_assert(0 == uv_mutex_init(&aclk_async_lock));
  369. rc = sqlite3_exec_monitored(db_meta, "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni WHERE "
  370. "h.host_id = ni.host_id AND ni.node_id IS NOT NULL;", aclk_start_sync_thread, NULL, &err_msg);
  371. if (rc != SQLITE_OK) {
  372. error_report("SQLite error when starting ACLK sync threads, rc = %d (%s)", rc, err_msg);
  373. sqlite3_free(err_msg);
  374. }
  375. info("ACLK sync initialization completed");
  376. #endif
  377. }
  378. static void async_cb(uv_async_t *handle)
  379. {
  380. uv_stop(handle->loop);
  381. uv_update_time(handle->loop);
  382. debug(D_ACLK_SYNC, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
  383. }
  384. #define TIMER_PERIOD_MS (1000)
  385. static void timer_cb(uv_timer_t* handle)
  386. {
  387. uv_stop(handle->loop);
  388. uv_update_time(handle->loop);
  389. #ifdef ENABLE_ACLK
  390. struct aclk_database_worker_config *wc = handle->data;
  391. struct aclk_database_cmd cmd;
  392. memset(&cmd, 0, sizeof(cmd));
  393. cmd.opcode = ACLK_DATABASE_TIMER;
  394. aclk_database_enq_cmd_noblock(wc, &cmd);
  395. time_t now = now_realtime_sec();
  396. if (wc->cleanup_after && wc->cleanup_after < now) {
  397. cmd.opcode = ACLK_DATABASE_CLEANUP;
  398. if (!aclk_database_enq_cmd_noblock(wc, &cmd))
  399. wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
  400. }
  401. if (aclk_connected) {
  402. if (wc->alert_updates && !wc->pause_alert_updates) {
  403. cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
  404. cmd.count = ACLK_MAX_ALERT_UPDATES;
  405. aclk_database_enq_cmd_noblock(wc, &cmd);
  406. }
  407. }
  408. #endif
  409. }
  410. static void aclk_database_worker(void *arg)
  411. {
  412. service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
  413. worker_register("ACLKSYNC");
  414. worker_register_job_name(ACLK_DATABASE_NOOP, "noop");
  415. worker_register_job_name(ACLK_DATABASE_ORPHAN_HOST, "node orphan");
  416. worker_register_job_name(ACLK_DATABASE_ALARM_HEALTH_LOG, "alert log");
  417. worker_register_job_name(ACLK_DATABASE_CLEANUP, "cleanup");
  418. worker_register_job_name(ACLK_DATABASE_DELETE_HOST, "node delete");
  419. worker_register_job_name(ACLK_DATABASE_NODE_INFO, "node info");
  420. worker_register_job_name(ACLK_DATABASE_NODE_COLLECTORS, "node collectors");
  421. worker_register_job_name(ACLK_DATABASE_PUSH_ALERT, "alert push");
  422. worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_CONFIG, "alert conf push");
  423. worker_register_job_name(ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, "alert snapshot");
  424. worker_register_job_name(ACLK_DATABASE_QUEUE_REMOVED_ALERTS, "alerts check");
  425. worker_register_job_name(ACLK_DATABASE_TIMER, "timer");
  426. struct aclk_database_worker_config *wc = arg;
  427. uv_loop_t *loop;
  428. int ret;
  429. enum aclk_database_opcode opcode;
  430. uv_timer_t timer_req;
  431. struct aclk_database_cmd cmd;
  432. char threadname[NETDATA_THREAD_NAME_MAX+1];
  433. if (wc->host)
  434. snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host));
  435. else {
  436. snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", wc->uuid_str);
  437. threadname[11] = '\0';
  438. }
  439. uv_thread_set_name_np(wc->thread, threadname);
  440. loop = wc->loop = mallocz(sizeof(uv_loop_t));
  441. ret = uv_loop_init(loop);
  442. if (ret) {
  443. error("uv_loop_init(): %s", uv_strerror(ret));
  444. goto error_after_loop_init;
  445. }
  446. loop->data = wc;
  447. ret = uv_async_init(wc->loop, &wc->async, async_cb);
  448. if (ret) {
  449. error("uv_async_init(): %s", uv_strerror(ret));
  450. goto error_after_async_init;
  451. }
  452. wc->async.data = wc;
  453. ret = uv_timer_init(loop, &timer_req);
  454. if (ret) {
  455. error("uv_timer_init(): %s", uv_strerror(ret));
  456. goto error_after_timer_init;
  457. }
  458. timer_req.data = wc;
  459. fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
  460. wc->node_info_send = 1;
  461. info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, (unsigned long int) sizeof(*wc));
  462. memset(&cmd, 0, sizeof(cmd));
  463. wc->startup_time = now_realtime_sec();
  464. wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST;
  465. debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count);
  466. while (likely(!netdata_exit)) {
  467. worker_is_idle();
  468. uv_run(loop, UV_RUN_DEFAULT);
  469. /* wait for commands */
  470. do {
  471. cmd = aclk_database_deq_cmd(wc);
  472. if (netdata_exit)
  473. break;
  474. opcode = cmd.opcode;
  475. if(likely(opcode != ACLK_DATABASE_NOOP))
  476. worker_is_busy(opcode);
  477. switch (opcode) {
  478. case ACLK_DATABASE_NOOP:
  479. /* the command queue was empty, do nothing */
  480. break;
  481. // MAINTENANCE
  482. case ACLK_DATABASE_CLEANUP:
  483. debug(D_ACLK_SYNC, "Database cleanup for %s", wc->host_guid);
  484. if (wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST + 2 < now_realtime_sec() && claimed() && aclk_connected) {
  485. cmd.opcode = ACLK_DATABASE_NODE_INFO;
  486. cmd.completion = NULL;
  487. (void) aclk_database_enq_cmd_noblock(wc, &cmd);
  488. }
  489. sql_maint_aclk_sync_database(wc, cmd);
  490. if (wc->host == localhost)
  491. sql_check_aclk_table_list(wc);
  492. break;
  493. case ACLK_DATABASE_DELETE_HOST:
  494. debug(D_ACLK_SYNC,"Cleaning ACLK tables for %s", (char *) cmd.data);
  495. sql_delete_aclk_table_list(wc, cmd);
  496. break;
  497. // ALERTS
  498. case ACLK_DATABASE_PUSH_ALERT_CONFIG:
  499. debug(D_ACLK_SYNC,"Pushing chart config info to the cloud for %s", wc->host_guid);
  500. aclk_push_alert_config_event(wc, cmd);
  501. break;
  502. case ACLK_DATABASE_PUSH_ALERT:
  503. debug(D_ACLK_SYNC, "Pushing alert info to the cloud for %s", wc->host_guid);
  504. aclk_push_alert_event(wc, cmd);
  505. break;
  506. case ACLK_DATABASE_ALARM_HEALTH_LOG:
  507. debug(D_ACLK_SYNC, "Pushing alarm health log to the cloud for %s", wc->host_guid);
  508. aclk_push_alarm_health_log(wc, cmd);
  509. break;
  510. case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:
  511. debug(D_ACLK_SYNC, "Pushing alert snapshot to the cloud for node %s", wc->host_guid);
  512. aclk_push_alert_snapshot_event(wc, cmd);
  513. break;
  514. case ACLK_DATABASE_QUEUE_REMOVED_ALERTS:
  515. debug(D_ACLK_SYNC, "Queueing removed alerts for node %s", wc->host_guid);
  516. sql_process_queue_removed_alerts_to_aclk(wc, cmd);
  517. break;
  518. // NODE OPERATIONS
  519. case ACLK_DATABASE_NODE_INFO:
  520. debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str);
  521. sql_build_node_info(wc, cmd);
  522. break;
  523. case ACLK_DATABASE_NODE_COLLECTORS:
  524. debug(D_ACLK_SYNC,"Sending node collectors info for %s", wc->uuid_str);
  525. sql_build_node_collectors(wc);
  526. break;
  527. #ifdef ENABLE_ACLK
  528. // NODE_INSTANCE DETECTION
  529. case ACLK_DATABASE_ORPHAN_HOST:
  530. wc->host = NULL;
  531. wc->is_orphan = 1;
  532. aclk_add_worker_thread(wc);
  533. break;
  534. #endif
  535. case ACLK_DATABASE_TIMER:
  536. if (unlikely(localhost && !wc->host && !wc->is_orphan)) {
  537. if (claimed()) {
  538. wc->host = rrdhost_find_by_guid(wc->host_guid);
  539. if (wc->host) {
  540. info("HOST %s (%s) detected as active", rrdhost_hostname(wc->host), wc->host_guid);
  541. snprintfz(threadname, NETDATA_THREAD_NAME_MAX, "ACLK[%s]", rrdhost_hostname(wc->host));
  542. uv_thread_set_name_np(wc->thread, threadname);
  543. wc->host->dbsync_worker = wc;
  544. if (unlikely(!wc->hostname))
  545. wc->hostname = strdupz(rrdhost_hostname(wc->host));
  546. aclk_del_worker_thread(wc);
  547. wc->node_info_send = 1;
  548. }
  549. }
  550. }
  551. if (wc->node_info_send && localhost && claimed() && aclk_connected) {
  552. cmd.opcode = ACLK_DATABASE_NODE_INFO;
  553. cmd.completion = NULL;
  554. wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd);
  555. }
  556. if (wc->node_collectors_send && wc->node_collectors_send + 30 < now_realtime_sec()) {
  557. cmd.opcode = ACLK_DATABASE_NODE_COLLECTORS;
  558. cmd.completion = NULL;
  559. wc->node_collectors_send = aclk_database_enq_cmd_noblock(wc, &cmd);
  560. }
  561. if (localhost == wc->host)
  562. (void) sqlite3_wal_checkpoint(db_meta, NULL);
  563. break;
  564. default:
  565. debug(D_ACLK_SYNC, "%s: default.", __func__);
  566. break;
  567. }
  568. if (cmd.completion)
  569. aclk_complete(cmd.completion);
  570. } while (opcode != ACLK_DATABASE_NOOP);
  571. }
  572. if (!uv_timer_stop(&timer_req))
  573. uv_close((uv_handle_t *)&timer_req, NULL);
  574. /* cleanup operations of the event loop */
  575. //info("Shutting down ACLK sync event loop for %s", wc->host_guid);
  576. /*
  577. * uv_async_send after uv_close does not seem to crash in linux at the moment,
  578. * it is however undocumented behaviour we need to be aware if this becomes
  579. * an issue in the future.
  580. */
  581. uv_close((uv_handle_t *)&wc->async, NULL);
  582. uv_run(loop, UV_RUN_DEFAULT);
  583. info("Shutting down ACLK sync event loop complete for host %s", wc->host_guid);
  584. /* TODO: don't let the API block by waiting to enqueue commands */
  585. uv_cond_destroy(&wc->cmd_cond);
  586. int rc;
  587. do {
  588. rc = uv_loop_close(loop);
  589. } while (rc != UV_EBUSY);
  590. freez(loop);
  591. rrd_rdlock();
  592. if (likely(wc->host))
  593. wc->host->dbsync_worker = NULL;
  594. freez(wc->hostname);
  595. freez(wc);
  596. rrd_unlock();
  597. worker_unregister();
  598. return;
  599. error_after_timer_init:
  600. uv_close((uv_handle_t *)&wc->async, NULL);
  601. error_after_async_init:
  602. fatal_assert(0 == uv_loop_close(loop));
  603. error_after_loop_init:
  604. freez(loop);
  605. worker_unregister();
  606. }
  607. // -------------------------------------------------------------
  608. void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
  609. {
  610. #ifdef ENABLE_ACLK
  611. char uuid_str[GUID_LEN + 1];
  612. char host_guid[GUID_LEN + 1];
  613. uuid_unparse_lower_fix(host_uuid, uuid_str);
  614. if (aclk_worker_thread_exists(uuid_str))
  615. return;
  616. uuid_unparse_lower(*host_uuid, host_guid);
  617. BUFFER *sql = buffer_create(ACLK_SYNC_QUERY_SIZE, &netdata_buffers_statistics.buffers_sqlite);
  618. buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str);
  619. db_execute(buffer_tostring(sql));
  620. buffer_flush(sql);
  621. buffer_sprintf(sql, INDEX_ACLK_ALERT, uuid_str, uuid_str);
  622. db_execute(buffer_tostring(sql));
  623. buffer_free(sql);
  624. if (likely(host) && unlikely(host->dbsync_worker))
  625. return;
  626. struct aclk_database_worker_config *wc = callocz(1, sizeof(struct aclk_database_worker_config));
  627. if (node_id && !uuid_is_null(*node_id))
  628. uuid_unparse_lower(*node_id, wc->node_id);
  629. if (likely(host)) {
  630. host->dbsync_worker = (void *)wc;
  631. wc->hostname = strdupz(rrdhost_hostname(host));
  632. if (node_id && !host->node_id) {
  633. host->node_id = mallocz(sizeof(*host->node_id));
  634. uuid_copy(*host->node_id, *node_id);
  635. }
  636. }
  637. else
  638. wc->hostname = get_hostname_by_node_id(wc->node_id);
  639. wc->host = host;
  640. strcpy(wc->uuid_str, uuid_str);
  641. strcpy(wc->host_guid, host_guid);
  642. wc->alert_updates = 0;
  643. aclk_database_init_cmd_queue(wc);
  644. aclk_add_worker_thread(wc);
  645. fatal_assert(0 == uv_thread_create(&(wc->thread), aclk_database_worker, wc));
  646. #else
  647. UNUSED(host);
  648. UNUSED(host_uuid);
  649. UNUSED(node_id);
  650. #endif
  651. }