mqtt.c 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include <libnetdata/json/json.h>
  3. #include "../daemon/common.h"
  4. #include "mqtt.h"
  5. #include "aclk_lws_wss_client.h"
  6. extern usec_t aclk_session_us;
  7. extern time_t aclk_session_sec;
  8. inline const char *_link_strerror(int rc)
  9. {
  10. return mosquitto_strerror(rc);
  11. }
  12. #ifdef NETDATA_INTERNAL_CHECKS
  13. static struct timeval sendTimes[1024];
  14. #endif
  15. static struct mosquitto *mosq = NULL;
  16. void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
  17. {
  18. UNUSED(mosq);
  19. UNUSED(obj);
  20. aclk_handle_cloud_request(msg->payload);
  21. }
  22. void publish_callback(struct mosquitto *mosq, void *obj, int rc)
  23. {
  24. UNUSED(mosq);
  25. UNUSED(obj);
  26. UNUSED(rc);
  27. #ifdef NETDATA_INTERNAL_CHECKS
  28. struct timeval now, *orig;
  29. now_realtime_timeval(&now);
  30. orig = &sendTimes[ rc & 0x3ff ];
  31. int64_t diff = (now.tv_sec - orig->tv_sec) * USEC_PER_SEC + (now.tv_usec - orig->tv_usec);
  32. info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff / 1000);
  33. #endif
  34. return;
  35. }
  36. void connect_callback(struct mosquitto *mosq, void *obj, int rc)
  37. {
  38. UNUSED(mosq);
  39. UNUSED(obj);
  40. UNUSED(rc);
  41. info("Connection to cloud estabilished");
  42. aclk_connect();
  43. return;
  44. }
  45. void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
  46. {
  47. UNUSED(mosq);
  48. UNUSED(obj);
  49. UNUSED(rc);
  50. if (netdata_exit)
  51. info("Connection to cloud terminated due to agent shutdown");
  52. else {
  53. errno = 0;
  54. error("Connection to cloud failed");
  55. }
  56. aclk_disconnect();
  57. aclk_lws_wss_mqtt_layer_disconect_notif();
  58. return;
  59. }
  60. void _show_mqtt_info()
  61. {
  62. int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
  63. libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
  64. info(
  65. "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
  66. libmosq_revision);
  67. }
  68. size_t _mqtt_external_write_hook(void *buf, size_t count)
  69. {
  70. return aclk_lws_wss_client_write(buf, count);
  71. }
  72. size_t _mqtt_external_read_hook(void *buf, size_t count)
  73. {
  74. return aclk_lws_wss_client_read(buf, count);
  75. }
  76. int _mqtt_lib_init()
  77. {
  78. int rc;
  79. //int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
  80. /* Commenting out now as it is unused - do not delete, this is needed for the on-prem version.
  81. char *ca_crt;
  82. char *server_crt;
  83. char *server_key;
  84. // show library info so can have it in the logfile
  85. //libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
  86. ca_crt = config_get(CONFIG_SECTION_CLOUD, "link cert", "*");
  87. server_crt = config_get(CONFIG_SECTION_CLOUD, "link server cert", "*");
  88. server_key = config_get(CONFIG_SECTION_CLOUD, "link server key", "*");
  89. if (ca_crt[0] == '*') {
  90. freez(ca_crt);
  91. ca_crt = NULL;
  92. }
  93. if (server_crt[0] == '*') {
  94. freez(server_crt);
  95. server_crt = NULL;
  96. }
  97. if (server_key[0] == '*') {
  98. freez(server_key);
  99. server_key = NULL;
  100. }
  101. */
  102. // info(
  103. // "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
  104. // libmosq_revision);
  105. rc = mosquitto_lib_init();
  106. if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
  107. error("Failed to initialize MQTT (libmosquitto library)");
  108. return 1;
  109. }
  110. return 0;
  111. }
  112. static int _mqtt_create_connection(char *username, char *password)
  113. {
  114. if (mosq != NULL)
  115. mosquitto_destroy(mosq);
  116. mosq = mosquitto_new(username, true, NULL);
  117. if (unlikely(!mosq)) {
  118. mosquitto_lib_cleanup();
  119. error("MQTT new structure -- %s", mosquitto_strerror(errno));
  120. return MOSQ_ERR_UNKNOWN;
  121. }
  122. // Record the session start time to allow a nominal LWT timestamp
  123. usec_t now = now_realtime_usec();
  124. aclk_session_sec = now / USEC_PER_SEC;
  125. aclk_session_us = now % USEC_PER_SEC;
  126. _link_set_lwt("outbound/meta", 2);
  127. mosquitto_connect_callback_set(mosq, connect_callback);
  128. mosquitto_disconnect_callback_set(mosq, disconnect_callback);
  129. mosquitto_publish_callback_set(mosq, publish_callback);
  130. info("Using challenge-response: %s / %s", username, password);
  131. mosquitto_username_pw_set(mosq, username, password);
  132. int rc = mosquitto_threaded_set(mosq, 1);
  133. if (unlikely(rc != MOSQ_ERR_SUCCESS))
  134. error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc));
  135. #if defined(LIBMOSQUITTO_VERSION_NUMBER) >= 1006000
  136. rc = mosquitto_int_option(mosq, MQTT_PROTOCOL_V311, 0);
  137. if (unlikely(rc != MOSQ_ERR_SUCCESS))
  138. error("MQTT protocol specification rc = %d (%s)", rc, mosquitto_strerror(rc));
  139. rc = mosquitto_int_option(mosq, MOSQ_OPT_SEND_MAXIMUM, 1);
  140. info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc));
  141. #endif
  142. return rc;
  143. }
  144. static int _link_mqtt_connect(char *aclk_hostname, int aclk_port)
  145. {
  146. int rc;
  147. rc = mosquitto_connect_async(mosq, aclk_hostname, aclk_port, ACLK_PING_INTERVAL);
  148. if (unlikely(rc != MOSQ_ERR_SUCCESS))
  149. error(
  150. "Failed to establish link to [%s:%d] MQTT status = %d (%s)", aclk_hostname, aclk_port, rc,
  151. mosquitto_strerror(rc));
  152. else
  153. info("Establishing MQTT link to [%s:%d]", aclk_hostname, aclk_port);
  154. return rc;
  155. }
  156. static inline void _link_mosquitto_write()
  157. {
  158. int rc;
  159. if (unlikely(!mosq)) {
  160. return;
  161. }
  162. rc = mosquitto_loop_misc(mosq);
  163. if (unlikely(rc != MOSQ_ERR_SUCCESS))
  164. debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc));
  165. if (likely(mosquitto_want_write(mosq))) {
  166. rc = mosquitto_loop_write(mosq, 1);
  167. if (rc != MOSQ_ERR_SUCCESS)
  168. debug(D_ACLK, "ACLK: failure during mosquitto_loop_write %s", mosquitto_strerror(rc));
  169. }
  170. }
  171. void aclk_lws_connection_established(char *hostname, int port)
  172. {
  173. _link_mqtt_connect(hostname, port); // Parameters only used for logging, lower layer connected.
  174. _link_mosquitto_write();
  175. }
  176. void aclk_lws_connection_data_received()
  177. {
  178. int rc = mosquitto_loop_read(mosq, 1);
  179. if (rc != MOSQ_ERR_SUCCESS)
  180. debug(D_ACLK, "ACLK: failure during mosquitto_loop_read %s", mosquitto_strerror(rc));
  181. }
  182. void aclk_lws_connection_closed()
  183. {
  184. aclk_disconnect();
  185. }
  186. int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password)
  187. {
  188. if(aclk_lws_wss_connect(aclk_hostname, aclk_port))
  189. return MOSQ_ERR_UNKNOWN;
  190. aclk_lws_wss_service_loop();
  191. int rc = _mqtt_create_connection(username, password);
  192. if (rc!= MOSQ_ERR_SUCCESS)
  193. return rc;
  194. mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook);
  195. return rc;
  196. }
  197. inline int _link_event_loop()
  198. {
  199. // TODO: Check if we need to flush undelivered messages from libmosquitto on new connection attempts (QoS=1).
  200. _link_mosquitto_write();
  201. aclk_lws_wss_service_loop();
  202. // this is because if use LWS we don't want
  203. // mqtt to reconnect by itself
  204. return MOSQ_ERR_SUCCESS;
  205. }
  206. void _link_shutdown()
  207. {
  208. int rc;
  209. if (likely(!mosq))
  210. return;
  211. rc = mosquitto_disconnect(mosq);
  212. switch (rc) {
  213. case MOSQ_ERR_SUCCESS:
  214. info("MQTT disconnected from broker");
  215. break;
  216. default:
  217. info("MQTT invalid structure");
  218. break;
  219. };
  220. }
  221. int _link_set_lwt(char *sub_topic, int qos)
  222. {
  223. int rc;
  224. char topic[ACLK_MAX_TOPIC + 1];
  225. char *final_topic;
  226. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  227. if (unlikely(!final_topic)) {
  228. errno = 0;
  229. error("Unable to build outgoing topic; truncated?");
  230. return 1;
  231. }
  232. usec_t lwt_time = aclk_session_sec * USEC_PER_SEC + aclk_session_us + 1;
  233. BUFFER *b = buffer_create(512);
  234. aclk_create_header(b, "disconnect", NULL, lwt_time / USEC_PER_SEC, lwt_time % USEC_PER_SEC);
  235. buffer_strcat(b, ", \"payload\": \"unexpected\" }");
  236. rc = mosquitto_will_set(mosq, topic, buffer_strlen(b), buffer_tostring(b), qos, 0);
  237. buffer_free(b);
  238. return rc;
  239. }
  240. int _link_subscribe(char *topic, int qos)
  241. {
  242. int rc;
  243. if (unlikely(!mosq))
  244. return 1;
  245. mosquitto_message_callback_set(mosq, mqtt_message_callback);
  246. rc = mosquitto_subscribe(mosq, NULL, topic, qos);
  247. if (unlikely(rc)) {
  248. errno = 0;
  249. error("Failed to register subscription %d (%s)", rc, mosquitto_strerror(rc));
  250. return 1;
  251. }
  252. _link_mosquitto_write();
  253. return 0;
  254. }
  255. /*
  256. * Send a message to the cloud to specific topic
  257. *
  258. */
  259. int _link_send_message(char *topic, unsigned char *message, int *mid)
  260. {
  261. int rc;
  262. size_t write_q, write_q_bytes, read_q;
  263. rc = mosquitto_pub_topic_check(topic);
  264. if (unlikely(rc != MOSQ_ERR_SUCCESS))
  265. return rc;
  266. int msg_len = strlen((char*)message);
  267. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  268. rc = mosquitto_publish(mosq, mid, topic, msg_len, message, ACLK_QOS, 0);
  269. #ifdef NETDATA_INTERNAL_CHECKS
  270. char msg_head[64];
  271. memset(msg_head, 0, sizeof(msg_head));
  272. strncpy(msg_head, (char*)message, 60);
  273. for (size_t i = 0; i < sizeof(msg_head); i++)
  274. if(msg_head[i] == '\n') msg_head[i] = ' ';
  275. info("Sending MQTT len=%d mid=%d wq=%zu (%zu-bytes) readq=%zu: %s", msg_len,
  276. *mid, write_q, write_q_bytes, read_q, msg_head);
  277. now_realtime_timeval(&sendTimes[ *mid & 0x3ff ]);
  278. #endif
  279. // TODO: Add better handling -- error will flood the logfile here
  280. if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
  281. errno = 0;
  282. error("MQTT message failed : %s", mosquitto_strerror(rc));
  283. }
  284. _link_mosquitto_write();
  285. return rc;
  286. }