aclk.c 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk.h"
  3. #ifdef ENABLE_ACLK
  4. #include "aclk_stats.h"
  5. #include "mqtt_wss_client.h"
  6. #include "aclk_otp.h"
  7. #include "aclk_tx_msgs.h"
  8. #include "aclk_query.h"
  9. #include "aclk_query_queue.h"
  10. #include "aclk_util.h"
  11. #include "aclk_rx_msgs.h"
  12. #include "https_client.h"
  13. #include "schema-wrappers/schema_wrappers.h"
  14. #include "aclk_capas.h"
  15. #include "aclk_proxy.h"
  16. #ifdef ACLK_LOG_CONVERSATION_DIR
  17. #include <sys/types.h>
  18. #include <sys/stat.h>
  19. #include <fcntl.h>
  20. #endif
  21. #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
  22. #endif /* ENABLE_ACLK */
  /*
   * Global ACLK link state. These variables are defined unconditionally
   * (outside of the ENABLE_ACLK guard).
   */
  int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
  int aclk_rcvd_cloud_msgs = 0; // incremented by msg_callback(), zeroed on every new MQTT connection
  int aclk_connection_counter = 0; // incremented once per successfully established MQTT connection
  int disconnect_req = 0; // when set, handle_connection() gracefully drops and restarts the link (cloud request)
  int aclk_connected = 0; // 1 while the link is up, 0 after a disconnect
  int aclk_ctx_based = 0;
  int aclk_disable_runtime = 0; // when set, aclk_reconnect_delay() returns a fixed 60 s delay
  int aclk_stats_enabled; // read from the "statistics" cloud config option in aclk_main()
  int aclk_kill_link = 0; // when set, handle_connection() gracefully drops and restarts the link (reclaim)
  usec_t aclk_session_us = 0; // sub-second part (usec) of the session timestamp
  time_t aclk_session_sec = 0; // seconds part of the session timestamp
  time_t last_conn_time_mqtt = 0; // realtime of the last successful MQTT connect
  time_t last_conn_time_appl = 0; // realtime when the PubAck count reached ACLK_PUBACKS_CONN_STABLE
  time_t last_disconnect_time = 0; // realtime of the last disconnect
  time_t next_connection_attempt = 0; // realtime at which the next connection attempt is scheduled
  float last_backoff_value = 0; // last reconnect delay, in seconds
  time_t aclk_block_until = 0; // compared against the monotonic clock in aclk_reconnect_delay(); 0 = no block
  int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload
  #ifdef ENABLE_ACLK
  // the MQTT-over-WSS client instance used by the ACLK main thread
  mqtt_wss_client mqttwss_client;
  // mutex and lock/unlock helpers for aclk_shared_state
  netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
  #define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
  #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
  // mqtt_shutdown_msg_id is the packet id of the graceful "disconnect" message (-1 = none sent),
  // mqtt_shutdown_msg_rcvd becomes 1 once puback_callback() sees that packet acknowledged
  struct aclk_shared_state aclk_shared_state = {
      .mqtt_shutdown_msg_id = -1,
      .mqtt_shutdown_msg_rcvd = 0
  };
  50. #ifdef MQTT_WSS_DEBUG
  51. #include <openssl/ssl.h>
  52. #define DEFAULT_SSKEYLOGFILE_NAME "SSLKEYLOGFILE"
  53. const char *ssl_log_filename = NULL;
  54. FILE *ssl_log_file = NULL;
  55. static void aclk_ssl_keylog_cb(const SSL *ssl, const char *line)
  56. {
  57. (void)ssl;
  58. if (!ssl_log_file)
  59. ssl_log_file = fopen(ssl_log_filename, "a");
  60. if (!ssl_log_file) {
  61. error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename);
  62. return;
  63. }
  64. fputs(line, ssl_log_file);
  65. putc('\n', ssl_log_file);
  66. fflush(ssl_log_file);
  67. }
  68. #endif
  69. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  70. OSSL_DECODER_CTX *aclk_dctx = NULL;
  71. EVP_PKEY *aclk_private_key = NULL;
  72. #else
  73. static RSA *aclk_private_key = NULL;
  74. #endif
  75. static int load_private_key()
  76. {
  77. if (aclk_private_key != NULL) {
  78. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  79. EVP_PKEY_free(aclk_private_key);
  80. if (aclk_dctx)
  81. OSSL_DECODER_CTX_free(aclk_dctx);
  82. aclk_dctx = NULL;
  83. #else
  84. RSA_free(aclk_private_key);
  85. #endif
  86. }
  87. aclk_private_key = NULL;
  88. char filename[FILENAME_MAX + 1];
  89. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  90. long bytes_read;
  91. char *private_key = read_by_filename(filename, &bytes_read);
  92. if (!private_key) {
  93. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  94. return 1;
  95. }
  96. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  97. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  98. if (key_bio==NULL) {
  99. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  100. goto biofailed;
  101. }
  102. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  103. aclk_dctx = OSSL_DECODER_CTX_new_for_pkey(&aclk_private_key, "PEM", NULL,
  104. "RSA",
  105. OSSL_KEYMGMT_SELECT_PRIVATE_KEY,
  106. NULL, NULL);
  107. if (!aclk_dctx) {
  108. error("Loading private key (from claiming) failed - no OpenSSL Decoders found");
  109. goto biofailed;
  110. }
  111. // this is necesseary to avoid RSA key with wrong size
  112. if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) {
  113. error("Decoding private key (from claiming) failed - invalid format.");
  114. goto biofailed;
  115. }
  116. #else
  117. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  118. #endif
  119. BIO_free(key_bio);
  120. if (aclk_private_key!=NULL)
  121. {
  122. freez(private_key);
  123. return 0;
  124. }
  125. char err[512];
  126. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  127. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  128. biofailed:
  129. freez(private_key);
  130. return 1;
  131. }
  132. static int wait_till_cloud_enabled()
  133. {
  134. info("Waiting for Cloud to be enabled");
  135. while (!netdata_cloud_setting) {
  136. sleep_usec(USEC_PER_SEC * 1);
  137. if (!service_running(SERVICE_ACLK))
  138. return 1;
  139. }
  140. return 0;
  141. }
  142. /**
  143. * Will block until agent is claimed. Returns only if agent claimed
  144. * or if agent needs to shutdown.
  145. *
  146. * @return `0` if agent has been claimed,
  147. * `1` if interrupted due to agent shutting down
  148. */
  149. static int wait_till_agent_claimed(void)
  150. {
  151. //TODO prevent malloc and freez
  152. char *agent_id = get_agent_claimid();
  153. while (likely(!agent_id)) {
  154. sleep_usec(USEC_PER_SEC * 1);
  155. if (!service_running(SERVICE_ACLK))
  156. return 1;
  157. agent_id = get_agent_claimid();
  158. }
  159. freez(agent_id);
  160. return 0;
  161. }
  162. /**
  163. * Checks everything is ready for connection
  164. * agent claimed, cloud url set and private key available
  165. *
  166. * @param aclk_hostname points to location where string pointer to hostname will be set
  167. * @param aclk_port port to int where port will be saved
  168. *
  169. * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated
  170. */
  171. static int wait_till_agent_claim_ready()
  172. {
  173. url_t url;
  174. while (service_running(SERVICE_ACLK)) {
  175. if (wait_till_agent_claimed())
  176. return 1;
  177. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  178. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  179. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  180. if (cloud_base_url == NULL) {
  181. error("Do not move the cloud base url out of post_conf_load!!");
  182. return 1;
  183. }
  184. // We just check configuration is valid here
  185. // TODO make it without malloc/free
  186. memset(&url, 0, sizeof(url_t));
  187. if (url_parse(cloud_base_url, &url)) {
  188. error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
  189. url_t_destroy(&url);
  190. sleep(5);
  191. continue;
  192. }
  193. url_t_destroy(&url);
  194. if (!load_private_key())
  195. return 0;
  196. sleep(5);
  197. }
  198. return 1;
  199. }
  200. void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
  201. {
  202. switch(log_type) {
  203. case MQTT_WSS_LOG_ERROR:
  204. case MQTT_WSS_LOG_FATAL:
  205. case MQTT_WSS_LOG_WARN:
  206. error_report("%s", str);
  207. return;
  208. case MQTT_WSS_LOG_INFO:
  209. info("%s", str);
  210. return;
  211. case MQTT_WSS_LOG_DEBUG:
  212. debug(D_ACLK, "%s", str);
  213. return;
  214. default:
  215. error("Unknown log type from mqtt_wss");
  216. }
  217. }
  218. //TODO prevent big buffer on stack
  219. #define RX_MSGLEN_MAX 4096
  220. static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
  221. {
  222. UNUSED(qos);
  223. aclk_rcvd_cloud_msgs++;
  224. if (msglen > RX_MSGLEN_MAX)
  225. error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
  226. debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
  227. if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
  228. error("Link is shutting down. Ignoring incoming message.");
  229. return;
  230. }
  231. const char *msgtype = strrchr(topic, '/');
  232. if (unlikely(!msgtype)) {
  233. error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
  234. return;
  235. }
  236. msgtype++;
  237. if (unlikely(!*msgtype)) {
  238. error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
  239. return;
  240. }
  241. #ifdef ACLK_LOG_CONVERSATION_DIR
  242. #define FN_MAX_LEN 512
  243. char filename[FN_MAX_LEN];
  244. int logfd;
  245. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
  246. logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
  247. if(logfd < 0)
  248. error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
  249. write(logfd, msg, msglen);
  250. close(logfd);
  251. #endif
  252. aclk_handle_new_cloud_msg(msgtype, msg, msglen, topic);
  253. }
  254. static void puback_callback(uint16_t packet_id)
  255. {
  256. if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
  257. last_conn_time_appl = now_realtime_sec();
  258. aclk_tbeb_reset();
  259. }
  260. #ifdef NETDATA_INTERNAL_CHECKS
  261. aclk_stats_msg_puback(packet_id);
  262. #endif
  263. if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
  264. info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
  265. aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
  266. }
  267. }
  268. static int read_query_thread_count()
  269. {
  270. int threads = MIN(get_netdata_cpus()/2, 6);
  271. threads = MAX(threads, 2);
  272. threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  273. if(threads < 1) {
  274. error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
  275. threads = 1;
  276. config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  277. }
  278. return threads;
  279. }
  280. void aclk_graceful_disconnect(mqtt_wss_client client);
  281. /* Keeps connection alive and handles all network communications.
  282. * Returns on error or when netdata is shutting down.
  283. * @param client instance of mqtt_wss_client
  284. * @returns 0 - Netdata Exits
  285. * >0 - Error happened. Reconnect and start over.
  286. */
  287. static int handle_connection(mqtt_wss_client client)
  288. {
  289. time_t last_periodic_query_wakeup = now_monotonic_sec();
  290. while (service_running(SERVICE_ACLK)) {
  291. // timeout 1000 to check at least once a second
  292. // for netdata_exit
  293. if (mqtt_wss_service(client, 1000) < 0){
  294. error_report("Connection Error or Dropped");
  295. return 1;
  296. }
  297. if (disconnect_req || aclk_kill_link) {
  298. info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
  299. disconnect_req ? "true" : "false",
  300. aclk_kill_link ? "true" : "false");
  301. disconnect_req = 0;
  302. aclk_kill_link = 0;
  303. aclk_graceful_disconnect(client);
  304. aclk_queue_unlock();
  305. aclk_shared_state.mqtt_shutdown_msg_id = -1;
  306. aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
  307. return 1;
  308. }
  309. // mqtt_wss_service will return faster than in one second
  310. // if there is enough work to do
  311. time_t now = now_monotonic_sec();
  312. if (last_periodic_query_wakeup < now) {
  313. // wake up at least one Query Thread at least
  314. // once per second
  315. last_periodic_query_wakeup = now;
  316. QUERY_THREAD_WAKEUP;
  317. }
  318. }
  319. return 0;
  320. }
  321. static inline void mqtt_connected_actions(mqtt_wss_client client)
  322. {
  323. char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
  324. if (!topic)
  325. error("Unable to fetch topic for COMMAND (to subscribe)");
  326. else
  327. mqtt_wss_subscribe(client, topic, 1);
  328. topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
  329. if (!topic)
  330. error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
  331. else
  332. mqtt_wss_subscribe(client, topic, 1);
  333. aclk_stats_upd_online(1);
  334. aclk_connected = 1;
  335. aclk_pubacks_per_conn = 0;
  336. aclk_rcvd_cloud_msgs = 0;
  337. aclk_connection_counter++;
  338. aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER;
  339. while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL)
  340. mqtt_wss_set_topic_alias(client, topic);
  341. aclk_send_agent_connection_update(client, 1);
  342. }
  343. void aclk_graceful_disconnect(mqtt_wss_client client)
  344. {
  345. info("Preparing to gracefully shutdown ACLK connection");
  346. aclk_queue_lock();
  347. aclk_queue_flush();
  348. aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
  349. time_t t = now_monotonic_sec();
  350. while (!mqtt_wss_service(client, 100)) {
  351. if (now_monotonic_sec() - t >= 2) {
  352. error("Wasn't able to gracefully shutdown ACLK in time!");
  353. break;
  354. }
  355. if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
  356. info("MQTT App Layer `disconnect` message sent successfully");
  357. break;
  358. }
  359. }
  360. info("ACLK link is down");
  361. log_access("ACLK DISCONNECTED");
  362. aclk_stats_upd_online(0);
  363. last_disconnect_time = now_realtime_sec();
  364. aclk_connected = 0;
  365. info("Attempting to gracefully shutdown the MQTT/WSS connection");
  366. mqtt_wss_disconnect(client, 1000);
  367. }
  368. static unsigned long aclk_reconnect_delay() {
  369. unsigned long recon_delay;
  370. time_t now;
  371. if (aclk_disable_runtime) {
  372. aclk_tbeb_reset();
  373. return 60 * MSEC_PER_SEC;
  374. }
  375. now = now_monotonic_sec();
  376. if (aclk_block_until) {
  377. if (now < aclk_block_until) {
  378. recon_delay = aclk_block_until - now;
  379. recon_delay *= MSEC_PER_SEC;
  380. aclk_block_until = 0;
  381. aclk_tbeb_reset();
  382. return recon_delay;
  383. }
  384. aclk_block_until = 0;
  385. }
  386. if (!aclk_env || !aclk_env->backoff.base)
  387. return aclk_tbeb_delay(0, 2, 0, 1024);
  388. return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
  389. }
  390. /* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled
  391. * @return 0 - Go ahead and connect (delay expired)
  392. * 1 - netdata_exit
  393. */
  394. #define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
  395. static int aclk_block_till_recon_allowed() {
  396. unsigned long recon_delay = aclk_reconnect_delay();
  397. next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
  398. last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
  399. info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
  400. // we want to wake up from time to time to check netdata_exit
  401. while (recon_delay)
  402. {
  403. if (!service_running(SERVICE_ACLK))
  404. return 1;
  405. if (recon_delay > NETDATA_EXIT_POLL_MS) {
  406. sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
  407. recon_delay -= NETDATA_EXIT_POLL_MS;
  408. continue;
  409. }
  410. sleep_usec(recon_delay * USEC_PER_MS);
  411. recon_delay = 0;
  412. }
  413. return !service_running(SERVICE_ACLK);
  414. }
  415. #ifndef ACLK_DISABLE_CHALLENGE
  416. /* Cloud returns transport list ordered with highest
  417. * priority first. This function selects highest prio
  418. * transport that we can actually use (support)
  419. */
  420. static int aclk_get_transport_idx(aclk_env_t *env) {
  421. for (size_t i = 0; i < env->transport_count; i++) {
  422. // currently we support only MQTT 5
  423. // therefore select first transport that matches
  424. if (env->transports[i]->type == ACLK_TRP_MQTT_5) {
  425. return i;
  426. }
  427. }
  428. return -1;
  429. }
  430. #endif
  431. /* Attempts to make a connection to MQTT broker over WSS
  432. * @param client instance of mqtt_wss_client
  433. * @return 0 - Successful Connection,
  434. * <0 - Irrecoverable Error -> Kill ACLK,
  435. * >0 - netdata_exit
  436. */
  437. #define CLOUD_BASE_URL_READ_RETRY 30
  438. #ifdef ACLK_SSL_ALLOW_SELF_SIGNED
  439. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED
  440. #else
  441. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL
  442. #endif
  443. static int aclk_attempt_to_connect(mqtt_wss_client client)
  444. {
  445. int ret;
  446. url_t base_url;
  447. #ifndef ACLK_DISABLE_CHALLENGE
  448. url_t auth_url;
  449. url_t mqtt_url;
  450. #endif
  451. while (service_running(SERVICE_ACLK)) {
  452. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  453. if (cloud_base_url == NULL) {
  454. error_report("Do not move the cloud base url out of post_conf_load!!");
  455. return -1;
  456. }
  457. if (aclk_block_till_recon_allowed())
  458. return 1;
  459. info("Attempting connection now");
  460. memset(&base_url, 0, sizeof(url_t));
  461. if (url_parse(cloud_base_url, &base_url)) {
  462. error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
  463. sleep(CLOUD_BASE_URL_READ_RETRY);
  464. url_t_destroy(&base_url);
  465. continue;
  466. }
  467. struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
  468. aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, (char**)&proxy_conf.username, (char**)&proxy_conf.password, &proxy_conf.type);
  469. struct mqtt_connect_params mqtt_conn_params = {
  470. .clientid = "anon",
  471. .username = "anon",
  472. .password = "anon",
  473. .will_topic = "lwt",
  474. .will_msg = NULL,
  475. .will_flags = MQTT_WSS_PUB_QOS2,
  476. .keep_alive = 60,
  477. .drop_on_publish_fail = 1
  478. };
  479. #ifndef ACLK_DISABLE_CHALLENGE
  480. if (aclk_env) {
  481. aclk_env_t_destroy(aclk_env);
  482. freez(aclk_env);
  483. }
  484. aclk_env = callocz(1, sizeof(aclk_env_t));
  485. ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
  486. url_t_destroy(&base_url);
  487. if (ret) {
  488. error_report("Failed to Get ACLK environment");
  489. // delay handled by aclk_block_till_recon_allowed
  490. continue;
  491. }
  492. if (!service_running(SERVICE_ACLK))
  493. return 1;
  494. if (aclk_env->encoding != ACLK_ENC_PROTO) {
  495. error_report("This agent can only use the new cloud protocol but cloud requested old one.");
  496. continue;
  497. }
  498. if (!aclk_env_has_capa("proto")) {
  499. error_report("Can't use encoding=proto without at least \"proto\" capability.");
  500. continue;
  501. }
  502. info("New ACLK protobuf protocol negotiated successfully (/env response).");
  503. memset(&auth_url, 0, sizeof(url_t));
  504. if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
  505. error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
  506. url_t_destroy(&auth_url);
  507. continue;
  508. }
  509. ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
  510. url_t_destroy(&auth_url);
  511. if (ret) {
  512. error_report("Error passing Challenge/Response to get OTP");
  513. continue;
  514. }
  515. // aclk_get_topic moved here as during OTP we
  516. // generate the topic cache
  517. mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
  518. if (!mqtt_conn_params.will_topic) {
  519. error_report("Couldn't get LWT topic. Will not send LWT.");
  520. continue;
  521. }
  522. // Do the MQTT connection
  523. ret = aclk_get_transport_idx(aclk_env);
  524. if (ret < 0) {
  525. error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
  526. continue;
  527. }
  528. memset(&mqtt_url, 0, sizeof(url_t));
  529. if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
  530. error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
  531. url_t_destroy(&mqtt_url);
  532. continue;
  533. }
  534. #endif
  535. aclk_session_newarch = now_realtime_usec();
  536. aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
  537. aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
  538. mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
  539. #ifdef ACLK_DISABLE_CHALLENGE
  540. ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  541. url_t_destroy(&base_url);
  542. #else
  543. ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  544. url_t_destroy(&mqtt_url);
  545. freez((char*)mqtt_conn_params.clientid);
  546. freez((char*)mqtt_conn_params.password);
  547. freez((char*)mqtt_conn_params.username);
  548. #endif
  549. freez((char*)mqtt_conn_params.will_msg);
  550. freez((char*)proxy_conf.host);
  551. freez((char*)proxy_conf.username);
  552. freez((char*)proxy_conf.password);
  553. if (!ret) {
  554. last_conn_time_mqtt = now_realtime_sec();
  555. info("ACLK connection successfully established");
  556. log_access("ACLK CONNECTED");
  557. mqtt_connected_actions(client);
  558. return 0;
  559. }
  560. error_report("Connect failed");
  561. }
  562. return 1;
  563. }
  564. /**
  565. * Main agent cloud link thread
  566. *
  567. * This thread will simply call the main event loop that handles
  568. * pending requests - both inbound and outbound
  569. *
  570. * @param ptr is a pointer to the netdata_static_thread structure.
  571. *
  572. * @return It always returns NULL
  573. */
  574. void *aclk_main(void *ptr)
  575. {
  576. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  577. struct aclk_stats_thread *stats_thread = NULL;
  578. struct aclk_query_threads query_threads;
  579. query_threads.thread_list = NULL;
  580. ACLK_PROXY_TYPE proxy_type;
  581. aclk_get_proxy(&proxy_type);
  582. if (proxy_type == PROXY_TYPE_SOCKS5) {
  583. error("SOCKS5 proxy is not supported by ACLK-NG yet.");
  584. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  585. return NULL;
  586. }
  587. unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
  588. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  589. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  590. netdata_thread_disable_cancelability();
  591. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
  592. info("Killing ACLK thread -> cloud functionality has been disabled");
  593. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  594. return NULL;
  595. #endif
  596. query_threads.count = read_query_thread_count();
  597. if (wait_till_cloud_enabled())
  598. goto exit;
  599. if (wait_till_agent_claim_ready())
  600. goto exit;
  601. if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
  602. error("Couldn't initialize MQTT_WSS network library");
  603. goto exit;
  604. }
  605. #ifdef MQTT_WSS_DEBUG
  606. size_t default_ssl_log_filename_size = strlen(netdata_configured_log_dir) + strlen(DEFAULT_SSKEYLOGFILE_NAME) + 2;
  607. char *default_ssl_log_filename = mallocz(default_ssl_log_filename_size);
  608. snprintfz(default_ssl_log_filename, default_ssl_log_filename_size, "%s/%s", netdata_configured_log_dir, DEFAULT_SSKEYLOGFILE_NAME);
  609. ssl_log_filename = config_get(CONFIG_SECTION_CLOUD, "aclk ssl keylog file", default_ssl_log_filename);
  610. freez(default_ssl_log_filename);
  611. if (ssl_log_filename) {
  612. error_report("SSLKEYLOGFILE active (path:\"%s\")!", ssl_log_filename);
  613. mqtt_wss_set_SSL_CTX_keylog_cb(mqttwss_client, aclk_ssl_keylog_cb);
  614. }
  615. #endif
  616. // Enable MQTT buffer growth if necessary
  617. // e.g. old cloud architecture clients with huge nodes
  618. // that send JSON payloads of 10 MB as single messages
  619. mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
  620. aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled);
  621. if (aclk_stats_enabled) {
  622. stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
  623. stats_thread->thread = mallocz(sizeof(netdata_thread_t));
  624. stats_thread->query_thread_count = query_threads.count;
  625. stats_thread->client = mqttwss_client;
  626. aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
  627. netdata_thread_create(
  628. stats_thread->thread, "ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
  629. stats_thread);
  630. }
  631. // Keep reconnecting and talking until our time has come
  632. // and the Grim Reaper (netdata_exit) calls
  633. do {
  634. if (aclk_attempt_to_connect(mqttwss_client))
  635. goto exit_full;
  636. if (unlikely(!query_threads.thread_list))
  637. aclk_query_threads_start(&query_threads, mqttwss_client);
  638. if (handle_connection(mqttwss_client)) {
  639. aclk_stats_upd_online(0);
  640. last_disconnect_time = now_realtime_sec();
  641. aclk_connected = 0;
  642. log_access("ACLK DISCONNECTED");
  643. }
  644. } while (service_running(SERVICE_ACLK));
  645. aclk_graceful_disconnect(mqttwss_client);
  646. #ifdef MQTT_WSS_DEBUG
  647. if (ssl_log_file)
  648. fclose(ssl_log_file);
  649. #endif
  650. exit_full:
  651. // Tear Down
  652. QUERY_THREAD_WAKEUP_ALL;
  653. aclk_query_threads_cleanup(&query_threads);
  654. if (aclk_stats_enabled) {
  655. netdata_thread_join(*stats_thread->thread, NULL);
  656. aclk_stats_thread_cleanup();
  657. freez(stats_thread->thread);
  658. freez(stats_thread);
  659. }
  660. free_topic_cache();
  661. mqtt_wss_destroy(mqttwss_client);
  662. exit:
  663. if (aclk_env) {
  664. aclk_env_t_destroy(aclk_env);
  665. freez(aclk_env);
  666. }
  667. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  668. return NULL;
  669. }
  670. void aclk_host_state_update(RRDHOST *host, int cmd)
  671. {
  672. uuid_t node_id;
  673. int ret = 0;
  674. if (!aclk_connected)
  675. return;
  676. if (host->node_id && !uuid_is_null(*host->node_id)) {
  677. uuid_copy(node_id, *host->node_id);
  678. }
  679. else {
  680. ret = get_node_id(&host->host_uuid, &node_id);
  681. if (ret > 0) {
  682. // this means we were not able to check if node_id already present
  683. error("Unable to check for node_id. Ignoring the host state update.");
  684. return;
  685. }
  686. if (ret < 0) {
  687. // node_id not found
  688. aclk_query_t create_query;
  689. create_query = aclk_query_new(REGISTER_NODE);
  690. rrdhost_aclk_state_lock(localhost);
  691. node_instance_creation_t node_instance_creation = {
  692. .claim_id = localhost->aclk_state.claimed_id,
  693. .hops = host->system_info->hops,
  694. .hostname = rrdhost_hostname(host),
  695. .machine_guid = host->machine_guid};
  696. create_query->data.bin_payload.payload =
  697. generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
  698. rrdhost_aclk_state_unlock(localhost);
  699. create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
  700. create_query->data.bin_payload.msg_name = "CreateNodeInstance";
  701. info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
  702. aclk_queue_query(create_query);
  703. return;
  704. }
  705. }
  706. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  707. node_instance_connection_t node_state_update = {
  708. .hops = host->system_info->hops,
  709. .live = cmd,
  710. .queryable = 1,
  711. .session_id = aclk_session_newarch
  712. };
  713. node_state_update.node_id = mallocz(UUID_STR_LEN);
  714. uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
  715. node_state_update.capabilities = aclk_get_agent_capas();
  716. rrdhost_aclk_state_lock(localhost);
  717. node_state_update.claim_id = localhost->aclk_state.claimed_id;
  718. query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
  719. rrdhost_aclk_state_unlock(localhost);
  720. info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
  721. host->system_info->hops);
  722. freez((void*)node_state_update.node_id);
  723. query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
  724. query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
  725. aclk_queue_query(query);
  726. }
  727. void aclk_send_node_instances()
  728. {
  729. struct node_instance_list *list_head = get_node_list();
  730. struct node_instance_list *list = list_head;
  731. if (unlikely(!list)) {
  732. error_report("Failure to get_node_list from DB!");
  733. return;
  734. }
  735. while (!uuid_is_null(list->host_id)) {
  736. if (!uuid_is_null(list->node_id)) {
  737. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  738. node_instance_connection_t node_state_update = {
  739. .live = list->live,
  740. .hops = list->hops,
  741. .queryable = 1,
  742. .session_id = aclk_session_newarch
  743. };
  744. node_state_update.node_id = mallocz(UUID_STR_LEN);
  745. uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
  746. char host_id[UUID_STR_LEN];
  747. uuid_unparse_lower(list->host_id, host_id);
  748. RRDHOST *host = rrdhost_find_by_guid(host_id);
  749. node_state_update.capabilities = aclk_get_node_instance_capas(host);
  750. rrdhost_aclk_state_lock(localhost);
  751. node_state_update.claim_id = localhost->aclk_state.claimed_id;
  752. query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
  753. rrdhost_aclk_state_unlock(localhost);
  754. info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
  755. list->live,
  756. list->hops);
  757. freez((void*)node_state_update.capabilities);
  758. freez((void*)node_state_update.node_id);
  759. query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
  760. query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
  761. aclk_queue_query(query);
  762. } else {
  763. aclk_query_t create_query;
  764. create_query = aclk_query_new(REGISTER_NODE);
  765. node_instance_creation_t node_instance_creation = {
  766. .hops = list->hops,
  767. .hostname = list->hostname,
  768. };
  769. node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
  770. uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
  771. create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
  772. create_query->data.bin_payload.msg_name = "CreateNodeInstance";
  773. rrdhost_aclk_state_lock(localhost);
  774. node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
  775. create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
  776. rrdhost_aclk_state_unlock(localhost);
  777. info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
  778. list->hops);
  779. freez((void *)node_instance_creation.machine_guid);
  780. aclk_queue_query(create_query);
  781. }
  782. freez(list->hostname);
  783. list++;
  784. }
  785. freez(list_head);
  786. }
  787. void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
  788. {
  789. aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
  790. }
  791. static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
  792. {
  793. struct proto_alert_status status;
  794. memset(&status, 0, sizeof(status));
  795. if (get_proto_alert_status(host, &status)) {
  796. buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
  797. return;
  798. }
  799. buffer_sprintf(wb,
  800. "\n\t\tUpdates: %d"
  801. "\n\t\tBatch ID: %"PRIu64
  802. "\n\t\tLast Acked Seq ID: %"PRIu64
  803. "\n\t\tPending Min Seq ID: %"PRIu64
  804. "\n\t\tPending Max Seq ID: %"PRIu64
  805. "\n\t\tLast Submitted Seq ID: %"PRIu64,
  806. status.alert_updates,
  807. status.alerts_batch_id,
  808. status.last_acked_sequence_id,
  809. status.pending_min_sequence_id,
  810. status.pending_max_sequence_id,
  811. status.last_submitted_sequence_id
  812. );
  813. }
  814. #endif /* ENABLE_ACLK */
  /**
   * Build a plain-text report of the ACLK state.
   *
   * Without ENABLE_ACLK the report is the single line "ACLK Available: No".
   * Otherwise it lists protocol information, claim status, connection
   * counters and timestamps (formatted in local time), and, while connected,
   * one section per host.
   *
   * @return the report as produced by strdupz(); presumably the caller is
   *         expected to release it — confirm against callers.
   */
  char *aclk_state(void)
  {
  #ifndef ENABLE_ACLK
      return strdupz("ACLK Available: No");
  #else
      BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk);
      struct tm broken_down;
      char timebuf[26];   // shared scratch space for every formatted timestamp

      buffer_strcat(wb,
          "ACLK Available: Yes\n"
          "ACLK Version: 2\n"
          "Protocols Supported: Protobuf\n"
      );
      buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);

      char *agent_id = get_agent_claimid();
      if (agent_id != NULL) {
          char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
          buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
          freez(agent_id);
      }
      else {
          buffer_strcat(wb, "No\n");
      }

      buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n",
                     aclk_connected ? "Yes" : "No",
                     aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0,
                     aclk_disable_runtime ? "Yes" : "No");

      // Each timestamp is reported only when it is non-zero and can be
      // converted by localtime_r().
      if (last_conn_time_mqtt && localtime_r(&last_conn_time_mqtt, &broken_down)) {
          strftime(timebuf, sizeof(timebuf), "%Y-%m-%d %H:%M:%S", &broken_down);
          buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf);
      }

      if (last_conn_time_appl && localtime_r(&last_conn_time_appl, &broken_down)) {
          strftime(timebuf, sizeof(timebuf), "%Y-%m-%d %H:%M:%S", &broken_down);
          buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf);
      }

      if (last_disconnect_time && localtime_r(&last_disconnect_time, &broken_down)) {
          strftime(timebuf, sizeof(timebuf), "%Y-%m-%d %H:%M:%S", &broken_down);
          buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf);
      }

      if (!aclk_connected && next_connection_attempt && localtime_r(&next_connection_attempt, &broken_down)) {
          strftime(timebuf, sizeof(timebuf), "%Y-%m-%d %H:%M:%S", &broken_down);
          buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
      }

      if (aclk_connected) {
          buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);

          RRDHOST *host;
          rrd_rdlock();
          rrdhost_foreach_read(host) {
              buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host));

              buffer_strcat(wb, "\tClaimed ID: ");
              // claimed_id is read under the host's aclk state lock
              rrdhost_aclk_state_lock(host);
              buffer_strcat(wb, host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "null");
              rrdhost_aclk_state_unlock(host);

              if (host->node_id != NULL && !uuid_is_null(*host->node_id)) {
                  char node_id[GUID_LEN + 1];
                  uuid_unparse_lower(*host->node_id, node_id);
                  buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
              }
              else {
                  buffer_strcat(wb, "\n\tNode ID: null\n");
              }

              buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");

              if (host != localhost)
                  buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");

              buffer_strcat(wb, "\n\tAlert Streaming Status:");
              fill_alert_status_for_host(wb, host);
          }
          rrd_unlock();
      }

      // Hand back a copy so the buffer can be released here.
      char *report = strdupz(buffer_tostring(wb));
      buffer_free(wb);
      return report;
  #endif /* ENABLE_ACLK */
  }
  891. #ifdef ENABLE_ACLK
  892. static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
  893. {
  894. struct proto_alert_status status;
  895. memset(&status, 0, sizeof(status));
  896. if (get_proto_alert_status(host, &status))
  897. return;
  898. json_object *tmp = json_object_new_int(status.alert_updates);
  899. json_object_object_add(obj, "updates", tmp);
  900. tmp = json_object_new_int(status.alerts_batch_id);
  901. json_object_object_add(obj, "batch-id", tmp);
  902. tmp = json_object_new_int(status.last_acked_sequence_id);
  903. json_object_object_add(obj, "last-acked-seq-id", tmp);
  904. tmp = json_object_new_int(status.pending_min_sequence_id);
  905. json_object_object_add(obj, "pending-min-seq-id", tmp);
  906. tmp = json_object_new_int(status.pending_max_sequence_id);
  907. json_object_object_add(obj, "pending-max-seq-id", tmp);
  908. tmp = json_object_new_int(status.last_submitted_sequence_id);
  909. json_object_object_add(obj, "last-submitted-seq-id", tmp);
  910. }
  911. static json_object *timestamp_to_json(const time_t *t)
  912. {
  913. struct tm *tmptr, tmbuf;
  914. if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
  915. char timebuf[26];
  916. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  917. return json_object_new_string(timebuf);
  918. }
  919. return NULL;
  920. }
  921. #endif /* ENABLE_ACLK */
  /**
   * Build a JSON report of the ACLK state.
   *
   * Without ENABLE_ACLK the report is {"aclk-available":false}. Otherwise it
   * contains protocol information, claim status, connection counters and
   * timestamps, plus a "node-instances" array with one object per host.
   *
   * @return the serialized JSON as produced by strdupz(); presumably the caller
   *         is expected to release it — confirm against callers.
   */
  char *aclk_state_json(void)
  {
  #ifndef ENABLE_ACLK
      return strdupz("{\"aclk-available\":false}");
  #else
      json_object *msg = json_object_new_object();

      json_object_object_add(msg, "aclk-available", json_object_new_boolean(1));
      json_object_object_add(msg, "aclk-version", json_object_new_int(2));

      json_object *protocols = json_object_new_array();
      json_object_array_add(protocols, json_object_new_string("Protobuf"));
      json_object_object_add(msg, "protocols-supported", protocols);

      char *agent_id = get_agent_claimid();
      json_object_object_add(msg, "agent-claimed", json_object_new_boolean(agent_id != NULL));

      // "claimed-id" is added even when unclaimed, with a NULL value.
      json_object *claimed_id = NULL;
      if (agent_id) {
          claimed_id = json_object_new_string(agent_id);
          freez(agent_id);
      }
      json_object_object_add(msg, "claimed-id", claimed_id);

      char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
      json_object_object_add(msg, "cloud-url", cloud_base_url ? json_object_new_string(cloud_base_url) : NULL);

      json_object_object_add(msg, "online", json_object_new_boolean(aclk_connected));
      json_object_object_add(msg, "used-cloud-protocol", json_object_new_string("Protobuf"));
      json_object_object_add(msg, "mqtt-version", json_object_new_int(5));
      json_object_object_add(msg, "received-app-layer-msgs", json_object_new_int(aclk_rcvd_cloud_msgs));
      json_object_object_add(msg, "received-mqtt-pubacks", json_object_new_int(aclk_pubacks_per_conn));
      json_object_object_add(msg, "reconnect-count",
                             json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0));

      json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
      json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
      json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
      json_object_object_add(msg, "next-connection-attempt-utc",
                             !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);

      // The backoff value is reported only while disconnected and non-zero.
      json_object *backoff = NULL;
      if (!aclk_connected && last_backoff_value)
          backoff = json_object_new_double(last_backoff_value);
      json_object_object_add(msg, "last-backoff-value", backoff);

      json_object_object_add(msg, "banned-by-cloud", json_object_new_boolean(aclk_disable_runtime));

      json_object *instances = json_object_new_array();

      RRDHOST *host;
      rrd_rdlock();
      rrdhost_foreach_read(host) {
          json_object *nodeinstance = json_object_new_object();

          json_object_object_add(nodeinstance, "hostname", json_object_new_string(rrdhost_hostname(host)));
          json_object_object_add(nodeinstance, "mguid", json_object_new_string(host->machine_guid));

          // claimed_id is read under the host's aclk state lock
          rrdhost_aclk_state_lock(host);
          json_object_object_add(nodeinstance, "claimed_id",
                                 host->aclk_state.claimed_id
                                     ? json_object_new_string(host->aclk_state.claimed_id)
                                     : NULL);
          rrdhost_aclk_state_unlock(host);

          if (host->node_id != NULL && !uuid_is_null(*host->node_id)) {
              char node_id[GUID_LEN + 1];
              uuid_unparse_lower(*host->node_id, node_id);
              json_object_object_add(nodeinstance, "node-id", json_object_new_string(node_id));
          }
          else {
              json_object_object_add(nodeinstance, "node-id", NULL);
          }

          json_object_object_add(nodeinstance, "streaming-hops", json_object_new_int(host->system_info->hops));
          json_object_object_add(nodeinstance, "relationship",
                                 json_object_new_string(host == localhost ? "self" : "child"));
          json_object_object_add(nodeinstance, "streaming-online",
                                 json_object_new_boolean((host->receiver || host == localhost)));

          json_object *alert_status = json_object_new_object();
          fill_alert_status_for_host_json(alert_status, host);
          json_object_object_add(nodeinstance, "alert-sync-status", alert_status);

          json_object_array_add(instances, nodeinstance);
      }
      rrd_unlock();

      json_object_object_add(msg, "node-instances", instances);

      // Copy the serialized text before the JSON tree is released.
      char *json_text = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
      json_object_put(msg);
      return json_text;
  #endif /* ENABLE_ACLK */
  }
  1012. void add_aclk_host_labels(void) {
  1013. DICTIONARY *labels = localhost->rrdlabels;
  1014. #ifdef ENABLE_ACLK
  1015. rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  1016. ACLK_PROXY_TYPE aclk_proxy;
  1017. char *proxy_str;
  1018. aclk_get_proxy(&aclk_proxy);
  1019. switch(aclk_proxy) {
  1020. case PROXY_TYPE_SOCKS5:
  1021. proxy_str = "SOCKS5";
  1022. break;
  1023. case PROXY_TYPE_HTTP:
  1024. proxy_str = "HTTP";
  1025. break;
  1026. default:
  1027. proxy_str = "none";
  1028. break;
  1029. }
  1030. rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
  1031. rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
  1032. rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  1033. #else
  1034. rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  1035. #endif
  1036. }
  1037. void aclk_queue_node_info(RRDHOST *host) {
  1038. struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker;
  1039. if (likely(wc)) {
  1040. wc->node_info_send = 1;
  1041. }
  1042. }