agent_cloud_link.c 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "libnetdata/libnetdata.h"
  3. #include "agent_cloud_link.h"
  4. #include "aclk_lws_https_client.h"
  5. #include "aclk_common.h"
// ACLK shutdown flag. NOTE(review): not static and not used in this view —
// presumably set/read from other files; confirm.
int aclk_shutting_down = 0;
// State-machine for the on-connect metadata transmission.
// TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial
// agent startup phase.
static ACLK_METADATA_STATE aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
// While AGENT_INITIALIZING, collector additions/deletions only refresh
// last_init_sequence instead of queueing an on-connect metadata command.
static AGENT_STATE agent_state = AGENT_INITIALIZING;
// Other global state
static int aclk_subscribed = 0;
static int aclk_disable_single_updates = 0;
// Time (seconds) of the latest collector add/delete seen while initializing.
static time_t last_init_sequence = 0;
// While non-zero, aclk_queue_query() ignores every request and returns 1.
static int waiting_init = 1;
static char *aclk_username = NULL;
static char *aclk_password = NULL;
// Base topic for published messages; computed lazily, once, by create_publish_base_topic().
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_connected = 0; // Exposed in the web-api
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
// aclk_mutex: held by create_publish_base_topic() while creating global_base_topic.
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
// query_mutex: protects aclk_queue.
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
// collector_mutex: protects collector_list.
static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
// Condition variable signalled (QUERY_THREAD_WAKEUP) by aclk_queue_query() after
// new work is queued. NOTE(review): presumably waited on by the query thread — confirm.
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
// NOTE(review): unlike the other lock macros, this expansion already ends with ';'.
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait);
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
// Forward declaration; NOTE(review): presumably defined in the websockets client code — confirm.
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
/*
 * Maintain a list of collectors and their chart count.
 * If all the charts of a collector are deleted
 * then a new metadata dataset must be sent to the cloud.
 *
 * collector_list points to a "dummy" head node (allocated lazily by
 * _find_collector()); the real entries hang off collector_list->next.
 */
struct _collector {
    time_t created;
    uint32_t count; //chart count
    uint32_t hostname_hash; // simple_hash(hostname)
    uint32_t plugin_hash;   // simple_hash(plugin_name), or 1 when plugin_name is NULL
    uint32_t module_hash;   // simple_hash(module_name), or 1 when module_name is NULL
    char *hostname;
    char *plugin_name;      // may be NULL
    char *module_name;      // may be NULL
    struct _collector *next;
};
struct _collector *collector_list = NULL;
// One pending command in the query queue.
struct aclk_query {
    time_t created;
    time_t run_after; // Delay run until after this time
    ACLK_CMD cmd; // What command is this
    char *topic; // Topic to respond to
    char *data; // Internal data (NULL if request from the cloud)
    char *msg_id; // msg_id generated by the cloud (NULL if internal)
    char *query; // The actual query
    u_char deleted; // Mark deleted for garbage collect
    struct aclk_query *next;
};
// Singly linked queue of pending queries, kept ordered by run_after by
// aclk_queue_query(). Accessed under QUERY_LOCK.
struct aclk_query_queue {
    struct aclk_query *aclk_query_head;
    struct aclk_query *aclk_query_tail;
    uint64_t count;
} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
  73. char *create_uuid()
  74. {
  75. uuid_t uuid;
  76. char *uuid_str = mallocz(36 + 1);
  77. uuid_generate(uuid);
  78. uuid_unparse(uuid, uuid_str);
  79. return uuid_str;
  80. }
  81. int cloud_to_agent_parse(JSON_ENTRY *e)
  82. {
  83. struct aclk_request *data = e->callback_data;
  84. switch (e->type) {
  85. case JSON_OBJECT:
  86. case JSON_ARRAY:
  87. break;
  88. case JSON_STRING:
  89. if (!strcmp(e->name, "msg-id")) {
  90. data->msg_id = strdupz(e->data.string);
  91. break;
  92. }
  93. if (!strcmp(e->name, "type")) {
  94. data->type_id = strdupz(e->data.string);
  95. break;
  96. }
  97. if (!strcmp(e->name, "callback-topic")) {
  98. data->callback_topic = strdupz(e->data.string);
  99. break;
  100. }
  101. if (!strcmp(e->name, "payload")) {
  102. if (likely(e->data.string)) {
  103. size_t len = strlen(e->data.string);
  104. data->payload = mallocz(len+1);
  105. if (!url_decode_r(data->payload, e->data.string, len + 1))
  106. strcpy(data->payload, e->data.string);
  107. }
  108. break;
  109. }
  110. break;
  111. case JSON_NUMBER:
  112. if (!strcmp(e->name, "version")) {
  113. data->version = atoi(e->original_string);
  114. break;
  115. }
  116. break;
  117. case JSON_BOOLEAN:
  118. break;
  119. case JSON_NULL:
  120. break;
  121. }
  122. return 0;
  123. }
  124. static RSA *aclk_private_key = NULL;
  125. static int create_private_key()
  126. {
  127. char filename[FILENAME_MAX + 1];
  128. snprintfz(filename, FILENAME_MAX, "%s/claim.d/private.pem", netdata_configured_user_config_dir);
  129. long bytes_read;
  130. char *private_key = read_by_filename(filename, &bytes_read);
  131. if (!private_key) {
  132. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  133. return 1;
  134. }
  135. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  136. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  137. if (key_bio==NULL) {
  138. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  139. goto biofailed;
  140. }
  141. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  142. BIO_free(key_bio);
  143. if (aclk_private_key!=NULL)
  144. {
  145. freez(private_key);
  146. return 0;
  147. }
  148. char err[512];
  149. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  150. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  151. biofailed:
  152. freez(private_key);
  153. return 1;
  154. }
  155. /*
  156. * After a connection failure -- delay in milliseconds
  157. * When a connection is established, the delay function
  158. * should be called with
  159. *
  160. * mode 0 to reset the delay
  161. * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
  162. *
  163. */
  164. unsigned long int aclk_reconnect_delay(int mode)
  165. {
  166. static int fail = -1;
  167. unsigned long int delay;
  168. if (!mode || fail == -1) {
  169. srandom(time(NULL));
  170. fail = mode - 1;
  171. return 0;
  172. }
  173. delay = (1 << fail);
  174. if (delay >= ACLK_MAX_BACKOFF_DELAY) {
  175. delay = ACLK_MAX_BACKOFF_DELAY * 1000;
  176. } else {
  177. fail++;
  178. delay = (delay * 1000) + (random() % 1000);
  179. }
  180. return delay;
  181. }
  182. /*
  183. * Free a query structure when done
  184. */
  185. void aclk_query_free(struct aclk_query *this_query)
  186. {
  187. if (unlikely(!this_query))
  188. return;
  189. freez(this_query->topic);
  190. if (likely(this_query->query))
  191. freez(this_query->query);
  192. if (likely(this_query->data))
  193. freez(this_query->data);
  194. if (likely(this_query->msg_id))
  195. freez(this_query->msg_id);
  196. freez(this_query);
  197. }
  198. // Returns the entry after which we need to create a new entry to run at the specified time
  199. // If NULL is returned we need to add to HEAD
  200. // Need to have a QUERY lock before calling this
  201. struct aclk_query *aclk_query_find_position(time_t time_to_run)
  202. {
  203. struct aclk_query *tmp_query, *last_query;
  204. // Quick check if we will add to the end
  205. if (likely(aclk_queue.aclk_query_tail)) {
  206. if (aclk_queue.aclk_query_tail->run_after <= time_to_run)
  207. return aclk_queue.aclk_query_tail;
  208. }
  209. last_query = NULL;
  210. tmp_query = aclk_queue.aclk_query_head;
  211. while (tmp_query) {
  212. if (tmp_query->run_after > time_to_run)
  213. return last_query;
  214. last_query = tmp_query;
  215. tmp_query = tmp_query->next;
  216. }
  217. return last_query;
  218. }
  219. // Need to have a QUERY lock before calling this
  220. struct aclk_query *
  221. aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
  222. {
  223. struct aclk_query *tmp_query, *prev_query;
  224. UNUSED(cmd);
  225. tmp_query = aclk_queue.aclk_query_head;
  226. prev_query = NULL;
  227. while (tmp_query) {
  228. if (likely(!tmp_query->deleted)) {
  229. if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
  230. if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
  231. (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
  232. if (likely(last_query))
  233. *last_query = prev_query;
  234. return tmp_query;
  235. }
  236. }
  237. }
  238. prev_query = tmp_query;
  239. tmp_query = tmp_query->next;
  240. }
  241. return NULL;
  242. }
  243. /*
  244. * Add a query to execute, the result will be send to the specified topic
  245. */
  246. int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
  247. {
  248. struct aclk_query *new_query, *tmp_query;
  249. // Ignore all commands while we wait for the agent to initialize
  250. if (unlikely(waiting_init))
  251. return 1;
  252. run_after = now_realtime_sec() + run_after;
  253. QUERY_LOCK;
  254. struct aclk_query *last_query = NULL;
  255. tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
  256. if (unlikely(tmp_query)) {
  257. if (tmp_query->run_after == run_after) {
  258. QUERY_UNLOCK;
  259. QUERY_THREAD_WAKEUP;
  260. return 0;
  261. }
  262. if (last_query)
  263. last_query->next = tmp_query->next;
  264. else
  265. aclk_queue.aclk_query_head = tmp_query->next;
  266. debug(D_ACLK, "Removing double entry");
  267. aclk_query_free(tmp_query);
  268. aclk_queue.count--;
  269. }
  270. new_query = callocz(1, sizeof(struct aclk_query));
  271. new_query->cmd = aclk_cmd;
  272. if (internal) {
  273. new_query->topic = strdupz(topic);
  274. if (likely(query))
  275. new_query->query = strdupz(query);
  276. } else {
  277. new_query->topic = topic;
  278. new_query->query = query;
  279. new_query->msg_id = msg_id;
  280. }
  281. if (data)
  282. new_query->data = strdupz(data);
  283. new_query->next = NULL;
  284. new_query->created = now_realtime_sec();
  285. new_query->run_after = run_after;
  286. debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
  287. tmp_query = aclk_query_find_position(run_after);
  288. if (tmp_query) {
  289. new_query->next = tmp_query->next;
  290. tmp_query->next = new_query;
  291. if (tmp_query == aclk_queue.aclk_query_tail)
  292. aclk_queue.aclk_query_tail = new_query;
  293. aclk_queue.count++;
  294. QUERY_UNLOCK;
  295. QUERY_THREAD_WAKEUP;
  296. return 0;
  297. }
  298. new_query->next = aclk_queue.aclk_query_head;
  299. aclk_queue.aclk_query_head = new_query;
  300. aclk_queue.count++;
  301. QUERY_UNLOCK;
  302. QUERY_THREAD_WAKEUP;
  303. return 0;
  304. }
  305. inline int aclk_submit_request(struct aclk_request *request)
  306. {
  307. return aclk_queue_query(request->callback_topic, NULL, request->msg_id, request->payload, 0, 0, ACLK_CMD_CLOUD);
  308. }
  309. /*
  310. * Get the next query to process - NULL if nothing there
  311. * The caller needs to free memory by calling aclk_query_free()
  312. *
  313. * topic
  314. * query
  315. * The structure itself
  316. *
  317. */
  318. struct aclk_query *aclk_queue_pop()
  319. {
  320. struct aclk_query *this_query;
  321. QUERY_LOCK;
  322. if (likely(!aclk_queue.aclk_query_head)) {
  323. QUERY_UNLOCK;
  324. return NULL;
  325. }
  326. this_query = aclk_queue.aclk_query_head;
  327. // Get rid of the deleted entries
  328. while (this_query && this_query->deleted) {
  329. aclk_queue.count--;
  330. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  331. if (likely(!aclk_queue.aclk_query_head)) {
  332. aclk_queue.aclk_query_tail = NULL;
  333. }
  334. aclk_query_free(this_query);
  335. this_query = aclk_queue.aclk_query_head;
  336. }
  337. if (likely(!this_query)) {
  338. QUERY_UNLOCK;
  339. return NULL;
  340. }
  341. if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
  342. info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
  343. QUERY_UNLOCK;
  344. return NULL;
  345. }
  346. aclk_queue.count--;
  347. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  348. if (likely(!aclk_queue.aclk_query_head)) {
  349. aclk_queue.aclk_query_tail = NULL;
  350. }
  351. QUERY_UNLOCK;
  352. return this_query;
  353. }
  354. // This will give the base topic that the agent will publish messages.
  355. // subtopics will be sent under the base topic e.g. base_topic/subtopic
  356. // This is called by aclk_init(), to compute the base topic once and have
  357. // it stored internally.
  358. // Need to check if additional logic should be added to make sure that there
  359. // is enough information to determine the base topic at init time
  360. char *create_publish_base_topic()
  361. {
  362. if (unlikely(!is_agent_claimed()))
  363. return NULL;
  364. ACLK_LOCK;
  365. if (unlikely(!global_base_topic)) {
  366. char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
  367. snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
  368. tmp = strchr(tmp_topic, '\n');
  369. if (unlikely(tmp))
  370. *tmp = '\0';
  371. global_base_topic = strdupz(tmp_topic);
  372. }
  373. ACLK_UNLOCK;
  374. return global_base_topic;
  375. }
  376. /*
  377. * Build a topic based on sub_topic and final_topic
  378. * if the sub topic starts with / assume that is an absolute topic
  379. *
  380. */
  381. char *get_topic(char *sub_topic, char *final_topic, int max_size)
  382. {
  383. int rc;
  384. if (likely(sub_topic && sub_topic[0] == '/'))
  385. return sub_topic;
  386. if (unlikely(!global_base_topic))
  387. return sub_topic;
  388. rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
  389. if (unlikely(rc >= max_size))
  390. debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);
  391. return final_topic;
  392. }
  393. /*
  394. * Free a collector structure
  395. */
  396. static void _free_collector(struct _collector *collector)
  397. {
  398. if (likely(collector->plugin_name))
  399. freez(collector->plugin_name);
  400. if (likely(collector->module_name))
  401. freez(collector->module_name);
  402. if (likely(collector->hostname))
  403. freez(collector->hostname);
  404. freez(collector);
  405. }
  406. /*
  407. * This will report the collector list
  408. *
  409. */
  410. #ifdef ACLK_DEBUG
  411. static void _dump_collector_list()
  412. {
  413. struct _collector *tmp_collector;
  414. COLLECTOR_LOCK;
  415. info("DUMPING ALL COLLECTORS");
  416. if (unlikely(!collector_list || !collector_list->next)) {
  417. COLLECTOR_UNLOCK;
  418. info("DUMPING ALL COLLECTORS -- nothing found");
  419. return;
  420. }
  421. // Note that the first entry is "dummy"
  422. tmp_collector = collector_list->next;
  423. while (tmp_collector) {
  424. info(
  425. "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
  426. tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
  427. tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
  428. tmp_collector = tmp_collector->next;
  429. }
  430. info("DUMPING ALL COLLECTORS DONE");
  431. COLLECTOR_UNLOCK;
  432. }
  433. #endif
  434. /*
  435. * This will cleanup the collector list
  436. *
  437. */
  438. static void _reset_collector_list()
  439. {
  440. struct _collector *tmp_collector, *next_collector;
  441. COLLECTOR_LOCK;
  442. if (unlikely(!collector_list || !collector_list->next)) {
  443. COLLECTOR_UNLOCK;
  444. return;
  445. }
  446. // Note that the first entry is "dummy"
  447. tmp_collector = collector_list->next;
  448. collector_list->count = 0;
  449. collector_list->next = NULL;
  450. // We broke the link; we can unlock
  451. COLLECTOR_UNLOCK;
  452. while (tmp_collector) {
  453. next_collector = tmp_collector->next;
  454. _free_collector(tmp_collector);
  455. tmp_collector = next_collector;
  456. }
  457. }
  458. /*
  459. * Find a collector (if it exists)
  460. * Must lock before calling this
  461. * If last_collector is not null, it will return the previous collector in the linked
  462. * list (used in collector delete)
  463. */
  464. static struct _collector *_find_collector(
  465. const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
  466. {
  467. struct _collector *tmp_collector, *prev_collector;
  468. uint32_t plugin_hash;
  469. uint32_t module_hash;
  470. uint32_t hostname_hash;
  471. if (unlikely(!collector_list)) {
  472. collector_list = callocz(1, sizeof(struct _collector));
  473. return NULL;
  474. }
  475. if (unlikely(!collector_list->next))
  476. return NULL;
  477. plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  478. module_hash = module_name ? simple_hash(module_name) : 1;
  479. hostname_hash = simple_hash(hostname);
  480. // Note that the first entry is "dummy"
  481. tmp_collector = collector_list->next;
  482. prev_collector = collector_list;
  483. while (tmp_collector) {
  484. if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
  485. hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
  486. (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
  487. (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
  488. if (unlikely(last_collector))
  489. *last_collector = prev_collector;
  490. return tmp_collector;
  491. }
  492. prev_collector = tmp_collector;
  493. tmp_collector = tmp_collector->next;
  494. }
  495. return tmp_collector;
  496. }
  497. /*
  498. * Called to delete a collector
  499. * It will reduce the count (chart_count) and will remove it
  500. * from the linked list if the count reaches zero
  501. * The structure will be returned to the caller to free
  502. * the resources
  503. *
  504. */
  505. static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  506. {
  507. struct _collector *tmp_collector, *prev_collector = NULL;
  508. tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
  509. if (likely(tmp_collector)) {
  510. --tmp_collector->count;
  511. if (unlikely(!tmp_collector->count))
  512. prev_collector->next = tmp_collector->next;
  513. }
  514. return tmp_collector;
  515. }
  516. /*
  517. * Add a new collector (plugin / module) to the list
  518. * If it already exists just update the chart count
  519. *
  520. * Lock before calling
  521. */
  522. static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  523. {
  524. struct _collector *tmp_collector;
  525. tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
  526. if (unlikely(!tmp_collector)) {
  527. tmp_collector = callocz(1, sizeof(struct _collector));
  528. tmp_collector->hostname_hash = simple_hash(hostname);
  529. tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  530. tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
  531. tmp_collector->hostname = strdupz(hostname);
  532. tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
  533. tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
  534. tmp_collector->next = collector_list->next;
  535. collector_list->next = tmp_collector;
  536. }
  537. tmp_collector->count++;
  538. debug(
  539. D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
  540. module_name ? module_name : "*", tmp_collector->count);
  541. return tmp_collector;
  542. }
  543. /*
  544. * Add a new collector to the list
  545. * If it exists, update the chart count
  546. */
  547. void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  548. {
  549. struct _collector *tmp_collector;
  550. COLLECTOR_LOCK;
  551. tmp_collector = _add_collector(hostname, plugin_name, module_name);
  552. if (unlikely(tmp_collector->count != 1)) {
  553. COLLECTOR_UNLOCK;
  554. return;
  555. }
  556. if (unlikely(agent_state == AGENT_INITIALIZING))
  557. last_init_sequence = now_realtime_sec();
  558. else {
  559. if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  560. debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
  561. }
  562. COLLECTOR_UNLOCK;
  563. }
  564. /*
  565. * Delete a collector from the list
  566. * If the chart count reaches zero the collector will be removed
  567. * from the list by calling del_collector.
  568. *
  569. * This function will release the memory used and schedule
  570. * a cloud update
  571. */
  572. void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  573. {
  574. struct _collector *tmp_collector;
  575. COLLECTOR_LOCK;
  576. tmp_collector = _del_collector(hostname, plugin_name, module_name);
  577. if (unlikely(!tmp_collector || tmp_collector->count)) {
  578. COLLECTOR_UNLOCK;
  579. return;
  580. }
  581. debug(
  582. D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
  583. tmp_collector->count);
  584. COLLECTOR_UNLOCK;
  585. if (unlikely(agent_state == AGENT_INITIALIZING))
  586. last_init_sequence = now_realtime_sec();
  587. else {
  588. if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  589. debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
  590. }
  591. _free_collector(tmp_collector);
  592. }
  593. int aclk_execute_query(struct aclk_query *this_query)
  594. {
  595. if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
  596. struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
  597. w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  598. strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
  599. w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
  600. w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
  601. w->acl = 0x1f;
  602. char *mysep = strchr(this_query->query, '?');
  603. if (mysep) {
  604. strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
  605. *mysep = '\0';
  606. } else
  607. strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
  608. mysep = strrchr(this_query->query, '/');
  609. // TODO: handle bad response perhaps in a different way. For now it does to the payload
  610. int rc = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
  611. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  612. buffer_flush(local_buffer);
  613. local_buffer->contenttype = CT_APPLICATION_JSON;
  614. aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0);
  615. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  616. char *encoded_response = aclk_encode_response(w->response.data);
  617. buffer_sprintf(
  618. local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\"\n}", rc, encoded_response);
  619. buffer_sprintf(local_buffer, "\n}");
  620. aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
  621. buffer_free(w->response.data);
  622. freez(w);
  623. buffer_free(local_buffer);
  624. freez(encoded_response);
  625. return 0;
  626. }
  627. return 1;
  628. }
  629. /*
  630. * This function will fetch the next pending command and process it
  631. *
  632. */
  633. int aclk_process_query()
  634. {
  635. struct aclk_query *this_query;
  636. static long int query_count = 0;
  637. if (!aclk_connected)
  638. return 0;
  639. this_query = aclk_queue_pop();
  640. if (likely(!this_query)) {
  641. return 0;
  642. }
  643. if (unlikely(this_query->deleted)) {
  644. debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query);
  645. aclk_query_free(this_query);
  646. return 1;
  647. }
  648. query_count++;
  649. debug(
  650. D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic,
  651. this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created));
  652. switch (this_query->cmd) {
  653. case ACLK_CMD_ONCONNECT:
  654. debug(D_ACLK, "EXECUTING on connect metadata command");
  655. aclk_send_metadata();
  656. aclk_metadata_submitted = ACLK_METADATA_SENT;
  657. break;
  658. case ACLK_CMD_CHART:
  659. debug(D_ACLK, "EXECUTING a chart update command");
  660. aclk_send_single_chart(this_query->data, this_query->query);
  661. break;
  662. case ACLK_CMD_CHARTDEL:
  663. debug(D_ACLK, "EXECUTING a chart delete command");
  664. //TODO: This send the info metadata for now
  665. aclk_send_info_metadata();
  666. break;
  667. case ACLK_CMD_ALARM:
  668. debug(D_ACLK, "EXECUTING an alarm update command");
  669. aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
  670. break;
  671. case ACLK_CMD_CLOUD:
  672. debug(D_ACLK, "EXECUTING a cloud command");
  673. aclk_execute_query(this_query);
  674. break;
  675. default:
  676. break;
  677. }
  678. debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
  679. aclk_query_free(this_query);
  680. return 1;
  681. }
  682. /*
  683. * Process all pending queries
  684. * Return 0 if no queries were processed, 1 otherwise
  685. *
  686. */
  687. int aclk_process_queries()
  688. {
  689. if (unlikely(netdata_exit || !aclk_connected))
  690. return 0;
  691. if (likely(!aclk_queue.count))
  692. return 0;
  693. debug(D_ACLK, "Processing %d queries", (int)aclk_queue.count);
  694. //TODO: may consider possible throttling here
  695. while (aclk_process_query()) {
  696. // Process all commands
  697. };
  698. return 1;
  699. }
  700. static void aclk_query_thread_cleanup(void *ptr)
  701. {
  702. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  703. info("cleaning up...");
  704. _reset_collector_list();
  705. freez(collector_list);
  706. // Clean memory for pending queries if any
  707. struct aclk_query *this_query;
  708. do {
  709. this_query = aclk_queue_pop();
  710. aclk_query_free(this_query);
  711. } while (this_query);
  712. freez(static_thread->thread);
  713. freez(static_thread);
  714. }
  715. /**
  716. * Main query processing thread
  717. *
  718. * On startup wait for the agent collectors to initialize
  719. * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
  720. * of no new collectors coming in in order to mark the agent
  721. * as stable (set agent_state = AGENT_STABLE)
  722. */
  723. void *aclk_query_main_thread(void *ptr)
  724. {
  725. netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
  726. while (agent_state == AGENT_INITIALIZING && !netdata_exit) {
  727. time_t checkpoint;
  728. checkpoint = now_realtime_sec() - last_init_sequence;
  729. if (checkpoint > ACLK_STABLE_TIMEOUT) {
  730. agent_state = AGENT_STABLE;
  731. info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
  732. #ifdef ACLK_DEBUG
  733. _dump_collector_list();
  734. #endif
  735. break;
  736. }
  737. info("Waiting for agent collectors to initialize. Last activity was %ld seconds ago" , checkpoint);
  738. sleep_usec(USEC_PER_SEC * 1);
  739. }
  740. while (!netdata_exit) {
  741. if (unlikely(!aclk_metadata_submitted)) {
  742. aclk_metadata_submitted = ACLK_METADATA_CMD_QUEUED;
  743. if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  744. errno = 0;
  745. error("ACLK failed to queue on_connect command");
  746. aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  747. }
  748. }
  749. aclk_process_queries();
  750. QUERY_THREAD_LOCK;
  751. // TODO: Need to check if there are queries awaiting already
  752. if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
  753. sleep_usec(USEC_PER_SEC * 1);
  754. QUERY_THREAD_UNLOCK;
  755. } // forever
  756. info("Shutting down query processing thread");
  757. netdata_thread_cleanup_pop(1);
  758. return NULL;
  759. }
  760. // Thread cleanup
  761. static void aclk_main_cleanup(void *ptr)
  762. {
  763. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  764. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  765. info("cleaning up...");
  766. if (is_agent_claimed() && aclk_connected) {
  767. size_t write_q, write_q_bytes, read_q;
  768. time_t event_loop_timeout;
  769. // Wakeup thread to cleanup
  770. QUERY_THREAD_WAKEUP;
  771. // Send a graceful disconnect message
  772. BUFFER *b = buffer_create(512);
  773. aclk_create_header(b, "disconnect", NULL, 0, 0);
  774. buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
  775. aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
  776. buffer_free(b);
  777. event_loop_timeout = now_realtime_sec() + 5;
  778. write_q = 1;
  779. while (write_q && event_loop_timeout > now_realtime_sec()) {
  780. _link_event_loop();
  781. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  782. }
  783. aclk_shutting_down = 1;
  784. _link_shutdown();
  785. aclk_lws_wss_mqtt_layer_disconect_notif();
  786. write_q = 1;
  787. event_loop_timeout = now_realtime_sec() + 5;
  788. while (write_q && event_loop_timeout > now_realtime_sec()) {
  789. _link_event_loop();
  790. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  791. }
  792. }
  793. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  794. }
  /*
   * Callback state for json_extract_singleton(). `key` names the JSON string
   * member to look for. `result` receives a strdupz() copy of that member's
   * value. Callers initialise `result` to NULL, treat NULL after parsing as
   * "member not found", and release a non-NULL result with freez().
   */
  struct dictionary_singleton {
      char *key;    // member name to match (compared with strcmp)
      char *result; // heap copy of the matched string value, or NULL
  };
  799. int json_extract_singleton(JSON_ENTRY *e)
  800. {
  801. struct dictionary_singleton *data = e->callback_data;
  802. switch (e->type) {
  803. case JSON_OBJECT:
  804. case JSON_ARRAY:
  805. break;
  806. case JSON_STRING:
  807. if (!strcmp(e->name, data->key)) {
  808. data->result = strdupz(e->data.string);
  809. break;
  810. }
  811. break;
  812. case JSON_NUMBER:
  813. case JSON_BOOLEAN:
  814. case JSON_NULL:
  815. break;
  816. }
  817. return 0;
  818. }
  // Map one character of the standard base-64 alphabet (RFC 4648) to its 6-bit
  // value. Returns -1 for anything else, including the '=' padding character.
  static int base64_decode_digit(unsigned char c)
  {
      if (c >= 'A' && c <= 'Z')
          return c - 'A';
      if (c >= 'a' && c <= 'z')
          return c - 'a' + 26;
      if (c >= '0' && c <= '9')
          return c - '0' + 52;
      if (c == '+')
          return 62;
      if (c == '/')
          return 63;
      return -1;
  }

  // Base-64 decoder (standard alphabet; '=' padding is only accepted at the end of the last quantum).
  // Challenges are packed into json strings, so we don't skip newlines: any character outside the
  // alphabet is rejected rather than silently decoded.
  // Returns the number of bytes written to output. Returns 0 on error (input length not a multiple
  // of 4, insufficient output space, invalid character) and also for empty input.
  size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
  {
      if ((input_size & 3) != 0) {
          error("Can't decode base-64 input length %zu", input_size);
          return 0;
      }

      size_t unpadded_size = (input_size / 4) * 3;
      if (unpadded_size > output_size) {
          error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
          return 0;
      }

      // Nothing to decode (also keeps the index arithmetic below from underflowing)
      if (input_size == 0)
          return 0;

      // "xx==" carries one byte, "xxx=" two bytes
      size_t padding = 0;
      if (input[input_size - 1] == '=')
          padding = (input[input_size - 2] == '=') ? 2 : 1;

      for (size_t pos = 0; pos < input_size; pos += 4) {
          size_t pad_here = (pos + 4 == input_size) ? padding : 0;
          uint32_t value = 0;

          // Accumulate 4 x 6 bits; padded positions contribute zero bits
          for (size_t k = 0; k < 4; k++) {
              int digit = 0;
              if (k < 4 - pad_here) {
                  digit = base64_decode_digit(input[pos + k]);
                  if (digit < 0) {
                      error("Invalid base-64 character 0x%02x at offset %zu", input[pos + k], pos + k);
                      return 0;
                  }
              }
              value = (value << 6) | (uint32_t)digit;
          }

          output[0] = (unsigned char)(value >> 16);
          if (pad_here < 2)
              output[1] = (unsigned char)(value >> 8);
          if (pad_here < 1)
              output[2] = (unsigned char)value;
          output += 3;
      }

      return unpadded_size - padding;
  }
  /*
   * Base-64 encode input_size bytes of input into output (standard alphabet,
   * '=' padding). The output is NOT NUL-terminated, but one spare byte is always
   * left so the caller can terminate it at output[returned length].
   *
   * Returns the number of characters written, or 0 when output_size is too small
   * (the error is logged). Empty input also returns 0.
   */
  size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
  {
      static const char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                                   "abcdefghijklmnopqrstuvwxyz"
                                   "0123456789+/";

      // Each started group of 3 input bytes produces 4 characters; require
      // quantums * 4 + 1 <= output_size without risking multiplication overflow.
      size_t quantums = input_size / 3 + (input_size % 3 != 0);
      if (output_size == 0 || quantums > (output_size - 1) / 4) {
          error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
          return 0;
      }

      size_t count = 0;
      uint32_t value;

      // Full 3-byte groups (including an exact final group of 3)
      while (input_size >= 3) {
          value = ((uint32_t)input[0] << 16) | ((uint32_t)input[1] << 8) | (uint32_t)input[2];
          output[0] = lookup[value >> 18];
          output[1] = lookup[(value >> 12) & 0x3f];
          output[2] = lookup[(value >> 6) & 0x3f];
          output[3] = lookup[value & 0x3f];
          output += 4;
          input += 3;
          input_size -= 3;
          count += 4;
      }

      // Trailing partial group, padded with '='
      switch (input_size) {
          case 2:
              value = ((uint32_t)input[0] << 10) | ((uint32_t)input[1] << 2);
              output[0] = lookup[(value >> 12) & 0x3f];
              output[1] = lookup[(value >> 6) & 0x3f];
              output[2] = lookup[value & 0x3f];
              output[3] = '=';
              count += 4;
              break;
          case 1:
              value = (uint32_t)input[0] << 4;
              output[0] = lookup[(value >> 6) & 0x3f];
              output[1] = lookup[value & 0x3f];
              output[2] = '=';
              output[3] = '=';
              count += 4;
              break;
          default:
              break;
      }
      return count;
  }
  937. int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
  938. {
  939. int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);
  940. if (result == -1) {
  941. char err[512];
  942. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  943. error("Decryption of the challenge failed: %s", err);
  944. }
  945. return result;
  946. }
  947. char *extract_payload(BUFFER *b)
  948. {
  949. char *s = b->buffer;
  950. unsigned int line_len=0;
  951. for (size_t i=0; i<b->len; i++)
  952. {
  953. if (*s == 0 )
  954. return NULL;
  955. if (*s == '\n' ) {
  956. if (line_len==0)
  957. return s+1;
  958. line_len = 0;
  959. }
  960. else if (*s == '\r') {
  961. /* don't count */
  962. }
  963. else
  964. line_len ++;
  965. s++;
  966. }
  967. return NULL;
  968. }
  969. void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
  970. {
  971. char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  972. debug(D_ACLK, "Performing challenge-response sequence");
  973. if (aclk_password != NULL)
  974. {
  975. freez(aclk_password);
  976. aclk_password = NULL;
  977. }
  978. // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
  979. // TODO - target host?
  980. char *agent_id = is_agent_claimed();
  981. if (agent_id == NULL)
  982. {
  983. error("Agent was not claimed - cannot perform challenge/response");
  984. goto CLEANUP;
  985. }
  986. char url[1024];
  987. sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
  988. info("Retrieving challenge from cloud: %s %s %s", aclk_hostname, aclk_port, url);
  989. if(aclk_send_https_request("GET", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
  990. {
  991. error("Challenge failed: %s", data_buffer);
  992. goto CLEANUP;
  993. }
  994. struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
  995. debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
  996. if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
  997. {
  998. freez(challenge.result);
  999. error("Could not parse the json response with the challenge: %s", data_buffer);
  1000. goto CLEANUP;
  1001. }
  1002. if (challenge.result == NULL ) {
  1003. error("Could not retrieve challenge from auth response: %s", data_buffer);
  1004. goto CLEANUP;
  1005. }
  1006. size_t challenge_len = strlen(challenge.result);
  1007. unsigned char decoded[512];
  1008. size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
  1009. unsigned char plaintext[4096]={};
  1010. int decrypted_length = private_decrypt(decoded, decoded_len, plaintext);
  1011. freez(challenge.result);
  1012. char encoded[512];
  1013. size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
  1014. encoded[encoded_len] = 0;
  1015. debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
  1016. char response_json[4096]={};
  1017. sprintf(response_json, "{\"response\":\"%s\"}", encoded);
  1018. debug(D_ACLK, "Password phase: %s",response_json);
  1019. // TODO - host
  1020. sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
  1021. if(aclk_send_https_request("POST", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
  1022. {
  1023. error("Challenge-response failed: %s", data_buffer);
  1024. goto CLEANUP;
  1025. }
  1026. debug(D_ACLK, "Password response from cloud: %s", data_buffer);
  1027. struct dictionary_singleton password = { .key = "password", .result = NULL };
  1028. if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
  1029. {
  1030. freez(password.result);
  1031. error("Could not parse the json response with the password: %s", data_buffer);
  1032. goto CLEANUP;
  1033. }
  1034. if (password.result == NULL ) {
  1035. error("Could not retrieve password from auth response");
  1036. goto CLEANUP;
  1037. }
  1038. if (aclk_password != NULL )
  1039. freez(aclk_password);
  1040. if (aclk_username == NULL)
  1041. aclk_username = strdupz(agent_id);
  1042. aclk_password = password.result;
  1043. CLEANUP:
  1044. freez(data_buffer);
  1045. return;
  1046. }
  1047. static void aclk_try_to_connect(char *hostname, char *port, int port_num)
  1048. {
  1049. info("Attempting to establish the agent cloud link");
  1050. aclk_get_challenge(hostname, port);
  1051. if (aclk_password == NULL)
  1052. return;
  1053. int rc;
  1054. aclk_connecting = 1;
  1055. rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password);
  1056. if (unlikely(rc)) {
  1057. error("Failed to initialize the agent cloud link library");
  1058. }
  1059. }
  /**
   * Main agent cloud link thread
   *
   * This thread will simply call the main event loop that handles
   * pending requests - both inbound and outbound
   *
   * Startup, as implemented below:
   *  - wait for netdata_ready;
   *  - decode the "cloud base url" setting into host and port;
   *  - wait until the agent is claimed and until create_private_key() and
   *    _mqtt_lib_init() both return 0, retrying every 60 seconds.
   *
   * Then, until netdata_exit:
   *  - while disconnected, (re)connect, waiting aclk_reconnect_delay()
   *    milliseconds between attempts;
   *  - pump the link event loop;
   *  - once connected, subscribe to the command topic and start the query
   *    processing thread (only once).
   *
   * @param ptr is a pointer to the netdata_static_thread structure.
   *
   * @return It always returns NULL
   */
  void *aclk_main(void *ptr)
  {
      struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
      struct netdata_static_thread *query_thread;
      // Cloud functionality disabled in the configuration: report the thread as exited immediately
      if (!netdata_cloud_setting) {
          info("Killing ACLK thread -> cloud functionality has been disabled");
          static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
          return NULL;
      }
      info("Waiting for netdata to be ready");
      while (!netdata_ready) {
          sleep_usec(USEC_PER_MS * 300);
      }
      // Start of the collector-initialisation window measured by aclk_query_main_thread()
      last_init_sequence = now_realtime_sec();
      query_thread = NULL;
      char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
      char *aclk_port = NULL;
      uint32_t port_num = 0;
      char *cloud_base_url = config_get(CONFIG_SECTION_CLOUD, "cloud base url", DEFAULT_CLOUD_BASE_URL);
      if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
          error("Configuration error - cannot use agent cloud link");
          static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
          return NULL;
      }
      // NOTE(review): atoi() gives 0 for a non-numeric port without any error; consider strtol() + range check
      port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value
      info("Waiting for netdata to be claimed");
      // Poll the claim state every 5 s; once claimed, retry key/MQTT library setup every 60 s until both succeed
      while(1) {
          while (likely(!is_agent_claimed())) {
              sleep_usec(USEC_PER_SEC * 5);
              if (netdata_exit)
                  goto exited;
          }
          if (!create_private_key() && !_mqtt_lib_init())
              break;
          if (netdata_exit)
              goto exited;
          sleep_usec(USEC_PER_SEC * 60);
      }
      create_publish_base_topic();
      usec_t reconnect_expiry = 0; // In usecs
      netdata_thread_disable_cancelability();
      while (!netdata_exit) {
          // static: the first connection attempt happens once, without the reconnect delay
          static int first_init = 0;
          size_t write_q, write_q_bytes, read_q;
          lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
          //info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu",
          // first_init, aclk_connected, aclk_connecting, write_q, write_q_bytes, read_q);
          if (unlikely(!netdata_exit && !aclk_connected)) {
              if (unlikely(!first_init)) {
                  aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
                  first_init = 1;
              } else {
                  if (aclk_connecting == 0) {
                      // Arm the back-off timer once per disconnected period; delay is in milliseconds
                      if (reconnect_expiry == 0) {
                          unsigned long int delay = aclk_reconnect_delay(1);
                          reconnect_expiry = now_realtime_usec() + delay * 1000;
                          info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
                      }
                      if (now_realtime_usec() >= reconnect_expiry) {
                          reconnect_expiry = 0;
                          aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
                      }
                      sleep_usec(USEC_PER_MS * 100);
                  }
              }
              // A connection attempt is in progress: keep the event loop running so it can complete
              if (aclk_connecting) {
                  _link_event_loop();
                  sleep_usec(USEC_PER_MS * 100);
              }
              continue;
          }
          _link_event_loop();
          if (unlikely(!aclk_connected))
              continue;
          /*static int stress_counter = 0;
          if (write_q_bytes==0 && stress_counter ++ >5)
          {
              aclk_send_stress_test(8000000);
              stress_counter = 0;
          }*/
          // TODO: Move to on-connect
          if (unlikely(!aclk_subscribed)) {
              aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
          }
          // Start the query processing thread the first time the link is up.
          // NOTE(review): this thread is never joined; on exit it is only woken via QUERY_THREAD_WAKEUP below.
          if (unlikely(!query_thread)) {
              query_thread = callocz(1, sizeof(struct netdata_static_thread));
              query_thread->thread = mallocz(sizeof(netdata_thread_t));
              netdata_thread_create(
                  query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread,
                  query_thread);
          }
      } // forever
  exited:
      // Wakeup query thread to cleanup
      QUERY_THREAD_WAKEUP;
      // NOTE(review): the freed globals (aclk_username, aclk_password, aclk_private_key) are not reset to NULL
      freez(aclk_username);
      freez(aclk_password);
      freez(aclk_hostname);
      freez(aclk_port);
      if (aclk_private_key != NULL)
          RSA_free(aclk_private_key);
      aclk_main_cleanup(ptr);
      return NULL;
  }
  1174. /*
  1175. * Send a message to the cloud, using a base topic and sib_topic
  1176. * The final topic will be in the form <base_topic>/<sub_topic>
  1177. * If base_topic is missing then the global_base_topic will be used (if available)
  1178. *
  1179. */
  1180. int aclk_send_message(char *sub_topic, char *message, char *msg_id)
  1181. {
  1182. int rc;
  1183. int mid;
  1184. char topic[ACLK_MAX_TOPIC + 1];
  1185. char *final_topic;
  1186. UNUSED(msg_id);
  1187. if (!aclk_connected)
  1188. return 0;
  1189. if (unlikely(!message))
  1190. return 0;
  1191. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  1192. if (unlikely(!final_topic)) {
  1193. errno = 0;
  1194. error("Unable to build outgoing topic; truncated?");
  1195. return 1;
  1196. }
  1197. ACLK_LOCK;
  1198. rc = _link_send_message(final_topic, (unsigned char *)message, &mid);
  1199. // TODO: link the msg_id with the mid so we can trace it
  1200. ACLK_UNLOCK;
  1201. if (unlikely(rc)) {
  1202. errno = 0;
  1203. error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
  1204. }
  1205. return rc;
  1206. }
  1207. /*
  1208. * Subscribe to a topic in the cloud
  1209. * The final subscription will be in the form
  1210. * /agent/claim_id/<sub_topic>
  1211. */
  1212. int aclk_subscribe(char *sub_topic, int qos)
  1213. {
  1214. int rc;
  1215. char topic[ACLK_MAX_TOPIC + 1];
  1216. char *final_topic;
  1217. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  1218. if (unlikely(!final_topic)) {
  1219. errno = 0;
  1220. error("Unable to build outgoing topic; truncated?");
  1221. return 1;
  1222. }
  1223. if (!aclk_connected) {
  1224. error("Cannot subscribe to %s - not connected!", topic);
  1225. return 1;
  1226. }
  1227. ACLK_LOCK;
  1228. rc = _link_subscribe(final_topic, qos);
  1229. ACLK_UNLOCK;
  1230. // TODO: Add better handling -- error will flood the logfile here
  1231. if (unlikely(rc)) {
  1232. errno = 0;
  1233. error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
  1234. }
  1235. return rc;
  1236. }
  1237. // This is called from a callback when the link goes up
  1238. void aclk_connect()
  1239. {
  1240. info("Connection detected (%"PRIu64" queued queries)", aclk_queue.count);
  1241. aclk_connected = 1;
  1242. waiting_init = 0;
  1243. aclk_reconnect_delay(0);
  1244. QUERY_THREAD_WAKEUP;
  1245. return;
  1246. }
  1247. // This is called from a callback when the link goes down
  1248. void aclk_disconnect()
  1249. {
  1250. if (likely(aclk_connected))
  1251. info("Disconnect detected (%"PRIu64" queued queries)", aclk_queue.count);
  1252. aclk_subscribed = 0;
  1253. aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  1254. waiting_init = 1;
  1255. aclk_connected = 0;
  1256. aclk_connecting = 0;
  1257. }
  1258. void aclk_shutdown()
  1259. {
  1260. info("Shutdown initiated");
  1261. aclk_connected = 0;
  1262. _link_shutdown();
  1263. info("Shutdown complete");
  1264. }
  1265. inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us)
  1266. {
  1267. uuid_t uuid;
  1268. char uuid_str[36 + 1];
  1269. if (unlikely(!msg_id)) {
  1270. uuid_generate(uuid);
  1271. uuid_unparse(uuid, uuid_str);
  1272. msg_id = uuid_str;
  1273. }
  1274. if (ts_secs == 0) {
  1275. ts_us = now_realtime_usec();
  1276. ts_secs = ts_us / USEC_PER_SEC;
  1277. ts_us = ts_us % USEC_PER_SEC;
  1278. }
  1279. buffer_sprintf(
  1280. dest,
  1281. "\t{\"type\": \"%s\",\n"
  1282. "\t\"msg-id\": \"%s\",\n"
  1283. "\t\"timestamp\": %ld,\n"
  1284. "\t\"timestamp-offset-usec\": %llu,\n"
  1285. "\t\"connect\": %ld,\n"
  1286. "\t\"connect-offset-usec\": %llu,\n"
  1287. "\t\"version\": %d",
  1288. type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION);
  1289. debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs);
  1290. }
  1291. /*
  1292. * Take a buffer, encode it and rewrite it
  1293. *
  1294. */
  1295. char *aclk_encode_response(BUFFER *contents)
  1296. {
  1297. char *tmp_buffer = mallocz(contents->len * 2);
  1298. char *src, *dst;
  1299. size_t content_size = contents->len;
  1300. src = contents->buffer;
  1301. dst = tmp_buffer;
  1302. while (content_size > 0) {
  1303. switch (*src) {
  1304. case '\n':
  1305. case '\t':
  1306. break;
  1307. case 0x01 ... 0x08:
  1308. case 0x0b ... 0x1F:
  1309. *dst++ = '\\';
  1310. *dst++ = '0';
  1311. *dst++ = '0';
  1312. *dst++ = (*src < 0x0F) ? '0' : '1';
  1313. *dst++ = to_hex(*src);
  1314. break;
  1315. case '\"':
  1316. *dst++ = '\\';
  1317. *dst++ = *src;
  1318. break;
  1319. default:
  1320. *dst++ = *src;
  1321. }
  1322. src++;
  1323. content_size--;
  1324. }
  1325. *dst = '\0';
  1326. return tmp_buffer;
  1327. }
  1328. /*
  1329. * This will send alarm information which includes
  1330. * configured alarms
  1331. * alarm_log
  1332. * active alarms
  1333. */
  1334. void aclk_send_alarm_metadata()
  1335. {
  1336. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1337. char *msg_id = create_uuid();
  1338. buffer_flush(local_buffer);
  1339. local_buffer->contenttype = CT_APPLICATION_JSON;
  1340. debug(D_ACLK, "Metadata alarms start");
  1341. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1342. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1343. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1344. // session.
  1345. if (aclk_metadata_submitted == ACLK_METADATA_SENT)
  1346. aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0);
  1347. else
  1348. aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us);
  1349. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1350. buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
  1351. health_alarms2json(localhost, local_buffer, 1);
  1352. debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len);
  1353. buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
  1354. health_alarm_log2json(localhost, local_buffer, 0);
  1355. debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
  1356. buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
  1357. health_alarms_values2json(localhost, local_buffer, 0);
  1358. debug(D_ACLK, "Metadata %s with alarms_active has %zu bytes", msg_id, local_buffer->len);
  1359. buffer_sprintf(local_buffer, "\n}\n}");
  1360. aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
  1361. freez(msg_id);
  1362. buffer_free(local_buffer);
  1363. }
  1364. /*
  1365. * This will send the agent metadata
  1366. * /api/v1/info
  1367. * charts
  1368. */
  1369. int aclk_send_info_metadata()
  1370. {
  1371. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1372. debug(D_ACLK, "Metadata /info start");
  1373. char *msg_id = create_uuid();
  1374. buffer_flush(local_buffer);
  1375. local_buffer->contenttype = CT_APPLICATION_JSON;
  1376. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1377. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1378. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1379. // session.
  1380. if (aclk_metadata_submitted == ACLK_METADATA_SENT)
  1381. aclk_create_header(local_buffer, "connect", msg_id, 0, 0);
  1382. else
  1383. aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us);
  1384. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1385. buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
  1386. web_client_api_request_v1_info_fill_buffer(localhost, local_buffer);
  1387. debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
  1388. buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
  1389. charts2json(localhost, local_buffer, 1);
  1390. buffer_sprintf(local_buffer, "\n}\n}");
  1391. debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
  1392. aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
  1393. freez(msg_id);
  1394. buffer_free(local_buffer);
  1395. return 0;
  1396. }
  1397. void aclk_send_stress_test(size_t size)
  1398. {
  1399. char *buffer = mallocz(size);
  1400. if (buffer != NULL)
  1401. {
  1402. for(size_t i=0; i<size; i++)
  1403. buffer[i] = 'x';
  1404. buffer[size-1] = 0;
  1405. time_t time_created = now_realtime_sec();
  1406. sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
  1407. buffer[strlen(buffer)] = '"';
  1408. buffer[size-2] = '}';
  1409. buffer[size-3] = '"';
  1410. aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
  1411. error("Sending stress of size %zu at time %ld", size, time_created);
  1412. }
  1413. free(buffer);
  1414. }
  /*
   * Send the full metadata set to the cloud. The agent info/charts message is
   * sent first, then the alarm metadata. Always returns 0.
   */
  int aclk_send_metadata()
  {
      (void)aclk_send_info_metadata();
      aclk_send_alarm_metadata();
      return 0;
  }
  1423. void aclk_single_update_disable()
  1424. {
  1425. aclk_disable_single_updates = 1;
  1426. }
  1427. void aclk_single_update_enable()
  1428. {
  1429. aclk_disable_single_updates = 0;
  1430. }
  1431. // Trigged by a health reload, sends the alarm metadata
  1432. void aclk_alarm_reload()
  1433. {
  1434. if (unlikely(agent_state == AGENT_INITIALIZING))
  1435. return;
  1436. if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  1437. if (likely(aclk_connected)) {
  1438. errno = 0;
  1439. error("ACLK failed to queue on_connect command on alarm reload");
  1440. }
  1441. }
  1442. }
  1443. //rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
  1444. int aclk_send_single_chart(char *hostname, char *chart)
  1445. {
  1446. RRDHOST *target_host;
  1447. target_host = rrdhost_find_by_hostname(hostname, 0);
  1448. if (!target_host)
  1449. return 1;
  1450. RRDSET *st = rrdset_find(target_host, chart);
  1451. if (!st)
  1452. st = rrdset_find_byname(target_host, chart);
  1453. if (!st) {
  1454. info("FAILED to find chart %s", chart);
  1455. return 1;
  1456. }
  1457. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1458. char *msg_id = create_uuid();
  1459. buffer_flush(local_buffer);
  1460. local_buffer->contenttype = CT_APPLICATION_JSON;
  1461. aclk_create_header(local_buffer, "chart", msg_id, 0, 0);
  1462. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1463. rrdset2json(st, local_buffer, NULL, NULL, 1);
  1464. buffer_sprintf(local_buffer, "\t\n}");
  1465. aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);
  1466. freez(msg_id);
  1467. buffer_free(local_buffer);
  1468. return 0;
  1469. }
  1470. int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
  1471. {
  1472. #ifndef ENABLE_ACLK
  1473. UNUSED(host);
  1474. UNUSED(chart_name);
  1475. return 0;
  1476. #else
  1477. if (!netdata_cloud_setting)
  1478. return 0;
  1479. if (host != localhost)
  1480. return 0;
  1481. if (unlikely(aclk_disable_single_updates))
  1482. return 0;
  1483. if (unlikely(agent_state == AGENT_INITIALIZING))
  1484. last_init_sequence = now_realtime_sec();
  1485. else {
  1486. if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) {
  1487. if (likely(aclk_connected)) {
  1488. errno = 0;
  1489. error("ACLK failed to queue chart_update command");
  1490. }
  1491. }
  1492. }
  1493. return 0;
  1494. #endif
  1495. }
  1496. int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
  1497. {
  1498. BUFFER *local_buffer = NULL;
  1499. if (host != localhost)
  1500. return 0;
  1501. if (unlikely(agent_state == AGENT_INITIALIZING))
  1502. return 0;
  1503. /*
  1504. * Check if individual updates have been disabled
  1505. * This will be the case when we do health reload
  1506. * and all the alarms will be dropped and recreated.
  1507. * At the end of the health reload the complete alarm metadata
  1508. * info will be sent
  1509. */
  1510. if (unlikely(aclk_disable_single_updates))
  1511. return 0;
  1512. local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1513. char *msg_id = create_uuid();
  1514. buffer_flush(local_buffer);
  1515. aclk_create_header(local_buffer, "status-change", msg_id, 0, 0);
  1516. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1517. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  1518. health_alarm_entry2json_nolock(local_buffer, ae, host);
  1519. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  1520. buffer_sprintf(local_buffer, "\n}");
  1521. if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
  1522. if (likely(aclk_connected)) {
  1523. errno = 0;
  1524. error("ACLK failed to queue alarm_command on alarm_update");
  1525. }
  1526. }
  1527. freez(msg_id);
  1528. buffer_free(local_buffer);
  1529. return 0;
  1530. }
  1531. /*
  1532. * Parse the incoming payload and queue a command if valid
  1533. */
  1534. int aclk_handle_cloud_request(char *payload)
  1535. {
  1536. struct aclk_request cloud_to_agent = {
  1537. .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0
  1538. };
  1539. if (unlikely(agent_state == AGENT_INITIALIZING)) {
  1540. debug(D_ACLK, "Ignoring cloud request; agent not in stable state");
  1541. return 0;
  1542. }
  1543. if (unlikely(!payload)) {
  1544. debug(D_ACLK, "ACLK incoming message is empty");
  1545. return 0;
  1546. }
  1547. debug(D_ACLK, "ACLK incoming message (%s)", payload);
  1548. int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
  1549. if (unlikely(
  1550. JSON_OK != rc || !cloud_to_agent.payload || !cloud_to_agent.callback_topic || !cloud_to_agent.msg_id ||
  1551. !cloud_to_agent.type_id || cloud_to_agent.version > ACLK_VERSION ||
  1552. strcmp(cloud_to_agent.type_id, "http"))) {
  1553. if (JSON_OK != rc)
  1554. error("Malformed json request (%s)", payload);
  1555. if (cloud_to_agent.version > ACLK_VERSION)
  1556. error("Unsupported version in JSON request %d", cloud_to_agent.version);
  1557. if (cloud_to_agent.payload)
  1558. freez(cloud_to_agent.payload);
  1559. if (cloud_to_agent.type_id)
  1560. freez(cloud_to_agent.type_id);
  1561. if (cloud_to_agent.msg_id)
  1562. freez(cloud_to_agent.msg_id);
  1563. if (cloud_to_agent.callback_topic)
  1564. freez(cloud_to_agent.callback_topic);
  1565. return 1;
  1566. }
  1567. // Checked to be "http", not needed anymore
  1568. if (likely(cloud_to_agent.type_id)) {
  1569. freez(cloud_to_agent.type_id);
  1570. cloud_to_agent.type_id = NULL;
  1571. }
  1572. if (unlikely(aclk_submit_request(&cloud_to_agent)))
  1573. debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);
  1574. // Note: the payload comes from the callback and it will be automatically freed
  1575. return 0;
  1576. }