aclk.c 49 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk.h"
  3. #include "aclk_stats.h"
  4. #include "mqtt_wss_client.h"
  5. #include "aclk_otp.h"
  6. #include "aclk_tx_msgs.h"
  7. #include "aclk_query.h"
  8. #include "aclk_query_queue.h"
  9. #include "aclk_util.h"
  10. #include "aclk_rx_msgs.h"
  11. #include "aclk_collector_list.h"
  12. #include "https_client.h"
  13. #include "aclk_proxy.h"
  14. #ifdef ACLK_LOG_CONVERSATION_DIR
  15. #include <sys/types.h>
  16. #include <sys/stat.h>
  17. #include <fcntl.h>
  18. #endif
  19. #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
  20. int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
  21. int aclk_rcvd_cloud_msgs = 0;
  22. int aclk_connection_counter = 0;
  23. int disconnect_req = 0;
  24. time_t last_conn_time_mqtt = 0;
  25. time_t last_conn_time_appl = 0;
  26. time_t last_disconnect_time = 0;
  27. time_t next_connection_attempt = 0;
  28. float last_backoff_value = 0;
  29. int aclk_alert_reloaded = 1; //1 on startup, and again on health_reload
  30. time_t aclk_block_until = 0;
  31. mqtt_wss_client mqttwss_client;
  32. netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
  33. #define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
  34. #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
  35. struct aclk_shared_state aclk_shared_state = {
  36. .agent_state = ACLK_HOST_INITIALIZING,
  37. .last_popcorn_interrupt = 0,
  38. .mqtt_shutdown_msg_id = -1,
  39. .mqtt_shutdown_msg_rcvd = 0
  40. };
  41. static RSA *aclk_private_key = NULL;
  42. static int load_private_key()
  43. {
  44. if (aclk_private_key != NULL)
  45. RSA_free(aclk_private_key);
  46. aclk_private_key = NULL;
  47. char filename[FILENAME_MAX + 1];
  48. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  49. long bytes_read;
  50. char *private_key = read_by_filename(filename, &bytes_read);
  51. if (!private_key) {
  52. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  53. return 1;
  54. }
  55. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  56. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  57. if (key_bio==NULL) {
  58. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  59. goto biofailed;
  60. }
  61. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  62. BIO_free(key_bio);
  63. if (aclk_private_key!=NULL)
  64. {
  65. freez(private_key);
  66. return 0;
  67. }
  68. char err[512];
  69. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  70. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  71. biofailed:
  72. freez(private_key);
  73. return 1;
  74. }
  75. static int wait_till_cloud_enabled()
  76. {
  77. info("Waiting for Cloud to be enabled");
  78. while (!netdata_cloud_setting) {
  79. sleep_usec(USEC_PER_SEC * 1);
  80. if (netdata_exit)
  81. return 1;
  82. }
  83. return 0;
  84. }
  85. /**
  86. * Will block until agent is claimed. Returns only if agent claimed
  87. * or if agent needs to shutdown.
  88. *
  89. * @return `0` if agent has been claimed,
  90. * `1` if interrupted due to agent shutting down
  91. */
  92. static int wait_till_agent_claimed(void)
  93. {
  94. //TODO prevent malloc and freez
  95. char *agent_id = is_agent_claimed();
  96. while (likely(!agent_id)) {
  97. sleep_usec(USEC_PER_SEC * 1);
  98. if (netdata_exit)
  99. return 1;
  100. agent_id = is_agent_claimed();
  101. }
  102. freez(agent_id);
  103. return 0;
  104. }
  105. /**
  106. * Checks everything is ready for connection
  107. * agent claimed, cloud url set and private key available
  108. *
  109. * @param aclk_hostname points to location where string pointer to hostname will be set
  110. * @param aclk_port port to int where port will be saved
  111. *
  112. * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated
  113. */
  114. static int wait_till_agent_claim_ready()
  115. {
  116. url_t url;
  117. while (!netdata_exit) {
  118. if (wait_till_agent_claimed())
  119. return 1;
  120. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  121. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  122. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  123. if (cloud_base_url == NULL) {
  124. error("Do not move the cloud base url out of post_conf_load!!");
  125. return 1;
  126. }
  127. // We just check configuration is valid here
  128. // TODO make it without malloc/free
  129. memset(&url, 0, sizeof(url_t));
  130. if (url_parse(cloud_base_url, &url)) {
  131. error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
  132. url_t_destroy(&url);
  133. sleep(5);
  134. continue;
  135. }
  136. url_t_destroy(&url);
  137. if (!load_private_key())
  138. return 0;
  139. sleep(5);
  140. }
  141. return 1;
  142. }
  143. void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
  144. {
  145. switch(log_type) {
  146. case MQTT_WSS_LOG_ERROR:
  147. case MQTT_WSS_LOG_FATAL:
  148. case MQTT_WSS_LOG_WARN:
  149. error("%s", str);
  150. return;
  151. case MQTT_WSS_LOG_INFO:
  152. info("%s", str);
  153. return;
  154. case MQTT_WSS_LOG_DEBUG:
  155. debug(D_ACLK, "%s", str);
  156. return;
  157. default:
  158. error("Unknown log type from mqtt_wss");
  159. }
  160. }
  161. //TODO prevent big buffer on stack
  162. #define RX_MSGLEN_MAX 4096
  163. static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos)
  164. {
  165. UNUSED(qos);
  166. char cmsg[RX_MSGLEN_MAX];
  167. size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
  168. const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
  169. if (!cmd_topic) {
  170. error("Error retrieving command topic");
  171. return;
  172. }
  173. if (msglen > RX_MSGLEN_MAX - 1)
  174. error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
  175. memcpy(cmsg,
  176. msg,
  177. len);
  178. cmsg[len] = 0;
  179. #ifdef ACLK_LOG_CONVERSATION_DIR
  180. #define FN_MAX_LEN 512
  181. char filename[FN_MAX_LEN];
  182. int logfd;
  183. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT());
  184. logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
  185. if(logfd < 0)
  186. error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
  187. write(logfd, msg, msglen);
  188. close(logfd);
  189. #endif
  190. debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg);
  191. if (strcmp(cmd_topic, topic))
  192. error("Received message on unexpected topic %s", topic);
  193. if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
  194. error("Link is shutting down. Ignoring incoming message.");
  195. return;
  196. }
  197. aclk_handle_cloud_cmd_message(cmsg);
  198. }
  199. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  200. static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos)
  201. {
  202. UNUSED(qos);
  203. if (msglen > RX_MSGLEN_MAX)
  204. error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
  205. debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
  206. if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
  207. error("Link is shutting down. Ignoring incoming message.");
  208. return;
  209. }
  210. const char *msgtype = strrchr(topic, '/');
  211. if (unlikely(!msgtype)) {
  212. error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
  213. return;
  214. }
  215. msgtype++;
  216. if (unlikely(!*msgtype)) {
  217. error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
  218. return;
  219. }
  220. #ifdef ACLK_LOG_CONVERSATION_DIR
  221. #define FN_MAX_LEN 512
  222. char filename[FN_MAX_LEN];
  223. int logfd;
  224. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
  225. logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
  226. if(logfd < 0)
  227. error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
  228. write(logfd, msg, msglen);
  229. close(logfd);
  230. #endif
  231. aclk_handle_new_cloud_msg(msgtype, msg, msglen);
  232. }
  233. static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) {
  234. aclk_rcvd_cloud_msgs++;
  235. if (aclk_use_new_cloud_arch)
  236. msg_callback_new_protocol(topic, msg, msglen, qos);
  237. else
  238. msg_callback_old_protocol(topic, msg, msglen, qos);
  239. }
  240. #endif /* ENABLE_NEW_CLOUD_PROTOCOL */
  241. static void puback_callback(uint16_t packet_id)
  242. {
  243. if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
  244. last_conn_time_appl = now_realtime_sec();
  245. aclk_tbeb_reset();
  246. }
  247. #ifdef NETDATA_INTERNAL_CHECKS
  248. aclk_stats_msg_puback(packet_id);
  249. #endif
  250. if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
  251. info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
  252. aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
  253. }
  254. }
  255. static int read_query_thread_count()
  256. {
  257. int threads = MIN(processors/2, 6);
  258. threads = MAX(threads, 2);
  259. threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  260. if(threads < 1) {
  261. error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
  262. threads = 1;
  263. config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  264. }
  265. return threads;
  266. }
  267. void aclk_graceful_disconnect(mqtt_wss_client client);
  268. /* Keeps connection alive and handles all network communications.
  269. * Returns on error or when netdata is shutting down.
  270. * @param client instance of mqtt_wss_client
  271. * @returns 0 - Netdata Exits
  272. * >0 - Error happened. Reconnect and start over.
  273. */
  274. static int handle_connection(mqtt_wss_client client)
  275. {
  276. time_t last_periodic_query_wakeup = now_monotonic_sec();
  277. while (!netdata_exit) {
  278. // timeout 1000 to check at least once a second
  279. // for netdata_exit
  280. if (mqtt_wss_service(client, 1000) < 0){
  281. error_report("Connection Error or Dropped");
  282. return 1;
  283. }
  284. if (disconnect_req || aclk_kill_link) {
  285. info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
  286. disconnect_req ? "true" : "false",
  287. aclk_kill_link ? "true" : "false");
  288. disconnect_req = 0;
  289. aclk_kill_link = 0;
  290. aclk_graceful_disconnect(client);
  291. aclk_queue_unlock();
  292. aclk_shared_state.mqtt_shutdown_msg_id = -1;
  293. aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
  294. return 1;
  295. }
  296. // mqtt_wss_service will return faster than in one second
  297. // if there is enough work to do
  298. time_t now = now_monotonic_sec();
  299. if (last_periodic_query_wakeup < now) {
  300. // wake up at least one Query Thread at least
  301. // once per second
  302. last_periodic_query_wakeup = now;
  303. QUERY_THREAD_WAKEUP;
  304. }
  305. }
  306. return 0;
  307. }
  308. inline static int aclk_popcorn_check()
  309. {
  310. ACLK_SHARED_STATE_LOCK;
  311. if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
  312. ACLK_SHARED_STATE_UNLOCK;
  313. return 1;
  314. }
  315. ACLK_SHARED_STATE_UNLOCK;
  316. return 0;
  317. }
  318. inline static int aclk_popcorn_check_bump()
  319. {
  320. ACLK_SHARED_STATE_LOCK;
  321. if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
  322. aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
  323. ACLK_SHARED_STATE_UNLOCK;
  324. return 1;
  325. }
  326. ACLK_SHARED_STATE_UNLOCK;
  327. return 0;
  328. }
  329. static inline void queue_connect_payloads(void)
  330. {
  331. aclk_query_t query = aclk_query_new(METADATA_INFO);
  332. query->data.metadata_info.host = localhost;
  333. query->data.metadata_info.initial_on_connect = 1;
  334. aclk_queue_query(query);
  335. query = aclk_query_new(METADATA_ALARMS);
  336. query->data.metadata_alarms.initial_on_connect = 1;
  337. aclk_queue_query(query);
  338. }
  339. static inline void mqtt_connected_actions(mqtt_wss_client client)
  340. {
  341. const char *topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
  342. if (!topic)
  343. error("Unable to fetch topic for COMMAND (to subscribe)");
  344. else
  345. mqtt_wss_subscribe(client, topic, 1);
  346. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  347. if (aclk_use_new_cloud_arch) {
  348. topic = aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
  349. if (!topic)
  350. error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
  351. else
  352. mqtt_wss_subscribe(client, topic, 1);
  353. }
  354. #endif
  355. aclk_stats_upd_online(1);
  356. aclk_connected = 1;
  357. aclk_pubacks_per_conn = 0;
  358. aclk_rcvd_cloud_msgs = 0;
  359. aclk_connection_counter++;
  360. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  361. if (!aclk_use_new_cloud_arch) {
  362. #endif
  363. ACLK_SHARED_STATE_LOCK;
  364. if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
  365. error("Sending `connect` payload immediately as popcorning was finished already.");
  366. queue_connect_payloads();
  367. }
  368. ACLK_SHARED_STATE_UNLOCK;
  369. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  370. } else {
  371. aclk_send_agent_connection_update(client, 1);
  372. }
  373. #endif
  374. }
  375. /* Waits until agent is ready or needs to exit
  376. * @param client instance of mqtt_wss_client
  377. * @param query_threads pointer to aclk_query_threads
  378. * structure where to store data about started query threads
  379. * @return 0 - Popcorning Finished - Agent STABLE,
  380. * !0 - netdata_exit
  381. */
  382. static int wait_popcorning_finishes()
  383. {
  384. time_t elapsed;
  385. int need_wait;
  386. if (aclk_use_new_cloud_arch)
  387. return 0;
  388. while (!netdata_exit) {
  389. ACLK_SHARED_STATE_LOCK;
  390. if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
  391. ACLK_SHARED_STATE_UNLOCK;
  392. return 0;
  393. }
  394. elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
  395. if (elapsed >= ACLK_STABLE_TIMEOUT) {
  396. aclk_shared_state.agent_state = ACLK_HOST_STABLE;
  397. ACLK_SHARED_STATE_UNLOCK;
  398. error("ACLK localhost popcorn timer finished");
  399. return 0;
  400. }
  401. ACLK_SHARED_STATE_UNLOCK;
  402. need_wait = ACLK_STABLE_TIMEOUT - elapsed;
  403. error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait);
  404. sleep(need_wait);
  405. }
  406. return 1;
  407. }
  408. void aclk_graceful_disconnect(mqtt_wss_client client)
  409. {
  410. info("Preparing to gracefully shutdown ACLK connection");
  411. aclk_queue_lock();
  412. aclk_queue_flush();
  413. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  414. if (aclk_use_new_cloud_arch)
  415. aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
  416. else
  417. #endif
  418. aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
  419. time_t t = now_monotonic_sec();
  420. while (!mqtt_wss_service(client, 100)) {
  421. if (now_monotonic_sec() - t >= 2) {
  422. error("Wasn't able to gracefully shutdown ACLK in time!");
  423. break;
  424. }
  425. if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
  426. info("MQTT App Layer `disconnect` message sent successfully");
  427. break;
  428. }
  429. }
  430. info("ACLK link is down");
  431. log_access("ACLK DISCONNECTED");
  432. aclk_stats_upd_online(0);
  433. last_disconnect_time = now_realtime_sec();
  434. aclk_connected = 0;
  435. info("Attempting to gracefully shutdown the MQTT/WSS connection");
  436. mqtt_wss_disconnect(client, 1000);
  437. }
  438. static unsigned long aclk_reconnect_delay() {
  439. unsigned long recon_delay;
  440. time_t now;
  441. if (aclk_disable_runtime) {
  442. aclk_tbeb_reset();
  443. return 60 * MSEC_PER_SEC;
  444. }
  445. now = now_monotonic_sec();
  446. if (aclk_block_until) {
  447. if (now < aclk_block_until) {
  448. recon_delay = aclk_block_until - now;
  449. recon_delay *= MSEC_PER_SEC;
  450. aclk_block_until = 0;
  451. aclk_tbeb_reset();
  452. return recon_delay;
  453. }
  454. aclk_block_until = 0;
  455. }
  456. if (!aclk_env || !aclk_env->backoff.base)
  457. return aclk_tbeb_delay(0, 2, 0, 1024);
  458. return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
  459. }
  460. /* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled
  461. * @return 0 - Go ahead and connect (delay expired)
  462. * 1 - netdata_exit
  463. */
  464. #define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
  465. static int aclk_block_till_recon_allowed() {
  466. unsigned long recon_delay = aclk_reconnect_delay();
  467. next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
  468. last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
  469. info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
  470. // we want to wake up from time to time to check netdata_exit
  471. while (recon_delay)
  472. {
  473. if (netdata_exit)
  474. return 1;
  475. if (recon_delay > NETDATA_EXIT_POLL_MS) {
  476. sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
  477. recon_delay -= NETDATA_EXIT_POLL_MS;
  478. continue;
  479. }
  480. sleep_usec(recon_delay * USEC_PER_MS);
  481. recon_delay = 0;
  482. }
  483. return netdata_exit;
  484. }
  485. #ifndef ACLK_DISABLE_CHALLENGE
  486. /* Cloud returns transport list ordered with highest
  487. * priority first. This function selects highest prio
  488. * transport that we can actually use (support)
  489. */
  490. static int aclk_get_transport_idx(aclk_env_t *env) {
  491. for (size_t i = 0; i < env->transport_count; i++) {
  492. // currently we support only MQTT 3
  493. // therefore select first transport that matches
  494. if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) {
  495. return i;
  496. }
  497. }
  498. return -1;
  499. }
  500. #endif
  501. /* Attempts to make a connection to MQTT broker over WSS
  502. * @param client instance of mqtt_wss_client
  503. * @return 0 - Successful Connection,
  504. * <0 - Irrecoverable Error -> Kill ACLK,
  505. * >0 - netdata_exit
  506. */
  507. #define CLOUD_BASE_URL_READ_RETRY 30
  508. #ifdef ACLK_SSL_ALLOW_SELF_SIGNED
  509. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED
  510. #else
  511. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL
  512. #endif
  513. static int aclk_attempt_to_connect(mqtt_wss_client client)
  514. {
  515. int ret;
  516. url_t base_url;
  517. #ifndef ACLK_DISABLE_CHALLENGE
  518. url_t auth_url;
  519. url_t mqtt_url;
  520. #endif
  521. json_object *lwt = NULL;
  522. while (!netdata_exit) {
  523. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  524. if (cloud_base_url == NULL) {
  525. error("Do not move the cloud base url out of post_conf_load!!");
  526. return -1;
  527. }
  528. if (aclk_block_till_recon_allowed())
  529. return 1;
  530. info("Attempting connection now");
  531. memset(&base_url, 0, sizeof(url_t));
  532. if (url_parse(cloud_base_url, &base_url)) {
  533. error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
  534. sleep(CLOUD_BASE_URL_READ_RETRY);
  535. url_t_destroy(&base_url);
  536. continue;
  537. }
  538. struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT };
  539. aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
  540. struct mqtt_connect_params mqtt_conn_params = {
  541. .clientid = "anon",
  542. .username = "anon",
  543. .password = "anon",
  544. .will_topic = "lwt",
  545. .will_msg = NULL,
  546. .will_flags = MQTT_WSS_PUB_QOS2,
  547. .keep_alive = 60,
  548. .drop_on_publish_fail = 1
  549. };
  550. aclk_use_new_cloud_arch = 0;
  551. #ifndef ACLK_DISABLE_CHALLENGE
  552. if (aclk_env) {
  553. aclk_env_t_destroy(aclk_env);
  554. freez(aclk_env);
  555. }
  556. aclk_env = callocz(1, sizeof(aclk_env_t));
  557. ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
  558. url_t_destroy(&base_url);
  559. if (ret) {
  560. error("Failed to Get ACLK environment");
  561. // delay handled by aclk_block_till_recon_allowed
  562. continue;
  563. }
  564. if (netdata_exit)
  565. return 1;
  566. if (aclk_env->encoding == ACLK_ENC_PROTO) {
  567. #ifndef ENABLE_NEW_CLOUD_PROTOCOL
  568. error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!");
  569. continue;
  570. #else
  571. if (!aclk_env_has_capa("proto")) {
  572. error ("Can't encoding=proto without at least \"proto\" capability.");
  573. continue;
  574. }
  575. info("Switching ACLK to new protobuf protocol. Due to /env response.");
  576. aclk_use_new_cloud_arch = 1;
  577. #endif
  578. }
  579. memset(&auth_url, 0, sizeof(url_t));
  580. if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
  581. error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
  582. url_t_destroy(&auth_url);
  583. continue;
  584. }
  585. ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
  586. url_t_destroy(&auth_url);
  587. if (ret) {
  588. error("Error passing Challenge/Response to get OTP");
  589. continue;
  590. }
  591. // aclk_get_topic moved here as during OTP we
  592. // generate the topic cache
  593. if (aclk_use_new_cloud_arch)
  594. mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
  595. else
  596. mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
  597. if (!mqtt_conn_params.will_topic) {
  598. error("Couldn't get LWT topic. Will not send LWT.");
  599. continue;
  600. }
  601. // Do the MQTT connection
  602. ret = aclk_get_transport_idx(aclk_env);
  603. if (ret < 0) {
  604. error("Cloud /env endpoint didn't return any transport usable by this Agent.");
  605. continue;
  606. }
  607. memset(&mqtt_url, 0, sizeof(url_t));
  608. if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
  609. error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
  610. url_t_destroy(&mqtt_url);
  611. continue;
  612. }
  613. #endif
  614. aclk_session_newarch = now_realtime_usec();
  615. aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
  616. aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
  617. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  618. if (aclk_use_new_cloud_arch) {
  619. mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
  620. } else {
  621. #endif
  622. lwt = aclk_generate_disconnect(NULL);
  623. mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
  624. mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
  625. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  626. }
  627. #endif
  628. #ifdef ACLK_DISABLE_CHALLENGE
  629. ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  630. url_t_destroy(&base_url);
  631. #else
  632. ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  633. url_t_destroy(&mqtt_url);
  634. freez((char*)mqtt_conn_params.clientid);
  635. freez((char*)mqtt_conn_params.password);
  636. freez((char*)mqtt_conn_params.username);
  637. #endif
  638. if (aclk_use_new_cloud_arch)
  639. freez((char *)mqtt_conn_params.will_msg);
  640. else
  641. json_object_put(lwt);
  642. if (!ret) {
  643. last_conn_time_mqtt = now_realtime_sec();
  644. info("ACLK connection successfully established");
  645. log_access("ACLK CONNECTED");
  646. mqtt_connected_actions(client);
  647. return 0;
  648. }
  649. error_report("Connect failed");
  650. }
  651. return 1;
  652. }
  653. /**
  654. * Main agent cloud link thread
  655. *
  656. * This thread will simply call the main event loop that handles
  657. * pending requests - both inbound and outbound
  658. *
  659. * @param ptr is a pointer to the netdata_static_thread structure.
  660. *
  661. * @return It always returns NULL
  662. */
  663. void *aclk_main(void *ptr)
  664. {
  665. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  666. struct aclk_stats_thread *stats_thread = NULL;
  667. struct aclk_query_threads query_threads;
  668. query_threads.thread_list = NULL;
  669. ACLK_PROXY_TYPE proxy_type;
  670. aclk_get_proxy(&proxy_type);
  671. if (proxy_type == PROXY_TYPE_SOCKS5) {
  672. error("SOCKS5 proxy is not supported by ACLK-NG yet.");
  673. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  674. return NULL;
  675. }
  676. unsigned int proto_hdl_cnt;
  677. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  678. proto_hdl_cnt = aclk_init_rx_msg_handlers();
  679. #endif
  680. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  681. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  682. netdata_thread_disable_cancelability();
  683. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
  684. info("Killing ACLK thread -> cloud functionality has been disabled");
  685. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  686. return NULL;
  687. #endif
  688. aclk_popcorn_check_bump(); // start localhost popcorn timer
  689. query_threads.count = read_query_thread_count();
  690. if (wait_till_cloud_enabled())
  691. goto exit;
  692. if (wait_till_agent_claim_ready())
  693. goto exit;
  694. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  695. if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
  696. #else
  697. if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback))) {
  698. #endif
  699. error("Couldn't initialize MQTT_WSS network library");
  700. goto exit;
  701. }
  702. // Enable MQTT buffer growth if necessary
  703. // e.g. old cloud architecture clients with huge nodes
  704. // that send JSON payloads of 10 MB as single messages
  705. mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
  706. aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
  707. if (aclk_stats_enabled) {
  708. stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
  709. stats_thread->thread = mallocz(sizeof(netdata_thread_t));
  710. stats_thread->query_thread_count = query_threads.count;
  711. aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
  712. netdata_thread_create(
  713. stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
  714. stats_thread);
  715. }
  716. // Keep reconnecting and talking until our time has come
  717. // and the Grim Reaper (netdata_exit) calls
  718. do {
  719. if (aclk_attempt_to_connect(mqttwss_client))
  720. goto exit_full;
  721. #if defined(ENABLE_ACLK) && !defined(ENABLE_NEW_CLOUD_PROTOCOL)
  722. error_report("############################ WARNING ###############################");
  723. error_report("# Your agent is configured to connect to cloud but has #");
  724. error_report("# no protobuf protocol support (uses legacy JSON protocol) #");
  725. error_report("# Legacy protocol will be deprecated soon (planned 1st March 2022) #");
  726. error_report("# Visit following link for more info and instructions how to solve #");
  727. error_report("# https://www.netdata.cloud/blog/netdata-clouds-new-architecture #");
  728. error_report("######################################################################");
  729. #endif
  730. // warning this assumes the popcorning is relative short (3s)
  731. // if that changes call mqtt_wss_service from within
  732. // to keep OpenSSL, WSS and MQTT connection alive
  733. if (wait_popcorning_finishes())
  734. goto exit_full;
  735. if (unlikely(!query_threads.thread_list))
  736. aclk_query_threads_start(&query_threads, mqttwss_client);
  737. if (!aclk_use_new_cloud_arch)
  738. queue_connect_payloads();
  739. if (handle_connection(mqttwss_client)) {
  740. aclk_stats_upd_online(0);
  741. last_disconnect_time = now_realtime_sec();
  742. aclk_connected = 0;
  743. log_access("ACLK DISCONNECTED");
  744. }
  745. } while (!netdata_exit);
  746. aclk_graceful_disconnect(mqttwss_client);
  747. exit_full:
  748. // Tear Down
  749. QUERY_THREAD_WAKEUP_ALL;
  750. aclk_query_threads_cleanup(&query_threads);
  751. if (aclk_stats_enabled) {
  752. netdata_thread_join(*stats_thread->thread, NULL);
  753. aclk_stats_thread_cleanup();
  754. freez(stats_thread->thread);
  755. freez(stats_thread);
  756. }
  757. free_topic_cache();
  758. mqtt_wss_destroy(mqttwss_client);
  759. exit:
  760. if (aclk_env) {
  761. aclk_env_t_destroy(aclk_env);
  762. freez(aclk_env);
  763. }
  764. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  765. return NULL;
  766. }
  767. // TODO this is taken over as workaround from old ACLK
  768. // fix this in both old and new ACLK
  769. extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
  770. void aclk_alarm_reload(void)
  771. {
  772. ACLK_SHARED_STATE_LOCK;
  773. if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
  774. ACLK_SHARED_STATE_UNLOCK;
  775. return;
  776. }
  777. ACLK_SHARED_STATE_UNLOCK;
  778. aclk_queue_query(aclk_query_new(METADATA_ALARMS));
  779. }
  780. int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
  781. {
  782. BUFFER *local_buffer;
  783. json_object *msg;
  784. if (host != localhost)
  785. return 0;
  786. ACLK_SHARED_STATE_LOCK;
  787. if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
  788. ACLK_SHARED_STATE_UNLOCK;
  789. return 0;
  790. }
  791. ACLK_SHARED_STATE_UNLOCK;
  792. local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  793. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  794. health_alarm_entry2json_nolock(local_buffer, ae, host);
  795. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  796. msg = json_tokener_parse(local_buffer->buffer);
  797. struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE);
  798. query->data.alarm_update = msg;
  799. aclk_queue_query(query);
  800. buffer_free(local_buffer);
  801. return 0;
  802. }
  803. int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
  804. {
  805. struct aclk_query *query;
  806. if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check())
  807. return 0;
  808. query = aclk_query_new(create ? CHART_NEW : CHART_DEL);
  809. if(create) {
  810. query->data.chart_add_del.host = host;
  811. query->data.chart_add_del.chart_name = strdupz(chart_name);
  812. } else {
  813. query->data.metadata_info.host = host;
  814. query->data.metadata_info.initial_on_connect = 0;
  815. }
  816. aclk_queue_query(query);
  817. return 0;
  818. }
  819. /*
  820. * Add a new collector to the list
  821. * If it exists, update the chart count
  822. */
  823. void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
  824. {
  825. struct aclk_query *query;
  826. struct _collector *tmp_collector;
  827. if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
  828. return;
  829. }
  830. COLLECTOR_LOCK;
  831. tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);
  832. if (unlikely(tmp_collector->count != 1)) {
  833. COLLECTOR_UNLOCK;
  834. return;
  835. }
  836. COLLECTOR_UNLOCK;
  837. if (aclk_popcorn_check_bump())
  838. return;
  839. if (host != localhost)
  840. return;
  841. query = aclk_query_new(METADATA_INFO);
  842. query->data.metadata_info.host = localhost; //TODO
  843. query->data.metadata_info.initial_on_connect = 0;
  844. aclk_queue_query(query);
  845. query = aclk_query_new(METADATA_ALARMS);
  846. query->data.metadata_alarms.initial_on_connect = 0;
  847. aclk_queue_query(query);
  848. }
  849. /*
  850. * Delete a collector from the list
  851. * If the chart count reaches zero the collector will be removed
  852. * from the list by calling del_collector.
  853. *
  854. * This function will release the memory used and schedule
  855. * a cloud update
  856. */
  857. void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
  858. {
  859. struct aclk_query *query;
  860. struct _collector *tmp_collector;
  861. if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
  862. return;
  863. }
  864. COLLECTOR_LOCK;
  865. tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);
  866. if (unlikely(!tmp_collector || tmp_collector->count)) {
  867. COLLECTOR_UNLOCK;
  868. return;
  869. }
  870. debug(
  871. D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
  872. tmp_collector->count);
  873. COLLECTOR_UNLOCK;
  874. _free_collector(tmp_collector);
  875. if (aclk_popcorn_check_bump())
  876. return;
  877. if (host != localhost)
  878. return;
  879. query = aclk_query_new(METADATA_INFO);
  880. query->data.metadata_info.host = localhost; //TODO
  881. query->data.metadata_info.initial_on_connect = 0;
  882. aclk_queue_query(query);
  883. query = aclk_query_new(METADATA_ALARMS);
  884. query->data.metadata_alarms.initial_on_connect = 0;
  885. aclk_queue_query(query);
  886. }
  887. void aclk_host_state_update(RRDHOST *host, int cmd)
  888. {
  889. uuid_t node_id;
  890. int ret;
  891. if (!aclk_connected || !aclk_use_new_cloud_arch)
  892. return;
  893. ret = get_node_id(&host->host_uuid, &node_id);
  894. if (ret > 0) {
  895. // this means we were not able to check if node_id already present
  896. error("Unable to check for node_id. Ignoring the host state update.");
  897. return;
  898. }
  899. if (ret < 0) {
  900. // node_id not found
  901. aclk_query_t create_query;
  902. create_query = aclk_query_new(REGISTER_NODE);
  903. rrdhost_aclk_state_lock(localhost);
  904. create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
  905. rrdhost_aclk_state_unlock(localhost);
  906. create_query->data.node_creation.hops = (uint32_t) host->system_info->hops;
  907. create_query->data.node_creation.hostname = strdupz(host->hostname);
  908. create_query->data.node_creation.machine_guid = strdupz(host->machine_guid);
  909. info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
  910. aclk_queue_query(create_query);
  911. return;
  912. }
  913. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  914. query->data.node_update.hops = (uint32_t) host->system_info->hops;
  915. rrdhost_aclk_state_lock(localhost);
  916. query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
  917. rrdhost_aclk_state_unlock(localhost);
  918. query->data.node_update.live = cmd;
  919. query->data.node_update.node_id = mallocz(UUID_STR_LEN);
  920. uuid_unparse_lower(node_id, (char*)query->data.node_update.node_id);
  921. query->data.node_update.queryable = 1;
  922. query->data.node_update.session_id = aclk_session_newarch;
  923. info("Queuing status update for node=%s, live=%d, hops=%u",(char*)query->data.node_update.node_id, cmd,
  924. host->system_info->hops);
  925. aclk_queue_query(query);
  926. }
  927. void aclk_send_node_instances()
  928. {
  929. struct node_instance_list *list_head = get_node_list();
  930. struct node_instance_list *list = list_head;
  931. if (unlikely(!list)) {
  932. error_report("Failure to get_node_list from DB!");
  933. return;
  934. }
  935. while (!uuid_is_null(list->host_id)) {
  936. if (!uuid_is_null(list->node_id)) {
  937. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  938. rrdhost_aclk_state_lock(localhost);
  939. query->data.node_update.claim_id = strdupz(localhost->aclk_state.claimed_id);
  940. rrdhost_aclk_state_unlock(localhost);
  941. query->data.node_update.live = list->live;
  942. query->data.node_update.hops = list->hops;
  943. query->data.node_update.node_id = mallocz(UUID_STR_LEN);
  944. uuid_unparse_lower(list->node_id, (char*)query->data.node_update.node_id);
  945. query->data.node_update.queryable = 1;
  946. query->data.node_update.session_id = aclk_session_newarch;
  947. freez(list->hostname);
  948. info("Queuing status update for node=%s, live=%d, hops=%d",(char*)query->data.node_update.node_id,
  949. list->live,
  950. list->hops);
  951. aclk_queue_query(query);
  952. } else {
  953. aclk_query_t create_query;
  954. create_query = aclk_query_new(REGISTER_NODE);
  955. rrdhost_aclk_state_lock(localhost);
  956. create_query->data.node_creation.claim_id = strdupz(localhost->aclk_state.claimed_id);
  957. rrdhost_aclk_state_unlock(localhost);
  958. create_query->data.node_creation.hops = list->hops;
  959. create_query->data.node_creation.hostname = list->hostname;
  960. create_query->data.node_creation.machine_guid = mallocz(UUID_STR_LEN);
  961. uuid_unparse_lower(list->host_id, (char*)create_query->data.node_creation.machine_guid);
  962. info("Queuing registration for host=%s, hops=%d",(char*)create_query->data.node_creation.machine_guid,
  963. list->hops);
  964. aclk_queue_query(create_query);
  965. }
  966. list++;
  967. }
  968. freez(list_head);
  969. }
  970. void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
  971. {
  972. aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
  973. }
  974. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  975. static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
  976. {
  977. struct proto_alert_status status;
  978. memset(&status, 0, sizeof(status));
  979. if (get_proto_alert_status(host, &status)) {
  980. buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
  981. return;
  982. }
  983. buffer_sprintf(wb,
  984. "\n\t\tUpdates: %d"
  985. "\n\t\tBatch ID: %"PRIu64
  986. "\n\t\tLast Acked Seq ID: %"PRIu64
  987. "\n\t\tPending Min Seq ID: %"PRIu64
  988. "\n\t\tPending Max Seq ID: %"PRIu64
  989. "\n\t\tLast Submitted Seq ID: %"PRIu64,
  990. status.alert_updates,
  991. status.alerts_batch_id,
  992. status.last_acked_sequence_id,
  993. status.pending_min_sequence_id,
  994. status.pending_max_sequence_id,
  995. status.last_submitted_sequence_id
  996. );
  997. }
  998. static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host)
  999. {
  1000. struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
  1001. if (!stats) {
  1002. buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host");
  1003. return;
  1004. }
  1005. buffer_sprintf(wb,
  1006. "\n\t\tUpdates: %d"
  1007. "\n\t\tBatch ID: %"PRIu64
  1008. "\n\t\tMin Seq ID: %"PRIu64
  1009. "\n\t\tMax Seq ID: %"PRIu64
  1010. "\n\t\tPending Min Seq ID: %"PRIu64
  1011. "\n\t\tPending Max Seq ID: %"PRIu64
  1012. "\n\t\tSent Min Seq ID: %"PRIu64
  1013. "\n\t\tSent Max Seq ID: %"PRIu64
  1014. "\n\t\tAcked Min Seq ID: %"PRIu64
  1015. "\n\t\tAcked Max Seq ID: %"PRIu64,
  1016. stats->updates,
  1017. stats->batch_id,
  1018. stats->min_seqid,
  1019. stats->max_seqid,
  1020. stats->min_seqid_pend,
  1021. stats->max_seqid_pend,
  1022. stats->min_seqid_sent,
  1023. stats->max_seqid_sent,
  1024. stats->min_seqid_ack,
  1025. stats->max_seqid_ack
  1026. );
  1027. freez(stats);
  1028. }
  1029. #endif
  1030. char *ng_aclk_state(void)
  1031. {
  1032. BUFFER *wb = buffer_create(1024);
  1033. struct tm *tmptr, tmbuf;
  1034. char *ret;
  1035. buffer_strcat(wb,
  1036. "ACLK Available: Yes\n"
  1037. "ACLK Version: 2\n"
  1038. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1039. "Protocols Supported: Legacy, Protobuf\n"
  1040. #else
  1041. "Protocols Supported: Legacy\n"
  1042. #endif
  1043. );
  1044. buffer_sprintf(wb, "Protocol Used: %s\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
  1045. char *agent_id = is_agent_claimed();
  1046. if (agent_id == NULL)
  1047. buffer_strcat(wb, "No\n");
  1048. else {
  1049. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  1050. buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
  1051. freez(agent_id);
  1052. }
  1053. buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
  1054. if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) {
  1055. char timebuf[26];
  1056. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1057. buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf);
  1058. }
  1059. if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) {
  1060. char timebuf[26];
  1061. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1062. buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf);
  1063. }
  1064. if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) {
  1065. char timebuf[26];
  1066. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1067. buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf);
  1068. }
  1069. if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) {
  1070. char timebuf[26];
  1071. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1072. buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
  1073. }
  1074. if (aclk_connected) {
  1075. buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
  1076. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1077. RRDHOST *host;
  1078. rrd_rdlock();
  1079. rrdhost_foreach_read(host) {
  1080. buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname);
  1081. buffer_strcat(wb, "\tClaimed ID: ");
  1082. rrdhost_aclk_state_lock(host);
  1083. if (host->aclk_state.claimed_id)
  1084. buffer_strcat(wb, host->aclk_state.claimed_id);
  1085. else
  1086. buffer_strcat(wb, "null");
  1087. rrdhost_aclk_state_unlock(host);
  1088. if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
  1089. buffer_strcat(wb, "\n\tNode ID: null\n");
  1090. } else {
  1091. char node_id[GUID_LEN + 1];
  1092. uuid_unparse_lower(*host->node_id, node_id);
  1093. buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
  1094. }
  1095. buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");
  1096. if (host != localhost)
  1097. buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");
  1098. buffer_strcat(wb, "\n\tAlert Streaming Status:");
  1099. fill_alert_status_for_host(wb, host);
  1100. buffer_strcat(wb, "\n\tChart Streaming Status:");
  1101. fill_chart_status_for_host(wb, host);
  1102. }
  1103. rrd_unlock();
  1104. #endif
  1105. }
  1106. ret = strdupz(buffer_tostring(wb));
  1107. buffer_free(wb);
  1108. return ret;
  1109. }
  1110. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1111. static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
  1112. {
  1113. struct proto_alert_status status;
  1114. memset(&status, 0, sizeof(status));
  1115. if (get_proto_alert_status(host, &status))
  1116. return;
  1117. json_object *tmp = json_object_new_int(status.alert_updates);
  1118. json_object_object_add(obj, "updates", tmp);
  1119. tmp = json_object_new_int(status.alerts_batch_id);
  1120. json_object_object_add(obj, "batch-id", tmp);
  1121. tmp = json_object_new_int(status.last_acked_sequence_id);
  1122. json_object_object_add(obj, "last-acked-seq-id", tmp);
  1123. tmp = json_object_new_int(status.pending_min_sequence_id);
  1124. json_object_object_add(obj, "pending-min-seq-id", tmp);
  1125. tmp = json_object_new_int(status.pending_max_sequence_id);
  1126. json_object_object_add(obj, "pending-max-seq-id", tmp);
  1127. tmp = json_object_new_int(status.last_submitted_sequence_id);
  1128. json_object_object_add(obj, "last-submitted-seq-id", tmp);
  1129. }
  1130. static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host)
  1131. {
  1132. struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
  1133. if (!stats)
  1134. return;
  1135. json_object *tmp = json_object_new_int(stats->updates);
  1136. json_object_object_add(obj, "updates", tmp);
  1137. tmp = json_object_new_int(stats->batch_id);
  1138. json_object_object_add(obj, "batch-id", tmp);
  1139. tmp = json_object_new_int(stats->min_seqid);
  1140. json_object_object_add(obj, "min-seq-id", tmp);
  1141. tmp = json_object_new_int(stats->max_seqid);
  1142. json_object_object_add(obj, "max-seq-id", tmp);
  1143. tmp = json_object_new_int(stats->min_seqid_pend);
  1144. json_object_object_add(obj, "pending-min-seq-id", tmp);
  1145. tmp = json_object_new_int(stats->max_seqid_pend);
  1146. json_object_object_add(obj, "pending-max-seq-id", tmp);
  1147. tmp = json_object_new_int(stats->min_seqid_sent);
  1148. json_object_object_add(obj, "sent-min-seq-id", tmp);
  1149. tmp = json_object_new_int(stats->max_seqid_sent);
  1150. json_object_object_add(obj, "sent-max-seq-id", tmp);
  1151. tmp = json_object_new_int(stats->min_seqid_ack);
  1152. json_object_object_add(obj, "acked-min-seq-id", tmp);
  1153. tmp = json_object_new_int(stats->max_seqid_ack);
  1154. json_object_object_add(obj, "acked-max-seq-id", tmp);
  1155. freez(stats);
  1156. }
  1157. #endif
  1158. static json_object *timestamp_to_json(const time_t *t)
  1159. {
  1160. struct tm *tmptr, tmbuf;
  1161. if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
  1162. char timebuf[26];
  1163. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1164. return json_object_new_string(timebuf);
  1165. }
  1166. return NULL;
  1167. }
  1168. char *ng_aclk_state_json(void)
  1169. {
  1170. json_object *tmp, *grp, *msg = json_object_new_object();
  1171. tmp = json_object_new_boolean(1);
  1172. json_object_object_add(msg, "aclk-available", tmp);
  1173. tmp = json_object_new_int(2);
  1174. json_object_object_add(msg, "aclk-version", tmp);
  1175. grp = json_object_new_array();
  1176. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1177. tmp = json_object_new_string("Legacy");
  1178. json_object_array_add(grp, tmp);
  1179. tmp = json_object_new_string("Protobuf");
  1180. json_object_array_add(grp, tmp);
  1181. #else
  1182. tmp = json_object_new_string("Legacy");
  1183. json_object_array_add(grp, tmp);
  1184. #endif
  1185. json_object_object_add(msg, "protocols-supported", grp);
  1186. char *agent_id = is_agent_claimed();
  1187. tmp = json_object_new_boolean(agent_id != NULL);
  1188. json_object_object_add(msg, "agent-claimed", tmp);
  1189. if (agent_id) {
  1190. tmp = json_object_new_string(agent_id);
  1191. freez(agent_id);
  1192. } else
  1193. tmp = NULL;
  1194. json_object_object_add(msg, "claimed-id", tmp);
  1195. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  1196. tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL;
  1197. json_object_object_add(msg, "cloud-url", tmp);
  1198. tmp = json_object_new_boolean(aclk_connected);
  1199. json_object_object_add(msg, "online", tmp);
  1200. tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
  1201. json_object_object_add(msg, "used-cloud-protocol", tmp);
  1202. tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
  1203. json_object_object_add(msg, "received-app-layer-msgs", tmp);
  1204. tmp = json_object_new_int(aclk_pubacks_per_conn);
  1205. json_object_object_add(msg, "received-mqtt-pubacks", tmp);
  1206. tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0);
  1207. json_object_object_add(msg, "reconnect-count", tmp);
  1208. json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
  1209. json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
  1210. json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
  1211. json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);
  1212. tmp = NULL;
  1213. if (!aclk_connected && last_backoff_value)
  1214. tmp = json_object_new_double(last_backoff_value);
  1215. json_object_object_add(msg, "last-backoff-value", tmp);
  1216. tmp = json_object_new_boolean(aclk_disable_runtime);
  1217. json_object_object_add(msg, "banned-by-cloud", tmp);
  1218. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1219. grp = json_object_new_array();
  1220. RRDHOST *host;
  1221. rrd_rdlock();
  1222. rrdhost_foreach_read(host) {
  1223. json_object *nodeinstance = json_object_new_object();
  1224. tmp = json_object_new_string(host->hostname);
  1225. json_object_object_add(nodeinstance, "hostname", tmp);
  1226. tmp = json_object_new_string(host->machine_guid);
  1227. json_object_object_add(nodeinstance, "mguid", tmp);
  1228. rrdhost_aclk_state_lock(host);
  1229. if (host->aclk_state.claimed_id) {
  1230. tmp = json_object_new_string(host->aclk_state.claimed_id);
  1231. json_object_object_add(nodeinstance, "claimed_id", tmp);
  1232. } else
  1233. json_object_object_add(nodeinstance, "claimed_id", NULL);
  1234. rrdhost_aclk_state_unlock(host);
  1235. if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
  1236. json_object_object_add(nodeinstance, "node-id", NULL);
  1237. } else {
  1238. char node_id[GUID_LEN + 1];
  1239. uuid_unparse_lower(*host->node_id, node_id);
  1240. tmp = json_object_new_string(node_id);
  1241. json_object_object_add(nodeinstance, "node-id", tmp);
  1242. }
  1243. tmp = json_object_new_int(host->system_info->hops);
  1244. json_object_object_add(nodeinstance, "streaming-hops", tmp);
  1245. tmp = json_object_new_string(host == localhost ? "self" : "child");
  1246. json_object_object_add(nodeinstance, "relationship", tmp);
  1247. tmp = json_object_new_boolean((host->receiver || host == localhost));
  1248. json_object_object_add(nodeinstance, "streaming-online", tmp);
  1249. tmp = json_object_new_object();
  1250. fill_alert_status_for_host_json(tmp, host);
  1251. json_object_object_add(nodeinstance, "alert-sync-status", tmp);
  1252. tmp = json_object_new_object();
  1253. fill_chart_status_for_host_json(tmp, host);
  1254. json_object_object_add(nodeinstance, "chart-sync-status", tmp);
  1255. json_object_array_add(grp, nodeinstance);
  1256. }
  1257. rrd_unlock();
  1258. json_object_object_add(msg, "node-instances", grp);
  1259. #endif
  1260. char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
  1261. json_object_put(msg);
  1262. return str;
  1263. }