Browse Source

ACLK new cloud architecture new TBEB (#10941)

* new TBEB impl. honoring new cloud architecture requirements
* handle error cases during env/passwd/challenge as per spec of new cloud architecture
Timotej S 3 years ago
parent
commit
690df2de3b
5 changed files with 218 additions and 23 deletions
  1. 32 5
      aclk/aclk.c
  2. 2 0
      aclk/aclk.h
  3. 159 0
      aclk/aclk_otp.c
  4. 23 17
      aclk/aclk_util.c
  5. 2 1
      aclk/aclk_util.h

+ 32 - 5
aclk/aclk.c

@@ -29,6 +29,8 @@ int aclk_kill_link = 0;
 
 int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
 
+time_t aclk_block_until = 0; // monotonic time (seconds) before which reconnecting is held off; 0 means no block is pending
+
 usec_t aclk_session_us = 0;         // Used by the mqtt layer
 time_t aclk_session_sec = 0;        // Used by the mqtt layer
 
@@ -241,7 +243,7 @@ static void msg_callback(const char *topic, const void *msg, size_t msglen, int
 static void puback_callback(uint16_t packet_id)
 {
     if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE)
-        aclk_reconnect_delay(0);
+        aclk_tbeb_reset();
 
 #ifdef NETDATA_INTERNAL_CHECKS
     aclk_stats_msg_puback(packet_id);
@@ -404,16 +406,41 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
     mqtt_wss_disconnect(client, 1000);
 }
 
+/* Decide how long to wait before the next connection attempt.
+ *
+ * - ACLK disabled at runtime: reset the backoff and wait a fixed 60 seconds.
+ * - A block deadline (aclk_block_until) still in the future: wait until that
+ *   deadline, then clear it and reset the backoff.
+ * - Otherwise: advance the TBEB, using the backoff settings from aclk_env when
+ *   available and base 2 / min 0 s / max 1024 s when not.
+ *
+ * @return delay in milliseconds
+ */
+static unsigned long aclk_reconnect_delay() {
+    time_t now, blocked_until;
+    unsigned long wait_ms;
+
+    if (aclk_disable_runtime) {
+        aclk_tbeb_reset();
+        return 60 * MSEC_PER_SEC;
+    }
+
+    now = now_monotonic_sec();
+
+    // a pending block deadline is consumed here whether or not it already expired
+    blocked_until = aclk_block_until;
+    if (blocked_until) {
+        aclk_block_until = 0;
+        if (now < blocked_until) {
+            wait_ms = blocked_until - now;
+            wait_ms *= MSEC_PER_SEC;
+            aclk_tbeb_reset();
+            return wait_ms;
+        }
+    }
+
+    if (aclk_env && aclk_env->backoff.base)
+        return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
+
+    return aclk_tbeb_delay(0, 2, 0, 1024);
+}
+
 /* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled
  * @return 0 - Go ahead and connect (delay expired)
  *         1 - netdata_exit
  */
 #define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
 static int aclk_block_till_recon_allowed() {
-    // Handle reconnect exponential backoff
-    // fnc aclk_reconnect_delay comes from ACLK Legacy @amoss
-    // but has been modifed slightly (more randomness)
-    unsigned long recon_delay = aclk_reconnect_delay(1);
+    unsigned long recon_delay = aclk_reconnect_delay();
+
     info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
     // we want to wake up from time to time to check netdata_exit
     while (recon_delay)

+ 2 - 0
aclk/aclk.h

@@ -25,6 +25,8 @@ extern int aclk_disable_single_updates;
 extern int aclk_kill_link;
 extern int aclk_connected;
 
+extern time_t aclk_block_until;
+
 extern usec_t aclk_session_us;
 extern time_t aclk_session_sec;
 

+ 159 - 0
aclk/aclk_otp.c

@@ -275,6 +275,161 @@ exit:
     return rc;
 }
 
+#define JSON_KEY_ERTRY   "errorNonRetryable"
+#define JSON_KEY_EDELAY  "errorRetryDelaySeconds"
+#define JSON_KEY_EEC     "errorCode"
+#define JSON_KEY_EMSGKEY "errorMsgKey"
+#define JSON_KEY_EMSG    "errorMessage"
+#if JSON_C_MINOR_VERSION >= 13
+/* Look up a string value by JSON pointer.
+ *
+ * @param json parsed document to search
+ * @param path JSON pointer of the wanted key (e.g. "/errorCode")
+ * @return the string owned by `json`, or NULL (after logging) when the key is
+ *         missing or its value is not a string
+ */
+static const char *get_json_str_by_path(json_object *json, const char *path) {
+    json_object *node;
+
+    if (json_pointer_get(json, path, &node) != 0) {
+        error("Missing compulsory key \"%s\" in error response", path);
+        return NULL;
+    }
+
+    if (json_object_get_type(node) == json_type_string)
+        return json_object_get_string(node);
+
+    error("Value of Key \"%s\" in error response should be string", path);
+    return NULL;
+}
+
+static int aclk_parse_otp_error(const char *json_str) {
+    int rc = 1;
+    json_object *json, *ptr;
+    const char *ec;
+    const char *ek;
+    const char *emsg;
+    int block_retry = -1, backoff = -1;
+
+
+    json = json_tokener_parse(json_str);
+    if (!json) {
+        error("JSON-C failed to parse the payload of http response of /env endpoint");
+        return 1;
+    }
+
+    if ((ec = get_json_str_by_path(json, "/" JSON_KEY_EEC)) == NULL)
+        goto exit;
+
+    if ((ek = get_json_str_by_path(json, "/" JSON_KEY_EMSGKEY)) == NULL)
+        goto exit;
+
+    if ((emsg = get_json_str_by_path(json, "/" JSON_KEY_EMSG)) == NULL)
+        goto exit;
+
+    // optional field
+    if (!json_pointer_get(json, "/" JSON_KEY_ERTRY, &ptr)) {
+        if (json_object_get_type(ptr) != json_type_boolean) {
+            error("Error response Key " "/" JSON_KEY_ERTRY " should be of boolean type");
+            goto exit;
+        }
+        block_retry = json_object_get_boolean(ptr);
+    }
+
+    // optional field
+    if (!json_pointer_get(json, "/" JSON_KEY_EDELAY, &ptr)) {
+        if (json_object_get_type(ptr) != json_type_int) {
+            error("Error response Key " "/" JSON_KEY_EDELAY " should be of integer type");
+            goto exit;
+        }
+        backoff = json_object_get_int(ptr);
+    }
+
+    if (block_retry > 0)
+        aclk_disable_runtime = 1;
+
+    if (backoff > 0)
+        aclk_block_until = now_monotonic_sec() + backoff;
+
+    error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff);
+    rc = 0;
+exit:
+    json_object_put(json);
+    return rc;
+}
+#else
+static int aclk_parse_otp_error(const char *json_str) {
+    int rc = 1;
+    int block_retry = -1, backoff = -1;
+
+    const char *ec = NULL;
+    const char *ek = NULL;
+    const char *emsg = NULL;
+
+    json_object *json;
+    struct json_object_iterator it;
+    struct json_object_iterator itEnd;
+
+    json = json_tokener_parse(json_str);
+    if (!json) {
+        error("JSON-C failed to parse the payload of http respons of /env endpoint");
+        return 1;
+    }
+
+    it = json_object_iter_begin(json);
+    itEnd = json_object_iter_end(json);
+
+    while (!json_object_iter_equal(&it, &itEnd)) {
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EMSG)) {
+            PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EMSG)
+
+            emsg = json_object_get_string(json_object_iter_peek_value(&it));
+            json_object_iter_next(&it);
+            continue;
+        }
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EMSGKEY)) {
+            PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EMSGKEY)
+
+            ek = json_object_get_string(json_object_iter_peek_value(&it));
+            json_object_iter_next(&it);
+            continue;
+        }
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EEC)) {
+            PARSE_ENV_JSON_CHK_TYPE(&it, json_type_string, JSON_KEY_EEC)
+
+            ec = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
+            json_object_iter_next(&it);
+            continue;
+        }
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_EDELAY)) {
+            if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_int) {
+                error("value of key " JSON_KEY_EDELAY " should be integer");
+                goto exit;
+            }
+
+            backoff = json_object_get_int(json_object_iter_peek_value(&it));
+            json_object_iter_next(&it);
+            continue;
+        }
+        if (!strcmp(json_object_iter_peek_name(&it), JSON_KEY_ERTRY)) {
+            if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_boolean) {
+                error("value of key " JSON_KEY_ERTRY " should be integer");
+                goto exit;
+            }
+
+            block_retry = json_object_get_boolean(json_object_iter_peek_value(&it));
+            json_object_iter_next(&it);
+            continue;
+        }
+        error("Unknown key \"%s\" in error response payload. Ignoring", json_object_iter_peek_name(&it));
+        json_object_iter_next(&it);
+    }
+
+    if (block_retry > 0)
+        aclk_disable_runtime = 1;
+
+    if (backoff > 0)
+        aclk_block_until = now_monotonic_sec() + backoff;
+
+    error("Cloud returned EC=\"%s\", Msg-Key:\"%s\", Msg:\"%s\", BlockRetry:%s, Backoff:%ds (-1 unset by cloud)", ec, ek, emsg, block_retry > 0 ? "true" : "false", backoff);
+    rc = 0;
+exit:
+    json_object_put(json);
+    return rc;
+}
+#endif
+
 #define OTP_URL_PREFIX "/api/v1/auth/node/"
 int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target) {
     // TODO this fnc will be rewritten and simplified in following PRs
@@ -304,6 +459,8 @@ int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_p
     }
     if (resp.http_code != 200) {
         error ("ACLK_OTP Challenge HTTP code not 200 OK (got %d)", resp.http_code);
+        if (resp.payload_size)
+            aclk_parse_otp_error(resp.payload);
         goto cleanup_resp;
     }
     info ("ACLK_OTP Got Challenge from Cloud");
@@ -355,6 +512,8 @@ int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_p
     }
     if (resp.http_code != 201) {
         error ("ACLK_OTP Password HTTP code not 201 Created (got %d)", resp.http_code);
+        if (resp.payload_size)
+            aclk_parse_otp_error(resp.payload);
         goto cleanup_resp;
     }
     info ("ACLK_OTP Got Password from Cloud");

+ 23 - 17
aclk/aclk_util.c

@@ -279,33 +279,39 @@ const char *aclk_get_topic(enum aclk_topics topic)
 /*
  * TBEB with randomness
  *
- * @param mode 0 - to reset the delay,
- *             1 - to advance a step and calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
+ * The first call after a reset seeds the PRNG and returns 0 (connect right
+ * away). Every following call returns base^(attempt - 1) seconds plus a random
+ * extra, limited to the [min, max] range.
+ *
+ * @param reset 1 - to reset the delay,
+ *              0 - to advance a step and calculate sleep time in ms
+ * @param base  base of the exponential
+ * @param min, max bounds of the delay, given in seconds
  * @returns delay in ms
  *
  */
-#define ACLK_MAX_BACKOFF_DELAY 1024
-unsigned long int aclk_reconnect_delay(int mode)
-{
-    static int fail = -1;
-    unsigned long int delay;
 
-    if (!mode || fail == -1) {
-        srandom(time(NULL));
-        fail = mode - 1;
+unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max) {
+    static int attempt = -1;
+
+    if (reset) {
+        attempt = -1;
         return 0;
     }
 
-    delay = (1 << fail);
+    attempt++;
 
-    if (delay >= ACLK_MAX_BACKOFF_DELAY) {
-        delay = ACLK_MAX_BACKOFF_DELAY * 1000;
-    } else {
-        fail++;
-        delay *= 1000;
-        delay += (random() % (MAX(1000, delay/2)));
+    if (attempt == 0) {
+        srandom(time(NULL));
+        return 0;
     }
 
+    // Evaluate the exponential as a double first. Once it reaches max the
+    // answer is known, and converting an out-of-range double to an integer
+    // type is undefined behaviour.
+    double delay_s = pow(base, attempt - 1);
+    if (delay_s >= (double)max) {
+        attempt--; // saturated: hold the counter so the exponent cannot grow without bound
+        return max * MSEC_PER_SEC;
+    }
+    if (delay_s < 0)
+        delay_s = 0; // a negative base must not reach the unsigned conversion
+
+    unsigned long int delay = (unsigned long int)delay_s;
+    delay *= MSEC_PER_SEC;
+
+    delay += (random() % (MAX(1000, delay/2)));
+
+    // min and max are in seconds while the result is in milliseconds
+    if (delay <= min * MSEC_PER_SEC)
+        return min * MSEC_PER_SEC;
+
+    if (delay >= max * MSEC_PER_SEC)
+        return max * MSEC_PER_SEC;
+
     return delay;
 }
 

+ 2 - 1
aclk/aclk_util.h

@@ -75,7 +75,8 @@ int aclk_get_conv_log_next();
 #endif
 #endif
 
-unsigned long int aclk_reconnect_delay(int mode);
+unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max);
+#define aclk_tbeb_reset(x) aclk_tbeb_delay(1, 0, 0, 0) // resets the backoff state; the macro argument is unused
 
 typedef enum aclk_proxy_type {
     PROXY_TYPE_UNKNOWN = 0,