aclk.c 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk.h"
  3. #ifdef ENABLE_ACLK
  4. #include "aclk_stats.h"
  5. #include "mqtt_wss_client.h"
  6. #include "aclk_otp.h"
  7. #include "aclk_tx_msgs.h"
  8. #include "aclk_query.h"
  9. #include "aclk_query_queue.h"
  10. #include "aclk_util.h"
  11. #include "aclk_rx_msgs.h"
  12. #include "https_client.h"
  13. #include "schema-wrappers/schema_wrappers.h"
  14. #include "aclk_capas.h"
  15. #include "aclk_proxy.h"
  16. #ifdef ACLK_LOG_CONVERSATION_DIR
  17. #include <sys/types.h>
  18. #include <sys/stat.h>
  19. #include <fcntl.h>
  20. #endif
  21. #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
  22. #endif /* ENABLE_ACLK */
  // Connection-state globals of the agent cloud link client defined in this file.
  23. int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est. (reset in mqtt_connected_actions)
  24. int aclk_rcvd_cloud_msgs = 0; // incremented for every message delivered to msg_callback; reset on each connect
  25. int aclk_connection_counter = 0; // incremented each time mqtt_connected_actions runs
  26. int disconnect_req = 0; // when non-zero, handle_connection restarts the link (logged there as "cloud req") and clears it
  27. int aclk_connected = 0; // 1 after mqtt_connected_actions, 0 after a disconnect
  28. int aclk_ctx_based = 0;
  29. int aclk_disable_runtime = 0; // when non-zero, aclk_reconnect_delay returns a fixed 60 second delay
  30. int aclk_stats_enabled; // read from the cloud config section ("statistics") in aclk_main
  31. int aclk_kill_link = 0; // when non-zero, handle_connection restarts the link (logged there as "reclaim") and clears it
  32. usec_t aclk_session_us = 0; // sub-second part of aclk_session_newarch, set before each MQTT connect
  33. time_t aclk_session_sec = 0; // whole-second part of aclk_session_newarch, set before each MQTT connect
  34. time_t last_conn_time_mqtt = 0; // realtime timestamp of the last successful mqtt_wss_connect
  35. time_t last_conn_time_appl = 0; // realtime timestamp when the PUBACK count reached ACLK_PUBACKS_CONN_STABLE
  36. time_t last_disconnect_time = 0; // realtime timestamp of the last disconnect
  37. time_t next_connection_attempt = 0; // realtime timestamp computed in aclk_block_till_recon_allowed
  38. float last_backoff_value = 0; // last reconnect delay in seconds, set in aclk_block_till_recon_allowed
  39. time_t aclk_block_until = 0; // compared against now_monotonic_sec() in aclk_reconnect_delay; cleared once consumed
  40. #ifdef ENABLE_ACLK
  // Handle of the MQTT-over-WSS client; assigned from mqtt_wss_new() in aclk_main.
  41. mqtt_wss_client mqttwss_client;
  // Mutex plus lock/unlock helper macros for the shared state below.
  42. netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
  43. #define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
  44. #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
  // mqtt_shutdown_msg_id: value returned by aclk_send_agent_connection_update(client, 0)
  //                       during a graceful disconnect; -1 while no shutdown is in progress.
  // mqtt_shutdown_msg_rcvd: set to 1 by puback_callback once that message id is acknowledged.
  45. struct aclk_shared_state aclk_shared_state = {
  46. .mqtt_shutdown_msg_id = -1,
  47. .mqtt_shutdown_msg_rcvd = 0
  48. };
  #ifdef MQTT_WSS_DEBUG
  #include <openssl/ssl.h>
  #define DEFAULT_SSKEYLOGFILE_NAME "SSLKEYLOGFILE"
  const char *ssl_log_filename = NULL;
  FILE *ssl_log_file = NULL;

  /* Key-log callback registered with the MQTT/WSS client in aclk_main.
   * Appends every key-log line (followed by a newline) to the file named by
   * ssl_log_filename. The file is opened lazily in append mode on the first
   * call and the stream is kept in ssl_log_file for later calls.
   */
  static void aclk_ssl_keylog_cb(const SSL *ssl, const char *line)
  {
      (void)ssl;

      if (ssl_log_file == NULL) {
          ssl_log_file = fopen(ssl_log_filename, "a");
          if (ssl_log_file == NULL) {
              error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename);
              return;
          }
      }

      fputs(line, ssl_log_file);
      fputc('\n', ssl_log_file);
      // flush per line so the log is usable even if the agent dies
      fflush(ssl_log_file);
  }
  #endif
  68. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  69. OSSL_DECODER_CTX *aclk_dctx = NULL;
  70. EVP_PKEY *aclk_private_key = NULL;
  71. #else
  72. static RSA *aclk_private_key = NULL;
  73. #endif
  74. static int load_private_key()
  75. {
  76. if (aclk_private_key != NULL) {
  77. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  78. EVP_PKEY_free(aclk_private_key);
  79. if (aclk_dctx)
  80. OSSL_DECODER_CTX_free(aclk_dctx);
  81. aclk_dctx = NULL;
  82. #else
  83. RSA_free(aclk_private_key);
  84. #endif
  85. }
  86. aclk_private_key = NULL;
  87. char filename[FILENAME_MAX + 1];
  88. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  89. long bytes_read;
  90. char *private_key = read_by_filename(filename, &bytes_read);
  91. if (!private_key) {
  92. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  93. return 1;
  94. }
  95. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  96. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  97. if (key_bio==NULL) {
  98. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  99. goto biofailed;
  100. }
  101. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  102. aclk_dctx = OSSL_DECODER_CTX_new_for_pkey(&aclk_private_key, "PEM", NULL,
  103. "RSA",
  104. OSSL_KEYMGMT_SELECT_PRIVATE_KEY,
  105. NULL, NULL);
  106. if (!aclk_dctx) {
  107. error("Loading private key (from claiming) failed - no OpenSSL Decoders found");
  108. goto biofailed;
  109. }
  110. // this is necesseary to avoid RSA key with wrong size
  111. if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) {
  112. error("Decoding private key (from claiming) failed - invalid format.");
  113. goto biofailed;
  114. }
  115. #else
  116. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  117. #endif
  118. BIO_free(key_bio);
  119. if (aclk_private_key!=NULL)
  120. {
  121. freez(private_key);
  122. return 0;
  123. }
  124. char err[512];
  125. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  126. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  127. biofailed:
  128. freez(private_key);
  129. return 1;
  130. }
  131. static int wait_till_cloud_enabled()
  132. {
  133. info("Waiting for Cloud to be enabled");
  134. while (!netdata_cloud_setting) {
  135. sleep_usec(USEC_PER_SEC * 1);
  136. if (!service_running(SERVICE_ACLK))
  137. return 1;
  138. }
  139. return 0;
  140. }
  141. /**
  142. * Will block until agent is claimed. Returns only if agent claimed
  143. * or if agent needs to shutdown.
  144. *
  145. * @return `0` if agent has been claimed,
  146. * `1` if interrupted due to agent shutting down
  147. */
  148. static int wait_till_agent_claimed(void)
  149. {
  150. //TODO prevent malloc and freez
  151. char *agent_id = get_agent_claimid();
  152. while (likely(!agent_id)) {
  153. sleep_usec(USEC_PER_SEC * 1);
  154. if (!service_running(SERVICE_ACLK))
  155. return 1;
  156. agent_id = get_agent_claimid();
  157. }
  158. freez(agent_id);
  159. return 0;
  160. }
  161. /**
  162. * Checks everything is ready for connection
  163. * agent claimed, cloud url set and private key available
  164. *
  165. * @param aclk_hostname points to location where string pointer to hostname will be set
  166. * @param aclk_port port to int where port will be saved
  167. *
  168. * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated
  169. */
  170. static int wait_till_agent_claim_ready()
  171. {
  172. url_t url;
  173. while (service_running(SERVICE_ACLK)) {
  174. if (wait_till_agent_claimed())
  175. return 1;
  176. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  177. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  178. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  179. if (cloud_base_url == NULL) {
  180. error("Do not move the cloud base url out of post_conf_load!!");
  181. return 1;
  182. }
  183. // We just check configuration is valid here
  184. // TODO make it without malloc/free
  185. memset(&url, 0, sizeof(url_t));
  186. if (url_parse(cloud_base_url, &url)) {
  187. error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
  188. url_t_destroy(&url);
  189. sleep(5);
  190. continue;
  191. }
  192. url_t_destroy(&url);
  193. if (!load_private_key())
  194. return 0;
  195. sleep(5);
  196. }
  197. return 1;
  198. }
  199. void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
  200. {
  201. switch(log_type) {
  202. case MQTT_WSS_LOG_ERROR:
  203. case MQTT_WSS_LOG_FATAL:
  204. case MQTT_WSS_LOG_WARN:
  205. error_report("%s", str);
  206. return;
  207. case MQTT_WSS_LOG_INFO:
  208. info("%s", str);
  209. return;
  210. case MQTT_WSS_LOG_DEBUG:
  211. debug(D_ACLK, "%s", str);
  212. return;
  213. default:
  214. error("Unknown log type from mqtt_wss");
  215. }
  216. }
  217. static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
  218. {
  219. UNUSED(qos);
  220. aclk_rcvd_cloud_msgs++;
  221. debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
  222. if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
  223. error("Link is shutting down. Ignoring incoming message.");
  224. return;
  225. }
  226. const char *msgtype = strrchr(topic, '/');
  227. if (unlikely(!msgtype)) {
  228. error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
  229. return;
  230. }
  231. msgtype++;
  232. if (unlikely(!*msgtype)) {
  233. error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
  234. return;
  235. }
  236. #ifdef ACLK_LOG_CONVERSATION_DIR
  237. #define FN_MAX_LEN 512
  238. char filename[FN_MAX_LEN];
  239. int logfd;
  240. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
  241. logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
  242. if(logfd < 0)
  243. error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
  244. write(logfd, msg, msglen);
  245. close(logfd);
  246. #endif
  247. aclk_handle_new_cloud_msg(msgtype, msg, msglen, topic);
  248. }
  249. static void puback_callback(uint16_t packet_id)
  250. {
  251. if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
  252. last_conn_time_appl = now_realtime_sec();
  253. aclk_tbeb_reset();
  254. }
  255. #ifdef NETDATA_INTERNAL_CHECKS
  256. aclk_stats_msg_puback(packet_id);
  257. #endif
  258. if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
  259. info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
  260. aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
  261. }
  262. }
  263. static int read_query_thread_count()
  264. {
  265. int threads = MIN(get_netdata_cpus()/2, 6);
  266. threads = MAX(threads, 2);
  267. threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  268. if(threads < 1) {
  269. error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
  270. threads = 1;
  271. config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  272. }
  273. return threads;
  274. }
  275. void aclk_graceful_disconnect(mqtt_wss_client client);
  276. /* Keeps connection alive and handles all network communications.
  277. * Returns on error or when netdata is shutting down.
  278. * @param client instance of mqtt_wss_client
  279. * @returns 0 - Netdata Exits
  280. * >0 - Error happened. Reconnect and start over.
  281. */
  282. static int handle_connection(mqtt_wss_client client)
  283. {
  284. time_t last_periodic_query_wakeup = now_monotonic_sec();
  285. while (service_running(SERVICE_ACLK)) {
  286. // timeout 1000 to check at least once a second
  287. // for netdata_exit
  288. if (mqtt_wss_service(client, 1000) < 0){
  289. error_report("Connection Error or Dropped");
  290. return 1;
  291. }
  292. if (disconnect_req || aclk_kill_link) {
  293. info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
  294. disconnect_req ? "true" : "false",
  295. aclk_kill_link ? "true" : "false");
  296. disconnect_req = 0;
  297. aclk_kill_link = 0;
  298. aclk_graceful_disconnect(client);
  299. aclk_queue_unlock();
  300. aclk_shared_state.mqtt_shutdown_msg_id = -1;
  301. aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
  302. return 1;
  303. }
  304. // mqtt_wss_service will return faster than in one second
  305. // if there is enough work to do
  306. time_t now = now_monotonic_sec();
  307. if (last_periodic_query_wakeup < now) {
  308. // wake up at least one Query Thread at least
  309. // once per second
  310. last_periodic_query_wakeup = now;
  311. QUERY_THREAD_WAKEUP;
  312. }
  313. }
  314. return 0;
  315. }
  316. static inline void mqtt_connected_actions(mqtt_wss_client client)
  317. {
  318. char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
  319. if (!topic)
  320. error("Unable to fetch topic for COMMAND (to subscribe)");
  321. else
  322. mqtt_wss_subscribe(client, topic, 1);
  323. topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
  324. if (!topic)
  325. error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
  326. else
  327. mqtt_wss_subscribe(client, topic, 1);
  328. aclk_stats_upd_online(1);
  329. aclk_connected = 1;
  330. aclk_pubacks_per_conn = 0;
  331. aclk_rcvd_cloud_msgs = 0;
  332. aclk_connection_counter++;
  333. aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER;
  334. while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL)
  335. mqtt_wss_set_topic_alias(client, topic);
  336. aclk_send_agent_connection_update(client, 1);
  337. }
  338. void aclk_graceful_disconnect(mqtt_wss_client client)
  339. {
  340. info("Preparing to gracefully shutdown ACLK connection");
  341. aclk_queue_lock();
  342. aclk_queue_flush();
  343. aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
  344. time_t t = now_monotonic_sec();
  345. while (!mqtt_wss_service(client, 100)) {
  346. if (now_monotonic_sec() - t >= 2) {
  347. error("Wasn't able to gracefully shutdown ACLK in time!");
  348. break;
  349. }
  350. if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
  351. info("MQTT App Layer `disconnect` message sent successfully");
  352. break;
  353. }
  354. }
  355. info("ACLK link is down");
  356. log_access("ACLK DISCONNECTED");
  357. aclk_stats_upd_online(0);
  358. last_disconnect_time = now_realtime_sec();
  359. aclk_connected = 0;
  360. info("Attempting to gracefully shutdown the MQTT/WSS connection");
  361. mqtt_wss_disconnect(client, 1000);
  362. }
  363. static unsigned long aclk_reconnect_delay() {
  364. unsigned long recon_delay;
  365. time_t now;
  366. if (aclk_disable_runtime) {
  367. aclk_tbeb_reset();
  368. return 60 * MSEC_PER_SEC;
  369. }
  370. now = now_monotonic_sec();
  371. if (aclk_block_until) {
  372. if (now < aclk_block_until) {
  373. recon_delay = aclk_block_until - now;
  374. recon_delay *= MSEC_PER_SEC;
  375. aclk_block_until = 0;
  376. aclk_tbeb_reset();
  377. return recon_delay;
  378. }
  379. aclk_block_until = 0;
  380. }
  381. if (!aclk_env || !aclk_env->backoff.base)
  382. return aclk_tbeb_delay(0, 2, 0, 1024);
  383. return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
  384. }
  385. /* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled
  386. * @return 0 - Go ahead and connect (delay expired)
  387. * 1 - netdata_exit
  388. */
  389. #define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
  390. static int aclk_block_till_recon_allowed() {
  391. unsigned long recon_delay = aclk_reconnect_delay();
  392. next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
  393. last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
  394. info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
  395. // we want to wake up from time to time to check netdata_exit
  396. while (recon_delay)
  397. {
  398. if (!service_running(SERVICE_ACLK))
  399. return 1;
  400. if (recon_delay > NETDATA_EXIT_POLL_MS) {
  401. sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
  402. recon_delay -= NETDATA_EXIT_POLL_MS;
  403. continue;
  404. }
  405. sleep_usec(recon_delay * USEC_PER_MS);
  406. recon_delay = 0;
  407. }
  408. return !service_running(SERVICE_ACLK);
  409. }
  410. #ifndef ACLK_DISABLE_CHALLENGE
  411. /* Cloud returns transport list ordered with highest
  412. * priority first. This function selects highest prio
  413. * transport that we can actually use (support)
  414. */
  415. static int aclk_get_transport_idx(aclk_env_t *env) {
  416. for (size_t i = 0; i < env->transport_count; i++) {
  417. // currently we support only MQTT 5
  418. // therefore select first transport that matches
  419. if (env->transports[i]->type == ACLK_TRP_MQTT_5) {
  420. return i;
  421. }
  422. }
  423. return -1;
  424. }
  425. #endif
  426. /* Attempts to make a connection to MQTT broker over WSS
  427. * @param client instance of mqtt_wss_client
  428. * @return 0 - Successful Connection,
  429. * <0 - Irrecoverable Error -> Kill ACLK,
  430. * >0 - netdata_exit
  431. */
  432. #define CLOUD_BASE_URL_READ_RETRY 30
  433. #ifdef ACLK_SSL_ALLOW_SELF_SIGNED
  434. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED
  435. #else
  436. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL
  437. #endif
  438. static int aclk_attempt_to_connect(mqtt_wss_client client)
  439. {
  440. int ret;
  441. url_t base_url;
  442. #ifndef ACLK_DISABLE_CHALLENGE
  443. url_t auth_url;
  444. url_t mqtt_url;
  445. #endif
  446. while (service_running(SERVICE_ACLK)) {
  447. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  448. if (cloud_base_url == NULL) {
  449. error_report("Do not move the cloud base url out of post_conf_load!!");
  450. return -1;
  451. }
  452. if (aclk_block_till_recon_allowed())
  453. return 1;
  454. info("Attempting connection now");
  455. memset(&base_url, 0, sizeof(url_t));
  456. if (url_parse(cloud_base_url, &base_url)) {
  457. error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
  458. sleep(CLOUD_BASE_URL_READ_RETRY);
  459. url_t_destroy(&base_url);
  460. continue;
  461. }
  462. struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
  463. aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, (char**)&proxy_conf.username, (char**)&proxy_conf.password, &proxy_conf.type);
  464. struct mqtt_connect_params mqtt_conn_params = {
  465. .clientid = "anon",
  466. .username = "anon",
  467. .password = "anon",
  468. .will_topic = "lwt",
  469. .will_msg = NULL,
  470. .will_flags = MQTT_WSS_PUB_QOS2,
  471. .keep_alive = 60,
  472. .drop_on_publish_fail = 1
  473. };
  474. #ifndef ACLK_DISABLE_CHALLENGE
  475. if (aclk_env) {
  476. aclk_env_t_destroy(aclk_env);
  477. freez(aclk_env);
  478. }
  479. aclk_env = callocz(1, sizeof(aclk_env_t));
  480. ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
  481. url_t_destroy(&base_url);
  482. if (ret) {
  483. error_report("Failed to Get ACLK environment");
  484. // delay handled by aclk_block_till_recon_allowed
  485. continue;
  486. }
  487. if (!service_running(SERVICE_ACLK))
  488. return 1;
  489. if (aclk_env->encoding != ACLK_ENC_PROTO) {
  490. error_report("This agent can only use the new cloud protocol but cloud requested old one.");
  491. continue;
  492. }
  493. if (!aclk_env_has_capa("proto")) {
  494. error_report("Can't use encoding=proto without at least \"proto\" capability.");
  495. continue;
  496. }
  497. info("New ACLK protobuf protocol negotiated successfully (/env response).");
  498. memset(&auth_url, 0, sizeof(url_t));
  499. if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
  500. error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
  501. url_t_destroy(&auth_url);
  502. continue;
  503. }
  504. ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
  505. url_t_destroy(&auth_url);
  506. if (ret) {
  507. error_report("Error passing Challenge/Response to get OTP");
  508. continue;
  509. }
  510. // aclk_get_topic moved here as during OTP we
  511. // generate the topic cache
  512. mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
  513. if (!mqtt_conn_params.will_topic) {
  514. error_report("Couldn't get LWT topic. Will not send LWT.");
  515. continue;
  516. }
  517. // Do the MQTT connection
  518. ret = aclk_get_transport_idx(aclk_env);
  519. if (ret < 0) {
  520. error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
  521. continue;
  522. }
  523. memset(&mqtt_url, 0, sizeof(url_t));
  524. if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
  525. error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
  526. url_t_destroy(&mqtt_url);
  527. continue;
  528. }
  529. #endif
  530. aclk_session_newarch = now_realtime_usec();
  531. aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
  532. aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
  533. mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
  534. #ifdef ACLK_DISABLE_CHALLENGE
  535. ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  536. url_t_destroy(&base_url);
  537. #else
  538. ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  539. url_t_destroy(&mqtt_url);
  540. freez((char*)mqtt_conn_params.clientid);
  541. freez((char*)mqtt_conn_params.password);
  542. freez((char*)mqtt_conn_params.username);
  543. #endif
  544. freez((char*)mqtt_conn_params.will_msg);
  545. freez((char*)proxy_conf.host);
  546. freez((char*)proxy_conf.username);
  547. freez((char*)proxy_conf.password);
  548. if (!ret) {
  549. last_conn_time_mqtt = now_realtime_sec();
  550. info("ACLK connection successfully established");
  551. log_access("ACLK CONNECTED");
  552. mqtt_connected_actions(client);
  553. return 0;
  554. }
  555. error_report("Connect failed");
  556. }
  557. return 1;
  558. }
  559. /**
  560. * Main agent cloud link thread
  561. *
  562. * This thread will simply call the main event loop that handles
  563. * pending requests - both inbound and outbound
  564. *
  565. * @param ptr is a pointer to the netdata_static_thread structure.
  566. *
  567. * @return It always returns NULL
  568. */
  569. void *aclk_main(void *ptr)
  570. {
  571. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  572. struct aclk_stats_thread *stats_thread = NULL;
  573. struct aclk_query_threads query_threads;
  574. query_threads.thread_list = NULL;
  575. ACLK_PROXY_TYPE proxy_type;
  576. aclk_get_proxy(&proxy_type);
  577. if (proxy_type == PROXY_TYPE_SOCKS5) {
  578. error("SOCKS5 proxy is not supported by ACLK-NG yet.");
  579. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  580. return NULL;
  581. }
  582. unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
  583. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  584. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  585. netdata_thread_disable_cancelability();
  586. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
  587. info("Killing ACLK thread -> cloud functionality has been disabled");
  588. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  589. return NULL;
  590. #endif
  591. query_threads.count = read_query_thread_count();
  592. if (wait_till_cloud_enabled())
  593. goto exit;
  594. if (wait_till_agent_claim_ready())
  595. goto exit;
  596. if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
  597. error("Couldn't initialize MQTT_WSS network library");
  598. goto exit;
  599. }
  600. #ifdef MQTT_WSS_DEBUG
  601. size_t default_ssl_log_filename_size = strlen(netdata_configured_log_dir) + strlen(DEFAULT_SSKEYLOGFILE_NAME) + 2;
  602. char *default_ssl_log_filename = mallocz(default_ssl_log_filename_size);
  603. snprintfz(default_ssl_log_filename, default_ssl_log_filename_size, "%s/%s", netdata_configured_log_dir, DEFAULT_SSKEYLOGFILE_NAME);
  604. ssl_log_filename = config_get(CONFIG_SECTION_CLOUD, "aclk ssl keylog file", default_ssl_log_filename);
  605. freez(default_ssl_log_filename);
  606. if (ssl_log_filename) {
  607. error_report("SSLKEYLOGFILE active (path:\"%s\")!", ssl_log_filename);
  608. mqtt_wss_set_SSL_CTX_keylog_cb(mqttwss_client, aclk_ssl_keylog_cb);
  609. }
  610. #endif
  611. // Enable MQTT buffer growth if necessary
  612. // e.g. old cloud architecture clients with huge nodes
  613. // that send JSON payloads of 10 MB as single messages
  614. mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
  615. aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled);
  616. if (aclk_stats_enabled) {
  617. stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
  618. stats_thread->thread = mallocz(sizeof(netdata_thread_t));
  619. stats_thread->query_thread_count = query_threads.count;
  620. stats_thread->client = mqttwss_client;
  621. aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
  622. netdata_thread_create(
  623. stats_thread->thread, "ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
  624. stats_thread);
  625. }
  626. // Keep reconnecting and talking until our time has come
  627. // and the Grim Reaper (netdata_exit) calls
  628. do {
  629. if (aclk_attempt_to_connect(mqttwss_client))
  630. goto exit_full;
  631. if (unlikely(!query_threads.thread_list))
  632. aclk_query_threads_start(&query_threads, mqttwss_client);
  633. if (handle_connection(mqttwss_client)) {
  634. aclk_stats_upd_online(0);
  635. last_disconnect_time = now_realtime_sec();
  636. aclk_connected = 0;
  637. log_access("ACLK DISCONNECTED");
  638. }
  639. } while (service_running(SERVICE_ACLK));
  640. aclk_graceful_disconnect(mqttwss_client);
  641. #ifdef MQTT_WSS_DEBUG
  642. if (ssl_log_file)
  643. fclose(ssl_log_file);
  644. #endif
  645. exit_full:
  646. // Tear Down
  647. QUERY_THREAD_WAKEUP_ALL;
  648. aclk_query_threads_cleanup(&query_threads);
  649. if (aclk_stats_enabled) {
  650. netdata_thread_join(*stats_thread->thread, NULL);
  651. aclk_stats_thread_cleanup();
  652. freez(stats_thread->thread);
  653. freez(stats_thread);
  654. }
  655. free_topic_cache();
  656. mqtt_wss_destroy(mqttwss_client);
  657. exit:
  658. if (aclk_env) {
  659. aclk_env_t_destroy(aclk_env);
  660. freez(aclk_env);
  661. }
  662. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  663. return NULL;
  664. }
  665. void aclk_host_state_update(RRDHOST *host, int cmd)
  666. {
  667. uuid_t node_id;
  668. int ret = 0;
  669. if (!aclk_connected)
  670. return;
  671. if (host->node_id && !uuid_is_null(*host->node_id)) {
  672. uuid_copy(node_id, *host->node_id);
  673. }
  674. else {
  675. ret = get_node_id(&host->host_uuid, &node_id);
  676. if (ret > 0) {
  677. // this means we were not able to check if node_id already present
  678. error("Unable to check for node_id. Ignoring the host state update.");
  679. return;
  680. }
  681. if (ret < 0) {
  682. // node_id not found
  683. aclk_query_t create_query;
  684. create_query = aclk_query_new(REGISTER_NODE);
  685. rrdhost_aclk_state_lock(localhost);
  686. node_instance_creation_t node_instance_creation = {
  687. .claim_id = localhost->aclk_state.claimed_id,
  688. .hops = host->system_info->hops,
  689. .hostname = rrdhost_hostname(host),
  690. .machine_guid = host->machine_guid};
  691. create_query->data.bin_payload.payload =
  692. generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
  693. rrdhost_aclk_state_unlock(localhost);
  694. create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
  695. create_query->data.bin_payload.msg_name = "CreateNodeInstance";
  696. info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
  697. aclk_queue_query(create_query);
  698. return;
  699. }
  700. }
  701. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  702. node_instance_connection_t node_state_update = {
  703. .hops = host->system_info->hops,
  704. .live = cmd,
  705. .queryable = 1,
  706. .session_id = aclk_session_newarch
  707. };
  708. node_state_update.node_id = mallocz(UUID_STR_LEN);
  709. uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
  710. node_state_update.capabilities = aclk_get_agent_capas();
  711. rrdhost_aclk_state_lock(localhost);
  712. node_state_update.claim_id = localhost->aclk_state.claimed_id;
  713. query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
  714. rrdhost_aclk_state_unlock(localhost);
  715. info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
  716. host->system_info->hops);
  717. freez((void*)node_state_update.node_id);
  718. query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
  719. query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
  720. aclk_queue_query(query);
  721. }
  722. void aclk_send_node_instances()
  723. {
  724. struct node_instance_list *list_head = get_node_list();
  725. struct node_instance_list *list = list_head;
  726. if (unlikely(!list)) {
  727. error_report("Failure to get_node_list from DB!");
  728. return;
  729. }
  730. while (!uuid_is_null(list->host_id)) {
  731. if (!uuid_is_null(list->node_id)) {
  732. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  733. node_instance_connection_t node_state_update = {
  734. .live = list->live,
  735. .hops = list->hops,
  736. .queryable = 1,
  737. .session_id = aclk_session_newarch
  738. };
  739. node_state_update.node_id = mallocz(UUID_STR_LEN);
  740. uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
  741. char host_id[UUID_STR_LEN];
  742. uuid_unparse_lower(list->host_id, host_id);
  743. RRDHOST *host = rrdhost_find_by_guid(host_id);
  744. if (unlikely(!host)) {
  745. freez((void*)node_state_update.node_id);
  746. freez(query);
  747. continue;
  748. }
  749. node_state_update.capabilities = aclk_get_node_instance_capas(host);
  750. rrdhost_aclk_state_lock(localhost);
  751. node_state_update.claim_id = localhost->aclk_state.claimed_id;
  752. query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
  753. rrdhost_aclk_state_unlock(localhost);
  754. info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
  755. list->live,
  756. list->hops);
  757. freez((void*)node_state_update.capabilities);
  758. freez((void*)node_state_update.node_id);
  759. query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
  760. query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
  761. aclk_queue_query(query);
  762. } else {
  763. aclk_query_t create_query;
  764. create_query = aclk_query_new(REGISTER_NODE);
  765. node_instance_creation_t node_instance_creation = {
  766. .hops = list->hops,
  767. .hostname = list->hostname,
  768. };
  769. node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
  770. uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
  771. create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
  772. create_query->data.bin_payload.msg_name = "CreateNodeInstance";
  773. rrdhost_aclk_state_lock(localhost);
  774. node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
  775. create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
  776. rrdhost_aclk_state_unlock(localhost);
  777. info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
  778. list->hops);
  779. freez((void *)node_instance_creation.machine_guid);
  780. aclk_queue_query(create_query);
  781. }
  782. freez(list->hostname);
  783. list++;
  784. }
  785. freez(list_head);
  786. }
  787. void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
  788. {
  789. aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
  790. }
  791. static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
  792. {
  793. struct proto_alert_status status;
  794. memset(&status, 0, sizeof(status));
  795. if (get_proto_alert_status(host, &status)) {
  796. buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
  797. return;
  798. }
  799. buffer_sprintf(wb,
  800. "\n\t\tUpdates: %d"
  801. "\n\t\tPending Min Seq ID: %"PRIu64
  802. "\n\t\tPending Max Seq ID: %"PRIu64
  803. "\n\t\tLast Submitted Seq ID: %"PRIu64,
  804. status.alert_updates,
  805. status.pending_min_sequence_id,
  806. status.pending_max_sequence_id,
  807. status.last_submitted_sequence_id
  808. );
  809. }
  810. #endif /* ENABLE_ACLK */
  811. char *aclk_state(void)
  812. {
  813. #ifndef ENABLE_ACLK
  814. return strdupz("ACLK Available: No");
  815. #else
  816. BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk);
  817. struct tm *tmptr, tmbuf;
  818. char *ret;
  819. buffer_strcat(wb,
  820. "ACLK Available: Yes\n"
  821. "ACLK Version: 2\n"
  822. "Protocols Supported: Protobuf\n"
  823. );
  824. buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);
  825. char *agent_id = get_agent_claimid();
  826. if (agent_id == NULL)
  827. buffer_strcat(wb, "No\n");
  828. else {
  829. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  830. buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
  831. freez(agent_id);
  832. }
  833. buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
  834. if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) {
  835. char timebuf[26];
  836. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  837. buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf);
  838. }
  839. if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) {
  840. char timebuf[26];
  841. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  842. buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf);
  843. }
  844. if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) {
  845. char timebuf[26];
  846. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  847. buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf);
  848. }
  849. if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) {
  850. char timebuf[26];
  851. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  852. buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
  853. }
  854. if (aclk_connected) {
  855. buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
  856. RRDHOST *host;
  857. rrd_rdlock();
  858. rrdhost_foreach_read(host) {
  859. buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host));
  860. buffer_strcat(wb, "\tClaimed ID: ");
  861. rrdhost_aclk_state_lock(host);
  862. if (host->aclk_state.claimed_id)
  863. buffer_strcat(wb, host->aclk_state.claimed_id);
  864. else
  865. buffer_strcat(wb, "null");
  866. rrdhost_aclk_state_unlock(host);
  867. if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
  868. buffer_strcat(wb, "\n\tNode ID: null\n");
  869. } else {
  870. char node_id[GUID_LEN + 1];
  871. uuid_unparse_lower(*host->node_id, node_id);
  872. buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
  873. }
  874. buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");
  875. if (host != localhost)
  876. buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");
  877. buffer_strcat(wb, "\n\tAlert Streaming Status:");
  878. fill_alert_status_for_host(wb, host);
  879. }
  880. rrd_unlock();
  881. }
  882. ret = strdupz(buffer_tostring(wb));
  883. buffer_free(wb);
  884. return ret;
  885. #endif /* ENABLE_ACLK */
  886. }
  887. #ifdef ENABLE_ACLK
  888. static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
  889. {
  890. struct proto_alert_status status;
  891. memset(&status, 0, sizeof(status));
  892. if (get_proto_alert_status(host, &status))
  893. return;
  894. json_object *tmp = json_object_new_int(status.alert_updates);
  895. json_object_object_add(obj, "updates", tmp);
  896. tmp = json_object_new_int(status.pending_min_sequence_id);
  897. json_object_object_add(obj, "pending-min-seq-id", tmp);
  898. tmp = json_object_new_int(status.pending_max_sequence_id);
  899. json_object_object_add(obj, "pending-max-seq-id", tmp);
  900. tmp = json_object_new_int(status.last_submitted_sequence_id);
  901. json_object_object_add(obj, "last-submitted-seq-id", tmp);
  902. }
  903. static json_object *timestamp_to_json(const time_t *t)
  904. {
  905. struct tm *tmptr, tmbuf;
  906. if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
  907. char timebuf[26];
  908. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  909. return json_object_new_string(timebuf);
  910. }
  911. return NULL;
  912. }
  913. #endif /* ENABLE_ACLK */
  914. char *aclk_state_json(void)
  915. {
  916. #ifndef ENABLE_ACLK
  917. return strdupz("{\"aclk-available\":false}");
  918. #else
  919. json_object *tmp, *grp, *msg = json_object_new_object();
  920. tmp = json_object_new_boolean(1);
  921. json_object_object_add(msg, "aclk-available", tmp);
  922. tmp = json_object_new_int(2);
  923. json_object_object_add(msg, "aclk-version", tmp);
  924. grp = json_object_new_array();
  925. tmp = json_object_new_string("Protobuf");
  926. json_object_array_add(grp, tmp);
  927. json_object_object_add(msg, "protocols-supported", grp);
  928. char *agent_id = get_agent_claimid();
  929. tmp = json_object_new_boolean(agent_id != NULL);
  930. json_object_object_add(msg, "agent-claimed", tmp);
  931. if (agent_id) {
  932. tmp = json_object_new_string(agent_id);
  933. freez(agent_id);
  934. } else
  935. tmp = NULL;
  936. json_object_object_add(msg, "claimed-id", tmp);
  937. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  938. tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL;
  939. json_object_object_add(msg, "cloud-url", tmp);
  940. tmp = json_object_new_boolean(aclk_connected);
  941. json_object_object_add(msg, "online", tmp);
  942. tmp = json_object_new_string("Protobuf");
  943. json_object_object_add(msg, "used-cloud-protocol", tmp);
  944. tmp = json_object_new_int(5);
  945. json_object_object_add(msg, "mqtt-version", tmp);
  946. tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
  947. json_object_object_add(msg, "received-app-layer-msgs", tmp);
  948. tmp = json_object_new_int(aclk_pubacks_per_conn);
  949. json_object_object_add(msg, "received-mqtt-pubacks", tmp);
  950. tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0);
  951. json_object_object_add(msg, "reconnect-count", tmp);
  952. json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
  953. json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
  954. json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
  955. json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);
  956. tmp = NULL;
  957. if (!aclk_connected && last_backoff_value)
  958. tmp = json_object_new_double(last_backoff_value);
  959. json_object_object_add(msg, "last-backoff-value", tmp);
  960. tmp = json_object_new_boolean(aclk_disable_runtime);
  961. json_object_object_add(msg, "banned-by-cloud", tmp);
  962. grp = json_object_new_array();
  963. RRDHOST *host;
  964. rrd_rdlock();
  965. rrdhost_foreach_read(host) {
  966. json_object *nodeinstance = json_object_new_object();
  967. tmp = json_object_new_string(rrdhost_hostname(host));
  968. json_object_object_add(nodeinstance, "hostname", tmp);
  969. tmp = json_object_new_string(host->machine_guid);
  970. json_object_object_add(nodeinstance, "mguid", tmp);
  971. rrdhost_aclk_state_lock(host);
  972. if (host->aclk_state.claimed_id) {
  973. tmp = json_object_new_string(host->aclk_state.claimed_id);
  974. json_object_object_add(nodeinstance, "claimed_id", tmp);
  975. } else
  976. json_object_object_add(nodeinstance, "claimed_id", NULL);
  977. rrdhost_aclk_state_unlock(host);
  978. if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
  979. json_object_object_add(nodeinstance, "node-id", NULL);
  980. } else {
  981. char node_id[GUID_LEN + 1];
  982. uuid_unparse_lower(*host->node_id, node_id);
  983. tmp = json_object_new_string(node_id);
  984. json_object_object_add(nodeinstance, "node-id", tmp);
  985. }
  986. tmp = json_object_new_int(host->system_info->hops);
  987. json_object_object_add(nodeinstance, "streaming-hops", tmp);
  988. tmp = json_object_new_string(host == localhost ? "self" : "child");
  989. json_object_object_add(nodeinstance, "relationship", tmp);
  990. tmp = json_object_new_boolean((host->receiver || host == localhost));
  991. json_object_object_add(nodeinstance, "streaming-online", tmp);
  992. tmp = json_object_new_object();
  993. fill_alert_status_for_host_json(tmp, host);
  994. json_object_object_add(nodeinstance, "alert-sync-status", tmp);
  995. json_object_array_add(grp, nodeinstance);
  996. }
  997. rrd_unlock();
  998. json_object_object_add(msg, "node-instances", grp);
  999. char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
  1000. json_object_put(msg);
  1001. return str;
  1002. #endif /* ENABLE_ACLK */
  1003. }
  1004. void add_aclk_host_labels(void) {
  1005. DICTIONARY *labels = localhost->rrdlabels;
  1006. #ifdef ENABLE_ACLK
  1007. rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  1008. ACLK_PROXY_TYPE aclk_proxy;
  1009. char *proxy_str;
  1010. aclk_get_proxy(&aclk_proxy);
  1011. switch(aclk_proxy) {
  1012. case PROXY_TYPE_SOCKS5:
  1013. proxy_str = "SOCKS5";
  1014. break;
  1015. case PROXY_TYPE_HTTP:
  1016. proxy_str = "HTTP";
  1017. break;
  1018. default:
  1019. proxy_str = "none";
  1020. break;
  1021. }
  1022. rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
  1023. rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
  1024. rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  1025. #else
  1026. rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  1027. #endif
  1028. }
  1029. void aclk_queue_node_info(RRDHOST *host, bool immediate)
  1030. {
  1031. struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
  1032. if (likely(wc))
  1033. wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec();
  1034. }