Browse Source

Add snapshot message and calls to sql_queue_removed_alerts_to_aclk (#11664)

Emmanuel Vasilakis 3 years ago
parent
commit
0882ed03b4

+ 2 - 0
aclk/aclk.c

@@ -25,6 +25,8 @@
 
 int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
 
+int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload
+
 time_t aclk_block_until = 0;
 
 aclk_env_t *aclk_env = NULL;

+ 1 - 0
aclk/aclk_api.h

@@ -18,6 +18,7 @@ extern int aclk_disable_runtime;
 extern int aclk_disable_single_updates;
 
 extern int aclk_stats_enabled;
+extern int aclk_alert_reloaded;
 
 extern int aclk_ng;
 

+ 11 - 0
aclk/aclk_rx_msgs.c

@@ -398,6 +398,17 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t
         freez(config_hash);
         return;
     }
+    if (!strcmp(message_type, "SendAlarmSnapshot")) {
+        struct send_alarm_snapshot *sas = parse_send_alarm_snapshot(msg, msg_len);
+        if (!sas->node_id || !sas->claim_id) {
+            error("Error parsing SendAlarmSnapshot");
+            destroy_send_alarm_snapshot(sas);
+            return;
+        }
+        aclk_process_send_alarm_snapshot(sas->node_id, sas->claim_id, sas->snapshot_id, sas->sequence_id);
+        destroy_send_alarm_snapshot(sas);
+        return;
+    }
     error ("Unknown new cloud arch message type received \"%s\"", message_type);
 }
 #endif

+ 4 - 0
database/sqlite/sqlite_aclk.c

@@ -412,6 +412,10 @@ void aclk_database_worker(void *arg)
                     debug(D_ACLK_SYNC, "Pushing alarm health log to the cloud for %s", wc->host_guid);
                     aclk_push_alarm_health_log(wc, cmd);
                     break;
+                case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:
+                    debug(D_ACLK_SYNC, "Pushing alert snapshot to the cloud for node %s", wc->host_guid);
+                    aclk_push_alert_snapshot_event(wc, cmd);
+                    break;
 
 // NODE OPERATIONS
                 case ACLK_DATABASE_NODE_INFO:

+ 3 - 0
database/sqlite/sqlite_aclk.h

@@ -124,6 +124,7 @@ enum aclk_database_opcode {
     ACLK_DATABASE_NODE_INFO,
     ACLK_DATABASE_PUSH_ALERT,
     ACLK_DATABASE_PUSH_ALERT_CONFIG,
+    ACLK_DATABASE_PUSH_ALERT_SNAPSHOT,
     ACLK_DATABASE_PUSH_CHART,
     ACLK_DATABASE_PUSH_CHART_CONFIG,
     ACLK_DATABASE_RESET_CHART,
@@ -170,6 +171,8 @@ struct aclk_database_worker_config {
     uint64_t alerts_batch_id; // batch id for alerts to use
     uint64_t alerts_start_seq_id; // cloud has asked to start streaming from
     uint64_t alert_sequence_id; // last alert sequence_id
+    uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested
+    uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message
     uv_loop_t *loop;
     RRDHOST *host;
     uv_async_t async;

+ 245 - 1
database/sqlite/sqlite_aclk_alert.c

@@ -147,7 +147,11 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
             sql,
             "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64
             "; UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id < %"PRIu64
-            " and date_cloud_ack is null",
+            " and date_cloud_ack is null "
+            "; UPDATE aclk_alert_%s SET date_submitted = strftime('%%s','now') WHERE sequence_id < %"PRIu64
+            " and date_submitted is null",
+            wc->uuid_str,
+            wc->alerts_start_seq_id,
             wc->uuid_str,
             wc->alerts_start_seq_id,
             wc->uuid_str,
@@ -556,6 +560,11 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
 
 int sql_queue_removed_alerts_to_aclk(RRDHOST *host)
 {
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+    if (!aclk_use_new_cloud_arch) {
+        return 0;
+    }
+
     CHECK_SQLITE_CONNECTION(db_meta);
 
     struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker;
@@ -574,6 +583,241 @@ int sql_queue_removed_alerts_to_aclk(RRDHOST *host)
     db_execute(buffer_tostring(sql));
 
     buffer_free(sql);
+#else
+    UNUSED(host);
+#endif
+    return 0;
+}
+
+void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id)
+{
+    UNUSED(claim_id);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+    if (unlikely(!node_id))
+        return;
+
+    uuid_t node_uuid;
+    if (uuid_parse(node_id, node_uuid))
+        return;
+
+    struct aclk_database_worker_config *wc = NULL;
+    rrd_wrlock();
+    RRDHOST *host = find_host_by_node_id(node_id);
+    if (likely(host))
+        wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+    rrd_unlock();
+
+    if (likely(wc)) {
+        info(
+            "Send alerts snapshot requested for %s with snapshot_id %" PRIu64 " and ack sequence_id %" PRIu64,
+            node_id,
+            snapshot_id,
+            sequence_id);
+        __sync_synchronize();
+        wc->alerts_snapshot_id = snapshot_id;
+        wc->alerts_ack_sequence_id = sequence_id;
+        __sync_synchronize();
+
+        struct aclk_database_cmd cmd;
+        memset(&cmd, 0, sizeof(cmd));
+        cmd.opcode = ACLK_DATABASE_PUSH_ALERT_SNAPSHOT;
+        cmd.data_param = NULL;
+        cmd.completion = NULL;
+        aclk_database_enq_cmd(wc, &cmd);
+    } else
+        error("ACLK synchronization thread is not active for host %s", host->hostname);
+#else
+    UNUSED(node_id);
+    UNUSED(snapshot_id);
+    UNUSED(sequence_id);
+#endif
+    return;
+}
+
+void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id)
+{
+    BUFFER *sql = buffer_create(1024);
+
+    if (alerts_ack_sequence_id != 0) {
+        buffer_sprintf(
+            sql,
+            "UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id <= %" PRIu64 "",
+            uuid_str,
+            alerts_ack_sequence_id);
+        db_execute(buffer_tostring(sql));
+    }
+
+    buffer_free(sql);
+}
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
+{
+    char *edit_command = ae->source ? health_edit_command_from_source(ae->source) : strdupz("UNKNOWN=0");
+    char config_hash_id[GUID_LEN + 1];
+    uuid_unparse_lower(ae->config_hash_id, config_hash_id);
+
+    alarm_log->chart = strdupz((char *)ae->chart);
+    alarm_log->name = strdupz((char *)ae->name);
+    alarm_log->family = strdupz((char *)ae->family);
+
+    alarm_log->batch_id = 0;
+    alarm_log->sequence_id = 0;
+    alarm_log->when = (time_t)ae->when;
+
+    alarm_log->config_hash = strdupz((char *)config_hash_id);
+
+    alarm_log->utc_offset = host->utc_offset;
+    alarm_log->timezone = strdupz((char *)host->abbrev_timezone);
+    alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec);
+    alarm_log->conf_source = strdupz(ae->source);
+
+    alarm_log->command = strdupz(edit_command);
+
+    alarm_log->duration = (time_t)ae->duration;
+    alarm_log->non_clear_duration = (time_t)ae->non_clear_duration;
+    alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status);
+    alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status);
+    alarm_log->delay = (int)ae->delay;
+    alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp;
+    alarm_log->last_repeat = (time_t)ae->last_repeat;
+
+    alarm_log->silenced =
+        ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp((char *)ae->recipient, "silent", 6))) ?
+            1 :
+            0;
+
+    alarm_log->value_string = strdupz(ae->new_value_string);
+    alarm_log->old_value_string = strdupz(ae->old_value_string);
+
+    alarm_log->value = (!isnan(ae->new_value)) ? (calculated_number)ae->new_value : 0;
+    alarm_log->old_value = (!isnan(ae->old_value)) ? (calculated_number)ae->old_value : 0;
+
+    alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
+    alarm_log->rendered_info = strdupz(ae->info);
+
+    freez(edit_command);
+}
+#endif
+
+static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark)
+{
+    ALARM_ENTRY *ae = host->health_log.alarms;
+
+    while (ae) {
+        if (ae->alarm_id == alarm_id && ae->unique_id > mark &&
+            (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL))
+            return 1;
+        ae = ae->next;
+    }
 
     return 0;
 }
+
+#define ALARM_EVENTS_PER_CHUNK 10
+void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+{
+#ifndef ENABLE_NEW_CLOUD_PROTOCOL
+    UNUSED(wc);
+    UNUSED(cmd);
+#else
+    UNUSED(cmd);
+    // we perhaps we don't need this for snapshots
+    if (unlikely(!wc->alert_updates)) {
+        debug(D_ACLK_SYNC, "Ignoring alert push snapshot event, updates have been turned off for node %s", wc->node_id);
+        return;
+    }
+
+    char *claim_id = is_agent_claimed();
+    if (unlikely(!claim_id))
+        return;
+
+    aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id);
+
+    RRDHOST *host = wc->host;
+    uint32_t cnt = 0;
+
+    netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
+
+    ALARM_ENTRY *ae = host->health_log.alarms;
+
+    for (; ae; ae = ae->next) {
+        if (likely(ae->updated_by_id))
+            continue;
+
+        if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
+            continue;
+
+        if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
+            continue;
+
+        cnt++;
+    }
+
+    if (cnt) {
+        uint32_t chunk = 1, chunks = 0;
+
+        chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0);
+        ae = host->health_log.alarms;
+
+        cnt = 0;
+        struct alarm_snapshot alarm_snap;
+        alarm_snap.node_id = wc->node_id;
+        alarm_snap.claim_id = claim_id;
+        alarm_snap.snapshot_id = wc->alerts_snapshot_id;
+        alarm_snap.chunks = chunks;
+        alarm_snap.chunk = chunk;
+
+        alarm_snapshot_proto_ptr_t snapshot_proto;
+        snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
+
+        for (; ae; ae = ae->next) {
+            if (likely(ae->updated_by_id))
+                continue;
+
+            if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
+                continue;
+
+            if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
+                continue;
+
+            struct alarm_log_entry alarm_log;
+            alarm_log.node_id = wc->node_id;
+            alarm_log.claim_id = claim_id;
+
+            health_alarm_entry2proto_nolock(&alarm_log, ae, host);
+            add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
+
+            cnt++;
+
+            if (cnt == ALARM_EVENTS_PER_CHUNK) {
+                aclk_send_alarm_snapshot(snapshot_proto);
+
+                cnt = 0;
+
+                if (chunk < chunks) {
+                    chunk++;
+
+                    struct alarm_snapshot alarm_snap;
+                    alarm_snap.node_id = wc->node_id;
+                    alarm_snap.claim_id = claim_id;
+                    alarm_snap.snapshot_id = wc->alerts_snapshot_id;
+                    alarm_snap.chunks = chunks;
+                    alarm_snap.chunk = chunk;
+
+                    snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
+                }
+            }
+            destroy_alarm_log_entry(&alarm_log);
+        }
+        if (cnt)
+            aclk_send_alarm_snapshot(snapshot_proto);
+    }
+
+    netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
+    wc->alerts_snapshot_id = 0;
+
+    freez(claim_id);
+#endif
+    return;
+}

+ 2 - 0
database/sqlite/sqlite_aclk_alert.h

@@ -13,5 +13,7 @@ void aclk_send_alarm_configuration (char *config_hash);
 int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
 void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id);
 int sql_queue_removed_alerts_to_aclk(RRDHOST *host);
+void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
+void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id);
 
 #endif //NETDATA_SQLITE_ACLK_ALERT_H

+ 10 - 0
health/health.c

@@ -230,6 +230,9 @@ void health_reload(void) {
     if (netdata_cloud_setting) {
         aclk_single_update_enable();
         aclk_alarm_reload();
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+        aclk_alert_reloaded = 1;
+#endif
     }
 #endif
 }
@@ -1035,6 +1038,13 @@ void *health_main(void *ptr) {
                 rrdhost_unlock(host);
             }
 
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+            if (aclk_alert_reloaded) {
+                sql_queue_removed_alerts_to_aclk(host);
+                aclk_alert_reloaded = 0;
+            }
+#endif
+
             if (unlikely(netdata_exit))
                 break;