Browse Source

Remove option to use MQTT 3 (#13824)

* remove mqtt3 support
Timotej S 2 years ago
parent
commit
50be9eb132
6 changed files with 18 additions and 64 deletions
  1. 0 2
      aclk/README.md
  2. 15 20
      aclk/aclk.c
  3. 0 1
      aclk/aclk.h
  4. 0 3
      aclk/aclk_alarm_api.c
  5. 0 16
      aclk/aclk_query_queue.c
  6. 3 22
      aclk/aclk_tx_msgs.c

+ 0 - 2
aclk/README.md

@@ -60,12 +60,10 @@ You can configure following keys in the `netdata.conf` section `[cloud]`:
 [cloud]
   statistics = yes
   query thread count = 2
-  mqtt5 = yes
 ```
 
 - `statistics` enables/disables ACLK related statistics and their charts. You can disable this to save some space in the database and slightly reduce memory usage of Netdata Agent.
 - `query thread count` specifies the number of threads to process cloud queries. Increasing this setting is useful for nodes with many children (streaming), which can expect to handle more queries (and/or more complicated queries).
-- `mqtt5` allows disabling the new MQTT5 implementation which is used now by default in case of issues. This option will be removed in future stable release.
 
 ## Disable the ACLK
 

+ 15 - 20
aclk/aclk.c

@@ -32,7 +32,6 @@ int aclk_connection_counter = 0;
 int disconnect_req = 0;
 
 int aclk_connected = 0;
-int use_mqtt_5 = 0;
 int aclk_ctx_based = 0;
 int aclk_disable_runtime = 0;
 int aclk_stats_enabled;
@@ -459,9 +458,9 @@ static int aclk_block_till_recon_allowed() {
  */
 static int aclk_get_transport_idx(aclk_env_t *env) {
     for (size_t i = 0; i < env->transport_count; i++) {
-        // currently we support only MQTT 3
+        // currently we support only MQTT 5
         // therefore select first transport that matches
-        if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) {
+        if (env->transports[i]->type == ACLK_TRP_MQTT_5) {
             return i;
         }
     }
@@ -495,7 +494,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
     while (!netdata_exit) {
         char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
         if (cloud_base_url == NULL) {
-            error("Do not move the cloud base url out of post_conf_load!!");
+            error_report("Do not move the cloud base url out of post_conf_load!!");
             return -1;
         }
 
@@ -505,7 +504,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
         info("Attempting connection now");
         memset(&base_url, 0, sizeof(url_t));
         if (url_parse(cloud_base_url, &base_url)) {
-            error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
+            error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
             sleep(CLOUD_BASE_URL_READ_RETRY);
             url_t_destroy(&base_url);
             continue;
@@ -535,7 +534,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
         ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
         url_t_destroy(&base_url);
         if (ret) {
-            error("Failed to Get ACLK environment");
+            error_report("Failed to Get ACLK environment");
             // delay handled by aclk_block_till_recon_allowed
             continue;
         }
@@ -549,14 +548,14 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
         }
 
         if (!aclk_env_has_capa("proto")) {
-            error ("Can't use encoding=proto without at least \"proto\" capability.");
+            error_report("Can't use encoding=proto without at least \"proto\" capability.");
             continue;
         }
         info("New ACLK protobuf protocol negotiated successfully (/env response).");
 
         memset(&auth_url, 0, sizeof(url_t));
         if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
-            error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
+            error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
             url_t_destroy(&auth_url);
             continue;
         }
@@ -564,7 +563,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
         ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
         url_t_destroy(&auth_url);
         if (ret) {
-            error("Error passing Challenge/Response to get OTP");
+            error_report("Error passing Challenge/Response to get OTP");
             continue;
         }
 
@@ -573,20 +572,20 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
         mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
 
         if (!mqtt_conn_params.will_topic) {
-            error("Couldn't get LWT topic. Will not send LWT.");
+            error_report("Couldn't get LWT topic. Will not send LWT.");
             continue;
         }
 
         // Do the MQTT connection
         ret = aclk_get_transport_idx(aclk_env);
         if (ret < 0) {
-            error("Cloud /env endpoint didn't return any transport usable by this Agent.");
+            error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
             continue;
         }
 
         memset(&mqtt_url, 0, sizeof(url_t));
         if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
-            error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
+            error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
             url_t_destroy(&mqtt_url);
             continue;
         }
@@ -672,9 +671,7 @@ void *aclk_main(void *ptr)
     if (wait_till_agent_claim_ready())
         goto exit;
 
-    use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
-
-    if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
+    if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) {
         error("Couldn't initialize MQTT_WSS network library");
         goto exit;
     }
@@ -919,7 +916,7 @@ char *aclk_state(void)
         "ACLK Version: 2\n"
         "Protocols Supported: Protobuf\n"
     );
-    buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", use_mqtt_5 ? 5 : 3);
+    buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);
 
     char *agent_id = get_agent_claimid();
     if (agent_id == NULL)
@@ -1072,7 +1069,7 @@ char *aclk_state_json(void)
     tmp = json_object_new_string("Protobuf");
     json_object_object_add(msg, "used-cloud-protocol", tmp);
 
-    tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
+    tmp = json_object_new_int(5);
     json_object_object_add(msg, "mqtt-version", tmp);
 
     tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
@@ -1171,9 +1168,7 @@ void add_aclk_host_labels(void) {
             break;
     }
 
-    int mqtt5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_YES);
-
-    rrdlabels_add(labels, "_mqtt_version", mqtt5 ? "5" : "3", RRDLABEL_SRC_AUTO);
+    rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
     rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
     rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
 #else

+ 0 - 1
aclk/aclk.h

@@ -14,7 +14,6 @@
 #endif /* ENABLE_ACLK */
 
 extern int aclk_connected;
-extern int use_mqtt_5;
 extern int aclk_ctx_based;
 extern int aclk_disable_runtime;
 extern int aclk_stats_enabled;

+ 0 - 3
aclk/aclk_alarm_api.c

@@ -23,9 +23,6 @@ void aclk_send_alarm_log_entry(struct alarm_log_entry *log_entry)
     char *payload = generate_alarm_log_entry(&payload_size, log_entry);
 
     aclk_send_bin_msg(payload, payload_size, ACLK_TOPICID_ALARM_LOG, "AlarmLogEntry");
-
-    if (!use_mqtt_5)
-        freez(payload);
 }
 
 void aclk_send_provide_alarm_cfg(struct provide_alarm_configuration *cfg)

+ 0 - 16
aclk/aclk_query_queue.c

@@ -111,22 +111,6 @@ void aclk_query_free(aclk_query_t query)
             freez(query->data.http_api_v2.query);
         break;
 
-    case NODE_STATE_UPDATE:
-    case REGISTER_NODE:
-    case CHART_DIMS_UPDATE:
-    case CHART_CONFIG_UPDATED:
-    case CHART_RESET:
-    case RETENTION_UPDATED:
-    case UPDATE_NODE_INFO:
-    case ALARM_LOG_HEALTH:
-    case ALARM_PROVIDE_CFG:
-    case ALARM_SNAPSHOT:
-    case UPDATE_NODE_COLLECTORS:
-    case PROTO_BIN_MESSAGE:
-        if (!use_mqtt_5)
-            freez(query->data.bin_payload.payload);
-        break;
-
     default:
         break;
     }

+ 3 - 22
aclk/aclk_tx_msgs.c

@@ -35,10 +35,7 @@ uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, s
         return 0;
     }
 
-    if (use_mqtt_5)
-        mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
-    else
-        mqtt_wss_publish_pid(client, topic, msg, msg_len,  MQTT_WSS_PUB_QOS1, &packet_id);
+    mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
 
 #ifdef NETDATA_INTERNAL_CHECKS
     aclk_stats_msg_published(packet_id);
@@ -64,7 +61,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
     uint16_t packet_id;
     const char *str;
     char *full_msg = NULL;
-    int len, rc;
+    int len;
 
     if (unlikely(!topic || topic[0] != '/')) {
         error ("Full topic required!");
@@ -87,21 +84,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
         len += payload_len;
     }
 
-    if (use_mqtt_5)
-        mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id);
-    else {
-        rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len,  MQTT_WSS_PUB_QOS1, &packet_id, 5000);
-        freez(full_msg);
-        json_object_put(msg);
-        if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
-            error("Timeout sending binpacked message");
-            return HTTP_RESP_BACKEND_FETCH_FAILED;
-        }
-        if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
-            error("Message is bigger than allowed maximum");
-            return HTTP_RESP_FORBIDDEN;
-        }
-    }
+    mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id);
 
 #ifdef NETDATA_INTERNAL_CHECKS
     aclk_stats_msg_published(packet_id);
@@ -263,8 +246,6 @@ uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable
     }
 
     pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
-    if (!use_mqtt_5)
-        freez(msg);
     if (localhost->aclk_state.prev_claimed_id) {
         freez(localhost->aclk_state.prev_claimed_id);
         localhost->aclk_state.prev_claimed_id = NULL;