sqlite_aclk_alert.c 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_functions.h"
  3. #include "sqlite_aclk_alert.h"
  4. #ifdef ENABLE_ACLK
  5. #include "../../aclk/aclk_alarm_api.h"
  6. #include "../../aclk/aclk.h"
  7. #endif
  8. time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) {
  9. sqlite3_stmt *res = NULL;
  10. int rc = 0;
  11. time_t when = 0;
  12. char sql[ACLK_SYNC_QUERY_SIZE];
  13. snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select when_key from health_log_%s where alarm_id = %u " \
  14. "and unique_id > %u and unique_id < %u " \
  15. "and new_status = -2;", uuid_str, alarm_id, after_unique_id, before_unique_id);
  16. rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  17. if (rc != SQLITE_OK) {
  18. error_report("Failed to prepare statement when trying to find removed gap.");
  19. return 0;
  20. }
  21. rc = sqlite3_step_monitored(res);
  22. if (likely(rc == SQLITE_ROW)) {
  23. when = (time_t) sqlite3_column_int64(res, 0);
  24. }
  25. rc = sqlite3_finalize(res);
  26. if (unlikely(rc != SQLITE_OK))
  27. error_report("Failed to finalize statement when trying to find removed gap, rc = %d", rc);
  28. return when;
  29. }
  30. void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) {
  31. char sql[ACLK_SYNC_QUERY_SIZE];
  32. snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u where filtered_alert_unique_id = %u", uuid_str, ae->unique_id, unique_id);
  33. sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL);
  34. ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
  35. }
  36. static inline bool is_event_from_alert_variable_config(uint32_t unique_id, char *uuid_str) {
  37. sqlite3_stmt *res = NULL;
  38. int rc = 0;
  39. bool ret = false;
  40. char sql[ACLK_SYNC_QUERY_SIZE];
  41. snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.unique_id from health_log_%s hl, alert_hash ah where hl.unique_id = %u " \
  42. "and hl.config_hash_id = ah.hash_id " \
  43. "and ah.warn is null and ah.crit is null;", uuid_str, unique_id);
  44. rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  45. if (rc != SQLITE_OK) {
  46. error_report("Failed to prepare statement when trying to check for alert variables.");
  47. return false;
  48. }
  49. rc = sqlite3_step_monitored(res);
  50. if (likely(rc == SQLITE_ROW)) {
  51. ret = true;
  52. }
  53. rc = sqlite3_finalize(res);
  54. if (unlikely(rc != SQLITE_OK))
  55. error_report("Failed to finalize statement when trying to check for alert variables, rc = %d", rc);
  56. return ret;
  57. }
  58. #define MAX_REMOVED_PERIOD 86400
  59. //decide if some events should be sent or not
  60. int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
  61. {
  62. sqlite3_stmt *res = NULL;
  63. char uuid_str[GUID_LEN + 1];
  64. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  65. int send = 1, rc = 0;
  66. if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) {
  67. return 0;
  68. }
  69. if (unlikely(uuid_is_null(ae->config_hash_id)))
  70. return 0;
  71. if (is_event_from_alert_variable_config(ae->unique_id, uuid_str))
  72. return 0;
  73. char sql[ACLK_SYNC_QUERY_SIZE];
  74. uuid_t config_hash_id;
  75. RRDCALC_STATUS status;
  76. uint32_t unique_id;
  77. //get the previous sent event of this alarm_id
  78. //base the search on the last filtered event
  79. snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \
  80. where hl.unique_id = aa.filtered_alert_unique_id \
  81. and hl.alarm_id = %u \
  82. order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id);
  83. rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
  84. if (rc != SQLITE_OK) {
  85. error_report("Failed to prepare statement when trying to filter alert events.");
  86. send = 1;
  87. return send;
  88. }
  89. rc = sqlite3_step_monitored(res);
  90. if (likely(rc == SQLITE_ROW)) {
  91. status = (RRDCALC_STATUS) sqlite3_column_int(res, 0);
  92. if (sqlite3_column_type(res, 1) != SQLITE_NULL)
  93. uuid_copy(config_hash_id, *((uuid_t *) sqlite3_column_blob(res, 1)));
  94. unique_id = (uint32_t) sqlite3_column_int64(res, 2);
  95. } else {
  96. send = 1;
  97. goto done;
  98. }
  99. if (ae->new_status != (RRDCALC_STATUS)status) {
  100. send = 1;
  101. goto done;
  102. }
  103. if (uuid_compare(ae->config_hash_id, config_hash_id)) {
  104. send = 1;
  105. goto done;
  106. }
  107. //same status, same config
  108. if (ae->new_status == RRDCALC_STATUS_CLEAR || ae->new_status == RRDCALC_STATUS_UNDEFINED) {
  109. send = 0;
  110. update_filtered(ae, unique_id, uuid_str);
  111. goto done;
  112. }
  113. //detect a long off period of the agent, TODO make global
  114. if (ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) {
  115. time_t when = removed_when(ae->alarm_id, ae->unique_id, unique_id, uuid_str);
  116. if (when && (when + (time_t)MAX_REMOVED_PERIOD) < ae->when) {
  117. send = 1;
  118. goto done;
  119. } else {
  120. send = 0;
  121. update_filtered(ae, unique_id, uuid_str);
  122. goto done;
  123. }
  124. }
  125. done:
  126. rc = sqlite3_finalize(res);
  127. if (unlikely(rc != SQLITE_OK))
  128. error_report("Failed to finalize statement when trying to filter alert events, rc = %d", rc);
  129. return send;
  130. }
  131. // will replace call to aclk_update_alarm in health/health_log.c
  132. // and handle both cases
  133. int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter)
  134. {
  135. if(!service_running(SERVICE_ACLK))
  136. return 0;
  137. if (!claimed())
  138. return 0;
  139. if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) {
  140. return 0;
  141. }
  142. CHECK_SQLITE_CONNECTION(db_meta);
  143. if (!skip_filter) {
  144. if (!should_send_to_cloud(host, ae)) {
  145. return 0;
  146. }
  147. }
  148. int rc = 0;
  149. sqlite3_stmt *res_alert = NULL;
  150. char uuid_str[GUID_LEN + 1];
  151. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  152. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  153. buffer_sprintf(
  154. sql,
  155. "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) "
  156. "VALUES (@alert_unique_id, unixepoch(), @alert_unique_id) on conflict (alert_unique_id) do nothing; ",
  157. uuid_str);
  158. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0);
  159. if (unlikely(rc != SQLITE_OK)) {
  160. error_report("Failed to prepare statement to store alert event");
  161. buffer_free(sql);
  162. return 1;
  163. }
  164. rc = sqlite3_bind_int(res_alert, 1, ae->unique_id);
  165. if (unlikely(rc != SQLITE_OK))
  166. goto bind_fail;
  167. rc = execute_insert(res_alert);
  168. if (unlikely(rc != SQLITE_DONE)) {
  169. error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc);
  170. goto bind_fail;
  171. }
  172. ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
  173. struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  174. if (wc) {
  175. wc->pause_alert_updates = 0;
  176. }
  177. bind_fail:
  178. if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
  179. error_report("Failed to reset statement in store alert event, rc = %d", rc);
  180. buffer_free(sql);
  181. return 0;
  182. }
  183. int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
  184. {
  185. #ifdef ENABLE_ACLK
  186. switch(status) {
  187. case RRDCALC_STATUS_REMOVED:
  188. return ALARM_STATUS_REMOVED;
  189. case RRDCALC_STATUS_UNDEFINED:
  190. return ALARM_STATUS_NOT_A_NUMBER;
  191. case RRDCALC_STATUS_CLEAR:
  192. return ALARM_STATUS_CLEAR;
  193. case RRDCALC_STATUS_WARNING:
  194. return ALARM_STATUS_WARNING;
  195. case RRDCALC_STATUS_CRITICAL:
  196. return ALARM_STATUS_CRITICAL;
  197. default:
  198. return ALARM_STATUS_UNKNOWN;
  199. }
  200. #else
  201. UNUSED(status);
  202. return 1;
  203. #endif
  204. }
  205. void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  206. {
  207. #ifndef ENABLE_ACLK
  208. UNUSED(wc);
  209. UNUSED(cmd);
  210. #else
  211. int rc;
  212. if (unlikely(!wc->alert_updates)) {
  213. log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
  214. return;
  215. }
  216. char *claim_id = get_agent_claimid();
  217. if (unlikely(!claim_id))
  218. return;
  219. if (unlikely(!wc->host)) {
  220. freez(claim_id);
  221. return;
  222. }
  223. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  224. if (wc->alerts_start_seq_id != 0) {
  225. buffer_sprintf(
  226. sql,
  227. "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64
  228. "; UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id < %"PRIu64
  229. " and date_cloud_ack is null "
  230. "; UPDATE aclk_alert_%s SET date_submitted = unixepoch() WHERE sequence_id < %"PRIu64
  231. " and date_submitted is null",
  232. wc->uuid_str,
  233. wc->alerts_start_seq_id,
  234. wc->uuid_str,
  235. wc->alerts_start_seq_id,
  236. wc->uuid_str,
  237. wc->alerts_start_seq_id);
  238. db_execute(buffer_tostring(sql));
  239. buffer_reset(sql);
  240. wc->alerts_start_seq_id = 0;
  241. }
  242. int limit = cmd.count > 0 ? cmd.count : 1;
  243. sqlite3_stmt *res = NULL;
  244. buffer_sprintf(sql, "select aa.sequence_id, hl.unique_id, hl.alarm_id, hl.config_hash_id, hl.updated_by_id, hl.when_key, " \
  245. " hl.duration, hl.non_clear_duration, hl.flags, hl.exec_run_timestamp, hl.delay_up_to_timestamp, hl.name, " \
  246. " hl.chart, hl.family, hl.exec, hl.recipient, hl.source, hl.units, hl.info, hl.exec_code, hl.new_status, " \
  247. " hl.old_status, hl.delay, hl.new_value, hl.old_value, hl.last_repeat, hl.chart_context " \
  248. " from health_log_%s hl, aclk_alert_%s aa " \
  249. " where hl.unique_id = aa.alert_unique_id and aa.date_submitted is null " \
  250. " order by aa.sequence_id asc limit %d;", wc->uuid_str, wc->uuid_str, limit);
  251. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  252. if (rc != SQLITE_OK) {
  253. // Try to create tables
  254. if (wc->host)
  255. sql_create_health_log_table(wc->host);
  256. BUFFER *sql_fix = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  257. buffer_sprintf(sql_fix, TABLE_ACLK_ALERT, wc->uuid_str);
  258. db_execute(buffer_tostring(sql_fix));
  259. buffer_flush(sql_fix);
  260. buffer_sprintf(sql_fix, INDEX_ACLK_ALERT, wc->uuid_str, wc->uuid_str);
  261. db_execute(buffer_tostring(sql_fix));
  262. buffer_free(sql_fix);
  263. // Try again
  264. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  265. if (rc != SQLITE_OK) {
  266. error_report("Failed to prepare statement when trying to send an alert update via ACLK");
  267. buffer_free(sql);
  268. freez(claim_id);
  269. return;
  270. }
  271. }
  272. char uuid_str[GUID_LEN + 1];
  273. uint64_t first_sequence_id = 0;
  274. uint64_t last_sequence_id = 0;
  275. static __thread uint64_t log_first_sequence_id = 0;
  276. static __thread uint64_t log_last_sequence_id = 0;
  277. while (sqlite3_step_monitored(res) == SQLITE_ROW) {
  278. struct alarm_log_entry alarm_log;
  279. char old_value_string[100 + 1];
  280. char new_value_string[100 + 1];
  281. alarm_log.node_id = wc->node_id;
  282. alarm_log.claim_id = claim_id;
  283. alarm_log.chart = strdupz((char *)sqlite3_column_text(res, 12));
  284. alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11));
  285. alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
  286. alarm_log.batch_id = wc->alerts_batch_id;
  287. alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  288. alarm_log.when = (time_t) sqlite3_column_int64(res, 5);
  289. uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str);
  290. alarm_log.config_hash = strdupz((char *)uuid_str);
  291. alarm_log.utc_offset = wc->host->utc_offset;
  292. alarm_log.timezone = strdupz(rrdhost_abbrev_timezone(wc->host));
  293. alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) :
  294. strdupz((char *)string2str(wc->host->health.health_default_exec));
  295. alarm_log.conf_source = strdupz((char *)sqlite3_column_text(res, 16));
  296. char *edit_command = sqlite3_column_bytes(res, 16) > 0 ?
  297. health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) :
  298. strdupz("UNKNOWN=0=UNKNOWN");
  299. alarm_log.command = strdupz(edit_command);
  300. alarm_log.duration = (time_t) sqlite3_column_int64(res, 6);
  301. alarm_log.non_clear_duration = (time_t) sqlite3_column_int64(res, 7);
  302. alarm_log.status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 20));
  303. alarm_log.old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 21));
  304. alarm_log.delay = (int) sqlite3_column_int(res, 22);
  305. alarm_log.delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10);
  306. alarm_log.last_repeat = (time_t) sqlite3_column_int64(res, 25);
  307. alarm_log.silenced = ((sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) ||
  308. (sqlite3_column_type(res, 15) != SQLITE_NULL &&
  309. !strncmp((char *)sqlite3_column_text(res, 15), "silent", 6))) ?
  310. 1 :
  311. 0;
  312. alarm_log.value_string =
  313. sqlite3_column_type(res, 23) == SQLITE_NULL ?
  314. strdupz((char *)"-") :
  315. strdupz((char *)format_value_and_unit(
  316. new_value_string, 100, sqlite3_column_double(res, 23), (char *)sqlite3_column_text(res, 17), -1));
  317. alarm_log.old_value_string =
  318. sqlite3_column_type(res, 24) == SQLITE_NULL ?
  319. strdupz((char *)"-") :
  320. strdupz((char *)format_value_and_unit(
  321. old_value_string, 100, sqlite3_column_double(res, 24), (char *)sqlite3_column_text(res, 17), -1));
  322. alarm_log.value = (NETDATA_DOUBLE) sqlite3_column_double(res, 23);
  323. alarm_log.old_value = (NETDATA_DOUBLE) sqlite3_column_double(res, 24);
  324. alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
  325. alarm_log.rendered_info = sqlite3_column_type(res, 18) == SQLITE_NULL ?
  326. strdupz((char *)"") :
  327. strdupz((char *)sqlite3_column_text(res, 18));
  328. alarm_log.chart_context = sqlite3_column_type(res, 26) == SQLITE_NULL ?
  329. strdupz((char *)"") :
  330. strdupz((char *)sqlite3_column_text(res, 26));
  331. aclk_send_alarm_log_entry(&alarm_log);
  332. if (first_sequence_id == 0)
  333. first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  334. if (log_first_sequence_id == 0)
  335. log_first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  336. last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  337. log_last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  338. destroy_alarm_log_entry(&alarm_log);
  339. freez(edit_command);
  340. }
  341. if (first_sequence_id) {
  342. buffer_flush(sql);
  343. buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
  344. "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
  345. wc->uuid_str, first_sequence_id, last_sequence_id);
  346. db_execute(buffer_tostring(sql));
  347. } else {
  348. if (log_first_sequence_id)
  349. log_access(
  350. "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64,
  351. wc->node_id,
  352. wc->host ? rrdhost_hostname(wc->host) : "N/A",
  353. log_first_sequence_id,
  354. log_last_sequence_id,
  355. wc->alerts_batch_id);
  356. log_first_sequence_id = 0;
  357. log_last_sequence_id = 0;
  358. wc->pause_alert_updates = 1;
  359. }
  360. rc = sqlite3_finalize(res);
  361. if (unlikely(rc != SQLITE_OK))
  362. error_report("Failed to finalize statement to send alert entries from the database, rc = %d", rc);
  363. freez(claim_id);
  364. buffer_free(sql);
  365. #endif
  366. return;
  367. }
  368. void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
  369. {
  370. char uuid_str[GUID_LEN + 1];
  371. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  372. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  373. buffer_sprintf(sql,"delete from aclk_alert_%s; " \
  374. "insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
  375. "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \
  376. "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \
  377. "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str, uuid_str);
  378. db_execute(buffer_tostring(sql));
  379. buffer_free(sql);
  380. struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  381. if (wc) {
  382. wc->pause_alert_updates = 0;
  383. }
  384. }
  385. void aclk_send_alarm_health_log(char *node_id)
  386. {
  387. if (unlikely(!node_id))
  388. return;
  389. struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id);
  390. if (likely(!wc)) {
  391. RRDHOST *host = find_host_by_node_id(node_id);
  392. if (likely(host))
  393. wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  394. }
  395. if (!wc) {
  396. log_access("ACLK REQ [%s (N/A)]: HEALTH LOG REQUEST RECEIVED FOR INVALID NODE", node_id);
  397. return;
  398. }
  399. log_access("ACLK REQ [%s (%s)]: HEALTH LOG REQUEST RECEIVED", node_id, wc->hostname ? wc->hostname : "N/A");
  400. struct aclk_database_cmd cmd;
  401. memset(&cmd, 0, sizeof(cmd));
  402. cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG;
  403. aclk_database_enq_cmd(wc, &cmd);
  404. return;
  405. }
  406. void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  407. {
  408. UNUSED(cmd);
  409. #ifndef ENABLE_ACLK
  410. UNUSED(wc);
  411. #else
  412. int rc;
  413. char *claim_id = get_agent_claimid();
  414. if (unlikely(!claim_id))
  415. return;
  416. RRDHOST *host = wc->host;
  417. if (unlikely(!host)) {
  418. host = find_host_by_node_id(wc->node_id);
  419. if (unlikely(!host)) {
  420. log_access(
  421. "AC [%s (N/A)]: ACLK synchronization thread for %s is not yet linked to HOST.",
  422. wc->node_id,
  423. wc->host_guid);
  424. freez(claim_id);
  425. return;
  426. }
  427. }
  428. uint64_t first_sequence = 0;
  429. uint64_t last_sequence = 0;
  430. struct timeval first_timestamp;
  431. struct timeval last_timestamp;
  432. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  433. sqlite3_stmt *res = NULL;
  434. //TODO: make this better: include info from health log too
  435. buffer_sprintf(sql, "SELECT MIN(sequence_id), MIN(date_created), MAX(sequence_id), MAX(date_created) " \
  436. "FROM aclk_alert_%s;", wc->uuid_str);
  437. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  438. if (rc != SQLITE_OK) {
  439. error_report("Failed to prepare statement to get health log statistics from the database");
  440. buffer_free(sql);
  441. freez(claim_id);
  442. return;
  443. }
  444. first_timestamp.tv_sec = 0;
  445. first_timestamp.tv_usec = 0;
  446. last_timestamp.tv_sec = 0;
  447. last_timestamp.tv_usec = 0;
  448. while (sqlite3_step_monitored(res) == SQLITE_ROW) {
  449. first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
  450. if (sqlite3_column_bytes(res, 1) > 0) {
  451. first_timestamp.tv_sec = sqlite3_column_int64(res, 1);
  452. }
  453. last_sequence = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
  454. if (sqlite3_column_bytes(res, 3) > 0) {
  455. last_timestamp.tv_sec = sqlite3_column_int64(res, 3);
  456. }
  457. }
  458. struct alarm_log_entries log_entries;
  459. log_entries.first_seq_id = first_sequence;
  460. log_entries.first_when = first_timestamp;
  461. log_entries.last_seq_id = last_sequence;
  462. log_entries.last_when = last_timestamp;
  463. struct alarm_log_health alarm_log;
  464. alarm_log.claim_id = claim_id;
  465. alarm_log.node_id = wc->node_id;
  466. alarm_log.log_entries = log_entries;
  467. alarm_log.status = wc->alert_updates == 0 ? 2 : 1;
  468. alarm_log.enabled = (int)host->health.health_enabled;
  469. wc->alert_sequence_id = last_sequence;
  470. aclk_send_alarm_log_health(&alarm_log);
  471. log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->hostname ? wc->hostname : "N/A", first_sequence, last_sequence);
  472. rc = sqlite3_finalize(res);
  473. if (unlikely(rc != SQLITE_OK))
  474. error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc);
  475. freez(claim_id);
  476. buffer_free(sql);
  477. aclk_alert_reloaded = 1;
  478. #endif
  479. return;
  480. }
  481. void aclk_send_alarm_configuration(char *config_hash)
  482. {
  483. if (unlikely(!config_hash))
  484. return;
  485. struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) localhost->dbsync_worker;
  486. if (unlikely(!wc)) {
  487. return;
  488. }
  489. log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  490. struct aclk_database_cmd cmd;
  491. memset(&cmd, 0, sizeof(cmd));
  492. cmd.opcode = ACLK_DATABASE_PUSH_ALERT_CONFIG;
  493. cmd.data_param = (void *) strdupz(config_hash);
  494. cmd.completion = NULL;
  495. aclk_database_enq_cmd(wc, &cmd);
  496. return;
  497. }
  498. #define SQL_SELECT_ALERT_CONFIG "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
  499. "module, charts, families, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \
  500. "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
  501. "p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;"
  502. int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  503. {
  504. UNUSED(wc);
  505. #ifndef ENABLE_ACLK
  506. UNUSED(cmd);
  507. #else
  508. int rc = 0;
  509. CHECK_SQLITE_CONNECTION(db_meta);
  510. sqlite3_stmt *res = NULL;
  511. char *config_hash = (char *) cmd.data_param;
  512. rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0);
  513. if (rc != SQLITE_OK) {
  514. error_report("Failed to prepare statement when trying to fetch an alarm hash configuration");
  515. return 1;
  516. }
  517. uuid_t hash_uuid;
  518. if (uuid_parse(config_hash, hash_uuid))
  519. return 1;
  520. rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC);
  521. if (unlikely(rc != SQLITE_OK))
  522. goto bind_fail;
  523. struct aclk_alarm_configuration alarm_config;
  524. struct provide_alarm_configuration p_alarm_config;
  525. p_alarm_config.cfg_hash = NULL;
  526. if (sqlite3_step_monitored(res) == SQLITE_ROW) {
  527. alarm_config.alarm = sqlite3_column_bytes(res, 0) > 0 ? strdupz((char *)sqlite3_column_text(res, 0)) : NULL;
  528. alarm_config.tmpl = sqlite3_column_bytes(res, 1) > 0 ? strdupz((char *)sqlite3_column_text(res, 1)) : NULL;
  529. alarm_config.on_chart = sqlite3_column_bytes(res, 2) > 0 ? strdupz((char *)sqlite3_column_text(res, 2)) : NULL;
  530. alarm_config.classification = sqlite3_column_bytes(res, 3) > 0 ? strdupz((char *)sqlite3_column_text(res, 3)) : NULL;
  531. alarm_config.type = sqlite3_column_bytes(res, 4) > 0 ? strdupz((char *)sqlite3_column_text(res, 4)) : NULL;
  532. alarm_config.component = sqlite3_column_bytes(res, 5) > 0 ? strdupz((char *)sqlite3_column_text(res, 5)) : NULL;
  533. alarm_config.os = sqlite3_column_bytes(res, 6) > 0 ? strdupz((char *)sqlite3_column_text(res, 6)) : NULL;
  534. alarm_config.hosts = sqlite3_column_bytes(res, 7) > 0 ? strdupz((char *)sqlite3_column_text(res, 7)) : NULL;
  535. alarm_config.plugin = sqlite3_column_bytes(res, 8) > 0 ? strdupz((char *)sqlite3_column_text(res, 8)) : NULL;
  536. alarm_config.module = sqlite3_column_bytes(res, 9) > 0 ? strdupz((char *)sqlite3_column_text(res, 9)) : NULL;
  537. alarm_config.charts = sqlite3_column_bytes(res, 10) > 0 ? strdupz((char *)sqlite3_column_text(res, 10)) : NULL;
  538. alarm_config.families = sqlite3_column_bytes(res, 11) > 0 ? strdupz((char *)sqlite3_column_text(res, 11)) : NULL;
  539. alarm_config.lookup = sqlite3_column_bytes(res, 12) > 0 ? strdupz((char *)sqlite3_column_text(res, 12)) : NULL;
  540. alarm_config.every = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
  541. alarm_config.units = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : NULL;
  542. alarm_config.green = sqlite3_column_bytes(res, 15) > 0 ? strdupz((char *)sqlite3_column_text(res, 15)) : NULL;
  543. alarm_config.red = sqlite3_column_bytes(res, 16) > 0 ? strdupz((char *)sqlite3_column_text(res, 16)) : NULL;
  544. alarm_config.calculation_expr = sqlite3_column_bytes(res, 17) > 0 ? strdupz((char *)sqlite3_column_text(res, 17)) : NULL;
  545. alarm_config.warning_expr = sqlite3_column_bytes(res, 18) > 0 ? strdupz((char *)sqlite3_column_text(res, 18)) : NULL;
  546. alarm_config.critical_expr = sqlite3_column_bytes(res, 19) > 0 ? strdupz((char *)sqlite3_column_text(res, 19)) : NULL;
  547. alarm_config.recipient = sqlite3_column_bytes(res, 20) > 0 ? strdupz((char *)sqlite3_column_text(res, 20)) : NULL;
  548. alarm_config.exec = sqlite3_column_bytes(res, 21) > 0 ? strdupz((char *)sqlite3_column_text(res, 21)) : NULL;
  549. alarm_config.delay = sqlite3_column_bytes(res, 22) > 0 ? strdupz((char *)sqlite3_column_text(res, 22)) : NULL;
  550. alarm_config.repeat = sqlite3_column_bytes(res, 23) > 0 ? strdupz((char *)sqlite3_column_text(res, 23)) : NULL;
  551. alarm_config.info = sqlite3_column_bytes(res, 24) > 0 ? strdupz((char *)sqlite3_column_text(res, 24)) : NULL;
  552. alarm_config.options = sqlite3_column_bytes(res, 25) > 0 ? strdupz((char *)sqlite3_column_text(res, 25)) : NULL;
  553. alarm_config.host_labels = sqlite3_column_bytes(res, 26) > 0 ? strdupz((char *)sqlite3_column_text(res, 26)) : NULL;
  554. alarm_config.p_db_lookup_dimensions = NULL;
  555. alarm_config.p_db_lookup_method = NULL;
  556. alarm_config.p_db_lookup_options = NULL;
  557. alarm_config.p_db_lookup_after = 0;
  558. alarm_config.p_db_lookup_before = 0;
  559. if (sqlite3_column_bytes(res, 30) > 0) {
  560. alarm_config.p_db_lookup_dimensions = sqlite3_column_bytes(res, 27) > 0 ? strdupz((char *)sqlite3_column_text(res, 27)) : NULL;
  561. alarm_config.p_db_lookup_method = sqlite3_column_bytes(res, 28) > 0 ? strdupz((char *)sqlite3_column_text(res, 28)) : NULL;
  562. BUFFER *tmp_buf = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  563. buffer_data_options2string(tmp_buf, sqlite3_column_int(res, 29));
  564. alarm_config.p_db_lookup_options = strdupz((char *)buffer_tostring(tmp_buf));
  565. buffer_free(tmp_buf);
  566. alarm_config.p_db_lookup_after = sqlite3_column_int(res, 30);
  567. alarm_config.p_db_lookup_before = sqlite3_column_int(res, 31);
  568. }
  569. alarm_config.p_update_every = sqlite3_column_int(res, 32);
  570. p_alarm_config.cfg_hash = strdupz((char *) config_hash);
  571. p_alarm_config.cfg = alarm_config;
  572. }
  573. if (likely(p_alarm_config.cfg_hash)) {
  574. log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  575. aclk_send_provide_alarm_cfg(&p_alarm_config);
  576. freez((char *) cmd.data_param);
  577. freez(p_alarm_config.cfg_hash);
  578. destroy_aclk_alarm_configuration(&alarm_config);
  579. }
  580. else
  581. log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", config_hash);
  582. bind_fail:
  583. rc = sqlite3_finalize(res);
  584. if (unlikely(rc != SQLITE_OK))
  585. error_report("Failed to reset statement when pushing alarm config hash, rc = %d", rc);
  586. return rc;
  587. #endif
  588. return 0;
  589. }
  590. // Start streaming alerts
  591. void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id)
  592. {
  593. if (unlikely(!node_id))
  594. return;
  595. //log_access("ACLK REQ [%s (N/A)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64".", node_id, start_seq_id, batch_id);
  596. uuid_t node_uuid;
  597. if (uuid_parse(node_id, node_uuid))
  598. return;
  599. struct aclk_database_worker_config *wc = NULL;
  600. RRDHOST *host = find_host_by_node_id(node_id);
  601. if (likely(host)) {
  602. wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
  603. (struct aclk_database_worker_config *)host->dbsync_worker :
  604. (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
  605. if (unlikely(!host->health.health_enabled)) {
  606. log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
  607. return;
  608. }
  609. if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1))
  610. sql_queue_existing_alerts_to_aclk(host);
  611. } else
  612. wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
  613. if (likely(wc)) {
  614. log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", start_seq_id, batch_id);
  615. __sync_synchronize();
  616. wc->alerts_batch_id = batch_id;
  617. wc->alerts_start_seq_id = start_seq_id;
  618. wc->alert_updates = 1;
  619. wc->pause_alert_updates = 0;
  620. __sync_synchronize();
  621. }
  622. else
  623. log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
  624. return;
  625. }
  626. void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  627. {
  628. UNUSED(cmd);
  629. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  630. buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
  631. "select unique_id alert_unique_id, unixepoch(), unique_id alert_unique_id from health_log_%s " \
  632. "where new_status = -2 and updated_by_id = 0 and unique_id not in " \
  633. "(select alert_unique_id from aclk_alert_%s) order by unique_id asc " \
  634. "on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
  635. db_execute(buffer_tostring(sql));
  636. log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
  637. buffer_free(sql);
  638. wc->pause_alert_updates = 0;
  639. return;
  640. }
  641. void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
  642. {
  643. if (unlikely(!host->dbsync_worker))
  644. return;
  645. if (!claimed())
  646. return;
  647. struct aclk_database_cmd cmd;
  648. memset(&cmd, 0, sizeof(cmd));
  649. cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS;
  650. cmd.data = NULL;
  651. cmd.data_param = NULL;
  652. cmd.completion = NULL;
  653. aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd);
  654. }
  655. void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id)
  656. {
  657. UNUSED(claim_id);
  658. if (unlikely(!node_id))
  659. return;
  660. uuid_t node_uuid;
  661. if (uuid_parse(node_id, node_uuid))
  662. return;
  663. struct aclk_database_worker_config *wc = NULL;
  664. RRDHOST *host = find_host_by_node_id(node_id);
  665. if (likely(host))
  666. wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  667. if (likely(wc)) {
  668. log_access(
  669. "IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64,
  670. wc->node_id,
  671. wc->host ? rrdhost_hostname(wc->host) : "N/A",
  672. snapshot_id,
  673. sequence_id);
  674. if (wc->alerts_snapshot_id == snapshot_id)
  675. return;
  676. __sync_synchronize();
  677. wc->alerts_snapshot_id = snapshot_id;
  678. wc->alerts_ack_sequence_id = sequence_id;
  679. __sync_synchronize();
  680. struct aclk_database_cmd cmd;
  681. memset(&cmd, 0, sizeof(cmd));
  682. cmd.opcode = ACLK_DATABASE_PUSH_ALERT_SNAPSHOT;
  683. cmd.data_param = NULL;
  684. cmd.completion = NULL;
  685. aclk_database_enq_cmd(wc, &cmd);
  686. } else
  687. log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
  688. return;
  689. }
  690. void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id)
  691. {
  692. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  693. if (alerts_ack_sequence_id != 0) {
  694. buffer_sprintf(
  695. sql,
  696. "UPDATE aclk_alert_%s SET date_cloud_ack = unixepoch() WHERE sequence_id <= %" PRIu64 "",
  697. uuid_str,
  698. alerts_ack_sequence_id);
  699. db_execute(buffer_tostring(sql));
  700. }
  701. buffer_free(sql);
  702. }
  703. #ifdef ENABLE_ACLK
  704. void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
  705. {
  706. char *edit_command = ae->source ? health_edit_command_from_source(ae_source(ae)) : strdupz("UNKNOWN=0=UNKNOWN");
  707. char config_hash_id[GUID_LEN + 1];
  708. uuid_unparse_lower(ae->config_hash_id, config_hash_id);
  709. alarm_log->chart = strdupz(ae_chart_name(ae));
  710. alarm_log->name = strdupz(ae_name(ae));
  711. alarm_log->family = strdupz(ae_family(ae));
  712. alarm_log->batch_id = 0;
  713. alarm_log->sequence_id = 0;
  714. alarm_log->when = (time_t)ae->when;
  715. alarm_log->config_hash = strdupz((char *)config_hash_id);
  716. alarm_log->utc_offset = host->utc_offset;
  717. alarm_log->timezone = strdupz(rrdhost_abbrev_timezone(host));
  718. alarm_log->exec_path = ae->exec ? strdupz(ae_exec(ae)) : strdupz((char *)string2str(host->health.health_default_exec));
  719. alarm_log->conf_source = ae->source ? strdupz(ae_source(ae)) : strdupz((char *)"");
  720. alarm_log->command = strdupz((char *)edit_command);
  721. alarm_log->duration = (time_t)ae->duration;
  722. alarm_log->non_clear_duration = (time_t)ae->non_clear_duration;
  723. alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status);
  724. alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status);
  725. alarm_log->delay = (int)ae->delay;
  726. alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp;
  727. alarm_log->last_repeat = (time_t)ae->last_repeat;
  728. alarm_log->silenced =
  729. ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp(ae_recipient(ae), "silent", 6))) ?
  730. 1 :
  731. 0;
  732. alarm_log->value_string = strdupz(ae_new_value_string(ae));
  733. alarm_log->old_value_string = strdupz(ae_old_value_string(ae));
  734. alarm_log->value = (!isnan(ae->new_value)) ? (NETDATA_DOUBLE)ae->new_value : 0;
  735. alarm_log->old_value = (!isnan(ae->old_value)) ? (NETDATA_DOUBLE)ae->old_value : 0;
  736. alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
  737. alarm_log->rendered_info = strdupz(ae_info(ae));
  738. alarm_log->chart_context = strdupz(ae_chart_context(ae));
  739. freez(edit_command);
  740. }
  741. #endif
  742. #ifdef ENABLE_ACLK
  743. static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, uint32_t mark)
  744. {
  745. ALARM_ENTRY *ae = host->health_log.alarms;
  746. while (ae) {
  747. if (ae->alarm_id == alarm_id && ae->unique_id >mark &&
  748. (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL))
  749. return 1;
  750. ae = ae->next;
  751. }
  752. return 0;
  753. }
  754. #endif
  755. #define ALARM_EVENTS_PER_CHUNK 10
  756. void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  757. {
  758. #ifndef ENABLE_ACLK
  759. UNUSED(wc);
  760. UNUSED(cmd);
  761. #else
  762. UNUSED(cmd);
  763. // we perhaps we don't need this for snapshots
  764. if (unlikely(!wc->alert_updates)) {
  765. log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
  766. return;
  767. }
  768. if (unlikely(!wc->host)) {
  769. error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid);
  770. return;
  771. }
  772. if (unlikely(!wc->alerts_snapshot_id))
  773. return;
  774. char *claim_id = get_agent_claimid();
  775. if (unlikely(!claim_id))
  776. return;
  777. log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A", wc->alerts_snapshot_id);
  778. aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id);
  779. RRDHOST *host = wc->host;
  780. uint32_t cnt = 0;
  781. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  782. ALARM_ENTRY *ae = host->health_log.alarms;
  783. for (; ae; ae = ae->next) {
  784. if (likely(ae->updated_by_id))
  785. continue;
  786. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  787. continue;
  788. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  789. continue;
  790. cnt++;
  791. }
  792. if (cnt) {
  793. uint32_t chunk = 1, chunks = 0;
  794. chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
  795. ae = host->health_log.alarms;
  796. cnt = 0;
  797. struct alarm_snapshot alarm_snap;
  798. alarm_snap.node_id = wc->node_id;
  799. alarm_snap.claim_id = claim_id;
  800. alarm_snap.snapshot_id = wc->alerts_snapshot_id;
  801. alarm_snap.chunks = chunks;
  802. alarm_snap.chunk = chunk;
  803. alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
  804. for (; ae; ae = ae->next) {
  805. if (likely(ae->updated_by_id))
  806. continue;
  807. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  808. continue;
  809. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  810. continue;
  811. cnt++;
  812. struct alarm_log_entry alarm_log;
  813. alarm_log.node_id = wc->node_id;
  814. alarm_log.claim_id = claim_id;
  815. if (!snapshot_proto)
  816. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  817. health_alarm_entry2proto_nolock(&alarm_log, ae, host);
  818. add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
  819. if (cnt == ALARM_EVENTS_PER_CHUNK) {
  820. aclk_send_alarm_snapshot(snapshot_proto);
  821. cnt = 0;
  822. if (chunk < chunks) {
  823. chunk++;
  824. struct alarm_snapshot alarm_snap;
  825. alarm_snap.node_id = wc->node_id;
  826. alarm_snap.claim_id = claim_id;
  827. alarm_snap.snapshot_id = wc->alerts_snapshot_id;
  828. alarm_snap.chunks = chunks;
  829. alarm_snap.chunk = chunk;
  830. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  831. }
  832. }
  833. destroy_alarm_log_entry(&alarm_log);
  834. }
  835. if (cnt)
  836. aclk_send_alarm_snapshot(snapshot_proto);
  837. }
  838. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  839. wc->alerts_snapshot_id = 0;
  840. freez(claim_id);
  841. #endif
  842. return;
  843. }
  844. void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
  845. {
  846. if (!claimed())
  847. return;
  848. char uuid_str[GUID_LEN + 1];
  849. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  850. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  851. buffer_sprintf(sql,"delete from aclk_alert_%s where filtered_alert_unique_id not in "
  852. " (select unique_id from health_log_%s); ", uuid_str, uuid_str);
  853. char *err_msg = NULL;
  854. int rc = sqlite3_exec_monitored(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg);
  855. if (rc != SQLITE_OK) {
  856. error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s""",
  857. uuid_str, err_msg);
  858. sqlite3_free(err_msg);
  859. }
  860. buffer_free(sql);
  861. }
  862. int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
  863. {
  864. int rc;
  865. struct aclk_database_worker_config *wc = NULL;
  866. wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  867. if (!wc)
  868. return 1;
  869. proto_alert_status->alert_updates = wc->alert_updates;
  870. proto_alert_status->alerts_batch_id = wc->alerts_batch_id;
  871. BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  872. sqlite3_stmt *res = NULL;
  873. buffer_sprintf(sql, "SELECT MIN(sequence_id), MAX(sequence_id), " \
  874. "(select MAX(sequence_id) from aclk_alert_%s where date_cloud_ack is not NULL), " \
  875. "(select MAX(sequence_id) from aclk_alert_%s where date_submitted is not NULL) " \
  876. "FROM aclk_alert_%s where date_submitted is null;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
  877. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  878. if (rc != SQLITE_OK) {
  879. error_report("Failed to prepare statement to get alert log status from the database.");
  880. buffer_free(sql);
  881. return 1;
  882. }
  883. while (sqlite3_step_monitored(res) == SQLITE_ROW) {
  884. proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
  885. proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
  886. proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
  887. proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0;
  888. }
  889. rc = sqlite3_finalize(res);
  890. if (unlikely(rc != SQLITE_OK))
  891. error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc);
  892. buffer_free(sql);
  893. return 0;
  894. }