aclk.c 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk.h"
  3. #ifdef ENABLE_ACLK
  4. #include "aclk_stats.h"
  5. #include "mqtt_wss_client.h"
  6. #include "aclk_otp.h"
  7. #include "aclk_tx_msgs.h"
  8. #include "aclk_query.h"
  9. #include "aclk_query_queue.h"
  10. #include "aclk_util.h"
  11. #include "aclk_rx_msgs.h"
  12. #include "https_client.h"
  13. #include "schema-wrappers/schema_wrappers.h"
  14. #include "aclk_capas.h"
  15. #include "aclk_proxy.h"
  16. #ifdef ACLK_LOG_CONVERSATION_DIR
  17. #include <sys/types.h>
  18. #include <sys/stat.h>
  19. #include <fcntl.h>
  20. #endif
  21. #define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
  22. #endif /* ENABLE_ACLK */
  23. int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
  24. int aclk_rcvd_cloud_msgs = 0;
  25. int aclk_connection_counter = 0;
  26. int disconnect_req = 0;
  27. int aclk_connected = 0;
  28. int aclk_ctx_based = 0;
  29. int aclk_disable_runtime = 0;
  30. int aclk_stats_enabled;
  31. int aclk_kill_link = 0;
  32. usec_t aclk_session_us = 0;
  33. time_t aclk_session_sec = 0;
  34. time_t last_conn_time_mqtt = 0;
  35. time_t last_conn_time_appl = 0;
  36. time_t last_disconnect_time = 0;
  37. time_t next_connection_attempt = 0;
  38. float last_backoff_value = 0;
  39. time_t aclk_block_until = 0;
  40. #ifdef ENABLE_ACLK
  41. mqtt_wss_client mqttwss_client;
  42. netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
  43. #define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
  44. #define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
  45. struct aclk_shared_state aclk_shared_state = {
  46. .mqtt_shutdown_msg_id = -1,
  47. .mqtt_shutdown_msg_rcvd = 0
  48. };
  #ifdef MQTT_WSS_DEBUG
  #include <openssl/ssl.h>
  #define DEFAULT_SSKEYLOGFILE_NAME "SSLKEYLOGFILE"
  const char *ssl_log_filename = NULL;
  FILE *ssl_log_file = NULL;

  /*
   * OpenSSL key-log callback (MQTT_WSS_DEBUG builds only).
   *
   * Appends each key-log line, newline-terminated, to ssl_log_filename.
   * The file is opened lazily on the first call and then kept open, and
   * every line is flushed immediately.
   */
  static void aclk_ssl_keylog_cb(const SSL *ssl, const char *line)
  {
      (void)ssl;

      if (ssl_log_file == NULL) {
          ssl_log_file = fopen(ssl_log_filename, "a");
          if (ssl_log_file == NULL) {
              netdata_log_error("Couldn't open ssl_log file (%s) for append.", ssl_log_filename);
              return;
          }
      }

      fputs(line, ssl_log_file);
      putc('\n', ssl_log_file);
      fflush(ssl_log_file);
  }
  #endif
  68. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  69. OSSL_DECODER_CTX *aclk_dctx = NULL;
  70. EVP_PKEY *aclk_private_key = NULL;
  71. #else
  72. static RSA *aclk_private_key = NULL;
  73. #endif
  74. static int load_private_key()
  75. {
  76. if (aclk_private_key != NULL) {
  77. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  78. EVP_PKEY_free(aclk_private_key);
  79. if (aclk_dctx)
  80. OSSL_DECODER_CTX_free(aclk_dctx);
  81. aclk_dctx = NULL;
  82. #else
  83. RSA_free(aclk_private_key);
  84. #endif
  85. }
  86. aclk_private_key = NULL;
  87. char filename[FILENAME_MAX + 1];
  88. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  89. long bytes_read;
  90. char *private_key = read_by_filename(filename, &bytes_read);
  91. if (!private_key) {
  92. netdata_log_error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  93. return 1;
  94. }
  95. netdata_log_debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  96. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  97. if (key_bio==NULL) {
  98. netdata_log_error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  99. goto biofailed;
  100. }
  101. #if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
  102. aclk_dctx = OSSL_DECODER_CTX_new_for_pkey(&aclk_private_key, "PEM", NULL,
  103. "RSA",
  104. OSSL_KEYMGMT_SELECT_PRIVATE_KEY,
  105. NULL, NULL);
  106. if (!aclk_dctx) {
  107. netdata_log_error("Loading private key (from claiming) failed - no OpenSSL Decoders found");
  108. goto biofailed;
  109. }
  110. // this is necesseary to avoid RSA key with wrong size
  111. if (!OSSL_DECODER_from_bio(aclk_dctx, key_bio)) {
  112. netdata_log_error("Decoding private key (from claiming) failed - invalid format.");
  113. goto biofailed;
  114. }
  115. #else
  116. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  117. #endif
  118. BIO_free(key_bio);
  119. if (aclk_private_key!=NULL)
  120. {
  121. freez(private_key);
  122. return 0;
  123. }
  124. char err[512];
  125. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  126. netdata_log_error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  127. biofailed:
  128. freez(private_key);
  129. return 1;
  130. }
  131. static int wait_till_cloud_enabled()
  132. {
  133. nd_log(NDLS_DAEMON, NDLP_INFO,
  134. "Waiting for Cloud to be enabled");
  135. while (!netdata_cloud_enabled) {
  136. sleep_usec(USEC_PER_SEC * 1);
  137. if (!service_running(SERVICE_ACLK))
  138. return 1;
  139. }
  140. return 0;
  141. }
  142. /**
  143. * Will block until agent is claimed. Returns only if agent claimed
  144. * or if agent needs to shutdown.
  145. *
  146. * @return `0` if agent has been claimed,
  147. * `1` if interrupted due to agent shutting down
  148. */
  149. static int wait_till_agent_claimed(void)
  150. {
  151. //TODO prevent malloc and freez
  152. char *agent_id = get_agent_claimid();
  153. while (likely(!agent_id)) {
  154. sleep_usec(USEC_PER_SEC * 1);
  155. if (!service_running(SERVICE_ACLK))
  156. return 1;
  157. agent_id = get_agent_claimid();
  158. }
  159. freez(agent_id);
  160. return 0;
  161. }
  162. /**
  163. * Checks everything is ready for connection
  164. * agent claimed, cloud url set and private key available
  165. *
  166. * @param aclk_hostname points to location where string pointer to hostname will be set
  167. * @param aclk_port port to int where port will be saved
  168. *
  169. * @return If non 0 returned irrecoverable error happened (or netdata_exit) and ACLK should be terminated
  170. */
  171. static int wait_till_agent_claim_ready()
  172. {
  173. url_t url;
  174. while (service_running(SERVICE_ACLK)) {
  175. if (wait_till_agent_claimed())
  176. return 1;
  177. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  178. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  179. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  180. if (cloud_base_url == NULL) {
  181. netdata_log_error("Do not move the cloud base url out of post_conf_load!!");
  182. return 1;
  183. }
  184. // We just check configuration is valid here
  185. // TODO make it without malloc/free
  186. memset(&url, 0, sizeof(url_t));
  187. if (url_parse(cloud_base_url, &url)) {
  188. netdata_log_error("Agent is claimed but the URL in configuration key \"cloud base url\" is invalid, please fix");
  189. url_t_destroy(&url);
  190. sleep(5);
  191. continue;
  192. }
  193. url_t_destroy(&url);
  194. if (!load_private_key())
  195. return 0;
  196. sleep(5);
  197. }
  198. return 1;
  199. }
  200. void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
  201. {
  202. switch(log_type) {
  203. case MQTT_WSS_LOG_ERROR:
  204. case MQTT_WSS_LOG_FATAL:
  205. nd_log(NDLS_DAEMON, NDLP_ERR, "%s", str);
  206. return;
  207. case MQTT_WSS_LOG_WARN:
  208. nd_log(NDLS_DAEMON, NDLP_WARNING, "%s", str);
  209. return;
  210. case MQTT_WSS_LOG_INFO:
  211. nd_log(NDLS_DAEMON, NDLP_INFO, "%s", str);
  212. return;
  213. case MQTT_WSS_LOG_DEBUG:
  214. return;
  215. default:
  216. nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown log type from mqtt_wss");
  217. }
  218. }
  219. static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
  220. {
  221. UNUSED(qos);
  222. aclk_rcvd_cloud_msgs++;
  223. netdata_log_debug(D_ACLK, "Got Message From Broker Topic \"%s\" QOS %d", topic, qos);
  224. if (aclk_shared_state.mqtt_shutdown_msg_id > 0) {
  225. netdata_log_error("Link is shutting down. Ignoring incoming message.");
  226. return;
  227. }
  228. const char *msgtype = strrchr(topic, '/');
  229. if (unlikely(!msgtype)) {
  230. error_report("Cannot get message type from topic. Ignoring message from topic \"%s\"", topic);
  231. return;
  232. }
  233. msgtype++;
  234. if (unlikely(!*msgtype)) {
  235. error_report("Message type empty. Ignoring message from topic \"%s\"", topic);
  236. return;
  237. }
  238. #ifdef ACLK_LOG_CONVERSATION_DIR
  239. #define FN_MAX_LEN 512
  240. char filename[FN_MAX_LEN];
  241. int logfd;
  242. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-rx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgtype);
  243. logfd = open(filename, O_CREAT | O_TRUNC | O_WRONLY, S_IRUSR | S_IWUSR );
  244. if(logfd < 0)
  245. netdata_log_error("Error opening ACLK Conversation logfile \"%s\" for RX message.", filename);
  246. write(logfd, msg, msglen);
  247. close(logfd);
  248. #endif
  249. aclk_handle_new_cloud_msg(msgtype, msg, msglen, topic);
  250. }
  251. static void puback_callback(uint16_t packet_id)
  252. {
  253. if (++aclk_pubacks_per_conn == ACLK_PUBACKS_CONN_STABLE) {
  254. last_conn_time_appl = now_realtime_sec();
  255. aclk_tbeb_reset();
  256. }
  257. #ifdef NETDATA_INTERNAL_CHECKS
  258. aclk_stats_msg_puback(packet_id);
  259. #endif
  260. if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
  261. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  262. "Shutdown message has been acknowledged by the cloud. Exiting gracefully");
  263. aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
  264. }
  265. }
  266. static int read_query_thread_count()
  267. {
  268. int threads = MIN(get_netdata_cpus()/2, 6);
  269. threads = MAX(threads, 2);
  270. threads = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  271. if(threads < 1) {
  272. netdata_log_error("You need at least one query thread. Overriding configured setting of \"%d\"", threads);
  273. threads = 1;
  274. config_set_number(CONFIG_SECTION_CLOUD, "query thread count", threads);
  275. }
  276. return threads;
  277. }
  278. void aclk_graceful_disconnect(mqtt_wss_client client);
  279. /* Keeps connection alive and handles all network communications.
  280. * Returns on error or when netdata is shutting down.
  281. * @param client instance of mqtt_wss_client
  282. * @returns 0 - Netdata Exits
  283. * >0 - Error happened. Reconnect and start over.
  284. */
  285. static int handle_connection(mqtt_wss_client client)
  286. {
  287. time_t last_periodic_query_wakeup = now_monotonic_sec();
  288. while (service_running(SERVICE_ACLK)) {
  289. // timeout 1000 to check at least once a second
  290. // for netdata_exit
  291. if (mqtt_wss_service(client, 1000) < 0){
  292. error_report("Connection Error or Dropped");
  293. return 1;
  294. }
  295. if (disconnect_req || aclk_kill_link) {
  296. nd_log(NDLS_DAEMON, NDLP_NOTICE,
  297. "Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
  298. disconnect_req ? "true" : "false",
  299. aclk_kill_link ? "true" : "false");
  300. disconnect_req = 0;
  301. aclk_kill_link = 0;
  302. aclk_graceful_disconnect(client);
  303. aclk_queue_unlock();
  304. aclk_shared_state.mqtt_shutdown_msg_id = -1;
  305. aclk_shared_state.mqtt_shutdown_msg_rcvd = 0;
  306. return 1;
  307. }
  308. // mqtt_wss_service will return faster than in one second
  309. // if there is enough work to do
  310. time_t now = now_monotonic_sec();
  311. if (last_periodic_query_wakeup < now) {
  312. // wake up at least one Query Thread at least
  313. // once per second
  314. last_periodic_query_wakeup = now;
  315. QUERY_THREAD_WAKEUP;
  316. }
  317. }
  318. return 0;
  319. }
  320. static inline void mqtt_connected_actions(mqtt_wss_client client)
  321. {
  322. char *topic = (char*)aclk_get_topic(ACLK_TOPICID_COMMAND);
  323. if (!topic)
  324. netdata_log_error("Unable to fetch topic for COMMAND (to subscribe)");
  325. else
  326. mqtt_wss_subscribe(client, topic, 1);
  327. topic = (char*)aclk_get_topic(ACLK_TOPICID_CMD_NG_V1);
  328. if (!topic)
  329. netdata_log_error("Unable to fetch topic for protobuf COMMAND (to subscribe)");
  330. else
  331. mqtt_wss_subscribe(client, topic, 1);
  332. aclk_stats_upd_online(1);
  333. aclk_connected = 1;
  334. aclk_pubacks_per_conn = 0;
  335. aclk_rcvd_cloud_msgs = 0;
  336. aclk_connection_counter++;
  337. aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER;
  338. while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL)
  339. mqtt_wss_set_topic_alias(client, topic);
  340. aclk_send_agent_connection_update(client, 1);
  341. }
  342. void aclk_graceful_disconnect(mqtt_wss_client client)
  343. {
  344. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  345. "Preparing to gracefully shutdown ACLK connection");
  346. aclk_queue_lock();
  347. aclk_queue_flush();
  348. aclk_shared_state.mqtt_shutdown_msg_id = aclk_send_agent_connection_update(client, 0);
  349. time_t t = now_monotonic_sec();
  350. while (!mqtt_wss_service(client, 100)) {
  351. if (now_monotonic_sec() - t >= 2) {
  352. netdata_log_error("Wasn't able to gracefully shutdown ACLK in time!");
  353. break;
  354. }
  355. if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
  356. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  357. "MQTT App Layer `disconnect` message sent successfully");
  358. break;
  359. }
  360. }
  361. nd_log(NDLS_DAEMON, NDLP_WARNING, "ACLK link is down");
  362. nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
  363. aclk_stats_upd_online(0);
  364. last_disconnect_time = now_realtime_sec();
  365. aclk_connected = 0;
  366. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  367. "Attempting to gracefully shutdown the MQTT/WSS connection");
  368. mqtt_wss_disconnect(client, 1000);
  369. }
  370. static unsigned long aclk_reconnect_delay() {
  371. unsigned long recon_delay;
  372. time_t now;
  373. if (aclk_disable_runtime) {
  374. aclk_tbeb_reset();
  375. return 60 * MSEC_PER_SEC;
  376. }
  377. now = now_monotonic_sec();
  378. if (aclk_block_until) {
  379. if (now < aclk_block_until) {
  380. recon_delay = aclk_block_until - now;
  381. recon_delay *= MSEC_PER_SEC;
  382. aclk_block_until = 0;
  383. aclk_tbeb_reset();
  384. return recon_delay;
  385. }
  386. aclk_block_until = 0;
  387. }
  388. if (!aclk_env || !aclk_env->backoff.base)
  389. return aclk_tbeb_delay(0, 2, 0, 1024);
  390. return aclk_tbeb_delay(0, aclk_env->backoff.base, aclk_env->backoff.min_s, aclk_env->backoff.max_s);
  391. }
  392. /* Block till aclk_reconnect_delay is satisfied or netdata_exit is signalled
  393. * @return 0 - Go ahead and connect (delay expired)
  394. * 1 - netdata_exit
  395. */
  396. #define NETDATA_EXIT_POLL_MS (MSEC_PER_SEC/4)
  397. static int aclk_block_till_recon_allowed() {
  398. unsigned long recon_delay = aclk_reconnect_delay();
  399. next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
  400. last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
  401. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  402. "Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
  403. // we want to wake up from time to time to check netdata_exit
  404. while (recon_delay)
  405. {
  406. if (!service_running(SERVICE_ACLK))
  407. return 1;
  408. if (recon_delay > NETDATA_EXIT_POLL_MS) {
  409. sleep_usec(NETDATA_EXIT_POLL_MS * USEC_PER_MS);
  410. recon_delay -= NETDATA_EXIT_POLL_MS;
  411. continue;
  412. }
  413. sleep_usec(recon_delay * USEC_PER_MS);
  414. recon_delay = 0;
  415. }
  416. return !service_running(SERVICE_ACLK);
  417. }
  418. #ifndef ACLK_DISABLE_CHALLENGE
  419. /* Cloud returns transport list ordered with highest
  420. * priority first. This function selects highest prio
  421. * transport that we can actually use (support)
  422. */
  423. static int aclk_get_transport_idx(aclk_env_t *env) {
  424. for (size_t i = 0; i < env->transport_count; i++) {
  425. // currently we support only MQTT 5
  426. // therefore select first transport that matches
  427. if (env->transports[i]->type == ACLK_TRP_MQTT_5) {
  428. return i;
  429. }
  430. }
  431. return -1;
  432. }
  433. #endif
  434. ACLK_STATUS aclk_status = ACLK_STATUS_NONE;
  435. const char *aclk_status_to_string(void) {
  436. switch(aclk_status) {
  437. case ACLK_STATUS_CONNECTED:
  438. return "connected";
  439. case ACLK_STATUS_NONE:
  440. return "none";
  441. case ACLK_STATUS_DISABLED:
  442. return "disabled";
  443. case ACLK_STATUS_NO_CLOUD_URL:
  444. return "no_cloud_url";
  445. case ACLK_STATUS_INVALID_CLOUD_URL:
  446. return "invalid_cloud_url";
  447. case ACLK_STATUS_NOT_CLAIMED:
  448. return "not_claimed";
  449. case ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE:
  450. return "env_endpoint_unreachable";
  451. case ACLK_STATUS_ENV_RESPONSE_NOT_200:
  452. return "env_response_not_200";
  453. case ACLK_STATUS_ENV_RESPONSE_EMPTY:
  454. return "env_response_empty";
  455. case ACLK_STATUS_ENV_RESPONSE_NOT_JSON:
  456. return "env_response_not_json";
  457. case ACLK_STATUS_ENV_FAILED:
  458. return "env_failed";
  459. case ACLK_STATUS_BLOCKED:
  460. return "blocked";
  461. case ACLK_STATUS_NO_OLD_PROTOCOL:
  462. return "no_old_protocol";
  463. case ACLK_STATUS_NO_PROTOCOL_CAPABILITY:
  464. return "no_protocol_capability";
  465. case ACLK_STATUS_INVALID_ENV_AUTH_URL:
  466. return "invalid_env_auth_url";
  467. case ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX:
  468. return "invalid_env_transport_idx";
  469. case ACLK_STATUS_INVALID_ENV_TRANSPORT_URL:
  470. return "invalid_env_transport_url";
  471. case ACLK_STATUS_INVALID_OTP:
  472. return "invalid_otp";
  473. case ACLK_STATUS_NO_LWT_TOPIC:
  474. return "no_lwt_topic";
  475. default:
  476. return "unknown";
  477. }
  478. }
  479. const char *aclk_cloud_base_url = NULL;
  480. /* Attempts to make a connection to MQTT broker over WSS
  481. * @param client instance of mqtt_wss_client
  482. * @return 0 - Successful Connection,
  483. * <0 - Irrecoverable Error -> Kill ACLK,
  484. * >0 - netdata_exit
  485. */
  486. #define CLOUD_BASE_URL_READ_RETRY 30
  487. #ifdef ACLK_SSL_ALLOW_SELF_SIGNED
  488. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_ALLOW_SELF_SIGNED
  489. #else
  490. #define ACLK_SSL_FLAGS MQTT_WSS_SSL_CERT_CHECK_FULL
  491. #endif
  492. static int aclk_attempt_to_connect(mqtt_wss_client client)
  493. {
  494. int ret;
  495. url_t base_url;
  496. #ifndef ACLK_DISABLE_CHALLENGE
  497. url_t auth_url;
  498. url_t mqtt_url;
  499. #endif
  500. while (service_running(SERVICE_ACLK)) {
  501. aclk_cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  502. if (aclk_cloud_base_url == NULL) {
  503. error_report("Do not move the cloud base url out of post_conf_load!!");
  504. aclk_status = ACLK_STATUS_NO_CLOUD_URL;
  505. return -1;
  506. }
  507. if (aclk_block_till_recon_allowed()) {
  508. aclk_status = ACLK_STATUS_BLOCKED;
  509. return 1;
  510. }
  511. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  512. "Attempting connection now");
  513. memset(&base_url, 0, sizeof(url_t));
  514. if (url_parse(aclk_cloud_base_url, &base_url)) {
  515. aclk_status = ACLK_STATUS_INVALID_CLOUD_URL;
  516. error_report("ACLK base URL configuration key could not be parsed. Will retry in %d seconds.", CLOUD_BASE_URL_READ_RETRY);
  517. sleep(CLOUD_BASE_URL_READ_RETRY);
  518. url_t_destroy(&base_url);
  519. continue;
  520. }
  521. struct mqtt_wss_proxy proxy_conf = { .host = NULL, .port = 0, .username = NULL, .password = NULL, .type = MQTT_WSS_DIRECT };
  522. aclk_set_proxy((char**)&proxy_conf.host, &proxy_conf.port, (char**)&proxy_conf.username, (char**)&proxy_conf.password, &proxy_conf.type);
  523. struct mqtt_connect_params mqtt_conn_params = {
  524. .clientid = "anon",
  525. .username = "anon",
  526. .password = "anon",
  527. .will_topic = "lwt",
  528. .will_msg = NULL,
  529. .will_flags = MQTT_WSS_PUB_QOS2,
  530. .keep_alive = 60,
  531. .drop_on_publish_fail = 1
  532. };
  533. #ifndef ACLK_DISABLE_CHALLENGE
  534. if (aclk_env) {
  535. aclk_env_t_destroy(aclk_env);
  536. freez(aclk_env);
  537. }
  538. aclk_env = callocz(1, sizeof(aclk_env_t));
  539. ret = aclk_get_env(aclk_env, base_url.host, base_url.port);
  540. url_t_destroy(&base_url);
  541. if(ret) switch(ret) {
  542. case 1:
  543. aclk_status = ACLK_STATUS_NOT_CLAIMED;
  544. error_report("Failed to Get ACLK environment (agent is not claimed)");
  545. // delay handled by aclk_block_till_recon_allowed
  546. continue;
  547. case 2:
  548. aclk_status = ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE;
  549. error_report("Failed to Get ACLK environment (cannot contact ENV endpoint)");
  550. // delay handled by aclk_block_till_recon_allowed
  551. continue;
  552. case 3:
  553. aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_200;
  554. error_report("Failed to Get ACLK environment (ENV response code is not 200)");
  555. // delay handled by aclk_block_till_recon_allowed
  556. continue;
  557. case 4:
  558. aclk_status = ACLK_STATUS_ENV_RESPONSE_EMPTY;
  559. error_report("Failed to Get ACLK environment (ENV response is empty)");
  560. // delay handled by aclk_block_till_recon_allowed
  561. continue;
  562. case 5:
  563. aclk_status = ACLK_STATUS_ENV_RESPONSE_NOT_JSON;
  564. error_report("Failed to Get ACLK environment (ENV response is not JSON)");
  565. // delay handled by aclk_block_till_recon_allowed
  566. continue;
  567. default:
  568. aclk_status = ACLK_STATUS_ENV_FAILED;
  569. error_report("Failed to Get ACLK environment (unknown error)");
  570. // delay handled by aclk_block_till_recon_allowed
  571. continue;
  572. }
  573. if (!service_running(SERVICE_ACLK)) {
  574. aclk_status = ACLK_STATUS_DISABLED;
  575. return 1;
  576. }
  577. if (aclk_env->encoding != ACLK_ENC_PROTO) {
  578. aclk_status = ACLK_STATUS_NO_OLD_PROTOCOL;
  579. error_report("This agent can only use the new cloud protocol but cloud requested old one.");
  580. continue;
  581. }
  582. if (!aclk_env_has_capa("proto")) {
  583. aclk_status = ACLK_STATUS_NO_PROTOCOL_CAPABILITY;
  584. error_report("Can't use encoding=proto without at least \"proto\" capability.");
  585. continue;
  586. }
  587. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  588. "New ACLK protobuf protocol negotiated successfully (/env response).");
  589. memset(&auth_url, 0, sizeof(url_t));
  590. if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
  591. aclk_status = ACLK_STATUS_INVALID_ENV_AUTH_URL;
  592. error_report("Parsing URL returned by env endpoint for authentication failed. \"%s\"", aclk_env->auth_endpoint);
  593. url_t_destroy(&auth_url);
  594. continue;
  595. }
  596. ret = aclk_get_mqtt_otp(aclk_private_key, (char **)&mqtt_conn_params.clientid, (char **)&mqtt_conn_params.username, (char **)&mqtt_conn_params.password, &auth_url);
  597. url_t_destroy(&auth_url);
  598. if (ret) {
  599. aclk_status = ACLK_STATUS_INVALID_OTP;
  600. error_report("Error passing Challenge/Response to get OTP");
  601. continue;
  602. }
  603. // aclk_get_topic moved here as during OTP we
  604. // generate the topic cache
  605. mqtt_conn_params.will_topic = aclk_get_topic(ACLK_TOPICID_AGENT_CONN);
  606. if (!mqtt_conn_params.will_topic) {
  607. aclk_status = ACLK_STATUS_NO_LWT_TOPIC;
  608. error_report("Couldn't get LWT topic. Will not send LWT.");
  609. continue;
  610. }
  611. // Do the MQTT connection
  612. ret = aclk_get_transport_idx(aclk_env);
  613. if (ret < 0) {
  614. aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX;
  615. error_report("Cloud /env endpoint didn't return any transport usable by this Agent.");
  616. continue;
  617. }
  618. memset(&mqtt_url, 0, sizeof(url_t));
  619. if (url_parse(aclk_env->transports[ret]->endpoint, &mqtt_url)){
  620. aclk_status = ACLK_STATUS_INVALID_ENV_TRANSPORT_URL;
  621. error_report("Failed to parse target URL for /env trp idx %d \"%s\"", ret, aclk_env->transports[ret]->endpoint);
  622. url_t_destroy(&mqtt_url);
  623. continue;
  624. }
  625. #endif
  626. aclk_session_newarch = now_realtime_usec();
  627. aclk_session_sec = aclk_session_newarch / USEC_PER_SEC;
  628. aclk_session_us = aclk_session_newarch % USEC_PER_SEC;
  629. mqtt_conn_params.will_msg = aclk_generate_lwt(&mqtt_conn_params.will_msg_len);
  630. #ifdef ACLK_DISABLE_CHALLENGE
  631. ret = mqtt_wss_connect(client, base_url.host, base_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  632. url_t_destroy(&base_url);
  633. #else
  634. ret = mqtt_wss_connect(client, mqtt_url.host, mqtt_url.port, &mqtt_conn_params, ACLK_SSL_FLAGS, &proxy_conf);
  635. url_t_destroy(&mqtt_url);
  636. freez((char*)mqtt_conn_params.clientid);
  637. freez((char*)mqtt_conn_params.password);
  638. freez((char*)mqtt_conn_params.username);
  639. #endif
  640. freez((char*)mqtt_conn_params.will_msg);
  641. freez((char*)proxy_conf.host);
  642. freez((char*)proxy_conf.username);
  643. freez((char*)proxy_conf.password);
  644. if (!ret) {
  645. last_conn_time_mqtt = now_realtime_sec();
  646. nd_log(NDLS_DAEMON, NDLP_INFO, "ACLK connection successfully established");
  647. aclk_status = ACLK_STATUS_CONNECTED;
  648. nd_log(NDLS_ACCESS, NDLP_INFO, "ACLK CONNECTED");
  649. mqtt_connected_actions(client);
  650. return 0;
  651. }
  652. error_report("Connect failed");
  653. }
  654. aclk_status = ACLK_STATUS_DISABLED;
  655. return 1;
  656. }
  657. /**
  658. * Main agent cloud link thread
  659. *
  660. * This thread will simply call the main event loop that handles
  661. * pending requests - both inbound and outbound
  662. *
  663. * @param ptr is a pointer to the netdata_static_thread structure.
  664. *
  665. * @return It always returns NULL
  666. */
  667. void *aclk_main(void *ptr)
  668. {
  669. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  670. struct aclk_stats_thread *stats_thread = NULL;
  671. struct aclk_query_threads query_threads;
  672. query_threads.thread_list = NULL;
  673. ACLK_PROXY_TYPE proxy_type;
  674. aclk_get_proxy(&proxy_type);
  675. if (proxy_type == PROXY_TYPE_SOCKS5) {
  676. netdata_log_error("SOCKS5 proxy is not supported by ACLK-NG yet.");
  677. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  678. return NULL;
  679. }
  680. unsigned int proto_hdl_cnt = aclk_init_rx_msg_handlers();
  681. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  682. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  683. netdata_thread_disable_cancelability();
  684. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
  685. nd_log(NDLS_DAEMON, NDLP_INFO,
  686. "Killing ACLK thread -> cloud functionality has been disabled");
  687. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  688. return NULL;
  689. #endif
  690. query_threads.count = read_query_thread_count();
  691. if (wait_till_cloud_enabled())
  692. goto exit;
  693. if (wait_till_agent_claim_ready())
  694. goto exit;
  695. if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
  696. netdata_log_error("Couldn't initialize MQTT_WSS network library");
  697. goto exit;
  698. }
  699. #ifdef MQTT_WSS_DEBUG
  700. size_t default_ssl_log_filename_size = strlen(netdata_configured_log_dir) + strlen(DEFAULT_SSKEYLOGFILE_NAME) + 2;
  701. char *default_ssl_log_filename = mallocz(default_ssl_log_filename_size);
  702. snprintfz(default_ssl_log_filename, default_ssl_log_filename_size, "%s/%s", netdata_configured_log_dir, DEFAULT_SSKEYLOGFILE_NAME);
  703. ssl_log_filename = config_get(CONFIG_SECTION_CLOUD, "aclk ssl keylog file", default_ssl_log_filename);
  704. freez(default_ssl_log_filename);
  705. if (ssl_log_filename) {
  706. error_report("SSLKEYLOGFILE active (path:\"%s\")!", ssl_log_filename);
  707. mqtt_wss_set_SSL_CTX_keylog_cb(mqttwss_client, aclk_ssl_keylog_cb);
  708. }
  709. #endif
  710. // Enable MQTT buffer growth if necessary
  711. // e.g. old cloud architecture clients with huge nodes
  712. // that send JSON payloads of 10 MB as single messages
  713. mqtt_wss_set_max_buf_size(mqttwss_client, 25*1024*1024);
  714. aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", global_statistics_enabled);
  715. if (aclk_stats_enabled) {
  716. stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
  717. stats_thread->thread = mallocz(sizeof(netdata_thread_t));
  718. stats_thread->query_thread_count = query_threads.count;
  719. stats_thread->client = mqttwss_client;
  720. aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt);
  721. netdata_thread_create(
  722. stats_thread->thread, "ACLK_STATS", NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
  723. stats_thread);
  724. }
  725. // Keep reconnecting and talking until our time has come
  726. // and the Grim Reaper (netdata_exit) calls
  727. do {
  728. if (aclk_attempt_to_connect(mqttwss_client))
  729. goto exit_full;
  730. if (unlikely(!query_threads.thread_list))
  731. aclk_query_threads_start(&query_threads, mqttwss_client);
  732. if (handle_connection(mqttwss_client)) {
  733. aclk_stats_upd_online(0);
  734. last_disconnect_time = now_realtime_sec();
  735. aclk_connected = 0;
  736. nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
  737. }
  738. } while (service_running(SERVICE_ACLK));
  739. aclk_graceful_disconnect(mqttwss_client);
  740. #ifdef MQTT_WSS_DEBUG
  741. if (ssl_log_file)
  742. fclose(ssl_log_file);
  743. #endif
  744. exit_full:
  745. // Tear Down
  746. QUERY_THREAD_WAKEUP_ALL;
  747. aclk_query_threads_cleanup(&query_threads);
  748. if (aclk_stats_enabled) {
  749. netdata_thread_join(*stats_thread->thread, NULL);
  750. aclk_stats_thread_cleanup();
  751. freez(stats_thread->thread);
  752. freez(stats_thread);
  753. }
  754. free_topic_cache();
  755. mqtt_wss_destroy(mqttwss_client);
  756. exit:
  757. if (aclk_env) {
  758. aclk_env_t_destroy(aclk_env);
  759. freez(aclk_env);
  760. }
  761. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  762. return NULL;
  763. }
  764. void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
  765. {
  766. uuid_t node_id;
  767. int ret = 0;
  768. if (!aclk_connected)
  769. return;
  770. if (host->node_id && !uuid_is_null(*host->node_id)) {
  771. uuid_copy(node_id, *host->node_id);
  772. }
  773. else {
  774. ret = get_node_id(&host->host_uuid, &node_id);
  775. if (ret > 0) {
  776. // this means we were not able to check if node_id already present
  777. netdata_log_error("Unable to check for node_id. Ignoring the host state update.");
  778. return;
  779. }
  780. if (ret < 0) {
  781. // node_id not found
  782. aclk_query_t create_query;
  783. create_query = aclk_query_new(REGISTER_NODE);
  784. rrdhost_aclk_state_lock(localhost);
  785. node_instance_creation_t node_instance_creation = {
  786. .claim_id = localhost->aclk_state.claimed_id,
  787. .hops = host->system_info->hops,
  788. .hostname = rrdhost_hostname(host),
  789. .machine_guid = host->machine_guid};
  790. create_query->data.bin_payload.payload =
  791. generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
  792. rrdhost_aclk_state_unlock(localhost);
  793. create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
  794. create_query->data.bin_payload.msg_name = "CreateNodeInstance";
  795. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  796. "Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
  797. aclk_queue_query(create_query);
  798. return;
  799. }
  800. }
  801. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  802. node_instance_connection_t node_state_update = {
  803. .hops = host->system_info->hops,
  804. .live = cmd,
  805. .queryable = queryable,
  806. .session_id = aclk_session_newarch
  807. };
  808. node_state_update.node_id = mallocz(UUID_STR_LEN);
  809. uuid_unparse_lower(node_id, (char*)node_state_update.node_id);
  810. node_state_update.capabilities = aclk_get_agent_capas();
  811. rrdhost_aclk_state_lock(localhost);
  812. node_state_update.claim_id = localhost->aclk_state.claimed_id;
  813. query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
  814. rrdhost_aclk_state_unlock(localhost);
  815. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  816. "Queuing status update for node=%s, live=%d, hops=%u, queryable=%d",
  817. (char*)node_state_update.node_id, cmd, host->system_info->hops, queryable);
  818. freez((void*)node_state_update.node_id);
  819. query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
  820. query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
  821. aclk_queue_query(query);
  822. }
  823. void aclk_send_node_instances()
  824. {
  825. struct node_instance_list *list_head = get_node_list();
  826. struct node_instance_list *list = list_head;
  827. if (unlikely(!list)) {
  828. error_report("Failure to get_node_list from DB!");
  829. return;
  830. }
  831. while (!uuid_is_null(list->host_id)) {
  832. if (!uuid_is_null(list->node_id)) {
  833. aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
  834. node_instance_connection_t node_state_update = {
  835. .live = list->live,
  836. .hops = list->hops,
  837. .queryable = 1,
  838. .session_id = aclk_session_newarch
  839. };
  840. node_state_update.node_id = mallocz(UUID_STR_LEN);
  841. uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);
  842. char host_id[UUID_STR_LEN];
  843. uuid_unparse_lower(list->host_id, host_id);
  844. RRDHOST *host = rrdhost_find_by_guid(host_id);
  845. if (unlikely(!host)) {
  846. freez((void*)node_state_update.node_id);
  847. freez(query);
  848. continue;
  849. }
  850. node_state_update.capabilities = aclk_get_node_instance_capas(host);
  851. rrdhost_aclk_state_lock(localhost);
  852. node_state_update.claim_id = localhost->aclk_state.claimed_id;
  853. query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
  854. rrdhost_aclk_state_unlock(localhost);
  855. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  856. "Queuing status update for node=%s, live=%d, hops=%d, queryable=1",
  857. (char*)node_state_update.node_id, list->live, list->hops);
  858. freez((void*)node_state_update.capabilities);
  859. freez((void*)node_state_update.node_id);
  860. query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
  861. query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
  862. aclk_queue_query(query);
  863. } else {
  864. aclk_query_t create_query;
  865. create_query = aclk_query_new(REGISTER_NODE);
  866. node_instance_creation_t node_instance_creation = {
  867. .hops = list->hops,
  868. .hostname = list->hostname,
  869. };
  870. node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
  871. uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
  872. create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
  873. create_query->data.bin_payload.msg_name = "CreateNodeInstance";
  874. rrdhost_aclk_state_lock(localhost);
  875. node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
  876. create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
  877. rrdhost_aclk_state_unlock(localhost);
  878. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  879. "Queuing registration for host=%s, hops=%d",
  880. (char*)node_instance_creation.machine_guid, list->hops);
  881. freez((void *)node_instance_creation.machine_guid);
  882. aclk_queue_query(create_query);
  883. }
  884. freez(list->hostname);
  885. list++;
  886. }
  887. freez(list_head);
  888. }
  889. void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
  890. {
  891. aclk_send_bin_message_subtopic_pid(mqttwss_client, msg, msg_len, subtopic, msgname);
  892. }
  893. static void fill_alert_status_for_host(BUFFER *wb, RRDHOST *host)
  894. {
  895. struct proto_alert_status status;
  896. memset(&status, 0, sizeof(status));
  897. if (get_proto_alert_status(host, &status)) {
  898. buffer_strcat(wb, "\nFailed to get alert streaming status for this host");
  899. return;
  900. }
  901. buffer_sprintf(wb,
  902. "\n\t\tUpdates: %d"
  903. "\n\t\tPending Min Seq ID: %"PRIu64
  904. "\n\t\tPending Max Seq ID: %"PRIu64
  905. "\n\t\tLast Submitted Seq ID: %"PRIu64,
  906. status.alert_updates,
  907. status.pending_min_sequence_id,
  908. status.pending_max_sequence_id,
  909. status.last_submitted_sequence_id
  910. );
  911. }
  912. #endif /* ENABLE_ACLK */
  #ifdef ENABLE_ACLK
  /*
   * Format *t as local time ("%Y-%m-%d %H:%M:%S") into buf, which holds len bytes.
   * Returns false and leaves buf untouched when *t is 0 or localtime_r() fails.
   */
  static bool aclk_format_local_time(const time_t *t, char *buf, size_t len)
  {
      struct tm tm_storage;

      if (!*t)
          return false;

      const struct tm *local = localtime_r(t, &tm_storage);
      if (!local)
          return false;

      strftime(buf, len, "%Y-%m-%d %H:%M:%S", local);
      return true;
  }
  #endif /* ENABLE_ACLK */

  /*
   * Build a human-readable, multi-line report of the ACLK state: availability,
   * claim information, connection status and timestamps (local time) and, while
   * connected, a per-host section with claim id, node id, streaming details and
   * alert streaming status.
   *
   * The result is allocated with strdupz(); the caller is responsible for freeing it.
   */
  char *aclk_state(void)
  {
  #ifndef ENABLE_ACLK
      return strdupz("ACLK Available: No");
  #else
      BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_aclk);
      char when[26];

      buffer_strcat(wb,
          "ACLK Available: Yes\n"
          "ACLK Version: 2\n"
          "Protocols Supported: Protobuf\n"
      );
      buffer_sprintf(wb, "Protocol Used: Protobuf\nMQTT Version: %d\nClaimed: ", 5);

      char *agent_id = get_agent_claimid();
      if (agent_id) {
          char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
          buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\n", agent_id, cloud_base_url ? cloud_base_url : "null");
          freez(agent_id);
      } else {
          buffer_strcat(wb, "No\n");
      }

      buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n",
                     aclk_connected ? "Yes" : "No",
                     aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0,
                     aclk_disable_runtime ? "Yes" : "No");

      if (aclk_format_local_time(&last_conn_time_mqtt, when, sizeof(when)))
          buffer_sprintf(wb, "Last Connection Time: %s\n", when);

      if (aclk_format_local_time(&last_conn_time_appl, when, sizeof(when)))
          buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, when);

      if (aclk_format_local_time(&last_disconnect_time, when, sizeof(when)))
          buffer_sprintf(wb, "Last Disconnect Time: %s\n", when);

      if (!aclk_connected && aclk_format_local_time(&next_connection_attempt, when, sizeof(when)))
          buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", when, last_backoff_value);

      if (aclk_connected) {
          buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d",
                         aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);

          RRDHOST *host;
          rrd_rdlock();
          rrdhost_foreach_read(host) {
              buffer_sprintf(wb, "\n\n> Node Instance for mGUID: \"%s\" hostname \"%s\"\n",
                             host->machine_guid, rrdhost_hostname(host));

              buffer_strcat(wb, "\tClaimed ID: ");
              rrdhost_aclk_state_lock(host);
              buffer_strcat(wb, host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "null");
              rrdhost_aclk_state_unlock(host);

              if (host->node_id && !uuid_is_null(*host->node_id)) {
                  char node_id[GUID_LEN + 1];
                  uuid_unparse_lower(*host->node_id, node_id);
                  buffer_sprintf(wb, "\n\tNode ID: %s\n", node_id);
              } else {
                  buffer_strcat(wb, "\n\tNode ID: null\n");
              }

              buffer_sprintf(wb, "\tStreaming Hops: %d\n\tRelationship: %s",
                             host->system_info->hops, host == localhost ? "self" : "child");
              if (host != localhost)
                  buffer_sprintf(wb, "\n\tStreaming Connection Live: %s", host->receiver ? "true" : "false");

              buffer_strcat(wb, "\n\tAlert Streaming Status:");
              fill_alert_status_for_host(wb, host);
          }
          rrd_unlock();
      }

      char *report = strdupz(buffer_tostring(wb));
      buffer_free(wb);
      return report;
  #endif /* ENABLE_ACLK */
  }
  989. #ifdef ENABLE_ACLK
  990. static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
  991. {
  992. struct proto_alert_status status;
  993. memset(&status, 0, sizeof(status));
  994. if (get_proto_alert_status(host, &status))
  995. return;
  996. json_object *tmp = json_object_new_int(status.alert_updates);
  997. json_object_object_add(obj, "updates", tmp);
  998. tmp = json_object_new_int(status.pending_min_sequence_id);
  999. json_object_object_add(obj, "pending-min-seq-id", tmp);
  1000. tmp = json_object_new_int(status.pending_max_sequence_id);
  1001. json_object_object_add(obj, "pending-max-seq-id", tmp);
  1002. tmp = json_object_new_int(status.last_submitted_sequence_id);
  1003. json_object_object_add(obj, "last-submitted-seq-id", tmp);
  1004. }
  1005. static json_object *timestamp_to_json(const time_t *t)
  1006. {
  1007. struct tm *tmptr, tmbuf;
  1008. if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
  1009. char timebuf[26];
  1010. strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
  1011. return json_object_new_string(timebuf);
  1012. }
  1013. return NULL;
  1014. }
  1015. #endif /* ENABLE_ACLK */
  #ifdef ENABLE_ACLK
  /*
   * Build the JSON description of one node instance: hostname, machine GUID,
   * claim id and node id (null when unset), streaming hops, relationship to this
   * agent, streaming status and alert sync status.
   */
  static json_object *aclk_node_instance_json(RRDHOST *host)
  {
      json_object *node = json_object_new_object();

      json_object_object_add(node, "hostname", json_object_new_string(rrdhost_hostname(host)));
      json_object_object_add(node, "mguid", json_object_new_string(host->machine_guid));

      rrdhost_aclk_state_lock(host);
      json_object_object_add(node, "claimed_id",
                             host->aclk_state.claimed_id ? json_object_new_string(host->aclk_state.claimed_id) : NULL);
      rrdhost_aclk_state_unlock(host);

      json_object *node_id_value = NULL;
      if (host->node_id && !uuid_is_null(*host->node_id)) {
          char node_id[GUID_LEN + 1];
          uuid_unparse_lower(*host->node_id, node_id);
          node_id_value = json_object_new_string(node_id);
      }
      json_object_object_add(node, "node-id", node_id_value);

      json_object_object_add(node, "streaming-hops", json_object_new_int(host->system_info->hops));
      json_object_object_add(node, "relationship", json_object_new_string(host == localhost ? "self" : "child"));
      json_object_object_add(node, "streaming-online", json_object_new_boolean((host->receiver || host == localhost)));

      json_object *alert_sync = json_object_new_object();
      fill_alert_status_for_host_json(alert_sync, host);
      json_object_object_add(node, "alert-sync-status", alert_sync);

      return node;
  }
  #endif /* ENABLE_ACLK */

  /*
   * Build a JSON report of the ACLK state: availability, protocol, claim
   * information, connection counters and UTC timestamps, plus a "node-instances"
   * array describing every host. Without ENABLE_ACLK only
   * {"aclk-available":false} is returned.
   *
   * The result is allocated with strdupz(); the caller is responsible for freeing it.
   */
  char *aclk_state_json(void)
  {
  #ifndef ENABLE_ACLK
      return strdupz("{\"aclk-available\":false}");
  #else
      json_object *root = json_object_new_object();

      json_object_object_add(root, "aclk-available", json_object_new_boolean(1));
      json_object_object_add(root, "aclk-version", json_object_new_int(2));

      json_object *protocols = json_object_new_array();
      json_object_array_add(protocols, json_object_new_string("Protobuf"));
      json_object_object_add(root, "protocols-supported", protocols);

      char *agent_id = get_agent_claimid();
      json_object_object_add(root, "agent-claimed", json_object_new_boolean(agent_id != NULL));
      json_object *claimed_id = NULL;
      if (agent_id) {
          claimed_id = json_object_new_string(agent_id);
          freez(agent_id);
      }
      json_object_object_add(root, "claimed-id", claimed_id);

      char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
      json_object_object_add(root, "cloud-url", cloud_base_url ? json_object_new_string(cloud_base_url) : NULL);

      json_object_object_add(root, "online", json_object_new_boolean(aclk_connected));
      json_object_object_add(root, "used-cloud-protocol", json_object_new_string("Protobuf"));
      json_object_object_add(root, "mqtt-version", json_object_new_int(5));
      json_object_object_add(root, "received-app-layer-msgs", json_object_new_int(aclk_rcvd_cloud_msgs));
      json_object_object_add(root, "received-mqtt-pubacks", json_object_new_int(aclk_pubacks_per_conn));
      json_object_object_add(root, "reconnect-count",
                             json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0));

      json_object_object_add(root, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
      json_object_object_add(root, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
      json_object_object_add(root, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
      json_object_object_add(root, "next-connection-attempt-utc",
                             !aclk_connected ? timestamp_to_json(&next_connection_attempt) : NULL);

      json_object *backoff = NULL;
      if (!aclk_connected && last_backoff_value)
          backoff = json_object_new_double(last_backoff_value);
      json_object_object_add(root, "last-backoff-value", backoff);

      json_object_object_add(root, "banned-by-cloud", json_object_new_boolean(aclk_disable_runtime));

      json_object *instances = json_object_new_array();
      RRDHOST *host;
      rrd_rdlock();
      rrdhost_foreach_read(host) {
          json_object_array_add(instances, aclk_node_instance_json(host));
      }
      rrd_unlock();
      json_object_object_add(root, "node-instances", instances);

      char *serialized = strdupz(json_object_to_json_string_ext(root, JSON_C_TO_STRING_PLAIN));
      json_object_put(root);
      return serialized;
  #endif /* ENABLE_ACLK */
  }
  1106. void add_aclk_host_labels(void) {
  1107. RRDLABELS *labels = localhost->rrdlabels;
  1108. #ifdef ENABLE_ACLK
  1109. rrdlabels_add(labels, "_aclk_available", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  1110. ACLK_PROXY_TYPE aclk_proxy;
  1111. char *proxy_str;
  1112. aclk_get_proxy(&aclk_proxy);
  1113. switch(aclk_proxy) {
  1114. case PROXY_TYPE_SOCKS5:
  1115. proxy_str = "SOCKS5";
  1116. break;
  1117. case PROXY_TYPE_HTTP:
  1118. proxy_str = "HTTP";
  1119. break;
  1120. default:
  1121. proxy_str = "none";
  1122. break;
  1123. }
  1124. rrdlabels_add(labels, "_mqtt_version", "5", RRDLABEL_SRC_AUTO);
  1125. rrdlabels_add(labels, "_aclk_proxy", proxy_str, RRDLABEL_SRC_AUTO);
  1126. rrdlabels_add(labels, "_aclk_ng_new_cloud_protocol", "true", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  1127. #else
  1128. rrdlabels_add(labels, "_aclk_available", "false", RRDLABEL_SRC_AUTO|RRDLABEL_SRC_ACLK);
  1129. #endif
  1130. }
  1131. void aclk_queue_node_info(RRDHOST *host, bool immediate)
  1132. {
  1133. struct aclk_sync_cfg_t *wc = host->aclk_config;
  1134. if (likely(wc))
  1135. wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec();
  1136. }