sqlite_aclk_alert.c 42 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_functions.h"
  3. #include "sqlite_aclk_alert.h"
  4. #ifdef ENABLE_ACLK
  5. #include "../../aclk/aclk_alarm_api.h"
  6. #endif
  7. #define SQL_GET_ALERT_REMOVE_TIME "SELECT when_key FROM health_log_%s WHERE alarm_id = %u " \
  8. "AND unique_id > %u AND unique_id < %u " \
  9. "AND new_status = -2;"
  10. time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) {
  11. sqlite3_stmt *res = NULL;
  12. time_t when = 0;
  13. char sql[ACLK_SYNC_QUERY_SIZE];
  14. snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_GET_ALERT_REMOVE_TIME, uuid_str, alarm_id, after_unique_id, before_unique_id);
  15. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  16. if (rc != SQLITE_OK) {
  17. error_report("Failed to prepare statement when trying to find removed gap.");
  18. return 0;
  19. }
  20. rc = sqlite3_step_monitored(res);
  21. if (likely(rc == SQLITE_ROW)) {
  22. when = (time_t) sqlite3_column_int64(res, 0);
  23. }
  24. rc = sqlite3_finalize(res);
  25. if (unlikely(rc != SQLITE_OK))
  26. error_report("Failed to finalize statement when trying to find removed gap, rc = %d", rc);
  27. return when;
  28. }
  29. #define SQL_UPDATE_FILTERED_ALERT "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u"
  30. void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) {
  31. char sql[ACLK_SYNC_QUERY_SIZE];
  32. snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str, ae->unique_id, unique_id);
  33. sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL);
  34. ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
  35. }
  36. #define SQL_SELECT_ALERT_BY_UNIQUE_ID "SELECT hl.unique_id FROM health_log_%s hl, alert_hash ah WHERE hl.unique_id = %u " \
  37. "AND hl.config_hash_id = ah.hash_id " \
  38. "AND ah.warn IS NULL AND ah.crit IS NULL;"
  39. static inline bool is_event_from_alert_variable_config(uint32_t unique_id, char *uuid_str) {
  40. sqlite3_stmt *res = NULL;
  41. int rc = 0;
  42. bool ret = false;
  43. char sql[ACLK_SYNC_QUERY_SIZE];
  44. snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_SELECT_ALERT_BY_UNIQUE_ID, uuid_str, unique_id);
  45. rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  46. if (rc != SQLITE_OK) {
  47. error_report("Failed to prepare statement when trying to check for alert variables.");
  48. return false;
  49. }
  50. rc = sqlite3_step_monitored(res);
  51. if (likely(rc == SQLITE_ROW)) {
  52. ret = true;
  53. }
  54. rc = sqlite3_finalize(res);
  55. if (unlikely(rc != SQLITE_OK))
  56. error_report("Failed to finalize statement when trying to check for alert variables, rc = %d", rc);
  57. return ret;
  58. }
  59. #define MAX_REMOVED_PERIOD 86400
  60. //decide if some events should be sent or not
  61. #define SQL_SELECT_ALERT_BY_ID "SELECT hl.new_status, hl.config_hash_id, hl.unique_id FROM health_log_%s hl, aclk_alert_%s aa " \
  62. "WHERE hl.unique_id = aa.filtered_alert_unique_id " \
  63. "AND hl.alarm_id = %u " \
  64. "ORDER BY alarm_event_id DESC LIMIT 1;"
  65. int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
  66. {
  67. sqlite3_stmt *res = NULL;
  68. char uuid_str[UUID_STR_LEN];
  69. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  70. int send = 1;
  71. if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) {
  72. return 0;
  73. }
  74. if (unlikely(uuid_is_null(ae->config_hash_id)))
  75. return 0;
  76. char sql[ACLK_SYNC_QUERY_SIZE];
  77. uuid_t config_hash_id;
  78. RRDCALC_STATUS status;
  79. uint32_t unique_id;
  80. //get the previous sent event of this alarm_id
  81. //base the search on the last filtered event
  82. snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_SELECT_ALERT_BY_ID, uuid_str, uuid_str, ae->alarm_id);
  83. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  84. if (rc != SQLITE_OK) {
  85. error_report("Failed to prepare statement when trying to filter alert events.");
  86. send = 1;
  87. return send;
  88. }
  89. rc = sqlite3_step_monitored(res);
  90. if (likely(rc == SQLITE_ROW)) {
  91. status = (RRDCALC_STATUS) sqlite3_column_int(res, 0);
  92. if (sqlite3_column_type(res, 1) != SQLITE_NULL)
  93. uuid_copy(config_hash_id, *((uuid_t *) sqlite3_column_blob(res, 1)));
  94. unique_id = (uint32_t) sqlite3_column_int64(res, 2);
  95. } else {
  96. send = 1;
  97. goto done;
  98. }
  99. if (ae->new_status != (RRDCALC_STATUS)status) {
  100. send = 1;
  101. goto done;
  102. }
  103. if (uuid_memcmp(&ae->config_hash_id, &config_hash_id)) {
  104. send = 1;
  105. goto done;
  106. }
  107. //same status, same config
  108. if (ae->new_status == RRDCALC_STATUS_CLEAR || ae->new_status == RRDCALC_STATUS_UNDEFINED) {
  109. send = 0;
  110. update_filtered(ae, unique_id, uuid_str);
  111. goto done;
  112. }
  113. //detect a long off period of the agent, TODO make global
  114. if (ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) {
  115. time_t when = removed_when(ae->alarm_id, ae->unique_id, unique_id, uuid_str);
  116. if (when && (when + (time_t)MAX_REMOVED_PERIOD) < ae->when) {
  117. send = 1;
  118. goto done;
  119. } else {
  120. send = 0;
  121. update_filtered(ae, unique_id, uuid_str);
  122. goto done;
  123. }
  124. }
  125. done:
  126. rc = sqlite3_finalize(res);
  127. if (unlikely(rc != SQLITE_OK))
  128. error_report("Failed to finalize statement when trying to filter alert events, rc = %d", rc);
  129. return send;
  130. }
  131. // will replace call to aclk_update_alarm in health/health_log.c
  132. // and handle both cases
  133. #define SQL_QUEUE_ALERT_TO_CLOUD "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
  134. "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) ON CONFLICT (alert_unique_id) do nothing;"
  135. int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
  136. {
  137. if(!service_running(SERVICE_ACLK))
  138. return 0;
  139. if (!claimed())
  140. return 0;
  141. if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) {
  142. return 0;
  143. }
  144. CHECK_SQLITE_CONNECTION(db_meta);
  145. if (!skip_filter) {
  146. if (!should_send_to_cloud(host, ae)) {
  147. return 0;
  148. }
  149. }
  150. char uuid_str[UUID_STR_LEN];
  151. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  152. if (is_event_from_alert_variable_config(ae->unique_id, uuid_str))
  153. return 0;
  154. sqlite3_stmt *res_alert = NULL;
  155. char sql[ACLK_SYNC_QUERY_SIZE];
  156. snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str);
  157. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0);
  158. if (unlikely(rc != SQLITE_OK)) {
  159. error_report("Failed to prepare statement to store alert event");
  160. return 1;
  161. }
  162. rc = sqlite3_bind_int(res_alert, 1, (int) ae->unique_id);
  163. if (unlikely(rc != SQLITE_OK))
  164. goto bind_fail;
  165. rc = execute_insert(res_alert);
  166. if (unlikely(rc != SQLITE_DONE)) {
  167. error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc);
  168. goto bind_fail;
  169. }
  170. ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
  171. rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  172. bind_fail:
  173. if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
  174. error_report("Failed to reset statement in store alert event, rc = %d", rc);
  175. return 0;
  176. }
  177. int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
  178. {
  179. #ifdef ENABLE_ACLK
  180. switch(status) {
  181. case RRDCALC_STATUS_REMOVED:
  182. return ALARM_STATUS_REMOVED;
  183. case RRDCALC_STATUS_UNDEFINED:
  184. return ALARM_STATUS_NOT_A_NUMBER;
  185. case RRDCALC_STATUS_CLEAR:
  186. return ALARM_STATUS_CLEAR;
  187. case RRDCALC_STATUS_WARNING:
  188. return ALARM_STATUS_WARNING;
  189. case RRDCALC_STATUS_CRITICAL:
  190. return ALARM_STATUS_CRITICAL;
  191. default:
  192. return ALARM_STATUS_UNKNOWN;
  193. }
  194. #else
  195. UNUSED(status);
  196. return 1;
  197. #endif
  198. }
  199. void aclk_push_alert_event(struct aclk_sync_host_config *wc)
  200. {
  201. #ifndef ENABLE_ACLK
  202. UNUSED(wc);
  203. #else
  204. int rc;
  205. if (unlikely(!wc->alert_updates)) {
  206. log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
  207. return;
  208. }
  209. char *claim_id = get_agent_claimid();
  210. if (unlikely(!claim_id))
  211. return;
  212. if (unlikely(!wc->host)) {
  213. freez(claim_id);
  214. return;
  215. }
  216. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  217. int limit = ACLK_MAX_ALERT_UPDATES;
  218. sqlite3_stmt *res = NULL;
  219. buffer_sprintf(sql, "select aa.sequence_id, hl.unique_id, hl.alarm_id, hl.config_hash_id, hl.updated_by_id, hl.when_key, " \
  220. " hl.duration, hl.non_clear_duration, hl.flags, hl.exec_run_timestamp, hl.delay_up_to_timestamp, hl.name, " \
  221. " hl.chart, hl.family, hl.exec, hl.recipient, hl.source, hl.units, hl.info, hl.exec_code, hl.new_status, " \
  222. " hl.old_status, hl.delay, hl.new_value, hl.old_value, hl.last_repeat, hl.chart_context " \
  223. " from health_log_%s hl, aclk_alert_%s aa " \
  224. " where hl.unique_id = aa.alert_unique_id and aa.date_submitted is null " \
  225. " order by aa.sequence_id asc limit %d;", wc->uuid_str, wc->uuid_str, limit);
  226. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  227. if (rc != SQLITE_OK) {
  228. // Try to create tables
  229. if (wc->host)
  230. sql_create_health_log_table(wc->host);
  231. BUFFER *sql_fix = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  232. buffer_sprintf(sql_fix, TABLE_ACLK_ALERT, wc->uuid_str);
  233. rc = db_execute(db_meta, buffer_tostring(sql_fix));
  234. if (unlikely(rc))
  235. error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host));
  236. else {
  237. buffer_flush(sql_fix);
  238. buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str);
  239. if (unlikely(db_execute(db_meta, buffer_tostring(sql_fix))))
  240. error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host));
  241. }
  242. buffer_free(sql_fix);
  243. // Try again
  244. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  245. if (rc != SQLITE_OK) {
  246. error_report("Failed to prepare statement when trying to send an alert update via ACLK");
  247. buffer_free(sql);
  248. freez(claim_id);
  249. return;
  250. }
  251. }
  252. char uuid_str[GUID_LEN + 1];
  253. uint64_t first_sequence_id = 0;
  254. uint64_t last_sequence_id = 0;
  255. static __thread uint64_t log_first_sequence_id = 0;
  256. static __thread uint64_t log_last_sequence_id = 0;
  257. while (sqlite3_step_monitored(res) == SQLITE_ROW) {
  258. struct alarm_log_entry alarm_log;
  259. char old_value_string[100 + 1];
  260. char new_value_string[100 + 1];
  261. alarm_log.node_id = wc->node_id;
  262. alarm_log.claim_id = claim_id;
  263. alarm_log.chart = strdupz((char *)sqlite3_column_text(res, 12));
  264. alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11));
  265. alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
  266. //alarm_log.batch_id = wc->alerts_batch_id;
  267. //alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  268. alarm_log.when = (time_t) sqlite3_column_int64(res, 5);
  269. uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str);
  270. alarm_log.config_hash = strdupz((char *)uuid_str);
  271. alarm_log.utc_offset = wc->host->utc_offset;
  272. alarm_log.timezone = strdupz(rrdhost_abbrev_timezone(wc->host));
  273. alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) :
  274. strdupz((char *)string2str(wc->host->health.health_default_exec));
  275. alarm_log.conf_source = strdupz((char *)sqlite3_column_text(res, 16));
  276. char *edit_command = sqlite3_column_bytes(res, 16) > 0 ?
  277. health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) :
  278. strdupz("UNKNOWN=0=UNKNOWN");
  279. alarm_log.command = strdupz(edit_command);
  280. alarm_log.duration = (time_t) sqlite3_column_int64(res, 6);
  281. alarm_log.non_clear_duration = (time_t) sqlite3_column_int64(res, 7);
  282. alarm_log.status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 20));
  283. alarm_log.old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 21));
  284. alarm_log.delay = (int) sqlite3_column_int(res, 22);
  285. alarm_log.delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10);
  286. alarm_log.last_repeat = (time_t) sqlite3_column_int64(res, 25);
  287. alarm_log.silenced = ((sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) ||
  288. (sqlite3_column_type(res, 15) != SQLITE_NULL &&
  289. !strncmp((char *)sqlite3_column_text(res, 15), "silent", 6))) ?
  290. 1 :
  291. 0;
  292. alarm_log.value_string =
  293. sqlite3_column_type(res, 23) == SQLITE_NULL ?
  294. strdupz((char *)"-") :
  295. strdupz((char *)format_value_and_unit(
  296. new_value_string, 100, sqlite3_column_double(res, 23), (char *)sqlite3_column_text(res, 17), -1));
  297. alarm_log.old_value_string =
  298. sqlite3_column_type(res, 24) == SQLITE_NULL ?
  299. strdupz((char *)"-") :
  300. strdupz((char *)format_value_and_unit(
  301. old_value_string, 100, sqlite3_column_double(res, 24), (char *)sqlite3_column_text(res, 17), -1));
  302. alarm_log.value = (NETDATA_DOUBLE) sqlite3_column_double(res, 23);
  303. alarm_log.old_value = (NETDATA_DOUBLE) sqlite3_column_double(res, 24);
  304. alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
  305. alarm_log.rendered_info = sqlite3_column_type(res, 18) == SQLITE_NULL ?
  306. strdupz((char *)"") :
  307. strdupz((char *)sqlite3_column_text(res, 18));
  308. alarm_log.chart_context = sqlite3_column_type(res, 26) == SQLITE_NULL ?
  309. strdupz((char *)"") :
  310. strdupz((char *)sqlite3_column_text(res, 26));
  311. aclk_send_alarm_log_entry(&alarm_log);
  312. if (first_sequence_id == 0)
  313. first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  314. if (log_first_sequence_id == 0)
  315. log_first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  316. last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  317. log_last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  318. destroy_alarm_log_entry(&alarm_log);
  319. freez(edit_command);
  320. }
  321. if (first_sequence_id) {
  322. buffer_flush(sql);
  323. buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
  324. "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
  325. wc->uuid_str, first_sequence_id, last_sequence_id);
  326. if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
  327. error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host));
  328. // Mark to do one more check
  329. rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  330. } else {
  331. if (log_first_sequence_id)
  332. log_access(
  333. "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "",
  334. wc->node_id,
  335. wc->host ? rrdhost_hostname(wc->host) : "N/A",
  336. log_first_sequence_id,
  337. log_last_sequence_id);
  338. log_first_sequence_id = 0;
  339. log_last_sequence_id = 0;
  340. }
  341. rc = sqlite3_finalize(res);
  342. if (unlikely(rc != SQLITE_OK))
  343. error_report("Failed to finalize statement to send alert entries from the database, rc = %d", rc);
  344. freez(claim_id);
  345. buffer_free(sql);
  346. #endif
  347. }
  348. void aclk_push_alert_events_for_all_hosts(void)
  349. {
  350. RRDHOST *host;
  351. dfe_start_reentrant(rrdhost_root_index, host) {
  352. if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS))
  353. continue;
  354. internal_error(true, "ACLK SYNC: Scanning host %s", rrdhost_hostname(host));
  355. rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  356. struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
  357. if (likely(wc))
  358. aclk_push_alert_event(wc);
  359. }
  360. dfe_done(host);
  361. }
  362. void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
  363. {
  364. char uuid_str[GUID_LEN + 1];
  365. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  366. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  367. buffer_sprintf(sql,"delete from aclk_alert_%s; " \
  368. "insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
  369. "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \
  370. "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \
  371. "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str);
  372. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  373. if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
  374. error_report("Failed to queue existing ACLK alert events for host %s", rrdhost_hostname(host));
  375. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  376. buffer_free(sql);
  377. rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  378. }
  379. void aclk_send_alarm_configuration(char *config_hash)
  380. {
  381. if (unlikely(!config_hash))
  382. return;
  383. struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) localhost->aclk_sync_host_config;
  384. if (unlikely(!wc))
  385. return;
  386. log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  387. aclk_push_alert_config(wc->node_id, config_hash);
  388. }
  389. #define SQL_SELECT_ALERT_CONFIG "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
  390. "module, charts, families, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \
  391. "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
  392. "p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;"
  393. int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused)
  394. {
  395. int rc = 0;
  396. #ifdef ENABLE_ACLK
  397. CHECK_SQLITE_CONNECTION(db_meta);
  398. sqlite3_stmt *res = NULL;
  399. struct aclk_sync_host_config *wc = NULL;
  400. RRDHOST *host = find_host_by_node_id(node_id);
  401. if (unlikely(!host)) {
  402. freez(config_hash);
  403. freez(node_id);
  404. return 1;
  405. }
  406. wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
  407. if (unlikely(!wc)) {
  408. freez(config_hash);
  409. freez(node_id);
  410. return 1;
  411. }
  412. rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0);
  413. if (rc != SQLITE_OK) {
  414. error_report("Failed to prepare statement when trying to fetch an alarm hash configuration");
  415. return 1;
  416. }
  417. uuid_t hash_uuid;
  418. if (uuid_parse(config_hash, hash_uuid))
  419. return 1;
  420. rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC);
  421. if (unlikely(rc != SQLITE_OK))
  422. goto bind_fail;
  423. struct aclk_alarm_configuration alarm_config;
  424. struct provide_alarm_configuration p_alarm_config;
  425. p_alarm_config.cfg_hash = NULL;
  426. if (sqlite3_step_monitored(res) == SQLITE_ROW) {
  427. alarm_config.alarm = sqlite3_column_bytes(res, 0) > 0 ? strdupz((char *)sqlite3_column_text(res, 0)) : NULL;
  428. alarm_config.tmpl = sqlite3_column_bytes(res, 1) > 0 ? strdupz((char *)sqlite3_column_text(res, 1)) : NULL;
  429. alarm_config.on_chart = sqlite3_column_bytes(res, 2) > 0 ? strdupz((char *)sqlite3_column_text(res, 2)) : NULL;
  430. alarm_config.classification = sqlite3_column_bytes(res, 3) > 0 ? strdupz((char *)sqlite3_column_text(res, 3)) : NULL;
  431. alarm_config.type = sqlite3_column_bytes(res, 4) > 0 ? strdupz((char *)sqlite3_column_text(res, 4)) : NULL;
  432. alarm_config.component = sqlite3_column_bytes(res, 5) > 0 ? strdupz((char *)sqlite3_column_text(res, 5)) : NULL;
  433. alarm_config.os = sqlite3_column_bytes(res, 6) > 0 ? strdupz((char *)sqlite3_column_text(res, 6)) : NULL;
  434. alarm_config.hosts = sqlite3_column_bytes(res, 7) > 0 ? strdupz((char *)sqlite3_column_text(res, 7)) : NULL;
  435. alarm_config.plugin = sqlite3_column_bytes(res, 8) > 0 ? strdupz((char *)sqlite3_column_text(res, 8)) : NULL;
  436. alarm_config.module = sqlite3_column_bytes(res, 9) > 0 ? strdupz((char *)sqlite3_column_text(res, 9)) : NULL;
  437. alarm_config.charts = sqlite3_column_bytes(res, 10) > 0 ? strdupz((char *)sqlite3_column_text(res, 10)) : NULL;
  438. alarm_config.families = sqlite3_column_bytes(res, 11) > 0 ? strdupz((char *)sqlite3_column_text(res, 11)) : NULL;
  439. alarm_config.lookup = sqlite3_column_bytes(res, 12) > 0 ? strdupz((char *)sqlite3_column_text(res, 12)) : NULL;
  440. alarm_config.every = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
  441. alarm_config.units = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : NULL;
  442. alarm_config.green = sqlite3_column_bytes(res, 15) > 0 ? strdupz((char *)sqlite3_column_text(res, 15)) : NULL;
  443. alarm_config.red = sqlite3_column_bytes(res, 16) > 0 ? strdupz((char *)sqlite3_column_text(res, 16)) : NULL;
  444. alarm_config.calculation_expr = sqlite3_column_bytes(res, 17) > 0 ? strdupz((char *)sqlite3_column_text(res, 17)) : NULL;
  445. alarm_config.warning_expr = sqlite3_column_bytes(res, 18) > 0 ? strdupz((char *)sqlite3_column_text(res, 18)) : NULL;
  446. alarm_config.critical_expr = sqlite3_column_bytes(res, 19) > 0 ? strdupz((char *)sqlite3_column_text(res, 19)) : NULL;
  447. alarm_config.recipient = sqlite3_column_bytes(res, 20) > 0 ? strdupz((char *)sqlite3_column_text(res, 20)) : NULL;
  448. alarm_config.exec = sqlite3_column_bytes(res, 21) > 0 ? strdupz((char *)sqlite3_column_text(res, 21)) : NULL;
  449. alarm_config.delay = sqlite3_column_bytes(res, 22) > 0 ? strdupz((char *)sqlite3_column_text(res, 22)) : NULL;
  450. alarm_config.repeat = sqlite3_column_bytes(res, 23) > 0 ? strdupz((char *)sqlite3_column_text(res, 23)) : NULL;
  451. alarm_config.info = sqlite3_column_bytes(res, 24) > 0 ? strdupz((char *)sqlite3_column_text(res, 24)) : NULL;
  452. alarm_config.options = sqlite3_column_bytes(res, 25) > 0 ? strdupz((char *)sqlite3_column_text(res, 25)) : NULL;
  453. alarm_config.host_labels = sqlite3_column_bytes(res, 26) > 0 ? strdupz((char *)sqlite3_column_text(res, 26)) : NULL;
  454. alarm_config.p_db_lookup_dimensions = NULL;
  455. alarm_config.p_db_lookup_method = NULL;
  456. alarm_config.p_db_lookup_options = NULL;
  457. alarm_config.p_db_lookup_after = 0;
  458. alarm_config.p_db_lookup_before = 0;
  459. if (sqlite3_column_bytes(res, 30) > 0) {
  460. alarm_config.p_db_lookup_dimensions = sqlite3_column_bytes(res, 27) > 0 ? strdupz((char *)sqlite3_column_text(res, 27)) : NULL;
  461. alarm_config.p_db_lookup_method = sqlite3_column_bytes(res, 28) > 0 ? strdupz((char *)sqlite3_column_text(res, 28)) : NULL;
  462. BUFFER *tmp_buf = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  463. buffer_data_options2string(tmp_buf, sqlite3_column_int(res, 29));
  464. alarm_config.p_db_lookup_options = strdupz((char *)buffer_tostring(tmp_buf));
  465. buffer_free(tmp_buf);
  466. alarm_config.p_db_lookup_after = sqlite3_column_int(res, 30);
  467. alarm_config.p_db_lookup_before = sqlite3_column_int(res, 31);
  468. }
  469. alarm_config.p_update_every = sqlite3_column_int(res, 32);
  470. p_alarm_config.cfg_hash = strdupz((char *) config_hash);
  471. p_alarm_config.cfg = alarm_config;
  472. }
  473. if (likely(p_alarm_config.cfg_hash)) {
  474. log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  475. aclk_send_provide_alarm_cfg(&p_alarm_config);
  476. freez(p_alarm_config.cfg_hash);
  477. destroy_aclk_alarm_configuration(&alarm_config);
  478. }
  479. else
  480. log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  481. bind_fail:
  482. rc = sqlite3_finalize(res);
  483. if (unlikely(rc != SQLITE_OK))
  484. error_report("Failed to reset statement when pushing alarm config hash, rc = %d", rc);
  485. freez(config_hash);
  486. freez(node_id);
  487. #endif
  488. return rc;
  489. }
  490. // Start streaming alerts
  491. void aclk_start_alert_streaming(char *node_id, bool resets)
  492. {
  493. if (unlikely(!node_id))
  494. return;
  495. uuid_t node_uuid;
  496. if (uuid_parse(node_id, node_uuid))
  497. return;
  498. RRDHOST *host = find_host_by_node_id(node_id);
  499. if (unlikely(!host))
  500. return;
  501. struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
  502. if (unlikely(!wc))
  503. return;
  504. if (unlikely(!host->health.health_enabled)) {
  505. log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
  506. return;
  507. }
  508. if (resets) {
  509. log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
  510. sql_queue_existing_alerts_to_aclk(host);
  511. } else
  512. log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
  513. wc->alert_updates = 1;
  514. wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS;
  515. }
  516. #define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
  517. "SELECT unique_id alert_unique_id, UNIXEPOCH(), unique_id alert_unique_id FROM health_log_%s " \
  518. "WHERE new_status = -2 AND updated_by_id = 0 AND unique_id NOT IN " \
  519. "(SELECT alert_unique_id FROM aclk_alert_%s) " \
  520. "AND config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \
  521. "ORDER BY unique_id ASC " \
  522. "ON CONFLICT (alert_unique_id) DO NOTHING;"
  523. void sql_process_queue_removed_alerts_to_aclk(char *node_id)
  524. {
  525. struct aclk_sync_host_config *wc;
  526. RRDHOST *host = find_host_by_node_id(node_id);
  527. freez(node_id);
  528. if (unlikely(!host || !(wc = host->aclk_sync_host_config)))
  529. return;
  530. char sql[ACLK_SYNC_QUERY_SIZE * 2];
  531. snprintfz(sql,ACLK_SYNC_QUERY_SIZE * 2 - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str, wc->uuid_str);
  532. if (unlikely(db_execute(db_meta, sql))) {
  533. log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS FAILED", wc->node_id, rrdhost_hostname(wc->host));
  534. error_report("Failed to queue ACLK alert removed entries for host %s", rrdhost_hostname(wc->host));
  535. }
  536. else
  537. log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host));
  538. rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  539. wc->alert_queue_removed = 0;
  540. }
  541. void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
  542. {
  543. if (unlikely(!host->aclk_sync_host_config))
  544. return;
  545. if (!claimed() || !host->node_id)
  546. return;
  547. char node_id[UUID_STR_LEN];
  548. uuid_unparse_lower(*host->node_id, node_id);
  549. aclk_push_node_removed_alerts(node_id);
  550. }
  551. void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid)
  552. {
  553. uuid_t node_uuid;
  554. if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
  555. return;
  556. RRDHOST *host = find_host_by_node_id(node_id);
  557. if (unlikely(!host)) {
  558. log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
  559. return;
  560. }
  561. struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
  562. if (unlikely(!wc)) {
  563. log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
  564. return;
  565. }
  566. log_access(
  567. "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s",
  568. node_id,
  569. wc->host ? rrdhost_hostname(wc->host) : "N/A",
  570. snapshot_uuid);
  571. if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid))
  572. return;
  573. __sync_synchronize();
  574. wc->alerts_snapshot_uuid = strdupz(snapshot_uuid);
  575. __sync_synchronize();
  576. aclk_push_node_alert_snapshot(node_id);
  577. }
  578. #ifdef ENABLE_ACLK
  579. void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
  580. {
  581. char *edit_command = ae->source ? health_edit_command_from_source(ae_source(ae)) : strdupz("UNKNOWN=0=UNKNOWN");
  582. char config_hash_id[GUID_LEN + 1];
  583. uuid_unparse_lower(ae->config_hash_id, config_hash_id);
  584. alarm_log->chart = strdupz(ae_chart_name(ae));
  585. alarm_log->name = strdupz(ae_name(ae));
  586. alarm_log->family = strdupz(ae_family(ae));
  587. alarm_log->batch_id = 0;
  588. alarm_log->sequence_id = 0;
  589. alarm_log->when = (time_t)ae->when;
  590. alarm_log->config_hash = strdupz((char *)config_hash_id);
  591. alarm_log->utc_offset = host->utc_offset;
  592. alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host));
  593. alarm_log->exec_path = ae->exec ? strdupz(ae_exec(ae)) : strdupz((char *)string2str(host->health.health_default_exec));
  594. alarm_log->conf_source = ae->source ? strdupz(ae_source(ae)) : strdupz((char *)"");
  595. alarm_log->command = strdupz((char *)edit_command);
  596. alarm_log->duration = (time_t)ae->duration;
  597. alarm_log->non_clear_duration = (time_t)ae->non_clear_duration;
  598. alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status);
  599. alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status);
  600. alarm_log->delay = (int)ae->delay;
  601. alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp;
  602. alarm_log->last_repeat = (time_t)ae->last_repeat;
  603. alarm_log->silenced =
  604. ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp(ae_recipient(ae), "silent", 6))) ?
  605. 1 :
  606. 0;
  607. alarm_log->value_string = strdupz(ae_new_value_string(ae));
  608. alarm_log->old_value_string = strdupz(ae_old_value_string(ae));
  609. alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0;
  610. alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0;
  611. alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
  612. alarm_log->rendered_info = strdupz(ae_info(ae));
  613. alarm_log->chart_context = strdupz(ae_chart_context(ae));
  614. freez(edit_command);
  615. }
  616. #endif
  617. #ifdef ENABLE_ACLK
  618. static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark)
  619. {
  620. ALARM_ENTRY *ae = host->health_log.alarms;
  621. while (ae) {
  622. if (ae->alarm_id == alarm_id && ae->unique_id >mark &&
  623. (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL))
  624. return 1;
  625. ae = ae->next;
  626. }
  627. return 0;
  628. }
  629. #endif
  630. #define ALARM_EVENTS_PER_CHUNK 10
  631. void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
  632. {
  633. #ifdef ENABLE_ACLK
  634. RRDHOST *host = find_host_by_node_id(node_id);
  635. if (unlikely(!host)) {
  636. log_access("AC [%s (N/A)]: Node id not found", node_id);
  637. freez(node_id);
  638. return;
  639. }
  640. freez(node_id);
  641. struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
  642. // we perhaps we don't need this for snapshots
  643. if (unlikely(!wc->alert_updates)) {
  644. log_access(
  645. "ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.",
  646. wc->node_id,
  647. wc->host ? rrdhost_hostname(wc->host) : "N/A");
  648. return;
  649. }
  650. if (unlikely(!wc->alerts_snapshot_uuid))
  651. return;
  652. char *claim_id = get_agent_claimid();
  653. if (unlikely(!claim_id))
  654. return;
  655. log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid);
  656. uint32_t cnt = 0;
  657. char uuid_str[UUID_STR_LEN];
  658. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  659. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  660. ALARM_ENTRY *ae = host->health_log.alarms;
  661. for (; ae; ae = ae->next) {
  662. if (likely(ae->updated_by_id))
  663. continue;
  664. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  665. continue;
  666. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  667. continue;
  668. if (is_event_from_alert_variable_config(ae->unique_id, uuid_str))
  669. continue;
  670. cnt++;
  671. }
  672. if (cnt) {
  673. uint32_t chunk = 1, chunks = 0;
  674. chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
  675. ae = host->health_log.alarms;
  676. cnt = 0;
  677. struct alarm_snapshot alarm_snap;
  678. alarm_snap.node_id = wc->node_id;
  679. alarm_snap.claim_id = claim_id;
  680. alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
  681. alarm_snap.chunks = chunks;
  682. alarm_snap.chunk = chunk;
  683. alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
  684. for (; ae; ae = ae->next) {
  685. if (likely(ae->updated_by_id))
  686. continue;
  687. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  688. continue;
  689. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  690. continue;
  691. if (is_event_from_alert_variable_config(ae->unique_id, uuid_str))
  692. continue;
  693. cnt++;
  694. struct alarm_log_entry alarm_log;
  695. alarm_log.node_id = wc->node_id;
  696. alarm_log.claim_id = claim_id;
  697. if (!snapshot_proto)
  698. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  699. health_alarm_entry2proto_nolock(&alarm_log, ae, host);
  700. add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
  701. if (cnt == ALARM_EVENTS_PER_CHUNK) {
  702. aclk_send_alarm_snapshot(snapshot_proto);
  703. cnt = 0;
  704. if (chunk < chunks) {
  705. chunk++;
  706. struct alarm_snapshot alarm_snap;
  707. alarm_snap.node_id = wc->node_id;
  708. alarm_snap.claim_id = claim_id;
  709. alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
  710. alarm_snap.chunks = chunks;
  711. alarm_snap.chunk = chunk;
  712. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  713. }
  714. }
  715. destroy_alarm_log_entry(&alarm_log);
  716. }
  717. if (cnt)
  718. aclk_send_alarm_snapshot(snapshot_proto);
  719. }
  720. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  721. wc->alerts_snapshot_uuid = NULL;
  722. freez(claim_id);
  723. #endif
  724. }
  725. #define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE filtered_alert_unique_id NOT IN (SELECT unique_id FROM health_log_%s);"
  726. void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
  727. {
  728. if (!claimed())
  729. return;
  730. char uuid_str[UUID_STR_LEN];
  731. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  732. char sql[512];
  733. snprintfz(sql,511,SQL_DELETE_ALERT_ENTRIES, uuid_str, uuid_str);
  734. char *err_msg = NULL;
  735. int rc = sqlite3_exec_monitored(db_meta, sql, NULL, NULL, &err_msg);
  736. if (rc != SQLITE_OK) {
  737. error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s\"", uuid_str, err_msg);
  738. sqlite3_free(err_msg);
  739. }
  740. }
  741. #define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \
  742. "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \
  743. "FROM aclk_alert_%s WHERE date_submitted IS NULL;"
  744. int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
  745. {
  746. int rc;
  747. struct aclk_sync_host_config *wc = NULL;
  748. wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
  749. if (!wc)
  750. return 1;
  751. proto_alert_status->alert_updates = wc->alert_updates;
  752. char sql[ACLK_SYNC_QUERY_SIZE];
  753. sqlite3_stmt *res = NULL;
  754. snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str);
  755. rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  756. if (rc != SQLITE_OK) {
  757. error_report("Failed to prepare statement to get alert log status from the database.");
  758. return 1;
  759. }
  760. while (sqlite3_step_monitored(res) == SQLITE_ROW) {
  761. proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
  762. proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
  763. proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
  764. }
  765. rc = sqlite3_finalize(res);
  766. if (unlikely(rc != SQLITE_OK))
  767. error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc);
  768. return 0;
  769. }
  770. void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused)
  771. {
  772. if (unlikely(!node_id))
  773. return;
  774. struct aclk_sync_host_config *wc = NULL;
  775. RRDHOST *host = find_host_by_node_id(node_id);
  776. if (unlikely(!host))
  777. return;
  778. wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
  779. if (unlikely(!wc)) {
  780. log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id);
  781. return;
  782. }
  783. log_access("ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host));
  784. wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS;
  785. }
  786. typedef struct active_alerts {
  787. char *name;
  788. char *chart;
  789. RRDCALC_STATUS status;
  790. } active_alerts_t;
  791. static inline int compare_active_alerts(const void * a, const void * b) {
  792. active_alerts_t *active_alerts_a = (active_alerts_t *)a;
  793. active_alerts_t *active_alerts_b = (active_alerts_t *)b;
  794. if( !(strcmp(active_alerts_a->name, active_alerts_b->name)) )
  795. {
  796. return strcmp(active_alerts_a->chart, active_alerts_b->chart);
  797. }
  798. else
  799. return strcmp(active_alerts_a->name, active_alerts_b->name);
  800. }
  801. void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused)
  802. {
  803. #ifdef ENABLE_ACLK
  804. struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
  805. if (unlikely(!wc)) {
  806. log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host));
  807. return;
  808. }
  809. //TODO: make sure all pending events are sent.
  810. if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) {
  811. //postpone checkpoint send
  812. wc->alert_checkpoint_req++;
  813. log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host));
  814. return;
  815. }
  816. //TODO: lock rc here, or make sure it's called when health decides
  817. //count them
  818. RRDCALC *rc;
  819. uint32_t cnt = 0;
  820. size_t len = 0;
  821. active_alerts_t *active_alerts = NULL;
  822. foreach_rrdcalc_in_rrdhost_read(host, rc) {
  823. if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
  824. continue;
  825. if (rc->status == RRDCALC_STATUS_WARNING ||
  826. rc->status == RRDCALC_STATUS_CRITICAL) {
  827. cnt++;
  828. }
  829. }
  830. foreach_rrdcalc_in_rrdhost_done(rc);
  831. if (cnt) {
  832. active_alerts = callocz(cnt, sizeof(active_alerts_t));
  833. cnt = 0;
  834. foreach_rrdcalc_in_rrdhost_read(host, rc) {
  835. if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
  836. continue;
  837. if (rc->status == RRDCALC_STATUS_WARNING ||
  838. rc->status == RRDCALC_STATUS_CRITICAL) {
  839. active_alerts[cnt].name = (char *)rrdcalc_name(rc);
  840. len += string_strlen(rc->name);
  841. active_alerts[cnt].chart = (char *)rrdcalc_chart_name(rc);
  842. len += string_strlen(rc->chart);
  843. active_alerts[cnt].status = rc->status;
  844. len++;
  845. cnt++;
  846. }
  847. }
  848. foreach_rrdcalc_in_rrdhost_done(rc);
  849. }
  850. BUFFER *alarms_to_hash;
  851. if (cnt) {
  852. qsort (active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts);
  853. alarms_to_hash = buffer_create(len, NULL);
  854. for (uint32_t i=0;i<cnt;i++) {
  855. buffer_strcat(alarms_to_hash, active_alerts[i].name);
  856. buffer_strcat(alarms_to_hash, active_alerts[i].chart);
  857. if (active_alerts[i].status == RRDCALC_STATUS_WARNING)
  858. buffer_strcat(alarms_to_hash, "W");
  859. else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL)
  860. buffer_strcat(alarms_to_hash, "C");
  861. }
  862. } else {
  863. alarms_to_hash = buffer_create(1, NULL);
  864. buffer_strcat(alarms_to_hash, "");
  865. len = 0;
  866. }
  867. freez(active_alerts);
  868. char hash[SHA256_DIGEST_LENGTH + 1];
  869. if (hash256_string((const unsigned char *)buffer_tostring(alarms_to_hash), len, hash)) {
  870. hash[SHA256_DIGEST_LENGTH] = 0;
  871. struct alarm_checkpoint alarm_checkpoint;
  872. char *claim_id = get_agent_claimid();
  873. alarm_checkpoint.claim_id = claim_id;
  874. alarm_checkpoint.node_id = wc->node_id;
  875. alarm_checkpoint.checksum = (char *)hash;
  876. aclk_send_provide_alarm_checkpoint(&alarm_checkpoint);
  877. freez(claim_id);
  878. log_access("ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host));
  879. } else {
  880. log_access("ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host));
  881. }
  882. wc->alert_checkpoint_req = 0;
  883. buffer_free(alarms_to_hash);
  884. #endif
  885. }