agent_cloud_link.c 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "libnetdata/libnetdata.h"
  3. #include "agent_cloud_link.h"
  4. #include "aclk_lws_https_client.h"
  5. #include "aclk_common.h"
  6. int aclk_shutting_down = 0;
  7. // State-machine for the on-connect metadata transmission.
  8. // TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial
  9. // agent startup phase.
  10. static ACLK_METADATA_STATE aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  11. static AGENT_STATE agent_state = AGENT_INITIALIZING;
  12. // Other global state
  13. static int aclk_subscribed = 0;
  14. static int aclk_disable_single_updates = 0;
  15. static time_t last_init_sequence = 0;
  16. static int waiting_init = 1;
  17. static char *aclk_username = NULL;
  18. static char *aclk_password = NULL;
  19. static char *global_base_topic = NULL;
  20. static int aclk_connecting = 0;
  21. int aclk_connected = 0; // Exposed in the web-api
  22. int aclk_force_reconnect = 0; // Indication from lower layers
  23. usec_t aclk_session_us = 0; // Used by the mqtt layer
  24. time_t aclk_session_sec = 0; // Used by the mqtt layer
  25. static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
  26. static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
  27. static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
  28. #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
  29. #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
  30. #define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
  31. #define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
  32. #define QUERY_LOCK netdata_mutex_lock(&query_mutex)
  33. #define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
  34. pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
  35. pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
  36. #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait);
  37. #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
  38. #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
  39. void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
  40. void aclk_lws_wss_destroy_context();
  41. /*
  42. * Maintain a list of collectors and chart count
  43. * If all the charts of a collector are deleted
  44. * then a new metadata dataset must be send to the cloud
  45. *
  46. */
  47. struct _collector {
  48. time_t created;
  49. uint32_t count; //chart count
  50. uint32_t hostname_hash;
  51. uint32_t plugin_hash;
  52. uint32_t module_hash;
  53. char *hostname;
  54. char *plugin_name;
  55. char *module_name;
  56. struct _collector *next;
  57. };
  58. struct _collector *collector_list = NULL;
  59. struct aclk_query {
  60. time_t created;
  61. time_t run_after; // Delay run until after this time
  62. ACLK_CMD cmd; // What command is this
  63. char *topic; // Topic to respond to
  64. char *data; // Internal data (NULL if request from the cloud)
  65. char *msg_id; // msg_id generated by the cloud (NULL if internal)
  66. char *query; // The actual query
  67. u_char deleted; // Mark deleted for garbage collect
  68. struct aclk_query *next;
  69. };
  70. struct aclk_query_queue {
  71. struct aclk_query *aclk_query_head;
  72. struct aclk_query *aclk_query_tail;
  73. uint64_t count;
  74. } aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
  75. char *create_uuid()
  76. {
  77. uuid_t uuid;
  78. char *uuid_str = mallocz(36 + 1);
  79. uuid_generate(uuid);
  80. uuid_unparse(uuid, uuid_str);
  81. return uuid_str;
  82. }
  83. int cloud_to_agent_parse(JSON_ENTRY *e)
  84. {
  85. struct aclk_request *data = e->callback_data;
  86. switch (e->type) {
  87. case JSON_OBJECT:
  88. case JSON_ARRAY:
  89. break;
  90. case JSON_STRING:
  91. if (!strcmp(e->name, "msg-id")) {
  92. data->msg_id = strdupz(e->data.string);
  93. break;
  94. }
  95. if (!strcmp(e->name, "type")) {
  96. data->type_id = strdupz(e->data.string);
  97. break;
  98. }
  99. if (!strcmp(e->name, "callback-topic")) {
  100. data->callback_topic = strdupz(e->data.string);
  101. break;
  102. }
  103. if (!strcmp(e->name, "payload")) {
  104. if (likely(e->data.string)) {
  105. size_t len = strlen(e->data.string);
  106. data->payload = mallocz(len+1);
  107. if (!url_decode_r(data->payload, e->data.string, len + 1))
  108. strcpy(data->payload, e->data.string);
  109. }
  110. break;
  111. }
  112. break;
  113. case JSON_NUMBER:
  114. if (!strcmp(e->name, "version")) {
  115. data->version = atoi(e->original_string);
  116. break;
  117. }
  118. break;
  119. case JSON_BOOLEAN:
  120. break;
  121. case JSON_NULL:
  122. break;
  123. }
  124. return 0;
  125. }
  126. static RSA *aclk_private_key = NULL;
  127. static int create_private_key()
  128. {
  129. char filename[FILENAME_MAX + 1];
  130. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  131. long bytes_read;
  132. char *private_key = read_by_filename(filename, &bytes_read);
  133. if (!private_key) {
  134. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  135. return 1;
  136. }
  137. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  138. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  139. if (key_bio==NULL) {
  140. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  141. goto biofailed;
  142. }
  143. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  144. BIO_free(key_bio);
  145. if (aclk_private_key!=NULL)
  146. {
  147. freez(private_key);
  148. return 0;
  149. }
  150. char err[512];
  151. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  152. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  153. biofailed:
  154. freez(private_key);
  155. return 1;
  156. }
  157. /*
  158. * After a connection failure -- delay in milliseconds
  159. * When a connection is established, the delay function
  160. * should be called with
  161. *
  162. * mode 0 to reset the delay
  163. * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
  164. *
  165. */
  166. unsigned long int aclk_reconnect_delay(int mode)
  167. {
  168. static int fail = -1;
  169. unsigned long int delay;
  170. if (!mode || fail == -1) {
  171. srandom(time(NULL));
  172. fail = mode - 1;
  173. return 0;
  174. }
  175. delay = (1 << fail);
  176. if (delay >= ACLK_MAX_BACKOFF_DELAY) {
  177. delay = ACLK_MAX_BACKOFF_DELAY * 1000;
  178. } else {
  179. fail++;
  180. delay = (delay * 1000) + (random() % 1000);
  181. }
  182. return delay;
  183. }
  184. /*
  185. * Free a query structure when done
  186. */
  187. void aclk_query_free(struct aclk_query *this_query)
  188. {
  189. if (unlikely(!this_query))
  190. return;
  191. freez(this_query->topic);
  192. if (likely(this_query->query))
  193. freez(this_query->query);
  194. if (likely(this_query->data))
  195. freez(this_query->data);
  196. if (likely(this_query->msg_id))
  197. freez(this_query->msg_id);
  198. freez(this_query);
  199. }
  200. // Returns the entry after which we need to create a new entry to run at the specified time
  201. // If NULL is returned we need to add to HEAD
  202. // Need to have a QUERY lock before calling this
  203. struct aclk_query *aclk_query_find_position(time_t time_to_run)
  204. {
  205. struct aclk_query *tmp_query, *last_query;
  206. // Quick check if we will add to the end
  207. if (likely(aclk_queue.aclk_query_tail)) {
  208. if (aclk_queue.aclk_query_tail->run_after <= time_to_run)
  209. return aclk_queue.aclk_query_tail;
  210. }
  211. last_query = NULL;
  212. tmp_query = aclk_queue.aclk_query_head;
  213. while (tmp_query) {
  214. if (tmp_query->run_after > time_to_run)
  215. return last_query;
  216. last_query = tmp_query;
  217. tmp_query = tmp_query->next;
  218. }
  219. return last_query;
  220. }
  221. // Need to have a QUERY lock before calling this
  222. struct aclk_query *
  223. aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
  224. {
  225. struct aclk_query *tmp_query, *prev_query;
  226. UNUSED(cmd);
  227. tmp_query = aclk_queue.aclk_query_head;
  228. prev_query = NULL;
  229. while (tmp_query) {
  230. if (likely(!tmp_query->deleted)) {
  231. if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
  232. if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
  233. (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
  234. if (likely(last_query))
  235. *last_query = prev_query;
  236. return tmp_query;
  237. }
  238. }
  239. }
  240. prev_query = tmp_query;
  241. tmp_query = tmp_query->next;
  242. }
  243. return NULL;
  244. }
  245. /*
  246. * Add a query to execute, the result will be send to the specified topic
  247. */
  248. int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
  249. {
  250. struct aclk_query *new_query, *tmp_query;
  251. // Ignore all commands while we wait for the agent to initialize
  252. if (unlikely(waiting_init))
  253. return 1;
  254. run_after = now_realtime_sec() + run_after;
  255. QUERY_LOCK;
  256. struct aclk_query *last_query = NULL;
  257. tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
  258. if (unlikely(tmp_query)) {
  259. if (tmp_query->run_after == run_after) {
  260. QUERY_UNLOCK;
  261. QUERY_THREAD_WAKEUP;
  262. return 0;
  263. }
  264. if (last_query)
  265. last_query->next = tmp_query->next;
  266. else
  267. aclk_queue.aclk_query_head = tmp_query->next;
  268. debug(D_ACLK, "Removing double entry");
  269. aclk_query_free(tmp_query);
  270. aclk_queue.count--;
  271. }
  272. new_query = callocz(1, sizeof(struct aclk_query));
  273. new_query->cmd = aclk_cmd;
  274. if (internal) {
  275. new_query->topic = strdupz(topic);
  276. if (likely(query))
  277. new_query->query = strdupz(query);
  278. } else {
  279. new_query->topic = topic;
  280. new_query->query = query;
  281. new_query->msg_id = msg_id;
  282. }
  283. if (data)
  284. new_query->data = strdupz(data);
  285. new_query->next = NULL;
  286. new_query->created = now_realtime_sec();
  287. new_query->run_after = run_after;
  288. debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
  289. tmp_query = aclk_query_find_position(run_after);
  290. if (tmp_query) {
  291. new_query->next = tmp_query->next;
  292. tmp_query->next = new_query;
  293. if (tmp_query == aclk_queue.aclk_query_tail)
  294. aclk_queue.aclk_query_tail = new_query;
  295. aclk_queue.count++;
  296. QUERY_UNLOCK;
  297. QUERY_THREAD_WAKEUP;
  298. return 0;
  299. }
  300. new_query->next = aclk_queue.aclk_query_head;
  301. aclk_queue.aclk_query_head = new_query;
  302. aclk_queue.count++;
  303. QUERY_UNLOCK;
  304. QUERY_THREAD_WAKEUP;
  305. return 0;
  306. }
  307. inline int aclk_submit_request(struct aclk_request *request)
  308. {
  309. return aclk_queue_query(request->callback_topic, NULL, request->msg_id, request->payload, 0, 0, ACLK_CMD_CLOUD);
  310. }
  311. /*
  312. * Get the next query to process - NULL if nothing there
  313. * The caller needs to free memory by calling aclk_query_free()
  314. *
  315. * topic
  316. * query
  317. * The structure itself
  318. *
  319. */
  320. struct aclk_query *aclk_queue_pop()
  321. {
  322. struct aclk_query *this_query;
  323. QUERY_LOCK;
  324. if (likely(!aclk_queue.aclk_query_head)) {
  325. QUERY_UNLOCK;
  326. return NULL;
  327. }
  328. this_query = aclk_queue.aclk_query_head;
  329. // Get rid of the deleted entries
  330. while (this_query && this_query->deleted) {
  331. aclk_queue.count--;
  332. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  333. if (likely(!aclk_queue.aclk_query_head)) {
  334. aclk_queue.aclk_query_tail = NULL;
  335. }
  336. aclk_query_free(this_query);
  337. this_query = aclk_queue.aclk_query_head;
  338. }
  339. if (likely(!this_query)) {
  340. QUERY_UNLOCK;
  341. return NULL;
  342. }
  343. if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
  344. info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
  345. QUERY_UNLOCK;
  346. return NULL;
  347. }
  348. aclk_queue.count--;
  349. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  350. if (likely(!aclk_queue.aclk_query_head)) {
  351. aclk_queue.aclk_query_tail = NULL;
  352. }
  353. QUERY_UNLOCK;
  354. return this_query;
  355. }
  356. // This will give the base topic that the agent will publish messages.
  357. // subtopics will be sent under the base topic e.g. base_topic/subtopic
  358. // This is called by aclk_init(), to compute the base topic once and have
  359. // it stored internally.
  360. // Need to check if additional logic should be added to make sure that there
  361. // is enough information to determine the base topic at init time
  362. char *create_publish_base_topic()
  363. {
  364. if (unlikely(!is_agent_claimed()))
  365. return NULL;
  366. ACLK_LOCK;
  367. if (unlikely(!global_base_topic)) {
  368. char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
  369. snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
  370. tmp = strchr(tmp_topic, '\n');
  371. if (unlikely(tmp))
  372. *tmp = '\0';
  373. global_base_topic = strdupz(tmp_topic);
  374. }
  375. ACLK_UNLOCK;
  376. return global_base_topic;
  377. }
  378. /*
  379. * Build a topic based on sub_topic and final_topic
  380. * if the sub topic starts with / assume that is an absolute topic
  381. *
  382. */
  383. char *get_topic(char *sub_topic, char *final_topic, int max_size)
  384. {
  385. int rc;
  386. if (likely(sub_topic && sub_topic[0] == '/'))
  387. return sub_topic;
  388. if (unlikely(!global_base_topic))
  389. return sub_topic;
  390. rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
  391. if (unlikely(rc >= max_size))
  392. debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);
  393. return final_topic;
  394. }
  395. /*
  396. * Free a collector structure
  397. */
  398. static void _free_collector(struct _collector *collector)
  399. {
  400. if (likely(collector->plugin_name))
  401. freez(collector->plugin_name);
  402. if (likely(collector->module_name))
  403. freez(collector->module_name);
  404. if (likely(collector->hostname))
  405. freez(collector->hostname);
  406. freez(collector);
  407. }
  408. /*
  409. * This will report the collector list
  410. *
  411. */
  412. #ifdef ACLK_DEBUG
  413. static void _dump_collector_list()
  414. {
  415. struct _collector *tmp_collector;
  416. COLLECTOR_LOCK;
  417. info("DUMPING ALL COLLECTORS");
  418. if (unlikely(!collector_list || !collector_list->next)) {
  419. COLLECTOR_UNLOCK;
  420. info("DUMPING ALL COLLECTORS -- nothing found");
  421. return;
  422. }
  423. // Note that the first entry is "dummy"
  424. tmp_collector = collector_list->next;
  425. while (tmp_collector) {
  426. info(
  427. "COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
  428. tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
  429. tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
  430. tmp_collector = tmp_collector->next;
  431. }
  432. info("DUMPING ALL COLLECTORS DONE");
  433. COLLECTOR_UNLOCK;
  434. }
  435. #endif
  436. /*
  437. * This will cleanup the collector list
  438. *
  439. */
  440. static void _reset_collector_list()
  441. {
  442. struct _collector *tmp_collector, *next_collector;
  443. COLLECTOR_LOCK;
  444. if (unlikely(!collector_list || !collector_list->next)) {
  445. COLLECTOR_UNLOCK;
  446. return;
  447. }
  448. // Note that the first entry is "dummy"
  449. tmp_collector = collector_list->next;
  450. collector_list->count = 0;
  451. collector_list->next = NULL;
  452. // We broke the link; we can unlock
  453. COLLECTOR_UNLOCK;
  454. while (tmp_collector) {
  455. next_collector = tmp_collector->next;
  456. _free_collector(tmp_collector);
  457. tmp_collector = next_collector;
  458. }
  459. }
  460. /*
  461. * Find a collector (if it exists)
  462. * Must lock before calling this
  463. * If last_collector is not null, it will return the previous collector in the linked
  464. * list (used in collector delete)
  465. */
  466. static struct _collector *_find_collector(
  467. const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
  468. {
  469. struct _collector *tmp_collector, *prev_collector;
  470. uint32_t plugin_hash;
  471. uint32_t module_hash;
  472. uint32_t hostname_hash;
  473. if (unlikely(!collector_list)) {
  474. collector_list = callocz(1, sizeof(struct _collector));
  475. return NULL;
  476. }
  477. if (unlikely(!collector_list->next))
  478. return NULL;
  479. plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  480. module_hash = module_name ? simple_hash(module_name) : 1;
  481. hostname_hash = simple_hash(hostname);
  482. // Note that the first entry is "dummy"
  483. tmp_collector = collector_list->next;
  484. prev_collector = collector_list;
  485. while (tmp_collector) {
  486. if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
  487. hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
  488. (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
  489. (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
  490. if (unlikely(last_collector))
  491. *last_collector = prev_collector;
  492. return tmp_collector;
  493. }
  494. prev_collector = tmp_collector;
  495. tmp_collector = tmp_collector->next;
  496. }
  497. return tmp_collector;
  498. }
  499. /*
  500. * Called to delete a collector
  501. * It will reduce the count (chart_count) and will remove it
  502. * from the linked list if the count reaches zero
  503. * The structure will be returned to the caller to free
  504. * the resources
  505. *
  506. */
  507. static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  508. {
  509. struct _collector *tmp_collector, *prev_collector = NULL;
  510. tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
  511. if (likely(tmp_collector)) {
  512. --tmp_collector->count;
  513. if (unlikely(!tmp_collector->count))
  514. prev_collector->next = tmp_collector->next;
  515. }
  516. return tmp_collector;
  517. }
  518. /*
  519. * Add a new collector (plugin / module) to the list
  520. * If it already exists just update the chart count
  521. *
  522. * Lock before calling
  523. */
  524. static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  525. {
  526. struct _collector *tmp_collector;
  527. tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
  528. if (unlikely(!tmp_collector)) {
  529. tmp_collector = callocz(1, sizeof(struct _collector));
  530. tmp_collector->hostname_hash = simple_hash(hostname);
  531. tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  532. tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
  533. tmp_collector->hostname = strdupz(hostname);
  534. tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
  535. tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
  536. tmp_collector->next = collector_list->next;
  537. collector_list->next = tmp_collector;
  538. }
  539. tmp_collector->count++;
  540. debug(
  541. D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
  542. module_name ? module_name : "*", tmp_collector->count);
  543. return tmp_collector;
  544. }
  545. /*
  546. * Add a new collector to the list
  547. * If it exists, update the chart count
  548. */
  549. void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  550. {
  551. struct _collector *tmp_collector;
  552. COLLECTOR_LOCK;
  553. tmp_collector = _add_collector(hostname, plugin_name, module_name);
  554. if (unlikely(tmp_collector->count != 1)) {
  555. COLLECTOR_UNLOCK;
  556. return;
  557. }
  558. if (unlikely(agent_state == AGENT_INITIALIZING))
  559. last_init_sequence = now_realtime_sec();
  560. else {
  561. if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  562. debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
  563. }
  564. COLLECTOR_UNLOCK;
  565. }
  566. /*
  567. * Delete a collector from the list
  568. * If the chart count reaches zero the collector will be removed
  569. * from the list by calling del_collector.
  570. *
  571. * This function will release the memory used and schedule
  572. * a cloud update
  573. */
  574. void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  575. {
  576. struct _collector *tmp_collector;
  577. COLLECTOR_LOCK;
  578. tmp_collector = _del_collector(hostname, plugin_name, module_name);
  579. if (unlikely(!tmp_collector || tmp_collector->count)) {
  580. COLLECTOR_UNLOCK;
  581. return;
  582. }
  583. debug(
  584. D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
  585. tmp_collector->count);
  586. COLLECTOR_UNLOCK;
  587. if (unlikely(agent_state == AGENT_INITIALIZING))
  588. last_init_sequence = now_realtime_sec();
  589. else {
  590. if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  591. debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
  592. }
  593. _free_collector(tmp_collector);
  594. }
  595. /*
  596. * Take a buffer, encode it and rewrite it
  597. *
  598. */
  599. static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines)
  600. {
  601. char *tmp_buffer = mallocz(content_size * 2);
  602. char *dst = tmp_buffer;
  603. while (content_size > 0) {
  604. switch (*src) {
  605. case '\n':
  606. if (keep_newlines)
  607. {
  608. *dst++ = '\\';
  609. *dst++ = 'n';
  610. }
  611. break;
  612. case '\t':
  613. break;
  614. case 0x01 ... 0x08:
  615. case 0x0b ... 0x1F:
  616. *dst++ = '\\';
  617. *dst++ = 'u';
  618. *dst++ = '0';
  619. *dst++ = '0';
  620. *dst++ = (*src < 0x0F) ? '0' : '1';
  621. *dst++ = to_hex(*src);
  622. break;
  623. case '\"':
  624. *dst++ = '\\';
  625. *dst++ = *src;
  626. break;
  627. default:
  628. *dst++ = *src;
  629. }
  630. src++;
  631. content_size--;
  632. }
  633. *dst = '\0';
  634. return tmp_buffer;
  635. }
  636. int aclk_execute_query(struct aclk_query *this_query)
  637. {
  638. if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
  639. struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
  640. w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  641. w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  642. w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  643. strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
  644. w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
  645. w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
  646. w->acl = 0x1f;
  647. char *mysep = strchr(this_query->query, '?');
  648. if (mysep) {
  649. strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
  650. *mysep = '\0';
  651. } else
  652. strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
  653. mysep = strrchr(this_query->query, '/');
  654. // TODO: handle bad response perhaps in a different way. For now it does to the payload
  655. w->response.code = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
  656. now_realtime_timeval(&w->tv_ready);
  657. w->response.data->date = w->tv_ready.tv_sec;
  658. web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
  659. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  660. buffer_flush(local_buffer);
  661. local_buffer->contenttype = CT_APPLICATION_JSON;
  662. aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0);
  663. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  664. char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
  665. char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
  666. buffer_sprintf(
  667. local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}",
  668. w->response.code, encoded_response, encoded_header);
  669. buffer_sprintf(local_buffer, "\n}");
  670. debug(D_ACLK, "Response:%s", encoded_header);
  671. aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
  672. buffer_free(w->response.data);
  673. buffer_free(w->response.header);
  674. buffer_free(w->response.header_output);
  675. freez(w);
  676. buffer_free(local_buffer);
  677. freez(encoded_response);
  678. freez(encoded_header);
  679. return 0;
  680. }
  681. return 1;
  682. }
  683. /*
  684. * This function will fetch the next pending command and process it
  685. *
  686. */
  687. int aclk_process_query()
  688. {
  689. struct aclk_query *this_query;
  690. static long int query_count = 0;
  691. if (!aclk_connected)
  692. return 0;
  693. this_query = aclk_queue_pop();
  694. if (likely(!this_query)) {
  695. return 0;
  696. }
  697. if (unlikely(this_query->deleted)) {
  698. debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query);
  699. aclk_query_free(this_query);
  700. return 1;
  701. }
  702. query_count++;
  703. debug(
  704. D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic,
  705. this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created));
  706. switch (this_query->cmd) {
  707. case ACLK_CMD_ONCONNECT:
  708. debug(D_ACLK, "EXECUTING on connect metadata command");
  709. aclk_send_metadata();
  710. aclk_metadata_submitted = ACLK_METADATA_SENT;
  711. break;
  712. case ACLK_CMD_CHART:
  713. debug(D_ACLK, "EXECUTING a chart update command");
  714. aclk_send_single_chart(this_query->data, this_query->query);
  715. break;
  716. case ACLK_CMD_CHARTDEL:
  717. debug(D_ACLK, "EXECUTING a chart delete command");
  718. //TODO: This send the info metadata for now
  719. aclk_send_info_metadata();
  720. break;
  721. case ACLK_CMD_ALARM:
  722. debug(D_ACLK, "EXECUTING an alarm update command");
  723. aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
  724. break;
  725. case ACLK_CMD_CLOUD:
  726. debug(D_ACLK, "EXECUTING a cloud command");
  727. aclk_execute_query(this_query);
  728. break;
  729. default:
  730. break;
  731. }
  732. debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
  733. aclk_query_free(this_query);
  734. return 1;
  735. }
  736. /*
  737. * Process all pending queries
  738. * Return 0 if no queries were processed, 1 otherwise
  739. *
  740. */
  741. int aclk_process_queries()
  742. {
  743. if (unlikely(netdata_exit || !aclk_connected))
  744. return 0;
  745. if (likely(!aclk_queue.count))
  746. return 0;
  747. debug(D_ACLK, "Processing %d queries", (int)aclk_queue.count);
  748. //TODO: may consider possible throttling here
  749. while (aclk_process_query()) {
  750. // Process all commands
  751. };
  752. return 1;
  753. }
  754. static void aclk_query_thread_cleanup(void *ptr)
  755. {
  756. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  757. info("cleaning up...");
  758. _reset_collector_list();
  759. freez(collector_list);
  760. // Clean memory for pending queries if any
  761. struct aclk_query *this_query;
  762. do {
  763. this_query = aclk_queue_pop();
  764. aclk_query_free(this_query);
  765. } while (this_query);
  766. freez(static_thread->thread);
  767. freez(static_thread);
  768. }
  769. /**
  770. * Main query processing thread
  771. *
  772. * On startup wait for the agent collectors to initialize
  773. * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
  774. * of no new collectors coming in in order to mark the agent
  775. * as stable (set agent_state = AGENT_STABLE)
  776. */
  777. void *aclk_query_main_thread(void *ptr)
  778. {
  779. netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
  780. while (agent_state == AGENT_INITIALIZING && !netdata_exit) {
  781. time_t checkpoint;
  782. checkpoint = now_realtime_sec() - last_init_sequence;
  783. if (checkpoint > ACLK_STABLE_TIMEOUT) {
  784. agent_state = AGENT_STABLE;
  785. info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
  786. #ifdef ACLK_DEBUG
  787. _dump_collector_list();
  788. #endif
  789. break;
  790. }
  791. info("Waiting for agent collectors to initialize. Last activity was %ld seconds ago" , checkpoint);
  792. sleep_usec(USEC_PER_SEC * 1);
  793. }
  794. while (!netdata_exit) {
  795. if (unlikely(!aclk_metadata_submitted)) {
  796. aclk_metadata_submitted = ACLK_METADATA_CMD_QUEUED;
  797. if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  798. errno = 0;
  799. error("ACLK failed to queue on_connect command");
  800. aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  801. }
  802. }
  803. aclk_process_queries();
  804. QUERY_THREAD_LOCK;
  805. // TODO: Need to check if there are queries awaiting already
  806. if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
  807. sleep_usec(USEC_PER_SEC * 1);
  808. QUERY_THREAD_UNLOCK;
  809. } // forever
  810. info("Shutting down query processing thread");
  811. netdata_thread_cleanup_pop(1);
  812. return NULL;
  813. }
  814. // Thread cleanup
  815. static void aclk_main_cleanup(void *ptr)
  816. {
  817. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  818. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  819. info("cleaning up...");
  820. if (is_agent_claimed() && aclk_connected) {
  821. size_t write_q, write_q_bytes, read_q;
  822. time_t event_loop_timeout;
  823. // Wakeup thread to cleanup
  824. QUERY_THREAD_WAKEUP;
  825. // Send a graceful disconnect message
  826. BUFFER *b = buffer_create(512);
  827. aclk_create_header(b, "disconnect", NULL, 0, 0);
  828. buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
  829. aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
  830. buffer_free(b);
  831. event_loop_timeout = now_realtime_sec() + 5;
  832. write_q = 1;
  833. while (write_q && event_loop_timeout > now_realtime_sec()) {
  834. _link_event_loop();
  835. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  836. }
  837. aclk_shutting_down = 1;
  838. _link_shutdown();
  839. aclk_lws_wss_mqtt_layer_disconect_notif();
  840. write_q = 1;
  841. event_loop_timeout = now_realtime_sec() + 5;
  842. while (write_q && event_loop_timeout > now_realtime_sec()) {
  843. _link_event_loop();
  844. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  845. }
  846. }
  847. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  848. }
  /*
   * Request/result pair used with json_extract_singleton() to pull a single
   * string member out of a JSON document.
   */
  struct dictionary_singleton {
      char *key;      // name of the JSON string member to look for
      char *result;   // copy of the matched string value
  };
  853. int json_extract_singleton(JSON_ENTRY *e)
  854. {
  855. struct dictionary_singleton *data = e->callback_data;
  856. switch (e->type) {
  857. case JSON_OBJECT:
  858. case JSON_ARRAY:
  859. break;
  860. case JSON_STRING:
  861. if (!strcmp(e->name, data->key)) {
  862. data->result = strdupz(e->data.string);
  863. break;
  864. }
  865. break;
  866. case JSON_NUMBER:
  867. case JSON_BOOLEAN:
  868. case JSON_NULL:
  869. break;
  870. }
  871. return 0;
  872. }
  // Map one character of the base-64 alphabet to its 6-bit value.
  // Characters outside the alphabet (including '=') map to 0, which keeps the
  // decoder non-validating without shifting negative values.
  static uint32_t base64_decode_sextet(unsigned char c)
  {
      if (c >= 'A' && c <= 'Z')
          return (uint32_t)(c - 'A');
      if (c >= 'a' && c <= 'z')
          return (uint32_t)(c - 'a') + 26;
      if (c >= '0' && c <= '9')
          return (uint32_t)(c - '0') + 52;
      if (c == '+')
          return 62;
      if (c == '/')
          return 63;
      return 0;
  }

  // Base-64 decoder.
  // Note: This is non-validating, invalid input will be decoded without an error.
  //       Challenges are packed into json strings so we don't skip newlines.
  //       Size errors (i.e. invalid input size or insufficient output space) are caught.
  //
  // input/input_size   - base-64 text; the size must be a multiple of 4.
  // output/output_size - destination buffer; it must have room for
  //                      (input_size / 4) * 3 bytes, even if padding makes the
  //                      decoded data shorter.
  //
  // Returns the number of decoded bytes, or 0 on a size error or empty input.
  size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
  {
      if ((input_size & 3) != 0) {
          error("Can't decode base-64 input length %zu", input_size);
          return 0;
      }

      // Nothing to decode; also keeps the last-quantum handling below from
      // reading a quantum that does not exist.
      if (input_size == 0)
          return 0;

      size_t unpadded_size = (input_size / 4) * 3;
      if (unpadded_size > output_size) {
          error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
          return 0;
      }

      // Don't check padding within full quantums; the last one is handled below
      for (size_t i = 0; i + 4 < input_size; i += 4) {
          uint32_t value = (base64_decode_sextet(input[0]) << 18) | (base64_decode_sextet(input[1]) << 12) |
                           (base64_decode_sextet(input[2]) << 6) | base64_decode_sextet(input[3]);
          output[0] = (unsigned char)(value >> 16);
          output[1] = (unsigned char)(value >> 8);
          output[2] = (unsigned char)value;
          output += 3;
          input += 4;
      }

      // Handle padding only in last quantum
      if (input[2] == '=') {
          // "xx==" carries 12 bits, of which the top 8 form one byte
          uint32_t value = (base64_decode_sextet(input[0]) << 6) | base64_decode_sextet(input[1]);
          output[0] = (unsigned char)(value >> 4);
          return unpadded_size - 2;
      }

      if (input[3] == '=') {
          // "xxx=" carries 18 bits, of which the top 16 form two bytes
          uint32_t value = (base64_decode_sextet(input[0]) << 12) | (base64_decode_sextet(input[1]) << 6) |
                           base64_decode_sextet(input[2]);
          output[0] = (unsigned char)(value >> 10);
          output[1] = (unsigned char)(value >> 2);
          return unpadded_size - 1;
      }

      // Unpadded last quantum: decode it like any other full quantum
      uint32_t value = (base64_decode_sextet(input[0]) << 18) | (base64_decode_sextet(input[1]) << 12) |
                       (base64_decode_sextet(input[2]) << 6) | base64_decode_sextet(input[3]);
      output[0] = (unsigned char)(value >> 16);
      output[1] = (unsigned char)(value >> 8);
      output[2] = (unsigned char)value;
      return unpadded_size;
  }
  // Base-64 encoder.
  //
  // input/input_size   - raw bytes to encode.
  // output/output_size - destination for the encoded text. No terminator is
  //                      written; the size check is strict enough that the
  //                      caller can always append one after the returned count.
  //
  // Returns the number of characters written (always a multiple of 4), or 0
  // when the output buffer is too small or the input is empty.
  size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
  {
      static const char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                                   "abcdefghijklmnopqrstuvwxyz"
                                   "0123456789+/";
      uint32_t value;

      if ((input_size / 3 + 1) * 4 >= output_size) {
          error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
          return 0;
      }

      size_t count = 0;

      // Encode every complete 3-byte group, including one that ends exactly at
      // the end of the input (it needs no padding).
      while (input_size >= 3) {
          value = ((uint32_t)input[0] << 16) | ((uint32_t)input[1] << 8) | (uint32_t)input[2];
          output[0] = lookup[value >> 18];
          output[1] = lookup[(value >> 12) & 0x3f];
          output[2] = lookup[(value >> 6) & 0x3f];
          output[3] = lookup[value & 0x3f];
          output += 4;
          input += 3;
          input_size -= 3;
          count += 4;
      }

      // Encode the 1 or 2 trailing bytes with '=' padding
      switch (input_size) {
          case 2:
              // 16 data bits, left-aligned in 18 bits
              value = ((uint32_t)input[0] << 10) | ((uint32_t)input[1] << 2);
              output[0] = lookup[(value >> 12) & 0x3f];
              output[1] = lookup[(value >> 6) & 0x3f];
              output[2] = lookup[value & 0x3f];
              output[3] = '=';
              count += 4;
              break;

          case 1:
              // 8 data bits, left-aligned in 12 bits
              value = (uint32_t)input[0] << 4;
              output[0] = lookup[(value >> 6) & 0x3f];
              output[1] = lookup[value & 0x3f];
              output[2] = '=';
              output[3] = '=';
              count += 4;
              break;

          default:
              break;
      }

      return count;
  }
  991. int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
  992. {
  993. int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);
  994. if (result == -1) {
  995. char err[512];
  996. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  997. error("Decryption of the challenge failed: %s", err);
  998. }
  999. return result;
  1000. }
  1001. char *extract_payload(BUFFER *b)
  1002. {
  1003. char *s = b->buffer;
  1004. unsigned int line_len=0;
  1005. for (size_t i=0; i<b->len; i++)
  1006. {
  1007. if (*s == 0 )
  1008. return NULL;
  1009. if (*s == '\n' ) {
  1010. if (line_len==0)
  1011. return s+1;
  1012. line_len = 0;
  1013. }
  1014. else if (*s == '\r') {
  1015. /* don't count */
  1016. }
  1017. else
  1018. line_len ++;
  1019. s++;
  1020. }
  1021. return NULL;
  1022. }
  1023. void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
  1024. {
  1025. char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1026. debug(D_ACLK, "Performing challenge-response sequence");
  1027. if (aclk_password != NULL)
  1028. {
  1029. freez(aclk_password);
  1030. aclk_password = NULL;
  1031. }
  1032. // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
  1033. // TODO - target host?
  1034. char *agent_id = is_agent_claimed();
  1035. if (agent_id == NULL)
  1036. {
  1037. error("Agent was not claimed - cannot perform challenge/response");
  1038. goto CLEANUP;
  1039. }
  1040. char url[1024];
  1041. sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
  1042. info("Retrieving challenge from cloud: %s %s %s", aclk_hostname, aclk_port, url);
  1043. if(aclk_send_https_request("GET", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
  1044. {
  1045. error("Challenge failed: %s", data_buffer);
  1046. goto CLEANUP;
  1047. }
  1048. struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
  1049. debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
  1050. if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
  1051. {
  1052. freez(challenge.result);
  1053. error("Could not parse the json response with the challenge: %s", data_buffer);
  1054. goto CLEANUP;
  1055. }
  1056. if (challenge.result == NULL ) {
  1057. error("Could not retrieve challenge from auth response: %s", data_buffer);
  1058. goto CLEANUP;
  1059. }
  1060. size_t challenge_len = strlen(challenge.result);
  1061. unsigned char decoded[512];
  1062. size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
  1063. unsigned char plaintext[4096]={};
  1064. int decrypted_length = private_decrypt(decoded, decoded_len, plaintext);
  1065. freez(challenge.result);
  1066. char encoded[512];
  1067. size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
  1068. encoded[encoded_len] = 0;
  1069. debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
  1070. char response_json[4096]={};
  1071. sprintf(response_json, "{\"response\":\"%s\"}", encoded);
  1072. debug(D_ACLK, "Password phase: %s",response_json);
  1073. // TODO - host
  1074. sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
  1075. if(aclk_send_https_request("POST", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
  1076. {
  1077. error("Challenge-response failed: %s", data_buffer);
  1078. goto CLEANUP;
  1079. }
  1080. debug(D_ACLK, "Password response from cloud: %s", data_buffer);
  1081. struct dictionary_singleton password = { .key = "password", .result = NULL };
  1082. if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
  1083. {
  1084. freez(password.result);
  1085. error("Could not parse the json response with the password: %s", data_buffer);
  1086. goto CLEANUP;
  1087. }
  1088. if (password.result == NULL ) {
  1089. error("Could not retrieve password from auth response");
  1090. goto CLEANUP;
  1091. }
  1092. if (aclk_password != NULL )
  1093. freez(aclk_password);
  1094. if (aclk_username == NULL)
  1095. aclk_username = strdupz(agent_id);
  1096. aclk_password = password.result;
  1097. CLEANUP:
  1098. freez(data_buffer);
  1099. return;
  1100. }
  1101. static void aclk_try_to_connect(char *hostname, char *port, int port_num)
  1102. {
  1103. info("Attempting to establish the agent cloud link");
  1104. aclk_get_challenge(hostname, port);
  1105. if (aclk_password == NULL)
  1106. return;
  1107. int rc;
  1108. aclk_connecting = 1;
  1109. rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password);
  1110. if (unlikely(rc)) {
  1111. error("Failed to initialize the agent cloud link library");
  1112. }
  1113. }
  1114. /**
  1115. * Main agent cloud link thread
  1116. *
  1117. * This thread will simply call the main event loop that handles
  1118. * pending requests - both inbound and outbound
  1119. *
  1120. * @param ptr is a pointer to the netdata_static_thread structure.
  1121. *
  1122. * @return It always returns NULL
  1123. */
  1124. void *aclk_main(void *ptr)
  1125. {
  1126. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  1127. struct netdata_static_thread *query_thread;
  1128. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  1129. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  1130. netdata_thread_disable_cancelability();
  1131. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK)
  1132. info("Killing ACLK thread -> cloud functionality has been disabled");
  1133. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  1134. return NULL;
  1135. #endif
  1136. info("Waiting for netdata to be ready");
  1137. while (!netdata_ready) {
  1138. sleep_usec(USEC_PER_MS * 300);
  1139. }
  1140. info("Waiting for Cloud to be enabled");
  1141. while (!netdata_cloud_setting) {
  1142. sleep_usec(USEC_PER_SEC * 1);
  1143. if (netdata_exit) {
  1144. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  1145. return NULL;
  1146. }
  1147. }
  1148. last_init_sequence = now_realtime_sec();
  1149. query_thread = NULL;
  1150. char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
  1151. char *aclk_port = NULL;
  1152. uint32_t port_num = 0;
  1153. info("Waiting for netdata to be claimed");
  1154. while(1) {
  1155. while (likely(!is_agent_claimed())) {
  1156. sleep_usec(USEC_PER_SEC * 1);
  1157. if (netdata_exit)
  1158. goto exited;
  1159. }
  1160. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  1161. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  1162. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  1163. if (cloud_base_url == NULL) {
  1164. error("Do not move the cloud base url out of post_conf_load!!");
  1165. goto exited;
  1166. }
  1167. if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
  1168. error("Agent is claimed but the configuration is invalid, please fix");
  1169. }
  1170. else
  1171. {
  1172. port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value
  1173. if (!create_private_key() && !_mqtt_lib_init())
  1174. break;
  1175. }
  1176. for (int i=0; i<60; i++) {
  1177. if (netdata_exit)
  1178. goto exited;
  1179. sleep_usec(USEC_PER_SEC * 1);
  1180. }
  1181. }
  1182. create_publish_base_topic();
  1183. usec_t reconnect_expiry = 0; // In usecs
  1184. while (!netdata_exit) {
  1185. static int first_init = 0;
  1186. size_t write_q, write_q_bytes, read_q;
  1187. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  1188. if (aclk_force_reconnect) {
  1189. aclk_lws_wss_destroy_context();
  1190. aclk_force_reconnect = 0;
  1191. }
  1192. if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) {
  1193. if (unlikely(!first_init)) {
  1194. aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
  1195. first_init = 1;
  1196. } else {
  1197. if (aclk_connecting == 0) {
  1198. if (reconnect_expiry == 0) {
  1199. unsigned long int delay = aclk_reconnect_delay(1);
  1200. reconnect_expiry = now_realtime_usec() + delay * 1000;
  1201. info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
  1202. }
  1203. if (now_realtime_usec() >= reconnect_expiry) {
  1204. reconnect_expiry = 0;
  1205. aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
  1206. }
  1207. sleep_usec(USEC_PER_MS * 100);
  1208. }
  1209. }
  1210. if (aclk_connecting) {
  1211. _link_event_loop();
  1212. sleep_usec(USEC_PER_MS * 100);
  1213. }
  1214. continue;
  1215. }
  1216. _link_event_loop();
  1217. if (unlikely(!aclk_connected || aclk_force_reconnect))
  1218. continue;
  1219. /*static int stress_counter = 0;
  1220. if (write_q_bytes==0 && stress_counter ++ >5)
  1221. {
  1222. aclk_send_stress_test(8000000);
  1223. stress_counter = 0;
  1224. }*/
  1225. // TODO: Move to on-connect
  1226. if (unlikely(!aclk_subscribed)) {
  1227. aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
  1228. }
  1229. if (unlikely(!query_thread)) {
  1230. query_thread = callocz(1, sizeof(struct netdata_static_thread));
  1231. query_thread->thread = mallocz(sizeof(netdata_thread_t));
  1232. netdata_thread_create(
  1233. query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread,
  1234. query_thread);
  1235. }
  1236. } // forever
  1237. exited:
  1238. // Wakeup query thread to cleanup
  1239. QUERY_THREAD_WAKEUP;
  1240. freez(aclk_username);
  1241. freez(aclk_password);
  1242. freez(aclk_hostname);
  1243. freez(aclk_port);
  1244. if (aclk_private_key != NULL)
  1245. RSA_free(aclk_private_key);
  1246. aclk_main_cleanup(ptr);
  1247. return NULL;
  1248. }
  1249. /*
  1250. * Send a message to the cloud, using a base topic and sib_topic
  1251. * The final topic will be in the form <base_topic>/<sub_topic>
  1252. * If base_topic is missing then the global_base_topic will be used (if available)
  1253. *
  1254. */
  1255. int aclk_send_message(char *sub_topic, char *message, char *msg_id)
  1256. {
  1257. int rc;
  1258. int mid;
  1259. char topic[ACLK_MAX_TOPIC + 1];
  1260. char *final_topic;
  1261. UNUSED(msg_id);
  1262. if (!aclk_connected)
  1263. return 0;
  1264. if (unlikely(!message))
  1265. return 0;
  1266. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  1267. if (unlikely(!final_topic)) {
  1268. errno = 0;
  1269. error("Unable to build outgoing topic; truncated?");
  1270. return 1;
  1271. }
  1272. ACLK_LOCK;
  1273. rc = _link_send_message(final_topic, (unsigned char *)message, &mid);
  1274. // TODO: link the msg_id with the mid so we can trace it
  1275. ACLK_UNLOCK;
  1276. if (unlikely(rc)) {
  1277. errno = 0;
  1278. error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
  1279. }
  1280. return rc;
  1281. }
  1282. /*
  1283. * Subscribe to a topic in the cloud
  1284. * The final subscription will be in the form
  1285. * /agent/claim_id/<sub_topic>
  1286. */
  1287. int aclk_subscribe(char *sub_topic, int qos)
  1288. {
  1289. int rc;
  1290. char topic[ACLK_MAX_TOPIC + 1];
  1291. char *final_topic;
  1292. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  1293. if (unlikely(!final_topic)) {
  1294. errno = 0;
  1295. error("Unable to build outgoing topic; truncated?");
  1296. return 1;
  1297. }
  1298. if (!aclk_connected) {
  1299. error("Cannot subscribe to %s - not connected!", topic);
  1300. return 1;
  1301. }
  1302. ACLK_LOCK;
  1303. rc = _link_subscribe(final_topic, qos);
  1304. ACLK_UNLOCK;
  1305. // TODO: Add better handling -- error will flood the logfile here
  1306. if (unlikely(rc)) {
  1307. errno = 0;
  1308. error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
  1309. }
  1310. return rc;
  1311. }
  1312. // This is called from a callback when the link goes up
  1313. void aclk_connect()
  1314. {
  1315. info("Connection detected (%"PRIu64" queued queries)", aclk_queue.count);
  1316. aclk_connected = 1;
  1317. waiting_init = 0;
  1318. aclk_reconnect_delay(0);
  1319. QUERY_THREAD_WAKEUP;
  1320. return;
  1321. }
  1322. // This is called from a callback when the link goes down
  1323. void aclk_disconnect()
  1324. {
  1325. if (likely(aclk_connected))
  1326. info("Disconnect detected (%"PRIu64" queued queries)", aclk_queue.count);
  1327. aclk_subscribed = 0;
  1328. aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  1329. waiting_init = 1;
  1330. aclk_connected = 0;
  1331. aclk_connecting = 0;
  1332. aclk_force_reconnect = 1;
  1333. }
  1334. void aclk_shutdown()
  1335. {
  1336. info("Shutdown initiated");
  1337. aclk_connected = 0;
  1338. _link_shutdown();
  1339. info("Shutdown complete");
  1340. }
  1341. inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us)
  1342. {
  1343. uuid_t uuid;
  1344. char uuid_str[36 + 1];
  1345. if (unlikely(!msg_id)) {
  1346. uuid_generate(uuid);
  1347. uuid_unparse(uuid, uuid_str);
  1348. msg_id = uuid_str;
  1349. }
  1350. if (ts_secs == 0) {
  1351. ts_us = now_realtime_usec();
  1352. ts_secs = ts_us / USEC_PER_SEC;
  1353. ts_us = ts_us % USEC_PER_SEC;
  1354. }
  1355. buffer_sprintf(
  1356. dest,
  1357. "\t{\"type\": \"%s\",\n"
  1358. "\t\"msg-id\": \"%s\",\n"
  1359. "\t\"timestamp\": %ld,\n"
  1360. "\t\"timestamp-offset-usec\": %llu,\n"
  1361. "\t\"connect\": %ld,\n"
  1362. "\t\"connect-offset-usec\": %llu,\n"
  1363. "\t\"version\": %d",
  1364. type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION);
  1365. debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs);
  1366. }
  1367. /*
  1368. * This will send alarm information which includes
  1369. * configured alarms
  1370. * alarm_log
  1371. * active alarms
  1372. */
  1373. void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
  1374. void aclk_send_alarm_metadata()
  1375. {
  1376. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1377. char *msg_id = create_uuid();
  1378. buffer_flush(local_buffer);
  1379. local_buffer->contenttype = CT_APPLICATION_JSON;
  1380. debug(D_ACLK, "Metadata alarms start");
  1381. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1382. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1383. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1384. // session.
  1385. if (aclk_metadata_submitted == ACLK_METADATA_SENT)
  1386. aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0);
  1387. else
  1388. aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us);
  1389. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1390. buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
  1391. health_alarms2json(localhost, local_buffer, 1);
  1392. debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len);
  1393. // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
  1394. // health_alarm_log2json(localhost, local_buffer, 0);
  1395. // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
  1396. buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
  1397. health_active_log_alarms_2json(localhost, local_buffer);
  1398. //debug(D_ACLK, "Metadata message %s", local_buffer->buffer);
  1399. buffer_sprintf(local_buffer, "\n}\n}");
  1400. aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
  1401. freez(msg_id);
  1402. buffer_free(local_buffer);
  1403. }
  1404. /*
  1405. * This will send the agent metadata
  1406. * /api/v1/info
  1407. * charts
  1408. */
  1409. int aclk_send_info_metadata()
  1410. {
  1411. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1412. debug(D_ACLK, "Metadata /info start");
  1413. char *msg_id = create_uuid();
  1414. buffer_flush(local_buffer);
  1415. local_buffer->contenttype = CT_APPLICATION_JSON;
  1416. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1417. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1418. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1419. // session.
  1420. if (aclk_metadata_submitted == ACLK_METADATA_SENT)
  1421. aclk_create_header(local_buffer, "update", msg_id, 0, 0);
  1422. else
  1423. aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us);
  1424. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1425. buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
  1426. web_client_api_request_v1_info_fill_buffer(localhost, local_buffer);
  1427. debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
  1428. buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
  1429. charts2json(localhost, local_buffer, 1);
  1430. buffer_sprintf(local_buffer, "\n}\n}");
  1431. debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
  1432. aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
  1433. freez(msg_id);
  1434. buffer_free(local_buffer);
  1435. return 0;
  1436. }
  1437. void aclk_send_stress_test(size_t size)
  1438. {
  1439. char *buffer = mallocz(size);
  1440. if (buffer != NULL)
  1441. {
  1442. for(size_t i=0; i<size; i++)
  1443. buffer[i] = 'x';
  1444. buffer[size-1] = 0;
  1445. time_t time_created = now_realtime_sec();
  1446. sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
  1447. buffer[strlen(buffer)] = '"';
  1448. buffer[size-2] = '}';
  1449. buffer[size-3] = '"';
  1450. aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
  1451. error("Sending stress of size %zu at time %ld", size, time_created);
  1452. }
  1453. free(buffer);
  1454. }
  // Send the complete metadata to the cloud: first the info/charts message,
  // then the alarms message. The results of the two senders are not
  // inspected; this always returns 0.
  int aclk_send_metadata()
  {
      (void)aclk_send_info_metadata();
      aclk_send_alarm_metadata();

      return 0;
  }
  1463. void aclk_single_update_disable()
  1464. {
  1465. aclk_disable_single_updates = 1;
  1466. }
  1467. void aclk_single_update_enable()
  1468. {
  1469. aclk_disable_single_updates = 0;
  1470. }
  1471. // Trigged by a health reload, sends the alarm metadata
  1472. void aclk_alarm_reload()
  1473. {
  1474. if (unlikely(agent_state == AGENT_INITIALIZING))
  1475. return;
  1476. if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  1477. if (likely(aclk_connected)) {
  1478. errno = 0;
  1479. error("ACLK failed to queue on_connect command on alarm reload");
  1480. }
  1481. }
  1482. }
  1483. //rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
  1484. int aclk_send_single_chart(char *hostname, char *chart)
  1485. {
  1486. RRDHOST *target_host;
  1487. target_host = rrdhost_find_by_hostname(hostname, 0);
  1488. if (!target_host)
  1489. return 1;
  1490. RRDSET *st = rrdset_find(target_host, chart);
  1491. if (!st)
  1492. st = rrdset_find_byname(target_host, chart);
  1493. if (!st) {
  1494. info("FAILED to find chart %s", chart);
  1495. return 1;
  1496. }
  1497. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1498. char *msg_id = create_uuid();
  1499. buffer_flush(local_buffer);
  1500. local_buffer->contenttype = CT_APPLICATION_JSON;
  1501. aclk_create_header(local_buffer, "chart", msg_id, 0, 0);
  1502. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1503. rrdset2json(st, local_buffer, NULL, NULL, 1);
  1504. buffer_sprintf(local_buffer, "\t\n}");
  1505. aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);
  1506. freez(msg_id);
  1507. buffer_free(local_buffer);
  1508. return 0;
  1509. }
  1510. int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
  1511. {
  1512. #ifndef ENABLE_ACLK
  1513. UNUSED(host);
  1514. UNUSED(chart_name);
  1515. return 0;
  1516. #else
  1517. if (!netdata_cloud_setting)
  1518. return 0;
  1519. if (host != localhost)
  1520. return 0;
  1521. if (unlikely(aclk_disable_single_updates))
  1522. return 0;
  1523. if (unlikely(agent_state == AGENT_INITIALIZING))
  1524. last_init_sequence = now_realtime_sec();
  1525. else {
  1526. if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) {
  1527. if (likely(aclk_connected)) {
  1528. errno = 0;
  1529. error("ACLK failed to queue chart_update command");
  1530. }
  1531. }
  1532. }
  1533. return 0;
  1534. #endif
  1535. }
  1536. int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
  1537. {
  1538. BUFFER *local_buffer = NULL;
  1539. if (host != localhost)
  1540. return 0;
  1541. if (unlikely(agent_state == AGENT_INITIALIZING))
  1542. return 0;
  1543. /*
  1544. * Check if individual updates have been disabled
  1545. * This will be the case when we do health reload
  1546. * and all the alarms will be dropped and recreated.
  1547. * At the end of the health reload the complete alarm metadata
  1548. * info will be sent
  1549. */
  1550. if (unlikely(aclk_disable_single_updates))
  1551. return 0;
  1552. local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1553. char *msg_id = create_uuid();
  1554. buffer_flush(local_buffer);
  1555. aclk_create_header(local_buffer, "status-change", msg_id, 0, 0);
  1556. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1557. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  1558. health_alarm_entry2json_nolock(local_buffer, ae, host);
  1559. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  1560. buffer_sprintf(local_buffer, "\n}");
  1561. if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
  1562. if (likely(aclk_connected)) {
  1563. errno = 0;
  1564. error("ACLK failed to queue alarm_command on alarm_update");
  1565. }
  1566. }
  1567. freez(msg_id);
  1568. buffer_free(local_buffer);
  1569. return 0;
  1570. }
  1571. /*
  1572. * Parse the incoming payload and queue a command if valid
  1573. */
  1574. int aclk_handle_cloud_request(char *payload)
  1575. {
  1576. struct aclk_request cloud_to_agent = {
  1577. .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0
  1578. };
  1579. if (unlikely(agent_state == AGENT_INITIALIZING)) {
  1580. debug(D_ACLK, "Ignoring cloud request; agent not in stable state");
  1581. return 0;
  1582. }
  1583. if (unlikely(!payload)) {
  1584. debug(D_ACLK, "ACLK incoming message is empty");
  1585. return 0;
  1586. }
  1587. debug(D_ACLK, "ACLK incoming message (%s)", payload);
  1588. int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
  1589. if (unlikely(
  1590. JSON_OK != rc || !cloud_to_agent.payload || !cloud_to_agent.callback_topic || !cloud_to_agent.msg_id ||
  1591. !cloud_to_agent.type_id || cloud_to_agent.version > ACLK_VERSION ||
  1592. strcmp(cloud_to_agent.type_id, "http"))) {
  1593. if (JSON_OK != rc)
  1594. error("Malformed json request (%s)", payload);
  1595. if (cloud_to_agent.version > ACLK_VERSION)
  1596. error("Unsupported version in JSON request %d", cloud_to_agent.version);
  1597. if (cloud_to_agent.payload)
  1598. freez(cloud_to_agent.payload);
  1599. if (cloud_to_agent.type_id)
  1600. freez(cloud_to_agent.type_id);
  1601. if (cloud_to_agent.msg_id)
  1602. freez(cloud_to_agent.msg_id);
  1603. if (cloud_to_agent.callback_topic)
  1604. freez(cloud_to_agent.callback_topic);
  1605. return 1;
  1606. }
  1607. // Checked to be "http", not needed anymore
  1608. if (likely(cloud_to_agent.type_id)) {
  1609. freez(cloud_to_agent.type_id);
  1610. cloud_to_agent.type_id = NULL;
  1611. }
  1612. if (unlikely(aclk_submit_request(&cloud_to_agent)))
  1613. debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);
  1614. // Note: the payload comes from the callback and it will be automatically freed
  1615. return 0;
  1616. }