Browse Source

Add 'funcs' capability (#13992)

* cleanup capas + add func capa

* make it const

* fixes

* freez
Timotej S 2 years ago
parent
commit
f289ba3449

+ 2 - 0
CMakeLists.txt

@@ -898,6 +898,8 @@ set(ACLK_FILES
         aclk/aclk_alarm_api.h
         aclk/aclk_contexts_api.c
         aclk/aclk_contexts_api.h
+        aclk/aclk_capas.c
+        aclk/aclk_capas.h
         aclk/schema-wrappers/connection.cc
         aclk/schema-wrappers/connection.h
         aclk/schema-wrappers/node_connection.cc

+ 2 - 0
Makefile.am

@@ -690,6 +690,8 @@ ACLK_FILES = \
     aclk/aclk_alarm_api.h \
     aclk/aclk_contexts_api.c \
     aclk/aclk_contexts_api.h \
+    aclk/aclk_capas.c \
+    aclk/aclk_capas.h \
     aclk/helpers/mqtt_wss_pal.h \
     aclk/helpers/ringbuffer_pal.h \
     aclk/schema-wrappers/connection.cc \

+ 5 - 16
aclk/aclk.c

@@ -13,6 +13,7 @@
 #include "aclk_rx_msgs.h"
 #include "https_client.h"
 #include "schema-wrappers/schema_wrappers.h"
+#include "aclk_capas.h"
 
 #include "aclk_proxy.h"
 
@@ -779,14 +780,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
     node_state_update.node_id = mallocz(UUID_STR_LEN);
     uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
 
-    struct capability caps[] = {
-        { .name = "proto", .version = 1,                     .enabled = 1 },
-        { .name = "ml",    .version = ml_capable(localhost), .enabled = ml_enabled(host) },
-        { .name = "mc",    .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
-        { .name = "ctx",   .version = 1,                     .enabled = 1 },
-        { .name = NULL,    .version = 0,                     .enabled = 0 }
-    };
-    node_state_update.capabilities = caps;
+    node_state_update.capabilities = aclk_get_agent_capas();
 
     rrdhost_aclk_state_lock(localhost);
     node_state_update.claim_id = localhost->aclk_state.claimed_id;
@@ -825,14 +819,7 @@ void aclk_send_node_instances()
             uuid_unparse_lower(list->host_id, host_id);
 
             RRDHOST *host = rrdhost_find_by_guid(host_id);
-            struct capability caps[] = {
-                { .name = "proto", .version = 1,                     .enabled = 1 },
-                { .name = "ml",    .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
-                { .name = "mc",    .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
-                { .name = "ctx",   .version = 1,                     .enabled = 1 },
-                { .name = NULL,    .version = 0,                     .enabled = 0 }
-            };
-            node_state_update.capabilities = caps;
+            node_state_update.capabilities = aclk_get_node_instance_capas(host);
 
             rrdhost_aclk_state_lock(localhost);
             node_state_update.claim_id = localhost->aclk_state.claimed_id;
@@ -841,6 +828,8 @@ void aclk_send_node_instances()
             info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
                  list->live,
                  list->hops);
+
+            freez((void*)node_state_update.capabilities);
             freez((void*)node_state_update.node_id);
             query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
             query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;

+ 47 - 0
aclk/aclk_capas.c

@@ -0,0 +1,47 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk_capas.h"
+
+#include "ml/ml.h"
+
+const struct capability *aclk_get_agent_capas()
+{
+    static struct capability agent_capabilities[] = {
+        { .name = "json",  .version = 2, .enabled = 0 },
+        { .name = "proto", .version = 1, .enabled = 1 },
+        { .name = "ml",    .version = 0, .enabled = 0 },
+        { .name = "mc",    .version = 0, .enabled = 0 },
+        { .name = "ctx",   .version = 1, .enabled = 1 },
+        { .name = "funcs", .version = 1, .enabled = 1 },
+        { .name = NULL,    .version = 0, .enabled = 0 }
+    };
+    agent_capabilities[2].version = ml_capable() ? 1 : 0;
+    agent_capabilities[2].enabled = ml_enabled(localhost);
+
+    agent_capabilities[3].version = enable_metric_correlations ? metric_correlations_version : 0;
+    agent_capabilities[3].enabled = enable_metric_correlations;
+
+    return agent_capabilities;
+}
+
+struct capability *aclk_get_node_instance_capas(RRDHOST *host)
+{
+    struct capability ni_caps[] = {
+        { .name = "proto", .version = 1,                     .enabled = 1 },
+        { .name = "ml",    .version = ml_capable(),          .enabled = ml_enabled(host) },
+        { .name = "mc",
+          .version = enable_metric_correlations ? metric_correlations_version : 0,
+          .enabled = enable_metric_correlations },
+        { .name = "ctx",   .version = 1,                     .enabled = 1 },
+        { .name = "funcs", .version = 0,                     .enabled = 0 },
+        { .name = NULL,    .version = 0,                     .enabled = 0 }
+    };
+    if (host->receiver && stream_has_capability(host->receiver, STREAM_CAP_FUNCTIONS)) {
+        ni_caps[4].version = 1;
+        ni_caps[4].enabled = 1;
+    }
+
+    struct capability *ret = mallocz(sizeof(ni_caps));
+    memcpy(ret, ni_caps, sizeof(ni_caps));
+    return ret;
+}

+ 14 - 0
aclk/aclk_capas.h

@@ -0,0 +1,14 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_CAPAS_H
+#define ACLK_CAPAS_H
+
+#include "daemon/common.h"
+#include "libnetdata/libnetdata.h"
+
+#include "schema-wrappers/capability.h"
+
+const struct capability *aclk_get_agent_capas();
+struct capability *aclk_get_node_instance_capas(RRDHOST *host);
+
+#endif /* ACLK_CAPAS_H */

+ 4 - 8
aclk/aclk_rx_msgs.c

@@ -5,6 +5,7 @@
 #include "aclk_stats.h"
 #include "aclk_query_queue.h"
 #include "aclk.h"
+#include "aclk_capas.h"
 
 #include "schema-wrappers/proto_2_json.h"
 
@@ -289,20 +290,15 @@ int create_node_instance_result(const char *msg, size_t msg_len)
         }
     }
 
-    struct capability caps[] = {
-        { .name = "proto", .version = 1,                     .enabled = 1 },
-        { .name = "ml",    .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
-        { .name = "mc",    .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
-        { .name = "ctx",   .version = 1,                     .enabled = 1 },
-        { .name = NULL,    .version = 0,                     .enabled = 0 }
-    };
-    node_state_update.capabilities = caps;
+    node_state_update.capabilities = aclk_get_node_instance_capas(host);
 
     rrdhost_aclk_state_lock(localhost);
     node_state_update.claim_id = localhost->aclk_state.claimed_id;
     query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
     rrdhost_aclk_state_unlock(localhost);
 
+    freez((void *)node_state_update.capabilities);
+
     query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
     query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
 

+ 2 - 12
aclk/aclk_tx_msgs.c

@@ -5,6 +5,7 @@
 #include "aclk_util.h"
 #include "aclk_stats.h"
 #include "aclk.h"
+#include "aclk_capas.h"
 
 #include "schema-wrappers/proto_2_json.h"
 
@@ -211,22 +212,11 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
     size_t len;
     uint16_t pid;
 
-    struct capability agent_capabilities[] = {
-        { .name = "json",  .version = 2, .enabled = 0 },
-        { .name = "proto", .version = 1, .enabled = 1 },
-#ifdef ENABLE_ML
-        { .name = "ml",    .version = 1, .enabled = ml_enabled(localhost) },
-#endif
-        { .name = "mc",    .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
-        { .name = "ctx",   .version = 1, .enabled = 1 },
-        { .name = NULL,    .version = 0, .enabled = 0 }
-    };
-
     update_agent_connection_t conn = {
         .reachable = (reachable ? 1 : 0),
         .lwt = 0,
         .session_id = aclk_session_newarch,
-        .capabilities = agent_capabilities
+        .capabilities = aclk_get_agent_capas()
     };
 
     rrdhost_aclk_state_lock(localhost);

+ 1 - 1
aclk/schema-wrappers/capability.cc

@@ -4,7 +4,7 @@
 
 #include "capability.h"
 
-void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa) {
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa) {
     proto_capa->set_name(c_capa->name);
     proto_capa->set_enabled(c_capa->enabled);
     proto_capa->set_version(c_capa->version);

+ 1 - 1
aclk/schema-wrappers/capability.h

@@ -18,7 +18,7 @@ struct capability {
 
 #include "proto/aclk/v1/lib.pb.h"
 
-void capability_set(aclk_lib::v1::Capability *proto_capa, struct capability *c_capa);
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa);
 #endif
 
 #endif /* ACLK_SCHEMA_CAPABILITY_H */

+ 1 - 1
aclk/schema-wrappers/connection.cc

@@ -29,7 +29,7 @@ char *generate_update_agent_connection(size_t *len, const update_agent_connectio
     timestamp->set_nanos(tv.tv_usec * 1000);
 
     if (data->capabilities) {
-        struct capability *capa = data->capabilities;
+        const struct capability *capa = data->capabilities;
         while (capa->name) {
             aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities();
             capability_set(proto_capa, capa);

Some files were not shown because too many files changed in this diff