sqlite_aclk_alert.c 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_functions.h"
  3. #include "sqlite_aclk_alert.h"
  4. #ifdef ENABLE_ACLK
  5. #include "../../aclk/aclk_alarm_api.h"
  6. #endif
  7. #define SQL_UPDATE_FILTERED_ALERT "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u, date_created = unixepoch() where filtered_alert_unique_id = %u"
  8. void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) {
  9. char sql[ACLK_SYNC_QUERY_SIZE];
  10. snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str, ae->unique_id, unique_id);
  11. sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL);
  12. ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
  13. }
  14. #define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID "SELECT hld.unique_id FROM health_log hl, alert_hash ah, health_log_detail hld WHERE hld.unique_id = %u " \
  15. "AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id AND host_id = @host_id " \
  16. "AND ah.warn IS NULL AND ah.crit IS NULL;"
  17. static inline bool is_event_from_alert_variable_config(uint32_t unique_id, uuid_t *host_id) {
  18. sqlite3_stmt *res = NULL;
  19. int rc = 0;
  20. bool ret = false;
  21. char sql[ACLK_SYNC_QUERY_SIZE];
  22. snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, unique_id);
  23. rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  24. if (rc != SQLITE_OK) {
  25. error_report("Failed to prepare statement when trying to check for alert variables.");
  26. return false;
  27. }
  28. rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
  29. if (unlikely(rc != SQLITE_OK)) {
  30. error_report("Failed to bind host_id for checking alert variable.");
  31. sqlite3_finalize(res);
  32. return false;
  33. }
  34. rc = sqlite3_step_monitored(res);
  35. if (likely(rc == SQLITE_ROW)) {
  36. ret = true;
  37. }
  38. rc = sqlite3_finalize(res);
  39. if (unlikely(rc != SQLITE_OK))
  40. error_report("Failed to finalize statement when trying to check for alert variables, rc = %d", rc);
  41. return ret;
  42. }
  43. #define MAX_REMOVED_PERIOD 604800 //a week
  44. //decide if some events should be sent or not
  45. #define SQL_SELECT_ALERT_BY_ID "SELECT hld.new_status, hl.config_hash_id, hld.unique_id FROM health_log hl, aclk_alert_%s aa, health_log_detail hld " \
  46. "WHERE hld.unique_id = aa.filtered_alert_unique_id " \
  47. "AND hld.alarm_id = %u AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id " \
  48. "ORDER BY hld.alarm_event_id DESC LIMIT 1;"
  49. int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
  50. {
  51. sqlite3_stmt *res = NULL;
  52. char uuid_str[UUID_STR_LEN];
  53. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  54. int send = 1;
  55. if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) {
  56. return 0;
  57. }
  58. if (unlikely(uuid_is_null(ae->config_hash_id)))
  59. return 0;
  60. char sql[ACLK_SYNC_QUERY_SIZE];
  61. uuid_t config_hash_id;
  62. RRDCALC_STATUS status;
  63. uint32_t unique_id;
  64. //get the previous sent event of this alarm_id
  65. //base the search on the last filtered event
  66. snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, SQL_SELECT_ALERT_BY_ID, uuid_str, ae->alarm_id);
  67. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  68. if (rc != SQLITE_OK) {
  69. error_report("Failed to prepare statement when trying to filter alert events.");
  70. send = 1;
  71. return send;
  72. }
  73. rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
  74. if (unlikely(rc != SQLITE_OK)) {
  75. error_report("Failed to bind host_id for checking alert variable.");
  76. sqlite3_finalize(res);
  77. return false;
  78. }
  79. rc = sqlite3_step_monitored(res);
  80. if (likely(rc == SQLITE_ROW)) {
  81. status = (RRDCALC_STATUS) sqlite3_column_int(res, 0);
  82. if (sqlite3_column_type(res, 1) != SQLITE_NULL)
  83. uuid_copy(config_hash_id, *((uuid_t *) sqlite3_column_blob(res, 1)));
  84. unique_id = (uint32_t) sqlite3_column_int64(res, 2);
  85. } else {
  86. send = 1;
  87. goto done;
  88. }
  89. if (ae->new_status != (RRDCALC_STATUS)status) {
  90. send = 1;
  91. goto done;
  92. }
  93. if (uuid_memcmp(&ae->config_hash_id, &config_hash_id)) {
  94. send = 1;
  95. goto done;
  96. }
  97. //same status, same config
  98. send = 0;
  99. update_filtered(ae, unique_id, uuid_str);
  100. done:
  101. rc = sqlite3_finalize(res);
  102. if (unlikely(rc != SQLITE_OK))
  103. error_report("Failed to finalize statement when trying to filter alert events, rc = %d", rc);
  104. return send;
  105. }
  106. #define SQL_QUEUE_ALERT_TO_CLOUD "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
  107. "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) ON CONFLICT (alert_unique_id) do nothing;"
  108. int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
  109. {
  110. if(!service_running(SERVICE_ACLK))
  111. return 0;
  112. if (!claimed())
  113. return 0;
  114. if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) {
  115. return 0;
  116. }
  117. CHECK_SQLITE_CONNECTION(db_meta);
  118. if (!skip_filter) {
  119. if (!should_send_to_cloud(host, ae)) {
  120. return 0;
  121. }
  122. }
  123. char uuid_str[UUID_STR_LEN];
  124. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  125. if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
  126. return 0;
  127. sqlite3_stmt *res_alert = NULL;
  128. char sql[ACLK_SYNC_QUERY_SIZE];
  129. snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str);
  130. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0);
  131. if (unlikely(rc != SQLITE_OK)) {
  132. error_report("Failed to prepare statement to store alert event");
  133. return 1;
  134. }
  135. rc = sqlite3_bind_int(res_alert, 1, (int) ae->unique_id);
  136. if (unlikely(rc != SQLITE_OK))
  137. goto bind_fail;
  138. rc = execute_insert(res_alert);
  139. if (unlikely(rc != SQLITE_DONE)) {
  140. error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc);
  141. goto bind_fail;
  142. }
  143. ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
  144. rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  145. bind_fail:
  146. if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
  147. error_report("Failed to reset statement in store alert event, rc = %d", rc);
  148. return 0;
  149. }
  150. int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
  151. {
  152. #ifdef ENABLE_ACLK
  153. switch(status) {
  154. case RRDCALC_STATUS_REMOVED:
  155. return ALARM_STATUS_REMOVED;
  156. case RRDCALC_STATUS_UNDEFINED:
  157. return ALARM_STATUS_NOT_A_NUMBER;
  158. case RRDCALC_STATUS_CLEAR:
  159. return ALARM_STATUS_CLEAR;
  160. case RRDCALC_STATUS_WARNING:
  161. return ALARM_STATUS_WARNING;
  162. case RRDCALC_STATUS_CRITICAL:
  163. return ALARM_STATUS_CRITICAL;
  164. default:
  165. return ALARM_STATUS_UNKNOWN;
  166. }
  167. #else
  168. UNUSED(status);
  169. return 1;
  170. #endif
  171. }
  172. static inline char *sqlite3_uuid_unparse_strdupz(sqlite3_stmt *res, int iCol) {
  173. char uuid_str[UUID_STR_LEN];
  174. if(sqlite3_column_type(res, iCol) == SQLITE_NULL)
  175. uuid_str[0] = '\0';
  176. else
  177. uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, iCol)), uuid_str);
  178. return strdupz(uuid_str);
  179. }
  180. static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) {
  181. char *ret;
  182. if(sqlite3_column_type(res, iCol) == SQLITE_NULL)
  183. ret = "";
  184. else
  185. ret = (char *)sqlite3_column_text(res, iCol);
  186. return strdupz(ret);
  187. }
  #ifdef ENABLE_ACLK
  // Recreate the aclk_alert_<uuid> table and its index for the host of `wc`.
  // Called when the push query cannot be prepared (presumably because the table
  // does not exist); failures are only logged.
  static void aclk_push_alert_event_recreate_table(struct aclk_sync_host_config *wc)
  {
      BUFFER *sql_fix = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);

      buffer_sprintf(sql_fix, TABLE_ACLK_ALERT, wc->uuid_str);
      if (unlikely(db_execute(db_meta, buffer_tostring(sql_fix))))
          error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host));
      else {
          buffer_flush(sql_fix);
          buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str);
          if (unlikely(db_execute(db_meta, buffer_tostring(sql_fix))))
              error_report("Failed to create ACLK alert index for host %s", rrdhost_hostname(wc->host));
      }

      buffer_free(sql_fix);
  }

  // Fill `alarm_log` (zero-initialised by the caller) from the current row of the
  // push query built in aclk_push_alert_event(); column numbers follow that SELECT.
  // String fields are allocated here (node_id and claim_id are borrowed pointers);
  // the caller releases them with destroy_alarm_log_entry().
  static void aclk_push_alert_event_fill_entry(
      struct aclk_sync_host_config *wc,
      sqlite3_stmt *res,
      char *claim_id,
      struct alarm_log_entry *alarm_log)
  {
      char old_value_string[100 + 1];
      char new_value_string[100 + 1];

      alarm_log->node_id = wc->node_id;
      alarm_log->claim_id = claim_id;
      // NULL-safe copies: a NULL chart/name used to reach strdupz(NULL).
      alarm_log->chart = sqlite3_text_strdupz_empty(res, 12);
      alarm_log->name = sqlite3_text_strdupz_empty(res, 11);
      alarm_log->family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
      alarm_log->when = (time_t) sqlite3_column_int64(res, 5);
      alarm_log->config_hash = sqlite3_uuid_unparse_strdupz(res, 3);
      alarm_log->utc_offset = wc->host->utc_offset;
      alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(wc->host));
      alarm_log->exec_path = sqlite3_column_bytes(res, 14) > 0 ?
                                 strdupz((char *)sqlite3_column_text(res, 14)) :
                                 strdupz((char *)string2str(wc->host->health.health_default_exec));
      alarm_log->conf_source = sqlite3_column_bytes(res, 16) > 0 ? strdupz((char *)sqlite3_column_text(res, 16)) : strdupz("");

      char *edit_command = sqlite3_column_bytes(res, 16) > 0 ?
                               health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) :
                               strdupz("UNKNOWN=0=UNKNOWN");
      alarm_log->command = strdupz(edit_command);
      freez(edit_command);

      alarm_log->duration = (time_t) sqlite3_column_int64(res, 6);
      alarm_log->non_clear_duration = (time_t) sqlite3_column_int64(res, 7);
      alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 20));
      alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 21));
      alarm_log->delay = (int) sqlite3_column_int(res, 22);
      alarm_log->delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10);
      alarm_log->last_repeat = (time_t) sqlite3_column_int64(res, 25);

      // Silenced either by the entry flags or by a "silent" recipient.
      alarm_log->silenced = ((sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) ||
                             (sqlite3_column_type(res, 15) != SQLITE_NULL &&
                              !strncmp((char *)sqlite3_column_text(res, 15), "silent", 6))) ?
                                1 :
                                0;

      alarm_log->value_string =
          sqlite3_column_type(res, 23) == SQLITE_NULL ?
              strdupz((char *)"-") :
              strdupz((char *)format_value_and_unit(
                  new_value_string, 100, sqlite3_column_double(res, 23), (char *)sqlite3_column_text(res, 17), -1));
      alarm_log->old_value_string =
          sqlite3_column_type(res, 24) == SQLITE_NULL ?
              strdupz((char *)"-") :
              strdupz((char *)format_value_and_unit(
                  old_value_string, 100, sqlite3_column_double(res, 24), (char *)sqlite3_column_text(res, 17), -1));
      alarm_log->value = (NETDATA_DOUBLE) sqlite3_column_double(res, 23);
      alarm_log->old_value = (NETDATA_DOUBLE) sqlite3_column_double(res, 24);
      alarm_log->updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
      alarm_log->rendered_info = sqlite3_text_strdupz_empty(res, 18);
      alarm_log->chart_context = sqlite3_text_strdupz_empty(res, 26);
      alarm_log->transition_id = sqlite3_uuid_unparse_strdupz(res, 27);
      alarm_log->event_id = (time_t) sqlite3_column_int64(res, 28);
      alarm_log->chart_name = sqlite3_text_strdupz_empty(res, 29);
  }
  #endif

  /*
   * Send up to ACLK_MAX_ALERT_UPDATES not-yet-submitted alert events of the host of
   * `wc` to the cloud, in sequence_id order, and mark them as submitted.
   *
   * - Does nothing when alert updates are disabled for the node, the agent has no
   *   claim id, or `wc` has no host.
   * - If the query cannot be prepared, the alert table/index are recreated and the
   *   query retried once.
   * - When at least one event was sent, the host is flagged again so another batch
   *   is checked; otherwise the accumulated sent range is logged and reset.
   * No-op when built without ACLK support.
   */
  void aclk_push_alert_event(struct aclk_sync_host_config *wc)
  {
  #ifndef ENABLE_ACLK
      UNUSED(wc);
  #else
      if (unlikely(!wc->alert_updates)) {
          netdata_log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
          return;
      }

      char *claim_id = get_agent_claimid();
      if (unlikely(!claim_id))
          return;

      if (unlikely(!wc->host)) {
          freez(claim_id);
          return;
      }

      BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
      sqlite3_stmt *res = NULL;
      int limit = ACLK_MAX_ALERT_UPDATES;
      uint64_t first_sequence_id = 0;
      uint64_t last_sequence_id = 0;
      int rc;

      buffer_sprintf(sql, "select aa.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, "
          " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, "
          " hl.chart, hl.family, hl.exec, hl.recipient, ha.source, hl.units, hld.info, hld.exec_code, hld.new_status, "
          " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, hld.alarm_event_id, hl.chart_name "
          " from health_log hl, aclk_alert_%s aa, alert_hash ha, health_log_detail hld "
          " where hld.unique_id = aa.alert_unique_id and hl.config_hash_id = ha.hash_id and aa.date_submitted is null "
          " and hl.host_id = @host_id and hl.health_log_id = hld.health_log_id "
          " order by aa.sequence_id asc limit %d;", wc->uuid_str, limit);

      rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
      if (rc != SQLITE_OK) {
          aclk_push_alert_event_recreate_table(wc);
          // Try again
          rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
          if (rc != SQLITE_OK) {
              error_report("Failed to prepare statement when trying to send an alert update via ACLK");
              goto done;
          }
      }

      rc = sqlite3_bind_blob(res, 1, &wc->host->host_uuid, sizeof(wc->host->host_uuid), SQLITE_STATIC);
      if (unlikely(rc != SQLITE_OK)) {
          error_report("Failed to bind host_id for pushing alert event.");
          goto done;
      }

      while (sqlite3_step_monitored(res) == SQLITE_ROW) {
          // Zero-initialised so fields not filled from the row are never garbage.
          struct alarm_log_entry alarm_log = { 0 };
          aclk_push_alert_event_fill_entry(wc, res, claim_id, &alarm_log);
          aclk_send_alarm_log_entry(&alarm_log);

          uint64_t sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
          if (first_sequence_id == 0)
              first_sequence_id = sequence_id;
          if (wc->alerts_log_first_sequence_id == 0)
              wc->alerts_log_first_sequence_id = sequence_id;
          last_sequence_id = sequence_id;
          wc->alerts_log_last_sequence_id = sequence_id;

          destroy_alarm_log_entry(&alarm_log);
      }

      if (first_sequence_id) {
          buffer_flush(sql);
          buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
                              "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
                         wc->uuid_str, first_sequence_id, last_sequence_id);
          if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
              error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host));

          // Mark to do one more check
          rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
      } else {
          if (wc->alerts_log_first_sequence_id)
              netdata_log_access(
                  "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "",
                  wc->node_id,
                  rrdhost_hostname(wc->host),
                  wc->alerts_log_first_sequence_id,
                  wc->alerts_log_last_sequence_id);
          wc->alerts_log_first_sequence_id = 0;
          wc->alerts_log_last_sequence_id = 0;
      }

  done:
      // sqlite3_finalize(NULL) is a harmless no-op returning SQLITE_OK.
      rc = sqlite3_finalize(res);
      if (unlikely(rc != SQLITE_OK))
          error_report("Failed to finalize statement to send alert entries from the database, rc = %d", rc);
      freez(claim_id);
      buffer_free(sql);
  #endif
  }
  338. void aclk_push_alert_events_for_all_hosts(void)
  339. {
  340. RRDHOST *host;
  341. dfe_start_reentrant(rrdhost_root_index, host) {
  342. if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS))
  343. continue;
  344. internal_error(true, "ACLK SYNC: Scanning host %s", rrdhost_hostname(host));
  345. rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  346. struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
  347. if (likely(wc))
  348. aclk_push_alert_event(wc);
  349. }
  350. dfe_done(host);
  351. }
  352. void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
  353. {
  354. char uuid_str[UUID_STR_LEN];
  355. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  356. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  357. sqlite3_stmt *res = NULL;
  358. int rc;
  359. rw_spinlock_write_lock(&host->health_log.spinlock);
  360. buffer_sprintf(sql, "delete from aclk_alert_%s; ", uuid_str);
  361. if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) {
  362. rw_spinlock_write_unlock(&host->health_log.spinlock);
  363. buffer_free(sql);
  364. return;
  365. }
  366. buffer_flush(sql);
  367. buffer_sprintf(sql, "insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
  368. "select hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id from health_log_detail hld, health_log hl " \
  369. "where hld.new_status <> 0 and hld.new_status <> -2 and hl.health_log_id = hld.health_log_id and hl.config_hash_id is not null " \
  370. "and hld.updated_by_id = 0 and hl.host_id = @host_id order by hld.unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str);
  371. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  372. if (rc != SQLITE_OK) {
  373. error_report("Failed to prepare statement when trying to queue existing alerts.");
  374. rw_spinlock_write_unlock(&host->health_log.spinlock);
  375. buffer_free(sql);
  376. return;
  377. }
  378. rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
  379. if (unlikely(rc != SQLITE_OK)) {
  380. error_report("Failed to bind host_id for when trying to queue existing alerts.");
  381. sqlite3_finalize(res);
  382. rw_spinlock_write_unlock(&host->health_log.spinlock);
  383. buffer_free(sql);
  384. return;
  385. }
  386. rc = execute_insert(res);
  387. if (unlikely(rc != SQLITE_DONE)) {
  388. error_report("Failed to queue existing alerts, rc = %d", rc);
  389. }
  390. rc = sqlite3_finalize(res);
  391. if (unlikely(rc != SQLITE_OK))
  392. error_report("Failed to finalize statement to queue existing alerts, rc = %d", rc);
  393. rw_spinlock_write_unlock(&host->health_log.spinlock);
  394. buffer_free(sql);
  395. rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  396. }
  397. void aclk_send_alarm_configuration(char *config_hash)
  398. {
  399. if (unlikely(!config_hash))
  400. return;
  401. struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) localhost->aclk_sync_host_config;
  402. if (unlikely(!wc))
  403. return;
  404. netdata_log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  405. aclk_push_alert_config(wc->node_id, config_hash);
  406. }
  407. #define SQL_SELECT_ALERT_CONFIG "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
  408. "module, charts, families, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \
  409. "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
  410. "p_db_lookup_before, p_update_every, chart_labels FROM alert_hash WHERE hash_id = @hash_id;"
  411. int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused)
  412. {
  413. int rc = 0;
  414. #ifdef ENABLE_ACLK
  415. CHECK_SQLITE_CONNECTION(db_meta);
  416. sqlite3_stmt *res = NULL;
  417. struct aclk_sync_host_config *wc = NULL;
  418. RRDHOST *host = find_host_by_node_id(node_id);
  419. if (unlikely(!host)) {
  420. freez(config_hash);
  421. freez(node_id);
  422. return 1;
  423. }
  424. wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
  425. if (unlikely(!wc)) {
  426. freez(config_hash);
  427. freez(node_id);
  428. return 1;
  429. }
  430. rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0);
  431. if (rc != SQLITE_OK) {
  432. error_report("Failed to prepare statement when trying to fetch an alarm hash configuration");
  433. return 1;
  434. }
  435. uuid_t hash_uuid;
  436. if (uuid_parse(config_hash, hash_uuid))
  437. return 1;
  438. rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC);
  439. if (unlikely(rc != SQLITE_OK))
  440. goto bind_fail;
  441. struct aclk_alarm_configuration alarm_config;
  442. struct provide_alarm_configuration p_alarm_config;
  443. p_alarm_config.cfg_hash = NULL;
  444. if (sqlite3_step_monitored(res) == SQLITE_ROW) {
  445. alarm_config.alarm = sqlite3_column_bytes(res, 0) > 0 ? strdupz((char *)sqlite3_column_text(res, 0)) : NULL;
  446. alarm_config.tmpl = sqlite3_column_bytes(res, 1) > 0 ? strdupz((char *)sqlite3_column_text(res, 1)) : NULL;
  447. alarm_config.on_chart = sqlite3_column_bytes(res, 2) > 0 ? strdupz((char *)sqlite3_column_text(res, 2)) : NULL;
  448. alarm_config.classification = sqlite3_column_bytes(res, 3) > 0 ? strdupz((char *)sqlite3_column_text(res, 3)) : NULL;
  449. alarm_config.type = sqlite3_column_bytes(res, 4) > 0 ? strdupz((char *)sqlite3_column_text(res, 4)) : NULL;
  450. alarm_config.component = sqlite3_column_bytes(res, 5) > 0 ? strdupz((char *)sqlite3_column_text(res, 5)) : NULL;
  451. alarm_config.os = sqlite3_column_bytes(res, 6) > 0 ? strdupz((char *)sqlite3_column_text(res, 6)) : NULL;
  452. alarm_config.hosts = sqlite3_column_bytes(res, 7) > 0 ? strdupz((char *)sqlite3_column_text(res, 7)) : NULL;
  453. alarm_config.plugin = sqlite3_column_bytes(res, 8) > 0 ? strdupz((char *)sqlite3_column_text(res, 8)) : NULL;
  454. alarm_config.module = sqlite3_column_bytes(res, 9) > 0 ? strdupz((char *)sqlite3_column_text(res, 9)) : NULL;
  455. alarm_config.charts = sqlite3_column_bytes(res, 10) > 0 ? strdupz((char *)sqlite3_column_text(res, 10)) : NULL;
  456. alarm_config.families = sqlite3_column_bytes(res, 11) > 0 ? strdupz((char *)sqlite3_column_text(res, 11)) : NULL;
  457. alarm_config.lookup = sqlite3_column_bytes(res, 12) > 0 ? strdupz((char *)sqlite3_column_text(res, 12)) : NULL;
  458. alarm_config.every = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
  459. alarm_config.units = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : NULL;
  460. alarm_config.green = sqlite3_column_bytes(res, 15) > 0 ? strdupz((char *)sqlite3_column_text(res, 15)) : NULL;
  461. alarm_config.red = sqlite3_column_bytes(res, 16) > 0 ? strdupz((char *)sqlite3_column_text(res, 16)) : NULL;
  462. alarm_config.calculation_expr = sqlite3_column_bytes(res, 17) > 0 ? strdupz((char *)sqlite3_column_text(res, 17)) : NULL;
  463. alarm_config.warning_expr = sqlite3_column_bytes(res, 18) > 0 ? strdupz((char *)sqlite3_column_text(res, 18)) : NULL;
  464. alarm_config.critical_expr = sqlite3_column_bytes(res, 19) > 0 ? strdupz((char *)sqlite3_column_text(res, 19)) : NULL;
  465. alarm_config.recipient = sqlite3_column_bytes(res, 20) > 0 ? strdupz((char *)sqlite3_column_text(res, 20)) : NULL;
  466. alarm_config.exec = sqlite3_column_bytes(res, 21) > 0 ? strdupz((char *)sqlite3_column_text(res, 21)) : NULL;
  467. alarm_config.delay = sqlite3_column_bytes(res, 22) > 0 ? strdupz((char *)sqlite3_column_text(res, 22)) : NULL;
  468. alarm_config.repeat = sqlite3_column_bytes(res, 23) > 0 ? strdupz((char *)sqlite3_column_text(res, 23)) : NULL;
  469. alarm_config.info = sqlite3_column_bytes(res, 24) > 0 ? strdupz((char *)sqlite3_column_text(res, 24)) : NULL;
  470. alarm_config.options = sqlite3_column_bytes(res, 25) > 0 ? strdupz((char *)sqlite3_column_text(res, 25)) : NULL;
  471. alarm_config.host_labels = sqlite3_column_bytes(res, 26) > 0 ? strdupz((char *)sqlite3_column_text(res, 26)) : NULL;
  472. alarm_config.p_db_lookup_dimensions = NULL;
  473. alarm_config.p_db_lookup_method = NULL;
  474. alarm_config.p_db_lookup_options = NULL;
  475. alarm_config.p_db_lookup_after = 0;
  476. alarm_config.p_db_lookup_before = 0;
  477. if (sqlite3_column_bytes(res, 30) > 0) {
  478. alarm_config.p_db_lookup_dimensions = sqlite3_column_bytes(res, 27) > 0 ? strdupz((char *)sqlite3_column_text(res, 27)) : NULL;
  479. alarm_config.p_db_lookup_method = sqlite3_column_bytes(res, 28) > 0 ? strdupz((char *)sqlite3_column_text(res, 28)) : NULL;
  480. BUFFER *tmp_buf = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  481. buffer_data_options2string(tmp_buf, sqlite3_column_int(res, 29));
  482. alarm_config.p_db_lookup_options = strdupz((char *)buffer_tostring(tmp_buf));
  483. buffer_free(tmp_buf);
  484. alarm_config.p_db_lookup_after = sqlite3_column_int(res, 30);
  485. alarm_config.p_db_lookup_before = sqlite3_column_int(res, 31);
  486. }
  487. alarm_config.p_update_every = sqlite3_column_int(res, 32);
  488. alarm_config.chart_labels = sqlite3_column_bytes(res, 33) > 0 ? strdupz((char *)sqlite3_column_text(res, 33)) : NULL;
  489. p_alarm_config.cfg_hash = strdupz((char *) config_hash);
  490. p_alarm_config.cfg = alarm_config;
  491. }
  492. if (likely(p_alarm_config.cfg_hash)) {
  493. netdata_log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  494. aclk_send_provide_alarm_cfg(&p_alarm_config);
  495. freez(p_alarm_config.cfg_hash);
  496. destroy_aclk_alarm_configuration(&alarm_config);
  497. }
  498. else
  499. netdata_log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  500. bind_fail:
  501. rc = sqlite3_finalize(res);
  502. if (unlikely(rc != SQLITE_OK))
  503. error_report("Failed to reset statement when pushing alarm config hash, rc = %d", rc);
  504. freez(config_hash);
  505. freez(node_id);
  506. #endif
  507. return rc;
  508. }
  509. // Start streaming alerts
  510. void aclk_start_alert_streaming(char *node_id, bool resets)
  511. {
  512. if (unlikely(!node_id))
  513. return;
  514. uuid_t node_uuid;
  515. if (uuid_parse(node_id, node_uuid))
  516. return;
  517. RRDHOST *host = find_host_by_node_id(node_id);
  518. if (unlikely(!host))
  519. return;
  520. struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
  521. if (unlikely(!wc))
  522. return;
  523. if (unlikely(!host->health.health_enabled)) {
  524. netdata_log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
  525. return;
  526. }
  527. if (resets) {
  528. netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
  529. sql_queue_existing_alerts_to_aclk(host);
  530. } else
  531. netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
  532. wc->alert_updates = 1;
  533. wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS;
  534. }
  535. #define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
  536. "SELECT hld.unique_id alert_unique_id, UNIXEPOCH(), hld.unique_id alert_unique_id FROM health_log hl, health_log_detail hld " \
  537. "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \
  538. "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \
  539. "AND hl.config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \
  540. "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING;"
  541. void sql_process_queue_removed_alerts_to_aclk(char *node_id)
  542. {
  543. struct aclk_sync_host_config *wc;
  544. RRDHOST *host = find_host_by_node_id(node_id);
  545. freez(node_id);
  546. if (unlikely(!host || !(wc = host->aclk_sync_host_config)))
  547. return;
  548. char sql[ACLK_SYNC_QUERY_SIZE * 2];
  549. sqlite3_stmt *res = NULL;
  550. snprintfz(sql, ACLK_SYNC_QUERY_SIZE * 2 - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str);
  551. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  552. if (rc != SQLITE_OK) {
  553. error_report("Failed to prepare statement when trying to queue removed alerts.");
  554. return;
  555. }
  556. rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
  557. if (unlikely(rc != SQLITE_OK)) {
  558. error_report("Failed to bind host_id for when trying to queue remvoed alerts.");
  559. sqlite3_finalize(res);
  560. return;
  561. }
  562. rc = execute_insert(res);
  563. if (unlikely(rc != SQLITE_DONE)) {
  564. sqlite3_finalize(res);
  565. error_report("Failed to queue removed alerts, rc = %d", rc);
  566. return;
  567. }
  568. rc = sqlite3_finalize(res);
  569. if (unlikely(rc != SQLITE_OK))
  570. error_report("Failed to finalize statement to queue removed alerts, rc = %d", rc);
  571. netdata_log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host));
  572. rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  573. wc->alert_queue_removed = 0;
  574. }
  575. void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
  576. {
  577. if (unlikely(!host->aclk_sync_host_config))
  578. return;
  579. if (!claimed() || !host->node_id)
  580. return;
  581. char node_id[UUID_STR_LEN];
  582. uuid_unparse_lower(*host->node_id, node_id);
  583. aclk_push_node_removed_alerts(node_id);
  584. }
  585. void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid)
  586. {
  587. uuid_t node_uuid;
  588. if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
  589. return;
  590. RRDHOST *host = find_host_by_node_id(node_id);
  591. if (unlikely(!host)) {
  592. netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
  593. return;
  594. }
  595. struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
  596. if (unlikely(!wc)) {
  597. netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
  598. return;
  599. }
  600. netdata_log_access(
  601. "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s",
  602. node_id,
  603. wc->host ? rrdhost_hostname(wc->host) : "N/A",
  604. snapshot_uuid);
  605. if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid))
  606. return;
  607. __sync_synchronize();
  608. wc->alerts_snapshot_uuid = strdupz(snapshot_uuid);
  609. __sync_synchronize();
  610. aclk_push_node_alert_snapshot(node_id);
  611. }
  612. #ifdef ENABLE_ACLK
  613. void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
  614. {
  615. char *edit_command = ae->source ? health_edit_command_from_source(ae_source(ae)) : strdupz("UNKNOWN=0=UNKNOWN");
  616. char config_hash_id[UUID_STR_LEN];
  617. uuid_unparse_lower(ae->config_hash_id, config_hash_id);
  618. char transition_id[UUID_STR_LEN];
  619. uuid_unparse_lower(ae->transition_id, transition_id);
  620. alarm_log->chart = strdupz(ae_chart_id(ae));
  621. alarm_log->name = strdupz(ae_name(ae));
  622. alarm_log->family = strdupz(ae_family(ae));
  623. alarm_log->batch_id = 0;
  624. alarm_log->sequence_id = 0;
  625. alarm_log->when = (time_t)ae->when;
  626. alarm_log->config_hash = strdupz((char *)config_hash_id);
  627. alarm_log->utc_offset = host->utc_offset;
  628. alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host));
  629. alarm_log->exec_path = ae->exec ? strdupz(ae_exec(ae)) : strdupz((char *)string2str(host->health.health_default_exec));
  630. alarm_log->conf_source = ae->source ? strdupz(ae_source(ae)) : strdupz((char *)"");
  631. alarm_log->command = strdupz((char *)edit_command);
  632. alarm_log->duration = (time_t)ae->duration;
  633. alarm_log->non_clear_duration = (time_t)ae->non_clear_duration;
  634. alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status);
  635. alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status);
  636. alarm_log->delay = (int)ae->delay;
  637. alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp;
  638. alarm_log->last_repeat = (time_t)ae->last_repeat;
  639. alarm_log->silenced =
  640. ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp(ae_recipient(ae), "silent", 6))) ?
  641. 1 :
  642. 0;
  643. alarm_log->value_string = strdupz(ae_new_value_string(ae));
  644. alarm_log->old_value_string = strdupz(ae_old_value_string(ae));
  645. alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0;
  646. alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0;
  647. alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
  648. alarm_log->rendered_info = strdupz(ae_info(ae));
  649. alarm_log->chart_context = strdupz(ae_chart_context(ae));
  650. alarm_log->chart_name = strdupz(ae_chart_name(ae));
  651. alarm_log->transition_id = strdupz((char *)transition_id);
  652. alarm_log->event_id = (uint64_t) ae->alarm_event_id;
  653. freez(edit_command);
  654. }
  655. #endif
  656. #ifdef ENABLE_ACLK
  657. static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark)
  658. {
  659. ALARM_ENTRY *ae = host->health_log.alarms;
  660. while (ae) {
  661. if (ae->alarm_id == alarm_id && ae->unique_id >mark &&
  662. (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL))
  663. return 1;
  664. ae = ae->next;
  665. }
  666. return 0;
  667. }
  668. #endif
  669. #define ALARM_EVENTS_PER_CHUNK 1000
  670. void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
  671. {
  672. #ifdef ENABLE_ACLK
  673. RRDHOST *host = find_host_by_node_id(node_id);
  674. if (unlikely(!host)) {
  675. netdata_log_access("AC [%s (N/A)]: Node id not found", node_id);
  676. freez(node_id);
  677. return;
  678. }
  679. freez(node_id);
  680. struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
  681. // we perhaps we don't need this for snapshots
  682. if (unlikely(!wc->alert_updates)) {
  683. netdata_log_access(
  684. "ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.",
  685. wc->node_id,
  686. wc->host ? rrdhost_hostname(wc->host) : "N/A");
  687. return;
  688. }
  689. if (unlikely(!wc->alerts_snapshot_uuid))
  690. return;
  691. char *claim_id = get_agent_claimid();
  692. if (unlikely(!claim_id))
  693. return;
  694. netdata_log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid);
  695. uint32_t cnt = 0;
  696. char uuid_str[UUID_STR_LEN];
  697. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  698. rw_spinlock_read_lock(&host->health_log.spinlock);
  699. ALARM_ENTRY *ae = host->health_log.alarms;
  700. for (; ae; ae = ae->next) {
  701. if (likely(ae->updated_by_id))
  702. continue;
  703. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  704. continue;
  705. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  706. continue;
  707. if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
  708. continue;
  709. cnt++;
  710. }
  711. if (cnt) {
  712. uint32_t chunk = 1, chunks = 0;
  713. chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
  714. ae = host->health_log.alarms;
  715. cnt = 0;
  716. struct alarm_snapshot alarm_snap;
  717. alarm_snap.node_id = wc->node_id;
  718. alarm_snap.claim_id = claim_id;
  719. alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
  720. alarm_snap.chunks = chunks;
  721. alarm_snap.chunk = chunk;
  722. alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
  723. for (; ae; ae = ae->next) {
  724. if (likely(ae->updated_by_id))
  725. continue;
  726. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  727. continue;
  728. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  729. continue;
  730. if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
  731. continue;
  732. cnt++;
  733. struct alarm_log_entry alarm_log;
  734. alarm_log.node_id = wc->node_id;
  735. alarm_log.claim_id = claim_id;
  736. if (!snapshot_proto)
  737. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  738. health_alarm_entry2proto_nolock(&alarm_log, ae, host);
  739. add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
  740. if (cnt == ALARM_EVENTS_PER_CHUNK) {
  741. aclk_send_alarm_snapshot(snapshot_proto);
  742. cnt = 0;
  743. if (chunk < chunks) {
  744. chunk++;
  745. struct alarm_snapshot alarm_snap;
  746. alarm_snap.node_id = wc->node_id;
  747. alarm_snap.claim_id = claim_id;
  748. alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
  749. alarm_snap.chunks = chunks;
  750. alarm_snap.chunk = chunk;
  751. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  752. }
  753. }
  754. destroy_alarm_log_entry(&alarm_log);
  755. }
  756. if (cnt)
  757. aclk_send_alarm_snapshot(snapshot_proto);
  758. }
  759. rw_spinlock_read_unlock(&host->health_log.spinlock);
  760. wc->alerts_snapshot_uuid = NULL;
  761. freez(claim_id);
  762. #endif
  763. }
  764. #define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE date_created + %d < UNIXEPOCH();"
  765. void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
  766. {
  767. char uuid_str[UUID_STR_LEN];
  768. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  769. char sql[ACLK_SYNC_QUERY_SIZE];
  770. snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_DELETE_ALERT_ENTRIES, uuid_str, MAX_REMOVED_PERIOD);
  771. char *err_msg = NULL;
  772. int rc = sqlite3_exec_monitored(db_meta, sql, NULL, NULL, &err_msg);
  773. if (rc != SQLITE_OK) {
  774. error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s\"", uuid_str, err_msg);
  775. sqlite3_free(err_msg);
  776. }
  777. }
  778. #define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \
  779. "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \
  780. "FROM aclk_alert_%s WHERE date_submitted IS NULL;"
  781. int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
  782. {
  783. int rc;
  784. struct aclk_sync_host_config *wc = NULL;
  785. wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
  786. if (!wc)
  787. return 1;
  788. proto_alert_status->alert_updates = wc->alert_updates;
  789. char sql[ACLK_SYNC_QUERY_SIZE];
  790. sqlite3_stmt *res = NULL;
  791. snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str);
  792. rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  793. if (rc != SQLITE_OK) {
  794. error_report("Failed to prepare statement to get alert log status from the database.");
  795. return 1;
  796. }
  797. while (sqlite3_step_monitored(res) == SQLITE_ROW) {
  798. proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
  799. proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
  800. proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
  801. }
  802. rc = sqlite3_finalize(res);
  803. if (unlikely(rc != SQLITE_OK))
  804. error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc);
  805. return 0;
  806. }
  807. void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused)
  808. {
  809. if (unlikely(!node_id))
  810. return;
  811. struct aclk_sync_host_config *wc = NULL;
  812. RRDHOST *host = find_host_by_node_id(node_id);
  813. if (unlikely(!host))
  814. return;
  815. wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
  816. if (unlikely(!wc)) {
  817. netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id);
  818. return;
  819. }
  820. netdata_log_access("ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host));
  821. wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS;
  822. }
  823. typedef struct active_alerts {
  824. char *name;
  825. char *chart;
  826. RRDCALC_STATUS status;
  827. } active_alerts_t;
  828. static inline int compare_active_alerts(const void * a, const void * b) {
  829. active_alerts_t *active_alerts_a = (active_alerts_t *)a;
  830. active_alerts_t *active_alerts_b = (active_alerts_t *)b;
  831. if( !(strcmp(active_alerts_a->name, active_alerts_b->name)) )
  832. {
  833. return strcmp(active_alerts_a->chart, active_alerts_b->chart);
  834. }
  835. else
  836. return strcmp(active_alerts_a->name, active_alerts_b->name);
  837. }
  838. #define BATCH_ALLOCATED 10
  839. void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused)
  840. {
  841. #ifdef ENABLE_ACLK
  842. struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
  843. if (unlikely(!wc)) {
  844. netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host));
  845. return;
  846. }
  847. if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) {
  848. //postpone checkpoint send
  849. wc->alert_checkpoint_req+=3;
  850. netdata_log_access("ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host));
  851. return;
  852. }
  853. RRDCALC *rc;
  854. uint32_t cnt = 0;
  855. size_t len = 0;
  856. active_alerts_t *active_alerts = callocz(BATCH_ALLOCATED, sizeof(active_alerts_t));
  857. foreach_rrdcalc_in_rrdhost_read(host, rc) {
  858. if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
  859. continue;
  860. if (rc->status == RRDCALC_STATUS_WARNING ||
  861. rc->status == RRDCALC_STATUS_CRITICAL) {
  862. if (cnt && !(cnt % BATCH_ALLOCATED)) {
  863. active_alerts = reallocz(active_alerts, (BATCH_ALLOCATED * ((cnt / BATCH_ALLOCATED) + 1)) * sizeof(active_alerts_t));
  864. }
  865. active_alerts[cnt].name = (char *)rrdcalc_name(rc);
  866. len += string_strlen(rc->name);
  867. active_alerts[cnt].chart = (char *)rrdcalc_chart_name(rc);
  868. len += string_strlen(rc->chart);
  869. active_alerts[cnt].status = rc->status;
  870. len++;
  871. cnt++;
  872. }
  873. }
  874. foreach_rrdcalc_in_rrdhost_done(rc);
  875. BUFFER *alarms_to_hash;
  876. if (cnt) {
  877. qsort (active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts);
  878. alarms_to_hash = buffer_create(len, NULL);
  879. for (uint32_t i=0;i<cnt;i++) {
  880. buffer_strcat(alarms_to_hash, active_alerts[i].name);
  881. buffer_strcat(alarms_to_hash, active_alerts[i].chart);
  882. if (active_alerts[i].status == RRDCALC_STATUS_WARNING)
  883. buffer_strcat(alarms_to_hash, "W");
  884. else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL)
  885. buffer_strcat(alarms_to_hash, "C");
  886. }
  887. } else {
  888. alarms_to_hash = buffer_create(1, NULL);
  889. buffer_strcat(alarms_to_hash, "");
  890. len = 0;
  891. }
  892. freez(active_alerts);
  893. char hash[SHA256_DIGEST_LENGTH + 1];
  894. if (hash256_string((const unsigned char *)buffer_tostring(alarms_to_hash), len, hash)) {
  895. hash[SHA256_DIGEST_LENGTH] = 0;
  896. struct alarm_checkpoint alarm_checkpoint;
  897. char *claim_id = get_agent_claimid();
  898. alarm_checkpoint.claim_id = claim_id;
  899. alarm_checkpoint.node_id = wc->node_id;
  900. alarm_checkpoint.checksum = (char *)hash;
  901. aclk_send_provide_alarm_checkpoint(&alarm_checkpoint);
  902. freez(claim_id);
  903. netdata_log_access("ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host));
  904. } else {
  905. netdata_log_access("ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host));
  906. }
  907. wc->alert_checkpoint_req = 0;
  908. buffer_free(alarms_to_hash);
  909. #endif
  910. }