agent_cloud_link.c 60 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "libnetdata/libnetdata.h"
  3. #include "agent_cloud_link.h"
  4. #include "aclk_lws_https_client.h"
  5. #include "aclk_common.h"
  6. #include "aclk_stats.h"
  7. int aclk_shutting_down = 0;
  8. // State-machine for the on-connect metadata transmission.
  9. // TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial
  10. // agent startup phase.
  11. static ACLK_METADATA_STATE aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  12. static AGENT_STATE agent_state = AGENT_INITIALIZING;
  13. // Other global state
  14. static int aclk_subscribed = 0;
  15. static int aclk_disable_single_updates = 0;
  16. static time_t last_init_sequence = 0;
  17. static int waiting_init = 1;
  18. static char *aclk_username = NULL;
  19. static char *aclk_password = NULL;
  20. static char *global_base_topic = NULL;
  21. static int aclk_connecting = 0;
  22. int aclk_connected = 0; // Exposed in the web-api
  23. int aclk_force_reconnect = 0; // Indication from lower layers
  24. int aclk_kill_link = 0; // Tell the agent to tear down the link
  25. usec_t aclk_session_us = 0; // Used by the mqtt layer
  26. time_t aclk_session_sec = 0; // Used by the mqtt layer
  27. static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
  28. static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
  29. static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
  30. #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
  31. #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
  32. #define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
  33. #define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
  34. #define QUERY_LOCK netdata_mutex_lock(&query_mutex)
  35. #define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
  // Condition variable / mutex pair the query thread sleeps on between batches of work.
  pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
  pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;

  // All three macros expand to a plain expression (no trailing ';'), so each can be used
  // as a statement "MACRO;" or inside a larger expression, e.g. to check the return code.
  #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
  #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
  #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)

  // Implemented by the websocket layer; declared here because this file calls them directly.
  void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
  void aclk_lws_wss_destroy_context();
  /*
   * Bookkeeping entry for one collector (host + plugin + module) and the
   * number of charts it currently owns.
   * When every chart of a collector has been deleted, a new metadata
   * dataset must be sent to the cloud.
   */
  struct _collector {
      time_t created;
      uint32_t count;          // chart count

      // Hashes of the three names below, compared first to avoid string compares
      uint32_t hostname_hash;
      uint32_t plugin_hash;
      uint32_t module_hash;

      char *hostname;
      char *plugin_name;       // may be NULL
      char *module_name;       // may be NULL

      struct _collector *next;
  };

  // Singly linked list of collectors; its first node is a dummy head
  struct _collector *collector_list = NULL;
  61. struct aclk_query {
  62. time_t created;
  63. time_t run_after; // Delay run until after this time
  64. ACLK_CMD cmd; // What command is this
  65. char *topic; // Topic to respond to
  66. char *data; // Internal data (NULL if request from the cloud)
  67. char *msg_id; // msg_id generated by the cloud (NULL if internal)
  68. char *query; // The actual query
  69. u_char deleted; // Mark deleted for garbage collect
  70. struct aclk_query *next;
  71. };
  72. struct aclk_query_queue {
  73. struct aclk_query *aclk_query_head;
  74. struct aclk_query *aclk_query_tail;
  75. uint64_t count;
  76. } aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
  77. char *create_uuid()
  78. {
  79. uuid_t uuid;
  80. char *uuid_str = mallocz(36 + 1);
  81. uuid_generate(uuid);
  82. uuid_unparse(uuid, uuid_str);
  83. return uuid_str;
  84. }
  85. int cloud_to_agent_parse(JSON_ENTRY *e)
  86. {
  87. struct aclk_request *data = e->callback_data;
  88. switch (e->type) {
  89. case JSON_OBJECT:
  90. case JSON_ARRAY:
  91. break;
  92. case JSON_STRING:
  93. if (!strcmp(e->name, "msg-id")) {
  94. data->msg_id = strdupz(e->data.string);
  95. break;
  96. }
  97. if (!strcmp(e->name, "type")) {
  98. data->type_id = strdupz(e->data.string);
  99. break;
  100. }
  101. if (!strcmp(e->name, "callback-topic")) {
  102. data->callback_topic = strdupz(e->data.string);
  103. break;
  104. }
  105. if (!strcmp(e->name, "payload")) {
  106. if (likely(e->data.string)) {
  107. size_t len = strlen(e->data.string);
  108. data->payload = mallocz(len+1);
  109. if (!url_decode_r(data->payload, e->data.string, len + 1))
  110. strcpy(data->payload, e->data.string);
  111. }
  112. break;
  113. }
  114. break;
  115. case JSON_NUMBER:
  116. if (!strcmp(e->name, "version")) {
  117. data->version = atoi(e->original_string);
  118. break;
  119. }
  120. break;
  121. case JSON_BOOLEAN:
  122. break;
  123. case JSON_NULL:
  124. break;
  125. }
  126. return 0;
  127. }
  128. static RSA *aclk_private_key = NULL;
  129. static int create_private_key()
  130. {
  131. if (aclk_private_key != NULL)
  132. RSA_free(aclk_private_key);
  133. aclk_private_key = NULL;
  134. char filename[FILENAME_MAX + 1];
  135. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  136. long bytes_read;
  137. char *private_key = read_by_filename(filename, &bytes_read);
  138. if (!private_key) {
  139. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  140. return 1;
  141. }
  142. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  143. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  144. if (key_bio==NULL) {
  145. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  146. goto biofailed;
  147. }
  148. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  149. BIO_free(key_bio);
  150. if (aclk_private_key!=NULL)
  151. {
  152. freez(private_key);
  153. return 0;
  154. }
  155. char err[512];
  156. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  157. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  158. biofailed:
  159. freez(private_key);
  160. return 1;
  161. }
  162. /*
  163. * After a connection failure -- delay in milliseconds
  164. * When a connection is established, the delay function
  165. * should be called with
  166. *
  167. * mode 0 to reset the delay
  168. * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
  169. *
  170. */
  171. unsigned long int aclk_reconnect_delay(int mode)
  172. {
  173. static int fail = -1;
  174. unsigned long int delay;
  175. if (!mode || fail == -1) {
  176. srandom(time(NULL));
  177. fail = mode - 1;
  178. return 0;
  179. }
  180. delay = (1 << fail);
  181. if (delay >= ACLK_MAX_BACKOFF_DELAY) {
  182. delay = ACLK_MAX_BACKOFF_DELAY * 1000;
  183. } else {
  184. fail++;
  185. delay = (delay * 1000) + (random() % 1000);
  186. }
  187. return delay;
  188. }
  189. /*
  190. * Free a query structure when done
  191. */
  192. void aclk_query_free(struct aclk_query *this_query)
  193. {
  194. if (unlikely(!this_query))
  195. return;
  196. freez(this_query->topic);
  197. if (likely(this_query->query))
  198. freez(this_query->query);
  199. if (likely(this_query->data))
  200. freez(this_query->data);
  201. if (likely(this_query->msg_id))
  202. freez(this_query->msg_id);
  203. freez(this_query);
  204. }
  205. // Returns the entry after which we need to create a new entry to run at the specified time
  206. // If NULL is returned we need to add to HEAD
  207. // Need to have a QUERY lock before calling this
  208. struct aclk_query *aclk_query_find_position(time_t time_to_run)
  209. {
  210. struct aclk_query *tmp_query, *last_query;
  211. // Quick check if we will add to the end
  212. if (likely(aclk_queue.aclk_query_tail)) {
  213. if (aclk_queue.aclk_query_tail->run_after <= time_to_run)
  214. return aclk_queue.aclk_query_tail;
  215. }
  216. last_query = NULL;
  217. tmp_query = aclk_queue.aclk_query_head;
  218. while (tmp_query) {
  219. if (tmp_query->run_after > time_to_run)
  220. return last_query;
  221. last_query = tmp_query;
  222. tmp_query = tmp_query->next;
  223. }
  224. return last_query;
  225. }
  226. // Need to have a QUERY lock before calling this
  227. struct aclk_query *
  228. aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
  229. {
  230. struct aclk_query *tmp_query, *prev_query;
  231. UNUSED(cmd);
  232. tmp_query = aclk_queue.aclk_query_head;
  233. prev_query = NULL;
  234. while (tmp_query) {
  235. if (likely(!tmp_query->deleted)) {
  236. if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
  237. if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
  238. (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
  239. if (likely(last_query))
  240. *last_query = prev_query;
  241. return tmp_query;
  242. }
  243. }
  244. }
  245. prev_query = tmp_query;
  246. tmp_query = tmp_query->next;
  247. }
  248. return NULL;
  249. }
  250. /*
  251. * Add a query to execute, the result will be send to the specified topic
  252. */
  253. int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
  254. {
  255. struct aclk_query *new_query, *tmp_query;
  256. // Ignore all commands while we wait for the agent to initialize
  257. if (unlikely(waiting_init))
  258. return 1;
  259. run_after = now_realtime_sec() + run_after;
  260. QUERY_LOCK;
  261. struct aclk_query *last_query = NULL;
  262. tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
  263. if (unlikely(tmp_query)) {
  264. if (tmp_query->run_after == run_after) {
  265. QUERY_UNLOCK;
  266. QUERY_THREAD_WAKEUP;
  267. return 0;
  268. }
  269. if (last_query)
  270. last_query->next = tmp_query->next;
  271. else
  272. aclk_queue.aclk_query_head = tmp_query->next;
  273. debug(D_ACLK, "Removing double entry");
  274. aclk_query_free(tmp_query);
  275. aclk_queue.count--;
  276. }
  277. if (aclk_stats_enabled) {
  278. ACLK_STATS_LOCK;
  279. aclk_metrics_per_sample.queries_queued++;
  280. ACLK_STATS_UNLOCK;
  281. }
  282. new_query = callocz(1, sizeof(struct aclk_query));
  283. new_query->cmd = aclk_cmd;
  284. if (internal) {
  285. new_query->topic = strdupz(topic);
  286. if (likely(query))
  287. new_query->query = strdupz(query);
  288. } else {
  289. new_query->topic = topic;
  290. new_query->query = query;
  291. new_query->msg_id = msg_id;
  292. }
  293. if (data)
  294. new_query->data = strdupz(data);
  295. new_query->next = NULL;
  296. new_query->created = now_realtime_sec();
  297. new_query->run_after = run_after;
  298. debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
  299. tmp_query = aclk_query_find_position(run_after);
  300. if (tmp_query) {
  301. new_query->next = tmp_query->next;
  302. tmp_query->next = new_query;
  303. if (tmp_query == aclk_queue.aclk_query_tail)
  304. aclk_queue.aclk_query_tail = new_query;
  305. aclk_queue.count++;
  306. QUERY_UNLOCK;
  307. QUERY_THREAD_WAKEUP;
  308. return 0;
  309. }
  310. new_query->next = aclk_queue.aclk_query_head;
  311. aclk_queue.aclk_query_head = new_query;
  312. aclk_queue.count++;
  313. QUERY_UNLOCK;
  314. QUERY_THREAD_WAKEUP;
  315. return 0;
  316. }
  317. inline int aclk_submit_request(struct aclk_request *request)
  318. {
  319. return aclk_queue_query(request->callback_topic, NULL, request->msg_id, request->payload, 0, 0, ACLK_CMD_CLOUD);
  320. }
  321. /*
  322. * Get the next query to process - NULL if nothing there
  323. * The caller needs to free memory by calling aclk_query_free()
  324. *
  325. * topic
  326. * query
  327. * The structure itself
  328. *
  329. */
  330. struct aclk_query *aclk_queue_pop()
  331. {
  332. struct aclk_query *this_query;
  333. QUERY_LOCK;
  334. if (likely(!aclk_queue.aclk_query_head)) {
  335. QUERY_UNLOCK;
  336. return NULL;
  337. }
  338. this_query = aclk_queue.aclk_query_head;
  339. // Get rid of the deleted entries
  340. while (this_query && this_query->deleted) {
  341. aclk_queue.count--;
  342. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  343. if (likely(!aclk_queue.aclk_query_head)) {
  344. aclk_queue.aclk_query_tail = NULL;
  345. }
  346. aclk_query_free(this_query);
  347. this_query = aclk_queue.aclk_query_head;
  348. }
  349. if (likely(!this_query)) {
  350. QUERY_UNLOCK;
  351. return NULL;
  352. }
  353. if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
  354. info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
  355. QUERY_UNLOCK;
  356. return NULL;
  357. }
  358. aclk_queue.count--;
  359. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  360. if (likely(!aclk_queue.aclk_query_head)) {
  361. aclk_queue.aclk_query_tail = NULL;
  362. }
  363. QUERY_UNLOCK;
  364. return this_query;
  365. }
  366. // This will give the base topic that the agent will publish messages.
  367. // subtopics will be sent under the base topic e.g. base_topic/subtopic
  368. // This is called during the connection, we delete any previous topic
  369. // in-case the user has changed the agent id and reclaimed.
  370. char *create_publish_base_topic()
  371. {
  372. char *agent_id = is_agent_claimed();
  373. if (unlikely(!agent_id))
  374. return NULL;
  375. ACLK_LOCK;
  376. if (global_base_topic)
  377. freez(global_base_topic);
  378. char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
  379. snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id);
  380. tmp = strchr(tmp_topic, '\n');
  381. if (unlikely(tmp))
  382. *tmp = '\0';
  383. global_base_topic = strdupz(tmp_topic);
  384. ACLK_UNLOCK;
  385. freez(agent_id);
  386. return global_base_topic;
  387. }
  388. /*
  389. * Build a topic based on sub_topic and final_topic
  390. * if the sub topic starts with / assume that is an absolute topic
  391. *
  392. */
  393. char *get_topic(char *sub_topic, char *final_topic, int max_size)
  394. {
  395. int rc;
  396. if (likely(sub_topic && sub_topic[0] == '/'))
  397. return sub_topic;
  398. if (unlikely(!global_base_topic))
  399. return sub_topic;
  400. rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
  401. if (unlikely(rc >= max_size))
  402. debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);
  403. return final_topic;
  404. }
  405. /*
  406. * Free a collector structure
  407. */
  408. static void _free_collector(struct _collector *collector)
  409. {
  410. if (likely(collector->plugin_name))
  411. freez(collector->plugin_name);
  412. if (likely(collector->module_name))
  413. freez(collector->module_name);
  414. if (likely(collector->hostname))
  415. freez(collector->hostname);
  416. freez(collector);
  417. }
  /*
   * Log every collector currently in the list (debug builds only).
   * Takes the collector lock itself.
   */
  #ifdef ACLK_DEBUG
  static void _dump_collector_list()
  {
      COLLECTOR_LOCK;

      info("DUMPING ALL COLLECTORS");

      if (unlikely(!collector_list || !collector_list->next)) {
          COLLECTOR_UNLOCK;
          info("DUMPING ALL COLLECTORS -- nothing found");
          return;
      }

      // Note that the first entry is "dummy"
      for (struct _collector *entry = collector_list->next; entry; entry = entry->next) {
          info(
              "COLLECTOR %s : [%s:%s] count = %u", entry->hostname,
              entry->plugin_name ? entry->plugin_name : "",
              entry->module_name ? entry->module_name : "", entry->count);
      }

      info("DUMPING ALL COLLECTORS DONE");
      COLLECTOR_UNLOCK;
  }
  #endif
  446. /*
  447. * This will cleanup the collector list
  448. *
  449. */
  450. static void _reset_collector_list()
  451. {
  452. struct _collector *tmp_collector, *next_collector;
  453. COLLECTOR_LOCK;
  454. if (unlikely(!collector_list || !collector_list->next)) {
  455. COLLECTOR_UNLOCK;
  456. return;
  457. }
  458. // Note that the first entry is "dummy"
  459. tmp_collector = collector_list->next;
  460. collector_list->count = 0;
  461. collector_list->next = NULL;
  462. // We broke the link; we can unlock
  463. COLLECTOR_UNLOCK;
  464. while (tmp_collector) {
  465. next_collector = tmp_collector->next;
  466. _free_collector(tmp_collector);
  467. tmp_collector = next_collector;
  468. }
  469. }
  470. /*
  471. * Find a collector (if it exists)
  472. * Must lock before calling this
  473. * If last_collector is not null, it will return the previous collector in the linked
  474. * list (used in collector delete)
  475. */
  476. static struct _collector *_find_collector(
  477. const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
  478. {
  479. struct _collector *tmp_collector, *prev_collector;
  480. uint32_t plugin_hash;
  481. uint32_t module_hash;
  482. uint32_t hostname_hash;
  483. if (unlikely(!collector_list)) {
  484. collector_list = callocz(1, sizeof(struct _collector));
  485. return NULL;
  486. }
  487. if (unlikely(!collector_list->next))
  488. return NULL;
  489. plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  490. module_hash = module_name ? simple_hash(module_name) : 1;
  491. hostname_hash = simple_hash(hostname);
  492. // Note that the first entry is "dummy"
  493. tmp_collector = collector_list->next;
  494. prev_collector = collector_list;
  495. while (tmp_collector) {
  496. if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
  497. hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
  498. (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
  499. (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
  500. if (unlikely(last_collector))
  501. *last_collector = prev_collector;
  502. return tmp_collector;
  503. }
  504. prev_collector = tmp_collector;
  505. tmp_collector = tmp_collector->next;
  506. }
  507. return tmp_collector;
  508. }
  509. /*
  510. * Called to delete a collector
  511. * It will reduce the count (chart_count) and will remove it
  512. * from the linked list if the count reaches zero
  513. * The structure will be returned to the caller to free
  514. * the resources
  515. *
  516. */
  517. static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  518. {
  519. struct _collector *tmp_collector, *prev_collector = NULL;
  520. tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
  521. if (likely(tmp_collector)) {
  522. --tmp_collector->count;
  523. if (unlikely(!tmp_collector->count))
  524. prev_collector->next = tmp_collector->next;
  525. }
  526. return tmp_collector;
  527. }
  528. /*
  529. * Add a new collector (plugin / module) to the list
  530. * If it already exists just update the chart count
  531. *
  532. * Lock before calling
  533. */
  534. static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  535. {
  536. struct _collector *tmp_collector;
  537. tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
  538. if (unlikely(!tmp_collector)) {
  539. tmp_collector = callocz(1, sizeof(struct _collector));
  540. tmp_collector->hostname_hash = simple_hash(hostname);
  541. tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  542. tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
  543. tmp_collector->hostname = strdupz(hostname);
  544. tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
  545. tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
  546. tmp_collector->next = collector_list->next;
  547. collector_list->next = tmp_collector;
  548. }
  549. tmp_collector->count++;
  550. debug(
  551. D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
  552. module_name ? module_name : "*", tmp_collector->count);
  553. return tmp_collector;
  554. }
  555. /*
  556. * Add a new collector to the list
  557. * If it exists, update the chart count
  558. */
  559. void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  560. {
  561. struct _collector *tmp_collector;
  562. if (unlikely(!netdata_ready)) {
  563. return;
  564. }
  565. COLLECTOR_LOCK;
  566. tmp_collector = _add_collector(hostname, plugin_name, module_name);
  567. if (unlikely(tmp_collector->count != 1)) {
  568. COLLECTOR_UNLOCK;
  569. return;
  570. }
  571. if (unlikely(agent_state == AGENT_INITIALIZING))
  572. last_init_sequence = now_realtime_sec();
  573. else {
  574. if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  575. debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
  576. }
  577. COLLECTOR_UNLOCK;
  578. }
  579. /*
  580. * Delete a collector from the list
  581. * If the chart count reaches zero the collector will be removed
  582. * from the list by calling del_collector.
  583. *
  584. * This function will release the memory used and schedule
  585. * a cloud update
  586. */
  587. void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  588. {
  589. struct _collector *tmp_collector;
  590. if (unlikely(!netdata_ready)) {
  591. return;
  592. }
  593. COLLECTOR_LOCK;
  594. tmp_collector = _del_collector(hostname, plugin_name, module_name);
  595. if (unlikely(!tmp_collector || tmp_collector->count)) {
  596. COLLECTOR_UNLOCK;
  597. return;
  598. }
  599. debug(
  600. D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
  601. tmp_collector->count);
  602. COLLECTOR_UNLOCK;
  603. if (unlikely(agent_state == AGENT_INITIALIZING))
  604. last_init_sequence = now_realtime_sec();
  605. else {
  606. if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  607. debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
  608. }
  609. _free_collector(tmp_collector);
  610. }
  /*
   * Encode a buffer so it can be embedded in a JSON string.
   *
   * src            the bytes to encode (content_size of them are read)
   * content_size   number of input bytes
   * keep_newlines  non-zero: '\n' becomes the two characters \n
   *                zero:     '\n' is dropped
   *
   * Tabs are always dropped, '"' is prefixed with a backslash, the control
   * characters 0x01-0x08 and 0x0b-0x1f become \u00XX, everything else is
   * copied unchanged.
   * NOTE(review): a backslash in the input is copied as is (not escaped) - confirm
   * this is what the receiving side expects.
   *
   * Returns a newly allocated NUL-terminated string; release it with freez().
   */
  static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines)
  {
      // Worst case every input byte turns into a 6-byte \u00XX sequence; +1 for the NUL
      char *tmp_buffer = mallocz(content_size * 6 + 1);
      char *dst = tmp_buffer;

      while (content_size > 0) {
          switch (*src) {
              case '\n':
                  if (keep_newlines) {
                      *dst++ = '\\';
                      *dst++ = 'n';
                  }
                  break;
              case '\t':
                  break;
              case 0x01 ... 0x08:
              case 0x0b ... 0x1F:
                  *dst++ = '\\';
                  *dst++ = 'u';
                  *dst++ = '0';
                  *dst++ = '0';
                  // High nibble: '0' for 0x00-0x0f (0x0f included), '1' for 0x10-0x1f
                  *dst++ = (*src < 0x10) ? '0' : '1';
                  *dst++ = to_hex(*src);
                  break;
              case '\"':
                  *dst++ = '\\';
                  *dst++ = *src;
                  break;
              default:
                  *dst++ = *src;
          }
          src++;
          content_size--;
      }
      *dst = '\0';

      return tmp_buffer;
  }
  652. int aclk_execute_query(struct aclk_query *this_query)
  653. {
  654. if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
  655. struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
  656. w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  657. w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  658. w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  659. strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
  660. w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
  661. w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
  662. w->acl = 0x1f;
  663. char *mysep = strchr(this_query->query, '?');
  664. if (mysep) {
  665. strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
  666. *mysep = '\0';
  667. } else
  668. strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
  669. mysep = strrchr(this_query->query, '/');
  670. // TODO: handle bad response perhaps in a different way. For now it does to the payload
  671. w->response.code = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
  672. now_realtime_timeval(&w->tv_ready);
  673. w->response.data->date = w->tv_ready.tv_sec;
  674. web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
  675. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  676. buffer_flush(local_buffer);
  677. local_buffer->contenttype = CT_APPLICATION_JSON;
  678. aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0);
  679. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  680. char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
  681. char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
  682. buffer_sprintf(
  683. local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}",
  684. w->response.code, encoded_response, encoded_header);
  685. buffer_sprintf(local_buffer, "\n}");
  686. debug(D_ACLK, "Response:%s", encoded_header);
  687. aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
  688. buffer_free(w->response.data);
  689. buffer_free(w->response.header);
  690. buffer_free(w->response.header_output);
  691. freez(w);
  692. buffer_free(local_buffer);
  693. freez(encoded_response);
  694. freez(encoded_header);
  695. return 0;
  696. }
  697. return 1;
  698. }
  699. /*
  700. * This function will fetch the next pending command and process it
  701. *
  702. */
  703. int aclk_process_query()
  704. {
  705. struct aclk_query *this_query;
  706. static long int query_count = 0;
  707. if (!aclk_connected)
  708. return 0;
  709. this_query = aclk_queue_pop();
  710. if (likely(!this_query)) {
  711. return 0;
  712. }
  713. if (unlikely(this_query->deleted)) {
  714. debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query);
  715. aclk_query_free(this_query);
  716. return 1;
  717. }
  718. query_count++;
  719. debug(
  720. D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic,
  721. this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created));
  722. switch (this_query->cmd) {
  723. case ACLK_CMD_ONCONNECT:
  724. debug(D_ACLK, "EXECUTING on connect metadata command");
  725. aclk_send_metadata();
  726. aclk_metadata_submitted = ACLK_METADATA_SENT;
  727. break;
  728. case ACLK_CMD_CHART:
  729. debug(D_ACLK, "EXECUTING a chart update command");
  730. aclk_send_single_chart(this_query->data, this_query->query);
  731. break;
  732. case ACLK_CMD_CHARTDEL:
  733. debug(D_ACLK, "EXECUTING a chart delete command");
  734. //TODO: This send the info metadata for now
  735. aclk_send_info_metadata();
  736. break;
  737. case ACLK_CMD_ALARM:
  738. debug(D_ACLK, "EXECUTING an alarm update command");
  739. aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
  740. break;
  741. case ACLK_CMD_CLOUD:
  742. debug(D_ACLK, "EXECUTING a cloud command");
  743. aclk_execute_query(this_query);
  744. break;
  745. default:
  746. break;
  747. }
  748. debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
  749. aclk_query_free(this_query);
  750. if (aclk_stats_enabled) {
  751. ACLK_STATS_LOCK;
  752. aclk_metrics_per_sample.queries_dispatched++;
  753. ACLK_STATS_UNLOCK;
  754. }
  755. return 1;
  756. }
  757. /*
  758. * Process all pending queries
  759. * Return 0 if no queries were processed, 1 otherwise
  760. *
  761. */
  762. int aclk_process_queries()
  763. {
  764. if (unlikely(netdata_exit || !aclk_connected))
  765. return 0;
  766. if (likely(!aclk_queue.count))
  767. return 0;
  768. debug(D_ACLK, "Processing %d queries", (int)aclk_queue.count);
  769. //TODO: may consider possible throttling here
  770. while (aclk_process_query()) {
  771. // Process all commands
  772. };
  773. return 1;
  774. }
  775. static void aclk_query_thread_cleanup(void *ptr)
  776. {
  777. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  778. info("cleaning up...");
  779. _reset_collector_list();
  780. freez(collector_list);
  781. // Clean memory for pending queries if any
  782. struct aclk_query *this_query;
  783. do {
  784. this_query = aclk_queue_pop();
  785. aclk_query_free(this_query);
  786. } while (this_query);
  787. freez(static_thread->thread);
  788. freez(static_thread);
  789. }
  790. /**
  791. * Main query processing thread
  792. *
  793. * On startup wait for the agent collectors to initialize
  794. * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
  795. * of no new collectors coming in in order to mark the agent
  796. * as stable (set agent_state = AGENT_STABLE)
  797. */
  798. void *aclk_query_main_thread(void *ptr)
  799. {
  800. netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
  801. while (agent_state == AGENT_INITIALIZING && !netdata_exit) {
  802. time_t checkpoint;
  803. checkpoint = now_realtime_sec() - last_init_sequence;
  804. if (checkpoint > ACLK_STABLE_TIMEOUT) {
  805. agent_state = AGENT_STABLE;
  806. info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
  807. #ifdef ACLK_DEBUG
  808. _dump_collector_list();
  809. #endif
  810. break;
  811. }
  812. info("Waiting for agent collectors to initialize. Last activity was %ld seconds ago" , checkpoint);
  813. sleep_usec(USEC_PER_SEC * 1);
  814. }
  815. while (!netdata_exit) {
  816. if (unlikely(!aclk_metadata_submitted)) {
  817. aclk_metadata_submitted = ACLK_METADATA_CMD_QUEUED;
  818. if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  819. errno = 0;
  820. error("ACLK failed to queue on_connect command");
  821. aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  822. }
  823. }
  824. aclk_process_queries();
  825. QUERY_THREAD_LOCK;
  826. // TODO: Need to check if there are queries awaiting already
  827. if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
  828. sleep_usec(USEC_PER_SEC * 1);
  829. QUERY_THREAD_UNLOCK;
  830. } // forever
  831. info("Shutting down query processing thread");
  832. netdata_thread_cleanup_pop(1);
  833. return NULL;
  834. }
  835. static void aclk_graceful_disconnect()
  836. {
  837. size_t write_q, write_q_bytes, read_q;
  838. time_t event_loop_timeout;
  839. // Send a graceful disconnect message
  840. BUFFER *b = buffer_create(512);
  841. aclk_create_header(b, "disconnect", NULL, 0, 0);
  842. buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
  843. aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
  844. buffer_free(b);
  845. event_loop_timeout = now_realtime_sec() + 5;
  846. write_q = 1;
  847. while (write_q && event_loop_timeout > now_realtime_sec()) {
  848. _link_event_loop();
  849. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  850. }
  851. aclk_shutting_down = 1;
  852. _link_shutdown();
  853. aclk_lws_wss_mqtt_layer_disconect_notif();
  854. write_q = 1;
  855. event_loop_timeout = now_realtime_sec() + 5;
  856. while (write_q && event_loop_timeout > now_realtime_sec()) {
  857. _link_event_loop();
  858. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  859. }
  860. aclk_shutting_down = 0;
  861. }
  862. // Thread cleanup
  863. static void aclk_main_cleanup(void *ptr)
  864. {
  865. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  866. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  867. info("cleaning up...");
  868. char *agent_id = is_agent_claimed();
  869. if (agent_id && aclk_connected) {
  870. freez(agent_id);
  871. // Wakeup thread to cleanup
  872. QUERY_THREAD_WAKEUP;
  873. aclk_graceful_disconnect();
  874. }
  875. }
// Callback state for json_extract_singleton(): look up one string-valued key.
struct dictionary_singleton {
    char *key;    // name of the JSON string member to look for
    char *result; // strdupz() copy of the matching value; callers here start it at NULL and free it
};
  880. int json_extract_singleton(JSON_ENTRY *e)
  881. {
  882. struct dictionary_singleton *data = e->callback_data;
  883. switch (e->type) {
  884. case JSON_OBJECT:
  885. case JSON_ARRAY:
  886. break;
  887. case JSON_STRING:
  888. if (!strcmp(e->name, data->key)) {
  889. data->result = strdupz(e->data.string);
  890. break;
  891. }
  892. break;
  893. case JSON_NUMBER:
  894. case JSON_BOOLEAN:
  895. case JSON_NULL:
  896. break;
  897. }
  898. return 0;
  899. }
  900. // Base-64 decoder.
  901. // Note: This is non-validating, invalid input will be decoded without an error.
  902. // Challenges are packed into json strings so we don't skip newlines.
  903. // Size errors (i.e. invalid input size or insufficient output space) are caught.
  904. size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
  905. {
  906. static char lookup[256];
  907. static int first_time=1;
  908. if (first_time)
  909. {
  910. first_time = 0;
  911. for(int i=0; i<256; i++)
  912. lookup[i] = -1;
  913. for(int i='A'; i<='Z'; i++)
  914. lookup[i] = i-'A';
  915. for(int i='a'; i<='z'; i++)
  916. lookup[i] = i-'a' + 26;
  917. for(int i='0'; i<='9'; i++)
  918. lookup[i] = i-'0' + 52;
  919. lookup['+'] = 62;
  920. lookup['/'] = 63;
  921. }
  922. if ((input_size & 3) != 0)
  923. {
  924. error("Can't decode base-64 input length %zu", input_size);
  925. return 0;
  926. }
  927. size_t unpadded_size = (input_size/4) * 3;
  928. if ( unpadded_size > output_size )
  929. {
  930. error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
  931. return 0;
  932. }
  933. // Don't check padding within full quantums
  934. for (size_t i = 0 ; i < input_size-4 ; i+=4 )
  935. {
  936. uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]];
  937. output[0] = value >> 16;
  938. output[1] = value >> 8;
  939. output[2] = value;
  940. //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
  941. output += 3;
  942. input += 4;
  943. }
  944. // Handle padding only in last quantum
  945. if (input[2] == '=') {
  946. uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]];
  947. output[0] = value >> 4;
  948. //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]);
  949. return unpadded_size-2;
  950. }
  951. else if (input[3] == '=') {
  952. uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]];
  953. output[0] = value >> 10;
  954. output[1] = value >> 2;
  955. //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]);
  956. return unpadded_size-1;
  957. }
  958. else
  959. {
  960. uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3];
  961. output[0] = value >> 16;
  962. output[1] = value >> 8;
  963. output[2] = value;
  964. //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
  965. return unpadded_size;
  966. }
  967. }
  968. size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
  969. {
  970. uint32_t value;
  971. static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
  972. "abcdefghijklmnopqrstuvwxyz"
  973. "0123456789+/";
  974. if ((input_size/3+1)*4 >= output_size)
  975. {
  976. error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
  977. return 0;
  978. }
  979. size_t count = 0;
  980. while (input_size>3)
  981. {
  982. value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff;
  983. output[0] = lookup[value >> 18];
  984. output[1] = lookup[(value >> 12) & 0x3f];
  985. output[2] = lookup[(value >> 6) & 0x3f];
  986. output[3] = lookup[value & 0x3f];
  987. //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
  988. output += 4;
  989. input += 3;
  990. input_size -= 3;
  991. count += 4;
  992. }
  993. switch (input_size)
  994. {
  995. case 2:
  996. value = (input[0] << 10) + (input[1] << 2);
  997. output[0] = lookup[(value >> 12) & 0x3f];
  998. output[1] = lookup[(value >> 6) & 0x3f];
  999. output[2] = lookup[value & 0x3f];
  1000. output[3] = '=';
  1001. //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]);
  1002. count += 4;
  1003. break;
  1004. case 1:
  1005. value = input[0] << 4;
  1006. output[0] = lookup[(value >> 6) & 0x3f];
  1007. output[1] = lookup[value & 0x3f];
  1008. output[2] = '=';
  1009. output[3] = '=';
  1010. //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
  1011. count += 4;
  1012. break;
  1013. case 0:
  1014. break;
  1015. }
  1016. return count;
  1017. }
  1018. int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
  1019. {
  1020. int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);
  1021. if (result == -1) {
  1022. char err[512];
  1023. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  1024. error("Decryption of the challenge failed: %s", err);
  1025. }
  1026. return result;
  1027. }
  1028. char *extract_payload(BUFFER *b)
  1029. {
  1030. char *s = b->buffer;
  1031. unsigned int line_len=0;
  1032. for (size_t i=0; i<b->len; i++)
  1033. {
  1034. if (*s == 0 )
  1035. return NULL;
  1036. if (*s == '\n' ) {
  1037. if (line_len==0)
  1038. return s+1;
  1039. line_len = 0;
  1040. }
  1041. else if (*s == '\r') {
  1042. /* don't count */
  1043. }
  1044. else
  1045. line_len ++;
  1046. s++;
  1047. }
  1048. return NULL;
  1049. }
  1050. void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
  1051. {
  1052. char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1053. debug(D_ACLK, "Performing challenge-response sequence");
  1054. if (aclk_password != NULL)
  1055. {
  1056. freez(aclk_password);
  1057. aclk_password = NULL;
  1058. }
  1059. // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
  1060. // TODO - target host?
  1061. char *agent_id = is_agent_claimed();
  1062. if (agent_id == NULL)
  1063. {
  1064. error("Agent was not claimed - cannot perform challenge/response");
  1065. goto CLEANUP;
  1066. }
  1067. char url[1024];
  1068. sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
  1069. info("Retrieving challenge from cloud: %s %s %s", aclk_hostname, aclk_port, url);
  1070. if(aclk_send_https_request("GET", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
  1071. {
  1072. error("Challenge failed: %s", data_buffer);
  1073. goto CLEANUP;
  1074. }
  1075. struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
  1076. debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
  1077. if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
  1078. {
  1079. freez(challenge.result);
  1080. error("Could not parse the json response with the challenge: %s", data_buffer);
  1081. goto CLEANUP;
  1082. }
  1083. if (challenge.result == NULL ) {
  1084. error("Could not retrieve challenge from auth response: %s", data_buffer);
  1085. goto CLEANUP;
  1086. }
  1087. size_t challenge_len = strlen(challenge.result);
  1088. unsigned char decoded[512];
  1089. size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
  1090. unsigned char plaintext[4096]={};
  1091. int decrypted_length = private_decrypt(decoded, decoded_len, plaintext);
  1092. freez(challenge.result);
  1093. char encoded[512];
  1094. size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
  1095. encoded[encoded_len] = 0;
  1096. debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
  1097. char response_json[4096]={};
  1098. sprintf(response_json, "{\"response\":\"%s\"}", encoded);
  1099. debug(D_ACLK, "Password phase: %s",response_json);
  1100. // TODO - host
  1101. sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
  1102. if(aclk_send_https_request("POST", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
  1103. {
  1104. error("Challenge-response failed: %s", data_buffer);
  1105. goto CLEANUP;
  1106. }
  1107. debug(D_ACLK, "Password response from cloud: %s", data_buffer);
  1108. struct dictionary_singleton password = { .key = "password", .result = NULL };
  1109. if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
  1110. {
  1111. freez(password.result);
  1112. error("Could not parse the json response with the password: %s", data_buffer);
  1113. goto CLEANUP;
  1114. }
  1115. if (password.result == NULL ) {
  1116. error("Could not retrieve password from auth response");
  1117. goto CLEANUP;
  1118. }
  1119. if (aclk_password != NULL )
  1120. freez(aclk_password);
  1121. aclk_password = password.result;
  1122. if (aclk_username != NULL)
  1123. freez(aclk_username);
  1124. aclk_username = agent_id;
  1125. agent_id = NULL;
  1126. CLEANUP:
  1127. if (agent_id != NULL)
  1128. freez(agent_id);
  1129. freez(data_buffer);
  1130. return;
  1131. }
  1132. static void aclk_try_to_connect(char *hostname, char *port, int port_num)
  1133. {
  1134. if (!aclk_private_key) {
  1135. error("Cannot try to establish the agent cloud link - no private key available!");
  1136. return;
  1137. }
  1138. info("Attempting to establish the agent cloud link");
  1139. aclk_get_challenge(hostname, port);
  1140. if (aclk_password == NULL)
  1141. return;
  1142. int rc;
  1143. aclk_connecting = 1;
  1144. create_publish_base_topic();
  1145. rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password);
  1146. if (unlikely(rc)) {
  1147. error("Failed to initialize the agent cloud link library");
  1148. }
  1149. }
  1150. /**
  1151. * Main agent cloud link thread
  1152. *
  1153. * This thread will simply call the main event loop that handles
  1154. * pending requests - both inbound and outbound
  1155. *
  1156. * @param ptr is a pointer to the netdata_static_thread structure.
  1157. *
  1158. * @return It always returns NULL
  1159. */
  1160. void *aclk_main(void *ptr)
  1161. {
  1162. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  1163. struct netdata_static_thread *query_thread;
  1164. struct netdata_static_thread *stats_thread = NULL;
  1165. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  1166. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  1167. netdata_thread_disable_cancelability();
  1168. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK)
  1169. info("Killing ACLK thread -> cloud functionality has been disabled");
  1170. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  1171. return NULL;
  1172. #endif
  1173. info("Waiting for netdata to be ready");
  1174. while (!netdata_ready) {
  1175. sleep_usec(USEC_PER_MS * 300);
  1176. }
  1177. info("Waiting for Cloud to be enabled");
  1178. while (!netdata_cloud_setting) {
  1179. sleep_usec(USEC_PER_SEC * 1);
  1180. if (netdata_exit) {
  1181. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  1182. return NULL;
  1183. }
  1184. }
  1185. aclk_stats_enabled = appconfig_get_boolean(&cloud_config, CONFIG_SECTION_GLOBAL, "statistics", CONFIG_BOOLEAN_YES);
  1186. if (aclk_stats_enabled) {
  1187. stats_thread = callocz(1, sizeof(struct netdata_static_thread));
  1188. stats_thread->thread = mallocz(sizeof(netdata_thread_t));
  1189. netdata_thread_create(
  1190. stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
  1191. stats_thread);
  1192. }
  1193. last_init_sequence = now_realtime_sec();
  1194. query_thread = NULL;
  1195. char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
  1196. char *aclk_port = NULL;
  1197. uint32_t port_num = 0;
  1198. info("Waiting for netdata to be claimed");
  1199. while(1) {
  1200. char *agent_id = is_agent_claimed();
  1201. while (likely(!agent_id)) {
  1202. sleep_usec(USEC_PER_SEC * 1);
  1203. if (netdata_exit)
  1204. goto exited;
  1205. agent_id = is_agent_claimed();
  1206. }
  1207. freez(agent_id);
  1208. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  1209. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  1210. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  1211. if (cloud_base_url == NULL) {
  1212. error("Do not move the cloud base url out of post_conf_load!!");
  1213. goto exited;
  1214. }
  1215. if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
  1216. error("Agent is claimed but the configuration is invalid, please fix");
  1217. }
  1218. else
  1219. {
  1220. port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value
  1221. if (!create_private_key() && !_mqtt_lib_init())
  1222. break;
  1223. }
  1224. for (int i=0; i<60; i++) {
  1225. if (netdata_exit)
  1226. goto exited;
  1227. sleep_usec(USEC_PER_SEC * 1);
  1228. }
  1229. }
  1230. usec_t reconnect_expiry = 0; // In usecs
  1231. while (!netdata_exit) {
  1232. static int first_init = 0;
  1233. /* size_t write_q, write_q_bytes, read_q;
  1234. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/
  1235. if (aclk_kill_link) { // User has reloaded the claiming state
  1236. aclk_kill_link = 0;
  1237. aclk_graceful_disconnect();
  1238. create_private_key();
  1239. continue;
  1240. }
  1241. if (aclk_force_reconnect) {
  1242. aclk_lws_wss_destroy_context();
  1243. aclk_force_reconnect = 0;
  1244. }
  1245. if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) {
  1246. if (unlikely(!first_init)) {
  1247. aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
  1248. first_init = 1;
  1249. } else {
  1250. if (aclk_connecting == 0) {
  1251. if (reconnect_expiry == 0) {
  1252. unsigned long int delay = aclk_reconnect_delay(1);
  1253. reconnect_expiry = now_realtime_usec() + delay * 1000;
  1254. info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
  1255. }
  1256. if (now_realtime_usec() >= reconnect_expiry) {
  1257. reconnect_expiry = 0;
  1258. aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
  1259. }
  1260. sleep_usec(USEC_PER_MS * 100);
  1261. }
  1262. }
  1263. if (aclk_connecting) {
  1264. _link_event_loop();
  1265. sleep_usec(USEC_PER_MS * 100);
  1266. }
  1267. continue;
  1268. }
  1269. _link_event_loop();
  1270. if (unlikely(!aclk_connected || aclk_force_reconnect))
  1271. continue;
  1272. /*static int stress_counter = 0;
  1273. if (write_q_bytes==0 && stress_counter ++ >5)
  1274. {
  1275. aclk_send_stress_test(8000000);
  1276. stress_counter = 0;
  1277. }*/
  1278. // TODO: Move to on-connect
  1279. if (unlikely(!aclk_subscribed)) {
  1280. aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
  1281. }
  1282. if (unlikely(!query_thread)) {
  1283. query_thread = callocz(1, sizeof(struct netdata_static_thread));
  1284. query_thread->thread = mallocz(sizeof(netdata_thread_t));
  1285. netdata_thread_create(
  1286. query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread,
  1287. query_thread);
  1288. }
  1289. } // forever
  1290. exited:
  1291. // Wakeup query thread to cleanup
  1292. QUERY_THREAD_WAKEUP;
  1293. freez(aclk_username);
  1294. freez(aclk_password);
  1295. freez(aclk_hostname);
  1296. freez(aclk_port);
  1297. if (aclk_private_key != NULL)
  1298. RSA_free(aclk_private_key);
  1299. aclk_main_cleanup(ptr);
  1300. if(aclk_stats_enabled) {
  1301. netdata_thread_join(*stats_thread->thread, NULL);
  1302. freez(stats_thread->thread);
  1303. freez(stats_thread);
  1304. }
  1305. /*
  1306. * this must be last -> if all static threads signal
  1307. * THREAD_EXITED rrdengine will dealloc the RRDSETs
  1308. * and RRDDIMs that are used by still runing stat thread.
  1309. * see netdata_cleanup_and_exit() for reference
  1310. */
  1311. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  1312. return NULL;
  1313. }
  1314. /*
  1315. * Send a message to the cloud, using a base topic and sib_topic
  1316. * The final topic will be in the form <base_topic>/<sub_topic>
  1317. * If base_topic is missing then the global_base_topic will be used (if available)
  1318. *
  1319. */
  1320. int aclk_send_message(char *sub_topic, char *message, char *msg_id)
  1321. {
  1322. int rc;
  1323. int mid;
  1324. char topic[ACLK_MAX_TOPIC + 1];
  1325. char *final_topic;
  1326. UNUSED(msg_id);
  1327. if (!aclk_connected)
  1328. return 0;
  1329. if (unlikely(!message))
  1330. return 0;
  1331. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  1332. if (unlikely(!final_topic)) {
  1333. errno = 0;
  1334. error("Unable to build outgoing topic; truncated?");
  1335. return 1;
  1336. }
  1337. ACLK_LOCK;
  1338. rc = _link_send_message(final_topic, (unsigned char *)message, &mid);
  1339. // TODO: link the msg_id with the mid so we can trace it
  1340. ACLK_UNLOCK;
  1341. if (unlikely(rc)) {
  1342. errno = 0;
  1343. error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
  1344. }
  1345. return rc;
  1346. }
  1347. /*
  1348. * Subscribe to a topic in the cloud
  1349. * The final subscription will be in the form
  1350. * /agent/claim_id/<sub_topic>
  1351. */
  1352. int aclk_subscribe(char *sub_topic, int qos)
  1353. {
  1354. int rc;
  1355. char topic[ACLK_MAX_TOPIC + 1];
  1356. char *final_topic;
  1357. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  1358. if (unlikely(!final_topic)) {
  1359. errno = 0;
  1360. error("Unable to build outgoing topic; truncated?");
  1361. return 1;
  1362. }
  1363. if (!aclk_connected) {
  1364. error("Cannot subscribe to %s - not connected!", topic);
  1365. return 1;
  1366. }
  1367. ACLK_LOCK;
  1368. rc = _link_subscribe(final_topic, qos);
  1369. ACLK_UNLOCK;
  1370. // TODO: Add better handling -- error will flood the logfile here
  1371. if (unlikely(rc)) {
  1372. errno = 0;
  1373. error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
  1374. }
  1375. return rc;
  1376. }
  1377. // This is called from a callback when the link goes up
  1378. void aclk_connect()
  1379. {
  1380. info("Connection detected (%"PRIu64" queued queries)", aclk_queue.count);
  1381. aclk_stats_upd_online(1);
  1382. aclk_connected = 1;
  1383. waiting_init = 0;
  1384. aclk_reconnect_delay(0);
  1385. QUERY_THREAD_WAKEUP;
  1386. return;
  1387. }
  1388. // This is called from a callback when the link goes down
  1389. void aclk_disconnect()
  1390. {
  1391. if (likely(aclk_connected))
  1392. info("Disconnect detected (%"PRIu64" queued queries)", aclk_queue.count);
  1393. aclk_stats_upd_online(0);
  1394. aclk_subscribed = 0;
  1395. aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  1396. waiting_init = 1;
  1397. aclk_connected = 0;
  1398. aclk_connecting = 0;
  1399. aclk_force_reconnect = 1;
  1400. }
  1401. inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us)
  1402. {
  1403. uuid_t uuid;
  1404. char uuid_str[36 + 1];
  1405. if (unlikely(!msg_id)) {
  1406. uuid_generate(uuid);
  1407. uuid_unparse(uuid, uuid_str);
  1408. msg_id = uuid_str;
  1409. }
  1410. if (ts_secs == 0) {
  1411. ts_us = now_realtime_usec();
  1412. ts_secs = ts_us / USEC_PER_SEC;
  1413. ts_us = ts_us % USEC_PER_SEC;
  1414. }
  1415. buffer_sprintf(
  1416. dest,
  1417. "\t{\"type\": \"%s\",\n"
  1418. "\t\"msg-id\": \"%s\",\n"
  1419. "\t\"timestamp\": %ld,\n"
  1420. "\t\"timestamp-offset-usec\": %llu,\n"
  1421. "\t\"connect\": %ld,\n"
  1422. "\t\"connect-offset-usec\": %llu,\n"
  1423. "\t\"version\": %d",
  1424. type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION);
  1425. debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs);
  1426. }
  1427. /*
  1428. * This will send alarm information which includes
  1429. * configured alarms
  1430. * alarm_log
  1431. * active alarms
  1432. */
  1433. void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
  1434. void aclk_send_alarm_metadata()
  1435. {
  1436. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1437. char *msg_id = create_uuid();
  1438. buffer_flush(local_buffer);
  1439. local_buffer->contenttype = CT_APPLICATION_JSON;
  1440. debug(D_ACLK, "Metadata alarms start");
  1441. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1442. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1443. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1444. // session.
  1445. if (aclk_metadata_submitted == ACLK_METADATA_SENT)
  1446. aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0);
  1447. else
  1448. aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us);
  1449. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1450. buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
  1451. health_alarms2json(localhost, local_buffer, 1);
  1452. debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len);
  1453. // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
  1454. // health_alarm_log2json(localhost, local_buffer, 0);
  1455. // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
  1456. buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
  1457. health_active_log_alarms_2json(localhost, local_buffer);
  1458. //debug(D_ACLK, "Metadata message %s", local_buffer->buffer);
  1459. buffer_sprintf(local_buffer, "\n}\n}");
  1460. aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
  1461. freez(msg_id);
  1462. buffer_free(local_buffer);
  1463. }
  1464. /*
  1465. * This will send the agent metadata
  1466. * /api/v1/info
  1467. * charts
  1468. */
  1469. int aclk_send_info_metadata()
  1470. {
  1471. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1472. debug(D_ACLK, "Metadata /info start");
  1473. char *msg_id = create_uuid();
  1474. buffer_flush(local_buffer);
  1475. local_buffer->contenttype = CT_APPLICATION_JSON;
  1476. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1477. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1478. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1479. // session.
  1480. if (aclk_metadata_submitted == ACLK_METADATA_SENT)
  1481. aclk_create_header(local_buffer, "update", msg_id, 0, 0);
  1482. else
  1483. aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us);
  1484. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1485. buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
  1486. web_client_api_request_v1_info_fill_buffer(localhost, local_buffer);
  1487. debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
  1488. buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
  1489. charts2json(localhost, local_buffer, 1, 0);
  1490. buffer_sprintf(local_buffer, "\n}\n}");
  1491. debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
  1492. aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
  1493. freez(msg_id);
  1494. buffer_free(local_buffer);
  1495. return 0;
  1496. }
  1497. void aclk_send_stress_test(size_t size)
  1498. {
  1499. char *buffer = mallocz(size);
  1500. if (buffer != NULL)
  1501. {
  1502. for(size_t i=0; i<size; i++)
  1503. buffer[i] = 'x';
  1504. buffer[size-1] = 0;
  1505. time_t time_created = now_realtime_sec();
  1506. sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
  1507. buffer[strlen(buffer)] = '"';
  1508. buffer[size-2] = '}';
  1509. buffer[size-3] = '"';
  1510. aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
  1511. error("Sending stress of size %zu at time %ld", size, time_created);
  1512. }
  1513. free(buffer);
  1514. }
// Send info metadata message to the cloud if the link is established
// or on request
//
// Sends the info/charts metadata first and the alarm metadata second.
// The result of the info step is ignored; this always returns 0.
int aclk_send_metadata()
{
    aclk_send_info_metadata();
    aclk_send_alarm_metadata();
    return 0;
}
// Suppress individual chart / alarm updates: aclk_update_chart() and
// aclk_update_alarm() return early while this flag is set.
void aclk_single_update_disable()
{
    aclk_disable_single_updates = 1;
}
// Re-enable individual chart / alarm updates after aclk_single_update_disable().
void aclk_single_update_enable()
{
    aclk_disable_single_updates = 0;
}
  1531. // Trigged by a health reload, sends the alarm metadata
  1532. void aclk_alarm_reload()
  1533. {
  1534. if (unlikely(agent_state == AGENT_INITIALIZING))
  1535. return;
  1536. if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  1537. if (likely(aclk_connected)) {
  1538. errno = 0;
  1539. error("ACLK failed to queue on_connect command on alarm reload");
  1540. }
  1541. }
  1542. }
  1543. //rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
  1544. int aclk_send_single_chart(char *hostname, char *chart)
  1545. {
  1546. RRDHOST *target_host;
  1547. target_host = rrdhost_find_by_hostname(hostname, 0);
  1548. if (!target_host)
  1549. return 1;
  1550. RRDSET *st = rrdset_find(target_host, chart);
  1551. if (!st)
  1552. st = rrdset_find_byname(target_host, chart);
  1553. if (!st) {
  1554. info("FAILED to find chart %s", chart);
  1555. return 1;
  1556. }
  1557. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1558. char *msg_id = create_uuid();
  1559. buffer_flush(local_buffer);
  1560. local_buffer->contenttype = CT_APPLICATION_JSON;
  1561. aclk_create_header(local_buffer, "chart", msg_id, 0, 0);
  1562. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1563. rrdset2json(st, local_buffer, NULL, NULL, 1);
  1564. buffer_sprintf(local_buffer, "\t\n}");
  1565. aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);
  1566. freez(msg_id);
  1567. buffer_free(local_buffer);
  1568. return 0;
  1569. }
  1570. int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
  1571. {
  1572. #ifndef ENABLE_ACLK
  1573. UNUSED(host);
  1574. UNUSED(chart_name);
  1575. return 0;
  1576. #else
  1577. if (unlikely(!netdata_ready))
  1578. return 0;
  1579. if (!netdata_cloud_setting)
  1580. return 0;
  1581. if (host != localhost)
  1582. return 0;
  1583. if (unlikely(aclk_disable_single_updates))
  1584. return 0;
  1585. if (unlikely(agent_state == AGENT_INITIALIZING))
  1586. last_init_sequence = now_realtime_sec();
  1587. else {
  1588. if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) {
  1589. if (likely(aclk_connected)) {
  1590. errno = 0;
  1591. error("ACLK failed to queue chart_update command");
  1592. }
  1593. }
  1594. }
  1595. return 0;
  1596. #endif
  1597. }
  1598. int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
  1599. {
  1600. BUFFER *local_buffer = NULL;
  1601. if (unlikely(!netdata_ready))
  1602. return 0;
  1603. if (host != localhost)
  1604. return 0;
  1605. if (unlikely(agent_state == AGENT_INITIALIZING))
  1606. return 0;
  1607. /*
  1608. * Check if individual updates have been disabled
  1609. * This will be the case when we do health reload
  1610. * and all the alarms will be dropped and recreated.
  1611. * At the end of the health reload the complete alarm metadata
  1612. * info will be sent
  1613. */
  1614. if (unlikely(aclk_disable_single_updates))
  1615. return 0;
  1616. local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1617. char *msg_id = create_uuid();
  1618. buffer_flush(local_buffer);
  1619. aclk_create_header(local_buffer, "status-change", msg_id, 0, 0);
  1620. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1621. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  1622. health_alarm_entry2json_nolock(local_buffer, ae, host);
  1623. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  1624. buffer_sprintf(local_buffer, "\n}");
  1625. if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
  1626. if (likely(aclk_connected)) {
  1627. errno = 0;
  1628. error("ACLK failed to queue alarm_command on alarm_update");
  1629. }
  1630. }
  1631. freez(msg_id);
  1632. buffer_free(local_buffer);
  1633. return 0;
  1634. }
  1635. /*
  1636. * Parse the incoming payload and queue a command if valid
  1637. */
  1638. int aclk_handle_cloud_request(char *payload)
  1639. {
  1640. struct aclk_request cloud_to_agent = {
  1641. .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0
  1642. };
  1643. if (aclk_stats_enabled) {
  1644. ACLK_STATS_LOCK;
  1645. aclk_metrics_per_sample.cloud_req_recvd++;
  1646. ACLK_STATS_UNLOCK;
  1647. }
  1648. if (unlikely(agent_state == AGENT_INITIALIZING)) {
  1649. debug(D_ACLK, "Ignoring cloud request; agent not in stable state");
  1650. return 0;
  1651. }
  1652. if (unlikely(!payload)) {
  1653. debug(D_ACLK, "ACLK incoming message is empty");
  1654. return 0;
  1655. }
  1656. debug(D_ACLK, "ACLK incoming message (%s)", payload);
  1657. int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
  1658. if (unlikely(
  1659. JSON_OK != rc || !cloud_to_agent.payload || !cloud_to_agent.callback_topic || !cloud_to_agent.msg_id ||
  1660. !cloud_to_agent.type_id || cloud_to_agent.version > ACLK_VERSION ||
  1661. strcmp(cloud_to_agent.type_id, "http"))) {
  1662. if (JSON_OK != rc)
  1663. error("Malformed json request (%s)", payload);
  1664. if (cloud_to_agent.version > ACLK_VERSION)
  1665. error("Unsupported version in JSON request %d", cloud_to_agent.version);
  1666. if (cloud_to_agent.payload)
  1667. freez(cloud_to_agent.payload);
  1668. if (cloud_to_agent.type_id)
  1669. freez(cloud_to_agent.type_id);
  1670. if (cloud_to_agent.msg_id)
  1671. freez(cloud_to_agent.msg_id);
  1672. if (cloud_to_agent.callback_topic)
  1673. freez(cloud_to_agent.callback_topic);
  1674. if (aclk_stats_enabled) {
  1675. ACLK_STATS_LOCK;
  1676. aclk_metrics_per_sample.cloud_req_err++;
  1677. ACLK_STATS_UNLOCK;
  1678. }
  1679. return 1;
  1680. }
  1681. // Checked to be "http", not needed anymore
  1682. if (likely(cloud_to_agent.type_id)) {
  1683. freez(cloud_to_agent.type_id);
  1684. cloud_to_agent.type_id = NULL;
  1685. }
  1686. if (unlikely(aclk_submit_request(&cloud_to_agent)))
  1687. debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);
  1688. // Note: the payload comes from the callback and it will be automatically freed
  1689. return 0;
  1690. }