agent_cloud_link.c 51 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "libnetdata/libnetdata.h"
  3. #include "agent_cloud_link.h"
  4. #include "aclk_lws_https_client.h"
  5. #include "aclk_query.h"
  6. #include "aclk_common.h"
  7. #include "aclk_stats.h"
  8. #ifdef ENABLE_ACLK
  9. #include <libwebsockets.h>
  10. #endif
  11. int aclk_shutting_down = 0;
  12. // Other global state
  13. static int aclk_subscribed = 0;
  14. static int aclk_disable_single_updates = 0;
  15. static char *aclk_username = NULL;
  16. static char *aclk_password = NULL;
  17. static char *global_base_topic = NULL;
  18. static int aclk_connecting = 0;
  19. int aclk_force_reconnect = 0; // Indication from lower layers
  20. usec_t aclk_session_us = 0; // Used by the mqtt layer
  21. time_t aclk_session_sec = 0; // Used by the mqtt layer
  22. static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
  23. static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
  24. #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
  25. #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
  26. #define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
  27. #define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
  28. void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
  29. void aclk_lws_wss_destroy_context();
  30. /*
  31. * Maintain a list of collectors and chart count
  32. * If all the charts of a collector are deleted
  33. * then a new metadata dataset must be send to the cloud
  34. *
  35. */
  36. struct _collector {
  37. time_t created;
  38. uint32_t count; //chart count
  39. uint32_t hostname_hash;
  40. uint32_t plugin_hash;
  41. uint32_t module_hash;
  42. char *hostname;
  43. char *plugin_name;
  44. char *module_name;
  45. struct _collector *next;
  46. };
  47. struct _collector *collector_list = NULL;
  48. char *create_uuid()
  49. {
  50. uuid_t uuid;
  51. char *uuid_str = mallocz(36 + 1);
  52. uuid_generate(uuid);
  53. uuid_unparse(uuid, uuid_str);
  54. return uuid_str;
  55. }
  56. int cloud_to_agent_parse(JSON_ENTRY *e)
  57. {
  58. struct aclk_request *data = e->callback_data;
  59. switch (e->type) {
  60. case JSON_OBJECT:
  61. case JSON_ARRAY:
  62. break;
  63. case JSON_STRING:
  64. if (!strcmp(e->name, "msg-id")) {
  65. data->msg_id = strdupz(e->data.string);
  66. break;
  67. }
  68. if (!strcmp(e->name, "type")) {
  69. data->type_id = strdupz(e->data.string);
  70. break;
  71. }
  72. if (!strcmp(e->name, "callback-topic")) {
  73. data->callback_topic = strdupz(e->data.string);
  74. break;
  75. }
  76. if (!strcmp(e->name, "payload")) {
  77. if (likely(e->data.string)) {
  78. size_t len = strlen(e->data.string);
  79. data->payload = mallocz(len+1);
  80. if (!url_decode_r(data->payload, e->data.string, len + 1))
  81. strcpy(data->payload, e->data.string);
  82. }
  83. break;
  84. }
  85. break;
  86. case JSON_NUMBER:
  87. if (!strcmp(e->name, "version")) {
  88. data->version = e->data.number;
  89. break;
  90. }
  91. if (!strcmp(e->name, "min-version")) {
  92. data->min_version = e->data.number;
  93. break;
  94. }
  95. if (!strcmp(e->name, "max-version")) {
  96. data->max_version = e->data.number;
  97. break;
  98. }
  99. break;
  100. case JSON_BOOLEAN:
  101. break;
  102. case JSON_NULL:
  103. break;
  104. }
  105. return 0;
  106. }
  107. static RSA *aclk_private_key = NULL;
  108. static int create_private_key()
  109. {
  110. if (aclk_private_key != NULL)
  111. RSA_free(aclk_private_key);
  112. aclk_private_key = NULL;
  113. char filename[FILENAME_MAX + 1];
  114. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  115. long bytes_read;
  116. char *private_key = read_by_filename(filename, &bytes_read);
  117. if (!private_key) {
  118. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  119. return 1;
  120. }
  121. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  122. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  123. if (key_bio==NULL) {
  124. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  125. goto biofailed;
  126. }
  127. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  128. BIO_free(key_bio);
  129. if (aclk_private_key!=NULL)
  130. {
  131. freez(private_key);
  132. return 0;
  133. }
  134. char err[512];
  135. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  136. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  137. biofailed:
  138. freez(private_key);
  139. return 1;
  140. }
  141. /*
  142. * After a connection failure -- delay in milliseconds
  143. * When a connection is established, the delay function
  144. * should be called with
  145. *
  146. * mode 0 to reset the delay
  147. * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
  148. *
  149. */
  150. unsigned long int aclk_reconnect_delay(int mode)
  151. {
  152. static int fail = -1;
  153. unsigned long int delay;
  154. if (!mode || fail == -1) {
  155. srandom(time(NULL));
  156. fail = mode - 1;
  157. return 0;
  158. }
  159. delay = (1 << fail);
  160. if (delay >= ACLK_MAX_BACKOFF_DELAY) {
  161. delay = ACLK_MAX_BACKOFF_DELAY * 1000;
  162. } else {
  163. fail++;
  164. delay *= 1000;
  165. delay += (random() % (MAX(1000, delay/2)));
  166. }
  167. return delay;
  168. }
  169. // This will give the base topic that the agent will publish messages.
  170. // subtopics will be sent under the base topic e.g. base_topic/subtopic
  171. // This is called during the connection, we delete any previous topic
  172. // in-case the user has changed the agent id and reclaimed.
  173. char *create_publish_base_topic()
  174. {
  175. char *agent_id = is_agent_claimed();
  176. if (unlikely(!agent_id))
  177. return NULL;
  178. ACLK_LOCK;
  179. if (global_base_topic)
  180. freez(global_base_topic);
  181. char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
  182. snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id);
  183. tmp = strchr(tmp_topic, '\n');
  184. if (unlikely(tmp))
  185. *tmp = '\0';
  186. global_base_topic = strdupz(tmp_topic);
  187. ACLK_UNLOCK;
  188. freez(agent_id);
  189. return global_base_topic;
  190. }
  191. /*
  192. * Build a topic based on sub_topic and final_topic
  193. * if the sub topic starts with / assume that is an absolute topic
  194. *
  195. */
  196. char *get_topic(char *sub_topic, char *final_topic, int max_size)
  197. {
  198. int rc;
  199. if (likely(sub_topic && sub_topic[0] == '/'))
  200. return sub_topic;
  201. if (unlikely(!global_base_topic))
  202. return sub_topic;
  203. rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
  204. if (unlikely(rc >= max_size))
  205. debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);
  206. return final_topic;
  207. }
  208. #ifndef __GNUC__
  209. #pragma region ACLK Internal Collector Tracking
  210. #endif
  211. /*
  212. * Free a collector structure
  213. */
  214. static void _free_collector(struct _collector *collector)
  215. {
  216. if (likely(collector->plugin_name))
  217. freez(collector->plugin_name);
  218. if (likely(collector->module_name))
  219. freez(collector->module_name);
  220. if (likely(collector->hostname))
  221. freez(collector->hostname);
  222. freez(collector);
  223. }
  224. /*
  225. * This will report the collector list
  226. *
  227. */
  228. #ifdef ACLK_DEBUG
  229. static void _dump_collector_list()
  230. {
  231. struct _collector *tmp_collector;
  232. COLLECTOR_LOCK;
  233. info("DUMPING ALL COLLECTORS");
  234. if (unlikely(!collector_list || !collector_list->next)) {
  235. COLLECTOR_UNLOCK;
  236. info("DUMPING ALL COLLECTORS -- nothing found");
  237. return;
  238. }
  239. // Note that the first entry is "dummy"
  240. tmp_collector = collector_list->next;
  241. while (tmp_collector) {
  242. info(
  243. "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
  244. tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
  245. tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
  246. tmp_collector = tmp_collector->next;
  247. }
  248. info("DUMPING ALL COLLECTORS DONE");
  249. COLLECTOR_UNLOCK;
  250. }
  251. #endif
  252. /*
  253. * This will cleanup the collector list
  254. *
  255. */
  256. static void _reset_collector_list()
  257. {
  258. struct _collector *tmp_collector, *next_collector;
  259. COLLECTOR_LOCK;
  260. if (unlikely(!collector_list || !collector_list->next)) {
  261. COLLECTOR_UNLOCK;
  262. return;
  263. }
  264. // Note that the first entry is "dummy"
  265. tmp_collector = collector_list->next;
  266. collector_list->count = 0;
  267. collector_list->next = NULL;
  268. // We broke the link; we can unlock
  269. COLLECTOR_UNLOCK;
  270. while (tmp_collector) {
  271. next_collector = tmp_collector->next;
  272. _free_collector(tmp_collector);
  273. tmp_collector = next_collector;
  274. }
  275. }
  276. /*
  277. * Find a collector (if it exists)
  278. * Must lock before calling this
  279. * If last_collector is not null, it will return the previous collector in the linked
  280. * list (used in collector delete)
  281. */
  282. static struct _collector *_find_collector(
  283. const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
  284. {
  285. struct _collector *tmp_collector, *prev_collector;
  286. uint32_t plugin_hash;
  287. uint32_t module_hash;
  288. uint32_t hostname_hash;
  289. if (unlikely(!collector_list)) {
  290. collector_list = callocz(1, sizeof(struct _collector));
  291. return NULL;
  292. }
  293. if (unlikely(!collector_list->next))
  294. return NULL;
  295. plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  296. module_hash = module_name ? simple_hash(module_name) : 1;
  297. hostname_hash = simple_hash(hostname);
  298. // Note that the first entry is "dummy"
  299. tmp_collector = collector_list->next;
  300. prev_collector = collector_list;
  301. while (tmp_collector) {
  302. if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
  303. hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
  304. (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
  305. (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
  306. if (unlikely(last_collector))
  307. *last_collector = prev_collector;
  308. return tmp_collector;
  309. }
  310. prev_collector = tmp_collector;
  311. tmp_collector = tmp_collector->next;
  312. }
  313. return tmp_collector;
  314. }
  315. /*
  316. * Called to delete a collector
  317. * It will reduce the count (chart_count) and will remove it
  318. * from the linked list if the count reaches zero
  319. * The structure will be returned to the caller to free
  320. * the resources
  321. *
  322. */
  323. static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  324. {
  325. struct _collector *tmp_collector, *prev_collector = NULL;
  326. tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
  327. if (likely(tmp_collector)) {
  328. --tmp_collector->count;
  329. if (unlikely(!tmp_collector->count))
  330. prev_collector->next = tmp_collector->next;
  331. }
  332. return tmp_collector;
  333. }
  334. /*
  335. * Add a new collector (plugin / module) to the list
  336. * If it already exists just update the chart count
  337. *
  338. * Lock before calling
  339. */
  340. static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  341. {
  342. struct _collector *tmp_collector;
  343. tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
  344. if (unlikely(!tmp_collector)) {
  345. tmp_collector = callocz(1, sizeof(struct _collector));
  346. tmp_collector->hostname_hash = simple_hash(hostname);
  347. tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  348. tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
  349. tmp_collector->hostname = strdupz(hostname);
  350. tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
  351. tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
  352. tmp_collector->next = collector_list->next;
  353. collector_list->next = tmp_collector;
  354. }
  355. tmp_collector->count++;
  356. debug(
  357. D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
  358. module_name ? module_name : "*", tmp_collector->count);
  359. return tmp_collector;
  360. }
  361. #ifndef __GNUC__
  362. #pragma endregion
  363. #endif
  364. /* Avoids the need to scan trough all RRDHOSTS
  365. * every time any Query Thread Wakes Up
  366. * (every time we need to check child popcorn expiry)
  367. * call with ACLK_SHARED_STATE_LOCK held
  368. */
  369. void aclk_update_next_child_to_popcorn(void)
  370. {
  371. RRDHOST *host;
  372. int any = 0;
  373. rrd_rdlock();
  374. rrdhost_foreach_read(host) {
  375. if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)))
  376. continue;
  377. rrdhost_aclk_state_lock(host);
  378. if (!ACLK_IS_HOST_POPCORNING(host)) {
  379. rrdhost_aclk_state_unlock(host);
  380. continue;
  381. }
  382. any = 1;
  383. if (unlikely(!aclk_shared_state.next_popcorn_host)) {
  384. aclk_shared_state.next_popcorn_host = host;
  385. rrdhost_aclk_state_unlock(host);
  386. continue;
  387. }
  388. if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
  389. aclk_shared_state.next_popcorn_host = host;
  390. rrdhost_aclk_state_unlock(host);
  391. }
  392. if(!any)
  393. aclk_shared_state.next_popcorn_host = NULL;
  394. rrd_unlock();
  395. }
  396. /* If popcorning bump timer.
  397. * If popcorning or initializing (host not stable) return 1
  398. * Otherwise return 0
  399. */
  400. static int aclk_popcorn_check_bump(RRDHOST *host)
  401. {
  402. time_t now = now_monotonic_sec();
  403. int updated = 0, ret;
  404. ACLK_SHARED_STATE_LOCK;
  405. rrdhost_aclk_state_lock(host);
  406. ret = ACLK_IS_HOST_INITIALIZING(host);
  407. if (unlikely(ACLK_IS_HOST_POPCORNING(host))) {
  408. if(now != host->aclk_state.t_last_popcorn_update) {
  409. updated = 1;
  410. info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
  411. }
  412. host->aclk_state.t_last_popcorn_update = now;
  413. rrdhost_aclk_state_unlock(host);
  414. if (host != localhost && updated)
  415. aclk_update_next_child_to_popcorn();
  416. ACLK_SHARED_STATE_UNLOCK;
  417. return ret;
  418. }
  419. rrdhost_aclk_state_unlock(host);
  420. ACLK_SHARED_STATE_UNLOCK;
  421. return ret;
  422. }
  423. inline static int aclk_host_initializing(RRDHOST *host)
  424. {
  425. rrdhost_aclk_state_lock(host);
  426. int ret = ACLK_IS_HOST_INITIALIZING(host);
  427. rrdhost_aclk_state_unlock(host);
  428. return ret;
  429. }
  430. static void aclk_start_host_popcorning(RRDHOST *host)
  431. {
  432. usec_t now = now_monotonic_sec();
  433. info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
  434. ACLK_SHARED_STATE_LOCK;
  435. rrdhost_aclk_state_lock(host);
  436. if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) {
  437. errno = 0;
  438. error("Localhost is allowed to do popcorning only once after startup!");
  439. rrdhost_aclk_state_unlock(host);
  440. ACLK_SHARED_STATE_UNLOCK;
  441. return;
  442. }
  443. host->aclk_state.state = ACLK_HOST_INITIALIZING;
  444. host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
  445. host->aclk_state.t_last_popcorn_update = now;
  446. rrdhost_aclk_state_unlock(host);
  447. if (host != localhost)
  448. aclk_update_next_child_to_popcorn();
  449. ACLK_SHARED_STATE_UNLOCK;
  450. }
  451. static void aclk_stop_host_popcorning(RRDHOST *host)
  452. {
  453. ACLK_SHARED_STATE_LOCK;
  454. rrdhost_aclk_state_lock(host);
  455. if (!ACLK_IS_HOST_POPCORNING(host)) {
  456. rrdhost_aclk_state_unlock(host);
  457. ACLK_SHARED_STATE_UNLOCK;
  458. return;
  459. }
  460. info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid);
  461. host->aclk_state.t_last_popcorn_update = 0;
  462. host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
  463. rrdhost_aclk_state_unlock(host);
  464. if(host == aclk_shared_state.next_popcorn_host) {
  465. aclk_shared_state.next_popcorn_host = NULL;
  466. aclk_update_next_child_to_popcorn();
  467. }
  468. ACLK_SHARED_STATE_UNLOCK;
  469. }
  470. /*
  471. * Add a new collector to the list
  472. * If it exists, update the chart count
  473. */
  474. void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
  475. {
  476. struct _collector *tmp_collector;
  477. if (unlikely(!netdata_ready)) {
  478. return;
  479. }
  480. COLLECTOR_LOCK;
  481. tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);
  482. if (unlikely(tmp_collector->count != 1)) {
  483. COLLECTOR_UNLOCK;
  484. return;
  485. }
  486. COLLECTOR_UNLOCK;
  487. if(aclk_popcorn_check_bump(host))
  488. return;
  489. if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  490. debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
  491. }
  492. /*
  493. * Delete a collector from the list
  494. * If the chart count reaches zero the collector will be removed
  495. * from the list by calling del_collector.
  496. *
  497. * This function will release the memory used and schedule
  498. * a cloud update
  499. */
  500. void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
  501. {
  502. struct _collector *tmp_collector;
  503. if (unlikely(!netdata_ready)) {
  504. return;
  505. }
  506. COLLECTOR_LOCK;
  507. tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);
  508. if (unlikely(!tmp_collector || tmp_collector->count)) {
  509. COLLECTOR_UNLOCK;
  510. return;
  511. }
  512. debug(
  513. D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
  514. tmp_collector->count);
  515. COLLECTOR_UNLOCK;
  516. _free_collector(tmp_collector);
  517. if (aclk_popcorn_check_bump(host))
  518. return;
  519. if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  520. debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
  521. }
  522. static void aclk_graceful_disconnect()
  523. {
  524. size_t write_q, write_q_bytes, read_q;
  525. time_t event_loop_timeout;
  526. // Send a graceful disconnect message
  527. BUFFER *b = buffer_create(512);
  528. aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg);
  529. buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}");
  530. aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
  531. buffer_free(b);
  532. event_loop_timeout = now_realtime_sec() + 5;
  533. write_q = 1;
  534. while (write_q && event_loop_timeout > now_realtime_sec()) {
  535. _link_event_loop();
  536. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  537. }
  538. aclk_shutting_down = 1;
  539. _link_shutdown();
  540. aclk_lws_wss_mqtt_layer_disconect_notif();
  541. write_q = 1;
  542. event_loop_timeout = now_realtime_sec() + 5;
  543. while (write_q && event_loop_timeout > now_realtime_sec()) {
  544. _link_event_loop();
  545. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  546. }
  547. aclk_shutting_down = 0;
  548. }
  549. #ifndef __GNUC__
  550. #pragma region Incoming Msg Parsing
  551. #endif
  552. struct dictionary_singleton {
  553. char *key;
  554. char *result;
  555. };
  556. int json_extract_singleton(JSON_ENTRY *e)
  557. {
  558. struct dictionary_singleton *data = e->callback_data;
  559. switch (e->type) {
  560. case JSON_OBJECT:
  561. case JSON_ARRAY:
  562. break;
  563. case JSON_STRING:
  564. if (!strcmp(e->name, data->key)) {
  565. data->result = strdupz(e->data.string);
  566. break;
  567. }
  568. break;
  569. case JSON_NUMBER:
  570. case JSON_BOOLEAN:
  571. case JSON_NULL:
  572. break;
  573. }
  574. return 0;
  575. }
  576. #ifndef __GNUC__
  577. #pragma endregion
  578. #endif
  579. #ifndef __GNUC__
  580. #pragma region Challenge Response
  581. #endif
  582. // Base-64 decoder.
  583. // Note: This is non-validating, invalid input will be decoded without an error.
  584. // Challenges are packed into json strings so we don't skip newlines.
  585. // Size errors (i.e. invalid input size or insufficient output space) are caught.
  586. size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
  587. {
  588. static char lookup[256];
  589. static int first_time=1;
  590. if (first_time)
  591. {
  592. first_time = 0;
  593. for(int i=0; i<256; i++)
  594. lookup[i] = -1;
  595. for(int i='A'; i<='Z'; i++)
  596. lookup[i] = i-'A';
  597. for(int i='a'; i<='z'; i++)
  598. lookup[i] = i-'a' + 26;
  599. for(int i='0'; i<='9'; i++)
  600. lookup[i] = i-'0' + 52;
  601. lookup['+'] = 62;
  602. lookup['/'] = 63;
  603. }
  604. if ((input_size & 3) != 0)
  605. {
  606. error("Can't decode base-64 input length %zu", input_size);
  607. return 0;
  608. }
  609. size_t unpadded_size = (input_size/4) * 3;
  610. if ( unpadded_size > output_size )
  611. {
  612. error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
  613. return 0;
  614. }
  615. // Don't check padding within full quantums
  616. for (size_t i = 0 ; i < input_size-4 ; i+=4 )
  617. {
  618. uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]];
  619. output[0] = value >> 16;
  620. output[1] = value >> 8;
  621. output[2] = value;
  622. //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
  623. output += 3;
  624. input += 4;
  625. }
  626. // Handle padding only in last quantum
  627. if (input[2] == '=') {
  628. uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]];
  629. output[0] = value >> 4;
  630. //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]);
  631. return unpadded_size-2;
  632. }
  633. else if (input[3] == '=') {
  634. uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]];
  635. output[0] = value >> 10;
  636. output[1] = value >> 2;
  637. //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]);
  638. return unpadded_size-1;
  639. }
  640. else
  641. {
  642. uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3];
  643. output[0] = value >> 16;
  644. output[1] = value >> 8;
  645. output[2] = value;
  646. //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
  647. return unpadded_size;
  648. }
  649. }
  650. size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
  651. {
  652. uint32_t value;
  653. static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
  654. "abcdefghijklmnopqrstuvwxyz"
  655. "0123456789+/";
  656. if ((input_size/3+1)*4 >= output_size)
  657. {
  658. error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
  659. return 0;
  660. }
  661. size_t count = 0;
  662. while (input_size>3)
  663. {
  664. value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff;
  665. output[0] = lookup[value >> 18];
  666. output[1] = lookup[(value >> 12) & 0x3f];
  667. output[2] = lookup[(value >> 6) & 0x3f];
  668. output[3] = lookup[value & 0x3f];
  669. //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
  670. output += 4;
  671. input += 3;
  672. input_size -= 3;
  673. count += 4;
  674. }
  675. switch (input_size)
  676. {
  677. case 2:
  678. value = (input[0] << 10) + (input[1] << 2);
  679. output[0] = lookup[(value >> 12) & 0x3f];
  680. output[1] = lookup[(value >> 6) & 0x3f];
  681. output[2] = lookup[value & 0x3f];
  682. output[3] = '=';
  683. //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]);
  684. count += 4;
  685. break;
  686. case 1:
  687. value = input[0] << 4;
  688. output[0] = lookup[(value >> 6) & 0x3f];
  689. output[1] = lookup[value & 0x3f];
  690. output[2] = '=';
  691. output[3] = '=';
  692. //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
  693. count += 4;
  694. break;
  695. case 0:
  696. break;
  697. }
  698. return count;
  699. }
  700. int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
  701. {
  702. int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);
  703. if (result == -1) {
  704. char err[512];
  705. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  706. error("Decryption of the challenge failed: %s", err);
  707. }
  708. return result;
  709. }
  710. void aclk_get_challenge(char *aclk_hostname, int port)
  711. {
  712. char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  713. debug(D_ACLK, "Performing challenge-response sequence");
  714. if (aclk_password != NULL)
  715. {
  716. freez(aclk_password);
  717. aclk_password = NULL;
  718. }
  719. // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
  720. // TODO - target host?
  721. char *agent_id = is_agent_claimed();
  722. if (agent_id == NULL)
  723. {
  724. error("Agent was not claimed - cannot perform challenge/response");
  725. goto CLEANUP;
  726. }
  727. char url[1024];
  728. sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
  729. info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url);
  730. if(aclk_send_https_request("GET", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
  731. {
  732. error("Challenge failed: %s", data_buffer);
  733. goto CLEANUP;
  734. }
  735. struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
  736. debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
  737. if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
  738. {
  739. freez(challenge.result);
  740. error("Could not parse the json response with the challenge: %s", data_buffer);
  741. goto CLEANUP;
  742. }
  743. if (challenge.result == NULL ) {
  744. error("Could not retrieve challenge from auth response: %s", data_buffer);
  745. goto CLEANUP;
  746. }
  747. size_t challenge_len = strlen(challenge.result);
  748. unsigned char decoded[512];
  749. size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
  750. unsigned char plaintext[4096]={};
  751. int decrypted_length = private_decrypt(decoded, decoded_len, plaintext);
  752. freez(challenge.result);
  753. char encoded[512];
  754. size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
  755. encoded[encoded_len] = 0;
  756. debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
  757. char response_json[4096]={};
  758. sprintf(response_json, "{\"response\":\"%s\"}", encoded);
  759. debug(D_ACLK, "Password phase: %s",response_json);
  760. // TODO - host
  761. sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
  762. if(aclk_send_https_request("POST", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
  763. {
  764. error("Challenge-response failed: %s", data_buffer);
  765. goto CLEANUP;
  766. }
  767. debug(D_ACLK, "Password response from cloud: %s", data_buffer);
  768. struct dictionary_singleton password = { .key = "password", .result = NULL };
  769. if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
  770. {
  771. freez(password.result);
  772. error("Could not parse the json response with the password: %s", data_buffer);
  773. goto CLEANUP;
  774. }
  775. if (password.result == NULL ) {
  776. error("Could not retrieve password from auth response");
  777. goto CLEANUP;
  778. }
  779. if (aclk_password != NULL )
  780. freez(aclk_password);
  781. aclk_password = password.result;
  782. if (aclk_username != NULL)
  783. freez(aclk_username);
  784. aclk_username = agent_id;
  785. agent_id = NULL;
  786. CLEANUP:
  787. if (agent_id != NULL)
  788. freez(agent_id);
  789. freez(data_buffer);
  790. return;
  791. }
  792. #ifndef __GNUC__
  793. #pragma endregion
  794. #endif
  795. static void aclk_try_to_connect(char *hostname, int port)
  796. {
  797. int rc;
  798. // this is usefull for developers working on ACLK
  799. // allows connecting agent to any MQTT broker
  800. // for debugging, development and testing purposes
  801. #ifndef ACLK_DISABLE_CHALLENGE
  802. if (!aclk_private_key) {
  803. error("Cannot try to establish the agent cloud link - no private key available!");
  804. return;
  805. }
  806. #endif
  807. info("Attempting to establish the agent cloud link");
  808. #ifdef ACLK_DISABLE_CHALLENGE
  809. error("Agent built with ACLK_DISABLE_CHALLENGE. This is for testing "
  810. "and development purposes only. Warranty void. Won't be able "
  811. "to connect to Netdata Cloud.");
  812. if (aclk_password == NULL)
  813. aclk_password = strdupz("anon");
  814. #else
  815. aclk_get_challenge(hostname, port);
  816. if (aclk_password == NULL)
  817. return;
  818. #endif
  819. aclk_connecting = 1;
  820. create_publish_base_topic();
  821. ACLK_SHARED_STATE_LOCK;
  822. aclk_shared_state.version_neg = 0;
  823. aclk_shared_state.version_neg_wait_till = 0;
  824. ACLK_SHARED_STATE_UNLOCK;
  825. rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password);
  826. if (unlikely(rc)) {
  827. error("Failed to initialize the agent cloud link library");
  828. }
  829. }
  830. // Sends "hello" message to negotiate ACLK version with cloud
  831. static inline void aclk_hello_msg()
  832. {
  833. BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  834. char *msg_id = create_uuid();
  835. ACLK_SHARED_STATE_LOCK;
  836. aclk_shared_state.version_neg = 0;
  837. aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
  838. ACLK_SHARED_STATE_UNLOCK;
  839. //Hello message is versioned separatelly from the rest of the protocol
  840. aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
  841. buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX);
  842. aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id);
  843. freez(msg_id);
  844. buffer_free(buf);
  845. }
  846. /**
  847. * Main agent cloud link thread
  848. *
  849. * This thread will simply call the main event loop that handles
  850. * pending requests - both inbound and outbound
  851. *
  852. * @param ptr is a pointer to the netdata_static_thread structure.
  853. *
  854. * @return It always returns NULL
  855. */
  856. void *aclk_main(void *ptr)
  857. {
  858. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  859. struct aclk_query_threads query_threads;
  860. struct aclk_stats_thread *stats_thread = NULL;
  861. time_t last_periodic_query_wakeup = 0;
  862. query_threads.thread_list = NULL;
  863. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  864. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  865. netdata_thread_disable_cancelability();
  866. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK)
  867. info("Killing ACLK thread -> cloud functionality has been disabled");
  868. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  869. return NULL;
  870. #endif
  871. #ifndef LWS_WITH_SOCKS5
  872. ACLK_PROXY_TYPE proxy_type;
  873. aclk_get_proxy(&proxy_type);
  874. if(proxy_type == PROXY_TYPE_SOCKS5) {
  875. error("Disabling ACLK due to requested SOCKS5 proxy.");
  876. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  877. return NULL;
  878. }
  879. #endif
  880. info("Waiting for netdata to be ready");
  881. while (!netdata_ready) {
  882. sleep_usec(USEC_PER_MS * 300);
  883. }
  884. info("Waiting for Cloud to be enabled");
  885. while (!netdata_cloud_setting) {
  886. sleep_usec(USEC_PER_SEC * 1);
  887. if (netdata_exit) {
  888. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  889. return NULL;
  890. }
  891. }
  892. query_threads.count = MIN(processors/2, 6);
  893. query_threads.count = MAX(query_threads.count, 2);
  894. query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
  895. if(query_threads.count < 1) {
  896. error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count);
  897. query_threads.count = 1;
  898. config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
  899. }
  900. //start localhost popcorning
  901. aclk_start_host_popcorning(localhost);
  902. aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
  903. if (aclk_stats_enabled) {
  904. stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
  905. stats_thread->thread = mallocz(sizeof(netdata_thread_t));
  906. stats_thread->query_thread_count = query_threads.count;
  907. netdata_thread_create(
  908. stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
  909. stats_thread);
  910. }
  911. char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
  912. int port_num = 0;
  913. info("Waiting for netdata to be claimed");
  914. while(1) {
  915. char *agent_id = is_agent_claimed();
  916. while (likely(!agent_id)) {
  917. sleep_usec(USEC_PER_SEC * 1);
  918. if (netdata_exit)
  919. goto exited;
  920. agent_id = is_agent_claimed();
  921. }
  922. freez(agent_id);
  923. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  924. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  925. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  926. if (cloud_base_url == NULL) {
  927. error("Do not move the cloud base url out of post_conf_load!!");
  928. goto exited;
  929. }
  930. if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &port_num))
  931. error("Agent is claimed but the configuration is invalid, please fix");
  932. else if (!create_private_key() && !_mqtt_lib_init())
  933. break;
  934. for (int i=0; i<60; i++) {
  935. if (netdata_exit)
  936. goto exited;
  937. sleep_usec(USEC_PER_SEC * 1);
  938. }
  939. }
  940. usec_t reconnect_expiry = 0; // In usecs
  941. while (!netdata_exit) {
  942. static int first_init = 0;
  943. /* size_t write_q, write_q_bytes, read_q;
  944. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/
  945. if (aclk_disable_runtime && !aclk_connected) {
  946. sleep(1);
  947. continue;
  948. }
  949. if (aclk_kill_link) { // User has reloaded the claiming state
  950. aclk_kill_link = 0;
  951. aclk_graceful_disconnect();
  952. create_private_key();
  953. continue;
  954. }
  955. if (aclk_force_reconnect) {
  956. aclk_lws_wss_destroy_context();
  957. aclk_force_reconnect = 0;
  958. }
  959. if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) {
  960. if (unlikely(!first_init)) {
  961. aclk_try_to_connect(aclk_hostname, port_num);
  962. first_init = 1;
  963. } else {
  964. if (aclk_connecting == 0) {
  965. if (reconnect_expiry == 0) {
  966. unsigned long int delay = aclk_reconnect_delay(1);
  967. reconnect_expiry = now_realtime_usec() + delay * 1000;
  968. info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
  969. }
  970. if (now_realtime_usec() >= reconnect_expiry) {
  971. reconnect_expiry = 0;
  972. aclk_try_to_connect(aclk_hostname, port_num);
  973. }
  974. sleep_usec(USEC_PER_MS * 100);
  975. }
  976. }
  977. if (aclk_connecting) {
  978. _link_event_loop();
  979. sleep_usec(USEC_PER_MS * 100);
  980. }
  981. continue;
  982. }
  983. _link_event_loop();
  984. if (unlikely(!aclk_connected || aclk_force_reconnect))
  985. continue;
  986. /*static int stress_counter = 0;
  987. if (write_q_bytes==0 && stress_counter ++ >5)
  988. {
  989. aclk_send_stress_test(8000000);
  990. stress_counter = 0;
  991. }*/
  992. if (unlikely(!aclk_subscribed)) {
  993. aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
  994. aclk_hello_msg();
  995. }
  996. if (unlikely(!query_threads.thread_list)) {
  997. aclk_query_threads_start(&query_threads);
  998. }
  999. time_t now = now_monotonic_sec();
  1000. if(aclk_connected && last_periodic_query_wakeup < now) {
  1001. // to make `aclk_queue_query()` param `run_after` work
  1002. // also makes per child popcorning work
  1003. last_periodic_query_wakeup = now;
  1004. QUERY_THREAD_WAKEUP;
  1005. }
  1006. } // forever
  1007. exited:
  1008. // Wakeup query thread to cleanup
  1009. QUERY_THREAD_WAKEUP_ALL;
  1010. freez(aclk_username);
  1011. freez(aclk_password);
  1012. freez(aclk_hostname);
  1013. if (aclk_private_key != NULL)
  1014. RSA_free(aclk_private_key);
  1015. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  1016. char *agent_id = is_agent_claimed();
  1017. if (agent_id && aclk_connected) {
  1018. freez(agent_id);
  1019. // Wakeup thread to cleanup
  1020. QUERY_THREAD_WAKEUP;
  1021. aclk_graceful_disconnect();
  1022. }
  1023. aclk_query_threads_cleanup(&query_threads);
  1024. _reset_collector_list();
  1025. freez(collector_list);
  1026. if(aclk_stats_enabled) {
  1027. netdata_thread_join(*stats_thread->thread, NULL);
  1028. aclk_stats_thread_cleanup();
  1029. freez(stats_thread->thread);
  1030. freez(stats_thread);
  1031. }
  1032. /*
  1033. * this must be last -> if all static threads signal
  1034. * THREAD_EXITED rrdengine will dealloc the RRDSETs
  1035. * and RRDDIMs that are used by still runing stat thread.
  1036. * see netdata_cleanup_and_exit() for reference
  1037. */
  1038. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  1039. return NULL;
  1040. }
  1041. /*
  1042. * Send a message to the cloud, using a base topic and sib_topic
  1043. * The final topic will be in the form <base_topic>/<sub_topic>
  1044. * If base_topic is missing then the global_base_topic will be used (if available)
  1045. *
  1046. */
  1047. int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id)
  1048. {
  1049. int rc;
  1050. int mid;
  1051. char topic[ACLK_MAX_TOPIC + 1];
  1052. char *final_topic;
  1053. UNUSED(msg_id);
  1054. if (!aclk_connected)
  1055. return 0;
  1056. if (unlikely(!message))
  1057. return 0;
  1058. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  1059. if (unlikely(!final_topic)) {
  1060. errno = 0;
  1061. error("Unable to build outgoing topic; truncated?");
  1062. return 1;
  1063. }
  1064. ACLK_LOCK;
  1065. rc = _link_send_message(final_topic, message, len, &mid);
  1066. // TODO: link the msg_id with the mid so we can trace it
  1067. ACLK_UNLOCK;
  1068. if (unlikely(rc)) {
  1069. errno = 0;
  1070. error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
  1071. }
  1072. return rc;
  1073. }
  1074. int aclk_send_message(char *sub_topic, char *message, char *msg_id)
  1075. {
  1076. return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id);
  1077. }
  1078. /*
  1079. * Subscribe to a topic in the cloud
  1080. * The final subscription will be in the form
  1081. * /agent/claim_id/<sub_topic>
  1082. */
  1083. int aclk_subscribe(char *sub_topic, int qos)
  1084. {
  1085. int rc;
  1086. char topic[ACLK_MAX_TOPIC + 1];
  1087. char *final_topic;
  1088. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  1089. if (unlikely(!final_topic)) {
  1090. errno = 0;
  1091. error("Unable to build outgoing topic; truncated?");
  1092. return 1;
  1093. }
  1094. if (!aclk_connected) {
  1095. error("Cannot subscribe to %s - not connected!", topic);
  1096. return 1;
  1097. }
  1098. ACLK_LOCK;
  1099. rc = _link_subscribe(final_topic, qos);
  1100. ACLK_UNLOCK;
  1101. // TODO: Add better handling -- error will flood the logfile here
  1102. if (unlikely(rc)) {
  1103. errno = 0;
  1104. error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
  1105. }
  1106. return rc;
  1107. }
  1108. // This is called from a callback when the link goes up
  1109. void aclk_connect()
  1110. {
  1111. info("Connection detected (%u queued queries)", aclk_query_size());
  1112. aclk_stats_upd_online(1);
  1113. aclk_connected = 1;
  1114. aclk_reconnect_delay(0);
  1115. QUERY_THREAD_WAKEUP;
  1116. return;
  1117. }
  1118. // This is called from a callback when the link goes down
  1119. void aclk_disconnect()
  1120. {
  1121. if (likely(aclk_connected))
  1122. info("Disconnect detected (%u queued queries)", aclk_query_size());
  1123. aclk_stats_upd_online(0);
  1124. aclk_subscribed = 0;
  1125. rrdhost_aclk_state_lock(localhost);
  1126. localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED;
  1127. rrdhost_aclk_state_unlock(localhost);
  1128. aclk_connected = 0;
  1129. aclk_connecting = 0;
  1130. aclk_force_reconnect = 1;
  1131. }
  1132. inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version)
  1133. {
  1134. uuid_t uuid;
  1135. char uuid_str[36 + 1];
  1136. if (unlikely(!msg_id)) {
  1137. uuid_generate(uuid);
  1138. uuid_unparse(uuid, uuid_str);
  1139. msg_id = uuid_str;
  1140. }
  1141. if (ts_secs == 0) {
  1142. ts_us = now_realtime_usec();
  1143. ts_secs = ts_us / USEC_PER_SEC;
  1144. ts_us = ts_us % USEC_PER_SEC;
  1145. }
  1146. buffer_sprintf(
  1147. dest,
  1148. "{\t\"type\": \"%s\",\n"
  1149. "\t\"msg-id\": \"%s\",\n"
  1150. "\t\"timestamp\": %ld,\n"
  1151. "\t\"timestamp-offset-usec\": %llu,\n"
  1152. "\t\"connect\": %ld,\n"
  1153. "\t\"connect-offset-usec\": %llu,\n"
  1154. "\t\"version\": %d",
  1155. type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version);
  1156. debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs);
  1157. }
  1158. /*
  1159. * This will send alarm information which includes
  1160. * configured alarms
  1161. * alarm_log
  1162. * active alarms
  1163. */
  1164. void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
  1165. void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
  1166. {
  1167. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1168. char *msg_id = create_uuid();
  1169. buffer_flush(local_buffer);
  1170. local_buffer->contenttype = CT_APPLICATION_JSON;
  1171. debug(D_ACLK, "Metadata alarms start");
  1172. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1173. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1174. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1175. // session.
  1176. if (metadata_submitted == ACLK_METADATA_SENT)
  1177. aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
  1178. else
  1179. aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
  1180. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1181. buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
  1182. health_alarms2json(localhost, local_buffer, 1);
  1183. debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len);
  1184. // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
  1185. // health_alarm_log2json(localhost, local_buffer, 0);
  1186. // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
  1187. buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
  1188. health_active_log_alarms_2json(localhost, local_buffer);
  1189. //debug(D_ACLK, "Metadata message %s", local_buffer->buffer);
  1190. buffer_sprintf(local_buffer, "\n}\n}");
  1191. aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
  1192. freez(msg_id);
  1193. buffer_free(local_buffer);
  1194. }
  1195. /*
  1196. * This will send the agent metadata
  1197. * /api/v1/info
  1198. * charts
  1199. */
  1200. int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
  1201. {
  1202. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1203. debug(D_ACLK, "Metadata /info start");
  1204. char *msg_id = create_uuid();
  1205. buffer_flush(local_buffer);
  1206. local_buffer->contenttype = CT_APPLICATION_JSON;
  1207. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1208. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1209. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1210. // session.
  1211. if (metadata_submitted == ACLK_METADATA_SENT)
  1212. aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg);
  1213. else
  1214. aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
  1215. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1216. buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
  1217. web_client_api_request_v1_info_fill_buffer(host, local_buffer);
  1218. debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
  1219. buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
  1220. charts2json(host, local_buffer, 1, 0);
  1221. buffer_sprintf(local_buffer, "\n}\n}");
  1222. debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
  1223. aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
  1224. freez(msg_id);
  1225. buffer_free(local_buffer);
  1226. return 0;
  1227. }
  1228. int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
  1229. {
  1230. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1231. local_buffer->contenttype = CT_APPLICATION_JSON;
  1232. if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
  1233. fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg);
  1234. debug(D_ACLK, "Sending Child Disconnect");
  1235. char *msg_id = create_uuid();
  1236. aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg);
  1237. buffer_strcat(local_buffer, ",\"payload\":");
  1238. buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid);
  1239. rrdhost_aclk_state_lock(host);
  1240. if(host->aclk_state.claimed_id)
  1241. buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id);
  1242. else
  1243. buffer_strcat(local_buffer, "null}}");
  1244. rrdhost_aclk_state_unlock(host);
  1245. aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
  1246. freez(msg_id);
  1247. buffer_free(local_buffer);
  1248. return 0;
  1249. }
  1250. void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd)
  1251. {
  1252. #if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
  1253. if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
  1254. return;
  1255. #else
  1256. #warning "This check became unnecessary. Remove"
  1257. #endif
  1258. if (unlikely(aclk_host_initializing(localhost)))
  1259. return;
  1260. switch (cmd) {
  1261. case ACLK_CMD_CHILD_CONNECT:
  1262. debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
  1263. aclk_start_host_popcorning(host);
  1264. aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
  1265. break;
  1266. case ACLK_CMD_CHILD_DISCONNECT:
  1267. debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
  1268. aclk_stop_host_popcorning(host);
  1269. aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
  1270. break;
  1271. default:
  1272. error("Unknown command for aclk_host_state_update %d.", (int)cmd);
  1273. }
  1274. }
  1275. void aclk_send_stress_test(size_t size)
  1276. {
  1277. char *buffer = mallocz(size);
  1278. if (buffer != NULL)
  1279. {
  1280. for(size_t i=0; i<size; i++)
  1281. buffer[i] = 'x';
  1282. buffer[size-1] = 0;
  1283. time_t time_created = now_realtime_sec();
  1284. sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
  1285. buffer[strlen(buffer)] = '"';
  1286. buffer[size-2] = '}';
  1287. buffer[size-3] = '"';
  1288. aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
  1289. error("Sending stress of size %zu at time %ld", size, time_created);
  1290. }
  1291. free(buffer);
  1292. }
  1293. // Send info metadata message to the cloud if the link is established
  1294. // or on request
  1295. int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host)
  1296. {
  1297. aclk_send_info_metadata(state, host);
  1298. if(host == localhost)
  1299. aclk_send_alarm_metadata(state);
  1300. return 0;
  1301. }
  1302. void aclk_single_update_disable()
  1303. {
  1304. aclk_disable_single_updates = 1;
  1305. }
  1306. void aclk_single_update_enable()
  1307. {
  1308. aclk_disable_single_updates = 0;
  1309. }
  1310. // Trigged by a health reload, sends the alarm metadata
  1311. void aclk_alarm_reload()
  1312. {
  1313. if (unlikely(aclk_host_initializing(localhost)))
  1314. return;
  1315. if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  1316. if (likely(aclk_connected)) {
  1317. errno = 0;
  1318. error("ACLK failed to queue on_connect command on alarm reload");
  1319. }
  1320. }
  1321. }
  1322. //rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
  1323. int aclk_send_single_chart(RRDHOST *host, char *chart)
  1324. {
  1325. RRDSET *st = rrdset_find(host, chart);
  1326. if (!st)
  1327. st = rrdset_find_byname(host, chart);
  1328. if (!st) {
  1329. info("FAILED to find chart %s", chart);
  1330. return 1;
  1331. }
  1332. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1333. char *msg_id = create_uuid();
  1334. buffer_flush(local_buffer);
  1335. local_buffer->contenttype = CT_APPLICATION_JSON;
  1336. aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg);
  1337. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1338. rrdset2json(st, local_buffer, NULL, NULL, 1);
  1339. buffer_sprintf(local_buffer, "\t\n}");
  1340. aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);
  1341. freez(msg_id);
  1342. buffer_free(local_buffer);
  1343. return 0;
  1344. }
  1345. int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
  1346. {
  1347. #ifndef ENABLE_ACLK
  1348. UNUSED(host);
  1349. UNUSED(chart_name);
  1350. return 0;
  1351. #else
  1352. if (unlikely(!netdata_ready))
  1353. return 0;
  1354. if (!netdata_cloud_setting)
  1355. return 0;
  1356. if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
  1357. return 0;
  1358. if (aclk_host_initializing(localhost))
  1359. return 0;
  1360. if (unlikely(aclk_disable_single_updates))
  1361. return 0;
  1362. if (aclk_popcorn_check_bump(host))
  1363. return 0;
  1364. if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) {
  1365. if (likely(aclk_connected)) {
  1366. errno = 0;
  1367. error("ACLK failed to queue chart_update command");
  1368. }
  1369. }
  1370. return 0;
  1371. #endif
  1372. }
  1373. int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
  1374. {
  1375. BUFFER *local_buffer = NULL;
  1376. if (unlikely(!netdata_ready))
  1377. return 0;
  1378. if (host != localhost)
  1379. return 0;
  1380. if(unlikely(aclk_host_initializing(localhost)))
  1381. return 0;
  1382. /*
  1383. * Check if individual updates have been disabled
  1384. * This will be the case when we do health reload
  1385. * and all the alarms will be dropped and recreated.
  1386. * At the end of the health reload the complete alarm metadata
  1387. * info will be sent
  1388. */
  1389. if (unlikely(aclk_disable_single_updates))
  1390. return 0;
  1391. local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1392. char *msg_id = create_uuid();
  1393. buffer_flush(local_buffer);
  1394. aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg);
  1395. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1396. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  1397. health_alarm_entry2json_nolock(local_buffer, ae, host);
  1398. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  1399. buffer_sprintf(local_buffer, "\n}");
  1400. if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
  1401. if (likely(aclk_connected)) {
  1402. errno = 0;
  1403. error("ACLK failed to queue alarm_command on alarm_update");
  1404. }
  1405. }
  1406. freez(msg_id);
  1407. buffer_free(local_buffer);
  1408. return 0;
  1409. }