Просмотр исходного кода

ACLK Passwd endpoint update (#10859)

Updates ACLK-NG to properly handle new response of password payload as defined in New Cloud Architecture
Timotej S 3 лет назад
Родитель
Сommit
f9fbde67a4
6 измененных файлов с 319 добавлено и 76 удалено
  1. 26 11
      aclk/aclk.c
  2. 96 26
      aclk/aclk_otp.c
  3. 1 1
      aclk/aclk_otp.h
  4. 14 2
      aclk/aclk_tx_msgs.c
  5. 176 32
      aclk/aclk_util.c
  6. 6 4
      aclk/aclk_util.h

+ 26 - 11
aclk/aclk.c

@@ -201,6 +201,11 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
 {
     char cmsg[RX_MSGLEN_MAX];
     size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
+    const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
+    if (!cmd_topic) {
+        error("Error retrieving command topic");
+        return;
+    }
 
     if (msglen > RX_MSGLEN_MAX - 1)
         error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
@@ -224,7 +229,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
 
     debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d MSG: \"%s\"", topic, qos, cmsg);
 
-    if (strcmp(aclk_get_topic(ACLK_TOPICID_COMMAND), topic))
+    if (strcmp(cmd_topic, topic))
         error("Received message on unexpected topic %s", topic);
 
     if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
@@ -323,7 +328,12 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
     aclk_session_sec = now / USEC_PER_SEC;
     aclk_session_us = now % USEC_PER_SEC;
 
-    mqtt_wss_subscribe(client, aclk_get_topic(ACLK_TOPICID_COMMAND), 1);
+    const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
+
+    if (!topic)
+        error("Unable to fetch topic for COMMAND (to subscribe)");
+    else
+        mqtt_wss_subscribe(client, topic, 1);
 
     aclk_stats_upd_online(1);
     aclk_connected = 1;
@@ -331,7 +341,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
     aclk_hello_msg(client);
     ACLK_SHARED_STATE_LOCK;
     if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
-        error("Sending `connect` payload immediatelly as popcorning was finished already.");
+        error("Sending `connect` payload immediately as popcorning was finished already.");
         queue_connect_payloads();
     }
     ACLK_SHARED_STATE_UNLOCK;
@@ -461,9 +471,6 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
 #ifndef ACLK_DISABLE_CHALLENGE
     url_t auth_url;
     url_t mqtt_url;
-
-    char *mqtt_otp_user = NULL;
-    char *mqtt_otp_pass = NULL;
 #endif
 
     json_object *lwt;
@@ -494,7 +501,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
             .clientid   = "anon",
             .username   = "anon",
             .password   = "anon",
-            .will_topic = aclk_get_topic(ACLK_TOPICID_METADATA),
+            .will_topic = "lwt",
             .will_msg   = NULL,
             .will_flags = MQTT_WSS_PUB_QOS2,
             .keep_alive = 60
@@ -522,16 +529,20 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
             continue;
         }
 
-        ret = aclk_get_mqtt_otp(aclk_private_key, &mqtt_otp_user, &mqtt_otp_pass, &auth_url);
+        ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
         url_t_destroy(&auth_url);
         if (ret) {
             error("Error passing Challenge/Response to get OTP");
             continue;
         }
 
-        mqtt_conn_params.clientid = mqtt_otp_user;
-        mqtt_conn_params.username = mqtt_otp_user;
-        mqtt_conn_params.password = mqtt_otp_pass;
+        // aclk_get_topic moved here as during OTP we
+        // generate the topic cache
+        mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
+        if (!mqtt_conn_params.will_topic) {
+            error("Couldn't get LWT topic. Will not send LWT.");
+            continue;
+        }
 
         // Do the MQTT connection
         ret = aclk_get_transport_idx(aclk_env);
@@ -558,6 +569,10 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
 #else
         ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
         url_t_destroy(&mqtt_url);
+
+        freez((char*)mqtt_conn_params.clientid);
+        freez((char*)mqtt_conn_params.password);
+        freez((char*)mqtt_conn_params.username);
 #endif
 
         json_object_put(lwt);

+ 96 - 26
aclk/aclk_otp.c

@@ -188,8 +188,95 @@ static int aclk_https_request(https_req_t *request, https_req_response_t *respon
     return rc;
 }
 
+struct auth_data {
+    char *client_id;
+    char *username;
+    char *passwd;
+};
+
+#define PARSE_ENV_JSON_CHK_TYPE(it, type, name)                                                                        \
+    if (json_object_get_type(json_object_iter_peek_value(it)) != type) {                                               \
+        error("value of key \"%s\" should be %s", name, #type);                                                        \
+        goto exit;                                                                                                     \
+    }
+
+#define JSON_KEY_CLIENTID "clientID"
+#define JSON_KEY_USER     "username"
+#define JSON_KEY_PASS     "password"
+#define JSON_KEY_TOPICS   "topics"
+
+static int parse_passwd_response(const char *json_str, struct auth_data *auth) {
+    int rc = 1;
+    json_object *json;
+    struct json_object_iterator it;
+    struct json_object_iterator itEnd;
+
+    json = json_tokener_parse(json_str);
+    if (!json) {
+        error("JSON-C failed to parse the payload of http respons of /env endpoint");
+        return 1;
+    }
+
+    it = json_object_iter_begin(json);
+    itEnd = json_object_iter_end(json);
+
+    while (!json_object_iter_equal(&it, &itEnd)) {
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_CLIENTID)) {
+            PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_CLIENTID)
+
+            auth->client_id = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+            json_object_iter_next(&it);
+            continue;
+        }
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_USER)) {
+            PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_USER)
+
+            auth->username = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+            json_object_iter_next(&it);
+            continue;
+        }
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_PASS)) {
+            PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_PASS)
+
+            auth->passwd = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+            json_object_iter_next(&it);
+            continue;
+        }
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_TOPICS)) {
+            PARSE_ENV_JSON_CHK_TYPE(&it, json_type_array, JSON_KEY_TOPICS)
+
+            if (aclk_generate_topic_cache(json_object_iter_peek_value(&it))) {
+                error("Failed to generate topic cache!");
+                goto exit;
+            }
+            json_object_iter_next(&it);
+            continue;
+        }
+        error("Unknown key \"%s\" in passwd response payload. Ignoring", json_object_iter_peek_name(&it));
+        json_object_iter_next(&it);
+    }
+
+    if (!auth->client_id) {
+        error(JSON_KEY_CLIENTID " is compulsory key in /password response");
+        goto exit;
+    }
+    if (!auth->passwd) {
+        error(JSON_KEY_PASS " is compulsory in /password response");
+        goto exit;
+    }
+    if (!auth->username) {
+        error(JSON_KEY_USER " is compulsory in /password response");
+        goto exit;
+    }
+
+    rc = 0;
+exit:
+    json_object_put(json);
+    return rc;
+}
+
 #define OTP_URL_PREFIX "/api/v1/auth/node/"
-int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_usr, char **mqtt_pass, url_t *target) {
+int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) {
     // TODO this fnc will be rewritten and simplified in following PRs
     // still carries lot of baggage from ACLK Legacy
     int rc = 1;
@@ -272,43 +359,26 @@ int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_usr, char **mqtt_pass, url_t *targ
     }
     info ("ACLK_OTP Got Password from Cloud");
 
-    struct dictionary_singleton password = { .key = "password", .result = NULL };
-    if (json_parse(resp.payload, &password, json_extract_singleton) != JSON_OK)
-    {
-        freez(password.result);
-        error("Could not parse the json response with the password");
+    struct auth_data data = { .client_id = NULL, .passwd = NULL, .username = NULL };
+    
+    if (parse_passwd_response(resp.payload, &data)){
+        error("Error parsing response of password endpoint");
         goto cleanup_resp;
     }
 
-    if (password.result == NULL ) {
-        error("Could not retrieve password from auth response");
-        goto cleanup_resp;
-    }
-    if (*mqtt_pass != NULL )
-        freez(*mqtt_pass);
-    *mqtt_pass = password.result;
-    if (*mqtt_usr != NULL)
-        freez(*mqtt_usr);
-    *mqtt_usr = agent_id;
-    agent_id = NULL;
+    *mqtt_pass = data.passwd;
+    *mqtt_usr = data.username;
+    *mqtt_id = data.client_id;
 
     rc = 0;
-
 cleanup_resp:
     https_req_response_free(&resp);
 cleanup:
-    if (agent_id != NULL)
-        freez(agent_id);
+    freez(agent_id);
     buffer_free(url);
     return rc;
 }
 
-#define PARSE_ENV_JSON_CHK_TYPE(it, type, name)                                                                        \
-    if (json_object_get_type(json_object_iter_peek_value(it)) != type) {                                               \
-        error("value of key \"%s\" should be %s", name, #type);                                                        \
-        goto exit;                                                                                                     \
-    }
-
 #define JSON_KEY_ENC "encoding"
 #define JSON_KEY_AUTH_ENDPOINT "authEndpoint"
 #define JSON_KEY_TRP "transports"

+ 1 - 1
aclk/aclk_otp.h

@@ -7,7 +7,7 @@
 
 #include "https_client.h"
 
-int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_usr, char **mqtt_pass, url_t *target);
+int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target);
 int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port);
 
 #endif /* ACLK_OTP_H */

+ 14 - 2
aclk/aclk_tx_msgs.c

@@ -13,8 +13,14 @@ static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg,
 {
     uint16_t packet_id;
     const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+    const char *topic = aclk_get_topic(subtopic);
 
-    mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str),  MQTT_WSS_PUB_QOS1, &packet_id);
+    if (unlikely(!topic)) {
+        error("Couldn't get topic. Aborting mesage send");
+        return;
+    }
+
+    mqtt_wss_publish_pid(client, topic, str, strlen(str),  MQTT_WSS_PUB_QOS1, &packet_id);
 #ifdef NETDATA_INTERNAL_CHECKS
     aclk_stats_msg_published(packet_id);
 #endif
@@ -30,8 +36,14 @@ static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_obje
 {
     uint16_t packet_id;
     const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
+    const char *topic = aclk_get_topic(subtopic);
+
+    if (unlikely(!topic)) {
+        error("Couldn't get topic. Aborting mesage send");
+        return 0;
+    }
 
-    mqtt_wss_publish_pid(client, aclk_get_topic(subtopic), str, strlen(str),  MQTT_WSS_PUB_QOS1, &packet_id);
+    mqtt_wss_publish_pid(client, topic, str, strlen(str),  MQTT_WSS_PUB_QOS1, &packet_id);
 #ifdef NETDATA_INTERNAL_CHECKS
     aclk_stats_msg_published(packet_id);
 #endif

+ 176 - 32
aclk/aclk_util.c

@@ -72,52 +72,188 @@ int aclk_get_conv_log_next()
 #define ACLK_TOPIC_PREFIX "/agent/"
 
 struct aclk_topic {
-    const char *topic_suffix;
+    enum aclk_topics topic_id;
+    // as received from cloud - we keep this for
+    // eventual topic list update when claim_id changes
+    char *topic_recvd;
+    // constructed topic
     char *topic;
 };
 
 // This helps to cache finalized topics (assembled with claim_id)
 // to not have to alloc or create buffer and construct topic every
 // time message is sent as in old ACLK
-static struct aclk_topic aclk_topic_cache[] = {
-    { .topic_suffix = "outbound/meta",   .topic = NULL }, // ACLK_TOPICID_CHART
-    { .topic_suffix = "outbound/alarms", .topic = NULL }, // ACLK_TOPICID_ALARMS
-    { .topic_suffix = "outbound/meta",   .topic = NULL }, // ACLK_TOPICID_METADATA
-    { .topic_suffix = "inbound/cmd",     .topic = NULL }, // ACLK_TOPICID_COMMAND
-    { .topic_suffix = NULL,              .topic = NULL }
-};
+static struct aclk_topic **aclk_topic_cache = NULL;
+static size_t aclk_topic_cache_items = 0;
 
 void free_topic_cache(void)
 {
-    struct aclk_topic *tc = aclk_topic_cache;
-    while (tc->topic_suffix) {
-        if (tc->topic) {
-            freez(tc->topic);
-            tc->topic = NULL;
+    if (aclk_topic_cache) {
+        for (size_t i = 0; i < aclk_topic_cache_items; i++) {
+            freez(aclk_topic_cache[i]->topic);
+            freez(aclk_topic_cache[i]->topic_recvd);
+            freez(aclk_topic_cache[i]);
         }
-        tc++;
+        freez(aclk_topic_cache);
+        aclk_topic_cache = NULL;
+        aclk_topic_cache_items = 0;
     }
 }
 
-static inline void generate_topic_cache(void)
-{
-    struct aclk_topic *tc = aclk_topic_cache;
-    char *ptr;
-    if (unlikely(!tc->topic)) {
-        rrdhost_aclk_state_lock(localhost);
-        while(tc->topic_suffix) {
-            tc->topic = mallocz(strlen(ACLK_TOPIC_PREFIX) + (UUID_STR_LEN - 1) + 2 /* '/' and \0 */ + strlen(tc->topic_suffix));
-            ptr = tc->topic;
-            strcpy(ptr, ACLK_TOPIC_PREFIX);
-            ptr += strlen(ACLK_TOPIC_PREFIX);
-            strcpy(ptr, localhost->aclk_state.claimed_id);
-            ptr += (UUID_STR_LEN - 1);
-            *ptr++ = '/';
-            strcpy(ptr, tc->topic_suffix);
-            tc++;
+#define JSON_TOPIC_KEY_TOPIC "topic"
+#define JSON_TOPIC_KEY_NAME "name"
+
+struct topic_name {
+    enum aclk_topics id;
+    // cloud name - how is it called
+    // in answer to /password endpoint
+    const char *name;
+} topic_names[] = {
+    { .id = ACLK_TOPICID_CHART,    .name = "chart"     },
+    { .id = ACLK_TOPICID_ALARMS,   .name = "alarms"    },
+    { .id = ACLK_TOPICID_METADATA, .name = "meta"      },
+    { .id = ACLK_TOPICID_COMMAND,  .name = "inbox-cmd" },
+    { .id = ACLK_TOPICID_UNKNOWN,  .name = NULL        }
+};
+
+enum aclk_topics compulsory_topics[] = {
+    ACLK_TOPICID_CHART,
+    ACLK_TOPICID_ALARMS,
+    ACLK_TOPICID_METADATA,
+    ACLK_TOPICID_COMMAND,
+    ACLK_TOPICID_UNKNOWN
+};
+
+static enum aclk_topics topic_name_to_id(const char *name) {
+    struct topic_name *topic = topic_names;
+    while (topic->name) {
+        if (!strcmp(topic->name, name)) {
+            return topic->id;
         }
+        topic++;
+    }
+    return ACLK_TOPICID_UNKNOWN;
+}
+
+static const char *topic_id_to_name(enum aclk_topics tid) {
+    struct topic_name *topic = topic_names;
+    while (topic->name) {
+        if (topic->id == tid)
+            return topic->name;
+        topic++;
+    }
+    return "unknown";
+}
+
+#define CLAIM_ID_REPLACE_TAG "#{claim_id}"
+static void topic_generate_final(struct aclk_topic *t) {
+    char *dest;
+    char *replace_tag = strstr(t->topic_recvd, CLAIM_ID_REPLACE_TAG);
+    if (!replace_tag)
+        return;
+
+    rrdhost_aclk_state_lock(localhost);
+    if (unlikely(!localhost->aclk_state.claimed_id)) {
+        error("This should never be called if agent not claimed");
         rrdhost_aclk_state_unlock(localhost);
+        return;
     }
+
+    t->topic = mallocz(strlen(t->topic_recvd) + 1 - strlen(CLAIM_ID_REPLACE_TAG) + strlen(localhost->aclk_state.claimed_id));
+    memcpy(t->topic, t->topic_recvd, replace_tag - t->topic_recvd);
+    dest = t->topic + (replace_tag - t->topic_recvd);
+
+    memcpy(dest, localhost->aclk_state.claimed_id, strlen(localhost->aclk_state.claimed_id));
+    dest += strlen(localhost->aclk_state.claimed_id);
+    rrdhost_aclk_state_unlock(localhost);
+    replace_tag += strlen(CLAIM_ID_REPLACE_TAG);
+    strcpy(dest, replace_tag);
+    dest += strlen(replace_tag);
+    *dest = 0;
+}
+
+static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *topic)
+{
+    struct json_object_iterator it;
+    struct json_object_iterator itEnd;
+
+    it = json_object_iter_begin(json);
+    itEnd = json_object_iter_end(json);
+
+    while (!json_object_iter_equal(&it, &itEnd)) {
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_NAME)) {
+            if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) {
+                error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string");
+                return 1;
+            }
+            topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it)));
+            if (topic->topic_id == ACLK_TOPICID_UNKNOWN) {
+                info("topic dictionary has unkown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it)));
+            }
+            json_object_iter_next(&it);
+            continue;
+        }
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_TOPIC)) {
+            if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) {
+                error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string");
+                return 1;
+            }
+            topic->topic_recvd = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+            json_object_iter_next(&it);
+            continue;
+        }
+
+        error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it));
+        json_object_iter_next(&it);
+    }
+
+    if (!topic->topic_recvd) {
+        error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC);
+        return 1;
+    }
+
+    topic_generate_final(topic);
+    aclk_topic_cache_items++;
+
+    return 0;
+}
+
+int aclk_generate_topic_cache(struct json_object *json)
+{
+    json_object *obj;
+
+    size_t array_size = json_object_array_length(json);
+    if (!array_size) {
+        error("Empty topic list!");
+        return 1;
+    }
+
+    if (aclk_topic_cache)
+        free_topic_cache();
+
+    aclk_topic_cache = callocz(array_size, sizeof(struct aclk_topic *));
+
+    for (size_t i = 0; i < array_size; i++) {
+        obj = json_object_array_get_idx(json, i);
+        if (json_object_get_type(obj) != json_type_object) {
+            error("expected json_type_object");
+            return 1;
+        }
+        aclk_topic_cache[i] = callocz(1, sizeof(struct aclk_topic));
+        if (topic_cache_add_topic(obj, aclk_topic_cache[i])) {
+            error("failed to parse topic @idx=%d", (int)i);
+            return 1;
+        }
+    }
+
+    for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) {
+        if (!aclk_get_topic(compulsory_topics[i])) {
+            error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i]));
+            return 1;
+        }
+    }
+
+    return 0;
 }
 
 /*
@@ -127,9 +263,17 @@ static inline void generate_topic_cache(void)
  */
 const char *aclk_get_topic(enum aclk_topics topic)
 {
-    generate_topic_cache();
+    if (!aclk_topic_cache) {
+        error("Topic cache not initialized");
+        return NULL;
+    }
 
-    return aclk_topic_cache[topic].topic;
+    for (size_t i = 0; i < aclk_topic_cache_items; i++) {
+        if (aclk_topic_cache[i]->topic_id == topic)
+            return aclk_topic_cache[i]->topic;
+    }
+    error("Unknown topic");
+    return NULL;
 }
 
 /*

+ 6 - 4
aclk/aclk_util.h

@@ -51,13 +51,15 @@ void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc);
 void aclk_env_t_destroy(aclk_env_t *env);
 
 enum aclk_topics {
-    ACLK_TOPICID_CHART    = 0,
-    ACLK_TOPICID_ALARMS   = 1,
-    ACLK_TOPICID_METADATA = 2,
-    ACLK_TOPICID_COMMAND  = 3,
+    ACLK_TOPICID_UNKNOWN  = 0,
+    ACLK_TOPICID_CHART    = 1,
+    ACLK_TOPICID_ALARMS   = 2,
+    ACLK_TOPICID_METADATA = 3,
+    ACLK_TOPICID_COMMAND  = 4
 };
 
 const char *aclk_get_topic(enum aclk_topics topic);
+int aclk_generate_topic_cache(struct json_object *json);
 void free_topic_cache(void);
 // TODO
 // aclk_topics_reload //when claim id changes