Просмотр исходного кода

ACLK agent 1 (#7894)

* - Add initial mqtt support

* [WIP] Agent cloud link

- Setup main mqtt thread to connect to a broker using V5 of the MQTT protocol (TBD)

- Send alarms to "netdata/alarm"

- Add error checks to handle connection failures

- Add params for
  Broker, port
  Maximum concurrent sent  / recev messages

- Dummy function to check claiming status

- Generic mqtt_send command to publish message to a base topic , sub topic
  It will end up in the form base_topic/sub_topic

- Add host/port in the connection failure error message

* Test libmosquitto libs

* connect to broker locally (assume localhost:1883)
* subscribe to channel netdata/command
* Test try a reload command to trigger health reload
* publish alerts to netdata/alarm

* - Fix compile issues

* - Use sleep_usec instead of usleep

* - Delay reconnection on failure due to misconfiguration (high cpu usage)

* - Remove the TLS connection config

* - Fix NETDATA_MQTT_INITIALIZATION_SLEEP_WAIT to use seconds

* - Gather ACLK related code under aclk folder
- Add aclk_ functions for abstract layer
- Moved low level libs intergration in mqtt.c

* - Add README.md file with initial comment

* - Clean MQTT v5

* - Code cleanup

* - Remove alarm log for now
- Remove the heart beat

* - Remove message properties for V5

* - Remove message properties for V5 (header)

* Fixed the netdata target to use a local static version of libmosquitto.

The installer does not yet have steps to pull and build the local library.
cd project_root
git clone ssh://git@github.com/netdata/mosquitto mosquitto/
(cd mosquitto/lib && make)      # Ignore the cpp error

This will leave mosquitto/lib/libmosquitto.a for the build process to use.

* - Fix compile issues with older < 1.6 libmosquitto lib

* - Enable alarm events to check it works

- Re arrange includes

- Rework topic to be agent/guid/. Actual id will be
  returned by the is_agent_claimed

* - Add initial metadata info

- Added helper function in web_api

- Added a debug command (info)

* Update the claiming state to retrieve the claimed id.

* - Use define for constants like command and metadata topics

- Function to wait for initialization of the ACLK link

- New aclk_subscribe command with QOS parameter for the mqtt subscription

- Use the is_agent_claimed function to get the real claim id and use it to build the topics
  that will be used for the cloud communication

- Change in netdata-claim.sh.in to write the claim id without a trailing \n

* - Use define for constants like command and metadata topics

- Function to wait for initialization of the ACLK link

- New aclk_subscribe command with QOS parameter for the mqtt subscription

- Use the is_agent_claimed function to get the real claim id and use it to build the topics
  that will be used for the cloud communication

- Change in netdata-claim.sh.in to write the claim id without a trailing \n

* - Remove the alarm log for now

- Add code (but disabled) to send charts

* - Use dummy anon, anon as username and password for testing purposes

* - Use client id anon as well

* Testing without TLS

* Switching TLS back on to fix docker environment.

* - Added query processing
  An incoming URL now calls web_client_api_request_v1_data to handle a request and push the results
  back to the "data" topic

- Move the above processing from the message callback to the query handle loop

- Added helper "pause" , "resume" commands to stop and resume query processing to stress test loading the queue
  with queries before executing them

- Changed the endpoint topics to "meta", and "cmd"  (previously metadata and command)

* make info message follow protocol

* move metadata msg generation into new func

* move metadata msg generation into new func

* - Add metadata to the responses

- Add hook to queue chart changes on creation and dimensions

- Changed the queue mechanism to include delay for X seconds

- Add delayed submittion of charts to the cloud so that all DIMs are defined to avoid resubmission

* - Add additional data info for aclk_queue command

* - Use web_clinet_api_request_v1 to handle the incoming request
  This will handle all requests coming from the cloud

* - Cleanup and aclk_query structure
- Add msg_id parameter
- Enable the incoming JSON request
- Enable the outgoing JSON response

* - Added new thread to handle query processing
- Add lock and cond wait to wakeup thread when queries are submitted
- Cleanup on the main init function

* - Add wait time on agent init, to allow for chart, alarms and other definitions to be completed.
- During the wait time, no queries will be queued

* - Send metadata on query thread init
- New generic create header function for the JSON response
- Pack info and charts into one message
- Modified chart to remove entries (test)
- Modified charts mod to remove entries  e.g alarms and volatile info
- Change input to aclk_update_chart (RRDHOST / instead of hostname)

* - When a request fails, add to the payload
  - We may need to handle in a different key
- Error check in json parsing

* - Add dummy aclk_update_alarm command

* - Move incoming request JSON parsing code away from mqtt.c
- Added #ifdef ACLK_ENABLE so that we can have code merged but disabled by default
- Added version in incoming and outgoing JSON dict

* - Disable code if ACLK_ENABLE is not defined
- Remove references to the mqtt (mosquitto) lib
- Add dummy stubs in mqtt.c for completeness if ACLK_ENABLE is not defined

* - Disable challenge sample code for now

* - Remove libmosquitto from makefile

* - Fix spaces in Makefile.am
- Remove ifdef to avoid warning from LGTM

* - Remove for now the code that builds an along log test message to send to the cloud

* - Add check for ACLK_ENABLE definition and avoid calling the chart update functions

* - Remove commented code

* - Move source files to the correct place (ACLK_PLUGIN_FILES)

* - Remove include file thats not needed

* - Remove include file thats not needed
- Add improved checks for load_claiming_state()

* - Fix error message. Used error() that also logs errno and message

* - Fix some codacy issues

* - Fix more codacy issues, code cleanup

* - Revert code to address codacy warnings

* - Revert spaces added in a previous commit by mistake

* clean up if/else nest

* print error if fopen fails

* minor - error already logs errno

* - Fix version formatting

* - Cleanup all ACLK related compiler warnings
- Re-arrange include files
- Removed unused defines

* - More compilation warnings fixed
- Bug with thread creation fixed

* - Add condition to skip compilation of the ACLK code entirely. Add env variable ACLK="yes" to enable

* - Add condition to skip the libmosquitto

* - Change feature flag from ACLK_ENABLE to ENABLE_ACLK in accordance with the rest of ENABLE_xx flags
- Typo in info message fix

Co-authored-by: Andrew Moss <1043609+amoss@users.noreply.github.com>
Co-authored-by: Timo <6674623+underhood@users.noreply.github.com>
Stelios Fragkakis 5 лет назад
Родитель
Сommit
b2b3c18254
10 измененных файлов с 1523 добавлено и 18 удалено
  1. 10 2
      CMakeLists.txt
  2. 25 0
      Makefile.am
  3. 18 0
      aclk/Makefile.am
  4. 1 0
      aclk/README.md
  5. 995 0
      aclk/agent_cloud_link.c
  6. 103 0
      aclk/agent_cloud_link.h
  7. 318 0
      aclk/mqtt.c
  8. 21 0
      aclk/mqtt.h
  9. 31 15
      claim/claim.c
  10. 1 1
      claim/claim.h

+ 10 - 2
CMakeLists.txt

@@ -339,8 +339,7 @@ set(LIBNETDATA_FILES
 add_library(libnetdata OBJECT ${LIBNETDATA_FILES})
 
 set(APPS_PLUGIN_FILES
-        collectors/apps.plugin/apps_plugin.c
-        )
+        collectors/apps.plugin/apps_plugin.c)
 
 set(CHECKS_PLUGIN_FILES
         collectors/checks.plugin/plugin_checks.c
@@ -606,6 +605,14 @@ set(CLAIM_PLUGIN_FILES
         claim/claim.h
         )
 
+set(ACLK_PLUGIN_FILES
+        aclk/agent_cloud_link.c
+        aclk/agent_cloud_link.h
+        aclk/agent_cloud_link.c
+        aclk/mqtt.c
+        aclk/mqtt.h
+        )
+
 set(EXPORTING_ENGINE_FILES
         exporting/exporting_engine.c
         exporting/exporting_engine.h
@@ -673,6 +680,7 @@ set(NETDATA_FILES
         ${STREAMING_PLUGIN_FILES}
         ${WEB_PLUGIN_FILES}
         ${CLAIM_PLUGIN_FILES}
+        ${ACLK_PLUGIN_FILES}
         )
 
 set(NETDATACLI_FILES

+ 25 - 0
Makefile.am

@@ -100,6 +100,7 @@ SUBDIRS += \
     streaming \
     web \
     claim \
+    aclk \
     $(NULL)
 
 
@@ -108,6 +109,7 @@ AM_CFLAGS = \
     $(OPTIONAL_NFACCT_CFLAGS) \
     $(OPTIONAL_ZLIB_CFLAGS) \
     $(OPTIONAL_UUID_CFLAGS) \
+    $(OPTIONAL_MQTT_CFLAGS) \
     $(OPTIONAL_LIBCAP_LIBS) \
     $(OPTIONAL_IPMIMONITORING_CFLAGS) \
     $(OPTIONAL_CUPS_CFLAGS) \
@@ -456,6 +458,13 @@ CLAIM_PLUGIN_FILES = \
     claim/claim.h \
     $(NULL)
 
+ACLK_PLUGIN_FILES = \
+    aclk/agent_cloud_link.c \
+    aclk/agent_cloud_link.h \
+    aclk/mqtt.c \
+    aclk/mqtt.h \
+    $(NULL)
+
 EXPORTING_ENGINE_FILES = \
     exporting/exporting_engine.c \
     exporting/exporting_engine.h \
@@ -526,6 +535,12 @@ NETDATA_FILES = \
     $(CLAIM_PLUGIN_FILES) \
     $(NULL)
 
+if ENABLE_ACLK
+    NETDATA_FILES += \
+        $(ACLK_PLUGIN_FILES) \
+        $(NULL)
+endif
+
 if FREEBSD
     NETDATA_FILES += \
         $(FREEBSD_PLUGIN_FILES) \
@@ -559,6 +574,7 @@ NETDATA_COMMON_LIBS = \
     $(OPTIONAL_ZLIB_LIBS) \
     $(OPTIONAL_SSL_LIBS) \
     $(OPTIONAL_UUID_LIBS) \
+    $(OPTIONAL_MQTT_LIBS) \
     $(OPTIONAL_UV_LIBS) \
     $(OPTIONAL_LZ4_LIBS) \
     $(OPTIONAL_JUDY_LIBS) \
@@ -575,9 +591,18 @@ NETDATACLI_FILES = \
 
 sbin_PROGRAMS += netdata
 netdata_SOURCES = $(NETDATA_FILES)
+
+if ENABLE_ACLK
 netdata_LDADD = \
+    mosquitto/lib/libmosquitto.a \
     $(NETDATA_COMMON_LIBS) \
     $(NULL)
+else
+netdata_LDADD = \
+    $(NETDATA_COMMON_LIBS) \
+    $(NULL)
+endif
+
 if ENABLE_CXX_LINKER
     netdata_LINK = $(CXXLD) $(CXXFLAGS) $(LDFLAGS) -o $@
 else

+ 18 - 0
aclk/Makefile.am

@@ -0,0 +1,18 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+CLEANFILES = \
+    $(NULL)
+
+include $(top_srcdir)/build/subst.inc
+SUFFIXES = .in
+
+sbin_SCRIPTS = \
+    $(NULL)
+
+dist_noinst_DATA = \
+    README.md \
+    $(NULL)
+

+ 1 - 0
aclk/README.md

@@ -0,0 +1 @@
+This is the agent cloud link (ACLK) information file

+ 995 - 0
aclk/agent_cloud_link.c

@@ -0,0 +1,995 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "libnetdata/libnetdata.h"
+#include "agent_cloud_link.h"
+
+// Read from the config file -- new section [agent_cloud_link]
+// Defaults are supplied
+int aclk_recv_maximum = 0; // default 20
+int aclk_send_maximum = 0; // default 20
+
+int aclk_port = 0;          // default 1883
+char *aclk_hostname = NULL; //default localhost
+int aclk_subscribed = 0;
+
+int aclk_metadata_submitted = 0;
+int waiting_init = 1;
+int cmdpause = 0; // Used to pause query processing
+
+BUFFER *aclk_buffer = NULL;
+char *global_base_topic = NULL;
+
+int cloud_to_agent_parse(JSON_ENTRY *e)
+{
+    struct aclk_request *data = e->callback_data;
+
+    switch(e->type) {
+        case JSON_OBJECT:
+            e->callback_function = cloud_to_agent_parse;
+            break;
+        case JSON_ARRAY:
+            e->callback_function = cloud_to_agent_parse;
+            break;
+        case JSON_STRING:
+            if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) {
+                data->msg_id = strdupz(e->data.string);
+                break;
+            }
+            if (!strcmp(e->name, ACLK_JSON_IN_TYPE)) {
+                data->type_id = strdupz(e->data.string);
+                break;
+            }
+            if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) {
+                data->topic = strdupz(e->data.string);
+                break;
+            }
+            if (!strcmp(e->name, ACLK_JSON_IN_URL)) {
+                data->url = strdupz(e->data.string);
+                break;
+            }
+            break;
+        case JSON_NUMBER:
+            if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) {
+                data->version = atol(e->data.string);
+                break;
+            }
+            break;
+
+        case JSON_BOOLEAN:
+            break;
+
+        case JSON_NULL:
+            break;
+    }
+    return 0;
+}
+
+//char *send_http_request(char *host, char *port, char *url, BUFFER *b)
+//{
+//    struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
+//
+//    buffer_flush(b);
+//    buffer_sprintf(
+//        b,
+//        "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n",
+//        url, host);
+//    int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout);
+//
+//    if (unlikely(sock == -1)) {
+//        error("Handshake failed");
+//        return NULL;
+//    }
+//
+//    SSL_CTX *ctx = security_initialize_openssl_client();
+//    // Certificate chain: not updating the stores - do we need private CA roots?
+//    // Calls to SSL_CTX_load_verify_locations would go here.
+//    SSL *ssl = SSL_new(ctx);
+//    SSL_set_fd(ssl, sock);
+//    int err = SSL_connect(ssl);
+//    SSL_write(ssl, b->buffer, b->len); // Timeout options?
+//    int bytes_read = SSL_read(ssl, b->buffer, b->len);
+//    SSL_shutdown(ssl);
+//    close(sock);
+//}
+
+// Set when we have connection up and running from the connection callback
+int aclk_connection_initialized = 0;
+
+static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
+
+#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
+#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
+
+#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
+#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
+
+pthread_cond_t  query_cond_wait = PTHREAD_COND_INITIALIZER;
+pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
+
+#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait);
+#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
+#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
+
+
+struct aclk_query {
+    time_t created;
+    time_t run_after; // Delay run until after this time
+    char *topic;      // Topic to respond to
+    char *data;       // Internal data (NULL if request from the cloud)
+    char *msg_id;     // msg_id generated by the cloud (NULL if internal)
+    char *query;      // The actual query
+    u_char deleted;     // Mark deleted for garbage collect
+    struct aclk_query *next;
+};
+
+struct aclk_query_queue {
+    struct aclk_query *aclk_query_head;
+    struct aclk_query *aclk_query_tail;
+    u_int64_t count;
+} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
+
+/*
+ * Free a query structure when done
+ */
+
+void aclk_query_free(struct aclk_query *this_query)
+{
+    if (unlikely(!this_query))
+        return;
+
+    freez(this_query->topic);
+    freez(this_query->query);
+    if (this_query->data)
+        freez(this_query->data);
+    if (this_query->msg_id)
+        freez(this_query->msg_id);
+    freez(this_query);
+    return;
+}
+
+// Returns the entry after which we need to create a new entry to run at the specified time
+// If NULL is returned we need to add to HEAD
+// Called with locked entries
+
+struct aclk_query *aclk_query_find_position(time_t time_to_run)
+{
+    struct aclk_query *tmp_query, *last_query;
+
+    last_query = NULL;
+    tmp_query = aclk_queue.aclk_query_head;
+
+    while (tmp_query) {
+        if (tmp_query->run_after > time_to_run)
+            return last_query;
+        last_query = tmp_query;
+        tmp_query = tmp_query->next;
+    }
+    return last_query;
+}
+
+// Need to have a lock before calling this
+struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query)
+{
+    struct aclk_query *tmp_query;
+
+    tmp_query = aclk_queue.aclk_query_head;
+
+    while (tmp_query) {
+        if (likely(!tmp_query->deleted)) {
+            if (strcmp(tmp_query->topic, topic) == 0 && (strcmp(tmp_query->query, query) == 0)) {
+                if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
+                    (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0)))
+                    return tmp_query;
+            }
+        }
+        tmp_query = tmp_query->next;
+    }
+    return NULL;
+}
+
+/*
+ * Add a query to execute, the result will be send to the specified topic
+ */
+
+int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal)
+{
+    struct aclk_query *new_query, *tmp_query;
+
+    // Ignore all commands while we wait for the agent to initialize
+    if (unlikely(waiting_init))
+        return 0;
+
+    run_after = now_realtime_sec() + run_after;
+
+    QUERY_LOCK;
+    tmp_query = aclk_query_find(topic, data, msg_id, query);
+    if (unlikely(tmp_query)) {
+        if (tmp_query->run_after == run_after) {
+            QUERY_UNLOCK;
+            QUERY_THREAD_WAKEUP;
+            return 0;
+        }
+        tmp_query->deleted = 1;
+    }
+
+    new_query = callocz(1, sizeof(struct aclk_query));
+    if (internal) {
+        new_query->topic = strdupz(topic);
+        new_query->query = strdupz(query);
+    } else {
+        new_query->topic = topic;
+        new_query->query = query;
+        new_query->msg_id = msg_id;
+    }
+
+    if (data)
+        new_query->data = strdupz(data);
+
+    new_query->next = NULL;
+    new_query->created = now_realtime_sec();
+    new_query->run_after = run_after;
+
+    info("Added query (%s) (%s)", topic, query);
+
+    tmp_query = aclk_query_find_position(run_after);
+
+    if (tmp_query) {
+        new_query->next = tmp_query->next;
+        tmp_query->next = new_query;
+        if (tmp_query == aclk_queue.aclk_query_tail)
+            aclk_queue.aclk_query_tail = new_query;
+        aclk_queue.count++;
+        QUERY_UNLOCK;
+        QUERY_THREAD_WAKEUP;
+        return 0;
+    }
+
+    new_query->next = aclk_queue.aclk_query_head;
+    aclk_queue.aclk_query_head = new_query;
+    aclk_queue.count++;
+
+    QUERY_UNLOCK;
+    QUERY_THREAD_WAKEUP;
+    return 0;
+
+//    if (likely(aclk_queue.aclk_query_tail)) {
+//        aclk_queue.aclk_query_tail->next = new_query;
+//        aclk_queue.aclk_query_tail = new_query;
+//        aclk_queue.count++;
+//        QUERY_UNLOCK;
+//        return 0;
+//    }
+//
+//    if (likely(!aclk_queue.aclk_query_head)) {
+//        aclk_queue.aclk_query_head = new_query;
+//        aclk_queue.aclk_query_tail = new_query;
+//        aclk_queue.count++;
+//        QUERY_UNLOCK;
+//        return 0;
+//    }
+//    QUERY_UNLOCK;
+//    return 0;
+}
+
+inline int aclk_submit_request(struct aclk_request *request)
+{
+    return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0);
+}
+
+/*
+ * Get the next query to process - NULL if nothing there
+ * The caller needs to free memory by calling aclk_query_free()
+ *
+ *      topic
+ *      query
+ *      The structure itself
+ *
+ */
+struct aclk_query *aclk_queue_pop()
+{
+    struct aclk_query *this_query;
+
+    QUERY_LOCK;
+
+    if (likely(!aclk_queue.aclk_query_head)) {
+        QUERY_UNLOCK;
+        return NULL;
+    }
+
+    this_query = aclk_queue.aclk_query_head;
+
+    if (this_query->run_after > now_realtime_sec()) {
+        info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
+        QUERY_UNLOCK;
+        return NULL;
+    }
+
+    aclk_queue.count--;
+    aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
+
+    if (likely(!aclk_queue.aclk_query_head)) {
+        aclk_queue.aclk_query_tail = NULL;
+    }
+
+    QUERY_UNLOCK;
+    return this_query;
+}
+
+// This will give the base topic that the agent will publish messages.
+// subtopics will be sent under the base topic e.g.  base_topic/subtopic
+// This is called by aclk_init(), to compute the base topic once and have
+// it stored internally.
+// Need to check if additional logic should be added to make sure that there
+// is enough information to determine the base topic at init time
+
+// TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch
+// that on the fly
+
+char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action)
+{
+    static char *topic = NULL;
+
+    if (unlikely(!is_agent_claimed()))
+        return NULL;
+
+    ACLK_LOCK;
+
+    if (unlikely(action == PUBLICH_TOPIC_FREE)) {
+        if (likely(topic)) {
+            freez(topic);
+            topic = NULL;
+        }
+
+        ACLK_UNLOCK;
+
+        return NULL;
+    }
+
+    if (unlikely(action == PUBLICH_TOPIC_REBUILD)) {
+        ACLK_UNLOCK;
+        get_publish_base_topic(PUBLICH_TOPIC_FREE);
+        return get_publish_base_topic(PUBLICH_TOPIC_GET);
+    }
+
+    if (unlikely(!topic)) {
+        char tmp_topic[ACLK_MAX_TOPIC + 1];
+
+        sprintf(tmp_topic, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
+        topic = strdupz(tmp_topic);
+    }
+
+    ACLK_UNLOCK;
+    return topic;
+}
+
+char *get_topic(char *sub_topic, char *final_topic, int max_size)
+{
+    if (unlikely(!global_base_topic))
+        global_base_topic = GET_PUBLISH_BASE_TOPIC;
+
+    if (unlikely(!global_base_topic))
+        return sub_topic;
+
+    snprintfz(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
+
+    return final_topic;
+}
+
+// Wait for ACLK connection to be established
+int aclk_wait_for_initialization()
+{
+    if (unlikely(!aclk_connection_initialized)) {
+        time_t now = now_realtime_sec();
+
+        while (!aclk_connection_initialized && (now_realtime_sec() - now) < ACLK_INITIALIZATION_WAIT) {
+            sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT);
+            _link_event_loop(0);
+        }
+
+        if (unlikely(!aclk_connection_initialized)) {
+            error("ACLK connection cannot be established");
+            return 1;
+        }
+    }
+    return 0;
+}
+
+/*
+ * This function will fetch the next pending command and process it
+ *
+ */
+int aclk_process_query()
+{
+    struct aclk_query *this_query;
+    static u_int64_t query_count = 0;
+    //int rc;
+
+    if (unlikely(cmdpause))
+        return 0;
+
+    if (!aclk_connection_initialized)
+        return 0;
+
+    this_query = aclk_queue_pop();
+    if (likely(!this_query)) {
+        //info("No pending queries");
+        return 0;
+    }
+
+    if (unlikely(this_query->deleted)) {
+        info("Garbage collect query %s:%s", this_query->topic, this_query->query);
+        aclk_query_free(this_query);
+        return 1;
+    }
+
+    query_count++;
+    info(
+        "Query #%d (%s) (%s) in queue %d seconds", (int) query_count, this_query->topic, this_query->query,
+        (int) (now_realtime_sec() - this_query->created));
+
+    if (strncmp((char *)this_query->query, "/api/v1/", 8) == 0) {
+        struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
+        w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+        strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
+        w->cookie1[0] = 0;      // Simulate web_client_create_on_fd()
+        w->cookie2[0] = 0;      // Simulate web_client_create_on_fd()
+        w->acl = 0x1f;
+
+        char *mysep = strchr(this_query->query, '?');
+        if (mysep) {
+            strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
+            *mysep = '\0';
+        } else
+            strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
+
+        mysep = strrchr(this_query->query, '/');
+
+        // TODO: ignore return code for now
+        web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
+
+        //TODO: handle bad response perhaps in a different way. For now it does to the payload
+        //if (rc == HTTP_RESP_OK || 1) {
+            buffer_flush(aclk_buffer);
+
+            aclk_create_metadata_message(aclk_buffer, mysep ? mysep + 1 : "noop", this_query->msg_id, w->response.data);
+            aclk_buffer->contenttype = CT_APPLICATION_JSON;
+            aclk_send_message(this_query->topic, aclk_buffer->buffer);
+        //} else
+        //   error("Query RESP: %s", w->response.data->buffer);
+
+        buffer_free(w->response.data);
+        freez(w);
+        aclk_query_free(this_query);
+        return 1;
+    }
+
+    if (strcmp((char *)this_query->topic, "_chart") == 0) {
+        aclk_send_single_chart(this_query->data, this_query->query);
+    }
+
+    aclk_query_free(this_query);
+
+    return 1;
+}
+
+// Launch a query processing thread
+
+/*
+ * Process all pending queries
+ * Return 0 if no queries were processed, 1 otherwise
+ *
+ */
+
+int aclk_process_queries()
+{
+    if (unlikely(cmdpause))
+        return 0;
+
+    // Return if no queries pending
+    if (likely(!aclk_queue.count))
+        return 0;
+
+    info("Processing %d queries", (int ) aclk_queue.count);
+
+    while (aclk_process_query()) {
+        //rc = _link_event_loop(0);
+    };
+
+    return 1;
+}
+
+static void aclk_query_thread_cleanup(void *ptr)
+{
+    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+    info("cleaning up...");
+
+    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+/**
+ * MAin query processing thread
+ *
+ */
+void *aclk_query_main_thread(void *ptr)
+{
+    netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
+
+    while (!netdata_exit) {
+
+        QUERY_THREAD_LOCK;
+
+        if (unlikely(!aclk_metadata_submitted)) {
+            aclk_send_metadata();
+            aclk_metadata_submitted = 1;
+        }
+
+        if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
+            sleep_usec(USEC_PER_SEC * 1);
+
+        if (likely(aclk_connection_initialized && !netdata_exit)) {
+            while (aclk_process_queries()) {
+                // Sleep for a few ms and retry maybe we have something to process
+                // before going to sleep
+                // TODO: This needs improvement to avoid missed queries
+                sleep_usec(USEC_PER_MS * 100);
+            }
+        }
+
+        QUERY_THREAD_UNLOCK;
+
+    } // forever
+    info("Shutting down query processing thread");
+    netdata_thread_cleanup_pop(1);
+    return NULL;
+}
+
+// Thread cleanup
+static void aclk_main_cleanup(void *ptr)
+{
+    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
+    info("cleaning up...");
+
+    QUERY_THREAD_WAKEUP;
+
+    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+}
+
+/**
+ * Main agent cloud link thread
+ *
+ * This thread will simply call the main event loop that handles
+ * pending requests - both inbound and outbound
+ *
+ * @param ptr is a pointer to the netdata_static_thread structure.
+ *
+ * @return It always returns NULL
+ */
+void *aclk_main(void *ptr)
+{
+    //netdata_thread_t *query_thread;
+    struct netdata_static_thread query_thread;
+
+    memset(&query_thread, 0, sizeof(query_thread));
+
+    netdata_thread_cleanup_push(aclk_main_cleanup, ptr);
+
+    if (unlikely(!aclk_buffer))
+        aclk_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+
+    assert(aclk_buffer != NULL);
+
+    //netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
+    //netdata_thread_create(&query_thread.thread , "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
+    info("Waiting for netdata to be ready");
+    while (!netdata_ready) {
+        sleep_usec(USEC_PER_MS * 300);
+    }
+    info("Waiting %d seconds for the agent to initialize", ACLK_STARTUP_WAIT);
+    sleep_usec(USEC_PER_SEC * ACLK_STARTUP_WAIT);
+
+    // Ok mark we are ready to accept incoming requests
+    waiting_init = 0;
+
+    while (!netdata_exit) {
+        // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds
+        // TODO: Handle the unclaim command as well -- we may need to shutdown the connection
+        if (likely(!is_agent_claimed())) {
+            sleep_usec(USEC_PER_SEC * 60);
+            info("Checking agent claiming status");
+            continue;
+        }
+
+        if (unlikely(!aclk_connection_initialized)) {
+            static int initializing = 0;
+
+            if (likely(initializing)) {
+                _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
+                continue;
+            }
+            initializing = 1;
+            info("Initializing connection");
+            //send_http_request(aclk_hostname, "443", "/auth/challenge?id=blah", aclk_buffer);
+            if (unlikely(aclk_init(ACLK_INIT))) {
+                // TODO: TBD how to handle. We are claimed and we cant init the connection. For now keep trying.
+                sleep_usec(USEC_PER_SEC * 60);
+                continue;
+            } else {
+                sleep_usec(USEC_PER_SEC * 1);
+            }
+            _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
+            continue;
+        }
+
+        if (unlikely(!aclk_subscribed)) {
+            aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2);
+        }
+        if (unlikely(!query_thread.thread)) {
+            query_thread.thread = mallocz(sizeof(netdata_thread_t));
+            netdata_thread_create(
+                query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
+        }
+
+        //TODO: Check if there is a return code
+        _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
+
+    } // forever
+    aclk_shutdown();
+
+    netdata_thread_cleanup_pop(1);
+    return NULL;
+}
+
+/*
+ * Send a message to the cloud, using a base topic and sib_topic
+ * The final topic will be in the form <base_topic>/<sub_topic>
+ * If base_topic is missing then the global_base_topic will be used (if available)
+ *
+ */
+int aclk_send_message(char *sub_topic, char *message)
+{
+    int rc;
+    static int skip_due_to_shutdown = 0;
+    char topic[ACLK_MAX_TOPIC + 1];
+    char *final_topic;
+
+    if (!aclk_connection_initialized)
+        return 0;
+
+    if (unlikely(netdata_exit)) {
+        if (unlikely(!aclk_connection_initialized))
+            return 1;
+
+        ++skip_due_to_shutdown;
+        if (unlikely(!(skip_due_to_shutdown % 100)))
+            info("%d messages not sent -- shutdown in progress", skip_due_to_shutdown);
+        return 1;
+    }
+
+    if (unlikely(!message))
+        return 0;
+
+    if (unlikely(aclk_wait_for_initialization()))
+        return 1;
+
+    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+
+    ACLK_LOCK;
+    rc = _link_send_message(final_topic, message);
+    ACLK_UNLOCK;
+
+    // TODO: Add better handling -- error will flood the logfile here
+    if (unlikely(rc))
+        error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
+
+    return rc;
+}
+
+/*
+ * Subscribe to a topic in the cloud
+ * The final subscription will be in the form
+ * /agent/claim_id/<sub_topic>
+ */
+int aclk_subscribe(char *sub_topic, int qos)
+{
+    int rc;
+    //static char *global_base_topic = NULL;
+    char topic[ACLK_MAX_TOPIC + 1];
+    char *final_topic;
+
+    if (!aclk_connection_initialized)
+        return 0;
+
+    if (unlikely(netdata_exit)) {
+        return 1;
+    }
+
+    if (unlikely(aclk_wait_for_initialization()))
+        return 1;
+
+    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+
+    ACLK_LOCK;
+    rc = _link_subscribe(final_topic, qos);
+    ACLK_UNLOCK;
+
+    // TODO: Add better handling -- error will flood the logfile here
+    if (unlikely(rc))
+        error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
+
+    return rc;
+}
+
+// This is called from a callback when the link goes up
+void aclk_connect(void *ptr)
+{
+    (void) ptr;
+    info("Connection detected");
+    return;
+}
+
+// This is called from a callback when the link goes down
+void aclk_disconnect(void *ptr)
+{
+    (void) ptr;
+    info("Disconnect detected");
+    aclk_subscribed = 0;
+    aclk_metadata_submitted = 0;
+}
+
+void aclk_shutdown()
+{
+    info("Shutdown initiated");
+    aclk_connection_initialized = 0;
+    _link_shutdown();
+    info("Shutdown complete");
+}
+
+int aclk_init(ACLK_INIT_ACTION action)
+{
+    (void) action;
+
+    static int init = 0;
+    int rc;
+
+    if (likely(init))
+        return 0;
+
+    aclk_send_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link send maximum", 20);
+    aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20);
+
+    aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost");
+    aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883);
+
+    info("Maximum parallel outgoing messages %d", aclk_send_maximum);
+    info("Maximum parallel incoming messages %d", aclk_recv_maximum);
+
+    // This will setup the base publish topic internally
+    //get_publish_base_topic(PUBLICH_TOPIC_GET);
+
+    // initialize the low level link to the cloud
+    rc = _link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect);
+    if (unlikely(rc)) {
+        error("Failed to initialize the agent cloud link library");
+        return 1;
+    }
+    global_base_topic = GET_PUBLISH_BASE_TOPIC;
+    init = 1;
+
+    return 0;
+}
+
+// Use this to disable encoding of quotes and newlines so that
+// MQTT subscriber can display more readable data on screen
+
+void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
+{
+    uuid_t uuid;
+    char uuid_str[36 + 1];
+
+    if (unlikely(!msg_id)) {
+        uuid_generate(uuid);
+        uuid_unparse(uuid, uuid_str);
+        msg_id = uuid_str;
+    }
+
+    buffer_sprintf(
+        dest,
+        "\t{\"type\": \"%s\",\n"
+        "\t\"msg-id\": \"%s\",\n"
+        "\t\"version\": %s,\n"
+        "\t\"payload\": ",
+        type, msg_id, ACLK_VERSION);
+}
+
+#define EYE_FRIENDLY 1
+
+// encapsulate contents into metadata message as per ACLK documentation
+void aclk_create_metadata_message(BUFFER *dest, char *type, char *msg_id, BUFFER *contents)
+{
+#ifndef EYE_FRIENDLY
+    char *tmp_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
+    char *src, *dst;
+#endif
+
+    buffer_sprintf(
+        dest,
+        "\t{\"type\": \"%s\",\n"
+        "\t\"msg-id\": \"%s\",\n"
+        "\t\"payload\": %s\n\t}",
+        type, msg_id ? msg_id : "", contents->buffer);
+
+#ifndef EYE_FRIENDLY
+    //TODO: this is the initial escaping, It will expanded
+    src = dest->buffer;
+    dst = tmp_buffer;
+    while (*src) {
+        switch (*src) {
+            case '0x0a':
+            case '\n':
+                *dst++ = '\\';
+                *dst++ = 'n';
+                break;
+            case '\"':
+                *dst++ = '\\';
+                *dst++ = '\"';
+                break;
+            case '\'':
+                *dst++ = '\\';
+                *dst++ = '\"';
+                break;
+            default:
+                *dst++ = *src;
+        }
+        src++;
+    }
+    *dst = '\0';
+
+    buffer_flush(dest);
+    buffer_sprintf(dest, "%s", tmp_buffer);
+
+    freez(tmp_buffer);
+#endif
+    return;
+}
+
+//TODO: this has been changed in the latest specs. We need to pack the data in one MQTT
+//message with a payload and has a list of json objects
+int aclk_send_alarm_metadata()
+{
+    //TODO: improve locking on the buffer -- same lock is used for the message send
+    //improve error handling
+    ACLK_LOCK;
+    buffer_flush(aclk_buffer);
+    // Alarms configuration
+    aclk_create_header(aclk_buffer, "alarms", NULL);
+    health_alarms2json(localhost, aclk_buffer, 1);
+    buffer_sprintf(aclk_buffer,"\n}");
+    ACLK_UNLOCK;
+    aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer);
+
+    // Alarms log
+    ACLK_LOCK;
+    buffer_flush(aclk_buffer);
+    aclk_create_header(aclk_buffer, "alarms_log", NULL);
+    health_alarm_log2json(localhost, aclk_buffer, 0);
+    buffer_sprintf(aclk_buffer,"\n}");
+    ACLK_UNLOCK;
+    aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer);
+
+    return 0;
+}
+
+
+// Send info metadata message to the cloud if the link is established
+// or on request
+int aclk_send_metadata()
+{
+    ACLK_LOCK;
+
+    buffer_flush(aclk_buffer);
+
+    aclk_create_header(aclk_buffer, "connect", NULL);
+    buffer_sprintf(aclk_buffer,"{\n\t \"info\" : ");
+    web_client_api_request_v1_info_fill_buffer(localhost, aclk_buffer);
+    buffer_sprintf(aclk_buffer,", \n\t \"charts\" : ");
+    charts2json(localhost, aclk_buffer);
+    buffer_sprintf(aclk_buffer,"\n}\n}");
+    aclk_buffer->contenttype = CT_APPLICATION_JSON;
+
+    ACLK_UNLOCK;
+
+    aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer);
+
+    aclk_send_alarm_metadata();
+
+    return 0;
+}
+
+//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
+
+int aclk_send_single_chart(char *hostname, char *chart)
+{
+    RRDHOST *target_host;
+    ACLK_LOCK;
+
+    buffer_flush(aclk_buffer);
+
+    target_host = rrdhost_find_by_hostname(hostname, 0);
+    if (!target_host)
+        return 1;
+
+    RRDSET *st = rrdset_find(target_host, chart);
+
+    if (!st)
+        st = rrdset_find_byname(target_host, chart);
+
+    if (!st) {
+        info("FAILED to find chart %s", chart);
+        return 1;
+    }
+
+    aclk_buffer->contenttype = CT_APPLICATION_JSON;
+
+    buffer_flush(aclk_buffer);
+
+    aclk_create_header(aclk_buffer, "chart", NULL);
+
+    rrdset2json(st, aclk_buffer, NULL, NULL);
+    buffer_sprintf(aclk_buffer,"\n}\n}");
+
+
+    ACLK_UNLOCK;
+    aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer);
+    return 0;
+}
+
+int    aclk_update_chart(RRDHOST *host, char *chart_name)
+{
+    (void) host;
+    (void) chart_name;
+#ifndef ENABLE_ACLK
+    return 0;
+#else
+    if (host != localhost)
+        return 0;
+
+    aclk_queue_query("_chart", host->hostname, NULL, chart_name, 2, 1);
+    return 0;
+#endif
+}
+
+int    aclk_update_alarm(RRDHOST *host, char *alarm_name)
+{
+    if (host != localhost)
+        return 0;
+
+    aclk_queue_query("_alarm", host->hostname, NULL, alarm_name, 2, 1);
+    return 0;
+}
+
+
+//TODO: add and check the incoming type e.g http
+int aclk_handle_cloud_request(char *payload)
+{
+    struct aclk_request cloud_to_agent = { .msg_id = NULL, .topic = NULL, .url = NULL, .version = 1};
+
+    int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
+
+    if (unlikely(JSON_OK != rc)) {
+        error("Malformed json request (%s)", payload);
+        return 1;
+    }
+
+    if (unlikely(!cloud_to_agent.url || !cloud_to_agent.topic)) {
+        return 1;
+    }
+
+    aclk_submit_request(&cloud_to_agent);
+
+    return 0;
+}

+ 103 - 0
aclk/agent_cloud_link.h

@@ -0,0 +1,103 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_AGENT_CLOUD_LINK_H
+#define NETDATA_AGENT_CLOUD_LINK_H
+
+#include "../daemon/common.h"
+#include "mqtt.h"
+
+#define ACLK_JSON_IN_MSGID "msg-id"
+#define ACLK_JSON_IN_TYPE "type"
+#define ACLK_JSON_IN_VERSION "version"
+#define ACLK_JSON_IN_TOPIC "callback-topic"
+#define ACLK_JSON_IN_URL "payload"
+
+
+#define ACLK_MSG_TYPE_CHART "chart"
+#define ACLK_ALARMS_TOPIC "alarms"
+#define ACLK_METADATA_TOPIC "meta"
+#define ACLK_COMMAND_TOPIC "cmd"
+#define ACLK_TOPIC_STRUCTURE "/agent/%s"
+
+#define ACLK_STARTUP_WAIT 30             // Seconds to wait before establishing initialization process
+#define ACLK_INITIALIZATION_WAIT 60      // Wait for link to initialize in seconds (per msg)
+#define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds
+#define ACLK_QOS 1
+#define ACLK_PING_INTERVAL 60
+#define ACLK_LOOP_TIMEOUT 5        // seconds to wait for operations in the library loop
+
+#define ACLK_MAX_TOPIC  255
+
+#define ACLK_RECONNECT_DELAY 1          // reconnect delay -- with backoff stragegy fow now
+#define ACLK_MAX_RECONNECT_DELAY 120
+#define ACLK_VERSION "1"
+
+#define CONFIG_SECTION_ACLK "agent_cloud_link"
+
+struct aclk_request {
+    char    *type_id;
+    char    *msg_id;
+    char    *topic;
+    char    *url;
+    int     version;
+};
+
+
+typedef enum publish_topic_action {
+    PUBLICH_TOPIC_GET,
+    PUBLICH_TOPIC_FREE,
+    PUBLICH_TOPIC_REBUILD
+} PUBLISH_TOPIC_ACTION;
+
+typedef enum aclk_init_action {
+    ACLK_INIT,
+    ACLK_REINIT
+} ACLK_INIT_ACTION;
+
+
+#define GET_PUBLISH_BASE_TOPIC get_publish_base_topic(0)
+#define FREE_PUBLISH_BASE_TOPIC get_publish_base_topic(1)
+#define REBUILD_PUBLISH_BASE_TOPIC get_publish_base_topic(2)
+
+void *aclk_main(void *ptr);
+
+#define NETDATA_ACLK_HOOK \
+    { \
+        .name = "AgentCloudLink", \
+        .config_section = NULL, \
+        .config_name = NULL, \
+        .enabled = 1, \
+        .thread = NULL, \
+        .init_routine = NULL, \
+        .start_routine = aclk_main \
+    },
+
+extern int aclk_send_message(char *sub_topic, char *message);
+
+int     aclk_init();
+char    *get_base_topic();
+
+extern char *is_agent_claimed(void);
+
+// callbacks for agent cloud link
+int aclk_subscribe(char  *topic, int qos);
+void aclk_shutdown();
+//void aclk_message_callback(struct mosquitto *moqs, void *obj, const struct mosquitto_message *msg);
+
+int cloud_to_agent_parse(JSON_ENTRY *e);
+void aclk_disconnect(void *conn);
+void aclk_connect(void *conn);
+void aclk_create_metadata_message(BUFFER *dest, char *type, char *msg_id, BUFFER *contents);
+int aclk_send_metadata();
+int aclk_wait_for_initialization();
+//int aclk_send_charts(RRDHOST *host, BUFFER *wb);
+int aclk_send_single_chart(char *host, char *chart);
+int aclk_queue_query(char *token, char *data, char *msg_type, char *query, int run_after, int internal);
+struct aclk_query  *aclk_query_find(char *token, char *data, char *msg_id, char *query);
+//void aclk_rrdset2json(RRDSET *st, BUFFER *wb, char *hostname, int is_slave);
+int aclk_update_chart(RRDHOST *host, char *chart_name);
+int aclk_update_alarm(RRDHOST *host, char *alarm_name);
+void aclk_create_header(BUFFER *dest, char *type, char *msg_id);
+int aclk_handle_cloud_request(char *payload);
+int aclk_submit_request(struct aclk_request *);
+#endif //NETDATA_AGENT_CLOUD_LINK_H

+ 318 - 0
aclk/mqtt.c

@@ -0,0 +1,318 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <libnetdata/json/json.h>
+#include "../daemon/common.h"
+#include "mqtt.h"
+
+void (*_on_connect)(void *ptr) = NULL;
+void (*_on_disconnect)(void *ptr) = NULL;
+extern int cmdpause;
+
+
+#ifndef ENABLE_ACLK
+
+inline const char *_link_strerror(int rc)
+{
+    (void) rc;
+    return "no error";
+}
+
+int _link_event_loop(int timeout)
+{
+    (void) timeout;
+    return 0;
+}
+
+int _link_send_message(char *topic, char *message)
+{
+    (void) topic;
+    (void) message;
+    return 0;
+}
+
+int _link_subscribe(char  *topic, int qos)
+{
+    (void) topic;
+    (void) qos;
+    return 0;
+}
+
+void _link_shutdown()
+{
+    return;
+}
+
+int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
+{
+    (void) aclk_hostname;
+    (void) aclk_port;
+    (void) on_connect;
+    (void) on_disconnect;
+    return 0;
+}
+
+#else
+/*
+ * Just report the library info in the logfile for reference when issues arise
+ *
+ */
+
+struct mosquitto *mosq = NULL;
+
+// Get a string description of the error
+
+inline const char *_link_strerror(int rc)
+{
+    return mosquitto_strerror(rc);
+}
+
+
+void mqtt_message_callback(
+    struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
+{
+    (void) mosq;
+    (void) obj;
+
+    // TODO: handle commands in a more efficient way, if we have many
+
+    if (strcmp((char *)msg->payload, "pause") == 0) {
+        cmdpause = 1;
+        return;
+    }
+
+    if (strcmp((char *)msg->payload, "resume") == 0) {
+        cmdpause = 0;
+        return;
+    }
+
+    if (strcmp((char *)msg->payload, "reload") == 0) {
+        error_log_limit_unlimited();
+        info("Reloading health configuration");
+        health_reload();
+        error_log_limit_reset();
+        return;
+    }
+
+    if (strcmp((char *)msg->payload, "info") == 0) {
+        aclk_send_metadata();
+        return;
+    }
+
+    aclk_handle_cloud_request(msg->payload);
+
+    //info("Received type=[%s], msg-id=[%s], topic=[%s], url=[%s]",cloud_to_agent.type_id, cloud_to_agent.msg_id, cloud_to_agent.topic, cloud_to_agent.url);
+
+}
+
+void connect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+    (void) obj;
+    (void) rc;
+
+    info("Connection to cloud estabilished");
+
+    aclk_connection_initialized = 1;
+    _on_connect((void *) mosq);
+
+    return;
+}
+
+
+void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
+{
+    (void) obj;
+    (void) rc;
+
+    info("Connection to cloud failed");
+    // TODO: Keep the connection "alive" for now. The library will reconnect.
+
+    //mqtt_connection_initialized = 0;
+    _on_disconnect((void *) mosq);
+    //sleep_usec(USEC_PER_SEC * 5);
+    return;
+}
+
+
+void _show_mqtt_info()
+{
+    int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
+    libmosq_version =  mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
+
+    info("Detected libmosquitto library version %d, %d.%d.%d",libmosq_version, libmosq_major, libmosq_minor, libmosq_revision);
+}
+
+int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
+{
+    int rc;
+    int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
+    char *ca_crt;
+    char *server_crt;
+    char *server_key;
+
+    // show library info so can have in in the logfile
+    libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
+    ca_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link cert", "*");
+    server_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link server cert", "*");
+    server_key = config_get(CONFIG_SECTION_ACLK, "agent cloud link server key", "*");
+
+
+    if (ca_crt[0] == '*') {
+        freez(ca_crt);
+        ca_crt = NULL;
+    }
+
+    if (server_crt[0] == '*') {
+        freez(server_crt);
+        server_crt = NULL;
+    }
+
+    if (server_key[0] == '*') {
+        freez(server_key);
+        server_key = NULL;
+    }
+
+    info(
+        "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
+        libmosq_revision);
+
+    rc = mosquitto_lib_init();
+    if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+        error("Failed to initialize MQTT (libmosquitto library)");
+        return 1;
+    }
+
+    mosq = mosquitto_new("anon", true, NULL);
+    if (unlikely(!mosq)) {
+        mosquitto_lib_cleanup();
+        error("MQTT new structure  -- %s", mosquitto_strerror(errno));
+        return 1;
+    }
+
+    _on_connect = on_connect;
+    _on_disconnect = on_disconnect;
+
+    mosquitto_connect_callback_set(mosq, connect_callback);
+    mosquitto_disconnect_callback_set(mosq, disconnect_callback);
+
+    mosquitto_username_pw_set(mosq, "anon", "anon");
+
+    rc = mosquitto_threaded_set(mosq, 1);
+    if (unlikely(rc != MOSQ_ERR_SUCCESS))
+        error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc));
+
+#if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000
+    rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0);
+    if (unlikely(rc != MOSQ_ERR_SUCCESS))
+        error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc));
+
+    rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1);
+    info("MQTT in flight messages set to 1  -- %s", mosquitto_strerror(rc));
+#endif
+
+    rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_RECONNECT_DELAY, 1);
+
+    if (unlikely(rc != MOSQ_ERR_SUCCESS))
+        error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc));
+
+    mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL);
+
+    rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL);
+
+    if (unlikely(rc != MOSQ_ERR_SUCCESS))
+        error("Connect %s MQTT status = %d (%s)", aclk_hostname, rc, mosquitto_strerror(rc));
+    else
+        info("Establishing MQTT link to %s", aclk_hostname);
+
+    return rc;
+}
+
+int _link_event_loop(int timeout)
+{
+    int rc;
+
+    rc = mosquitto_loop(mosq, timeout, 1);
+
+    if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+        errno = 0;
+        error("Loop error code %d (%s)", rc, mosquitto_strerror(rc));
+        rc = mosquitto_reconnect(mosq);
+        if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+            error("Reconnect loop error code %d (%s)", rc, mosquitto_strerror(rc));
+        }
+        // TBD: Using delay
+        sleep_usec(USEC_PER_SEC * 10);
+    }
+    return rc;
+}
+
+void _link_shutdown()
+{
+    int rc;
+
+    rc = mosquitto_disconnect(mosq);
+    switch (rc) {
+        case MOSQ_ERR_SUCCESS:
+            info("MQTT disconnected from broker");
+            break;
+        default:
+            info("MQTT invalid structure");
+            break;
+    };
+
+    mosquitto_destroy(mosq);
+    mosq = NULL;
+    return;
+}
+
+
+int _link_subscribe(char  *topic, int qos)
+{
+    int rc;
+
+    if (unlikely(!mosq))
+        return 1;
+
+    mosquitto_message_callback_set(mosq, mqtt_message_callback);
+
+    rc = mosquitto_subscribe(mosq, NULL, topic, qos);
+    if (unlikely(rc)) {
+        errno = 0;
+        error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc));
+        return 1;
+    }
+
+    return 0;
+}
+
+
+/*
+ * Send a message to the cloud to specific topic
+ *
+ */
+
+int _link_send_message(char *topic, char *message)
+{
+    int rc;
+
+    rc = mosquitto_pub_topic_check(topic);
+
+    if (unlikely(rc != MOSQ_ERR_SUCCESS))
+        return rc;
+
+    int msg_len = strlen(message);
+
+    // TODO: handle encoding validation -- the message should be UFT8 encoded by the sender
+    //rc = mosquitto_validate_utf8(message, msg_len);
+    //if (unlikely(rc != MOSQ_ERR_SUCCESS))
+    //    return rc;
+
+    rc = mosquitto_publish(mosq, NULL, topic, msg_len, message, ACLK_QOS, 0);
+
+    // TODO: Add better handling -- error will flood the logfile here
+    if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
+        error("MQTT message failed : %s", mosquitto_strerror(rc));
+    }
+
+    return rc;
+}
+#endif

+ 21 - 0
aclk/mqtt.h

@@ -0,0 +1,21 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_MQTT_H
+#define NETDATA_MQTT_H
+
+#ifdef ENABLE_ACLK
+#include "mosquitto/lib/mosquitto.h"
+#endif
+
+void _show_mqtt_info();
+int _link_event_loop(int timeout);
+void _link_shutdown();
+int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *));
+int _link_subscribe(char *topic, int qos);
+int _link_send_message(char *topic, char *message);
+const char *_link_strerror(int rc);
+
+extern int aclk_connection_initialized;
+int aclk_handle_cloud_request(char *);
+
+#endif //NETDATA_MQTT_H

+ 31 - 15
claim/claim.c

@@ -21,13 +21,12 @@ static char *claiming_errors[] = {
         "internal server error"                         // 12
 };
 
-#define AGENT_UNCLAIMED 0
-#define AGENT_CLAIMED 1
-static uint8_t claiming_status = AGENT_UNCLAIMED;
 
-uint8_t is_agent_claimed(void)
+static char *claimed_id = NULL;
+
+char *is_agent_claimed(void)
 {
-    return (AGENT_CLAIMED == claiming_status);
+    return claimed_id;
 }
 
 #define CLAIMING_COMMAND_LENGTH 16384
@@ -64,8 +63,7 @@ void claim_agent(char *claiming_arguments)
     exit_code = mypclose(fp, command_pid);
     info("Agent claiming command returned with code %d", exit_code);
     if (0 == exit_code) {
-        claiming_status = AGENT_CLAIMED;
-        info("Agent successfully claimed.");
+        load_claiming_state();
         return;
     }
     if (exit_code < 0) {
@@ -85,19 +83,37 @@ void claim_agent(char *claiming_arguments)
 
 void load_claiming_state(void)
 {
-    info("The claiming feature is under development and still subject to change before the next release");
-    return;
+    if (claimed_id != NULL) {
+        freez(claimed_id);
+        claimed_id = NULL;
+    }
 
     char filename[FILENAME_MAX + 1];
     struct stat statbuf;
 
-    snprintfz(filename, FILENAME_MAX, "%s/claim.d/is_claimed", netdata_configured_user_config_dir);
+    snprintfz(filename, FILENAME_MAX, "%s/claim.d/claimed_id", netdata_configured_user_config_dir);
+
     // check if the file exists
     if (lstat(filename, &statbuf) != 0) {
-        info("File '%s' was not found. Setting state to AGENT_UNCLAIMED.", filename);
-        claiming_status = AGENT_UNCLAIMED;
-   } else {
-        info("File '%s' was found. Setting state to AGENT_CLAIMED.", filename);
-        claiming_status = AGENT_CLAIMED;
+        info("lstat on File '%s' failed reason=\"%s\". Setting state to AGENT_UNCLAIMED.", filename, strerror(errno));
+        return;
     }
+    if (unlikely(statbuf.st_size == 0)) {
+        info("File '%s' has no contents. Setting state to AGENT_UNCLAIMED.", filename);
+        return;
+    }
+
+    FILE *f = fopen(filename, "rt");
+    if (unlikely(f == NULL)) {
+        error("File '%s' cannot be opened. Setting state to AGENT_UNCLAIMED.", filename);
+        return;
+    }
+
+    claimed_id = callocz(1, statbuf.st_size + 1);
+    size_t bytes_read = fread(claimed_id, 1, statbuf.st_size, f);
+    claimed_id[bytes_read] = 0;
+    info("File '%s' was found. Setting state to AGENT_CLAIMED.", filename);
+    fclose(f);
+
+    snprintfz(filename, FILENAME_MAX, "%s/claim.d/private.pem", netdata_configured_user_config_dir);
 }

+ 1 - 1
claim/claim.h

@@ -8,7 +8,7 @@
 extern char *claiming_pending_arguments;
 
 void claim_agent(char *claiming_arguments);
-uint8_t is_agent_claimed(void);
+char *is_agent_claimed(void);
 void load_claiming_state(void);
 
 #endif //NETDATA_CLAIM_H

Некоторые файлы не были показаны из-за большого количества измененных файлов