sqlite_aclk_alert.c 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_functions.h"
  3. #include "sqlite_aclk_alert.h"
  4. #ifdef ENABLE_ACLK
  5. #include "../../aclk/aclk_alarm_api.h"
  6. #include "../../aclk/aclk.h"
  7. #endif
  8. time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) {
  9. sqlite3_stmt *res = NULL;
  10. int rc = 0;
  11. time_t when = 0;
  12. char sql[ACLK_SYNC_QUERY_SIZE];
  13. snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select when_key from health_log_%s where alarm_id = %u " \
  14. "and unique_id > %u and unique_id < %u " \
  15. "and new_status = -2;", uuid_str, alarm_id, after_unique_id, before_unique_id);
  16. rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  17. if (rc != SQLITE_OK) {
  18. error_report("Failed to prepare statement when trying to find removed gap.");
  19. return 0;
  20. }
  21. rc = sqlite3_step(res);
  22. if (likely(rc == SQLITE_ROW)) {
  23. when = (time_t) sqlite3_column_int64(res, 0);
  24. }
  25. rc = sqlite3_finalize(res);
  26. if (unlikely(rc != SQLITE_OK))
  27. error_report("Failed to finalize statement when trying to find removed gap, rc = %d", rc);
  28. return when;
  29. }
  30. #define MAX_REMOVED_PERIOD 900
  31. //decide if some events should be sent or not
  32. int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
  33. {
  34. sqlite3_stmt *res = NULL;
  35. char uuid_str[GUID_LEN + 1];
  36. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  37. int send = 1, rc = 0;
  38. if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) {
  39. return 0;
  40. }
  41. if (unlikely(uuid_is_null(ae->config_hash_id)))
  42. return 0;
  43. char sql[ACLK_SYNC_QUERY_SIZE];
  44. uuid_t config_hash_id;
  45. RRDCALC_STATUS status;
  46. uint32_t unique_id;
  47. //get the previous sent event of this alarm_id
  48. snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \
  49. where hl.unique_id = aa.alert_unique_id \
  50. and hl.alarm_id = %u and hl.unique_id <> %u \
  51. order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id, ae->unique_id);
  52. rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  53. if (rc != SQLITE_OK) {
  54. error_report("Failed to prepare statement when trying to filter alert events.");
  55. send = 1;
  56. return send;
  57. }
  58. rc = sqlite3_step(res);
  59. if (likely(rc == SQLITE_ROW)) {
  60. status = (RRDCALC_STATUS) sqlite3_column_int(res, 0);
  61. if (sqlite3_column_type(res, 1) != SQLITE_NULL)
  62. uuid_copy(config_hash_id, *((uuid_t *) sqlite3_column_blob(res, 1)));
  63. unique_id = (uint32_t) sqlite3_column_int64(res, 2);
  64. } else {
  65. send = 1;
  66. goto done;
  67. }
  68. if (ae->new_status != (RRDCALC_STATUS)status) {
  69. send = 1;
  70. goto done;
  71. }
  72. if (uuid_compare(ae->config_hash_id, config_hash_id)) {
  73. send = 1;
  74. goto done;
  75. }
  76. //same status, same config
  77. if (ae->new_status == RRDCALC_STATUS_CLEAR) {
  78. send = 0;
  79. goto done;
  80. }
  81. //detect a long off period of the agent, TODO make global
  82. if (ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) {
  83. time_t when = removed_when(ae->alarm_id, ae->unique_id, unique_id, uuid_str);
  84. if (when && (when + (time_t)MAX_REMOVED_PERIOD) < ae->when) {
  85. send = 1;
  86. goto done;
  87. } else {
  88. send = 0;
  89. goto done;
  90. }
  91. }
  92. done:
  93. rc = sqlite3_finalize(res);
  94. if (unlikely(rc != SQLITE_OK))
  95. error_report("Failed to finalize statement when trying to filter alert events, rc = %d", rc);
  96. return send;
  97. }
  98. // will replace call to aclk_update_alarm in health/health_log.c
  99. // and handle both cases
  100. int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
  101. {
  102. if (!claimed())
  103. return 0;
  104. if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) {
  105. return 0;
  106. }
  107. if (!skip_filter) {
  108. if (!should_send_to_cloud(host, ae)) {
  109. return 0;
  110. }
  111. }
  112. int rc = 0;
  113. CHECK_SQLITE_CONNECTION(db_meta);
  114. sqlite3_stmt *res_alert = NULL;
  115. char uuid_str[GUID_LEN + 1];
  116. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  117. BUFFER *sql = buffer_create(1024);
  118. buffer_sprintf(
  119. sql,
  120. "INSERT INTO aclk_alert_%s (alert_unique_id, date_created) "
  121. "VALUES (@alert_unique_id, unixepoch()) on conflict (alert_unique_id) do nothing; ",
  122. uuid_str);
  123. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0);
  124. if (unlikely(rc != SQLITE_OK)) {
  125. error_report("Failed to prepare statement to store alert event");
  126. buffer_free(sql);
  127. return 1;
  128. }
  129. rc = sqlite3_bind_int(res_alert, 1, ae->unique_id);
  130. if (unlikely(rc != SQLITE_OK))
  131. goto bind_fail;
  132. rc = execute_insert(res_alert);
  133. if (unlikely(rc != SQLITE_DONE)) {
  134. error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc);
  135. goto bind_fail;
  136. }
  137. ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
  138. struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  139. if (wc) {
  140. wc->pause_alert_updates = 0;
  141. }
  142. bind_fail:
  143. if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
  144. error_report("Failed to reset statement in store alert event, rc = %d", rc);
  145. buffer_free(sql);
  146. return 0;
  147. }
  148. int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
  149. {
  150. #ifdef ENABLE_ACLK
  151. switch(status) {
  152. case RRDCALC_STATUS_REMOVED:
  153. return ALARM_STATUS_REMOVED;
  154. case RRDCALC_STATUS_UNDEFINED:
  155. return ALARM_STATUS_NOT_A_NUMBER;
  156. case RRDCALC_STATUS_CLEAR:
  157. return ALARM_STATUS_CLEAR;
  158. case RRDCALC_STATUS_WARNING:
  159. return ALARM_STATUS_WARNING;
  160. case RRDCALC_STATUS_CRITICAL:
  161. return ALARM_STATUS_CRITICAL;
  162. default:
  163. return ALARM_STATUS_UNKNOWN;
  164. }
  165. #else
  166. UNUSED(status);
  167. return 1;
  168. #endif
  169. }
  170. void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  171. {
  172. #ifndef ENABLE_ACLK
  173. UNUSED(wc);
  174. UNUSED(cmd);
  175. #else
  176. int rc;
  177. if (unlikely(!wc->alert_updates)) {
  178. log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
  179. return;
  180. }
  181. char *claim_id = get_agent_claimid();
  182. if (unlikely(!claim_id))
  183. return;
  184. if (unlikely(!wc->host)) {
  185. freez(claim_id);
  186. return;
  187. }
  188. BUFFER *sql = buffer_create(1024);
  189. if (wc->alerts_start_seq_id != 0) {
  190. buffer_sprintf(
  191. sql,
  192. "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64
  193. "; UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id < %"PRIu64
  194. " and date_cloud_ack is null "
  195. "; UPDATE aclk_alert_%s SET date_submitted = unixepoch() WHERE sequence_id < %"PRIu64
  196. " and date_submitted is null",
  197. wc->uuid_str,
  198. wc->alerts_start_seq_id,
  199. wc->uuid_str,
  200. wc->alerts_start_seq_id,
  201. wc->uuid_str,
  202. wc->alerts_start_seq_id);
  203. db_execute(buffer_tostring(sql));
  204. buffer_reset(sql);
  205. wc->alerts_start_seq_id = 0;
  206. }
  207. int limit = cmd.count > 0 ? cmd.count : 1;
  208. sqlite3_stmt *res = NULL;
  209. buffer_sprintf(sql, "select aa.sequence_id, hl.unique_id, hl.alarm_id, hl.config_hash_id, hl.updated_by_id, hl.when_key, \
  210. hl.duration, hl.non_clear_duration, hl.flags, hl.exec_run_timestamp, hl.delay_up_to_timestamp, hl.name, \
  211. hl.chart, hl.family, hl.exec, hl.recipient, hl.source, hl.units, hl.info, hl.exec_code, hl.new_status, \
  212. hl.old_status, hl.delay, hl.new_value, hl.old_value, hl.last_repeat, hl.chart_context \
  213. from health_log_%s hl, aclk_alert_%s aa \
  214. where hl.unique_id = aa.alert_unique_id and aa.date_submitted is null \
  215. order by aa.sequence_id asc limit %d;", wc->uuid_str, wc->uuid_str, limit);
  216. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  217. if (rc != SQLITE_OK) {
  218. error_report("Failed to prepare statement when trying to send an alert update via ACLK");
  219. buffer_free(sql);
  220. freez(claim_id);
  221. return;
  222. }
  223. char uuid_str[GUID_LEN + 1];
  224. uint64_t first_sequence_id = 0;
  225. uint64_t last_sequence_id = 0;
  226. static __thread uint64_t log_first_sequence_id = 0;
  227. static __thread uint64_t log_last_sequence_id = 0;
  228. while (sqlite3_step(res) == SQLITE_ROW) {
  229. struct alarm_log_entry alarm_log;
  230. char old_value_string[100 + 1];
  231. char new_value_string[100 + 1];
  232. alarm_log.node_id = wc->node_id;
  233. alarm_log.claim_id = claim_id;
  234. alarm_log.chart = strdupz((char *)sqlite3_column_text(res, 12));
  235. alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11));
  236. alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
  237. alarm_log.batch_id = wc->alerts_batch_id;
  238. alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  239. alarm_log.when = (time_t) sqlite3_column_int64(res, 5);
  240. uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str);
  241. alarm_log.config_hash = strdupz((char *)uuid_str);
  242. alarm_log.utc_offset = wc->host->utc_offset;
  243. alarm_log.timezone = strdupz((char *)wc->host->abbrev_timezone);
  244. alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) :
  245. strdupz((char *)wc->host->health_default_exec);
  246. alarm_log.conf_source = strdupz((char *)sqlite3_column_text(res, 16));
  247. char *edit_command = sqlite3_column_bytes(res, 16) > 0 ?
  248. health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) :
  249. strdupz("UNKNOWN=0=UNKNOWN");
  250. alarm_log.command = strdupz(edit_command);
  251. alarm_log.duration = (time_t) sqlite3_column_int64(res, 6);
  252. alarm_log.non_clear_duration = (time_t) sqlite3_column_int64(res, 7);
  253. alarm_log.status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 20));
  254. alarm_log.old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 21));
  255. alarm_log.delay = (int) sqlite3_column_int(res, 22);
  256. alarm_log.delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10);
  257. alarm_log.last_repeat = (time_t) sqlite3_column_int64(res, 25);
  258. alarm_log.silenced = ((sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) ||
  259. (sqlite3_column_type(res, 15) != SQLITE_NULL &&
  260. !strncmp((char *)sqlite3_column_text(res, 15), "silent", 6))) ?
  261. 1 :
  262. 0;
  263. alarm_log.value_string =
  264. sqlite3_column_type(res, 23) == SQLITE_NULL ?
  265. strdupz((char *)"-") :
  266. strdupz((char *)format_value_and_unit(
  267. new_value_string, 100, sqlite3_column_double(res, 23), (char *)sqlite3_column_text(res, 17), -1));
  268. alarm_log.old_value_string =
  269. sqlite3_column_type(res, 24) == SQLITE_NULL ?
  270. strdupz((char *)"-") :
  271. strdupz((char *)format_value_and_unit(
  272. old_value_string, 100, sqlite3_column_double(res, 24), (char *)sqlite3_column_text(res, 17), -1));
  273. alarm_log.value = (NETDATA_DOUBLE) sqlite3_column_double(res, 23);
  274. alarm_log.old_value = (NETDATA_DOUBLE) sqlite3_column_double(res, 24);
  275. alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
  276. alarm_log.rendered_info = sqlite3_column_type(res, 18) == SQLITE_NULL ?
  277. strdupz((char *)"") :
  278. strdupz((char *)sqlite3_column_text(res, 18));
  279. alarm_log.chart_context = sqlite3_column_type(res, 26) == SQLITE_NULL ?
  280. strdupz((char *)"") :
  281. strdupz((char *)sqlite3_column_text(res, 26));
  282. aclk_send_alarm_log_entry(&alarm_log);
  283. if (first_sequence_id == 0)
  284. first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  285. if (log_first_sequence_id == 0)
  286. log_first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  287. last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  288. log_last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  289. destroy_alarm_log_entry(&alarm_log);
  290. freez(edit_command);
  291. }
  292. if (first_sequence_id) {
  293. buffer_flush(sql);
  294. buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
  295. "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
  296. wc->uuid_str, first_sequence_id, last_sequence_id);
  297. db_execute(buffer_tostring(sql));
  298. } else {
  299. if (log_first_sequence_id)
  300. log_access(
  301. "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64,
  302. wc->node_id,
  303. wc->host ? wc->host->hostname : "N/A",
  304. log_first_sequence_id,
  305. log_last_sequence_id,
  306. wc->alerts_batch_id);
  307. log_first_sequence_id = 0;
  308. log_last_sequence_id = 0;
  309. wc->pause_alert_updates = 1;
  310. }
  311. rc = sqlite3_finalize(res);
  312. if (unlikely(rc != SQLITE_OK))
  313. error_report("Failed to finalize statement to send alert entries from the database, rc = %d", rc);
  314. freez(claim_id);
  315. buffer_free(sql);
  316. #endif
  317. return;
  318. }
  319. void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
  320. {
  321. char uuid_str[GUID_LEN + 1];
  322. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  323. BUFFER *sql = buffer_create(1024);
  324. buffer_sprintf(sql,"delete from aclk_alert_%s; " \
  325. "insert into aclk_alert_%s (alert_unique_id, date_created) " \
  326. "select unique_id alert_unique_id, unixepoch() from health_log_%s " \
  327. "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \
  328. "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str);
  329. db_execute(buffer_tostring(sql));
  330. buffer_free(sql);
  331. struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  332. if (wc) {
  333. wc->pause_alert_updates = 0;
  334. }
  335. }
  336. void aclk_send_alarm_health_log(char *node_id)
  337. {
  338. if (unlikely(!node_id))
  339. return;
  340. struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id);
  341. if (likely(!wc)) {
  342. rrd_rdlock();
  343. RRDHOST *host = find_host_by_node_id(node_id);
  344. rrd_unlock();
  345. if (likely(host))
  346. wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  347. }
  348. if (!wc) {
  349. log_access("ACLK REQ [%s (N/A)]: HEALTH LOG REQUEST RECEIVED FOR INVALID NODE", node_id);
  350. return;
  351. }
  352. log_access("ACLK REQ [%s (%s)]: HEALTH LOG REQUEST RECEIVED", node_id, wc->hostname ? wc->hostname : "N/A");
  353. struct aclk_database_cmd cmd;
  354. memset(&cmd, 0, sizeof(cmd));
  355. cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG;
  356. aclk_database_enq_cmd(wc, &cmd);
  357. return;
  358. }
  359. void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  360. {
  361. UNUSED(cmd);
  362. #ifndef ENABLE_ACLK
  363. UNUSED(wc);
  364. #else
  365. int rc;
  366. char *claim_id = get_agent_claimid();
  367. if (unlikely(!claim_id))
  368. return;
  369. RRDHOST *host = wc->host;
  370. if (unlikely(!host)) {
  371. rrd_rdlock();
  372. host = find_host_by_node_id(wc->node_id);
  373. rrd_unlock();
  374. if (unlikely(!host)) {
  375. log_access(
  376. "AC [%s (N/A)]: ACLK synchronization thread for %s is not yet linked to HOST.",
  377. wc->node_id,
  378. wc->host_guid);
  379. freez(claim_id);
  380. return;
  381. }
  382. }
  383. uint64_t first_sequence = 0;
  384. uint64_t last_sequence = 0;
  385. struct timeval first_timestamp;
  386. struct timeval last_timestamp;
  387. BUFFER *sql = buffer_create(1024);
  388. sqlite3_stmt *res = NULL;
  389. //TODO: make this better: include info from health log too
  390. buffer_sprintf(sql, "SELECT MIN(sequence_id), MIN(date_created), MAX(sequence_id), MAX(date_created) " \
  391. "FROM aclk_alert_%s;", wc->uuid_str);
  392. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  393. if (rc != SQLITE_OK) {
  394. error_report("Failed to prepare statement to get health log statistics from the database");
  395. buffer_free(sql);
  396. freez(claim_id);
  397. return;
  398. }
  399. first_timestamp.tv_sec = 0;
  400. first_timestamp.tv_usec = 0;
  401. last_timestamp.tv_sec = 0;
  402. last_timestamp.tv_usec = 0;
  403. while (sqlite3_step(res) == SQLITE_ROW) {
  404. first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
  405. if (sqlite3_column_bytes(res, 1) > 0) {
  406. first_timestamp.tv_sec = sqlite3_column_int64(res, 1);
  407. }
  408. last_sequence = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
  409. if (sqlite3_column_bytes(res, 3) > 0) {
  410. last_timestamp.tv_sec = sqlite3_column_int64(res, 3);
  411. }
  412. }
  413. struct alarm_log_entries log_entries;
  414. log_entries.first_seq_id = first_sequence;
  415. log_entries.first_when = first_timestamp;
  416. log_entries.last_seq_id = last_sequence;
  417. log_entries.last_when = last_timestamp;
  418. struct alarm_log_health alarm_log;
  419. alarm_log.claim_id = claim_id;
  420. alarm_log.node_id = wc->node_id;
  421. alarm_log.log_entries = log_entries;
  422. alarm_log.status = wc->alert_updates == 0 ? 2 : 1;
  423. alarm_log.enabled = (int)host->health_enabled;
  424. wc->alert_sequence_id = last_sequence;
  425. aclk_send_alarm_log_health(&alarm_log);
  426. log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->hostname ? wc->hostname : "N/A", first_sequence, last_sequence);
  427. rc = sqlite3_finalize(res);
  428. if (unlikely(rc != SQLITE_OK))
  429. error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc);
  430. freez(claim_id);
  431. buffer_free(sql);
  432. aclk_alert_reloaded = 1;
  433. #endif
  434. return;
  435. }
  436. void aclk_send_alarm_configuration(char *config_hash)
  437. {
  438. if (unlikely(!config_hash))
  439. return;
  440. struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) localhost->dbsync_worker;
  441. if (unlikely(!wc)) {
  442. return;
  443. }
  444. log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
  445. struct aclk_database_cmd cmd;
  446. memset(&cmd, 0, sizeof(cmd));
  447. cmd.opcode = ACLK_DATABASE_PUSH_ALERT_CONFIG;
  448. cmd.data_param = (void *) strdupz(config_hash);
  449. cmd.completion = NULL;
  450. aclk_database_enq_cmd(wc, &cmd);
  451. return;
  452. }
  453. #define SQL_SELECT_ALERT_CONFIG "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
  454. "module, charts, families, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \
  455. "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
  456. "p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;"
  457. int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  458. {
  459. UNUSED(wc);
  460. #ifndef ENABLE_ACLK
  461. UNUSED(cmd);
  462. #else
  463. int rc = 0;
  464. CHECK_SQLITE_CONNECTION(db_meta);
  465. sqlite3_stmt *res = NULL;
  466. char *config_hash = (char *) cmd.data_param;
  467. rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0);
  468. if (rc != SQLITE_OK) {
  469. error_report("Failed to prepare statement when trying to fetch an alarm hash configuration");
  470. return 1;
  471. }
  472. uuid_t hash_uuid;
  473. if (uuid_parse(config_hash, hash_uuid))
  474. return 1;
  475. rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC);
  476. if (unlikely(rc != SQLITE_OK))
  477. goto bind_fail;
  478. struct aclk_alarm_configuration alarm_config;
  479. struct provide_alarm_configuration p_alarm_config;
  480. p_alarm_config.cfg_hash = NULL;
  481. if (sqlite3_step(res) == SQLITE_ROW) {
  482. alarm_config.alarm = sqlite3_column_bytes(res, 0) > 0 ? strdupz((char *)sqlite3_column_text(res, 0)) : NULL;
  483. alarm_config.tmpl = sqlite3_column_bytes(res, 1) > 0 ? strdupz((char *)sqlite3_column_text(res, 1)) : NULL;
  484. alarm_config.on_chart = sqlite3_column_bytes(res, 2) > 0 ? strdupz((char *)sqlite3_column_text(res, 2)) : NULL;
  485. alarm_config.classification = sqlite3_column_bytes(res, 3) > 0 ? strdupz((char *)sqlite3_column_text(res, 3)) : NULL;
  486. alarm_config.type = sqlite3_column_bytes(res, 4) > 0 ? strdupz((char *)sqlite3_column_text(res, 4)) : NULL;
  487. alarm_config.component = sqlite3_column_bytes(res, 5) > 0 ? strdupz((char *)sqlite3_column_text(res, 5)) : NULL;
  488. alarm_config.os = sqlite3_column_bytes(res, 6) > 0 ? strdupz((char *)sqlite3_column_text(res, 6)) : NULL;
  489. alarm_config.hosts = sqlite3_column_bytes(res, 7) > 0 ? strdupz((char *)sqlite3_column_text(res, 7)) : NULL;
  490. alarm_config.plugin = sqlite3_column_bytes(res, 8) > 0 ? strdupz((char *)sqlite3_column_text(res, 8)) : NULL;
  491. alarm_config.module = sqlite3_column_bytes(res, 9) > 0 ? strdupz((char *)sqlite3_column_text(res, 9)) : NULL;
  492. alarm_config.charts = sqlite3_column_bytes(res, 10) > 0 ? strdupz((char *)sqlite3_column_text(res, 10)) : NULL;
  493. alarm_config.families = sqlite3_column_bytes(res, 11) > 0 ? strdupz((char *)sqlite3_column_text(res, 11)) : NULL;
  494. alarm_config.lookup = sqlite3_column_bytes(res, 12) > 0 ? strdupz((char *)sqlite3_column_text(res, 12)) : NULL;
  495. alarm_config.every = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
  496. alarm_config.units = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : NULL;
  497. alarm_config.green = sqlite3_column_bytes(res, 15) > 0 ? strdupz((char *)sqlite3_column_text(res, 15)) : NULL;
  498. alarm_config.red = sqlite3_column_bytes(res, 16) > 0 ? strdupz((char *)sqlite3_column_text(res, 16)) : NULL;
  499. alarm_config.calculation_expr = sqlite3_column_bytes(res, 17) > 0 ? strdupz((char *)sqlite3_column_text(res, 17)) : NULL;
  500. alarm_config.warning_expr = sqlite3_column_bytes(res, 18) > 0 ? strdupz((char *)sqlite3_column_text(res, 18)) : NULL;
  501. alarm_config.critical_expr = sqlite3_column_bytes(res, 19) > 0 ? strdupz((char *)sqlite3_column_text(res, 19)) : NULL;
  502. alarm_config.recipient = sqlite3_column_bytes(res, 20) > 0 ? strdupz((char *)sqlite3_column_text(res, 20)) : NULL;
  503. alarm_config.exec = sqlite3_column_bytes(res, 21) > 0 ? strdupz((char *)sqlite3_column_text(res, 21)) : NULL;
  504. alarm_config.delay = sqlite3_column_bytes(res, 22) > 0 ? strdupz((char *)sqlite3_column_text(res, 22)) : NULL;
  505. alarm_config.repeat = sqlite3_column_bytes(res, 23) > 0 ? strdupz((char *)sqlite3_column_text(res, 23)) : NULL;
  506. alarm_config.info = sqlite3_column_bytes(res, 24) > 0 ? strdupz((char *)sqlite3_column_text(res, 24)) : NULL;
  507. alarm_config.options = sqlite3_column_bytes(res, 25) > 0 ? strdupz((char *)sqlite3_column_text(res, 25)) : NULL;
  508. alarm_config.host_labels = sqlite3_column_bytes(res, 26) > 0 ? strdupz((char *)sqlite3_column_text(res, 26)) : NULL;
  509. alarm_config.p_db_lookup_dimensions = NULL;
  510. alarm_config.p_db_lookup_method = NULL;
  511. alarm_config.p_db_lookup_options = NULL;
  512. alarm_config.p_db_lookup_after = 0;
  513. alarm_config.p_db_lookup_before = 0;
  514. if (sqlite3_column_bytes(res, 30) > 0) {
  515. alarm_config.p_db_lookup_dimensions = sqlite3_column_bytes(res, 27) > 0 ? strdupz((char *)sqlite3_column_text(res, 27)) : NULL;
  516. alarm_config.p_db_lookup_method = sqlite3_column_bytes(res, 28) > 0 ? strdupz((char *)sqlite3_column_text(res, 28)) : NULL;
  517. BUFFER *tmp_buf = buffer_create(1024);
  518. buffer_data_options2string(tmp_buf, sqlite3_column_int(res, 29));
  519. alarm_config.p_db_lookup_options = strdupz((char *)buffer_tostring(tmp_buf));
  520. buffer_free(tmp_buf);
  521. alarm_config.p_db_lookup_after = sqlite3_column_int(res, 30);
  522. alarm_config.p_db_lookup_before = sqlite3_column_int(res, 31);
  523. }
  524. alarm_config.p_update_every = sqlite3_column_int(res, 32);
  525. p_alarm_config.cfg_hash = strdupz((char *) config_hash);
  526. p_alarm_config.cfg = alarm_config;
  527. }
  528. if (likely(p_alarm_config.cfg_hash)) {
  529. log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
  530. aclk_send_provide_alarm_cfg(&p_alarm_config);
  531. freez((char *) cmd.data_param);
  532. freez(p_alarm_config.cfg_hash);
  533. destroy_aclk_alarm_configuration(&alarm_config);
  534. }
  535. else
  536. log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
  537. bind_fail:
  538. rc = sqlite3_finalize(res);
  539. if (unlikely(rc != SQLITE_OK))
  540. error_report("Failed to reset statement when pushing alarm config hash, rc = %d", rc);
  541. return rc;
  542. #endif
  543. return 0;
  544. }
  545. // Start streaming alerts
  546. void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id)
  547. {
  548. if (unlikely(!node_id))
  549. return;
  550. //log_access("ACLK REQ [%s (N/A)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64".", node_id, start_seq_id, batch_id);
  551. uuid_t node_uuid;
  552. if (uuid_parse(node_id, node_uuid))
  553. return;
  554. struct aclk_database_worker_config *wc = NULL;
  555. rrd_rdlock();
  556. RRDHOST *host = find_host_by_node_id(node_id);
  557. rrd_unlock();
  558. if (likely(host)) {
  559. wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
  560. (struct aclk_database_worker_config *)host->dbsync_worker :
  561. (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
  562. if (unlikely(!host->health_enabled)) {
  563. log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
  564. return;
  565. }
  566. if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1))
  567. sql_queue_existing_alerts_to_aclk(host);
  568. } else
  569. wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
  570. if (likely(wc)) {
  571. log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? wc->host->hostname : "N/A", start_seq_id, batch_id);
  572. __sync_synchronize();
  573. wc->alerts_batch_id = batch_id;
  574. wc->alerts_start_seq_id = start_seq_id;
  575. wc->alert_updates = 1;
  576. wc->pause_alert_updates = 0;
  577. __sync_synchronize();
  578. }
  579. else
  580. log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
  581. return;
  582. }
  583. void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  584. {
  585. UNUSED(cmd);
  586. BUFFER *sql = buffer_create(1024);
  587. buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \
  588. "select unique_id alert_unique_id, unixepoch() from health_log_%s " \
  589. "where new_status = -2 and updated_by_id = 0 and unique_id not in " \
  590. "(select alert_unique_id from aclk_alert_%s) order by unique_id asc " \
  591. "on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
  592. db_execute(buffer_tostring(sql));
  593. log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? wc->host->hostname : "N/A");
  594. buffer_free(sql);
  595. wc->pause_alert_updates = 0;
  596. return;
  597. }
  598. void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
  599. {
  600. if (unlikely(!host->dbsync_worker))
  601. return;
  602. if (!claimed())
  603. return;
  604. struct aclk_database_cmd cmd;
  605. memset(&cmd, 0, sizeof(cmd));
  606. cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS;
  607. cmd.data = NULL;
  608. cmd.data_param = NULL;
  609. cmd.completion = NULL;
  610. aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd);
  611. }
  612. void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id)
  613. {
  614. UNUSED(claim_id);
  615. if (unlikely(!node_id))
  616. return;
  617. uuid_t node_uuid;
  618. if (uuid_parse(node_id, node_uuid))
  619. return;
  620. struct aclk_database_worker_config *wc = NULL;
  621. rrd_rdlock();
  622. RRDHOST *host = find_host_by_node_id(node_id);
  623. if (likely(host))
  624. wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  625. rrd_unlock();
  626. if (likely(wc)) {
  627. log_access(
  628. "IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64,
  629. wc->node_id,
  630. wc->host ? wc->host->hostname : "N/A",
  631. snapshot_id,
  632. sequence_id);
  633. if (wc->alerts_snapshot_id == snapshot_id)
  634. return;
  635. __sync_synchronize();
  636. wc->alerts_snapshot_id = snapshot_id;
  637. wc->alerts_ack_sequence_id = sequence_id;
  638. __sync_synchronize();
  639. struct aclk_database_cmd cmd;
  640. memset(&cmd, 0, sizeof(cmd));
  641. cmd.opcode = ACLK_DATABASE_PUSH_ALERT_SNAPSHOT;
  642. cmd.data_param = NULL;
  643. cmd.completion = NULL;
  644. aclk_database_enq_cmd(wc, &cmd);
  645. } else
  646. log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
  647. return;
  648. }
  649. void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id)
  650. {
  651. BUFFER *sql = buffer_create(1024);
  652. if (alerts_ack_sequence_id != 0) {
  653. buffer_sprintf(
  654. sql,
  655. "UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id <= %" PRIu64 "",
  656. uuid_str,
  657. alerts_ack_sequence_id);
  658. db_execute(buffer_tostring(sql));
  659. }
  660. buffer_free(sql);
  661. }
  #ifdef ENABLE_ACLK
  /*
   * Populate `alarm_log` from health log entry `ae` belonging to `host`.
   *
   * Only the payload fields are written; node_id and claim_id are left for the
   * caller to fill in. batch_id and sequence_id are always set to 0 here.
   * Every string field receives a freshly allocated copy owned by `alarm_log`;
   * in this file the caller releases them with destroy_alarm_log_entry().
   *
   * No locking is done here ("_nolock"). The snapshot caller in this file holds
   * the health log read lock while calling it.
   */
  void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
  {
      char config_hash_id[GUID_LEN + 1];
      uuid_unparse_lower(ae->config_hash_id, config_hash_id);

      alarm_log->chart = strdupz((char *)ae->chart);
      alarm_log->name = strdupz((char *)ae->name);
      alarm_log->family = strdupz((char *)ae->family);

      alarm_log->batch_id = 0;
      alarm_log->sequence_id = 0;
      alarm_log->when = (time_t)ae->when;

      alarm_log->config_hash = strdupz((char *)config_hash_id);

      alarm_log->utc_offset = host->utc_offset;
      alarm_log->timezone = strdupz((char *)host->abbrev_timezone);
      alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec);
      alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : strdupz((char *)"");

      // Both branches already yield a heap string that previously was freed right
      // after being duplicated; hand it to alarm_log directly instead.
      alarm_log->command = ae->source ? health_edit_command_from_source(ae->source) : strdupz("UNKNOWN=0=UNKNOWN");

      alarm_log->duration = (time_t)ae->duration;
      alarm_log->non_clear_duration = (time_t)ae->non_clear_duration;
      alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status);
      alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status);
      alarm_log->delay = (int)ae->delay;
      alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp;
      alarm_log->last_repeat = (time_t)ae->last_repeat;

      // Silenced either explicitly by flag, or by a recipient whose name starts
      // with "silent" (strncmp over 6 chars is a prefix match).
      alarm_log->silenced =
          ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp((char *)ae->recipient, "silent", 6))) ?
              1 :
              0;

      alarm_log->value_string = strdupz(ae->new_value_string);
      alarm_log->old_value_string = strdupz(ae->old_value_string);

      // NaN values are reported to the cloud as 0.
      alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0;
      alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0;

      alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
      alarm_log->rendered_info = ae->info ? strdupz(ae->info) : strdupz((char *)"");
      alarm_log->chart_context = ae->chart_context ? strdupz(ae->chart_context) : strdupz((char *)"");
  }
  #endif
  #ifdef ENABLE_ACLK
  /*
   * Return 1 if `host`'s health log contains an entry for `alarm_id` whose
   * unique_id is greater than `mark` and whose new_status is neither WARNING
   * nor CRITICAL; return 0 otherwise.
   *
   * `mark` is a health log unique_id (callers pass ae->unique_id), not a
   * timestamp. It was previously declared time_t, which misdescribed it.
   * The whole log is scanned linearly. The caller in this file holds the
   * health log read lock.
   */
  static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark)
  {
      for (ALARM_ENTRY *ae = host->health_log.alarms; ae; ae = ae->next) {
          if (ae->alarm_id != alarm_id || ae->unique_id <= mark)
              continue;

          if (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL)
              return 1;
      }
      return 0;
  }
  #endif
  714. #define ALARM_EVENTS_PER_CHUNK 10
  715. void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  716. {
  717. #ifndef ENABLE_ACLK
  718. UNUSED(wc);
  719. UNUSED(cmd);
  720. #else
  721. UNUSED(cmd);
  722. // we perhaps we don't need this for snapshots
  723. if (unlikely(!wc->alert_updates)) {
  724. log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
  725. return;
  726. }
  727. if (unlikely(!wc->host)) {
  728. error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid);
  729. return;
  730. }
  731. if (unlikely(!wc->alerts_snapshot_id))
  732. return;
  733. char *claim_id = get_agent_claimid();
  734. if (unlikely(!claim_id))
  735. return;
  736. log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id);
  737. aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id);
  738. RRDHOST *host = wc->host;
  739. uint32_t cnt = 0;
  740. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  741. ALARM_ENTRY *ae = host->health_log.alarms;
  742. for (; ae; ae = ae->next) {
  743. if (likely(ae->updated_by_id))
  744. continue;
  745. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  746. continue;
  747. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  748. continue;
  749. cnt++;
  750. }
  751. if (cnt) {
  752. uint32_t chunk = 1, chunks = 0;
  753. chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
  754. ae = host->health_log.alarms;
  755. cnt = 0;
  756. struct alarm_snapshot alarm_snap;
  757. alarm_snap.node_id = wc->node_id;
  758. alarm_snap.claim_id = claim_id;
  759. alarm_snap.snapshot_id = wc->alerts_snapshot_id;
  760. alarm_snap.chunks = chunks;
  761. alarm_snap.chunk = chunk;
  762. alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
  763. for (; ae; ae = ae->next) {
  764. if (likely(ae->updated_by_id))
  765. continue;
  766. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  767. continue;
  768. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  769. continue;
  770. cnt++;
  771. struct alarm_log_entry alarm_log;
  772. alarm_log.node_id = wc->node_id;
  773. alarm_log.claim_id = claim_id;
  774. if (!snapshot_proto)
  775. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  776. health_alarm_entry2proto_nolock(&alarm_log, ae, host);
  777. add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
  778. if (cnt == ALARM_EVENTS_PER_CHUNK) {
  779. aclk_send_alarm_snapshot(snapshot_proto);
  780. cnt = 0;
  781. if (chunk < chunks) {
  782. chunk++;
  783. struct alarm_snapshot alarm_snap;
  784. alarm_snap.node_id = wc->node_id;
  785. alarm_snap.claim_id = claim_id;
  786. alarm_snap.snapshot_id = wc->alerts_snapshot_id;
  787. alarm_snap.chunks = chunks;
  788. alarm_snap.chunk = chunk;
  789. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  790. }
  791. }
  792. destroy_alarm_log_entry(&alarm_log);
  793. }
  794. if (cnt)
  795. aclk_send_alarm_snapshot(snapshot_proto);
  796. }
  797. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  798. wc->alerts_snapshot_id = 0;
  799. freez(claim_id);
  800. #endif
  801. return;
  802. }
  803. void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
  804. {
  805. if (!claimed())
  806. return;
  807. char uuid_str[GUID_LEN + 1];
  808. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  809. BUFFER *sql = buffer_create(1024);
  810. buffer_sprintf(sql,"delete from aclk_alert_%s where alert_unique_id not in "
  811. " (select unique_id from health_log_%s); ", uuid_str, uuid_str);
  812. char *err_msg = NULL;
  813. int rc = sqlite3_exec(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg);
  814. if (rc != SQLITE_OK) {
  815. error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s""",
  816. uuid_str, err_msg);
  817. sqlite3_free(err_msg);
  818. }
  819. buffer_free(sql);
  820. }
  821. int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
  822. {
  823. int rc;
  824. struct aclk_database_worker_config *wc = NULL;
  825. wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  826. if (!wc)
  827. return 1;
  828. proto_alert_status->alert_updates = wc->alert_updates;
  829. proto_alert_status->alerts_batch_id = wc->alerts_batch_id;
  830. BUFFER *sql = buffer_create(1024);
  831. sqlite3_stmt *res = NULL;
  832. buffer_sprintf(sql, "SELECT MIN(sequence_id), MAX(sequence_id), " \
  833. "(select MAX(sequence_id) from aclk_alert_%s where date_cloud_ack is not NULL), " \
  834. "(select MAX(sequence_id) from aclk_alert_%s where date_submitted is not NULL) " \
  835. "FROM aclk_alert_%s where date_submitted is null;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
  836. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  837. if (rc != SQLITE_OK) {
  838. error_report("Failed to prepare statement to get alert log status from the database.");
  839. buffer_free(sql);
  840. return 1;
  841. }
  842. while (sqlite3_step(res) == SQLITE_ROW) {
  843. proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
  844. proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
  845. proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
  846. proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0;
  847. }
  848. rc = sqlite3_finalize(res);
  849. if (unlikely(rc != SQLITE_OK))
  850. error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc);
  851. buffer_free(sql);
  852. return 0;
  853. }