Просмотр исходного кода

ACLK-NG New Cloud NodeInstance related msgs (#11234)

Adds new cloud arch NodeInstance messages as per design.

Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Timotej S 3 лет назад
Родитель
Сommit
d005dee558
10 измененных файлов с 374 добавлено и 33 удалено
  1. 3 0
      .gitmodules
  2. 58 1
      CMakeLists.txt
  3. 50 6
      Makefile.am
  4. 1 0
      aclk/aclk-schemas
  5. 173 25
      aclk/aclk.c
  6. 4 0
      aclk/aclk.h
  7. 13 0
      aclk/aclk_api.c
  8. 2 0
      aclk/aclk_api.h
  9. 59 1
      aclk/aclk_query.c
  10. 11 0
      aclk/aclk_query_queue.c

+ 3 - 0
.gitmodules

@@ -1,3 +1,6 @@
 [submodule "mqtt_websockets"]
 	path = mqtt_websockets
 	url = https://github.com/underhood/mqtt_websockets.git
+[submodule "aclk/aclk-schemas"]
+	path = aclk/aclk-schemas
+	url = https://github.com/netdata/aclk-schemas.git

+ 58 - 1
CMakeLists.txt

@@ -796,6 +796,14 @@ set(ACLK_NG_FILES
         mqtt_websockets/c-rbuf/src/ringbuffer_internal.h
         mqtt_websockets/MQTT-C/src/mqtt.c
         mqtt_websockets/MQTT-C/include/mqtt.h
+        aclk/schema-wrappers/connection.cc
+        aclk/schema-wrappers/connection.h
+        aclk/schema-wrappers/node_connection.cc
+        aclk/schema-wrappers/node_connection.h
+        aclk/schema-wrappers/node_creation.cc
+        aclk/schema-wrappers/node_creation.h
+        aclk/schema-wrappers/schema_wrappers.h
+        aclk/schema-wrappers/schema_wrapper_utils.h
         )
 
 set(SPAWN_PLUGIN_FILES
@@ -1038,9 +1046,58 @@ ELSE()
     message(STATUS "agent-cloud-link Legacy: disabled")
 ENDIF()
 
+find_package(Protobuf REQUIRED)
+
+function(PROTOBUF_ACLK_GENERATE_CPP SRCS HDRS)
+  if(NOT ARGN)
+    message(SEND_ERROR "Error: PROTOBUF_ACLK_GENERATE_CPP() called without any proto files")
+    return()
+  endif()
+
+  set(${SRCS})
+  set(${HDRS})
+  foreach(FIL ${ARGN})
+    get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
+    get_filename_component(DIR ${ABS_FIL} DIRECTORY)
+    get_filename_component(FIL_WE ${FIL} NAME_WE)
+    set(GENERATED_PB_CC "${DIR}/${FIL_WE}.pb.cc")
+    set(GENERATED_PB_H "${DIR}/${FIL_WE}.pb.h")
+# cmake > 3.20 required :(
+#    cmake_path(SET GENERATED_PB_CC "${DIR}")
+#    cmake_path(SET GENERATED_PB_H "${DIR}")
+#    cmake_path(APPEND GENERATED_PB_CC "${FIL_WE}.pb.cc")
+#    cmake_path(APPEND GENERATED_PB_H "${FIL_WE}.pb.h")
+
+    list(APPEND ${SRCS} ${GENERATED_PB_CC})
+    list(APPEND ${HDRS} ${GENERATED_PB_H})
+    add_custom_command(
+      OUTPUT ${GENERATED_PB_CC}
+             ${GENERATED_PB_H}
+      COMMAND  ${PROTOBUF_PROTOC_EXECUTABLE}
+      ARGS -I=${CMAKE_SOURCE_DIR}/aclk/aclk-schemas --cpp_out=${CMAKE_SOURCE_DIR}/aclk/aclk-schemas ${ABS_FIL}
+      DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE}
+      COMMENT "Running C++ protocol buffer compiler on ${FIL}"
+      VERBATIM )
+  endforeach()
+  set_source_files_properties(${${SRCS}} ${${HDRS}} PROPERTIES GENERATED TRUE)
+  set(${SRCS} ${${SRCS}} PARENT_SCOPE)
+  set(${HDRS} ${${HDRS}} PARENT_SCOPE)
+endfunction()
+
+set(ACLK_NG_PROTO_DEFS
+        aclk/aclk-schemas/proto/agent/v1/connection.proto
+        aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto
+        aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto
+        )
+PROTOBUF_ACLK_GENERATE_CPP(ACLK_NG_PROTO_BUILT_SRCS ACLK_NG_PROTO_BUILT_HDRS ${ACLK_NG_PROTO_DEFS})
+
+list(APPEND NETDATA_COMMON_LIBRARIES ${PROTOBUF_LIBRARIES})
+list(APPEND NETDATA_COMMON_INCLUDE_DIRS ${PROTOBUF_INCLUDE_DIRS})
+list(APPEND NETDATA_COMMON_CFLAGS ${PROTOBUF_CFLAGS_OTHER})
 list(APPEND NETDATA_FILES ${ACLK_ALWAYS_BUILD})
-list(APPEND NETDATA_FILES ${ACLK_NG_FILES})
+list(APPEND NETDATA_FILES ${ACLK_NG_FILES} ${ACLK_NG_PROTO_BUILT_SRCS} ${ACLK_NG_PROTO_BUILT_HDRS})
 list(APPEND NETDATA_FILES ${ACLK_COMMON_FILES})
+include_directories(BEFORE ${CMAKE_SOURCE_DIR}/aclk/aclk-schemas)
 include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/MQTT-C/include)
 include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/src/include)
 include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/c-rbuf/include)

+ 50 - 6
Makefile.am

@@ -3,6 +3,9 @@
 AUTOMAKE_OPTIONS = foreign subdir-objects 1.11
 ACLOCAL_AMFLAGS = -I build/m4
 
+nodist_netdata_SOURCES=$(NULL)
+BUILT_SOURCES=$(NULL)
+
 MAINTAINERCLEANFILES = \
     config.log config.status \
     $(srcdir)/Makefile.in \
@@ -572,7 +575,39 @@ ACLK_NG_FILES = \
     mqtt_websockets/c-rbuf/src/ringbuffer_internal.h \
     mqtt_websockets/MQTT-C/src/mqtt.c \
     mqtt_websockets/MQTT-C/include/mqtt.h \
+    aclk/schema-wrappers/connection.cc \
+    aclk/schema-wrappers/connection.h \
+    aclk/schema-wrappers/node_connection.cc \
+    aclk/schema-wrappers/node_connection.h \
+    aclk/schema-wrappers/node_creation.cc \
+    aclk/schema-wrappers/node_creation.h \
+    aclk/schema-wrappers/schema_wrappers.h \
+    aclk/schema-wrappers/schema_wrapper_utils.h \
+    $(NULL)
+
+ACLK_NG_PROTO_BUILT_FILES = aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \
+    aclk/aclk-schemas/proto/agent/v1/connection.pb.h \
+    aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.cc \
+    aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.h \
+    aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \
+    aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h \
     $(NULL)
+
+BUILT_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES)
+nodist_netdata_SOURCES += $(ACLK_NG_PROTO_BUILT_FILES)
+
+aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \
+aclk/aclk-schemas/proto/agent/v1/connection.pb.h: aclk/aclk-schemas/proto/agent/v1/connection.proto
+	$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
+aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.cc \
+aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.pb.h: aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto
+	$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
+aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.cc \
+aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.pb.h: aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto
+	$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
 endif #ACLK_NG
 
 if ENABLE_ACLK
@@ -784,12 +819,15 @@ netdata_LDADD = \
     $(NETDATA_COMMON_LIBS) \
     $(NULL)
 
+if ACLK_NG
+    netdata_LDADD += $(OPTIONAL_PROTOBUF_LIBS)
+endif
+
 if ACLK_LEGACY
 netdata_LDADD += \
     $(abs_top_srcdir)/externaldeps/mosquitto/libmosquitto.a \
     $(OPTIONAL_LIBCAP_LIBS) \
     $(OPTIONAL_LWS_LIBS) \
-    $(NETDATA_COMMON_LIBS) \
     $(NULL)
 endif #ACLK_LEGACY
 
@@ -899,12 +937,15 @@ endif
 
 if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE
     netdata_SOURCES += $(PROMETHEUS_REMOTE_WRITE_BACKEND_FILES) $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES)
-    netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS)
-    BUILT_SOURCES = \
+    netdata_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) \
+        $(OPTIONAL_PROTOBUF_LIBS) \
+        $(NULL)
+    BACKEND_PROMETHEUS_BUILT_SOURCES = \
         exporting/prometheus/remote_write/remote_write.pb.cc \
         exporting/prometheus/remote_write/remote_write.pb.h \
         $(NULL)
-    nodist_netdata_SOURCES = $(BUILT_SOURCES)
+    BUILT_SOURCES += $(BACKEND_PROMETHEUS_BUILT_SOURCES)
+    nodist_netdata_SOURCES += $(BACKEND_PROMETHEUS_BUILT_SOURCES)
 
 exporting/prometheus/remote_write/remote_write.pb.cc \
 exporting/prometheus/remote_write/remote_write.pb.h: exporting/prometheus/remote_write/remote_write.proto
@@ -1033,14 +1074,17 @@ if ENABLE_UNITTESTS
     exporting_tests_exporting_engine_testdriver_LDADD = $(NETDATA_COMMON_LIBS) $(TEST_LIBS)
 if ENABLE_BACKEND_PROMETHEUS_REMOTE_WRITE
     exporting_tests_exporting_engine_testdriver_SOURCES += $(PROMETHEUS_REMOTE_WRITE_EXPORTING_FILES)
-    exporting_tests_exporting_engine_testdriver_LDADD += $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS)
+    exporting_tests_exporting_engine_testdriver_LDADD += \
+        $(OPTIONAL_PROMETHEUS_REMOTE_WRITE_LIBS) \
+        $(OPTIONAL_PROTOBUF_LIBS) \
+        $(NULL)
     exporting_tests_exporting_engine_testdriver_LDFLAGS += \
         -Wl,--wrap=init_write_request \
         -Wl,--wrap=add_host_info \
         -Wl,--wrap=add_label \
         -Wl,--wrap=add_metric \
         $(NULL)
-    nodist_exporting_tests_exporting_engine_testdriver_SOURCES = $(BUILT_SOURCES)
+    nodist_exporting_tests_exporting_engine_testdriver_SOURCES = $(BACKEND_PROMETHEUS_BUILT_SOURCES)
 endif
 if ENABLE_BACKEND_KINESIS
     exporting_tests_exporting_engine_testdriver_SOURCES += $(KINESIS_EXPORTING_FILES)

+ 1 - 0
aclk/aclk-schemas

@@ -0,0 +1 @@
+Subproject commit b5fef3f3a84e6a5013b36b906f4677012c734416

+ 173 - 25
aclk/aclk.c

@@ -223,6 +223,45 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
     aclk_handle_cloud_message(cmsg);
 }
 
+
+static void msg_callback_new(const char *topic, const void *msg, size_t msglen, int qos)
+{
+    if (msglen > RX_MSGLEN_MAX)
+        error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
+
+    debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
+
+    if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
+        error("Link is shutting down. Ignoring message.");
+        return;
+    }
+
+    const char *msgtype = strrchr(topic, '/');
+    if (unlikely(!msgtype)) {
+        error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
+        return;
+    }
+    msgtype++;
+    if (unlikely(!*msgtype)) {
+        error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
+        return;
+    }
+
+#ifdef ACLK_LOG_CONVERSATION_DIR
+#define FN_MAX_LEN 512
+    char filename[FN_MAX_LEN];
+    int logfd;
+    snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
+    logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
+    if(logfd < 0)
+        error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
+    write(logfd, msg, msglen);
+    close(logfd);
+#endif
+
+    aclk_handle_new_cloud_msg(msgtype, msg, msglen);
+}
+
 static void puback_callback(uint16_t packet_id)
 {
     if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
@@ -306,11 +345,6 @@ static inline void queue_connect_payloads(void)
 
 static inline void mqtt_connected_actions(mqtt_wss_client client)
 {
-    // TODO global vars?
-    usec_t now = now_realtime_usec();
-    aclk_session_sec = now / USEC_PER_SEC;
-    aclk_session_us = now % USEC_PER_SEC;
-
     const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
 
     if (!topic)
@@ -318,16 +352,28 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
     else
         mqtt_wss_subscribe(client, topic, 1);
 
+    if (aclk_use_new_cloud_arch) {
+        topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
+        if (!topic)
+            error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
+        else
+            mqtt_wss_subscribe(client, topic, 1);
+    }
+
     aclk_stats_upd_online(1);
     aclk_connected = 1;
     aclk_pubacks_per_conn = 0;
 
-    ACLK_SHARED_STATE_LOCK;
-    if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
-        error("Sending `connect` payload immediately as popcorning was finished already.");
-        queue_connect_payloads();
+    if (!aclk_use_new_cloud_arch) {
+        ACLK_SHARED_STATE_LOCK;
+        if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
+            error("Sending `connect` payload immediately as popcorning was finished already.");
+            queue_connect_payloads();
+        }
+        ACLK_SHARED_STATE_UNLOCK;
+    } else {
+        aclk_send_agent_connection_update(client, 1);
     }
-    ACLK_SHARED_STATE_UNLOCK;
 }
 
 /* Waits until agent is ready or needs to exit
@@ -337,10 +383,13 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
  * @return  0 - Popcorning Finished - Agent STABLE,
  *         !0 - netdata_exit
  */
-static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_threads *query_threads)
+static int wait_popcorning_finishes()
 {
     time_t elapsed;
     int need_wait;
+    if (aclk_use_new_cloud_arch)
+        return 0;
+
     while (!netdata_exit) {
         ACLK_SHARED_STATE_LOCK;
         if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
@@ -352,9 +401,6 @@ static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_th
             aclk_shared_state.agent_state = ACLK_HOST_STABLE;
             ACLK_SHARED_STATE_UNLOCK;
             error("ACLK localhost popocorn finished");
-            if (unlikely(!query_threads->thread_list))
-                aclk_query_threads_start(query_threads, client);
-            queue_connect_payloads();
             return 0;
         }
         ACLK_SHARED_STATE_UNLOCK;
@@ -370,7 +416,11 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
     error("Preparing to Gracefully Shutdown the ACLK");
     aclk_queue_lock();
     aclk_queue_flush();
-    aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+    if (aclk_use_new_cloud_arch)
+        aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
+    else
+        aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
+
     time_t t = now_monotonic_sec();
     while (!mqtt_wss_service(client, 100)) {
         if (now_monotonic_sec() - t >= 2) {
@@ -481,7 +531,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
     url_t mqtt_url;
 #endif
 
-    json_object *lwt;
+    json_object *lwt = NULL;
 
     while (!netdata_exit) {
         char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
@@ -546,7 +596,11 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
 
         // aclk_get_topic moved here as during OTP we
         // generate the topic cache
-        mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+        if (aclk_use_new_cloud_arch)
+            mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
+        else
+            mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+
         if (!mqtt_conn_params.will_topic) {
             error("Couldn't get LWT topic. Will not send LWT.");
             continue;
@@ -567,9 +621,17 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
         }
 #endif
 
-        lwt = aclk_generate_disconnect(NULL);
-        mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
-        mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+        aclk_session_newarch = now_realtime_usec();
+        aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
+        aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
+
+        if (aclk_use_new_cloud_arch) {
+            mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
+        } else {
+            lwt = aclk_generate_disconnect(NULL);
+            mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
+            mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
+        }
 
 #ifdef ACLK_DISABLE_CHALLENGE
         ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
@@ -583,7 +645,10 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
         freez((char*)mqtt_conn_params.username);
 #endif
 
-        json_object_put(lwt);
+        if (aclk_use_new_cloud_arch)
+            freez((char *)mqtt_conn_params.will_msg);
+        else
+            json_object_put(lwt);
 
         if (!ret) {
             info("MQTTWSS connection succeeded");
@@ -609,6 +674,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
  */
 void *aclk_main(void *ptr)
 {
+#ifdef ACLK_NEWARCH_DEVMODE
+    aclk_use_new_cloud_arch = 1;
+#endif
     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
 
     struct aclk_stats_thread *stats_thread = NULL;
@@ -642,7 +710,7 @@ void *aclk_main(void *ptr)
     if (wait_till_agent_claim_ready())
         goto exit;
 
-    if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
+    if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, (aclk_use_new_cloud_arch ? msg_callback_new : msg_callback), puback_callback))) {
         error("Couldn't initialize MQTT_WSS network library");
         goto exit;
     }
@@ -666,8 +734,14 @@ void *aclk_main(void *ptr)
         // warning this assumes the popcorning is relative short (3s)
         // if that changes call mqtt_wss_service from within
         // to keep OpenSSL, WSS and MQTT connection alive
-        if (wait_popcorning_finishes(mqttwss_client, &query_threads))
+        if (wait_popcorning_finishes())
             goto exit_full;
+        
+        if (unlikely(!query_threads.thread_list))
+            aclk_query_threads_start(&query_threads, mqttwss_client);
+
+        if (!aclk_use_new_cloud_arch)
+            queue_connect_payloads();
 
         if (!handle_connection(mqttwss_client)) {
             aclk_stats_upd_online(0);
@@ -775,7 +849,7 @@ void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *m
 {
     struct aclk_query *query;
     struct _collector *tmp_collector;
-    if (unlikely(!netdata_ready)) {
+    if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
         return;
     }
 
@@ -818,7 +892,7 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m
 {
     struct aclk_query *query;
     struct _collector *tmp_collector;
-    if (unlikely(!netdata_ready)) {
+    if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
         return;
     }
 
@@ -854,3 +928,77 @@ void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *m
     query->data.metadata_alarms.initial_on_connect = 0;
     aclk_queue_query(query);
 }
+
+void ng_aclk_host_state_update(RRDHOST *host, int cmd)
+{
+    uuid_t node_id;
+    int ret;
+
+    if (!aclk_connected || !aclk_use_new_cloud_arch)
+        return;
+
+    ret = get_node_id(&host->host_uuid, &node_id);
+    if (ret > 0) {
+        // this means we were not able to check if node_id already present
+        error("Unable to check for node_id. Ignoring the host state update.");
+        return;
+    }
+    if (ret < 0) {
+        // node_id not found
+        aclk_query_t create_query;
+        create_query = aclk_query_new(REGISTER_NODE);
+        rrdhost_aclk_state_lock(localhost);
+        create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+        rrdhost_aclk_state_unlock(localhost);
+        create_query->data.node_creation.hops = 1; //TODO - real hop count instead of hardcoded
+        create_query->data.node_creation.hostname = strdupz(host->hostname);
+        create_query->data.node_creation.machine_guid = strdupz(host->machine_guid);
+        aclk_queue_query(create_query);
+        return;
+    }
+
+    aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+    query->data.node_update.hops = 1; //TODO - real hop count instead of hardcoded
+    rrdhost_aclk_state_lock(localhost);
+    query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+    rrdhost_aclk_state_unlock(localhost);
+    query->data.node_update.live = cmd;
+    query->data.node_update.node_id = mallocz(UUID_STR_LEN);
+    uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id);
+    query->data.node_update.queryable = 1;
+    query->data.node_update.session_id = aclk_session_newarch;
+    aclk_queue_query(query);
+}
+
+void aclk_send_node_instances()
+{
+    struct node_instance_list *list = get_node_list();
+    while (!uuid_is_null(list->host_id)) {
+        if (!uuid_is_null(list->node_id)) {
+            aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
+            rrdhost_aclk_state_lock(localhost);
+            query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
+            rrdhost_aclk_state_unlock(localhost);
+            query->data.node_update.live = list->live;
+            query->data.node_update.hops = list->hops;
+            query->data.node_update.node_id = mallocz(UUID_STR_LEN);
+            uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
+            query->data.node_update.queryable = 1;
+            query->data.node_update.session_id = aclk_session_newarch;
+            aclk_queue_query(query);
+        } else {
+            aclk_query_t create_query;
+            create_query = aclk_query_new(REGISTER_NODE);
+            rrdhost_aclk_state_lock(localhost);
+            create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
+            rrdhost_aclk_state_unlock(localhost);
+            create_query->data.node_creation.hops = uuid_compare(list->host_id, localhost->host_uuid) ? 1 : 0; // TODO - when streaming supports hops
+            create_query->data.node_creation.hostname = list->hostname;
+            create_query->data.node_creation.machine_guid  = mallocz(UUID_STR_LEN);
+            uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
+            aclk_queue_query(create_query);
+        }
+
+        list++;
+    }
+}

+ 4 - 0
aclk/aclk.h

@@ -43,4 +43,8 @@ int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create);
 void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
 void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
 
+void ng_aclk_host_state_update(RRDHOST *host, int cmd);
+
+void aclk_send_node_instances(void);
+
 #endif /* ACLK_H */

+ 13 - 0
aclk/aclk_api.c

@@ -146,6 +146,19 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
     error_report("No usable aclk_del_collector implementation");
 }
 
+void aclk_host_state_update(RRDHOST *host, int connect)
+{
+#ifdef ACLK_NG
+    if (aclk_ng)
+        return ng_aclk_host_state_update(host, connect);
+#endif
+#ifdef ACLK_LEGACY
+    if (!aclk_ng)
+        return legacy_aclk_host_state_update(host, connect);
+#endif
+    error_report("Couldn't use any version of aclk_host_state_update");
+}
+
 #endif /* ENABLE_ACLK */
 
 struct label *add_aclk_host_labels(struct label *label) {

+ 2 - 0
aclk/aclk_api.h

@@ -35,6 +35,8 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
 void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
 void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
 
+void aclk_host_state_update(RRDHOST *host, int connect);
+
 #define NETDATA_ACLK_HOOK                                                                                              \
     { .name = "ACLK_Main",                                                                                             \
       .config_section = NULL,                                                                                          \

+ 59 - 1
aclk/aclk_query.c

@@ -55,11 +55,33 @@ static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char
     return t;
 }
 
+static RRDHOST *node_id_2_rrdhost(const char *node_id)
+{
+    int res;
+    uuid_t node_id_bin, host_id_bin;
+    char host_id[UUID_STR_LEN];
+    if (uuid_parse(node_id, node_id_bin)) {
+        error("Couldn't parse UUID %s", node_id);
+        return NULL;
+    }
+    if ((res = get_host_id(&node_id_bin, &host_id_bin))) {
+        error("node not found rc=%d", res);
+        return NULL;
+    }
+    uuid_unparse_lower(host_id_bin, host_id);
+    return rrdhost_find_by_guid(host_id, 0);
+}
+
+#define NODE_ID_QUERY "/node/"
+// TODO this function should be quarantied and written nicely
+// lots of skeletons from initial ACLK Legacy impl.
+// quick and dirty from the start
 static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
 {
     int retval = 0;
     usec_t t;
     BUFFER *local_buffer = NULL;
+    RRDHOST *query_host = localhost;
 
 #ifdef NETDATA_WITH_ZLIB
     int z_ret;
@@ -76,6 +98,24 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
     w->cookie2[0] = 0;      // Simulate web_client_create_on_fd()
     w->acl = 0x1f;
 
+    if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
+        char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
+        char nodeid[UUID_STR_LEN];
+        if (strlen(node_uuid) < (UUID_STR_LEN - 1)) {
+            error("URL requests node_id but there is not enough chars following");
+            retval = 1;
+            goto cleanup;
+        }
+        strncpyz(nodeid, node_uuid, UUID_STR_LEN - 1);
+
+        query_host = node_id_2_rrdhost(nodeid);
+        if (!query_host) {
+            error("Host with node_id \"%s\" not found! Query Ignored!", node_uuid);
+            retval = 1;
+            goto cleanup;
+        }
+    }
+
     char *mysep = strchr(query->data.http_api_v2.query, '?');
     if (mysep) {
         url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
@@ -86,7 +126,7 @@ static int http_api_v2(mqtt_wss_client client, aclk_query_t query)
     mysep = strrchr(query->data.http_api_v2.query, '/');
 
     // execute the query
-    t = aclk_web_api_v1_request(localhost, w, mysep ? mysep + 1 : "noop");
+    t = aclk_web_api_v1_request(query_host, w, mysep ? mysep + 1 : "noop");
 
 #ifdef NETDATA_WITH_ZLIB
     // check if gzip encoding can and should be used
@@ -187,6 +227,22 @@ static int alarm_state_update_query(mqtt_wss_client client, aclk_query_t query)
     return 0;
 }
 
+static int register_node(mqtt_wss_client client, aclk_query_t query) {
+    // TODO create a pending registrations list
+    // with some timeouts to detect registration requests that
+    // go unanswered from the cloud
+    aclk_generate_node_registration(client, &query->data.node_creation);
+    return 0;
+}
+
+static int node_state_update(mqtt_wss_client client, aclk_query_t query) {
+    // TODO create a pending registrations list
+    // with some timeouts to detect registration requests that
+    // go unanswered from the cloud
+    aclk_generate_node_state_update(client, &query->data.node_update);
+    return 0;
+}
+
 aclk_query_handler aclk_query_handlers[] = {
     { .type = HTTP_API_V2,        .name = "http api request v2", .fnc = http_api_v2              },
     { .type = ALARM_STATE_UPDATE, .name = "alarm state update",  .fnc = alarm_state_update_query },
@@ -194,6 +250,8 @@ aclk_query_handler aclk_query_handlers[] = {
     { .type = METADATA_ALARMS,    .name = "alarms metadata",     .fnc = alarms_metadata          },
     { .type = CHART_NEW,          .name = "chart new",           .fnc = chart_query              },
     { .type = CHART_DEL,          .name = "chart delete",        .fnc = info_metadata            },
+    { .type = REGISTER_NODE,      .name = "register node",       .fnc = register_node            },
+    { .type = NODE_STATE_UPDATE,  .name = "node state update",   .fnc = node_state_update        },
     { .type = UNKNOWN,            .name = NULL,                  .fnc = NULL                     }
 };
 

+ 11 - 0
aclk/aclk_query_queue.c

@@ -114,6 +114,17 @@ void aclk_query_free(aclk_query_t query)
     if (query->type == ALARM_STATE_UPDATE && query->data.alarm_update)
         json_object_put(query->data.alarm_update);
 
+    if (query->type == NODE_STATE_UPDATE) {
+        freez((void*)query->data.node_update.claim_id);
+        freez((void*)query->data.node_update.node_id);
+    }
+
+    if (query->type == REGISTER_NODE) {
+        freez((void*)query->data.node_creation.claim_id);
+        freez((void*)query->data.node_creation.hostname);
+        freez((void*)query->data.node_creation.machine_guid);
+    }
+
     freez(query->dedup_id);
     freez(query->callback_topic);
     freez(query->msg_id);

Некоторые файлы не были показаны из-за большого количества измененных файлов