Просмотр исходного кода

ACLK: Implemented Last Will and Testament (#8410)

* Added support for Last Will and Testament to the ACLK
* On normal agent shutdown an alternate "graceful shutdown" message is published
Stelios Fragkakis 5 лет назад
Родитель
Коммит
2c716dc31e
5 измененных файлов с 103 добавлено и 8 удалено
  1. 11 0
      aclk/aclk_lws_wss_client.c
  2. 48 5
      aclk/agent_cloud_link.c
  3. 1 0
      aclk/agent_cloud_link.h
  4. 40 3
      aclk/mqtt.c
  5. 3 0
      aclk/mqtt.h

+ 11 - 0
aclk/aclk_lws_wss_client.c

@@ -6,6 +6,8 @@
 #include "../daemon/common.h"
 #include "aclk_common.h"
 
+extern int aclk_shutting_down;
+
 static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
 
 struct aclk_lws_wss_perconnect_data {
@@ -320,6 +322,8 @@ static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
             return "LWS_CALLBACK_CLIENT_ESTABLISHED";
         case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
             return "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION";
+        case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
+            return "LWS_CALLBACK_EVENT_WAIT_CANCELLED";
         default:
             // Not using an internal buffer here for thread-safety with unknown calling context.
             error("Unknown LWS callback %u", reason);
@@ -331,6 +335,13 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
     UNUSED(user);
     struct lws_wss_packet_buffer *data;
     int retval = 0;
+    static int lws_shutting_down = 0;
+
+    if (unlikely(aclk_shutting_down && !lws_shutting_down)) {
+            lws_shutting_down = 1;
+            retval = -1;
+            engine_instance->upstream_reconnect_request = 0;
+    }
 
     // Callback servicing is forced when we are closed from above.
     if (engine_instance->upstream_reconnect_request) {

+ 48 - 5
aclk/agent_cloud_link.c

@@ -5,6 +5,7 @@
 #include "aclk_lws_https_client.h"
 #include "aclk_common.h"
 
+int aclk_shutting_down = 0;
 // State-machine for the on-connect metadata transmission.
 // TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial
 //       agent startup phase.
@@ -43,6 +44,8 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
 #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
 #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
 
+void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
+
 /*
  * Maintain a list of collectors and chart count
  * If all the charts of a collector are deleted
@@ -936,13 +939,54 @@ void *aclk_query_main_thread(void *ptr)
 // Thread cleanup
 static void aclk_main_cleanup(void *ptr)
 {
+    char payload[512];
     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
     static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
 
     info("cleaning up...");
 
-    // Wakeup thread to cleanup
-    QUERY_THREAD_WAKEUP;
+    if (is_agent_claimed() && aclk_connected) {
+        size_t write_q, write_q_bytes, read_q;
+        time_t event_loop_timeout;
+
+        // Wakeup thread to cleanup
+        QUERY_THREAD_WAKEUP;
+        // Send a graceful disconnect message
+        time_t time_created = now_realtime_sec();
+        char *msg_id = create_uuid();
+
+        snprintfz(
+            payload, 511,
+            "{ \"type\": \"disconnect\","
+            " \"msg-id\": \"%s\","
+            " \"timestamp\": %ld,"
+            " \"version\": %d,"
+            " \"payload\": \"graceful\" }",
+            msg_id, time_created, ACLK_VERSION);
+
+        aclk_send_message(ACLK_METADATA_TOPIC, payload, msg_id);
+        freez(msg_id);
+
+        event_loop_timeout = now_realtime_sec() + 5;
+        write_q = 1;
+        while (write_q && event_loop_timeout > now_realtime_sec()) {
+            _link_event_loop();
+            lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+        }
+
+        aclk_shutting_down = 1;
+        _link_shutdown();
+        aclk_lws_wss_mqtt_layer_disconect_notif();
+
+        write_q = 1;
+        event_loop_timeout = now_realtime_sec() + 5;
+        while (write_q && event_loop_timeout > now_realtime_sec()) {
+            _link_event_loop();
+            lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
+        }
+    }
+
+    info("Disconnected");
 
     static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
 }
@@ -1243,7 +1287,6 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num)
  *
  * @return It always returns NULL
  */
-void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
 void *aclk_main(void *ptr)
 {
     struct netdata_static_thread *query_thread;
@@ -1295,8 +1338,8 @@ void *aclk_main(void *ptr)
         size_t write_q, write_q_bytes, read_q;
         lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
         //info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu",
-        //     first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q);
-        if (unlikely(!aclk_connected)) {
+        //   first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q);
+        if (unlikely(!netdata_exit && !aclk_connected)) {
             if (unlikely(!first_init)) {
                 aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
                 first_init = 1;

+ 1 - 0
aclk/agent_cloud_link.h

@@ -78,6 +78,7 @@ extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
 //char    *get_base_topic();
 
 extern char *is_agent_claimed(void);
+extern void aclk_lws_wss_mqtt_layer_disconect_notif();
 char *create_uuid();
 
 // callbacks for agent cloud link

+ 40 - 3
aclk/mqtt.c

@@ -131,6 +131,8 @@ static int _mqtt_create_connection(char *username, char *password)
         return MOSQ_ERR_UNKNOWN;
     }
 
+    _link_set_lwt("outbound/meta", 2);
+
     mosquitto_connect_callback_set(mosq, connect_callback);
     mosquitto_disconnect_callback_set(mosq, disconnect_callback);
     mosquitto_publish_callback_set(mosq, publish_callback);
@@ -174,6 +176,10 @@ static inline void _link_mosquitto_write()
 {
     int rc;
 
+    if (unlikely(!mosq)) {
+        return;
+    }
+
     rc = mosquitto_loop_misc(mosq);
     if (unlikely(rc != MOSQ_ERR_SUCCESS))
         debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc));
@@ -234,6 +240,9 @@ void _link_shutdown()
 {
     int rc;
 
+    if (likely(!mosq))
+        return;
+
     rc = mosquitto_disconnect(mosq);
     switch (rc) {
         case MOSQ_ERR_SUCCESS:
@@ -243,11 +252,39 @@ void _link_shutdown()
             info("MQTT invalid structure");
             break;
     };
+}
+
 
-    mosquitto_destroy(mosq);
-    mosq = NULL;
+// Register the MQTT Last Will and Testament on the global mosquitto handle.
+//
+// sub_topic is expanded through get_topic() and the will payload is a
+// "disconnect" message whose payload field is "unexpected". The msg-id and
+// timestamp are generated here, i.e. when the will is registered, not when
+// the broker later publishes it.
+//
+// Returns 1 when the topic cannot be built, otherwise the return code of
+// mosquitto_will_set().
+int _link_set_lwt(char *sub_topic, int qos)
+{
+    char topic[ACLK_MAX_TOPIC + 1];
+    char payload[512];
+    char *final_topic;
 
-    aclk_lws_wss_client_destroy();
+    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
+    if (unlikely(!final_topic)) {
+        errno = 0;
+        error("Unable to build outgoing topic; truncated?");
+        return 1;
+    }
+
+    time_t time_created = now_realtime_sec();
+    char *msg_id = create_uuid();
+
+    // time_t is not guaranteed to be long, so cast to match "%ld".
+    snprintfz(
+        payload, sizeof(payload) - 1,
+        "{ \"type\": \"disconnect\","
+        " \"msg-id\": \"%s\","
+        " \"timestamp\": %ld,"
+        " \"version\": %d,"
+        " \"payload\": \"unexpected\" }",
+        msg_id, (long)time_created, ACLK_VERSION);
+
+    freez(msg_id);
+
+    // Use the pointer get_topic() returned (the one validated above) rather
+    // than the raw buffer: get_topic() is defined elsewhere and is not
+    // guaranteed from here to have filled `topic`.
+    return mosquitto_will_set(mosq, final_topic, (int)strlen(payload), (const void *)payload, qos, 0);
 }
 
 int _link_subscribe(char *topic, int qos)

+ 3 - 0
aclk/mqtt.h

@@ -16,7 +16,10 @@ int _mqtt_lib_init();
 int _link_subscribe(char *topic, int qos);
 int _link_send_message(char *topic, unsigned char *message, int *mid);
 const char *_link_strerror(int rc);
+int _link_set_lwt(char *topic, int qos);
+
 
 int aclk_handle_cloud_request(char *);
+extern char *get_topic(char *sub_topic, char *final_topic, int max_size);
 
 #endif //NETDATA_MQTT_H