sqlite_aclk_alert.c 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_functions.h"
  3. #include "sqlite_aclk_alert.h"
  4. #ifdef ENABLE_ACLK
  5. #include "../../aclk/aclk_alarm_api.h"
  6. #endif
  7. #define SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param) \
  8. ({ \
  9. int _param = (param); \
  10. sqlite3_column_bytes((res), (_param)) ? strdupz((char *)sqlite3_column_text((res), (_param))) : NULL; \
  11. })
  12. #define SQL_UPDATE_FILTERED_ALERT \
  13. "UPDATE aclk_alert_%s SET filtered_alert_unique_id = @new_alert, date_created = UNIXEPOCH() " \
  14. "WHERE filtered_alert_unique_id = @old_alert"
  15. static void update_filtered(ALARM_ENTRY *ae, int64_t unique_id, char *uuid_str)
  16. {
  17. sqlite3_stmt *res = NULL;
  18. char sql[ACLK_SYNC_QUERY_SIZE];
  19. snprintfz(sql, sizeof(sql) - 1, SQL_UPDATE_FILTERED_ALERT, uuid_str);
  20. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  21. if (rc != SQLITE_OK) {
  22. error_report("Failed to prepare statement when trying to update_filtered");
  23. return;
  24. }
  25. rc = sqlite3_bind_int64(res, 1, ae->unique_id);
  26. if (unlikely(rc != SQLITE_OK)) {
  27. error_report("Failed to bind ae unique_id for update_filtered");
  28. goto done;
  29. }
  30. rc = sqlite3_bind_int64(res, 2, unique_id);
  31. if (unlikely(rc != SQLITE_OK)) {
  32. error_report("Failed to bind unique_id for update_filtered");
  33. goto done;
  34. }
  35. rc = sqlite3_step_monitored(res);
  36. if (likely(rc == SQLITE_DONE))
  37. ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
  38. done:
  39. rc = sqlite3_finalize(res);
  40. if (unlikely(rc != SQLITE_OK))
  41. error_report("Failed to finalize statement when trying to update_filtered, rc = %d", rc);
  42. }
  43. #define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \
  44. "SELECT hld.unique_id FROM health_log hl, alert_hash ah, health_log_detail hld " \
  45. "WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \
  46. "AND hl.host_id = @host_id AND ah.warn IS NULL AND ah.crit IS NULL"
  47. static inline bool is_event_from_alert_variable_config(int64_t unique_id, uuid_t *host_id)
  48. {
  49. sqlite3_stmt *res = NULL;
  50. int rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0);
  51. if (rc != SQLITE_OK) {
  52. error_report("Failed to prepare statement when trying to check for alert variables.");
  53. return false;
  54. }
  55. bool ret = false;
  56. rc = sqlite3_bind_int64(res, 1, unique_id);
  57. if (unlikely(rc != SQLITE_OK)) {
  58. error_report("Failed to bind unique_id for checking alert variable.");
  59. goto done;
  60. }
  61. rc = sqlite3_bind_blob(res, 2, host_id, sizeof(*host_id), SQLITE_STATIC);
  62. if (unlikely(rc != SQLITE_OK)) {
  63. error_report("Failed to bind host_id for checking alert variable.");
  64. goto done;
  65. }
  66. rc = sqlite3_step_monitored(res);
  67. if (likely(rc == SQLITE_ROW))
  68. ret = true;
  69. done:
  70. rc = sqlite3_finalize(res);
  71. if (unlikely(rc != SQLITE_OK))
  72. error_report("Failed to finalize statement when trying to check for alert variables, rc = %d", rc);
  73. return ret;
  74. }
  75. #define MAX_REMOVED_PERIOD 604800 //a week
  76. //decide if some events should be sent or not
  77. #define SQL_SELECT_ALERT_BY_ID \
  78. "SELECT hld.new_status, hl.config_hash_id, hld.unique_id FROM health_log hl, aclk_alert_%s aa, health_log_detail hld " \
  79. "WHERE hl.host_id = @host_id AND hld.unique_id = aa.filtered_alert_unique_id " \
  80. "AND hld.alarm_id = @alarm_id AND hl.health_log_id = hld.health_log_id " \
  81. "ORDER BY hld.rowid DESC LIMIT 1"
  82. static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
  83. {
  84. sqlite3_stmt *res = NULL;
  85. if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED)
  86. return 0;
  87. if (unlikely(uuid_is_null(ae->config_hash_id) || !host->aclk_config))
  88. return 0;
  89. char sql[ACLK_SYNC_QUERY_SIZE];
  90. //get the previous sent event of this alarm_id
  91. //base the search on the last filtered event
  92. snprintfz(sql, sizeof(sql) - 1, SQL_SELECT_ALERT_BY_ID, host->aclk_config->uuid_str);
  93. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  94. if (rc != SQLITE_OK) {
  95. error_report("Failed to prepare statement when trying should_send_to_cloud.");
  96. return true;
  97. }
  98. bool send = false;
  99. rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
  100. if (unlikely(rc != SQLITE_OK)) {
  101. error_report("Failed to bind host_id for checking should_send_to_cloud");
  102. goto done;
  103. }
  104. rc = sqlite3_bind_int(res, 2, (int) ae->alarm_id);
  105. if (unlikely(rc != SQLITE_OK)) {
  106. error_report("Failed to bind alarm_id for checking should_send_to_cloud");
  107. goto done;
  108. }
  109. rc = sqlite3_step_monitored(res);
  110. if (likely(rc == SQLITE_ROW)) {
  111. uuid_t config_hash_id;
  112. RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 0);
  113. if (sqlite3_column_type(res, 1) != SQLITE_NULL)
  114. uuid_copy(config_hash_id, *((uuid_t *)sqlite3_column_blob(res, 1)));
  115. int64_t unique_id = sqlite3_column_int64(res, 2);
  116. if (ae->new_status != (RRDCALC_STATUS)status || uuid_memcmp(&ae->config_hash_id, &config_hash_id))
  117. send = true;
  118. else
  119. update_filtered(ae, unique_id, host->aclk_config->uuid_str);
  120. } else
  121. send = true;
  122. done:
  123. rc = sqlite3_finalize(res);
  124. if (unlikely(rc != SQLITE_OK))
  125. error_report("Failed to finalize statement when trying should_send_to_cloud, rc = %d", rc);
  126. return send;
  127. }
  128. #define SQL_QUEUE_ALERT_TO_CLOUD \
  129. "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
  130. "VALUES (@alert_unique_id, UNIXEPOCH(), @alert_unique_id) ON CONFLICT (alert_unique_id) DO NOTHING"
  131. void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter)
  132. {
  133. sqlite3_stmt *res_alert = NULL;
  134. char sql[ACLK_SYNC_QUERY_SIZE];
  135. if (!service_running(SERVICE_ACLK))
  136. return;
  137. if (!claimed() || ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED)
  138. return;
  139. if (false == skip_filter && !should_send_to_cloud(host, ae))
  140. return;
  141. if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
  142. return;
  143. snprintfz(sql, sizeof(sql) - 1, SQL_QUEUE_ALERT_TO_CLOUD, host->aclk_config->uuid_str);
  144. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0);
  145. if (unlikely(rc != SQLITE_OK)) {
  146. error_report("Failed to prepare statement to store alert event");
  147. return;
  148. }
  149. rc = sqlite3_bind_int64(res_alert, 1, ae->unique_id);
  150. if (unlikely(rc != SQLITE_OK))
  151. goto done;
  152. rc = execute_insert(res_alert);
  153. if (unlikely(rc == SQLITE_DONE)) {
  154. ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
  155. rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  156. } else
  157. error_report("Failed to store alert event %"PRId64", rc = %d", ae->unique_id, rc);
  158. done:
  159. if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
  160. error_report("Failed to reset statement in store alert event, rc = %d", rc);
  161. }
  162. int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
  163. {
  164. #ifdef ENABLE_ACLK
  165. switch(status) {
  166. case RRDCALC_STATUS_REMOVED:
  167. return ALARM_STATUS_REMOVED;
  168. case RRDCALC_STATUS_UNDEFINED:
  169. return ALARM_STATUS_NOT_A_NUMBER;
  170. case RRDCALC_STATUS_CLEAR:
  171. return ALARM_STATUS_CLEAR;
  172. case RRDCALC_STATUS_WARNING:
  173. return ALARM_STATUS_WARNING;
  174. case RRDCALC_STATUS_CRITICAL:
  175. return ALARM_STATUS_CRITICAL;
  176. default:
  177. return ALARM_STATUS_UNKNOWN;
  178. }
  179. #else
  180. UNUSED(status);
  181. return 1;
  182. #endif
  183. }
  184. static inline char *sqlite3_uuid_unparse_strdupz(sqlite3_stmt *res, int iCol) {
  185. char uuid_str[UUID_STR_LEN];
  186. if(sqlite3_column_type(res, iCol) == SQLITE_NULL)
  187. uuid_str[0] = '\0';
  188. else
  189. uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, iCol)), uuid_str);
  190. return strdupz(uuid_str);
  191. }
  192. static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) {
  193. char *ret;
  194. if(sqlite3_column_type(res, iCol) == SQLITE_NULL)
  195. ret = "";
  196. else
  197. ret = (char *)sqlite3_column_text(res, iCol);
  198. return strdupz(ret);
  199. }
  200. static void aclk_push_alert_event(struct aclk_sync_cfg_t *wc __maybe_unused)
  201. {
  202. #ifdef ENABLE_ACLK
  203. int rc;
  204. if (unlikely(!wc->alert_updates)) {
  205. nd_log(NDLS_ACCESS, NDLP_NOTICE,
  206. "ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.",
  207. wc->node_id,
  208. wc->host ? rrdhost_hostname(wc->host) : "N/A");
  209. return;
  210. }
  211. char *claim_id = get_agent_claimid();
  212. if (unlikely(!claim_id))
  213. return;
  214. if (unlikely(!wc->host)) {
  215. freez(claim_id);
  216. return;
  217. }
  218. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  219. sqlite3_stmt *res = NULL;
  220. buffer_sprintf(
  221. sql,
  222. "SELECT aa.sequence_id, hld.unique_id, hld.alarm_id, hl.config_hash_id, hld.updated_by_id, hld.when_key, "
  223. " hld.duration, hld.non_clear_duration, hld.flags, hld.exec_run_timestamp, hld.delay_up_to_timestamp, hl.name, "
  224. " hl.chart, hl.exec, hl.recipient, ha.source, hl.units, hld.info, hld.exec_code, hld.new_status, "
  225. " hld.old_status, hld.delay, hld.new_value, hld.old_value, hld.last_repeat, hl.chart_context, hld.transition_id, "
  226. " hld.alarm_event_id, hl.chart_name, hld.summary "
  227. " FROM health_log hl, aclk_alert_%s aa, alert_hash ha, health_log_detail hld "
  228. " WHERE hld.unique_id = aa.alert_unique_id AND hl.config_hash_id = ha.hash_id AND aa.date_submitted IS NULL "
  229. " AND hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id "
  230. " ORDER BY aa.sequence_id ASC LIMIT "ACLK_MAX_ALERT_UPDATES,
  231. wc->uuid_str);
  232. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  233. if (rc != SQLITE_OK) {
  234. BUFFER *sql_fix = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  235. buffer_sprintf(sql_fix, TABLE_ACLK_ALERT, wc->uuid_str);
  236. rc = db_execute(db_meta, buffer_tostring(sql_fix));
  237. if (unlikely(rc))
  238. error_report("Failed to create ACLK alert table for host %s", rrdhost_hostname(wc->host));
  239. buffer_free(sql_fix);
  240. // Try again
  241. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  242. if (rc != SQLITE_OK) {
  243. error_report("Failed to prepare statement when trying to send an alert update via ACLK");
  244. buffer_free(sql);
  245. freez(claim_id);
  246. return;
  247. }
  248. }
  249. rc = sqlite3_bind_blob(res, 1, &wc->host->host_uuid, sizeof(wc->host->host_uuid), SQLITE_STATIC);
  250. if (unlikely(rc != SQLITE_OK)) {
  251. error_report("Failed to bind host_id for pushing alert event.");
  252. goto done;
  253. }
  254. uint64_t first_sequence_id = 0;
  255. uint64_t last_sequence_id = 0;
  256. while (sqlite3_step_monitored(res) == SQLITE_ROW) {
  257. struct alarm_log_entry alarm_log;
  258. char old_value_string[100 + 1];
  259. char new_value_string[100 + 1];
  260. alarm_log.node_id = wc->node_id;
  261. alarm_log.claim_id = claim_id;
  262. alarm_log.chart = strdupz((char *)sqlite3_column_text(res, 12));
  263. alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11));
  264. alarm_log.when = (time_t) sqlite3_column_int64(res, 5);
  265. alarm_log.config_hash = sqlite3_uuid_unparse_strdupz(res, 3);
  266. alarm_log.utc_offset = wc->host->utc_offset;
  267. alarm_log.timezone = strdupz(rrdhost_abbrev_timezone(wc->host));
  268. alarm_log.exec_path = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) :
  269. strdupz((char *)string2str(wc->host->health.health_default_exec));
  270. alarm_log.conf_source = sqlite3_column_bytes(res, 15) > 0 ? strdupz((char *)sqlite3_column_text(res, 15)) : strdupz("");
  271. char *edit_command = sqlite3_column_bytes(res, 15) > 0 ?
  272. health_edit_command_from_source((char *)sqlite3_column_text(res, 15)) :
  273. strdupz("UNKNOWN=0=UNKNOWN");
  274. alarm_log.command = strdupz(edit_command);
  275. alarm_log.duration = (time_t) sqlite3_column_int64(res, 6);
  276. alarm_log.non_clear_duration = (time_t) sqlite3_column_int64(res, 7);
  277. alarm_log.status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 19));
  278. alarm_log.old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 20));
  279. alarm_log.delay = (int) sqlite3_column_int(res, 21);
  280. alarm_log.delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10);
  281. alarm_log.last_repeat = (time_t) sqlite3_column_int64(res, 24);
  282. alarm_log.silenced = ((sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) ||
  283. (sqlite3_column_type(res, 14) != SQLITE_NULL &&
  284. !strncmp((char *)sqlite3_column_text(res, 14), "silent", 6))) ?
  285. 1 :
  286. 0;
  287. alarm_log.value_string =
  288. sqlite3_column_type(res, 22) == SQLITE_NULL ?
  289. strdupz((char *)"-") :
  290. strdupz((char *)format_value_and_unit(
  291. new_value_string, 100, sqlite3_column_double(res, 22), (char *)sqlite3_column_text(res, 16), -1));
  292. alarm_log.old_value_string =
  293. sqlite3_column_type(res, 23) == SQLITE_NULL ?
  294. strdupz((char *)"-") :
  295. strdupz((char *)format_value_and_unit(
  296. old_value_string, 100, sqlite3_column_double(res, 23), (char *)sqlite3_column_text(res, 16), -1));
  297. alarm_log.value = (NETDATA_DOUBLE) sqlite3_column_double(res, 22);
  298. alarm_log.old_value = (NETDATA_DOUBLE) sqlite3_column_double(res, 23);
  299. alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
  300. alarm_log.rendered_info = sqlite3_text_strdupz_empty(res, 17);
  301. alarm_log.chart_context = sqlite3_text_strdupz_empty(res, 25);
  302. alarm_log.transition_id = sqlite3_uuid_unparse_strdupz(res, 26);
  303. alarm_log.event_id = (time_t) sqlite3_column_int64(res, 27);
  304. alarm_log.chart_name = sqlite3_text_strdupz_empty(res, 28);
  305. alarm_log.summary = sqlite3_text_strdupz_empty(res, 29);
  306. aclk_send_alarm_log_entry(&alarm_log);
  307. if (first_sequence_id == 0)
  308. first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  309. if (wc->alerts_log_first_sequence_id == 0)
  310. wc->alerts_log_first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  311. last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  312. wc->alerts_log_last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  313. destroy_alarm_log_entry(&alarm_log);
  314. freez(edit_command);
  315. }
  316. if (first_sequence_id) {
  317. buffer_flush(sql);
  318. buffer_sprintf(
  319. sql,
  320. "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
  321. "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64,
  322. wc->uuid_str,
  323. first_sequence_id,
  324. last_sequence_id);
  325. if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
  326. error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host));
  327. // Mark to do one more check
  328. rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  329. } else {
  330. if (wc->alerts_log_first_sequence_id)
  331. nd_log(NDLS_ACCESS, NDLP_DEBUG,
  332. "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 "",
  333. wc->node_id,
  334. wc->host ? rrdhost_hostname(wc->host) : "N/A",
  335. wc->alerts_log_first_sequence_id,
  336. wc->alerts_log_last_sequence_id);
  337. wc->alerts_log_first_sequence_id = 0;
  338. wc->alerts_log_last_sequence_id = 0;
  339. }
  340. done:
  341. rc = sqlite3_finalize(res);
  342. if (unlikely(rc != SQLITE_OK))
  343. error_report("Failed to finalize statement to send alert entries from the database, rc = %d", rc);
  344. freez(claim_id);
  345. buffer_free(sql);
  346. #endif
  347. }
  348. void aclk_push_alert_events_for_all_hosts(void)
  349. {
  350. RRDHOST *host;
  351. dfe_start_reentrant(rrdhost_root_index, host) {
  352. if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) ||
  353. !rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS))
  354. continue;
  355. rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  356. struct aclk_sync_cfg_t *wc = host->aclk_config;
  357. if (likely(wc))
  358. aclk_push_alert_event(wc);
  359. }
  360. dfe_done(host);
  361. }
  362. void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
  363. {
  364. sqlite3_stmt *res = NULL;
  365. int rc;
  366. struct aclk_sync_cfg_t *wc = host->aclk_config;
  367. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  368. rw_spinlock_write_lock(&host->health_log.spinlock);
  369. buffer_sprintf(sql, "DELETE FROM aclk_alert_%s", wc->uuid_str);
  370. if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
  371. goto skip;
  372. buffer_flush(sql);
  373. buffer_sprintf(
  374. sql,
  375. "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) "
  376. "SELECT hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id FROM health_log_detail hld, health_log hl "
  377. "WHERE hld.new_status <> 0 AND hld.new_status <> -2 AND hl.health_log_id = hld.health_log_id AND hl.config_hash_id IS NOT NULL "
  378. "AND hld.updated_by_id = 0 AND hl.host_id = @host_id ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING",
  379. wc->uuid_str);
  380. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  381. if (rc != SQLITE_OK) {
  382. error_report("Failed to prepare statement when trying to queue existing alerts.");
  383. goto skip;
  384. }
  385. rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
  386. if (unlikely(rc != SQLITE_OK)) {
  387. error_report("Failed to bind host_id for when trying to queue existing alerts.");
  388. goto done;
  389. }
  390. rc = execute_insert(res);
  391. if (unlikely(rc != SQLITE_DONE))
  392. error_report("Failed to queue existing alerts, rc = %d", rc);
  393. else
  394. rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  395. done:
  396. rc = sqlite3_finalize(res);
  397. if (unlikely(rc != SQLITE_OK))
  398. error_report("Failed to finalize statement to queue existing alerts, rc = %d", rc);
  399. skip:
  400. rw_spinlock_write_unlock(&host->health_log.spinlock);
  401. buffer_free(sql);
  402. }
  403. void aclk_send_alarm_configuration(char *config_hash)
  404. {
  405. if (unlikely(!config_hash))
  406. return;
  407. struct aclk_sync_cfg_t *wc = localhost->aclk_config;
  408. if (unlikely(!wc))
  409. return;
  410. nd_log(NDLS_ACCESS, NDLP_DEBUG,
  411. "ACLK REQ [%s (%s)]: Request to send alert config %s.",
  412. wc->node_id,
  413. wc->host ? rrdhost_hostname(wc->host) : "N/A",
  414. config_hash);
  415. aclk_push_alert_config(wc->node_id, config_hash);
  416. }
  417. #define SQL_SELECT_ALERT_CONFIG \
  418. "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
  419. "module, charts, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \
  420. "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
  421. "p_db_lookup_before, p_update_every, chart_labels, summary FROM alert_hash WHERE hash_id = @hash_id"
  422. void aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused)
  423. {
  424. #ifdef ENABLE_ACLK
  425. int rc;
  426. sqlite3_stmt *res = NULL;
  427. struct aclk_sync_cfg_t *wc;
  428. RRDHOST *host = find_host_by_node_id(node_id);
  429. if (unlikely(!host || !(wc = host->aclk_config))) {
  430. freez(config_hash);
  431. freez(node_id);
  432. return;
  433. }
  434. rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0);
  435. if (rc != SQLITE_OK) {
  436. error_report("Failed to prepare statement when trying to fetch an alarm hash configuration");
  437. return;
  438. }
  439. uuid_t hash_uuid;
  440. if (uuid_parse(config_hash, hash_uuid))
  441. return;
  442. rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC);
  443. if (unlikely(rc != SQLITE_OK))
  444. goto bind_fail;
  445. struct aclk_alarm_configuration alarm_config;
  446. struct provide_alarm_configuration p_alarm_config;
  447. p_alarm_config.cfg_hash = NULL;
  448. if (sqlite3_step_monitored(res) == SQLITE_ROW) {
  449. int param = 0;
  450. alarm_config.alarm = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  451. alarm_config.tmpl = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  452. alarm_config.on_chart = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  453. alarm_config.classification = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  454. alarm_config.type = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  455. alarm_config.component = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  456. alarm_config.os = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  457. alarm_config.hosts = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  458. alarm_config.plugin = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  459. alarm_config.module = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  460. alarm_config.charts = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  461. alarm_config.lookup = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  462. alarm_config.every = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  463. alarm_config.units = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  464. alarm_config.green = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  465. alarm_config.red = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  466. alarm_config.calculation_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  467. alarm_config.warning_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  468. alarm_config.critical_expr = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  469. alarm_config.recipient = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  470. alarm_config.exec = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  471. alarm_config.delay = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  472. alarm_config.repeat = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  473. alarm_config.info = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  474. alarm_config.options = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++);
  475. alarm_config.host_labels = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 25
  476. alarm_config.p_db_lookup_dimensions = NULL;
  477. alarm_config.p_db_lookup_method = NULL;
  478. alarm_config.p_db_lookup_options = NULL;
  479. alarm_config.p_db_lookup_after = 0;
  480. alarm_config.p_db_lookup_before = 0;
  481. if (sqlite3_column_bytes(res, 29) > 0) {
  482. alarm_config.p_db_lookup_dimensions = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 26
  483. alarm_config.p_db_lookup_method = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, param++); // Current param 27
  484. if (param != 28)
  485. netdata_log_error("aclk_push_alert_config_event: Unexpected param number %d", param);
  486. BUFFER *tmp_buf = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  487. buffer_data_options2string(tmp_buf, sqlite3_column_int(res, 28));
  488. alarm_config.p_db_lookup_options = strdupz((char *)buffer_tostring(tmp_buf));
  489. buffer_free(tmp_buf);
  490. alarm_config.p_db_lookup_after = sqlite3_column_int(res, 29);
  491. alarm_config.p_db_lookup_before = sqlite3_column_int(res, 30);
  492. }
  493. alarm_config.p_update_every = sqlite3_column_int(res, 31);
  494. alarm_config.chart_labels = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, 32);
  495. alarm_config.summary = SQLITE3_COLUMN_STRDUPZ_OR_NULL(res, 33);
  496. p_alarm_config.cfg_hash = strdupz((char *) config_hash);
  497. p_alarm_config.cfg = alarm_config;
  498. }
  499. if (likely(p_alarm_config.cfg_hash)) {
  500. nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  501. aclk_send_provide_alarm_cfg(&p_alarm_config);
  502. freez(p_alarm_config.cfg_hash);
  503. destroy_aclk_alarm_configuration(&alarm_config);
  504. }
  505. else
  506. nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  507. bind_fail:
  508. rc = sqlite3_finalize(res);
  509. if (unlikely(rc != SQLITE_OK))
  510. error_report("Failed to reset statement when pushing alarm config hash, rc = %d", rc);
  511. freez(config_hash);
  512. freez(node_id);
  513. #endif
  514. }
  515. // Start streaming alerts
  516. void aclk_start_alert_streaming(char *node_id, bool resets)
  517. {
  518. uuid_t node_uuid;
  519. if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
  520. return;
  521. struct aclk_sync_cfg_t *wc;
  522. RRDHOST *host = find_host_by_node_id(node_id);
  523. if (unlikely(!host || !(wc = host->aclk_config)))
  524. return;
  525. if (unlikely(!host->health.health_enabled)) {
  526. nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
  527. return;
  528. }
  529. if (resets) {
  530. nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
  531. sql_queue_existing_alerts_to_aclk(host);
  532. } else
  533. nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
  534. wc->alert_updates = 1;
  535. wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS;
  536. }
  537. #define SQL_QUEUE_REMOVE_ALERTS \
  538. "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
  539. "SELECT hld.unique_id alert_unique_id, UNIXEPOCH(), hld.unique_id alert_unique_id FROM health_log hl, health_log_detail hld " \
  540. "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \
  541. "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \
  542. "AND hl.config_hash_id NOT IN (SELECT hash_id FROM alert_hash WHERE warn IS NULL AND crit IS NULL) " \
  543. "AND hl.name || hl.chart NOT IN (select name || chart FROM health_log WHERE name = hl.name AND " \
  544. "chart = hl.chart AND alarm_id > hl.alarm_id AND host_id = hl.host_id) " \
  545. "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING"
  546. void sql_process_queue_removed_alerts_to_aclk(char *node_id)
  547. {
  548. struct aclk_sync_cfg_t *wc;
  549. RRDHOST *host = find_host_by_node_id(node_id);
  550. freez(node_id);
  551. if (unlikely(!host || !(wc = host->aclk_config)))
  552. return;
  553. char sql[ACLK_SYNC_QUERY_SIZE * 2];
  554. sqlite3_stmt *res = NULL;
  555. snprintfz(sql, sizeof(sql) - 1, SQL_QUEUE_REMOVE_ALERTS, wc->uuid_str, wc->uuid_str);
  556. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  557. if (rc != SQLITE_OK) {
  558. error_report("Failed to prepare statement when trying to queue removed alerts.");
  559. return;
  560. }
  561. rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
  562. if (unlikely(rc != SQLITE_OK)) {
  563. error_report("Failed to bind host_id for when trying to queue remvoed alerts.");
  564. goto skip;
  565. }
  566. rc = execute_insert(res);
  567. if (likely(rc == SQLITE_DONE)) {
  568. nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host));
  569. rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
  570. wc->alert_queue_removed = 0;
  571. }
  572. skip:
  573. rc = sqlite3_finalize(res);
  574. if (unlikely(rc != SQLITE_OK))
  575. error_report("Failed to finalize statement to queue removed alerts, rc = %d", rc);
  576. }
  577. void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
  578. {
  579. if (unlikely(!host->aclk_config || !claimed() || !host->node_id))
  580. return;
  581. char node_id[UUID_STR_LEN];
  582. uuid_unparse_lower(*host->node_id, node_id);
  583. aclk_push_node_removed_alerts(node_id);
  584. }
  585. void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid)
  586. {
  587. uuid_t node_uuid;
  588. if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
  589. return;
  590. struct aclk_sync_cfg_t *wc;
  591. RRDHOST *host = find_host_by_node_id(node_id);
  592. if (unlikely(!host || !(wc = host->aclk_config))) {
  593. nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
  594. return;
  595. }
  596. nd_log(NDLS_ACCESS, NDLP_DEBUG,
  597. "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s",
  598. node_id,
  599. wc->host ? rrdhost_hostname(wc->host) : "N/A",
  600. snapshot_uuid);
  601. if (wc->alerts_snapshot_uuid && !strcmp(wc->alerts_snapshot_uuid,snapshot_uuid))
  602. return;
  603. wc->alerts_snapshot_uuid = strdupz(snapshot_uuid);
  604. aclk_push_node_alert_snapshot(node_id);
  605. }
  606. #ifdef ENABLE_ACLK
  607. void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
  608. {
  609. char *edit_command = ae->source ? health_edit_command_from_source(ae_source(ae)) : strdupz("UNKNOWN=0=UNKNOWN");
  610. char config_hash_id[UUID_STR_LEN];
  611. uuid_unparse_lower(ae->config_hash_id, config_hash_id);
  612. char transition_id[UUID_STR_LEN];
  613. uuid_unparse_lower(ae->transition_id, transition_id);
  614. alarm_log->chart = strdupz(ae_chart_id(ae));
  615. alarm_log->name = strdupz(ae_name(ae));
  616. alarm_log->when = ae->when;
  617. alarm_log->config_hash = strdupz((char *)config_hash_id);
  618. alarm_log->utc_offset = host->utc_offset;
  619. alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host));
  620. alarm_log->exec_path = ae->exec ? strdupz(ae_exec(ae)) : strdupz((char *)string2str(host->health.health_default_exec));
  621. alarm_log->conf_source = ae->source ? strdupz(ae_source(ae)) : strdupz((char *)"");
  622. alarm_log->command = strdupz((char *)edit_command);
  623. alarm_log->duration = (time_t)ae->duration;
  624. alarm_log->non_clear_duration = (time_t)ae->non_clear_duration;
  625. alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status);
  626. alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status);
  627. alarm_log->delay = ae->delay;
  628. alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp;
  629. alarm_log->last_repeat = (time_t)ae->last_repeat;
  630. alarm_log->silenced =
  631. ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp(ae_recipient(ae), "silent", 6))) ?
  632. 1 :
  633. 0;
  634. alarm_log->value_string = strdupz(ae_new_value_string(ae));
  635. alarm_log->old_value_string = strdupz(ae_old_value_string(ae));
  636. alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0;
  637. alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0;
  638. alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
  639. alarm_log->rendered_info = strdupz(ae_info(ae));
  640. alarm_log->chart_context = strdupz(ae_chart_context(ae));
  641. alarm_log->chart_name = strdupz(ae_chart_name(ae));
  642. alarm_log->transition_id = strdupz((char *)transition_id);
  643. alarm_log->event_id = (uint64_t) ae->alarm_event_id;
  644. alarm_log->summary = strdupz(ae_summary(ae));
  645. freez(edit_command);
  646. }
  647. #endif
  648. #ifdef ENABLE_ACLK
  649. static bool have_recent_alarm(RRDHOST *host, int64_t alarm_id, int64_t mark)
  650. {
  651. ALARM_ENTRY *ae = host->health_log.alarms;
  652. while (ae) {
  653. if (ae->alarm_id == alarm_id && ae->unique_id >mark &&
  654. (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL))
  655. return true;
  656. ae = ae->next;
  657. }
  658. return false;
  659. }
  660. #endif
  661. #define ALARM_EVENTS_PER_CHUNK 1000
  662. void aclk_push_alert_snapshot_event(char *node_id __maybe_unused)
  663. {
  664. #ifdef ENABLE_ACLK
  665. RRDHOST *host = find_host_by_node_id(node_id);
  666. if (unlikely(!host)) {
  667. nd_log(NDLS_ACCESS, NDLP_WARNING, "AC [%s (N/A)]: Node id not found", node_id);
  668. freez(node_id);
  669. return;
  670. }
  671. freez(node_id);
  672. struct aclk_sync_cfg_t *wc = host->aclk_config;
  673. // we perhaps we don't need this for snapshots
  674. if (unlikely(!wc->alert_updates)) {
  675. nd_log(NDLS_ACCESS, NDLP_NOTICE,
  676. "ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.",
  677. wc->node_id,
  678. wc->host ? rrdhost_hostname(wc->host) : "N/A");
  679. return;
  680. }
  681. if (unlikely(!wc->alerts_snapshot_uuid))
  682. return;
  683. char *claim_id = get_agent_claimid();
  684. if (unlikely(!claim_id))
  685. return;
  686. nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_uuid %s", wc->node_id, rrdhost_hostname(wc->host), wc->alerts_snapshot_uuid);
  687. uint32_t cnt = 0;
  688. rw_spinlock_read_lock(&host->health_log.spinlock);
  689. ALARM_ENTRY *ae = host->health_log.alarms;
  690. for (; ae; ae = ae->next) {
  691. if (likely(ae->updated_by_id))
  692. continue;
  693. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  694. continue;
  695. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  696. continue;
  697. if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
  698. continue;
  699. cnt++;
  700. }
  701. if (cnt) {
  702. uint32_t chunks;
  703. chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
  704. ae = host->health_log.alarms;
  705. cnt = 0;
  706. struct alarm_snapshot alarm_snap;
  707. alarm_snap.node_id = wc->node_id;
  708. alarm_snap.claim_id = claim_id;
  709. alarm_snap.snapshot_uuid = wc->alerts_snapshot_uuid;
  710. alarm_snap.chunks = chunks;
  711. alarm_snap.chunk = 1;
  712. alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
  713. for (; ae; ae = ae->next) {
  714. if (likely(ae->updated_by_id) || unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  715. continue;
  716. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  717. continue;
  718. if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
  719. continue;
  720. cnt++;
  721. struct alarm_log_entry alarm_log;
  722. alarm_log.node_id = wc->node_id;
  723. alarm_log.claim_id = claim_id;
  724. if (!snapshot_proto)
  725. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  726. health_alarm_entry2proto_nolock(&alarm_log, ae, host);
  727. add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
  728. if (cnt == ALARM_EVENTS_PER_CHUNK) {
  729. aclk_send_alarm_snapshot(snapshot_proto);
  730. cnt = 0;
  731. if (alarm_snap.chunk < chunks) {
  732. alarm_snap.chunk++;
  733. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  734. }
  735. }
  736. destroy_alarm_log_entry(&alarm_log);
  737. }
  738. if (cnt)
  739. aclk_send_alarm_snapshot(snapshot_proto);
  740. }
  741. rw_spinlock_read_unlock(&host->health_log.spinlock);
  742. wc->alerts_snapshot_uuid = NULL;
  743. freez(claim_id);
  744. #endif
  745. }
  746. #define SQL_DELETE_ALERT_ENTRIES "DELETE FROM aclk_alert_%s WHERE date_created < UNIXEPOCH() - @period"
  747. void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
  748. {
  749. struct aclk_sync_cfg_t *wc = host->aclk_config;
  750. if (unlikely(!wc))
  751. return;
  752. char sql[ACLK_SYNC_QUERY_SIZE];
  753. snprintfz(sql, sizeof(sql) - 1, SQL_DELETE_ALERT_ENTRIES, wc->uuid_str);
  754. sqlite3_stmt *res = NULL;
  755. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  756. if (rc != SQLITE_OK) {
  757. error_report("Failed to prepare statement for cleaning stale ACLK alert entries.");
  758. return;
  759. }
  760. rc = sqlite3_bind_int64(res, 1, MAX_REMOVED_PERIOD);
  761. if (unlikely(rc != SQLITE_OK)) {
  762. error_report("Failed to bind MAX_REMOVED_PERIOD parameter.");
  763. goto skip;
  764. }
  765. rc = sqlite3_step_monitored(res);
  766. if (rc != SQLITE_DONE)
  767. error_report("Failed to execute DELETE query for cleaning stale ACLK alert entries.");
  768. skip:
  769. rc = sqlite3_finalize(res);
  770. if (unlikely(rc != SQLITE_OK))
  771. error_report("Failed to finalize statement for cleaning stale ACLK alert entries.");
  772. }
  773. #define SQL_GET_MIN_MAX_ALERT_SEQ "SELECT MIN(sequence_id), MAX(sequence_id), " \
  774. "(SELECT MAX(sequence_id) FROM aclk_alert_%s WHERE date_submitted IS NOT NULL) " \
  775. "FROM aclk_alert_%s WHERE date_submitted IS NULL"
  776. int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
  777. {
  778. struct aclk_sync_cfg_t *wc = host->aclk_config;
  779. if (!wc)
  780. return 1;
  781. proto_alert_status->alert_updates = wc->alert_updates;
  782. char sql[ACLK_SYNC_QUERY_SIZE];
  783. sqlite3_stmt *res = NULL;
  784. snprintfz(sql, sizeof(sql) - 1, SQL_GET_MIN_MAX_ALERT_SEQ, wc->uuid_str, wc->uuid_str);
  785. int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  786. if (rc != SQLITE_OK) {
  787. error_report("Failed to prepare statement to get alert log status from the database.");
  788. return 1;
  789. }
  790. while (sqlite3_step_monitored(res) == SQLITE_ROW) {
  791. proto_alert_status->pending_min_sequence_id =
  792. sqlite3_column_bytes(res, 0) > 0 ? (uint64_t)sqlite3_column_int64(res, 0) : 0;
  793. proto_alert_status->pending_max_sequence_id =
  794. sqlite3_column_bytes(res, 1) > 0 ? (uint64_t)sqlite3_column_int64(res, 1) : 0;
  795. proto_alert_status->last_submitted_sequence_id =
  796. sqlite3_column_bytes(res, 2) > 0 ? (uint64_t)sqlite3_column_int64(res, 2) : 0;
  797. }
  798. rc = sqlite3_finalize(res);
  799. if (unlikely(rc != SQLITE_OK))
  800. error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc);
  801. return 0;
  802. }
  803. void aclk_send_alarm_checkpoint(char *node_id, char *claim_id __maybe_unused)
  804. {
  805. if (unlikely(!node_id))
  806. return;
  807. struct aclk_sync_cfg_t *wc;
  808. RRDHOST *host = find_host_by_node_id(node_id);
  809. if (unlikely(!host || !(wc = host->aclk_config)))
  810. nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", node_id);
  811. else {
  812. nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK REQ [%s (%s)]: ALERTS CHECKPOINT REQUEST RECEIVED", node_id, rrdhost_hostname(host));
  813. wc->alert_checkpoint_req = SEND_CHECKPOINT_AFTER_HEALTH_LOOPS;
  814. }
  815. }
  816. typedef struct active_alerts {
  817. char *name;
  818. char *chart;
  819. RRDCALC_STATUS status;
  820. } active_alerts_t;
  821. static inline int compare_active_alerts(const void *a, const void *b)
  822. {
  823. active_alerts_t *active_alerts_a = (active_alerts_t *)a;
  824. active_alerts_t *active_alerts_b = (active_alerts_t *)b;
  825. if (!(strcmp(active_alerts_a->name, active_alerts_b->name))) {
  826. return strcmp(active_alerts_a->chart, active_alerts_b->chart);
  827. } else
  828. return strcmp(active_alerts_a->name, active_alerts_b->name);
  829. }
  830. #define BATCH_ALLOCATED 10
  831. void aclk_push_alarm_checkpoint(RRDHOST *host __maybe_unused)
  832. {
  833. #ifdef ENABLE_ACLK
  834. struct aclk_sync_cfg_t *wc = host->aclk_config;
  835. if (unlikely(!wc)) {
  836. nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT REQUEST RECEIVED FOR INVALID NODE", rrdhost_hostname(host));
  837. return;
  838. }
  839. if (rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS)) {
  840. //postpone checkpoint send
  841. wc->alert_checkpoint_req += 3;
  842. nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK REQ [%s (N/A)]: ALERTS CHECKPOINT POSTPONED", rrdhost_hostname(host));
  843. return;
  844. }
  845. RRDCALC *rc;
  846. uint32_t cnt = 0;
  847. size_t len = 0;
  848. active_alerts_t *active_alerts = callocz(BATCH_ALLOCATED, sizeof(active_alerts_t));
  849. foreach_rrdcalc_in_rrdhost_read(host, rc) {
  850. if(unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec))
  851. continue;
  852. if (rc->status == RRDCALC_STATUS_WARNING ||
  853. rc->status == RRDCALC_STATUS_CRITICAL) {
  854. if (cnt && !(cnt % BATCH_ALLOCATED)) {
  855. active_alerts = reallocz(active_alerts, (BATCH_ALLOCATED * ((cnt / BATCH_ALLOCATED) + 1)) * sizeof(active_alerts_t));
  856. }
  857. active_alerts[cnt].name = (char *)rrdcalc_name(rc);
  858. len += string_strlen(rc->name);
  859. active_alerts[cnt].chart = (char *)rrdcalc_chart_name(rc);
  860. len += string_strlen(rc->chart);
  861. active_alerts[cnt].status = rc->status;
  862. len++;
  863. cnt++;
  864. }
  865. }
  866. foreach_rrdcalc_in_rrdhost_done(rc);
  867. BUFFER *alarms_to_hash;
  868. if (cnt) {
  869. qsort(active_alerts, cnt, sizeof(active_alerts_t), compare_active_alerts);
  870. alarms_to_hash = buffer_create(len, NULL);
  871. for (uint32_t i = 0; i < cnt; i++) {
  872. buffer_strcat(alarms_to_hash, active_alerts[i].name);
  873. buffer_strcat(alarms_to_hash, active_alerts[i].chart);
  874. if (active_alerts[i].status == RRDCALC_STATUS_WARNING)
  875. buffer_fast_strcat(alarms_to_hash, "W", 1);
  876. else if (active_alerts[i].status == RRDCALC_STATUS_CRITICAL)
  877. buffer_fast_strcat(alarms_to_hash, "C", 1);
  878. }
  879. } else {
  880. alarms_to_hash = buffer_create(1, NULL);
  881. buffer_strcat(alarms_to_hash, "");
  882. len = 0;
  883. }
  884. freez(active_alerts);
  885. char hash[SHA256_DIGEST_LENGTH + 1];
  886. if (hash256_string((const unsigned char *)buffer_tostring(alarms_to_hash), len, hash)) {
  887. hash[SHA256_DIGEST_LENGTH] = 0;
  888. struct alarm_checkpoint alarm_checkpoint;
  889. char *claim_id = get_agent_claimid();
  890. alarm_checkpoint.claim_id = claim_id;
  891. alarm_checkpoint.node_id = wc->node_id;
  892. alarm_checkpoint.checksum = (char *)hash;
  893. aclk_send_provide_alarm_checkpoint(&alarm_checkpoint);
  894. freez(claim_id);
  895. nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: ALERTS CHECKPOINT SENT", wc->node_id, rrdhost_hostname(host));
  896. } else
  897. nd_log(NDLS_ACCESS, NDLP_ERR, "ACLK RES [%s (%s)]: FAILED TO CREATE ALERTS CHECKPOINT HASH", wc->node_id, rrdhost_hostname(host));
  898. wc->alert_checkpoint_req = 0;
  899. buffer_free(alarms_to_hash);
  900. #endif
  901. }