Browse Source

initial MQTT over Secure Websockets support for ACLK (#7988)

* add aclk_lws_wss_client

* shorten the thread name in case more threads are necessary

* Draft libmosquitto<->libwebsockets integration

* use ringbuffer for recvd data

* Some code cleanup

* if mqtt connection fails close lws connection and reconect

* clear buffers on connection closed

* work on better loop integration

* move mosquitto read out of loop

* remove useless code when using websockets

* LWS - make host and port configurable

* make default port 9002 as we use MQTT over WSS now

* wait for link up before subscribing

start query thread after connection has been made

* cleanup - remove useless var

* if there is anything to write send it immediatelly

* cleanup: move buffers into engine instace

* allow MQTT IO from multiple threads (although preffered is MQTT IO to be done by single thread)

* add warning to future self

* add some comments for whoever reviews

* add destroy fnc - start work on cleanup

* minor - add mosquitto to .gitignore

* fix codacy errors

* do not reconnect automatically by default

* minor - remove outdated comment

* tab -> spaces

Co-Authored-By: Konstantinos Natsakis <5933427+knatsakis@users.noreply.github.com>

* address thiagoftsm valid comments

* add usefull logs in case of trouble

* fix -Wall -Wextra -Wformat-signedness warnings

* log error when connection fails

* update .gitignore to match new installer

* Fwd LWS logs to Netdata logs

* minor - tabulation fixes

* fix comments from thiago

* force SSL

* move UNUSED to libnetdata.h
@thiago correctly pointed out it might be usefull for others

* minor - rename function for clarity

* minor - remove commented out code

Co-authored-by: Konstantinos Natsakis <5933427+knatsakis@users.noreply.github.com>
Timotej Šiškovič 5 years ago
parent
commit
f4e1012f5f

+ 2 - 0
Makefile.am

@@ -463,6 +463,8 @@ ACLK_PLUGIN_FILES = \
     aclk/agent_cloud_link.h \
     aclk/mqtt.c \
     aclk/mqtt.h \
+    aclk/aclk_lws_wss_client.c \
+    aclk/aclk_lws_wss_client.h \
     $(NULL)
 
 EXPORTING_ENGINE_FILES = \

+ 337 - 0
aclk/aclk_lws_wss_client.c

@@ -0,0 +1,337 @@
+#include "aclk_lws_wss_client.h"
+
+#include "libnetdata/libnetdata.h"
+
+static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
+
+struct aclk_lws_wss_perconnect_data {
+	int todo;
+};
+
+struct lws_wss_packet_buffer {
+	unsigned char* data;
+	size_t data_size;
+	struct lws_wss_packet_buffer *next;
+};
+
+static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void* data, size_t size)
+{
+	struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer));
+	if(data) {
+		new->data = mallocz(LWS_PRE+size);
+		memcpy(new->data+LWS_PRE, data, size);
+		new->data_size = size;
+	}
+	return new;
+}
+
+static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item)
+{
+	struct lws_wss_packet_buffer *tail = *list;
+	if(!*list) {
+		*list = item;
+		return;
+	}
+	while(tail->next) {
+		tail = tail->next;
+	}
+	tail->next = item;
+}
+
+static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list)
+{
+	struct lws_wss_packet_buffer *ret = *list;
+	if(ret != NULL)
+		*list = ret->next;
+
+	return ret;
+}
+
+static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item)
+{
+	freez(item->data);
+	freez(item);
+}
+
+static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer)
+{
+	size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL);
+	if(elems > 0)
+		lws_ring_consume(ringbuffer, NULL, NULL, elems);
+}
+
+static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list)
+{
+	struct lws_wss_packet_buffer *i;
+	while((i = lws_wss_packet_buffer_pop(list)) != NULL) {
+		lws_wss_packet_buffer_free(i);
+	}
+	*list = NULL;
+}
+
+static inline void aclk_lws_wss_clear_io_buffers(struct aclk_lws_wss_engine_instance *inst)
+{
+	aclk_lws_mutex_lock(&inst->read_buf_mutex);
+	_aclk_lws_wss_read_buffer_clear(inst->read_ringbuffer);
+	aclk_lws_mutex_unlock(&inst->read_buf_mutex);
+	aclk_lws_mutex_lock(&inst->write_buf_mutex);
+	_aclk_lws_wss_write_buffer_clear(&inst->write_buffer_head);
+	aclk_lws_mutex_unlock(&inst->write_buf_mutex);
+}
+
+static const struct lws_protocols protocols[] = {
+	{
+		"aclk-wss",
+		aclk_lws_wss_callback,
+		sizeof(struct aclk_lws_wss_perconnect_data),
+		0, 0, 0, 0
+	},
+	{ NULL, NULL, 0, 0, 0, 0, 0 }
+};
+
+static void aclk_lws_wss_log_divert(int level, const char *line) {
+	switch(level){
+		case LLL_ERR:
+			error("Libwebsockets Error: %s", line);
+			break;
+		case LLL_WARN:
+			debug(D_ACLK, "Libwebsockets Warn: %s", line);
+			break;
+		default:
+			error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line);
+	}
+}
+
+struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port) {
+	static int lws_logging_initialized = 0;
+	struct lws_context_creation_info info;
+	struct aclk_lws_wss_engine_instance *inst;
+
+	if(unlikely(!lws_logging_initialized)) {
+		lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert);
+		lws_logging_initialized = 1;
+	}
+
+	if(!callbacks || !target_hostname)
+		return NULL;
+
+	inst = callocz(1, sizeof(struct aclk_lws_wss_engine_instance));
+
+	inst->host = target_hostname;
+	inst->port = target_port;
+
+	memset(&info, 0, sizeof(struct lws_context_creation_info));
+	info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+	info.port = CONTEXT_PORT_NO_LISTEN;
+	info.protocols = protocols;
+	info.user = inst;
+	
+	inst->lws_context = lws_create_context(&info);
+	if(!inst->lws_context)
+		goto failure_cleanup_2;
+
+	inst->callbacks = *callbacks;
+
+	aclk_lws_mutex_init(&inst->write_buf_mutex);
+	aclk_lws_mutex_init(&inst->read_buf_mutex);
+
+	inst->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL);
+	if(!inst->read_ringbuffer)
+		goto failure_cleanup;
+
+	return inst;
+
+failure_cleanup:
+	lws_context_destroy(inst->lws_context);
+failure_cleanup_2:
+	freez(inst);
+	return NULL;
+}
+
+void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst) {
+	lws_context_destroy(inst->lws_context);
+	inst->lws_context = NULL;
+	inst->lws_wsi = NULL;
+
+	aclk_lws_wss_clear_io_buffers(inst);
+
+#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
+	pthread_mutex_destroy(&inst->write_buf_mutex);
+	pthread_mutex_destroy(&inst->read_buf_mutex);
+#endif
+}
+
+void _aclk_wss_connect(struct aclk_lws_wss_engine_instance *inst){
+	struct lws_client_connect_info i;
+
+	memset(&i, 0, sizeof(i));
+	i.context = inst->lws_context;
+	i.port = inst->port;
+	i.address = inst->host;
+	i.path = "/mqtt";
+	i.host = inst->host;
+	i.protocol = "mqtt";
+#ifdef ACLK_SSL_ALLOW_SELF_SIGNED
+	i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
+#else
+	i.ssl_connection = LCCSCF_USE_SSL;
+#endif
+	lws_client_connect_via_info(&i);
+}
+
+static inline int received_data_to_ringbuff(struct lws_ring *buffer, void* data, size_t len) {
+	if( lws_ring_insert(buffer, data, len) != len ) {
+		error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding.");
+		return 0;
+	}
+	return 1;
+}
+
+static int
+aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason,
+			void *user, void *in, size_t len)
+{
+	UNUSED(user);
+	struct aclk_lws_wss_engine_instance *inst = lws_context_user(lws_get_context(wsi));
+	struct lws_wss_packet_buffer *data;
+	int retval = 0;
+
+	if( !inst ) {
+		error("Callback received without any aclk_lws_wss_engine_instance!");
+		return -1;
+	}
+
+	if( inst->upstream_reconnect_request ) {
+		error("Closing lws connectino due to libmosquitto error.");
+		char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection.";
+		lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char*)upstream_connection_error, strlen(upstream_connection_error));
+		retval = -1;
+		inst->upstream_reconnect_request = 0;
+	}
+
+	switch (reason) {
+	case LWS_CALLBACK_CLIENT_WRITEABLE:
+		aclk_lws_mutex_lock(&inst->write_buf_mutex);
+		data = lws_wss_packet_buffer_pop(&inst->write_buffer_head);
+		if(likely(data)) {
+			lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY);
+			lws_wss_packet_buffer_free(data);
+			if(inst->write_buffer_head)
+				lws_callback_on_writable(inst->lws_wsi);
+		}
+		aclk_lws_mutex_unlock(&inst->write_buf_mutex);
+		break;
+	case LWS_CALLBACK_CLIENT_RECEIVE:
+		aclk_lws_mutex_lock(&inst->read_buf_mutex);
+		if(!received_data_to_ringbuff(inst->read_ringbuffer, in, len))
+			retval = 1;
+		aclk_lws_mutex_unlock(&inst->read_buf_mutex);
+
+		if(likely(inst->callbacks.data_rcvd_callback))
+			// to future myself -> do not call this while read lock is active as it will eventually
+			// want to acquire same lock later in aclk_lws_wss_client_read() function
+			inst->callbacks.data_rcvd_callback();
+		else
+			inst->data_to_read = 1; //to inform logic above there is reason to call mosquitto_loop_read
+		break;
+	case LWS_CALLBACK_PROTOCOL_INIT:
+		//initial connection here
+		//later we will reconnect with delay od ACLK_LWS_WSS_RECONNECT_TIMEOUT
+		//in case this connection fails or drops
+		_aclk_wss_connect(inst);
+		break;
+	case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
+		//TODO if already active make some error noise
+		//currently we expect only one connection per netdata
+		inst->lws_wsi = wsi;
+		break;
+#ifdef AUTO_RECONNECT_ON_LWS_LAYER
+	case LWS_CALLBACK_USER:
+		inst->reconnect_timeout_running = 0;
+		_aclk_wss_connect(inst);
+		break;
+#endif
+	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
+		error("Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", inst->host, inst->port, (in ? (char*)in : "not given"));
+		/* FALLTHRU */
+	case LWS_CALLBACK_CLIENT_CLOSED:
+	case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
+#ifdef AUTO_RECONNECT_ON_LWS_LAYER
+		if(!inst->reconnect_timeout_running) {
+			lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
+						lws_get_protocol(wsi),
+						LWS_CALLBACK_USER, ACLK_LWS_WSS_RECONNECT_TIMEOUT);
+			inst->reconnect_timeout_running = 1;
+		}
+		/* FALLTHRU */
+#endif
+		//no break here on purpose we want to continue with LWS_CALLBACK_WSI_DESTROY
+	case LWS_CALLBACK_WSI_DESTROY:
+		aclk_lws_wss_clear_io_buffers(inst);
+		inst->lws_wsi = NULL;
+		inst->websocket_connection_up = 0;
+		break;
+	case LWS_CALLBACK_CLIENT_ESTABLISHED:
+		inst->websocket_connection_up = 1;
+		if(inst->callbacks.connection_established_callback)
+			inst->callbacks.connection_established_callback();
+		break;
+	default:
+		break;
+	}
+	return retval; //0-OK, other connection should be closed!
+}
+
+int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count)
+{
+	if(inst && inst->lws_wsi && inst->websocket_connection_up)
+	{
+		aclk_lws_mutex_lock(&inst->write_buf_mutex);
+		lws_wss_packet_buffer_append(&inst->write_buffer_head, lws_wss_packet_buffer_new(buf, count));
+		aclk_lws_mutex_unlock(&inst->write_buf_mutex);
+
+		lws_callback_on_writable(inst->lws_wsi);
+		return count;
+	}
+	return 0;
+}
+
+int aclk_lws_wss_client_read(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count)
+{
+	size_t data_to_be_read = count;
+
+	aclk_lws_mutex_lock(&inst->read_buf_mutex);
+	size_t readable_byte_count = lws_ring_get_count_waiting_elements(inst->read_ringbuffer, NULL);
+	if(unlikely(readable_byte_count == 0)) {
+		errno = EAGAIN;
+		data_to_be_read = -1;
+		goto abort;
+	}
+
+	if( readable_byte_count < data_to_be_read )
+		data_to_be_read = readable_byte_count;
+
+	data_to_be_read = lws_ring_consume(inst->read_ringbuffer, NULL, buf, data_to_be_read);
+	if(data_to_be_read == readable_byte_count)
+		inst->data_to_read = 0;
+
+abort:
+	aclk_lws_mutex_unlock(&inst->read_buf_mutex);
+	return data_to_be_read;
+}
+
+int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst)
+{
+	return lws_service(inst->lws_context, 0);
+}
+
+// in case the MQTT connection disconnect while lws transport is still operational
+// we should drop connection and reconnect
+// this function should be called when that happens to notify lws of that situation
+void aclk_lws_wss_mqtt_layer_disconect_notif(struct aclk_lws_wss_engine_instance *inst)
+{
+	if(inst->lws_wsi && inst->websocket_connection_up) {
+		inst->upstream_reconnect_request = 1;
+		lws_callback_on_writable(inst->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written.
+	}
+}

+ 81 - 0
aclk/aclk_lws_wss_client.h

@@ -0,0 +1,81 @@
+#ifndef ACLK_LWS_WSS_CLIENT_H
+#define ACLK_LWS_WSS_CLIENT_H
+
+#include <libwebsockets.h>
+
+#include "libnetdata/libnetdata.h"
+
+#define ACLK_LWS_WSS_RECONNECT_TIMEOUT 5
+
+// This is as define because ideally the ACLK at high level
+// can do mosqitto writes and reads only from one thread
+// which is cleaner implementation IMHO
+// in such case this mutexes are not necessarry and life
+// is simpler
+#define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1
+
+#define ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES 128*1024
+
+#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
+	#define aclk_lws_mutex_init(x) netdata_mutex_init(x)
+	#define aclk_lws_mutex_lock(x) netdata_mutex_lock(x)
+	#define aclk_lws_mutex_unlock(x) netdata_mutex_unlock(x)
+#else
+	#define aclk_lws_mutex_init(x)
+	#define aclk_lws_mutex_lock(x)
+	#define aclk_lws_mutex_unlock(x)
+#endif
+
+struct aclk_lws_wss_engine_callbacks {
+	void (*connection_established_callback)();
+	void (*data_rcvd_callback)();
+	void (*data_writable_callback)();
+};
+
+struct lws_wss_packet_buffer;
+
+struct aclk_lws_wss_engine_instance {
+	//target host/port for connection
+	const char *host;
+	int port;
+
+	//internal data
+	struct lws_context *lws_context;
+	struct lws *lws_wsi;
+
+#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
+	netdata_mutex_t write_buf_mutex;
+	netdata_mutex_t read_buf_mutex;
+#endif
+
+	struct lws_wss_packet_buffer *write_buffer_head;
+	struct lws_ring *read_ringbuffer;
+
+	struct aclk_lws_wss_engine_callbacks callbacks;
+
+	//flags to be readed by engine user
+	int websocket_connection_up;
+
+// currently this is by default disabled
+// as decision has been made that reconnection
+// will have to be done from top layer
+// (after getting the new MQTT auth data)
+// for now i keep it here as it is usefull for
+// some of my internall testing
+#ifdef AUTO_RECONNECT_ON_LWS_LAYER
+	int reconnect_timeout_running;
+#endif
+	int data_to_read;
+	int upstream_reconnect_request;
+};
+
+struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port);
+void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst);
+
+int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count);
+int aclk_lws_wss_client_read (struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count);
+int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst);
+
+void aclk_lws_wss_mqtt_layer_disconect_notif(struct aclk_lws_wss_engine_instance *inst);
+
+#endif

+ 7 - 3
aclk/agent_cloud_link.c

@@ -94,6 +94,10 @@ int cloud_to_agent_parse(JSON_ENTRY *e)
 
 // Set when we have connection up and running from the connection callback
 int aclk_connection_initialized = 0;
+// TODO modify previous comment if this stays this way
+// con_initialized means library is initialized and ready to be used
+// acklk_connected means there is actually an established connection
+int aclk_mqtt_connected = 0;
 
 static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
 static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
@@ -625,10 +629,10 @@ void *aclk_main(void *ptr)
             continue;
         }
 
-        if (unlikely(!aclk_subscribed)) {
+        if (unlikely(!aclk_subscribed) && aclk_mqtt_connected) {
             aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2);
         }
-        if (unlikely(!query_thread.thread)) {
+        if (unlikely(!query_thread.thread && aclk_mqtt_connected)) {
             query_thread.thread = mallocz(sizeof(netdata_thread_t));
             netdata_thread_create(
                 query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
@@ -763,7 +767,7 @@ int aclk_init(ACLK_INIT_ACTION action)
     aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20);
 
     aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost");
-    aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883);
+    aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 9002);
 
     info("Maximum parallel outgoing messages %d", aclk_send_maximum);
     info("Maximum parallel incoming messages %d", aclk_recv_maximum);

+ 1 - 1
aclk/agent_cloud_link.h

@@ -63,7 +63,7 @@ void *aclk_main(void *ptr);
 
 #define NETDATA_ACLK_HOOK \
     { \
-        .name = "AgentCloudLink", \
+        .name = "ACLK_Main", \
         .config_section = NULL, \
         .config_name = NULL, \
         .enabled = 1, \

+ 135 - 7
aclk/mqtt.c

@@ -3,6 +3,7 @@
 #include <libnetdata/json/json.h>
 #include "../daemon/common.h"
 #include "mqtt.h"
+#include "aclk_lws_wss_client.h"
 
 void (*_on_connect)(void *ptr) = NULL;
 void (*_on_disconnect)(void *ptr) = NULL;
@@ -104,6 +105,14 @@ void mqtt_message_callback(
 
 }
 
+int lws_wss_client_initialized = 0;
+
+// This is not define because in future we might want to try plain
+// MQTT as fallback ?
+// e.g. try 1st MQTT-WSS, 2nd MQTT plain, 3rd https fallback...
+int mqtt_over_websockets = 1;
+struct aclk_lws_wss_engine_instance *lws_engine_instance = NULL;
+
 void connect_callback(struct mosquitto *mosq, void *obj, int rc)
 {
     (void) obj;
@@ -112,6 +121,7 @@ void connect_callback(struct mosquitto *mosq, void *obj, int rc)
     info("Connection to cloud estabilished");
 
     aclk_connection_initialized = 1;
+    aclk_mqtt_connected = 1;
     _on_connect((void *) mosq);
 
     return;
@@ -127,7 +137,12 @@ void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
     // TODO: Keep the connection "alive" for now. The library will reconnect.
 
     //mqtt_connection_initialized = 0;
+    aclk_mqtt_connected = 0;
     _on_disconnect((void *) mosq);
+
+    if(mqtt_over_websockets && lws_engine_instance)
+        aclk_lws_wss_mqtt_layer_disconect_notif(lws_engine_instance);
+
     //sleep_usec(USEC_PER_SEC * 5);
     return;
 }
@@ -141,7 +156,17 @@ void _show_mqtt_info()
     info("Detected libmosquitto library version %d, %d.%d.%d",libmosq_version, libmosq_major, libmosq_minor, libmosq_revision);
 }
 
-int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
+size_t _mqtt_external_write_hook(void *buf, size_t count)
+{
+    return aclk_lws_wss_client_write(lws_engine_instance, buf, count);
+}
+
+size_t _mqtt_external_read_hook(void *buf, size_t count)
+{
+    return aclk_lws_wss_client_read(lws_engine_instance, buf, count);
+}
+
+int _mqtt_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
 {
     int rc;
     int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
@@ -194,7 +219,7 @@ int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *
     mosquitto_connect_callback_set(mosq, connect_callback);
     mosquitto_disconnect_callback_set(mosq, disconnect_callback);
 
-    mosquitto_username_pw_set(mosq, "anon", "anon");
+    mosquitto_username_pw_set(mosq, NULL, NULL);
 
     rc = mosquitto_threaded_set(mosq, 1);
     if (unlikely(rc != MOSQ_ERR_SUCCESS))
@@ -209,12 +234,21 @@ int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *
     info("MQTT in flight messages set to 1  -- %s", mosquitto_strerror(rc));
 #endif
 
-    rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_RECONNECT_DELAY, 1);
+    if(!mqtt_over_websockets) {
+        rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_RECONNECT_DELAY, 1);
 
-    if (unlikely(rc != MOSQ_ERR_SUCCESS))
-        error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc));
+        if (unlikely(rc != MOSQ_ERR_SUCCESS))
+            error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc));
+
+        mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL);
+    }
+
+    return rc;
+}
 
-    mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL);
+int _link_mqtt_connect(char *aclk_hostname, int aclk_port)
+{
+    int rc;
 
     rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL);
 
@@ -226,7 +260,83 @@ int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *
     return rc;
 }
 
-int _link_event_loop(int timeout)
+static inline void _link_mosquitto_write()
+{
+    int rc;
+
+    if(!mqtt_over_websockets)
+        return;
+
+    rc = mosquitto_loop_misc(mosq);
+    if(unlikely( rc != MOSQ_ERR_SUCCESS ))
+        debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc));
+
+    if(likely( mosquitto_want_write(mosq) )) {
+        rc = mosquitto_loop_write(mosq, 1);
+        if( rc != MOSQ_ERR_SUCCESS )
+            debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc));
+    }
+}
+
+void aclk_lws_connect_notif_callback(){
+    //the connection is done by LWS so this parameters dont matter
+    //ig MQTT over LWS is used
+    _link_mqtt_connect("doesntmatter", 12345);
+    _link_mosquitto_write();
+}
+
+void aclk_lws_data_received_callback(){
+    int rc = mosquitto_loop_read(mosq, 1);
+    if(rc != MOSQ_ERR_SUCCESS)
+        debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc));       
+}
+
+static const struct aclk_lws_wss_engine_callbacks aclk_lws_engine_callbacks = {
+    .connection_established_callback = aclk_lws_connect_notif_callback,
+    .data_rcvd_callback = aclk_lws_data_received_callback,
+    .data_writable_callback = NULL
+};
+
+int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
+{
+    int rc;
+
+    if(mqtt_over_websockets) {
+        // we will connect when WebSocket connection is up
+        // based on callback
+        if(!lws_wss_client_initialized) {
+            lws_engine_instance = aclk_lws_wss_client_init(&aclk_lws_engine_callbacks, aclk_hostname, aclk_port);
+            aclk_lws_wss_service_loop(lws_engine_instance);
+            lws_wss_client_initialized = 1;
+        }
+    }
+
+    rc = _mqtt_lib_init(aclk_hostname, aclk_port, on_connect, on_disconnect);
+    if(rc != MOSQ_ERR_SUCCESS)
+        return rc;
+
+    if(mqtt_over_websockets) {
+        mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook);
+        return MOSQ_ERR_SUCCESS;
+    } else {
+        // if direct mqtt connection is used
+        // connect immediatelly
+        return _link_mqtt_connect(aclk_hostname, aclk_port);
+    }
+}
+
+static inline int _link_event_loop_wss()
+{
+    if(lws_engine_instance && lws_engine_instance->websocket_connection_up)
+        _link_mosquitto_write();
+
+    aclk_lws_wss_service_loop(lws_engine_instance);
+    // this is because if use LWS we don't want
+    // mqtt to reconnect by itself
+    return MOSQ_ERR_SUCCESS;
+}
+
+static inline int _link_event_loop_plain_mqtt(int timeout)
 {
     int rc;
 
@@ -245,6 +355,14 @@ int _link_event_loop(int timeout)
     return rc;
 }
 
+int _link_event_loop(int timeout)
+{
+    if(mqtt_over_websockets)
+        return _link_event_loop_wss();
+    
+    return _link_event_loop_plain_mqtt(timeout);
+}
+
 void _link_shutdown()
 {
     int rc;
@@ -261,6 +379,12 @@ void _link_shutdown()
 
     mosquitto_destroy(mosq);
     mosq = NULL;
+
+    if(lws_engine_instance) {
+        aclk_lws_wss_client_destroy(lws_engine_instance);
+        lws_engine_instance = NULL;
+    }
+
     return;
 }
 
@@ -281,6 +405,8 @@ int _link_subscribe(char  *topic, int qos)
         return 1;
     }
 
+    _link_mosquitto_write();
+
     return 0;
 }
 
@@ -313,6 +439,8 @@ int _link_send_message(char *topic, char *message)
         error("MQTT message failed : %s", mosquitto_strerror(rc));
     }
 
+    _link_mosquitto_write();
+
     return rc;
 }
 #endif

+ 3 - 1
aclk/mqtt.h

@@ -15,7 +15,9 @@ int _link_subscribe(char *topic, int qos);
 int _link_send_message(char *topic, char *message);
 const char *_link_strerror(int rc);
 
-extern int aclk_connection_initialized;
 int aclk_handle_cloud_request(char *);
 
+extern int aclk_connection_initialized;
+extern int aclk_mqtt_connected;
+
 #endif //NETDATA_MQTT_H

+ 2 - 0
libnetdata/libnetdata.h

@@ -288,6 +288,8 @@ extern void recursive_config_double_dir_load(
 
 #define BITS_IN_A_KILOBIT 1000
 
+/* misc. */
+#define UNUSED(x) (void)(x)
 
 extern void netdata_cleanup_and_exit(int ret) NORETURN;
 extern void send_statistics(const char *action, const char *action_result, const char *action_data);

+ 1 - 0
libnetdata/log/log.h

@@ -37,6 +37,7 @@
 #define D_POLLFD            0x0000000020000000
 #define D_STREAM            0x0000000040000000
 #define D_RRDENGINE         0x0000000100000000
+#define D_ACLK              0x0000000200000000
 #define D_SYSTEM            0x8000000000000000
 
 //#define DEBUG (D_WEB_CLIENT_ACCESS|D_LISTENER|D_RRD_STATS)