Browse Source

MQTT 5 publish topic alias support (#14067)

* mqtt_websockets bumps version
* use the new topic alias support in netdata
Timotej S 2 years ago
parent
commit
a749ab00a6
5 changed files with 32 additions and 3 deletions
  1. 5 2
      Makefile.am
  2. 4 0
      aclk/aclk.c
  3. 18 0
      aclk/aclk_util.c
  4. 4 0
      aclk/aclk_util.h
  5. 1 1
      mqtt_websockets

+ 5 - 2
Makefile.am

@@ -733,9 +733,12 @@ libmqttwebsockets_a_SOURCES = \
     mqtt_websockets/c-rbuf/include/ringbuffer.h \
     mqtt_websockets/c-rbuf/src/ringbuffer_internal.h \
     mqtt_websockets/MQTT-C/src/mqtt.c \
-    mqtt_websockets/MQTT-C/include/mqtt.h
+    mqtt_websockets/MQTT-C/include/mqtt.h \
+    mqtt_websockets/c_rhash/src/c_rhash.c \
+    mqtt_websockets/c_rhash/include/c_rhash.h \
+    mqtt_websockets/c_rhash/src/c_rhash_internal.h
 
-libmqttwebsockets_a_CFLAGS = $(CFLAGS) -DMQTT_WSS_CUSTOM_ALLOC -DRBUF_CUSTOM_MALLOC -I$(srcdir)/aclk/helpers
+libmqttwebsockets_a_CFLAGS = $(CFLAGS) -DMQTT_WSS_CUSTOM_ALLOC -DRBUF_CUSTOM_MALLOC -I$(srcdir)/aclk/helpers -I$(srcdir)/mqtt_websockets/c_rhash/include
 
 mqtt_websockets/src/mqtt_wss_client.$(OBJEXT) : CFLAGS += -Wno-unused-result
 

+ 4 - 0
aclk/aclk.c

@@ -365,6 +365,10 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
     aclk_rcvd_cloud_msgs = 0;
     aclk_connection_counter++;
 
+    aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER;
+    while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL)
+        mqtt_wss_set_topic_alias(client, topic);
+
     aclk_send_agent_connection_update(client, 1);
 }
 

+ 18 - 0
aclk/aclk_util.c

@@ -307,6 +307,24 @@ const char *aclk_get_topic(enum aclk_topics topic)
     return NULL;
 }
 
+/*
+ * Allows iterating all topics in topic cache without
+ * having to resort to callbacks. 
+ */
+
+const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter)
+{
+    if (!aclk_topic_cache) {
+        error("Topic cache not initialized when %s was called.", __FUNCTION__);
+        return NULL;
+    }
+
+    if (*iter >= aclk_topic_cache_items)
+        return NULL;
+
+    return aclk_topic_cache[(*iter)++]->topic;
+}
+
 /*
  * TBEB with randomness
  *

+ 4 - 0
aclk/aclk_util.h

@@ -93,9 +93,13 @@ enum aclk_topics {
     ACLK_TOPICID_CTXS_UPDATED          = 20
 };
 
+typedef size_t aclk_topic_cache_iter_t;
+#define ACLK_TOPIC_CACHE_ITER_T_INITIALIZER (0)
+
 const char *aclk_get_topic(enum aclk_topics topic);
 int aclk_generate_topic_cache(struct json_object *json);
 void free_topic_cache(void);
+const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter);
 // TODO
 // aclk_topics_reload //when claim id changes
 

+ 1 - 1
mqtt_websockets

@@ -1 +1 @@
-Subproject commit d1e30f55e2c21e3c8982c76e6a969a1e65292d90
+Subproject commit 8869ab354d10c071c1e5e33602cc6b7940b4427c