/* aclk.c */
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk.h"
  3. #ifdef ENABLE_ACLK
  4. #include "aclk_stats.h"
  5. #include "mqtt_wss_client.h"
  6. #include "aclk_otp.h"
  7. #include "aclk_tx_msgs.h"
  8. #include "aclk_query.h"
  9. #include "aclk_query_queue.h"
  10. #include "aclk_util.h"
  11. #include "aclk_rx_msgs.h"
  12. #include "https_client.h"
  13. #include "schema-wrappers/schema_wrappers.h"
  14. #include "aclk_proxy.h"
  15. #ifdef ACLK_LOG_CONVERSATION_DIR
  16. #include <sys/types.h>
  17. #include <sys/stat.h>
  18. #include <fcntl.h>
  19. #endif
  20. #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
  21. #endif /* ENABLE_ACLK */
  22. int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
  23. int aclk_rcvd_cloud_msgs = 0;
  24. int aclk_connection_counter = 0;
  25. int disconnect_req = 0;
  26. int aclk_connected = 0;
  27. int aclk_ctx_based = 0;
  28. int aclk_disable_runtime = 0;
  29. int aclk_stats_enabled;
  30. int aclk_kill_link = 0;
  31. usec_t aclk_session_us = 0;
  32. time_t aclk_session_sec = 0;
  33. time_t last_conn_time_mqtt = 0;
  34. time_t last_conn_time_appl = 0;
  35. time_t last_disconnect_time = 0;
  36. time_t next_connection_attempt = 0;
  37. float last_backoff_value = 0;
  38. time_t aclk_block_until = 0;
  39. #ifdef ENABLE_ACLK
  40. mqtt_wss_client mqttwss_client;
  41. netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
  42. #define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
  43. #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
  44. struct aclk_shared_state aclk_shared_state = {
  45. .mqtt_shutdown_msg_id = -1,
  46. .mqtt_shutdown_msg_rcvd = 0
  47. };
  48. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  49. OSSL_DECODER_CTX *aclk_dctx = NULL;
  50. EVP_PKEY *aclk_private_key = NULL;
  51. #else
  52. static RSA *aclk_private_key = NULL;
  53. #endif
  54. static int load_private_key()
  55. {
  56. if (aclk_private_key != NULL) {
  57. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  58. EVP_PKEY_free(aclk_private_key);
  59. if (aclk_dctx)
  60. OSSL_DECODER_CTX_free(aclk_dctx);
  61. aclk_dctx = NULL;
  62. #else
  63. RSA_free(aclk_private_key);
  64. #endif
  65. }
  66. aclk_private_key = NULL;
  67. char filename[FILENAME_MAX + 1];
  68. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  69. long bytes_read;
  70. char *private_key = read_by_filename(filename, &bytes_read);
  71. if (!private_key) {
  72. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  73. return 1;
  74. }
  75. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  76. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  77. if (key_bio==NULL) {
  78. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  79. goto biofailed;
  80. }
  81. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  82. aclk_dctx = OSSL_DECODER_CTX_new_for_pkey(&aclk_private_key, "PEM", NULL,
  83. "RSA",
  84. OSSL_KEYMGMT_SELECT_PRIVATE_KEY,
  85. NULL, NULL);
  86. if (!aclk_dctx) {
  87. error("Loading private key (from claiming) failed - no OpenSSL Decoders found");
  88. goto biofailed;
  89. }
  90. // this is necesseary to avoid RSA key with wrong size
  91. if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) {
  92. error("Decoding private key (from claiming) failed - invalid format.");
  93. goto biofailed;
  94. }
  95. #else
  96. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  97. #endif
  98. BIO_free(key_bio);
  99. if (aclk_private_key!=NULL)
  100. {
  101. freez(private_key);
  102. return 0;
  103. }
  104. char err[512];
  105. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  106. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  107. biofailed:
  108. freez(private_key);
  109. return 1;
  110. }
  111. static int wait_till_cloud_enabled()
  112. {
  113. info("Waiting for Cloud to be enabled");
  114. while (!netdata_cloud_setting) {
  115. sleep_usec(USEC_PER_SEC * 1);
  116. if (netdata_exit)
  117. return 1;
  118. }
  119. return 0;
  120. }
  121. /**
  122. * Will block until agent is claimed. Returns only if agent claimed
  123. * or if agent needs to shutdown.
  124. *
  125. * @return `0` if agent has been claimed,
  126. * `1` if interrupted due to agent shutting down
  127. */
  128. static int wait_till_agent_claimed(void)
  129. {
  130. //TODO prevent malloc and freez
  131. char *agent_id = get_agent_claimid();
  132. while (likely(!agent_id)) {
  133. sleep_usec(USEC_PER_SEC * 1);
  134. if (netdata_exit)
  135. return 1;
  136. agent_id = get_agent_claimid();
  137. }
  138. freez(agent_id);
  139. return 0;
  140. }
  141. /**
  142. * Checks everything is ready for connection
  143. * agent claimed, cloud url set and private key available
  144. *
  145. * @param aclk_hostname points to location where string pointer to hostname will be set
  146. * @param aclk_port port to int where port will be saved
  147. *
  148. * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated
  149. */
  150. static int wait_till_agent_claim_ready()
  151. {
  152. url_t url;
  153. while (!netdata_exit) {
  154. if (wait_till_agent_claimed())
  155. return 1;
  156. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  157. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  158. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  159. if (cloud_base_url == NULL) {
  160. error("Do not move the cloud base url out of post_conf_load!!");
  161. return 1;
  162. }
  163. // We just check configuration is valid here
  164. // TODO make it without malloc/free
  165. memset(&url, 0, sizeof(url_t));
  166. if (url_parse(cloud_base_url, &url)) {
  167. error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
  168. url_t_destroy(&url);
  169. sleep(5);
  170. continue;
  171. }
  172. url_t_destroy(&url);
  173. if (!load_private_key())
  174. return 0;
  175. sleep(5);
  176. }
  177. return 1;
  178. }
  179. void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
  180. {
  181. switch(log_type) {
  182. case MQTT_WSS_LOG_ERROR:
  183. case MQTT_WSS_LOG_FATAL:
  184. case MQTT_WSS_LOG_WARN:
  185. error_report("%s", str);
  186. return;
  187. case MQTT_WSS_LOG_INFO:
  188. info("%s", str);
  189. return;
  190. case MQTT_WSS_LOG_DEBUG:
  191. debug(D_ACLK, "%s", str);
  192. return;
  193. default:
  194. error("Unknown log type from mqtt_wss");
  195. }
  196. }
  197. //TODO prevent big buffer on stack
  198. #define RX_MSGLEN_MAX 4096
  199. static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
  200. {
  201. UNUSED(qos);
  202. aclk_rcvd_cloud_msgs++;
  203. if (msglen > RX_MSGLEN_MAX)
  204. error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
  205. debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
  206. if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
  207. error("Link is shutting down. Ignoring incoming message.");
  208. return;
  209. }
  210. const char *msgtype = strrchr(topic, '/');
  211. if (unlikely(!msgtype)) {
  212. error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
  213. return;
  214. }
  215. msgtype++;
  216. if (unlikely(!*msgtype)) {
  217. error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
  218. return;
  219. }
  220. #ifdef ACLK_LOG_CONVERSATION_DIR
  221. #define FN_MAX_LEN 512
  222. char filename[FN_MAX_LEN];
  223. int logfd;
  224. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
  225. logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
  226. if(logfd < 0)
  227. error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
  228. write(logfd, msg, msglen);
  229. close(logfd);
  230. #endif
  231. aclk_handle_new_cloud_msg(msgtype, msg, msglen, topic);
  232. }
  233. static void puback_callback(uint16_t packet_id)
  234. {
  235. if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
  236. last_conn_time_appl = now_realtime_sec();
  237. aclk_tbeb_reset();
  238. }
  239. #ifdef NETDATA_INTERNAL_CHECKS
  240. aclk_stats_msg_puback(packet_id);
  241. #endif
  242. if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
  243. info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
  244. aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
  245. }
  246. }
  247. static int read_query_thread_count()
  248. {
  249. int threads = MIN(processors/2, 6);
  250. threads = MAX(threads, 2);
  251. threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  252. if(threads < 1) {
  253. error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
  254. threads = 1;
  255. config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  256. }
  257. return threads;
  258. }
  259. void aclk_graceful_disconnect(mqtt_wss_client client);
  260. /* Keeps connection alive and handles all network communications.
  261. * Returns on error or when netdata is shutting down.
  262. * @param client instance of mqtt_wss_client
  263. * @returns 0 - Netdata Exits
  264. * >0 - Error happened. Reconnect and start over.
  265. */
  266. static int handle_connection(mqtt_wss_client client)
  267. {
  268. time_t last_periodic_query_wakeup = now_monotonic_sec();
  269. while (!netdata_exit) {
  270. // timeout 1000 to check at least once a second
  271. // for netdata_exit
  272. if (mqtt_wss_service(client, 1000) < 0){
  273. error_report("Connection Error or Dropped");
  274. return 1;
  275. }
  276. if (disconnect_req || aclk_kill_link) {
  277. info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
  278. disconnect_req ? "true" : "false",
  279. aclk_kill_link ? "true" : "false");
  280. disconnect_req = 0;
  281. aclk_kill_link = 0;
  282. aclk_graceful_disconnect(client);
  283. aclk_queue_unlock();
  284. aclk_shared_state.mqtt_shutdown_msg_id = -1;
  285. aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
  286. return 1;
  287. }
  288. // mqtt_wss_service will return faster than in one second
  289. // if there is enough work to do
  290. time_t now = now_monotonic_sec();
  291. if (last_periodic_query_wakeup < now) {
  292. // wake up at least one Query Thread at least
  293. // once per second
  294. last_periodic_query_wakeup = now;
  295. QUERY_THREAD_WAKEUP;
  296. }
  297. }
  298. return 0;
  299. }
  300. static inline void mqtt_connected_actions(mqtt_wss_client client)
  301. {
  302. char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
  303. if (!topic)
  304. error("Unable to fetch topic for COMMAND (to subscribe)");
  305. else
  306. mqtt_wss_subscribe(client, topic, 1);
  307. topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
  308. if (!topic)
  309. error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
  310. else
  311. mqtt_wss_subscribe(client, topic, 1);
  312. aclk_stats_upd_online(1);
  313. aclk_connected = 1;
  314. aclk_pubacks_per_conn = 0;
  315. aclk_rcvd_cloud_msgs = 0;
  316. aclk_connection_counter++;
  317. aclk_send_agent_connection_update(client, 1);
  318. }
  319. void aclk_graceful_disconnect(mqtt_wss_client client)
  320. {
  321. info("Preparing to gracefully shutdown ACLK connection");
  322. aclk_queue_lock();
  323. aclk_queue_flush();
  324. aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
  325. time_t t = now_monotonic_sec();
  326. while (!mqtt_wss_service(client, 100)) {
  327. if (now_monotonic_sec() - t >= 2) {
  328. error("Wasn't able to gracefully shutdown ACLK in time!");
  329. break;
  330. }
  331. if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
  332. info("MQTT App Layer `disconnect` message sent successfully");
  333. break;
  334. }
  335. }
  336. info("ACLK link is down");
  337. log_access("ACLK DISCONNECTED");
  338. aclk_stats_upd_online(0);
  339. last_disconnect_time = now_realtime_sec();
  340. aclk_connected = 0;
  341. info("Attempting to gracefully shutdown the MQTT/WSS connection");
  342. mqtt_wss_disconnect(client, 1000);
  343. }
  344. static unsigned long aclk_reconnect_delay() {
  345. unsigned long recon_delay;
  346. time_t now;
  347. if (aclk_disable_runtime) {
  348. aclk_tbeb_reset();
  349. return 60 * MSEC_PER_SEC;
  350. }
  351. now = now_monotonic_sec();
  352. if (aclk_block_until) {
  353. if (now < aclk_block_until) {
  354. recon_delay = aclk_block_until - now;
  355. recon_delay *= MSEC_PER_SEC;
  356. aclk_block_until = 0;
  357. aclk_tbeb_reset();
  358. return recon_delay;
  359. }
  360. aclk_block_until = 0;
  361. }
  362. if (!aclk_env || !aclk_env->backoff.base)
  363. return aclk_tbeb_delay(0, 2, 0, 1024);
  364. return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
  365. }
  366. /* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled
  367. * @return 0 - Go ahead and connect (delay expired)
  368. * 1 - netdata_exit
  369. */
  370. #define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
  371. static int aclk_block_till_recon_allowed() {
  372. unsigned long recon_delay = aclk_reconnect_delay();
  373. next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
  374. last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
  375. info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
  376. // we want to wake up from time to time to check netdata_exit
  377. while (recon_delay)
  378. {
  379. if (netdata_exit)
  380. return 1;
  381. if (recon_delay > NETDATA_EXIT_POLL_MS) {
  382. sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
  383. recon_delay -= NETDATA_EXIT_POLL_MS;
  384. continue;
  385. }
  386. sleep_usec(recon_delay * USEC_PER_MS);
  387. recon_delay = 0;
  388. }
  389. return netdata_exit;
  390. }
  391. #ifndef ACLK_DISABLE_CHALLENGE
  392. /* Cloud returns transport list ordered with highest
  393. * priority first. This function selects highest prio
  394. * transport that we can actually use (support)
  395. */
  396. static int aclk_get_transport_idx(aclk_env_t *env) {
  397. for (size_t i = 0; i < env->transport_count; i++) {
  398. // currently we support only MQTT 5
  399. // therefore select first transport that matches
  400. if (env->transports[i]->type == ACLK_TRP_MQTT_5) {
  401. return i;
  402. }
  403. }
  404. return -1;
  405. }
  406. #endif
  407. /* Attempts to make a connection to MQTT broker over WSS
  408. * @param client instance of mqtt_wss_client
  409. * @return 0 - Successful Connection,
  410. * <0 - Irrecoverable Error -> Kill ACLK,
  411. * >0 - netdata_exit
  412. */
  413. #define CLOUD_BASE_URL_READ_RETRY 30
  414. #ifdef ACLK_SSL_ALLOW_SELF_SIGNED
  415. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED
  416. #else
  417. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL
  418. #endif
  419. static int aclk_attempt_to_connect(mqtt_wss_client client)
  420. {
  421. int ret;
  422. url_t base_url;
  423. #ifndef ACLK_DISABLE_CHALLENGE
  424. url_t auth_url;
  425. url_t mqtt_url;
  426. #endif
  427. while (!netdata_exit) {
  428. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  429. if (cloud_base_url == NULL) {
  430. error_report("Do not move the cloud base url out of post_conf_load!!");
  431. return -1;
  432. }
  433. if (aclk_block_till_recon_allowed())
  434. return 1;
  435. info("Attempting connection now");
  436. memset(&base_url, 0, sizeof(url_t));
  437. if (url_parse(cloud_base_url, &base_url)) {
  438. error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
  439. sleep(CLOUD_BASE_URL_READ_RETRY);
  440. url_t_destroy(&base_url);
  441. continue;
  442. }
  443. struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
  444. aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
  445. struct mqtt_connect_params mqtt_conn_params = {
  446. .clientid = "anon",
  447. .username = "anon",
  448. .password = "anon",
  449. .will_topic = "lwt",
  450. .will_msg = NULL,
  451. .will_flags = MQTT_WSS_PUB_QOS2,
  452. .keep_alive = 60,
  453. .drop_on_publish_fail = 1
  454. };
  455. #ifndef ACLK_DISABLE_CHALLENGE
  456. if (aclk_env) {
  457. aclk_env_t_destroy(aclk_env);
  458. freez(aclk_env);
  459. }
  460. aclk_env = callocz(1, sizeof(aclk_env_t));
  461. ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
  462. url_t_destroy(&base_url);
  463. if (ret) {
  464. error_report("Failed to Get ACLK environment");
  465. // delay handled by aclk_block_till_recon_allowed
  466. continue;
  467. }
  468. if (netdata_exit)
  469. return 1;
  470. if (aclk_env->encoding != ACLK_ENC_PROTO) {
  471. error_report("This agent can only use the new cloud protocol but cloud requested old one.");
  472. continue;
  473. }
  474. if (!aclk_env_has_capa("proto")) {
  475. error_report("Can't use encoding=proto without at least \"proto\" capability.");
  476. continue;
  477. }
  478. info("New ACLK protobuf protocol negotiated successfully (/env response).");
  479. memset(&auth_url, 0, sizeof(url_t));
  480. if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
  481. error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
  482. url_t_destroy(&auth_url);
  483. continue;
  484. }
  485. ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
  486. url_t_destroy(&auth_url);
  487. if (ret) {
  488. error_report("Error passing Challenge/Response to get OTP");
  489. continue;
  490. }
  491. // aclk_get_topic moved here as during OTP we
  492. // generate the topic cache
  493. mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
  494. if (!mqtt_conn_params.will_topic) {
  495. error_report("Couldn't get LWT topic. Will not send LWT.");
  496. continue;
  497. }
  498. // Do the MQTT connection
  499. ret = aclk_get_transport_idx(aclk_env);
  500. if (ret < 0) {
  501. error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
  502. continue;
  503. }
  504. memset(&mqtt_url, 0, sizeof(url_t));
  505. if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
  506. error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
  507. url_t_destroy(&mqtt_url);
  508. continue;
  509. }
  510. #endif
  511. aclk_session_newarch = now_realtime_usec();
  512. aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
  513. aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
  514. mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
  515. #ifdef ACLK_DISABLE_CHALLENGE
  516. ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  517. url_t_destroy(&base_url);
  518. #else
  519. ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  520. url_t_destroy(&mqtt_url);
  521. freez((char*)mqtt_conn_params.clientid);
  522. freez((char*)mqtt_conn_params.password);
  523. freez((char*)mqtt_conn_params.username);
  524. #endif
  525. freez((char *)mqtt_conn_params.will_msg);
  526. if (!ret) {
  527. last_conn_time_mqtt = now_realtime_sec();
  528. info("ACLK connection successfully established");
  529. log_access("ACLK CONNECTED");
  530. mqtt_connected_actions(client);
  531. return 0;
  532. }
  533. error_report("Connect failed");
  534. }
  535. return 1;
  536. }
  537. /**
  538. * Main agent cloud link thread
  539. *
  540. * This thread will simply call the main event loop that handles
  541. * pending requests - both inbound and outbound
  542. *
  543. * @param ptr is a pointer to the netdata_static_thread structure.
  544. *
  545. * @return It always returns NULL
  546. */
  547. void *aclk_main(void *ptr)
  548. {
  549. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  550. struct aclk_stats_thread *stats_thread = NULL;
  551. struct aclk_query_threads query_threads;
  552. query_threads.thread_list = NULL;
  553. ACLK_PROXY_TYPE proxy_type;
  554. aclk_get_proxy(&proxy_type);
  555. if (proxy_type == PROXY_TYPE_SOCKS5) {
  556. error("SOCKS5 proxy is not supported by ACLK-NG yet.");
  557. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  558. return NULL;
  559. }
  560. unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
  561. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  562. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  563. netdata_thread_disable_cancelability();
  564. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
  565. info("Killing ACLK thread -> cloud functionality has been disabled");
  566. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  567. return NULL;
  568. #endif
  569. query_threads.count = read_query_thread_count();
  570. if (wait_till_cloud_enabled())
  571. goto exit;
  572. if (wait_till_agent_claim_ready())
  573. goto exit;
  574. if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, 1))) {
  575. error("Couldn't initialize MQTT_WSS network library");
  576. goto exit;
  577. }
  578. // Enable MQTT buffer growth if necessary
  579. // e.g. old cloud architecture clients with huge nodes
  580. // that send JSON payloads of 10 MB as single messages
  581. mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
  582. aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled);
  583. if (aclk_stats_enabled) {
  584. stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
  585. stats_thread->thread = mallocz(sizeof(netdata_thread_t));
  586. stats_thread->query_thread_count = query_threads.count;
  587. stats_thread->client = mqttwss_client;
  588. aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
  589. netdata_thread_create(
  590. stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
  591. stats_thread);
  592. }
  593. // Keep reconnecting and talking until our time has come
  594. // and the Grim Reaper (netdata_exit) calls
  595. do {
  596. if (aclk_attempt_to_connect(mqttwss_client))
  597. goto exit_full;
  598. if (unlikely(!query_threads.thread_list))
  599. aclk_query_threads_start(&query_threads, mqttwss_client);
  600. if (handle_connection(mqttwss_client)) {
  601. aclk_stats_upd_online(0);
  602. last_disconnect_time = now_realtime_sec();
  603. aclk_connected = 0;
  604. log_access("ACLK DISCONNECTED");
  605. }
  606. } while (!netdata_exit);
  607. aclk_graceful_disconnect(mqttwss_client);
  608. exit_full:
  609. // Tear Down
  610. QUERY_THREAD_WAKEUP_ALL;
  611. aclk_query_threads_cleanup(&query_threads);
  612. if (aclk_stats_enabled) {
  613. netdata_thread_join(*stats_thread->thread, NULL);
  614. aclk_stats_thread_cleanup();
  615. freez(stats_thread->thread);
  616. freez(stats_thread);
  617. }
  618. free_topic_cache();
  619. mqtt_wss_destroy(mqttwss_client);
  620. exit:
  621. if (aclk_env) {
  622. aclk_env_t_destroy(aclk_env);
  623. freez(aclk_env);
  624. }
  625. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  626. return NULL;
  627. }
  628. void aclk_host_state_update(RRDHOST *host, int cmd)
  629. {
  630. uuid_t node_id;
  631. int ret;
  632. if (!aclk_connected)
  633. return;
  634. ret = get_node_id(&host->host_uuid, &node_id);
  635. if (ret > 0) {
  636. // this means we were not able to check if node_id already present
  637. error("Unable to check for node_id. Ignoring the host state update.");
  638. return;
  639. }
  640. if (ret < 0) {
  641. // node_id not found
  642. aclk_query_t create_query;
  643. create_query = aclk_query_new(REGISTER_NODE);
  644. rrdhost_aclk_state_lock(localhost);
  645. node_instance_creation_t node_instance_creation = {
  646. .claim_id = localhost->aclk_state.claimed_id,
  647. .hops = host->system_info->hops,
  648. .hostname = rrdhost_hostname(host),
  649. .machine_guid = host->machine_guid
  650. };
  651. create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
  652. rrdhost_aclk_state_unlock(localhost);
  653. create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
  654. create_query->data.bin_payload.msg_name = "CreateNodeInstance";
  655. info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
  656. aclk_queue_query(create_query);
  657. return;
  658. }
  659. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  660. node_instance_connection_t node_state_update = {
  661. .hops = host->system_info->hops,
  662. .live = cmd,
  663. .queryable = 1,
  664. .session_id = aclk_session_newarch
  665. };
  666. node_state_update.node_id = mallocz(UUID_STR_LEN);
  667. uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
  668. struct capability caps[] = {
  669. { .name = "proto", .version = 1, .enabled = 1 },
  670. { .name = "ml", .version = ml_capable(localhost), .enabled = ml_enabled(host) },
  671. { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
  672. { .name = "ctx", .version = 1, .enabled = 1 },
  673. { .name = NULL, .version = 0, .enabled = 0 }
  674. };
  675. node_state_update.capabilities = caps;
  676. rrdhost_aclk_state_lock(localhost);
  677. node_state_update.claim_id = localhost->aclk_state.claimed_id;
  678. query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
  679. rrdhost_aclk_state_unlock(localhost);
  680. info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
  681. host->system_info->hops);
  682. freez((void*)node_state_update.node_id);
  683. query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
  684. query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
  685. aclk_queue_query(query);
  686. }
  687. void aclk_send_node_instances()
  688. {
  689. struct node_instance_list *list_head = get_node_list();
  690. struct node_instance_list *list = list_head;
  691. if (unlikely(!list)) {
  692. error_report("Failure to get_node_list from DB!");
  693. return;
  694. }
  695. while (!uuid_is_null(list->host_id)) {
  696. if (!uuid_is_null(list->node_id)) {
  697. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  698. node_instance_connection_t node_state_update = {
  699. .live = list->live,
  700. .hops = list->hops,
  701. .queryable = 1,
  702. .session_id = aclk_session_newarch
  703. };
  704. node_state_update.node_id = mallocz(UUID_STR_LEN);
  705. uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
  706. char host_id[UUID_STR_LEN];
  707. uuid_unparse_lower(list->host_id, host_id);
  708. RRDHOST *host = rrdhost_find_by_guid(host_id);
  709. struct capability caps[] = {
  710. { .name = "proto", .version = 1, .enabled = 1 },
  711. { .name = "ml", .version = ml_capable(localhost), .enabled = host ? ml_enabled(host) : 0 },
  712. { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
  713. { .name = "ctx", .version = 1, .enabled = 1 },
  714. { .name = NULL, .version = 0, .enabled = 0 }
  715. };
  716. node_state_update.capabilities = caps;
  717. rrdhost_aclk_state_lock(localhost);
  718. node_state_update.claim_id = localhost->aclk_state.claimed_id;
  719. query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
  720. rrdhost_aclk_state_unlock(localhost);
  721. info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
  722. list->live,
  723. list->hops);
  724. freez((void*)node_state_update.node_id);
  725. query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
  726. query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
  727. aclk_queue_query(query);
  728. } else {
  729. aclk_query_t create_query;
  730. create_query = aclk_query_new(REGISTER_NODE);
  731. node_instance_creation_t node_instance_creation = {
  732. .hops = list->hops,
  733. .hostname = list->hostname,
  734. };
  735. node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
  736. uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
  737. create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
  738. create_query->data.bin_payload.msg_name = "CreateNodeInstance";
  739. rrdhost_aclk_state_lock(localhost);
  740. node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
  741. create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
  742. rrdhost_aclk_state_unlock(localhost);
  743. info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
  744. list->hops);
  745. freez((void *)node_instance_creation.machine_guid);
  746. aclk_queue_query(create_query);
  747. }
  748. freez(list->hostname);
  749. list++;
  750. }
  751. freez(list_head);
  752. }
  753. void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
  754. {
  755. aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
  756. }
  757. static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
  758. {
  759. struct proto_alert_status status;
  760. memset(&status, 0, sizeof(status));
  761. if (get_proto_alert_status(host, &status)) {
  762. buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
  763. return;
  764. }
  765. buffer_sprintf(wb,
  766. "\n\t\tUpdates: %d"
  767. "\n\t\tBatch ID: %"PRIu64
  768. "\n\t\tLast Acked Seq ID: %"PRIu64
  769. "\n\t\tPending Min Seq ID: %"PRIu64
  770. "\n\t\tPending Max Seq ID: %"PRIu64
  771. "\n\t\tLast Submitted Seq ID: %"PRIu64,
  772. status.alert_updates,
  773. status.alerts_batch_id,
  774. status.last_acked_sequence_id,
  775. status.pending_min_sequence_id,
  776. status.pending_max_sequence_id,
  777. status.last_submitted_sequence_id
  778. );
  779. }
  780. #endif /* ENABLE_ACLK */
  #ifdef ENABLE_ACLK
  /*
   * Format `when` as local time ("%Y-%m-%d %H:%M:%S") into `dst`.
   * Returns 1 when `dst` was written, 0 when `when` is zero or localtime_r()
   * fails (in which case `dst` is left untouched).
   */
  static int aclk_state_local_time_str(time_t when, char *dst, size_t dst_size)
  {
      struct tm broken_down;

      if (!when || !localtime_r(&when, &broken_down))
          return 0;

      strftime(dst, dst_size, "%Y-%m-%d %H:%M:%S", &broken_down);
      return 1;
  }
  #endif /* ENABLE_ACLK */

  /*
   * Build a human-readable, multi-line report of the ACLK state.
   *
   * Without ENABLE_ACLK the report is the fixed string "ACLK Available: No".
   * Otherwise it lists claim status, connection counters and timestamps and,
   * while connected, one section per host.
   *
   * Returns a string allocated with strdupz().
   */
  char *aclk_state(void)
  {
  #ifndef ENABLE_ACLK
      return strdupz("ACLK Available: No");
  #else
      char when[26];
      BUFFER *out = buffer_create(1024);

      buffer_strcat(out,
          "ACLK Available: Yes\n"
          "ACLK Version: 2\n"
          "Protocols Supported: Protobuf\n"
      );
      buffer_sprintf(out, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);

      char *agent_id = get_agent_claimid();
      if (agent_id != NULL) {
          char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
          buffer_sprintf(out, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
          freez(agent_id);
      } else {
          buffer_strcat(out, "No\n");
      }

      buffer_sprintf(out, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n",
                     aclk_connected ? "Yes" : "No",
                     aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0,
                     aclk_disable_runtime ? "Yes" : "No");

      /* Timestamps are only printed when set (non-zero) and convertible to local time. */
      if (aclk_state_local_time_str(last_conn_time_mqtt, when, sizeof(when)))
          buffer_sprintf(out, "Last Connection Time: %s\n", when);

      if (aclk_state_local_time_str(last_conn_time_appl, when, sizeof(when)))
          buffer_sprintf(out, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, when);

      if (aclk_state_local_time_str(last_disconnect_time, when, sizeof(when)))
          buffer_sprintf(out, "Last Disconnect Time: %s\n", when);

      if (!aclk_connected && aclk_state_local_time_str(next_connection_attempt, when, sizeof(when)))
          buffer_sprintf(out, "Next Connection Attempt At: %s\nLast Backoff: %.3f", when, last_backoff_value);

      if (aclk_connected) {
          buffer_sprintf(out, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);

          RRDHOST *host;
          rrd_rdlock();
          rrdhost_foreach_read(host) {
              buffer_sprintf(out, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, rrdhost_hostname(host));

              /* claimed_id is read under the host's ACLK state lock. */
              buffer_strcat(out, "\tClaimed ID: ");
              rrdhost_aclk_state_lock(host);
              buffer_strcat(out, host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "null");
              rrdhost_aclk_state_unlock(host);

              if (host->node_id != NULL && !uuid_is_null(*host->node_id)) {
                  char node_id[GUID_LEN + 1];
                  uuid_unparse_lower(*host->node_id, node_id);
                  buffer_sprintf(out, "\n\tNode ID: %s\n", node_id);
              } else {
                  buffer_strcat(out, "\n\tNode ID: null\n");
              }

              buffer_sprintf(out, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");

              if (host != localhost)
                  buffer_sprintf(out, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");

              buffer_strcat(out, "\n\tAlert Streaming Status:");
              fill_alert_status_for_host(out, host);
          }
          rrd_unlock();
      }

      char *report = strdupz(buffer_tostring(out));
      buffer_free(out);
      return report;
  #endif /* ENABLE_ACLK */
  }
  857. #ifdef ENABLE_ACLK
  858. static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
  859. {
  860. struct proto_alert_status status;
  861. memset(&status, 0, sizeof(status));
  862. if (get_proto_alert_status(host, &status))
  863. return;
  864. json_object *tmp = json_object_new_int(status.alert_updates);
  865. json_object_object_add(obj, "updates", tmp);
  866. tmp = json_object_new_int(status.alerts_batch_id);
  867. json_object_object_add(obj, "batch-id", tmp);
  868. tmp = json_object_new_int(status.last_acked_sequence_id);
  869. json_object_object_add(obj, "last-acked-seq-id", tmp);
  870. tmp = json_object_new_int(status.pending_min_sequence_id);
  871. json_object_object_add(obj, "pending-min-seq-id", tmp);
  872. tmp = json_object_new_int(status.pending_max_sequence_id);
  873. json_object_object_add(obj, "pending-max-seq-id", tmp);
  874. tmp = json_object_new_int(status.last_submitted_sequence_id);
  875. json_object_object_add(obj, "last-submitted-seq-id", tmp);
  876. }
  877. static json_object *timestamp_to_json(const time_t *t)
  878. {
  879. struct tm *tmptr, tmbuf;
  880. if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
  881. char timebuf[26];
  882. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  883. return json_object_new_string(timebuf);
  884. }
  885. return NULL;
  886. }
  887. #endif /* ENABLE_ACLK */
  /*
   * Build the ACLK state report as a compact JSON document.
   *
   * Without ENABLE_ACLK the document is the fixed string
   * {"aclk-available":false}. Otherwise it contains claim and connection
   * details plus a "node-instances" array with one object per host.
   *
   * Returns a string allocated with strdupz().
   */
  char *aclk_state_json(void)
  {
  #ifndef ENABLE_ACLK
      return strdupz("{\"aclk-available\":false}");
  #else
      json_object *root = json_object_new_object();

      json_object_object_add(root, "aclk-available", json_object_new_boolean(1));
      json_object_object_add(root, "aclk-version", json_object_new_int(2));

      json_object *protocols = json_object_new_array();
      json_object_array_add(protocols, json_object_new_string("Protobuf"));
      json_object_object_add(root, "protocols-supported", protocols);

      /* Claim information; "claimed-id" is added as JSON null when the agent is not claimed. */
      char *agent_id = get_agent_claimid();
      json_object_object_add(root, "agent-claimed", json_object_new_boolean(agent_id != NULL));

      json_object *claimed_id = NULL;
      if (agent_id) {
          claimed_id = json_object_new_string(agent_id);
          freez(agent_id);
      }
      json_object_object_add(root, "claimed-id", claimed_id);

      char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
      json_object_object_add(root, "cloud-url",
                             cloud_base_url ? json_object_new_string(cloud_base_url) : NULL);

      /* Connection state and counters. */
      json_object_object_add(root, "online", json_object_new_boolean(aclk_connected));
      json_object_object_add(root, "used-cloud-protocol", json_object_new_string("Protobuf"));
      json_object_object_add(root, "mqtt-version", json_object_new_int(5));
      json_object_object_add(root, "received-app-layer-msgs", json_object_new_int(aclk_rcvd_cloud_msgs));
      json_object_object_add(root, "received-mqtt-pubacks", json_object_new_int(aclk_pubacks_per_conn));
      json_object_object_add(root, "reconnect-count",
                             json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0));

      /* Timestamps; the next attempt and the backoff are only reported while disconnected. */
      json_object_object_add(root, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
      json_object_object_add(root, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
      json_object_object_add(root, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
      json_object_object_add(root, "next-connection-attempt-utc",
                             !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);

      json_object *backoff = NULL;
      if (!aclk_connected && last_backoff_value)
          backoff = json_object_new_double(last_backoff_value);
      json_object_object_add(root, "last-backoff-value", backoff);

      json_object_object_add(root, "banned-by-cloud", json_object_new_boolean(aclk_disable_runtime));

      /* One object per host, collected while holding the rrd read lock. */
      json_object *instances = json_object_new_array();
      RRDHOST *host;
      rrd_rdlock();
      rrdhost_foreach_read(host) {
          json_object *node = json_object_new_object();

          json_object_object_add(node, "hostname", json_object_new_string(rrdhost_hostname(host)));
          json_object_object_add(node, "mguid", json_object_new_string(host->machine_guid));

          /* claimed_id is read under the host's ACLK state lock. */
          rrdhost_aclk_state_lock(host);
          json_object_object_add(node, "claimed_id",
                                 host->aclk_state.claimed_id
                                     ? json_object_new_string(host->aclk_state.claimed_id)
                                     : NULL);
          rrdhost_aclk_state_unlock(host);

          if (host->node_id != NULL && !uuid_is_null(*host->node_id)) {
              char node_id[GUID_LEN + 1];
              uuid_unparse_lower(*host->node_id, node_id);
              json_object_object_add(node, "node-id", json_object_new_string(node_id));
          } else {
              json_object_object_add(node, "node-id", NULL);
          }

          json_object_object_add(node, "streaming-hops", json_object_new_int(host->system_info->hops));
          json_object_object_add(node, "relationship",
                                 json_object_new_string(host == localhost ? "self" : "child"));
          json_object_object_add(node, "streaming-online",
                                 json_object_new_boolean((host->receiver || host == localhost)));

          json_object *alert_sync = json_object_new_object();
          fill_alert_status_for_host_json(alert_sync, host);
          json_object_object_add(node, "alert-sync-status", alert_sync);

          json_object_array_add(instances, node);
      }
      rrd_unlock();
      json_object_object_add(root, "node-instances", instances);

      /* Copy the serialized text before releasing the JSON tree. */
      char *serialized = strdupz(json_object_to_json_string_ext(root, JSON_C_TO_STRING_PLAIN));
      json_object_put(root);
      return serialized;
  #endif /* ENABLE_ACLK */
  }
  978. void add_aclk_host_labels(void) {
  979. DICTIONARY *labels = localhost->rrdlabels;
  980. #ifdef ENABLE_ACLK
  981. rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  982. ACLK_PROXY_TYPE aclk_proxy;
  983. char *proxy_str;
  984. aclk_get_proxy(&aclk_proxy);
  985. switch(aclk_proxy) {
  986. case PROXY_TYPE_SOCKS5:
  987. proxy_str = "SOCKS5";
  988. break;
  989. case PROXY_TYPE_HTTP:
  990. proxy_str = "HTTP";
  991. break;
  992. default:
  993. proxy_str = "none";
  994. break;
  995. }
  996. rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
  997. rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
  998. rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  999. #else
  1000. rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  1001. #endif
  1002. }
  1003. void aclk_queue_node_info(RRDHOST *host) {
  1004. struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker;
  1005. if (likely(wc)) {
  1006. wc->node_info_send = 1;
  1007. }
  1008. }