aclk.c 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk.h"
  3. #include "aclk_stats.h"
  4. #include "mqtt_wss_client.h"
  5. #include "aclk_otp.h"
  6. #include "aclk_tx_msgs.h"
  7. #include "aclk_query.h"
  8. #include "aclk_query_queue.h"
  9. #include "aclk_util.h"
  10. #include "aclk_rx_msgs.h"
  11. #include "aclk_collector_list.h"
  12. #include "https_client.h"
  13. #include "schema-wrappers/schema_wrappers.h"
  14. #include "aclk_proxy.h"
  15. #ifdef ACLK_LOG_CONVERSATION_DIR
  16. #include <sys/types.h>
  17. #include <sys/stat.h>
  18. #include <fcntl.h>
  19. #endif
  // Minimum delay (seconds, compared against now_realtime_sec() differences)
  // without a popcorn interrupt before the agent is marked as stable
  #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable

  // Per-connection counters; both are reset in mqtt_connected_actions()
  int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
  int aclk_rcvd_cloud_msgs = 0;  // incremented by msg_callback() for every received message

  // Incremented every time mqtt_connected_actions() runs
  int aclk_connection_counter = 0;

  // When non-zero handle_connection() gracefully drops the link and asks for a reconnect
  int disconnect_req = 0;

  // Timestamps (now_realtime_sec()) kept for reporting
  time_t last_conn_time_mqtt = 0;  // set after mqtt_wss_connect() succeeded
  time_t last_conn_time_appl = 0;  // set by puback_callback() once ACLK_PUBACKS_CONN_STABLE PubAcks arrived
  time_t last_disconnect_time = 0; // set by aclk_graceful_disconnect()

  // Both are updated by aclk_block_till_recon_allowed() before it starts waiting
  time_t next_connection_attempt = 0;
  float last_backoff_value = 0; // the chosen reconnect delay in seconds

  int aclk_alert_reloaded = 0; //1 on health log exchange, and again on health_reload

  // If set, aclk_reconnect_delay() waits until this point in time
  // (compared against now_monotonic_sec()) and then clears it
  time_t aclk_block_until = 0;

  // MQTT over WSS client instance, created in aclk_main()
  mqtt_wss_client mqttwss_client;

  // Mutex used by the popcorning helpers and mqtt_connected_actions()
  // when they access aclk_shared_state.agent_state / last_popcorn_interrupt
  netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
  #define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
  #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)

  struct aclk_shared_state aclk_shared_state = {
      .agent_state = ACLK_HOST_INITIALIZING,
      .last_popcorn_interrupt = 0,
      // -1 means no shutdown message was sent; otherwise the id returned
      // by the send call in aclk_graceful_disconnect()
      .mqtt_shutdown_msg_id = -1,
      // set to 1 by puback_callback() when the shutdown message got acknowledged
      .mqtt_shutdown_msg_rcvd = 0
  };

  // RSA key of the claimed agent, (re)loaded by load_private_key()
  static RSA *aclk_private_key = NULL;
  43. static int load_private_key()
  44. {
  45. if (aclk_private_key != NULL)
  46. RSA_free(aclk_private_key);
  47. aclk_private_key = NULL;
  48. char filename[FILENAME_MAX + 1];
  49. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  50. long bytes_read;
  51. char *private_key = read_by_filename(filename, &bytes_read);
  52. if (!private_key) {
  53. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  54. return 1;
  55. }
  56. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  57. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  58. if (key_bio==NULL) {
  59. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  60. goto biofailed;
  61. }
  62. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  63. BIO_free(key_bio);
  64. if (aclk_private_key!=NULL)
  65. {
  66. freez(private_key);
  67. return 0;
  68. }
  69. char err[512];
  70. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  71. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  72. biofailed:
  73. freez(private_key);
  74. return 1;
  75. }
  76. static int wait_till_cloud_enabled()
  77. {
  78. info("Waiting for Cloud to be enabled");
  79. while (!netdata_cloud_setting) {
  80. sleep_usec(USEC_PER_SEC * 1);
  81. if (netdata_exit)
  82. return 1;
  83. }
  84. return 0;
  85. }
  86. /**
  87. * Will block until agent is claimed. Returns only if agent claimed
  88. * or if agent needs to shutdown.
  89. *
  90. * @return `0` if agent has been claimed,
  91. * `1` if interrupted due to agent shutting down
  92. */
  93. static int wait_till_agent_claimed(void)
  94. {
  95. //TODO prevent malloc and freez
  96. char *agent_id = is_agent_claimed();
  97. while (likely(!agent_id)) {
  98. sleep_usec(USEC_PER_SEC * 1);
  99. if (netdata_exit)
  100. return 1;
  101. agent_id = is_agent_claimed();
  102. }
  103. freez(agent_id);
  104. return 0;
  105. }
  106. /**
  107. * Checks everything is ready for connection
  108. * agent claimed, cloud url set and private key available
  109. *
  110. * @param aclk_hostname points to location where string pointer to hostname will be set
  111. * @param aclk_port port to int where port will be saved
  112. *
  113. * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated
  114. */
  115. static int wait_till_agent_claim_ready()
  116. {
  117. url_t url;
  118. while (!netdata_exit) {
  119. if (wait_till_agent_claimed())
  120. return 1;
  121. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  122. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  123. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  124. if (cloud_base_url == NULL) {
  125. error("Do not move the cloud base url out of post_conf_load!!");
  126. return 1;
  127. }
  128. // We just check configuration is valid here
  129. // TODO make it without malloc/free
  130. memset(&url, 0, sizeof(url_t));
  131. if (url_parse(cloud_base_url, &url)) {
  132. error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
  133. url_t_destroy(&url);
  134. sleep(5);
  135. continue;
  136. }
  137. url_t_destroy(&url);
  138. if (!load_private_key())
  139. return 0;
  140. sleep(5);
  141. }
  142. return 1;
  143. }
  144. void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
  145. {
  146. switch(log_type) {
  147. case MQTT_WSS_LOG_ERROR:
  148. case MQTT_WSS_LOG_FATAL:
  149. case MQTT_WSS_LOG_WARN:
  150. error_report("%s", str);
  151. return;
  152. case MQTT_WSS_LOG_INFO:
  153. info("%s", str);
  154. return;
  155. case MQTT_WSS_LOG_DEBUG:
  156. debug(D_ACLK, "%s", str);
  157. return;
  158. default:
  159. error("Unknown log type from mqtt_wss");
  160. }
  161. }
  162. //TODO prevent big buffer on stack
  163. #define RX_MSGLEN_MAX 4096
  164. static void msg_callback_old_protocol(const char *topic, const void *msg, size_t msglen, int qos)
  165. {
  166. UNUSED(qos);
  167. char cmsg[RX_MSGLEN_MAX];
  168. size_t len = (msglen < RX_MSGLEN_MAX - 1) ? msglen : (RX_MSGLEN_MAX - 1);
  169. const char *cmd_topic = aclk_get_topic(ACLK_TOPICID_COMMAND);
  170. if (!cmd_topic) {
  171. error("Error retrieving command topic");
  172. return;
  173. }
  174. if (msglen > RX_MSGLEN_MAX - 1)
  175. error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
  176. memcpy(cmsg,
  177. msg,
  178. len);
  179. cmsg[len] = 0;
  180. #ifdef ACLK_LOG_CONVERSATION_DIR
  181. #define FN_MAX_LEN 512
  182. char filename[FN_MAX_LEN];
  183. int logfd;
  184. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx.json", ACLK_GET_CONV_LOG_NEXT());
  185. logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
  186. if(logfd < 0)
  187. error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
  188. write(logfd, msg, msglen);
  189. close(logfd);
  190. #endif
  191. debug(D_ACLK, "Got Message From Broker Topic \"%s\" QoS %d MSG: \"%s\"", topic, qos, cmsg);
  192. if (strcmp(cmd_topic, topic))
  193. error("Received message on unexpected topic %s", topic);
  194. if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
  195. error("Link is shutting down. Ignoring incoming message.");
  196. return;
  197. }
  198. aclk_handle_cloud_cmd_message(cmsg);
  199. }
  200. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  201. static void msg_callback_new_protocol(const char *topic, const void *msg, size_t msglen, int qos)
  202. {
  203. UNUSED(qos);
  204. if (msglen > RX_MSGLEN_MAX)
  205. error("Incoming ACLK message was bigger than MAX of %d and got truncated.", RX_MSGLEN_MAX);
  206. debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
  207. if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
  208. error("Link is shutting down. Ignoring incoming message.");
  209. return;
  210. }
  211. const char *msgtype = strrchr(topic, '/');
  212. if (unlikely(!msgtype)) {
  213. error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
  214. return;
  215. }
  216. msgtype++;
  217. if (unlikely(!*msgtype)) {
  218. error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
  219. return;
  220. }
  221. #ifdef ACLK_LOG_CONVERSATION_DIR
  222. #define FN_MAX_LEN 512
  223. char filename[FN_MAX_LEN];
  224. int logfd;
  225. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
  226. logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
  227. if(logfd < 0)
  228. error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
  229. write(logfd, msg, msglen);
  230. close(logfd);
  231. #endif
  232. aclk_handle_new_cloud_msg(msgtype, msg, msglen);
  233. }
  234. static inline void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) {
  235. aclk_rcvd_cloud_msgs++;
  236. if (aclk_use_new_cloud_arch)
  237. msg_callback_new_protocol(topic, msg, msglen, qos);
  238. else
  239. msg_callback_old_protocol(topic, msg, msglen, qos);
  240. }
  241. #endif /* ENABLE_NEW_CLOUD_PROTOCOL */
  242. static void puback_callback(uint16_t packet_id)
  243. {
  244. if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
  245. last_conn_time_appl = now_realtime_sec();
  246. aclk_tbeb_reset();
  247. }
  248. #ifdef NETDATA_INTERNAL_CHECKS
  249. aclk_stats_msg_puback(packet_id);
  250. #endif
  251. if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
  252. info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
  253. aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
  254. }
  255. }
  256. static int read_query_thread_count()
  257. {
  258. int threads = MIN(processors/2, 6);
  259. threads = MAX(threads, 2);
  260. threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  261. if(threads < 1) {
  262. error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
  263. threads = 1;
  264. config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  265. }
  266. return threads;
  267. }
  268. void aclk_graceful_disconnect(mqtt_wss_client client);
  269. /* Keeps connection alive and handles all network communications.
  270. * Returns on error or when netdata is shutting down.
  271. * @param client instance of mqtt_wss_client
  272. * @returns 0 - Netdata Exits
  273. * >0 - Error happened. Reconnect and start over.
  274. */
  275. static int handle_connection(mqtt_wss_client client)
  276. {
  277. time_t last_periodic_query_wakeup = now_monotonic_sec();
  278. while (!netdata_exit) {
  279. // timeout 1000 to check at least once a second
  280. // for netdata_exit
  281. if (mqtt_wss_service(client, 1000) < 0){
  282. error_report("Connection Error or Dropped");
  283. return 1;
  284. }
  285. if (disconnect_req || aclk_kill_link) {
  286. info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
  287. disconnect_req ? "true" : "false",
  288. aclk_kill_link ? "true" : "false");
  289. disconnect_req = 0;
  290. aclk_kill_link = 0;
  291. aclk_graceful_disconnect(client);
  292. aclk_queue_unlock();
  293. aclk_shared_state.mqtt_shutdown_msg_id = -1;
  294. aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
  295. return 1;
  296. }
  297. // mqtt_wss_service will return faster than in one second
  298. // if there is enough work to do
  299. time_t now = now_monotonic_sec();
  300. if (last_periodic_query_wakeup < now) {
  301. // wake up at least one Query Thread at least
  302. // once per second
  303. last_periodic_query_wakeup = now;
  304. QUERY_THREAD_WAKEUP;
  305. }
  306. }
  307. return 0;
  308. }
  309. inline static int aclk_popcorn_check()
  310. {
  311. ACLK_SHARED_STATE_LOCK;
  312. if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
  313. ACLK_SHARED_STATE_UNLOCK;
  314. return 1;
  315. }
  316. ACLK_SHARED_STATE_UNLOCK;
  317. return 0;
  318. }
  319. inline static int aclk_popcorn_check_bump()
  320. {
  321. ACLK_SHARED_STATE_LOCK;
  322. if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
  323. aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
  324. ACLK_SHARED_STATE_UNLOCK;
  325. return 1;
  326. }
  327. ACLK_SHARED_STATE_UNLOCK;
  328. return 0;
  329. }
  330. static inline void queue_connect_payloads(void)
  331. {
  332. aclk_query_t query = aclk_query_new(METADATA_INFO);
  333. query->data.metadata_info.host = localhost;
  334. query->data.metadata_info.initial_on_connect = 1;
  335. aclk_queue_query(query);
  336. query = aclk_query_new(METADATA_ALARMS);
  337. query->data.metadata_alarms.initial_on_connect = 1;
  338. aclk_queue_query(query);
  339. }
  340. static inline void mqtt_connected_actions(mqtt_wss_client client)
  341. {
  342. char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
  343. if (!topic)
  344. error("Unable to fetch topic for COMMAND (to subscribe)");
  345. else
  346. mqtt_wss_subscribe(client, topic, 1);
  347. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  348. if (aclk_use_new_cloud_arch) {
  349. topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
  350. if (!topic)
  351. error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
  352. else
  353. mqtt_wss_subscribe(client, topic, 1);
  354. }
  355. #endif
  356. aclk_stats_upd_online(1);
  357. aclk_connected = 1;
  358. aclk_pubacks_per_conn = 0;
  359. aclk_rcvd_cloud_msgs = 0;
  360. aclk_connection_counter++;
  361. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  362. if (!aclk_use_new_cloud_arch) {
  363. #endif
  364. ACLK_SHARED_STATE_LOCK;
  365. if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
  366. error("Sending `connect` payload immediately as popcorning was finished already.");
  367. queue_connect_payloads();
  368. }
  369. ACLK_SHARED_STATE_UNLOCK;
  370. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  371. } else {
  372. aclk_send_agent_connection_update(client, 1);
  373. }
  374. #endif
  375. }
  376. /* Waits until agent is ready or needs to exit
  377. * @param client instance of mqtt_wss_client
  378. * @param query_threads pointer to aclk_query_threads
  379. * structure where to store data about started query threads
  380. * @return 0 - Popcorning Finished - Agent STABLE,
  381. * !0 - netdata_exit
  382. */
  383. static int wait_popcorning_finishes()
  384. {
  385. time_t elapsed;
  386. int need_wait;
  387. if (aclk_use_new_cloud_arch)
  388. return 0;
  389. while (!netdata_exit) {
  390. ACLK_SHARED_STATE_LOCK;
  391. if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
  392. ACLK_SHARED_STATE_UNLOCK;
  393. return 0;
  394. }
  395. elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
  396. if (elapsed >= ACLK_STABLE_TIMEOUT) {
  397. aclk_shared_state.agent_state = ACLK_HOST_STABLE;
  398. ACLK_SHARED_STATE_UNLOCK;
  399. error("ACLK localhost popcorn timer finished");
  400. return 0;
  401. }
  402. ACLK_SHARED_STATE_UNLOCK;
  403. need_wait = ACLK_STABLE_TIMEOUT - elapsed;
  404. error("ACLK localhost popcorn timer - wait %d seconds longer", need_wait);
  405. sleep(need_wait);
  406. }
  407. return 1;
  408. }
  409. void aclk_graceful_disconnect(mqtt_wss_client client)
  410. {
  411. info("Preparing to gracefully shutdown ACLK connection");
  412. aclk_queue_lock();
  413. aclk_queue_flush();
  414. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  415. if (aclk_use_new_cloud_arch)
  416. aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
  417. else
  418. #endif
  419. aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_app_layer_disconnect(client, "graceful");
  420. time_t t = now_monotonic_sec();
  421. while (!mqtt_wss_service(client, 100)) {
  422. if (now_monotonic_sec() - t >= 2) {
  423. error("Wasn't able to gracefully shutdown ACLK in time!");
  424. break;
  425. }
  426. if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
  427. info("MQTT App Layer `disconnect` message sent successfully");
  428. break;
  429. }
  430. }
  431. info("ACLK link is down");
  432. log_access("ACLK DISCONNECTED");
  433. aclk_stats_upd_online(0);
  434. last_disconnect_time = now_realtime_sec();
  435. aclk_connected = 0;
  436. info("Attempting to gracefully shutdown the MQTT/WSS connection");
  437. mqtt_wss_disconnect(client, 1000);
  438. }
  439. static unsigned long aclk_reconnect_delay() {
  440. unsigned long recon_delay;
  441. time_t now;
  442. if (aclk_disable_runtime) {
  443. aclk_tbeb_reset();
  444. return 60 * MSEC_PER_SEC;
  445. }
  446. now = now_monotonic_sec();
  447. if (aclk_block_until) {
  448. if (now < aclk_block_until) {
  449. recon_delay = aclk_block_until - now;
  450. recon_delay *= MSEC_PER_SEC;
  451. aclk_block_until = 0;
  452. aclk_tbeb_reset();
  453. return recon_delay;
  454. }
  455. aclk_block_until = 0;
  456. }
  457. if (!aclk_env || !aclk_env->backoff.base)
  458. return aclk_tbeb_delay(0, 2, 0, 1024);
  459. return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
  460. }
  461. /* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled
  462. * @return 0 - Go ahead and connect (delay expired)
  463. * 1 - netdata_exit
  464. */
  465. #define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
  466. static int aclk_block_till_recon_allowed() {
  467. unsigned long recon_delay = aclk_reconnect_delay();
  468. next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
  469. last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
  470. info("Wait before attempting to reconnect in %.3f seconds\n", recon_delay / (float)MSEC_PER_SEC);
  471. // we want to wake up from time to time to check netdata_exit
  472. while (recon_delay)
  473. {
  474. if (netdata_exit)
  475. return 1;
  476. if (recon_delay > NETDATA_EXIT_POLL_MS) {
  477. sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
  478. recon_delay -= NETDATA_EXIT_POLL_MS;
  479. continue;
  480. }
  481. sleep_usec(recon_delay * USEC_PER_MS);
  482. recon_delay = 0;
  483. }
  484. return netdata_exit;
  485. }
  486. #ifndef ACLK_DISABLE_CHALLENGE
  487. /* Cloud returns transport list ordered with highest
  488. * priority first. This function selects highest prio
  489. * transport that we can actually use (support)
  490. */
  491. static int aclk_get_transport_idx(aclk_env_t *env) {
  492. for (size_t i = 0; i < env->transport_count; i++) {
  493. // currently we support only MQTT 3
  494. // therefore select first transport that matches
  495. if (env->transports[i]->type == ACLK_TRP_MQTT_3_1_1) {
  496. return i;
  497. }
  498. }
  499. return -1;
  500. }
  501. #endif
  502. /* Attempts to make a connection to MQTT broker over WSS
  503. * @param client instance of mqtt_wss_client
  504. * @return 0 - Successful Connection,
  505. * <0 - Irrecoverable Error -> Kill ACLK,
  506. * >0 - netdata_exit
  507. */
  508. #define CLOUD_BASE_URL_READ_RETRY 30
  509. #ifdef ACLK_SSL_ALLOW_SELF_SIGNED
  510. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED
  511. #else
  512. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL
  513. #endif
  514. static int aclk_attempt_to_connect(mqtt_wss_client client)
  515. {
  516. int ret;
  517. url_t base_url;
  518. #ifndef ACLK_DISABLE_CHALLENGE
  519. url_t auth_url;
  520. url_t mqtt_url;
  521. #endif
  522. json_object *lwt = NULL;
  523. while (!netdata_exit) {
  524. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  525. if (cloud_base_url == NULL) {
  526. error("Do not move the cloud base url out of post_conf_load!!");
  527. return -1;
  528. }
  529. if (aclk_block_till_recon_allowed())
  530. return 1;
  531. info("Attempting connection now");
  532. memset(&base_url, 0, sizeof(url_t));
  533. if (url_parse(cloud_base_url, &base_url)) {
  534. error("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
  535. sleep(CLOUD_BASE_URL_READ_RETRY);
  536. url_t_destroy(&base_url);
  537. continue;
  538. }
  539. struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .type = MQTT_WSS_DIRECT };
  540. aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, &proxy_conf.type);
  541. struct mqtt_connect_params mqtt_conn_params = {
  542. .clientid = "anon",
  543. .username = "anon",
  544. .password = "anon",
  545. .will_topic = "lwt",
  546. .will_msg = NULL,
  547. .will_flags = MQTT_WSS_PUB_QOS2,
  548. .keep_alive = 60,
  549. .drop_on_publish_fail = 1
  550. };
  551. aclk_use_new_cloud_arch = 0;
  552. #ifndef ACLK_DISABLE_CHALLENGE
  553. if (aclk_env) {
  554. aclk_env_t_destroy(aclk_env);
  555. freez(aclk_env);
  556. }
  557. aclk_env = callocz(1, sizeof(aclk_env_t));
  558. ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
  559. url_t_destroy(&base_url);
  560. if (ret) {
  561. error("Failed to Get ACLK environment");
  562. // delay handled by aclk_block_till_recon_allowed
  563. continue;
  564. }
  565. if (netdata_exit)
  566. return 1;
  567. if (aclk_env->encoding == ACLK_ENC_PROTO) {
  568. #ifndef ENABLE_NEW_CLOUD_PROTOCOL
  569. error("Cloud requested New Cloud Protocol to be used but this agent cannot support it!");
  570. continue;
  571. #else
  572. if (!aclk_env_has_capa("proto")) {
  573. error ("Can't encoding=proto without at least \"proto\" capability.");
  574. continue;
  575. }
  576. info("Switching ACLK to new protobuf protocol. Due to /env response.");
  577. aclk_use_new_cloud_arch = 1;
  578. #endif
  579. }
  580. memset(&auth_url, 0, sizeof(url_t));
  581. if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
  582. error("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
  583. url_t_destroy(&auth_url);
  584. continue;
  585. }
  586. ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
  587. url_t_destroy(&auth_url);
  588. if (ret) {
  589. error("Error passing Challenge/Response to get OTP");
  590. continue;
  591. }
  592. // aclk_get_topic moved here as during OTP we
  593. // generate the topic cache
  594. if (aclk_use_new_cloud_arch)
  595. mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
  596. else
  597. mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_METADATA);
  598. if (!mqtt_conn_params.will_topic) {
  599. error("Couldn't get LWT topic. Will not send LWT.");
  600. continue;
  601. }
  602. // Do the MQTT connection
  603. ret = aclk_get_transport_idx(aclk_env);
  604. if (ret < 0) {
  605. error("Cloud /env endpoint didn't return any transport usable by this Agent.");
  606. continue;
  607. }
  608. memset(&mqtt_url, 0, sizeof(url_t));
  609. if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
  610. error("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
  611. url_t_destroy(&mqtt_url);
  612. continue;
  613. }
  614. #endif
  615. aclk_session_newarch = now_realtime_usec();
  616. aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
  617. aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
  618. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  619. if (aclk_use_new_cloud_arch) {
  620. mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
  621. } else {
  622. #endif
  623. lwt = aclk_generate_disconnect(NULL);
  624. mqtt_conn_params.will_msg = json_object_to_json_string_ext(lwt, JSON_C_TO_STRING_PLAIN);
  625. mqtt_conn_params.will_msg_len = strlen(mqtt_conn_params.will_msg);
  626. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  627. }
  628. #endif
  629. #ifdef ACLK_DISABLE_CHALLENGE
  630. ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  631. url_t_destroy(&base_url);
  632. #else
  633. ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  634. url_t_destroy(&mqtt_url);
  635. freez((char*)mqtt_conn_params.clientid);
  636. freez((char*)mqtt_conn_params.password);
  637. freez((char*)mqtt_conn_params.username);
  638. #endif
  639. if (aclk_use_new_cloud_arch)
  640. freez((char *)mqtt_conn_params.will_msg);
  641. else
  642. json_object_put(lwt);
  643. if (!ret) {
  644. last_conn_time_mqtt = now_realtime_sec();
  645. info("ACLK connection successfully established");
  646. log_access("ACLK CONNECTED");
  647. mqtt_connected_actions(client);
  648. return 0;
  649. }
  650. error_report("Connect failed");
  651. }
  652. return 1;
  653. }
  654. /**
  655. * Main agent cloud link thread
  656. *
  657. * This thread will simply call the main event loop that handles
  658. * pending requests - both inbound and outbound
  659. *
  660. * @param ptr is a pointer to the netdata_static_thread structure.
  661. *
  662. * @return It always returns NULL
  663. */
  664. void *aclk_main(void *ptr)
  665. {
  666. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  667. struct aclk_stats_thread *stats_thread = NULL;
  668. struct aclk_query_threads query_threads;
  669. query_threads.thread_list = NULL;
  670. ACLK_PROXY_TYPE proxy_type;
  671. aclk_get_proxy(&proxy_type);
  672. if (proxy_type == PROXY_TYPE_SOCKS5) {
  673. error("SOCKS5 proxy is not supported by ACLK-NG yet.");
  674. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  675. return NULL;
  676. }
  677. unsigned int proto_hdl_cnt;
  678. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  679. proto_hdl_cnt = aclk_init_rx_msg_handlers();
  680. #endif
  681. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  682. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  683. netdata_thread_disable_cancelability();
  684. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
  685. info("Killing ACLK thread -> cloud functionality has been disabled");
  686. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  687. return NULL;
  688. #endif
  689. aclk_popcorn_check_bump(); // start localhost popcorn timer
  690. query_threads.count = read_query_thread_count();
  691. if (wait_till_cloud_enabled())
  692. goto exit;
  693. if (wait_till_agent_claim_ready())
  694. goto exit;
  695. use_mqtt_5 = config_get_boolean(CONFIG_SECTION_CLOUD, "mqtt5", CONFIG_BOOLEAN_NO);
  696. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  697. if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback, use_mqtt_5))) {
  698. #else
  699. if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback_old_protocol, puback_callback, use_mqtt_5))) {
  700. #endif
  701. error("Couldn't initialize MQTT_WSS network library");
  702. goto exit;
  703. }
  704. // Enable MQTT buffer growth if necessary
  705. // e.g. old cloud architecture clients with huge nodes
  706. // that send JSON payloads of 10 MB as single messages
  707. mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
  708. aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
  709. if (aclk_stats_enabled) {
  710. stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
  711. stats_thread->thread = mallocz(sizeof(netdata_thread_t));
  712. stats_thread->query_thread_count = query_threads.count;
  713. aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
  714. netdata_thread_create(
  715. stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
  716. stats_thread);
  717. }
  718. // Keep reconnecting and talking until our time has come
  719. // and the Grim Reaper (netdata_exit) calls
  720. do {
  721. if (aclk_attempt_to_connect(mqttwss_client))
  722. goto exit_full;
  723. #if defined(ENABLE_ACLK) && !defined(ENABLE_NEW_CLOUD_PROTOCOL)
  724. error_report("############################ WARNING ###############################");
  725. error_report("# Your agent is configured to connect to cloud but has #");
  726. error_report("# no protobuf protocol support (uses legacy JSON protocol) #");
  727. error_report("# Legacy protocol will be deprecated soon (planned 1st March 2022) #");
  728. error_report("# Visit following link for more info and instructions how to solve #");
  729. error_report("# https://www.netdata.cloud/blog/netdata-clouds-new-architecture #");
  730. error_report("######################################################################");
  731. #endif
  732. // warning this assumes the popcorning is relative short (3s)
  733. // if that changes call mqtt_wss_service from within
  734. // to keep OpenSSL, WSS and MQTT connection alive
  735. if (wait_popcorning_finishes())
  736. goto exit_full;
  737. if (unlikely(!query_threads.thread_list))
  738. aclk_query_threads_start(&query_threads, mqttwss_client);
  739. if (!aclk_use_new_cloud_arch)
  740. queue_connect_payloads();
  741. if (handle_connection(mqttwss_client)) {
  742. aclk_stats_upd_online(0);
  743. last_disconnect_time = now_realtime_sec();
  744. aclk_connected = 0;
  745. log_access("ACLK DISCONNECTED");
  746. }
  747. } while (!netdata_exit);
  748. aclk_graceful_disconnect(mqttwss_client);
  749. exit_full:
  750. // Tear Down
  751. QUERY_THREAD_WAKEUP_ALL;
  752. aclk_query_threads_cleanup(&query_threads);
  753. if (aclk_stats_enabled) {
  754. netdata_thread_join(*stats_thread->thread, NULL);
  755. aclk_stats_thread_cleanup();
  756. freez(stats_thread->thread);
  757. freez(stats_thread);
  758. }
  759. free_topic_cache();
  760. mqtt_wss_destroy(mqttwss_client);
  761. exit:
  762. if (aclk_env) {
  763. aclk_env_t_destroy(aclk_env);
  764. freez(aclk_env);
  765. }
  766. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  767. return NULL;
  768. }
  769. // TODO this is taken over as workaround from old ACLK
  770. // fix this in both old and new ACLK
  771. extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
  772. void aclk_alarm_reload(void)
  773. {
  774. ACLK_SHARED_STATE_LOCK;
  775. if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
  776. ACLK_SHARED_STATE_UNLOCK;
  777. return;
  778. }
  779. ACLK_SHARED_STATE_UNLOCK;
  780. aclk_queue_query(aclk_query_new(METADATA_ALARMS));
  781. }
  782. int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
  783. {
  784. BUFFER *local_buffer;
  785. json_object *msg;
  786. if (host != localhost)
  787. return 0;
  788. ACLK_SHARED_STATE_LOCK;
  789. if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
  790. ACLK_SHARED_STATE_UNLOCK;
  791. return 0;
  792. }
  793. ACLK_SHARED_STATE_UNLOCK;
  794. local_buffer = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  795. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  796. health_alarm_entry2json_nolock(local_buffer, ae, host);
  797. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  798. msg = json_tokener_parse(local_buffer->buffer);
  799. struct aclk_query *query = aclk_query_new(ALARM_STATE_UPDATE);
  800. query->data.alarm_update = msg;
  801. aclk_queue_query(query);
  802. buffer_free(local_buffer);
  803. return 0;
  804. }
  805. int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
  806. {
  807. struct aclk_query *query;
  808. if (host == localhost ? aclk_popcorn_check_bump() : aclk_popcorn_check())
  809. return 0;
  810. query = aclk_query_new(create ? CHART_NEW : CHART_DEL);
  811. if(create) {
  812. query->data.chart_add_del.host = host;
  813. query->data.chart_add_del.chart_name = strdupz(chart_name);
  814. } else {
  815. query->data.metadata_info.host = host;
  816. query->data.metadata_info.initial_on_connect = 0;
  817. }
  818. aclk_queue_query(query);
  819. return 0;
  820. }
  821. /*
  822. * Add a new collector to the list
  823. * If it exists, update the chart count
  824. */
  825. void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
  826. {
  827. struct aclk_query *query;
  828. struct _collector *tmp_collector;
  829. if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
  830. return;
  831. }
  832. COLLECTOR_LOCK;
  833. tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);
  834. if (unlikely(tmp_collector->count != 1)) {
  835. COLLECTOR_UNLOCK;
  836. return;
  837. }
  838. COLLECTOR_UNLOCK;
  839. if (aclk_popcorn_check_bump())
  840. return;
  841. if (host != localhost)
  842. return;
  843. query = aclk_query_new(METADATA_INFO);
  844. query->data.metadata_info.host = localhost; //TODO
  845. query->data.metadata_info.initial_on_connect = 0;
  846. aclk_queue_query(query);
  847. query = aclk_query_new(METADATA_ALARMS);
  848. query->data.metadata_alarms.initial_on_connect = 0;
  849. aclk_queue_query(query);
  850. }
  851. /*
  852. * Delete a collector from the list
  853. * If the chart count reaches zero the collector will be removed
  854. * from the list by calling del_collector.
  855. *
  856. * This function will release the memory used and schedule
  857. * a cloud update
  858. */
  859. void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
  860. {
  861. struct aclk_query *query;
  862. struct _collector *tmp_collector;
  863. if (unlikely(!netdata_ready || aclk_use_new_cloud_arch)) {
  864. return;
  865. }
  866. COLLECTOR_LOCK;
  867. tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);
  868. if (unlikely(!tmp_collector || tmp_collector->count)) {
  869. COLLECTOR_UNLOCK;
  870. return;
  871. }
  872. debug(
  873. D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
  874. tmp_collector->count);
  875. COLLECTOR_UNLOCK;
  876. _free_collector(tmp_collector);
  877. if (aclk_popcorn_check_bump())
  878. return;
  879. if (host != localhost)
  880. return;
  881. query = aclk_query_new(METADATA_INFO);
  882. query->data.metadata_info.host = localhost; //TODO
  883. query->data.metadata_info.initial_on_connect = 0;
  884. aclk_queue_query(query);
  885. query = aclk_query_new(METADATA_ALARMS);
  886. query->data.metadata_alarms.initial_on_connect = 0;
  887. aclk_queue_query(query);
  888. }
  889. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  890. void aclk_host_state_update(RRDHOST *host, int cmd)
  891. {
  892. uuid_t node_id;
  893. int ret;
  894. if (!aclk_connected || !aclk_use_new_cloud_arch)
  895. return;
  896. ret = get_node_id(&host->host_uuid, &node_id);
  897. if (ret > 0) {
  898. // this means we were not able to check if node_id already present
  899. error("Unable to check for node_id. Ignoring the host state update.");
  900. return;
  901. }
  902. if (ret < 0) {
  903. // node_id not found
  904. aclk_query_t create_query;
  905. create_query = aclk_query_new(REGISTER_NODE);
  906. rrdhost_aclk_state_lock(localhost);
  907. node_instance_creation_t node_instance_creation = {
  908. .claim_id = localhost->aclk_state.claimed_id,
  909. .hops = host->system_info->hops,
  910. .hostname = host->hostname,
  911. .machine_guid = host->machine_guid
  912. };
  913. create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
  914. rrdhost_aclk_state_unlock(localhost);
  915. create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
  916. create_query->data.bin_payload.msg_name = "CreateNodeInstance";
  917. info("Registering host=%s, hops=%u",host->machine_guid, host->system_info->hops);
  918. aclk_queue_query(create_query);
  919. return;
  920. }
  921. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  922. node_instance_connection_t node_state_update = {
  923. .hops = host->system_info->hops,
  924. .live = cmd,
  925. .queryable = 1,
  926. .session_id = aclk_session_newarch
  927. };
  928. node_state_update.node_id = mallocz(UUID_STR_LEN);
  929. uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
  930. rrdhost_aclk_state_lock(localhost);
  931. node_state_update.claim_id = localhost->aclk_state.claimed_id;
  932. query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
  933. rrdhost_aclk_state_unlock(localhost);
  934. info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
  935. host->system_info->hops);
  936. freez((void*)node_state_update.node_id);
  937. query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
  938. query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
  939. aclk_queue_query(query);
  940. }
  941. void aclk_send_node_instances()
  942. {
  943. struct node_instance_list *list_head = get_node_list();
  944. struct node_instance_list *list = list_head;
  945. if (unlikely(!list)) {
  946. error_report("Failure to get_node_list from DB!");
  947. return;
  948. }
  949. while (!uuid_is_null(list->host_id)) {
  950. if (!uuid_is_null(list->node_id)) {
  951. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  952. node_instance_connection_t node_state_update = {
  953. .live = list->live,
  954. .hops = list->hops,
  955. .queryable = 1,
  956. .session_id = aclk_session_newarch
  957. };
  958. node_state_update.node_id = mallocz(UUID_STR_LEN);
  959. uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
  960. rrdhost_aclk_state_lock(localhost);
  961. node_state_update.claim_id = localhost->aclk_state.claimed_id;
  962. query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
  963. rrdhost_aclk_state_unlock(localhost);
  964. info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
  965. list->live,
  966. list->hops);
  967. freez((void*)node_state_update.node_id);
  968. query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
  969. query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
  970. aclk_queue_query(query);
  971. } else {
  972. aclk_query_t create_query;
  973. create_query = aclk_query_new(REGISTER_NODE);
  974. node_instance_creation_t node_instance_creation = {
  975. .hops = list->hops,
  976. .hostname = list->hostname,
  977. };
  978. node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
  979. uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
  980. create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
  981. create_query->data.bin_payload.msg_name = "CreateNodeInstance";
  982. rrdhost_aclk_state_lock(localhost);
  983. node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
  984. create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
  985. rrdhost_aclk_state_unlock(localhost);
  986. info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
  987. list->hops);
  988. freez(node_instance_creation.machine_guid);
  989. aclk_queue_query(create_query);
  990. }
  991. freez(list->hostname);
  992. list++;
  993. }
  994. freez(list_head);
  995. }
  996. #endif
  997. void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
  998. {
  999. aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
  1000. }
  1001. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1002. static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
  1003. {
  1004. struct proto_alert_status status;
  1005. memset(&status, 0, sizeof(status));
  1006. if (get_proto_alert_status(host, &status)) {
  1007. buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
  1008. return;
  1009. }
  1010. buffer_sprintf(wb,
  1011. "\n\t\tUpdates: %d"
  1012. "\n\t\tBatch ID: %"PRIu64
  1013. "\n\t\tLast Acked Seq ID: %"PRIu64
  1014. "\n\t\tPending Min Seq ID: %"PRIu64
  1015. "\n\t\tPending Max Seq ID: %"PRIu64
  1016. "\n\t\tLast Submitted Seq ID: %"PRIu64,
  1017. status.alert_updates,
  1018. status.alerts_batch_id,
  1019. status.last_acked_sequence_id,
  1020. status.pending_min_sequence_id,
  1021. status.pending_max_sequence_id,
  1022. status.last_submitted_sequence_id
  1023. );
  1024. }
  1025. static void fill_chart_status_for_host(BUFFER *wb, RRDHOST *host)
  1026. {
  1027. struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
  1028. if (!stats) {
  1029. buffer_strcat(wb, "\n\t\tFailed to get alert streaming status for this host");
  1030. return;
  1031. }
  1032. buffer_sprintf(wb,
  1033. "\n\t\tUpdates: %d"
  1034. "\n\t\tBatch ID: %"PRIu64
  1035. "\n\t\tMin Seq ID: %"PRIu64
  1036. "\n\t\tMax Seq ID: %"PRIu64
  1037. "\n\t\tPending Min Seq ID: %"PRIu64
  1038. "\n\t\tPending Max Seq ID: %"PRIu64
  1039. "\n\t\tSent Min Seq ID: %"PRIu64
  1040. "\n\t\tSent Max Seq ID: %"PRIu64
  1041. "\n\t\tAcked Min Seq ID: %"PRIu64
  1042. "\n\t\tAcked Max Seq ID: %"PRIu64,
  1043. stats->updates,
  1044. stats->batch_id,
  1045. stats->min_seqid,
  1046. stats->max_seqid,
  1047. stats->min_seqid_pend,
  1048. stats->max_seqid_pend,
  1049. stats->min_seqid_sent,
  1050. stats->max_seqid_sent,
  1051. stats->min_seqid_ack,
  1052. stats->max_seqid_ack
  1053. );
  1054. freez(stats);
  1055. }
  1056. #endif
  1057. char *ng_aclk_state(void)
  1058. {
  1059. BUFFER *wb = buffer_create(1024);
  1060. struct tm *tmptr, tmbuf;
  1061. char *ret;
  1062. buffer_strcat(wb,
  1063. "ACLK Available: Yes\n"
  1064. "ACLK Version: 2\n"
  1065. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1066. "Protocols Supported: Legacy, Protobuf\n"
  1067. #else
  1068. "Protocols Supported: Legacy\n"
  1069. #endif
  1070. );
  1071. buffer_sprintf(wb, "Protocol Used: %s\nMQTT Version: %d\nClaimed: ", aclk_use_new_cloud_arch ? "Protobuf" : "Legacy", use_mqtt_5 ? 5 : 3);
  1072. char *agent_id = is_agent_claimed();
  1073. if (agent_id == NULL)
  1074. buffer_strcat(wb, "No\n");
  1075. else {
  1076. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  1077. buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
  1078. freez(agent_id);
  1079. }
  1080. buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_connected ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
  1081. if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) {
  1082. char timebuf[26];
  1083. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1084. buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf);
  1085. }
  1086. if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) {
  1087. char timebuf[26];
  1088. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1089. buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf);
  1090. }
  1091. if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) {
  1092. char timebuf[26];
  1093. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1094. buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf);
  1095. }
  1096. if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) {
  1097. char timebuf[26];
  1098. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1099. buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
  1100. }
  1101. if (aclk_connected) {
  1102. buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
  1103. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1104. RRDHOST *host;
  1105. rrd_rdlock();
  1106. rrdhost_foreach_read(host) {
  1107. buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n", host->machine_guid, host->hostname);
  1108. buffer_strcat(wb, "\tClaimed ID: ");
  1109. rrdhost_aclk_state_lock(host);
  1110. if (host->aclk_state.claimed_id)
  1111. buffer_strcat(wb, host->aclk_state.claimed_id);
  1112. else
  1113. buffer_strcat(wb, "null");
  1114. rrdhost_aclk_state_unlock(host);
  1115. if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
  1116. buffer_strcat(wb, "\n\tNode ID: null\n");
  1117. } else {
  1118. char node_id[GUID_LEN + 1];
  1119. uuid_unparse_lower(*host->node_id, node_id);
  1120. buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
  1121. }
  1122. buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s", host->system_info->hops, host == localhost ? "self" : "child");
  1123. if (host != localhost)
  1124. buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");
  1125. buffer_strcat(wb, "\n\tAlert Streaming Status:");
  1126. fill_alert_status_for_host(wb, host);
  1127. buffer_strcat(wb, "\n\tChart Streaming Status:");
  1128. fill_chart_status_for_host(wb, host);
  1129. }
  1130. rrd_unlock();
  1131. #endif
  1132. }
  1133. ret = strdupz(buffer_tostring(wb));
  1134. buffer_free(wb);
  1135. return ret;
  1136. }
  1137. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1138. static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
  1139. {
  1140. struct proto_alert_status status;
  1141. memset(&status, 0, sizeof(status));
  1142. if (get_proto_alert_status(host, &status))
  1143. return;
  1144. json_object *tmp = json_object_new_int(status.alert_updates);
  1145. json_object_object_add(obj, "updates", tmp);
  1146. tmp = json_object_new_int(status.alerts_batch_id);
  1147. json_object_object_add(obj, "batch-id", tmp);
  1148. tmp = json_object_new_int(status.last_acked_sequence_id);
  1149. json_object_object_add(obj, "last-acked-seq-id", tmp);
  1150. tmp = json_object_new_int(status.pending_min_sequence_id);
  1151. json_object_object_add(obj, "pending-min-seq-id", tmp);
  1152. tmp = json_object_new_int(status.pending_max_sequence_id);
  1153. json_object_object_add(obj, "pending-max-seq-id", tmp);
  1154. tmp = json_object_new_int(status.last_submitted_sequence_id);
  1155. json_object_object_add(obj, "last-submitted-seq-id", tmp);
  1156. }
  1157. static void fill_chart_status_for_host_json(json_object *obj, RRDHOST *host)
  1158. {
  1159. struct aclk_chart_sync_stats *stats = aclk_get_chart_sync_stats(host);
  1160. if (!stats)
  1161. return;
  1162. json_object *tmp = json_object_new_int(stats->updates);
  1163. json_object_object_add(obj, "updates", tmp);
  1164. tmp = json_object_new_int(stats->batch_id);
  1165. json_object_object_add(obj, "batch-id", tmp);
  1166. tmp = json_object_new_int(stats->min_seqid);
  1167. json_object_object_add(obj, "min-seq-id", tmp);
  1168. tmp = json_object_new_int(stats->max_seqid);
  1169. json_object_object_add(obj, "max-seq-id", tmp);
  1170. tmp = json_object_new_int(stats->min_seqid_pend);
  1171. json_object_object_add(obj, "pending-min-seq-id", tmp);
  1172. tmp = json_object_new_int(stats->max_seqid_pend);
  1173. json_object_object_add(obj, "pending-max-seq-id", tmp);
  1174. tmp = json_object_new_int(stats->min_seqid_sent);
  1175. json_object_object_add(obj, "sent-min-seq-id", tmp);
  1176. tmp = json_object_new_int(stats->max_seqid_sent);
  1177. json_object_object_add(obj, "sent-max-seq-id", tmp);
  1178. tmp = json_object_new_int(stats->min_seqid_ack);
  1179. json_object_object_add(obj, "acked-min-seq-id", tmp);
  1180. tmp = json_object_new_int(stats->max_seqid_ack);
  1181. json_object_object_add(obj, "acked-max-seq-id", tmp);
  1182. freez(stats);
  1183. }
  1184. #endif
  1185. static json_object *timestamp_to_json(const time_t *t)
  1186. {
  1187. struct tm *tmptr, tmbuf;
  1188. if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
  1189. char timebuf[26];
  1190. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1191. return json_object_new_string(timebuf);
  1192. }
  1193. return NULL;
  1194. }
  1195. char *ng_aclk_state_json(void)
  1196. {
  1197. json_object *tmp, *grp, *msg = json_object_new_object();
  1198. tmp = json_object_new_boolean(1);
  1199. json_object_object_add(msg, "aclk-available", tmp);
  1200. tmp = json_object_new_int(2);
  1201. json_object_object_add(msg, "aclk-version", tmp);
  1202. grp = json_object_new_array();
  1203. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1204. tmp = json_object_new_string("Legacy");
  1205. json_object_array_add(grp, tmp);
  1206. tmp = json_object_new_string("Protobuf");
  1207. json_object_array_add(grp, tmp);
  1208. #else
  1209. tmp = json_object_new_string("Legacy");
  1210. json_object_array_add(grp, tmp);
  1211. #endif
  1212. json_object_object_add(msg, "protocols-supported", grp);
  1213. char *agent_id = is_agent_claimed();
  1214. tmp = json_object_new_boolean(agent_id != NULL);
  1215. json_object_object_add(msg, "agent-claimed", tmp);
  1216. if (agent_id) {
  1217. tmp = json_object_new_string(agent_id);
  1218. freez(agent_id);
  1219. } else
  1220. tmp = NULL;
  1221. json_object_object_add(msg, "claimed-id", tmp);
  1222. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  1223. tmp = cloud_base_url ? json_object_new_string(cloud_base_url) : NULL;
  1224. json_object_object_add(msg, "cloud-url", tmp);
  1225. tmp = json_object_new_boolean(aclk_connected);
  1226. json_object_object_add(msg, "online", tmp);
  1227. tmp = json_object_new_string(aclk_use_new_cloud_arch ? "Protobuf" : "Legacy");
  1228. json_object_object_add(msg, "used-cloud-protocol", tmp);
  1229. tmp = json_object_new_int(use_mqtt_5 ? 5 : 3);
  1230. json_object_object_add(msg, "mqtt-version", tmp);
  1231. tmp = json_object_new_int(aclk_rcvd_cloud_msgs);
  1232. json_object_object_add(msg, "received-app-layer-msgs", tmp);
  1233. tmp = json_object_new_int(aclk_pubacks_per_conn);
  1234. json_object_object_add(msg, "received-mqtt-pubacks", tmp);
  1235. tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0);
  1236. json_object_object_add(msg, "reconnect-count", tmp);
  1237. json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
  1238. json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
  1239. json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
  1240. json_object_object_add(msg, "next-connection-attempt-utc", !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);
  1241. tmp = NULL;
  1242. if (!aclk_connected && last_backoff_value)
  1243. tmp = json_object_new_double(last_backoff_value);
  1244. json_object_object_add(msg, "last-backoff-value", tmp);
  1245. tmp = json_object_new_boolean(aclk_disable_runtime);
  1246. json_object_object_add(msg, "banned-by-cloud", tmp);
  1247. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  1248. grp = json_object_new_array();
  1249. RRDHOST *host;
  1250. rrd_rdlock();
  1251. rrdhost_foreach_read(host) {
  1252. json_object *nodeinstance = json_object_new_object();
  1253. tmp = json_object_new_string(host->hostname);
  1254. json_object_object_add(nodeinstance, "hostname", tmp);
  1255. tmp = json_object_new_string(host->machine_guid);
  1256. json_object_object_add(nodeinstance, "mguid", tmp);
  1257. rrdhost_aclk_state_lock(host);
  1258. if (host->aclk_state.claimed_id) {
  1259. tmp = json_object_new_string(host->aclk_state.claimed_id);
  1260. json_object_object_add(nodeinstance, "claimed_id", tmp);
  1261. } else
  1262. json_object_object_add(nodeinstance, "claimed_id", NULL);
  1263. rrdhost_aclk_state_unlock(host);
  1264. if (host->node_id == NULL || uuid_is_null(*host->node_id)) {
  1265. json_object_object_add(nodeinstance, "node-id", NULL);
  1266. } else {
  1267. char node_id[GUID_LEN + 1];
  1268. uuid_unparse_lower(*host->node_id, node_id);
  1269. tmp = json_object_new_string(node_id);
  1270. json_object_object_add(nodeinstance, "node-id", tmp);
  1271. }
  1272. tmp = json_object_new_int(host->system_info->hops);
  1273. json_object_object_add(nodeinstance, "streaming-hops", tmp);
  1274. tmp = json_object_new_string(host == localhost ? "self" : "child");
  1275. json_object_object_add(nodeinstance, "relationship", tmp);
  1276. tmp = json_object_new_boolean((host->receiver || host == localhost));
  1277. json_object_object_add(nodeinstance, "streaming-online", tmp);
  1278. tmp = json_object_new_object();
  1279. fill_alert_status_for_host_json(tmp, host);
  1280. json_object_object_add(nodeinstance, "alert-sync-status", tmp);
  1281. tmp = json_object_new_object();
  1282. fill_chart_status_for_host_json(tmp, host);
  1283. json_object_object_add(nodeinstance, "chart-sync-status", tmp);
  1284. json_object_array_add(grp, nodeinstance);
  1285. }
  1286. rrd_unlock();
  1287. json_object_object_add(msg, "node-instances", grp);
  1288. #endif
  1289. char *str = strdupz(json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN));
  1290. json_object_put(msg);
  1291. return str;
  1292. }