Просмотр исходного кода

Query queue only for queries (#13431)

simplify and clean up
Timotej S 2 лет назад
Родитель
Сommit
221fd51287
10 измененных файлов с 73 добавлено и 223 удалено
  1. 15 23
      aclk/aclk.c
  2. 11 0
      aclk/aclk.h
  3. 4 22
      aclk/aclk_alarm_api.c
  4. 10 39
      aclk/aclk_charts_api.c
  5. 4 10
      aclk/aclk_contexts_api.c
  6. 13 55
      aclk/aclk_query.c
  7. 0 2
      aclk/aclk_query.h
  8. 5 30
      aclk/aclk_query_queue.c
  9. 3 33
      aclk/aclk_query_queue.h
  10. 8 9
      aclk/aclk_rx_msgs.c

+ 15 - 23
aclk/aclk.c

@@ -742,8 +742,8 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
     }
     if (ret < 0) {
         // node_id not found
-        aclk_query_t create_query;
-        create_query = aclk_query_new(REGISTER_NODE);
+        size_t payload_len;
+
         rrdhost_aclk_state_lock(localhost);
         node_instance_creation_t node_instance_creation = {
             .claim_id = localhost->aclk_state.claimed_id,
@@ -751,16 +751,14 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
             .hostname = host->hostname,
             .machine_guid = host->machine_guid
         };
-        create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+        char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation);
         rrdhost_aclk_state_unlock(localhost);
-        create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
-        create_query->data.bin_payload.msg_name = "CreateNodeInstance";
+
         info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
-        aclk_queue_query(create_query);
+        aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
         return;
     }
 
-    aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
     node_instance_connection_t node_state_update = {
         .hops = host->system_info->hops,
         .live = cmd,
@@ -781,15 +779,14 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
 
     rrdhost_aclk_state_lock(localhost);
     node_state_update.claim_id = localhost->aclk_state.claimed_id;
-    query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+    size_t payload_len;
+    char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
     rrdhost_aclk_state_unlock(localhost);
 
     info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
          host->system_info->hops);
     freez((void*)node_state_update.node_id);
-    query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
-    query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
-    aclk_queue_query(query);
+    aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
 }
 
 void aclk_send_node_instances()
@@ -802,7 +799,6 @@ void aclk_send_node_instances()
     }
     while (!uuid_is_null(list->host_id)) {
         if (!uuid_is_null(list->node_id)) {
-            aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
             node_instance_connection_t node_state_update = {
                 .live = list->live,
                 .hops = list->hops,
@@ -827,34 +823,30 @@ void aclk_send_node_instances()
 
             rrdhost_aclk_state_lock(localhost);
             node_state_update.claim_id = localhost->aclk_state.claimed_id;
-            query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+            size_t payload_len;
+            char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
             rrdhost_aclk_state_unlock(localhost);
             info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
                  list->live,
                  list->hops);
             freez((void*)node_state_update.node_id);
-            query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
-            query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
-            aclk_queue_query(query);
+            aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
         } else {
-            aclk_query_t create_query;
-            create_query = aclk_query_new(REGISTER_NODE);
             node_instance_creation_t node_instance_creation = {
                 .hops = list->hops,
                 .hostname = list->hostname,
             };
             node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
             uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
-            create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
-            create_query->data.bin_payload.msg_name = "CreateNodeInstance";
             rrdhost_aclk_state_lock(localhost);
-            node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
-            create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
+            node_instance_creation.claim_id = localhost->aclk_state.claimed_id;
+            size_t payload_len;
+            char *payload = generate_node_instance_creation(&payload_len, &node_instance_creation);
             rrdhost_aclk_state_unlock(localhost);
             info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
                  list->hops);
             freez(node_instance_creation.machine_guid);
-            aclk_queue_query(create_query);
+            aclk_send_bin_message_subtopic_pid(mqttwss_client, payload, payload_len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
         }
         freez(list->hostname);
 

+ 11 - 0
aclk/aclk.h

@@ -34,6 +34,17 @@ void aclk_send_node_instances(void);
 
 void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
 
+#define GENERATE_AND_SEND_PAYLOAD(topic, msg_name, generator_fnc, generator_data...)                                   \
+    size_t payload_len;                                                                                                \
+    char *payload = generator_fnc(&payload_len, generator_data);                                                       \
+    if (unlikely(payload == NULL)) {                                                                                   \
+        error("Failed to generate payload (%s)", __FUNCTION__);                                                        \
+        return;                                                                                                        \
+    }                                                                                                                  \
+    aclk_send_bin_msg(payload, payload_len, topic, msg_name);                                                          \
+    if (!use_mqtt_5)                                                                                                   \
+        freez(payload);
+
 char *ng_aclk_state(void);
 char *ng_aclk_state_json(void);
 

+ 4 - 22
aclk/aclk_alarm_api.c

@@ -10,38 +10,20 @@
 
 void aclk_send_alarm_log_health(struct alarm_log_health *log_health)
 {
-    aclk_query_t query = aclk_query_new(ALARM_LOG_HEALTH);
-    query->data.bin_payload.payload = generate_alarm_log_health(&query->data.bin_payload.size, log_health);
-    query->data.bin_payload.topic = ACLK_TOPICID_ALARM_HEALTH;
-    query->data.bin_payload.msg_name = "AlarmLogHealth";
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_HEALTH, "AlarmLogHealth", generate_alarm_log_health, log_health);
 }
 
 void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry)
 {
-    size_t payload_size;
-    char *payload = generate_alarm_log_entry(&payload_size, log_entry);
-
-    aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry");
-
-    if (!use_mqtt_5)
-        freez(payload);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry", generate_alarm_log_entry, log_entry);
 }
 
 void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg)
 {
-    aclk_query_t query = aclk_query_new(ALARM_PROVIDE_CFG);
-    query->data.bin_payload.payload = generate_provide_alarm_configuration(&query->data.bin_payload.size, cfg);
-    query->data.bin_payload.topic = ACLK_TOPICID_ALARM_CONFIG;
-    query->data.bin_payload.msg_name = "ProvideAlarmConfiguration";
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_CONFIG, "ProvideAlarmConfiguration", generate_provide_alarm_configuration, cfg);
 }
 
 void aclk_send_alarm_snapshot(alarm_snapshot_proto_ptr_t snapshot)
 {
-    aclk_query_t query = aclk_query_new(ALARM_SNAPSHOT);
-    query->data.bin_payload.payload = generate_alarm_snapshot_bin(&query->data.bin_payload.size, snapshot);
-    query->data.bin_payload.topic = ACLK_TOPICID_ALARM_SNAPSHOT;
-    query->data.bin_payload.msg_name = "AlarmSnapshot";
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_ALARM_SNAPSHOT, "AlarmSnapshot", generate_alarm_snapshot_bin, snapshot);
 }

+ 10 - 39
aclk/aclk_charts_api.c

@@ -3,75 +3,46 @@
 
 #include "aclk_query_queue.h"
 
+#include "aclk.h"
+
 #define CHART_DIM_UPDATE_NAME "ChartsAndDimensionsUpdated"
 
 void aclk_chart_inst_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
 {
-    aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
-    query->data.bin_payload.payload = generate_charts_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
-    query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_charts_updated, payloads, payload_sizes, new_positions);
 }
 
 void aclk_chart_dim_update(char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
 {
-    aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
-    query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
-    query->data.bin_payload.payload = generate_chart_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, new_positions);
-    query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_chart_dimensions_updated, payloads, payload_sizes, new_positions);
 }
 
 void aclk_chart_inst_and_dim_update(char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id)
 {
-    aclk_query_t query = aclk_query_new(CHART_DIMS_UPDATE);
-    query->data.bin_payload.topic = ACLK_TOPICID_CHART_DIMS;
-    query->data.bin_payload.payload = generate_charts_and_dimensions_updated(&query->data.bin_payload.size, payloads, payload_sizes, is_dim, new_positions, batch_id);
-    query->data.bin_payload.msg_name = CHART_DIM_UPDATE_NAME;
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_DIMS, CHART_DIM_UPDATE_NAME, generate_charts_and_dimensions_updated, payloads, payload_sizes, is_dim, new_positions, batch_id);
 }
 
 void aclk_chart_config_updated(struct chart_config_updated *config_list, int list_size)
 {
-    aclk_query_t query = aclk_query_new(CHART_CONFIG_UPDATED);
-    query->data.bin_payload.topic = ACLK_TOPICID_CHART_CONFIGS_UPDATED;
-    query->data.bin_payload.payload = generate_chart_configs_updated(&query->data.bin_payload.size, config_list, list_size);
-    query->data.bin_payload.msg_name = "ChartConfigsUpdated";
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_CONFIGS_UPDATED, "ChartConfigsUpdated", generate_chart_configs_updated, config_list, list_size);
 }
 
 void aclk_chart_reset(chart_reset_t reset)
 {
-    aclk_query_t query = aclk_query_new(CHART_RESET);
-    query->data.bin_payload.topic = ACLK_TOPICID_CHART_RESET;
-    query->data.bin_payload.payload = generate_reset_chart_messages(&query->data.bin_payload.size, reset);
-    query->data.bin_payload.msg_name = "ResetChartMessages";
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CHART_RESET, "ResetChartMessages", generate_reset_chart_messages, reset);
 }
 
 void aclk_retention_updated(struct retention_updated *data)
 {
-    aclk_query_t query = aclk_query_new(RETENTION_UPDATED);
-    query->data.bin_payload.topic = ACLK_TOPICID_RETENTION_UPDATED;
-    query->data.bin_payload.payload = generate_retention_updated(&query->data.bin_payload.size, data);
-    query->data.bin_payload.msg_name = "RetentionUpdated";
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_RETENTION_UPDATED, "RetentionUpdated", generate_retention_updated, data);
 }
 
 void aclk_update_node_info(struct update_node_info *info)
 {
-    aclk_query_t query = aclk_query_new(UPDATE_NODE_INFO);
-    query->data.bin_payload.topic = ACLK_TOPICID_NODE_INFO;
-    query->data.bin_payload.payload = generate_update_node_info_message(&query->data.bin_payload.size, info);
-    query->data.bin_payload.msg_name = "UpdateNodeInfo";
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_NODE_INFO, "UpdateNodeInfo", generate_update_node_info_message, info);
 }
 
 void aclk_update_node_collectors(struct update_node_collectors *collectors)
 {
-    aclk_query_t query = aclk_query_new(UPDATE_NODE_COLLECTORS);
-    query->data.bin_payload.topic = ACLK_TOPICID_NODE_COLLECTORS;
-    query->data.bin_payload.payload = generate_update_node_collectors_message(&query->data.bin_payload.size, collectors);
-    query->data.bin_payload.msg_name = "UpdateNodeCollectors";
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_NODE_COLLECTORS, "UpdateNodeCollectors", generate_update_node_collectors_message, collectors);
 }

+ 4 - 10
aclk/aclk_contexts_api.c

@@ -4,20 +4,14 @@
 
 #include "aclk_contexts_api.h"
 
+#include "aclk.h"
+
 void aclk_send_contexts_snapshot(contexts_snapshot_t data)
 {
-    aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
-    query->data.bin_payload.topic = ACLK_TOPICID_CTXS_SNAPSHOT;
-    query->data.bin_payload.payload = contexts_snapshot_2bin(data, &query->data.bin_payload.size);
-    query->data.bin_payload.msg_name = "ContextsSnapshot";
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CTXS_SNAPSHOT, "ContextsSnapshot", contexts_snapshot_2bin, data);
 }
 
 void aclk_send_contexts_updated(contexts_updated_t data)
 {
-    aclk_query_t query = aclk_query_new(PROTO_BIN_MESSAGE);
-    query->data.bin_payload.topic = ACLK_TOPICID_CTXS_UPDATED;
-    query->data.bin_payload.payload = contexts_updated_2bin(data, &query->data.bin_payload.size);
-    query->data.bin_payload.msg_name = "ContextsUpdated";
-    QUEUE_IF_PAYLOAD_PRESENT(query);
+    GENERATE_AND_SEND_PAYLOAD(ACLK_TOPICID_CTXS_UPDATED, "ContextsUpdated", contexts_updated_2bin, data);
 }

+ 13 - 55
aclk/aclk_query.c

@@ -84,7 +84,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
     w->cookie2[0] = 0;      // Simulate web_client_create_on_fd()
     w->acl = 0x1f;
 
-    buffer_strcat(log_buffer, query->data.http_api_v2.query);
+    buffer_strcat(log_buffer, query->http_api_v2.query);
     size_t size = 0;
     size_t sent = 0;
     w->tv_in = query->created_tv;
@@ -102,8 +102,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
     }
 
     RRDHOST *temp_host = NULL;
-    if (!strncmp(query->data.http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
-        char *node_uuid = query->data.http_api_v2.query + strlen(NODE_ID_QUERY);
+    if (!strncmp(query->http_api_v2.query, NODE_ID_QUERY, strlen(NODE_ID_QUERY))) {
+        char *node_uuid = query->http_api_v2.query + strlen(NODE_ID_QUERY);
         char nodeid[UUID_STR_LEN];
         if (strlen(node_uuid) < (UUID_STR_LEN - 1)) {
             error_report(CLOUD_EMSG_MALFORMED_NODE_ID);
@@ -127,14 +127,14 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
         }
     }
 
-    char *mysep = strchr(query->data.http_api_v2.query, '?');
+    char *mysep = strchr(query->http_api_v2.query, '?');
     if (mysep) {
         url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
         *mysep = '\0';
     } else
-        url_decode_r(w->decoded_query_string, query->data.http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
+        url_decode_r(w->decoded_query_string, query->http_api_v2.query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
 
-    mysep = strrchr(query->data.http_api_v2.query, '/');
+    mysep = strrchr(query->http_api_v2.query, '/');
 
     if (aclk_stats_enabled) {
         ACLK_STATS_LOCK;
@@ -151,7 +151,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
 
 #ifdef NETDATA_WITH_ZLIB
     // check if gzip encoding can and should be used
-    if ((start = strstr((char *)query->data.http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) {
+    if ((start = strstr((char *)query->http_api_v2.payload, WEB_HDR_ACCEPT_ENC))) {
         start += strlen(WEB_HDR_ACCEPT_ENC);
         end = strstr(start, "\x0D\x0A");
         start = strstr(start, "gzip");
@@ -256,57 +256,16 @@ cleanup:
     return retval;
 }
 
-static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
-    // this will be simplified when legacy support is removed
-    aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name);
-    return 0;
-}
-
-const char *aclk_query_get_name(aclk_query_type_t qt)
-{
-    switch (qt) {
-        case HTTP_API_V2:          return "http_api_request_v2";
-        case REGISTER_NODE:        return "register_node";
-        case NODE_STATE_UPDATE:    return "node_state_update";
-        case CHART_DIMS_UPDATE:    return "chart_and_dim_update";
-        case CHART_CONFIG_UPDATED: return "chart_config_updated";
-        case CHART_RESET:          return "reset_chart_messages";
-        case RETENTION_UPDATED:    return "update_retention_info";
-        case UPDATE_NODE_INFO:     return "update_node_info";
-        case ALARM_LOG_HEALTH:     return "alarm_log_health";
-        case ALARM_PROVIDE_CFG:    return "provide_alarm_config";
-        case ALARM_SNAPSHOT:       return "alarm_snapshot";
-        case UPDATE_NODE_COLLECTORS: return "update_node_collectors";
-        case PROTO_BIN_MESSAGE:    return "generic_binary_proto_message";
-        default:
-            error_report("Unknown query type used %d", (int) qt);
-            return "unknown";
-    }
-}
-
 static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
 {   
-    if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) {
-        error_report("Unknown query in query queue. %u", query->type);
-        aclk_query_free(query);
-        return;
-    }
-
-    worker_is_busy(query->type);
-    if (query->type == HTTP_API_V2) {
-        debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\"");
-        http_api_v2(query_thr, query);
-    } else {
-        debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name);
-        send_bin_msg(query_thr, query);
-    }
+    worker_is_busy(0);
+    debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\"");
+    http_api_v2(query_thr, query);
 
     if (aclk_stats_enabled) {
         ACLK_STATS_LOCK;
         aclk_metrics_per_sample.queries_dispatched++;
         aclk_queries_per_thread[query_thr->idx]++;
-        aclk_metrics_per_sample.queries_per_type[query->type]++;
         ACLK_STATS_UNLOCK;
     }
 
@@ -326,11 +285,10 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
     return 0;
 }
 
-static void worker_aclk_register(void) {
+static void worker_aclk_register(void)
+{
     worker_register("ACLKQUERY");
-    for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) {
-        worker_register_job_name(i, aclk_query_get_name(i));
-    }
+    worker_register_job_name(0, "http query");
 }
 
 /**

+ 0 - 2
aclk/aclk_query.h

@@ -31,6 +31,4 @@ struct aclk_query_threads {
 void aclk_query_threads_start(struct aclk_query_threads *query_threads, mqtt_wss_client client);
 void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
 
-const char *aclk_query_get_name(aclk_query_type_t qt);
-
 #endif //NETDATA_AGENT_CLOUD_LINK_H

+ 5 - 30
aclk/aclk_query_queue.c

@@ -95,41 +95,16 @@ void aclk_queue_flush(void)
     };
 }
 
-aclk_query_t aclk_query_new(aclk_query_type_t type)
+aclk_query_t aclk_query_new()
 {
-    aclk_query_t query = callocz(1, sizeof(struct aclk_query));
-    query->type = type;
-    return query;
+    return callocz(1, sizeof(struct aclk_query));
 }
 
 void aclk_query_free(aclk_query_t query)
 {
-    switch (query->type) {
-    case HTTP_API_V2:
-        freez(query->data.http_api_v2.payload);
-        if (query->data.http_api_v2.query != query->dedup_id)
-            freez(query->data.http_api_v2.query);
-        break;
-
-    case NODE_STATE_UPDATE:
-    case REGISTER_NODE:
-    case CHART_DIMS_UPDATE:
-    case CHART_CONFIG_UPDATED:
-    case CHART_RESET:
-    case RETENTION_UPDATED:
-    case UPDATE_NODE_INFO:
-    case ALARM_LOG_HEALTH:
-    case ALARM_PROVIDE_CFG:
-    case ALARM_SNAPSHOT:
-    case UPDATE_NODE_COLLECTORS:
-    case PROTO_BIN_MESSAGE:
-        if (!use_mqtt_5)
-            freez(query->data.bin_payload.payload);
-        break;
-
-    default:
-        break;
-    }
+    freez(query->http_api_v2.payload);
+    if (query->http_api_v2.query != query->dedup_id)
+        freez(query->http_api_v2.query);
 
     freez(query->dedup_id);
     freez(query->callback_topic);

+ 3 - 33
aclk/aclk_query_queue.h

@@ -9,24 +9,6 @@
 
 #include "aclk_util.h"
 
-typedef enum {
-    UNKNOWN = 0,
-    HTTP_API_V2,
-    REGISTER_NODE,
-    NODE_STATE_UPDATE,
-    CHART_DIMS_UPDATE,
-    CHART_CONFIG_UPDATED,
-    CHART_RESET,
-    RETENTION_UPDATED,
-    UPDATE_NODE_INFO,
-    ALARM_LOG_HEALTH,
-    ALARM_PROVIDE_CFG,
-    ALARM_SNAPSHOT,
-    UPDATE_NODE_COLLECTORS,
-    PROTO_BIN_MESSAGE,
-    ACLK_QUERY_TYPE_COUNT // always keep this as last
-} aclk_query_type_t;
-
 struct aclk_query_http_api_v2 {
     char *payload;
     char *query;
@@ -41,8 +23,6 @@ struct aclk_bin_payload {
 
 typedef struct aclk_query *aclk_query_t;
 struct aclk_query {
-    aclk_query_type_t type;
-
     // dedup_id is used to deduplicate queries in the list
     // if type and dedup_id is the same message is deduplicated
     // set dedup_id to NULL to never deduplicate the message
@@ -59,13 +39,11 @@ struct aclk_query {
 
     // TODO maybe remove?
     int version;
-    union {
-        struct aclk_query_http_api_v2 http_api_v2;
-        struct aclk_bin_payload bin_payload;
-    } data;
+
+    struct aclk_query_http_api_v2 http_api_v2;
 };
 
-aclk_query_t aclk_query_new(aclk_query_type_t type);
+aclk_query_t aclk_query_new();
 void aclk_query_free(aclk_query_t query);
 
 int aclk_queue_query(aclk_query_t query);
@@ -75,12 +53,4 @@ void aclk_queue_flush(void);
 void aclk_queue_lock(void);
 void aclk_queue_unlock(void);
 
-#define QUEUE_IF_PAYLOAD_PRESENT(query)                                                                                \
-    if (likely(query->data.bin_payload.payload)) {                                                                     \
-        aclk_queue_query(query);                                                                                       \
-    } else {                                                                                                           \
-        error("Failed to generate payload (%s)", __FUNCTION__);                                                        \
-        aclk_query_free(query);                                                                                        \
-    }
-
 #endif /* NETDATA_ACLK_QUERY_QUEUE_H */

+ 8 - 9
aclk/aclk_rx_msgs.c

@@ -5,6 +5,7 @@
 #include "aclk_stats.h"
 #include "aclk_query_queue.h"
 #include "aclk.h"
+#include "aclk_tx_msgs.h"
 
 #include "schema-wrappers/proto_2_json.h"
 
@@ -131,14 +132,14 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent
         return 1;
     }
 
-    query = aclk_query_new(HTTP_API_V2);
+    query = aclk_query_new();
 
-    if (unlikely(aclk_extract_v2_data(raw_payload, &query->data.http_api_v2.payload))) {
+    if (unlikely(aclk_extract_v2_data(raw_payload, &query->http_api_v2.payload))) {
         error("Error extracting payload expected after the JSON dictionary.");
         goto error;
     }
 
-    if (unlikely(aclk_v2_payload_get_query(query->data.http_api_v2.payload, &query->dedup_id))) {
+    if (unlikely(aclk_v2_payload_get_query(query->http_api_v2.payload, &query->dedup_id))) {
         error("Could not extract payload from query");
         goto error;
     }
@@ -158,7 +159,7 @@ static int aclk_handle_cloud_http_request_v2(struct aclk_request *cloud_to_agent
     query->timeout = cloud_to_agent->timeout;
     // for clarity and code readability as when we process the request
     // it would be strange to get URL from `dedup_id`
-    query->data.http_api_v2.query = query->dedup_id;
+    query->http_api_v2.query = query->dedup_id;
     query->msg_id = cloud_to_agent->msg_id;
     aclk_queue_query(query);
     return 0;
@@ -265,7 +266,6 @@ int create_node_instance_result(const char *msg, size_t msg_len)
     }
     update_node_id(&host_id, &node_id);
 
-    aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
     node_instance_connection_t node_state_update = {
         .hops = 1,
         .live = 0,
@@ -298,15 +298,14 @@ int create_node_instance_result(const char *msg, size_t msg_len)
     };
     node_state_update.capabilities = caps;
 
+    size_t payload_len;
     rrdhost_aclk_state_lock(localhost);
     node_state_update.claim_id = localhost->aclk_state.claimed_id;
-    query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
+    char *payload = generate_node_instance_connection(&payload_len, &node_state_update);
     rrdhost_aclk_state_unlock(localhost);
 
-    query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
-    query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
+    aclk_send_bin_msg(payload, payload_len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
 
-    aclk_queue_query(query);
     freez(res.node_id);
     freez(res.machine_guid);
     return 0;

Некоторые файлы не были показаны из-за большого количества измененных файлов