sqlite_aclk_alert.c 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_functions.h"
  3. #include "sqlite_aclk_alert.h"
  4. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  5. #include "../../aclk/aclk_alarm_api.h"
  6. #include "../../aclk/aclk.h"
  7. #endif
  8. // will replace call to aclk_update_alarm in health/health_log.c
  9. // and handle both cases
  10. int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae)
  11. {
  12. //check aclk architecture and handle old json alarm update to cloud
  13. //include also the valid statuses for this case
  14. #ifdef ENABLE_ACLK
  15. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  16. if (!aclk_use_new_cloud_arch && aclk_connected) {
  17. #endif
  18. if ((ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) ||
  19. ((ae->old_status == RRDCALC_STATUS_WARNING || ae->old_status == RRDCALC_STATUS_CRITICAL))) {
  20. aclk_update_alarm(host, ae);
  21. }
  22. #endif
  23. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  24. }
  25. if (!claimed())
  26. return 0;
  27. if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED)
  28. return 0;
  29. if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED)
  30. return 0;
  31. if (unlikely(!host->dbsync_worker))
  32. return 1;
  33. if (unlikely(uuid_is_null(ae->config_hash_id)))
  34. return 0;
  35. int rc = 0;
  36. CHECK_SQLITE_CONNECTION(db_meta);
  37. sqlite3_stmt *res_alert = NULL;
  38. char uuid_str[GUID_LEN + 1];
  39. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  40. BUFFER *sql = buffer_create(1024);
  41. buffer_sprintf(
  42. sql,
  43. "INSERT INTO aclk_alert_%s (alert_unique_id, date_created) "
  44. "VALUES (@alert_unique_id, strftime('%%s')) on conflict (alert_unique_id) do nothing; ",
  45. uuid_str);
  46. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0);
  47. if (unlikely(rc != SQLITE_OK)) {
  48. error_report("Failed to prepare statement to store alert event");
  49. buffer_free(sql);
  50. return 1;
  51. }
  52. rc = sqlite3_bind_int(res_alert, 1, ae->unique_id);
  53. if (unlikely(rc != SQLITE_OK))
  54. goto bind_fail;
  55. rc = execute_insert(res_alert);
  56. if (unlikely(rc != SQLITE_DONE)) {
  57. error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc);
  58. goto bind_fail;
  59. }
  60. ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
  61. bind_fail:
  62. if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
  63. error_report("Failed to reset statement in store alert event, rc = %d", rc);
  64. buffer_free(sql);
  65. return 0;
  66. #else
  67. UNUSED(host);
  68. UNUSED(ae);
  69. #endif
  70. return 0;
  71. }
  72. int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
  73. {
  74. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  75. switch(status) {
  76. case RRDCALC_STATUS_REMOVED:
  77. return ALARM_STATUS_REMOVED;
  78. case RRDCALC_STATUS_UNDEFINED:
  79. return ALARM_STATUS_NOT_A_NUMBER;
  80. case RRDCALC_STATUS_CLEAR:
  81. return ALARM_STATUS_CLEAR;
  82. case RRDCALC_STATUS_WARNING:
  83. return ALARM_STATUS_WARNING;
  84. case RRDCALC_STATUS_CRITICAL:
  85. return ALARM_STATUS_CRITICAL;
  86. default:
  87. return ALARM_STATUS_UNKNOWN;
  88. }
  89. #else
  90. UNUSED(status);
  91. return 1;
  92. #endif
  93. }
  94. void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  95. {
  96. #ifndef ENABLE_NEW_CLOUD_PROTOCOL
  97. UNUSED(wc);
  98. UNUSED(cmd);
  99. #else
  100. int rc;
  101. if (unlikely(!wc->alert_updates)) {
  102. log_access("ACLK STA [%s (%s)]: Ignoring alert push event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
  103. return;
  104. }
  105. char *claim_id = is_agent_claimed();
  106. if (unlikely(!claim_id))
  107. return;
  108. if (unlikely(!wc->host)) {
  109. freez(claim_id);
  110. return;
  111. }
  112. BUFFER *sql = buffer_create(1024);
  113. if (wc->alerts_start_seq_id != 0) {
  114. buffer_sprintf(
  115. sql,
  116. "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64
  117. "; UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id < %"PRIu64
  118. " and date_cloud_ack is null "
  119. "; UPDATE aclk_alert_%s SET date_submitted = strftime('%%s','now') WHERE sequence_id < %"PRIu64
  120. " and date_submitted is null",
  121. wc->uuid_str,
  122. wc->alerts_start_seq_id,
  123. wc->uuid_str,
  124. wc->alerts_start_seq_id,
  125. wc->uuid_str,
  126. wc->alerts_start_seq_id);
  127. db_execute(buffer_tostring(sql));
  128. buffer_reset(sql);
  129. wc->alerts_start_seq_id = 0;
  130. }
  131. int limit = cmd.count > 0 ? cmd.count : 1;
  132. sqlite3_stmt *res = NULL;
  133. buffer_sprintf(sql, "select aa.sequence_id, hl.unique_id, hl.alarm_id, hl.config_hash_id, hl.updated_by_id, hl.when_key, \
  134. hl.duration, hl.non_clear_duration, hl.flags, hl.exec_run_timestamp, hl.delay_up_to_timestamp, hl.name, \
  135. hl.chart, hl.family, hl.exec, hl.recipient, hl.source, hl.units, hl.info, hl.exec_code, hl.new_status, \
  136. hl.old_status, hl.delay, hl.new_value, hl.old_value, hl.last_repeat \
  137. from health_log_%s hl, aclk_alert_%s aa \
  138. where hl.unique_id = aa.alert_unique_id and aa.date_submitted is null \
  139. order by aa.sequence_id asc limit %d;", wc->uuid_str, wc->uuid_str, limit);
  140. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  141. if (rc != SQLITE_OK) {
  142. error_report("Failed to prepare statement when trying to send an alert update via ACLK");
  143. buffer_free(sql);
  144. freez(claim_id);
  145. return;
  146. }
  147. char uuid_str[GUID_LEN + 1];
  148. uint64_t first_sequence_id = 0;
  149. uint64_t last_sequence_id = 0;
  150. static __thread uint64_t log_first_sequence_id = 0;
  151. static __thread uint64_t log_last_sequence_id = 0;
  152. while (sqlite3_step(res) == SQLITE_ROW) {
  153. struct alarm_log_entry alarm_log;
  154. char old_value_string[100 + 1];
  155. char new_value_string[100 + 1];
  156. alarm_log.node_id = wc->node_id;
  157. alarm_log.claim_id = claim_id;
  158. alarm_log.chart = strdupz((char *)sqlite3_column_text(res, 12));
  159. alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11));
  160. alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
  161. alarm_log.batch_id = wc->alerts_batch_id;
  162. alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  163. alarm_log.when = (time_t) sqlite3_column_int64(res, 5);
  164. uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str);
  165. alarm_log.config_hash = strdupz((char *)uuid_str);
  166. alarm_log.utc_offset = wc->host->utc_offset;
  167. alarm_log.timezone = strdupz((char *)wc->host->abbrev_timezone);
  168. alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) :
  169. strdupz((char *)wc->host->health_default_exec);
  170. alarm_log.conf_source = strdupz((char *)sqlite3_column_text(res, 16));
  171. char *edit_command = sqlite3_column_bytes(res, 16) > 0 ?
  172. health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) :
  173. strdupz("UNKNOWN=0=UNKNOWN");
  174. alarm_log.command = strdupz(edit_command);
  175. alarm_log.duration = (time_t) sqlite3_column_int64(res, 6);
  176. alarm_log.non_clear_duration = (time_t) sqlite3_column_int64(res, 7);
  177. alarm_log.status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 20));
  178. alarm_log.old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 21));
  179. alarm_log.delay = (int) sqlite3_column_int(res, 22);
  180. alarm_log.delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10);
  181. alarm_log.last_repeat = (time_t) sqlite3_column_int64(res, 25);
  182. alarm_log.silenced = ((sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) ||
  183. (sqlite3_column_type(res, 15) != SQLITE_NULL &&
  184. !strncmp((char *)sqlite3_column_text(res, 15), "silent", 6))) ?
  185. 1 :
  186. 0;
  187. alarm_log.value_string =
  188. sqlite3_column_type(res, 23) == SQLITE_NULL ?
  189. strdupz((char *)"-") :
  190. strdupz((char *)format_value_and_unit(
  191. new_value_string, 100, sqlite3_column_double(res, 23), (char *)sqlite3_column_text(res, 17), -1));
  192. alarm_log.old_value_string =
  193. sqlite3_column_type(res, 24) == SQLITE_NULL ?
  194. strdupz((char *)"-") :
  195. strdupz((char *)format_value_and_unit(
  196. old_value_string, 100, sqlite3_column_double(res, 24), (char *)sqlite3_column_text(res, 17), -1));
  197. alarm_log.value = (calculated_number) sqlite3_column_double(res, 23);
  198. alarm_log.old_value = (calculated_number) sqlite3_column_double(res, 24);
  199. alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
  200. alarm_log.rendered_info = sqlite3_column_type(res, 18) == SQLITE_NULL ?
  201. strdupz((char *)"") :
  202. strdupz((char *)sqlite3_column_text(res, 18));
  203. aclk_send_alarm_log_entry(&alarm_log);
  204. if (first_sequence_id == 0)
  205. first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  206. if (log_first_sequence_id == 0)
  207. log_first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  208. last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  209. log_last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
  210. destroy_alarm_log_entry(&alarm_log);
  211. freez(edit_command);
  212. }
  213. if (first_sequence_id) {
  214. buffer_flush(sql);
  215. buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=strftime('%%s') "
  216. "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
  217. wc->uuid_str, first_sequence_id, last_sequence_id);
  218. db_execute(buffer_tostring(sql));
  219. } else {
  220. if (log_first_sequence_id)
  221. log_access(
  222. "ACLK RES [%s (%s)]: ALERTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64,
  223. wc->node_id,
  224. wc->host ? wc->host->hostname : "N/A",
  225. log_first_sequence_id,
  226. log_last_sequence_id,
  227. wc->alerts_batch_id);
  228. log_first_sequence_id = 0;
  229. log_last_sequence_id = 0;
  230. }
  231. rc = sqlite3_finalize(res);
  232. if (unlikely(rc != SQLITE_OK))
  233. error_report("Failed to finalize statement to send alert entries from the database, rc = %d", rc);
  234. freez(claim_id);
  235. buffer_free(sql);
  236. #endif
  237. return;
  238. }
  239. void aclk_send_alarm_health_log(char *node_id)
  240. {
  241. if (unlikely(!node_id))
  242. return;
  243. char *hostname= NULL;
  244. struct aclk_database_worker_config *wc = NULL;
  245. struct aclk_database_cmd cmd;
  246. memset(&cmd, 0, sizeof(cmd));
  247. cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG;
  248. rrd_rdlock();
  249. RRDHOST *host = find_host_by_node_id(node_id);
  250. if (likely(host)) {
  251. wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  252. hostname = host->hostname;
  253. }
  254. else
  255. hostname = get_hostname_by_node_id(node_id);
  256. rrd_unlock();
  257. log_access("ACLK REQ [%s (%s)]: HEALTH LOG request received", node_id, hostname ? hostname : "N/A");
  258. if (unlikely(!host))
  259. freez(hostname);
  260. if (wc)
  261. aclk_database_enq_cmd(wc, &cmd);
  262. else {
  263. if (aclk_worker_enq_cmd(node_id, &cmd))
  264. log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
  265. }
  266. return;
  267. }
  268. void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  269. {
  270. UNUSED(cmd);
  271. #ifndef ENABLE_NEW_CLOUD_PROTOCOL
  272. UNUSED(wc);
  273. #else
  274. int rc;
  275. char *claim_id = is_agent_claimed();
  276. if (unlikely(!claim_id))
  277. return;
  278. RRDHOST *host = wc->host;
  279. if (unlikely(!host)) {
  280. rrd_rdlock();
  281. host = find_host_by_node_id(wc->node_id);
  282. rrd_unlock();
  283. if (unlikely(!host)) {
  284. log_access(
  285. "AC [%s (N/A)]: ACLK synchronization thread for %s is not yet linked to HOST.",
  286. wc->node_id,
  287. wc->host_guid);
  288. freez(claim_id);
  289. return;
  290. }
  291. }
  292. uint64_t first_sequence = 0;
  293. uint64_t last_sequence = 0;
  294. struct timeval first_timestamp;
  295. struct timeval last_timestamp;
  296. BUFFER *sql = buffer_create(1024);
  297. sqlite3_stmt *res = NULL;
  298. //TODO: make this better: include info from health log too
  299. buffer_sprintf(sql, "SELECT MIN(sequence_id), MIN(date_created), MAX(sequence_id), MAX(date_created) " \
  300. "FROM aclk_alert_%s;", wc->uuid_str);
  301. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  302. if (rc != SQLITE_OK) {
  303. error_report("Failed to prepare statement to get health log statistics from the database");
  304. buffer_free(sql);
  305. freez(claim_id);
  306. return;
  307. }
  308. first_timestamp.tv_sec = 0;
  309. first_timestamp.tv_usec = 0;
  310. last_timestamp.tv_sec = 0;
  311. last_timestamp.tv_usec = 0;
  312. while (sqlite3_step(res) == SQLITE_ROW) {
  313. first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
  314. if (sqlite3_column_bytes(res, 1) > 0) {
  315. first_timestamp.tv_sec = sqlite3_column_int64(res, 1);
  316. }
  317. last_sequence = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
  318. if (sqlite3_column_bytes(res, 3) > 0) {
  319. last_timestamp.tv_sec = sqlite3_column_int64(res, 3);
  320. }
  321. }
  322. struct alarm_log_entries log_entries;
  323. log_entries.first_seq_id = first_sequence;
  324. log_entries.first_when = first_timestamp;
  325. log_entries.last_seq_id = last_sequence;
  326. log_entries.last_when = last_timestamp;
  327. struct alarm_log_health alarm_log;
  328. alarm_log.claim_id = claim_id;
  329. alarm_log.node_id = wc->node_id;
  330. alarm_log.log_entries = log_entries;
  331. alarm_log.status = wc->alert_updates == 0 ? 2 : 1;
  332. alarm_log.enabled = (int)host->health_enabled;
  333. wc->alert_sequence_id = last_sequence;
  334. aclk_send_alarm_log_health(&alarm_log);
  335. log_access("ACLK RES [%s (%s)]: HEALTH LOG SENT from %"PRIu64" to %"PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", first_sequence, last_sequence);
  336. rc = sqlite3_finalize(res);
  337. if (unlikely(rc != SQLITE_OK))
  338. error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc);
  339. freez(claim_id);
  340. buffer_free(sql);
  341. #endif
  342. return;
  343. }
  344. void aclk_send_alarm_configuration(char *config_hash)
  345. {
  346. if (unlikely(!config_hash))
  347. return;
  348. struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) localhost->dbsync_worker;
  349. if (unlikely(!wc)) {
  350. return;
  351. }
  352. log_access("ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
  353. struct aclk_database_cmd cmd;
  354. memset(&cmd, 0, sizeof(cmd));
  355. cmd.opcode = ACLK_DATABASE_PUSH_ALERT_CONFIG;
  356. cmd.data_param = (void *) strdupz(config_hash);
  357. cmd.completion = NULL;
  358. aclk_database_enq_cmd(wc, &cmd);
  359. return;
  360. }
  361. #define SQL_SELECT_ALERT_CONFIG "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
  362. "module, charts, families, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \
  363. "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
  364. "p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;"
  365. int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  366. {
  367. UNUSED(wc);
  368. #ifndef ENABLE_NEW_CLOUD_PROTOCOL
  369. UNUSED(cmd);
  370. #else
  371. int rc = 0;
  372. CHECK_SQLITE_CONNECTION(db_meta);
  373. sqlite3_stmt *res = NULL;
  374. char *config_hash = (char *) cmd.data_param;
  375. rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0);
  376. if (rc != SQLITE_OK) {
  377. error_report("Failed to prepare statement when trying to fetch an alarm hash configuration");
  378. return 1;
  379. }
  380. uuid_t hash_uuid;
  381. if (uuid_parse(config_hash, hash_uuid))
  382. return 1;
  383. rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC);
  384. if (unlikely(rc != SQLITE_OK))
  385. goto bind_fail;
  386. struct aclk_alarm_configuration alarm_config;
  387. struct provide_alarm_configuration p_alarm_config;
  388. p_alarm_config.cfg_hash = NULL;
  389. if (sqlite3_step(res) == SQLITE_ROW) {
  390. alarm_config.alarm = sqlite3_column_bytes(res, 0) > 0 ? strdupz((char *)sqlite3_column_text(res, 0)) : NULL;
  391. alarm_config.tmpl = sqlite3_column_bytes(res, 1) > 0 ? strdupz((char *)sqlite3_column_text(res, 1)) : NULL;
  392. alarm_config.on_chart = sqlite3_column_bytes(res, 2) > 0 ? strdupz((char *)sqlite3_column_text(res, 2)) : NULL;
  393. alarm_config.classification = sqlite3_column_bytes(res, 3) > 0 ? strdupz((char *)sqlite3_column_text(res, 3)) : NULL;
  394. alarm_config.type = sqlite3_column_bytes(res, 4) > 0 ? strdupz((char *)sqlite3_column_text(res, 4)) : NULL;
  395. alarm_config.component = sqlite3_column_bytes(res, 5) > 0 ? strdupz((char *)sqlite3_column_text(res, 5)) : NULL;
  396. alarm_config.os = sqlite3_column_bytes(res, 6) > 0 ? strdupz((char *)sqlite3_column_text(res, 6)) : NULL;
  397. alarm_config.hosts = sqlite3_column_bytes(res, 7) > 0 ? strdupz((char *)sqlite3_column_text(res, 7)) : NULL;
  398. alarm_config.plugin = sqlite3_column_bytes(res, 8) > 0 ? strdupz((char *)sqlite3_column_text(res, 8)) : NULL;
  399. alarm_config.module = sqlite3_column_bytes(res, 9) > 0 ? strdupz((char *)sqlite3_column_text(res, 9)) : NULL;
  400. alarm_config.charts = sqlite3_column_bytes(res, 10) > 0 ? strdupz((char *)sqlite3_column_text(res, 10)) : NULL;
  401. alarm_config.families = sqlite3_column_bytes(res, 11) > 0 ? strdupz((char *)sqlite3_column_text(res, 11)) : NULL;
  402. alarm_config.lookup = sqlite3_column_bytes(res, 12) > 0 ? strdupz((char *)sqlite3_column_text(res, 12)) : NULL;
  403. alarm_config.every = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
  404. alarm_config.units = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : NULL;
  405. alarm_config.green = sqlite3_column_bytes(res, 15) > 0 ? strdupz((char *)sqlite3_column_text(res, 15)) : NULL;
  406. alarm_config.red = sqlite3_column_bytes(res, 16) > 0 ? strdupz((char *)sqlite3_column_text(res, 16)) : NULL;
  407. alarm_config.calculation_expr = sqlite3_column_bytes(res, 17) > 0 ? strdupz((char *)sqlite3_column_text(res, 17)) : NULL;
  408. alarm_config.warning_expr = sqlite3_column_bytes(res, 18) > 0 ? strdupz((char *)sqlite3_column_text(res, 18)) : NULL;
  409. alarm_config.critical_expr = sqlite3_column_bytes(res, 19) > 0 ? strdupz((char *)sqlite3_column_text(res, 19)) : NULL;
  410. alarm_config.recipient = sqlite3_column_bytes(res, 20) > 0 ? strdupz((char *)sqlite3_column_text(res, 20)) : NULL;
  411. alarm_config.exec = sqlite3_column_bytes(res, 21) > 0 ? strdupz((char *)sqlite3_column_text(res, 21)) : NULL;
  412. alarm_config.delay = sqlite3_column_bytes(res, 22) > 0 ? strdupz((char *)sqlite3_column_text(res, 22)) : NULL;
  413. alarm_config.repeat = sqlite3_column_bytes(res, 23) > 0 ? strdupz((char *)sqlite3_column_text(res, 23)) : NULL;
  414. alarm_config.info = sqlite3_column_bytes(res, 24) > 0 ? strdupz((char *)sqlite3_column_text(res, 24)) : NULL;
  415. alarm_config.options = sqlite3_column_bytes(res, 25) > 0 ? strdupz((char *)sqlite3_column_text(res, 25)) : NULL;
  416. alarm_config.host_labels = sqlite3_column_bytes(res, 26) > 0 ? strdupz((char *)sqlite3_column_text(res, 26)) : NULL;
  417. alarm_config.p_db_lookup_dimensions = NULL;
  418. alarm_config.p_db_lookup_method = NULL;
  419. alarm_config.p_db_lookup_options = NULL;
  420. alarm_config.p_db_lookup_after = 0;
  421. alarm_config.p_db_lookup_before = 0;
  422. if (sqlite3_column_bytes(res, 30) > 0) {
  423. alarm_config.p_db_lookup_dimensions = sqlite3_column_bytes(res, 27) > 0 ? strdupz((char *)sqlite3_column_text(res, 27)) : NULL;
  424. alarm_config.p_db_lookup_method = sqlite3_column_bytes(res, 28) > 0 ? strdupz((char *)sqlite3_column_text(res, 28)) : NULL;
  425. BUFFER *tmp_buf = buffer_create(1024);
  426. buffer_data_options2string(tmp_buf, sqlite3_column_int(res, 29));
  427. alarm_config.p_db_lookup_options = strdupz((char *)buffer_tostring(tmp_buf));
  428. buffer_free(tmp_buf);
  429. alarm_config.p_db_lookup_after = sqlite3_column_int(res, 30);
  430. alarm_config.p_db_lookup_before = sqlite3_column_int(res, 31);
  431. }
  432. alarm_config.p_update_every = sqlite3_column_int(res, 32);
  433. p_alarm_config.cfg_hash = strdupz((char *) config_hash);
  434. p_alarm_config.cfg = alarm_config;
  435. }
  436. if (likely(p_alarm_config.cfg_hash)) {
  437. log_access("ACLK RES [%s (%s)]: Sent alert config %s.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
  438. aclk_send_provide_alarm_cfg(&p_alarm_config);
  439. freez((char *) cmd.data_param);
  440. freez(p_alarm_config.cfg_hash);
  441. destroy_aclk_alarm_configuration(&alarm_config);
  442. }
  443. else
  444. log_access("ACLK STA [%s (%s)]: Alert config for %s not found.", wc->node_id, wc->host ? wc->host->hostname : "N/A", config_hash);
  445. bind_fail:
  446. rc = sqlite3_finalize(res);
  447. if (unlikely(rc != SQLITE_OK))
  448. error_report("Failed to reset statement when pushing alarm config hash, rc = %d", rc);
  449. return rc;
  450. #endif
  451. return 0;
  452. }
  // Start streaming alerts.
  // Records the batch id and the starting sequence id requested by the cloud on
  // the worker of the node (the host's dbsync worker, or else the one returned
  // by find_inactive_wc_by_node_id()) and turns alert updates on for it.
  // The request is ignored when node_id is missing or not a valid uuid, when
  // health is disabled on the host, or when no worker is found.
  void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id)
  {
  #ifndef ENABLE_NEW_CLOUD_PROTOCOL
      UNUSED(node_id);
      UNUSED(start_seq_id);
      UNUSED(batch_id);
  #else
      if (unlikely(!node_id))
          return;

      // Parsed only to validate the id
      uuid_t node_uuid;
      if (uuid_parse(node_id, node_uuid))
          return;

      rrd_rdlock();
      RRDHOST *host = find_host_by_node_id(node_id);
      rrd_unlock();

      struct aclk_database_worker_config *worker = NULL;

      if (unlikely(!host))
          worker = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
      else {
          worker = (struct aclk_database_worker_config *)host->dbsync_worker;
          if (!worker)
              worker = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);

          if (unlikely(!host->health_enabled)) {
              log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
              return;
          }
      }

      if (unlikely(!worker)) {
          log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
          return;
      }

      log_access("ACLK REQ [%s (%s)]: ALERTS STREAM from %"PRIu64" batch=%"PRIu64, node_id, worker->host ? worker->host->hostname : "N/A", start_seq_id, batch_id);

      // Full memory barriers are issued around the update of the streaming parameters
      __sync_synchronize();
      worker->alerts_batch_id = batch_id;
      worker->alerts_start_seq_id = start_seq_id;
      worker->alert_updates = 1;
      __sync_synchronize();
  #endif

      return;
  }
  494. void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  495. {
  496. UNUSED(cmd);
  497. #ifndef ENABLE_NEW_CLOUD_PROTOCOL
  498. UNUSED(wc);
  499. #else
  500. BUFFER *sql = buffer_create(1024);
  501. buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \
  502. "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s " \
  503. "where new_status = -2 and updated_by_id = 0 and unique_id not in " \
  504. "(select alert_unique_id from aclk_alert_%s) order by unique_id asc " \
  505. "on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
  506. db_execute(buffer_tostring(sql));
  507. log_access("ACLK STA [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
  508. buffer_free(sql);
  509. #endif
  510. return;
  511. }
  512. void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
  513. {
  514. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  515. if (unlikely(!host->dbsync_worker))
  516. return;
  517. struct aclk_database_cmd cmd;
  518. memset(&cmd, 0, sizeof(cmd));
  519. cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS;
  520. cmd.data = NULL;
  521. cmd.data_param = NULL;
  522. cmd.completion = NULL;
  523. aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd);
  524. #else
  525. UNUSED(host);
  526. #endif
  527. }
  // Handle a cloud request for an alerts snapshot of a node: remember the
  // requested snapshot id and the acknowledged sequence id on the host's dbsync
  // worker and enqueue an ACLK_DATABASE_PUSH_ALERT_SNAPSHOT command.
  // A request repeating the snapshot id already recorded on the worker is
  // ignored; claim_id is not used.
  void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id)
  {
      UNUSED(claim_id);

  #ifndef ENABLE_NEW_CLOUD_PROTOCOL
      UNUSED(node_id);
      UNUSED(snapshot_id);
      UNUSED(sequence_id);
  #else
      if (unlikely(!node_id))
          return;

      // Parsed only to validate the id
      uuid_t node_uuid;
      if (uuid_parse(node_id, node_uuid))
          return;

      struct aclk_database_worker_config *worker = NULL;

      rrd_rdlock();
      RRDHOST *host = find_host_by_node_id(node_id);
      if (likely(host))
          worker = (struct aclk_database_worker_config *)host->dbsync_worker;
      rrd_unlock();

      if (unlikely(!worker)) {
          log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id);
          return;
      }

      log_access(
          "IN [%s (%s)]: Request to send alerts snapshot, snapshot_id %" PRIu64 " and ack_sequence_id %" PRIu64,
          worker->node_id,
          worker->host ? worker->host->hostname : "N/A",
          snapshot_id,
          sequence_id);

      // Same snapshot requested again: nothing new to do
      if (worker->alerts_snapshot_id == snapshot_id)
          return;

      __sync_synchronize();
      worker->alerts_snapshot_id = snapshot_id;
      worker->alerts_ack_sequence_id = sequence_id;
      __sync_synchronize();

      struct aclk_database_cmd request;
      memset(&request, 0, sizeof(request));
      request.opcode = ACLK_DATABASE_PUSH_ALERT_SNAPSHOT;
      request.data_param = NULL;
      request.completion = NULL;

      aclk_database_enq_cmd(worker, &request);
  #endif

      return;
  }
  571. void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id)
  572. {
  573. BUFFER *sql = buffer_create(1024);
  574. if (alerts_ack_sequence_id != 0) {
  575. buffer_sprintf(
  576. sql,
  577. "UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id <= %" PRIu64 "",
  578. uuid_str,
  579. alerts_ack_sequence_id);
  580. db_execute(buffer_tostring(sql));
  581. }
  582. buffer_free(sql);
  583. }
  #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  // Fill an alarm_log_entry (cloud protocol representation) from an in-memory
  // ALARM_ENTRY of the given host. Every string stored in alarm_log is a fresh
  // copy. batch_id and sequence_id are set to 0, and node_id / claim_id are not
  // touched here.
  // NOTE(review): the "_nolock" suffix suggests the caller holds the alarm log
  // lock — confirm against callers.
  void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
  {
      char *edit_command = ae->source ? health_edit_command_from_source(ae->source) : strdupz("UNKNOWN=0=UNKNOWN");
      char config_hash_id[GUID_LEN + 1];
      uuid_unparse_lower(ae->config_hash_id, config_hash_id);

      alarm_log->chart = strdupz((char *)ae->chart);
      alarm_log->name = strdupz((char *)ae->name);
      // An entry without a family is sent with a NULL family, as aclk_push_alert_event()
      // does for entries read from the database, instead of duplicating a NULL pointer.
      alarm_log->family = ae->family ? strdupz((char *)ae->family) : NULL;

      alarm_log->batch_id = 0;
      alarm_log->sequence_id = 0;
      alarm_log->when = (time_t)ae->when;

      alarm_log->config_hash = strdupz((char *)config_hash_id);

      alarm_log->utc_offset = host->utc_offset;
      alarm_log->timezone = strdupz((char *)host->abbrev_timezone);
      alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec);
      alarm_log->conf_source = ae->source ? strdupz((char *)ae->source) : strdupz((char *)"");

      alarm_log->command = strdupz((char *)edit_command);

      alarm_log->duration = (time_t)ae->duration;
      alarm_log->non_clear_duration = (time_t)ae->non_clear_duration;

      alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status);
      alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status);
      alarm_log->delay = (int)ae->delay;
      alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp;
      alarm_log->last_repeat = (time_t)ae->last_repeat;

      // Silenced either by flag or because the recipient starts with "silent"
      alarm_log->silenced =
          ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp((char *)ae->recipient, "silent", 6))) ?
              1 :
              0;

      alarm_log->value_string = strdupz(ae->new_value_string);
      alarm_log->old_value_string = strdupz(ae->old_value_string);

      // NaN values are sent as 0
      alarm_log->value = (!isnan(ae->new_value)) ? (calculated_number)ae->new_value : 0;
      alarm_log->old_value = (!isnan(ae->old_value)) ? (calculated_number)ae->old_value : 0;

      alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
      alarm_log->rendered_info = ae->info ? strdupz(ae->info) : strdupz((char *)"");

      freez(edit_command);
  }
  #endif
  622. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  623. static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark)
  624. {
  625. ALARM_ENTRY *ae = host->health_log.alarms;
  626. while (ae) {
  627. if (ae->alarm_id == alarm_id && ae->unique_id > mark &&
  628. (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL))
  629. return 1;
  630. ae = ae->next;
  631. }
  632. return 0;
  633. }
  634. #endif
  635. #define ALARM_EVENTS_PER_CHUNK 10
  636. void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
  637. {
  638. #ifndef ENABLE_NEW_CLOUD_PROTOCOL
  639. UNUSED(wc);
  640. UNUSED(cmd);
  641. #else
  642. UNUSED(cmd);
  643. // we perhaps we don't need this for snapshots
  644. if (unlikely(!wc->alert_updates)) {
  645. log_access("ACLK STA [%s (%s)]: Ignoring alert snapshot event, updates have been turned off for this node.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
  646. return;
  647. }
  648. if (unlikely(!wc->host)) {
  649. error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid);
  650. return;
  651. }
  652. if (unlikely(!wc->alerts_snapshot_id))
  653. return;
  654. char *claim_id = is_agent_claimed();
  655. if (unlikely(!claim_id))
  656. return;
  657. log_access("ACLK REQ [%s (%s)]: Sending alerts snapshot, snapshot_id %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", wc->alerts_snapshot_id);
  658. aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id);
  659. RRDHOST *host = wc->host;
  660. uint32_t cnt = 0;
  661. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  662. ALARM_ENTRY *ae = host->health_log.alarms;
  663. for (; ae; ae = ae->next) {
  664. if (likely(ae->updated_by_id))
  665. continue;
  666. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  667. continue;
  668. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  669. continue;
  670. cnt++;
  671. }
  672. if (cnt) {
  673. uint32_t chunk = 1, chunks = 0;
  674. chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
  675. ae = host->health_log.alarms;
  676. cnt = 0;
  677. struct alarm_snapshot alarm_snap;
  678. alarm_snap.node_id = wc->node_id;
  679. alarm_snap.claim_id = claim_id;
  680. alarm_snap.snapshot_id = wc->alerts_snapshot_id;
  681. alarm_snap.chunks = chunks;
  682. alarm_snap.chunk = chunk;
  683. alarm_snapshot_proto_ptr_t snapshot_proto = NULL;
  684. for (; ae; ae = ae->next) {
  685. if (likely(ae->updated_by_id))
  686. continue;
  687. if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
  688. continue;
  689. if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
  690. continue;
  691. cnt++;
  692. struct alarm_log_entry alarm_log;
  693. alarm_log.node_id = wc->node_id;
  694. alarm_log.claim_id = claim_id;
  695. if (!snapshot_proto)
  696. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  697. health_alarm_entry2proto_nolock(&alarm_log, ae, host);
  698. add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
  699. if (cnt == ALARM_EVENTS_PER_CHUNK) {
  700. aclk_send_alarm_snapshot(snapshot_proto);
  701. cnt = 0;
  702. if (chunk < chunks) {
  703. chunk++;
  704. struct alarm_snapshot alarm_snap;
  705. alarm_snap.node_id = wc->node_id;
  706. alarm_snap.claim_id = claim_id;
  707. alarm_snap.snapshot_id = wc->alerts_snapshot_id;
  708. alarm_snap.chunks = chunks;
  709. alarm_snap.chunk = chunk;
  710. snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
  711. }
  712. }
  713. destroy_alarm_log_entry(&alarm_log);
  714. }
  715. if (cnt)
  716. aclk_send_alarm_snapshot(snapshot_proto);
  717. }
  718. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  719. wc->alerts_snapshot_id = 0;
  720. freez(claim_id);
  721. #endif
  722. return;
  723. }
  724. void sql_aclk_alert_clean_dead_entries(RRDHOST *host)
  725. {
  726. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  727. if (!claimed())
  728. return;
  729. if (unlikely(!host->dbsync_worker))
  730. return;
  731. char uuid_str[GUID_LEN + 1];
  732. uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
  733. BUFFER *sql = buffer_create(1024);
  734. buffer_sprintf(sql,"delete from aclk_alert_%s where alert_unique_id not in "
  735. " (select unique_id from health_log_%s); ", uuid_str, uuid_str);
  736. char *err_msg = NULL;
  737. int rc = sqlite3_exec(db_meta, buffer_tostring(sql), NULL, NULL, &err_msg);
  738. if (rc != SQLITE_OK) {
  739. error_report("Failed when trying to clean stale ACLK alert entries from aclk_alert_%s, error message \"%s""",
  740. uuid_str, err_msg);
  741. sqlite3_free(err_msg);
  742. }
  743. buffer_free(sql);
  744. #else
  745. UNUSED(host);
  746. #endif
  747. }
  748. int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status)
  749. {
  750. int rc;
  751. struct aclk_database_worker_config *wc = NULL;
  752. wc = (struct aclk_database_worker_config *)host->dbsync_worker;
  753. if (!wc)
  754. return 1;
  755. proto_alert_status->alert_updates = wc->alert_updates;
  756. proto_alert_status->alerts_batch_id = wc->alerts_batch_id;
  757. BUFFER *sql = buffer_create(1024);
  758. sqlite3_stmt *res = NULL;
  759. buffer_sprintf(sql, "SELECT MIN(sequence_id), MAX(sequence_id), " \
  760. "(select MAX(sequence_id) from aclk_alert_%s where date_cloud_ack is not NULL), " \
  761. "(select MAX(sequence_id) from aclk_alert_%s where date_submitted is not NULL) " \
  762. "FROM aclk_alert_%s where date_submitted is null;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
  763. rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
  764. if (rc != SQLITE_OK) {
  765. error_report("Failed to prepare statement to get alert log status from the database.");
  766. buffer_free(sql);
  767. return 1;
  768. }
  769. while (sqlite3_step(res) == SQLITE_ROW) {
  770. proto_alert_status->pending_min_sequence_id = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
  771. proto_alert_status->pending_max_sequence_id = sqlite3_column_bytes(res, 1) > 0 ? (uint64_t) sqlite3_column_int64(res, 1) : 0;
  772. proto_alert_status->last_acked_sequence_id = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
  773. proto_alert_status->last_submitted_sequence_id = sqlite3_column_bytes(res, 3) > 0 ? (uint64_t) sqlite3_column_int64(res, 3) : 0;
  774. }
  775. rc = sqlite3_finalize(res);
  776. if (unlikely(rc != SQLITE_OK))
  777. error_report("Failed to finalize statement to get alert log status from the database, rc = %d", rc);
  778. buffer_free(sql);
  779. return 0;
  780. }