agent_cloud_link.c 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  #include "libnetdata/libnetdata.h"
#include "agent_cloud_link.h"
#include "aclk_lws_https_client.h"
#include "aclk_common.h"
#include <openssl/crypto.h>
  6. int aclk_shutting_down = 0;
  7. // State-machine for the on-connect metadata transmission.
  8. // TODO: The AGENT_STATE should be centralized as it would be useful to control error-logging during the initial
  9. // agent startup phase.
  10. static ACLK_METADATA_STATE aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  11. static AGENT_STATE agent_state = AGENT_INITIALIZING;
  12. // Other global state
  13. static int aclk_subscribed = 0;
  14. static int aclk_disable_single_updates = 0;
  15. static time_t last_init_sequence = 0;
  16. static int waiting_init = 1;
  17. static char *aclk_username = NULL;
  18. static char *aclk_password = NULL;
  19. static char *global_base_topic = NULL;
  20. static int aclk_connecting = 0;
  21. int aclk_connected = 0; // Exposed in the web-api
  22. int aclk_force_reconnect = 0; // Indication from lower layers
  23. int aclk_kill_link = 0; // Tell the agent to tear down the link
  24. usec_t aclk_session_us = 0; // Used by the mqtt layer
  25. time_t aclk_session_sec = 0; // Used by the mqtt layer
  26. static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
  27. static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
  28. static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
  29. #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
  30. #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
  31. #define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
  32. #define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
  33. #define QUERY_LOCK netdata_mutex_lock(&query_mutex)
  34. #define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
  35. pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
  36. pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
  37. #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait);
  38. #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
  39. #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
  40. void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
  41. void aclk_lws_wss_destroy_context();
  42. /*
  43. * Maintain a list of collectors and chart count
  44. * If all the charts of a collector are deleted
  45. * then a new metadata dataset must be send to the cloud
  46. *
  47. */
  48. struct _collector {
  49. time_t created;
  50. uint32_t count; //chart count
  51. uint32_t hostname_hash;
  52. uint32_t plugin_hash;
  53. uint32_t module_hash;
  54. char *hostname;
  55. char *plugin_name;
  56. char *module_name;
  57. struct _collector *next;
  58. };
  59. struct _collector *collector_list = NULL;
  60. struct aclk_query {
  61. time_t created;
  62. time_t run_after; // Delay run until after this time
  63. ACLK_CMD cmd; // What command is this
  64. char *topic; // Topic to respond to
  65. char *data; // Internal data (NULL if request from the cloud)
  66. char *msg_id; // msg_id generated by the cloud (NULL if internal)
  67. char *query; // The actual query
  68. u_char deleted; // Mark deleted for garbage collect
  69. struct aclk_query *next;
  70. };
  71. struct aclk_query_queue {
  72. struct aclk_query *aclk_query_head;
  73. struct aclk_query *aclk_query_tail;
  74. uint64_t count;
  75. } aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
  76. char *create_uuid()
  77. {
  78. uuid_t uuid;
  79. char *uuid_str = mallocz(36 + 1);
  80. uuid_generate(uuid);
  81. uuid_unparse(uuid, uuid_str);
  82. return uuid_str;
  83. }
  84. int cloud_to_agent_parse(JSON_ENTRY *e)
  85. {
  86. struct aclk_request *data = e->callback_data;
  87. switch (e->type) {
  88. case JSON_OBJECT:
  89. case JSON_ARRAY:
  90. break;
  91. case JSON_STRING:
  92. if (!strcmp(e->name, "msg-id")) {
  93. data->msg_id = strdupz(e->data.string);
  94. break;
  95. }
  96. if (!strcmp(e->name, "type")) {
  97. data->type_id = strdupz(e->data.string);
  98. break;
  99. }
  100. if (!strcmp(e->name, "callback-topic")) {
  101. data->callback_topic = strdupz(e->data.string);
  102. break;
  103. }
  104. if (!strcmp(e->name, "payload")) {
  105. if (likely(e->data.string)) {
  106. size_t len = strlen(e->data.string);
  107. data->payload = mallocz(len+1);
  108. if (!url_decode_r(data->payload, e->data.string, len + 1))
  109. strcpy(data->payload, e->data.string);
  110. }
  111. break;
  112. }
  113. break;
  114. case JSON_NUMBER:
  115. if (!strcmp(e->name, "version")) {
  116. data->version = atoi(e->original_string);
  117. break;
  118. }
  119. break;
  120. case JSON_BOOLEAN:
  121. break;
  122. case JSON_NULL:
  123. break;
  124. }
  125. return 0;
  126. }
  127. static RSA *aclk_private_key = NULL;
  128. static int create_private_key()
  129. {
  130. if (aclk_private_key != NULL)
  131. RSA_free(aclk_private_key);
  132. aclk_private_key = NULL;
  133. char filename[FILENAME_MAX + 1];
  134. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  135. long bytes_read;
  136. char *private_key = read_by_filename(filename, &bytes_read);
  137. if (!private_key) {
  138. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  139. return 1;
  140. }
  141. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  142. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  143. if (key_bio==NULL) {
  144. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  145. goto biofailed;
  146. }
  147. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  148. BIO_free(key_bio);
  149. if (aclk_private_key!=NULL)
  150. {
  151. freez(private_key);
  152. return 0;
  153. }
  154. char err[512];
  155. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  156. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  157. biofailed:
  158. freez(private_key);
  159. return 1;
  160. }
  161. /*
  162. * After a connection failure -- delay in milliseconds
  163. * When a connection is established, the delay function
  164. * should be called with
  165. *
  166. * mode 0 to reset the delay
  167. * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
  168. *
  169. */
  170. unsigned long int aclk_reconnect_delay(int mode)
  171. {
  172. static int fail = -1;
  173. unsigned long int delay;
  174. if (!mode || fail == -1) {
  175. srandom(time(NULL));
  176. fail = mode - 1;
  177. return 0;
  178. }
  179. delay = (1 << fail);
  180. if (delay >= ACLK_MAX_BACKOFF_DELAY) {
  181. delay = ACLK_MAX_BACKOFF_DELAY * 1000;
  182. } else {
  183. fail++;
  184. delay = (delay * 1000) + (random() % 1000);
  185. }
  186. return delay;
  187. }
  188. /*
  189. * Free a query structure when done
  190. */
  191. void aclk_query_free(struct aclk_query *this_query)
  192. {
  193. if (unlikely(!this_query))
  194. return;
  195. freez(this_query->topic);
  196. if (likely(this_query->query))
  197. freez(this_query->query);
  198. if (likely(this_query->data))
  199. freez(this_query->data);
  200. if (likely(this_query->msg_id))
  201. freez(this_query->msg_id);
  202. freez(this_query);
  203. }
  204. // Returns the entry after which we need to create a new entry to run at the specified time
  205. // If NULL is returned we need to add to HEAD
  206. // Need to have a QUERY lock before calling this
  207. struct aclk_query *aclk_query_find_position(time_t time_to_run)
  208. {
  209. struct aclk_query *tmp_query, *last_query;
  210. // Quick check if we will add to the end
  211. if (likely(aclk_queue.aclk_query_tail)) {
  212. if (aclk_queue.aclk_query_tail->run_after <= time_to_run)
  213. return aclk_queue.aclk_query_tail;
  214. }
  215. last_query = NULL;
  216. tmp_query = aclk_queue.aclk_query_head;
  217. while (tmp_query) {
  218. if (tmp_query->run_after > time_to_run)
  219. return last_query;
  220. last_query = tmp_query;
  221. tmp_query = tmp_query->next;
  222. }
  223. return last_query;
  224. }
  225. // Need to have a QUERY lock before calling this
  226. struct aclk_query *
  227. aclk_query_find(char *topic, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
  228. {
  229. struct aclk_query *tmp_query, *prev_query;
  230. UNUSED(cmd);
  231. tmp_query = aclk_queue.aclk_query_head;
  232. prev_query = NULL;
  233. while (tmp_query) {
  234. if (likely(!tmp_query->deleted)) {
  235. if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
  236. if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
  237. (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
  238. if (likely(last_query))
  239. *last_query = prev_query;
  240. return tmp_query;
  241. }
  242. }
  243. }
  244. prev_query = tmp_query;
  245. tmp_query = tmp_query->next;
  246. }
  247. return NULL;
  248. }
  249. /*
  250. * Add a query to execute, the result will be send to the specified topic
  251. */
  252. int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
  253. {
  254. struct aclk_query *new_query, *tmp_query;
  255. // Ignore all commands while we wait for the agent to initialize
  256. if (unlikely(waiting_init))
  257. return 1;
  258. run_after = now_realtime_sec() + run_after;
  259. QUERY_LOCK;
  260. struct aclk_query *last_query = NULL;
  261. tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
  262. if (unlikely(tmp_query)) {
  263. if (tmp_query->run_after == run_after) {
  264. QUERY_UNLOCK;
  265. QUERY_THREAD_WAKEUP;
  266. return 0;
  267. }
  268. if (last_query)
  269. last_query->next = tmp_query->next;
  270. else
  271. aclk_queue.aclk_query_head = tmp_query->next;
  272. debug(D_ACLK, "Removing double entry");
  273. aclk_query_free(tmp_query);
  274. aclk_queue.count--;
  275. }
  276. new_query = callocz(1, sizeof(struct aclk_query));
  277. new_query->cmd = aclk_cmd;
  278. if (internal) {
  279. new_query->topic = strdupz(topic);
  280. if (likely(query))
  281. new_query->query = strdupz(query);
  282. } else {
  283. new_query->topic = topic;
  284. new_query->query = query;
  285. new_query->msg_id = msg_id;
  286. }
  287. if (data)
  288. new_query->data = strdupz(data);
  289. new_query->next = NULL;
  290. new_query->created = now_realtime_sec();
  291. new_query->run_after = run_after;
  292. debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
  293. tmp_query = aclk_query_find_position(run_after);
  294. if (tmp_query) {
  295. new_query->next = tmp_query->next;
  296. tmp_query->next = new_query;
  297. if (tmp_query == aclk_queue.aclk_query_tail)
  298. aclk_queue.aclk_query_tail = new_query;
  299. aclk_queue.count++;
  300. QUERY_UNLOCK;
  301. QUERY_THREAD_WAKEUP;
  302. return 0;
  303. }
  304. new_query->next = aclk_queue.aclk_query_head;
  305. aclk_queue.aclk_query_head = new_query;
  306. aclk_queue.count++;
  307. QUERY_UNLOCK;
  308. QUERY_THREAD_WAKEUP;
  309. return 0;
  310. }
  311. inline int aclk_submit_request(struct aclk_request *request)
  312. {
  313. return aclk_queue_query(request->callback_topic, NULL, request->msg_id, request->payload, 0, 0, ACLK_CMD_CLOUD);
  314. }
  315. /*
  316. * Get the next query to process - NULL if nothing there
  317. * The caller needs to free memory by calling aclk_query_free()
  318. *
  319. * topic
  320. * query
  321. * The structure itself
  322. *
  323. */
  324. struct aclk_query *aclk_queue_pop()
  325. {
  326. struct aclk_query *this_query;
  327. QUERY_LOCK;
  328. if (likely(!aclk_queue.aclk_query_head)) {
  329. QUERY_UNLOCK;
  330. return NULL;
  331. }
  332. this_query = aclk_queue.aclk_query_head;
  333. // Get rid of the deleted entries
  334. while (this_query && this_query->deleted) {
  335. aclk_queue.count--;
  336. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  337. if (likely(!aclk_queue.aclk_query_head)) {
  338. aclk_queue.aclk_query_tail = NULL;
  339. }
  340. aclk_query_free(this_query);
  341. this_query = aclk_queue.aclk_query_head;
  342. }
  343. if (likely(!this_query)) {
  344. QUERY_UNLOCK;
  345. return NULL;
  346. }
  347. if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
  348. info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
  349. QUERY_UNLOCK;
  350. return NULL;
  351. }
  352. aclk_queue.count--;
  353. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  354. if (likely(!aclk_queue.aclk_query_head)) {
  355. aclk_queue.aclk_query_tail = NULL;
  356. }
  357. QUERY_UNLOCK;
  358. return this_query;
  359. }
  360. // This will give the base topic that the agent will publish messages.
  361. // subtopics will be sent under the base topic e.g. base_topic/subtopic
  362. // This is called during the connection, we delete any previous topic
  363. // in-case the user has changed the agent id and reclaimed.
  364. char *create_publish_base_topic()
  365. {
  366. char *agent_id = is_agent_claimed();
  367. if (unlikely(!agent_id))
  368. return NULL;
  369. ACLK_LOCK;
  370. if (global_base_topic)
  371. freez(global_base_topic);
  372. char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
  373. snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id);
  374. tmp = strchr(tmp_topic, '\n');
  375. if (unlikely(tmp))
  376. *tmp = '\0';
  377. global_base_topic = strdupz(tmp_topic);
  378. ACLK_UNLOCK;
  379. freez(agent_id);
  380. return global_base_topic;
  381. }
  382. /*
  383. * Build a topic based on sub_topic and final_topic
  384. * if the sub topic starts with / assume that is an absolute topic
  385. *
  386. */
  387. char *get_topic(char *sub_topic, char *final_topic, int max_size)
  388. {
  389. int rc;
  390. if (likely(sub_topic && sub_topic[0] == '/'))
  391. return sub_topic;
  392. if (unlikely(!global_base_topic))
  393. return sub_topic;
  394. rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
  395. if (unlikely(rc >= max_size))
  396. debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);
  397. return final_topic;
  398. }
  399. /*
  400. * Free a collector structure
  401. */
  402. static void _free_collector(struct _collector *collector)
  403. {
  404. if (likely(collector->plugin_name))
  405. freez(collector->plugin_name);
  406. if (likely(collector->module_name))
  407. freez(collector->module_name);
  408. if (likely(collector->hostname))
  409. freez(collector->hostname);
  410. freez(collector);
  411. }
  /*
 * Debug helper: log every registered collector and its chart count.
 * Only compiled when ACLK_DEBUG is defined. Takes COLLECTOR_LOCK itself.
 */
#ifdef ACLK_DEBUG
static void _dump_collector_list()
{
    COLLECTOR_LOCK;

    info("DUMPING ALL COLLECTORS");

    // The first node of collector_list is a dummy head; real entries follow it.
    struct _collector *c = collector_list ? collector_list->next : NULL;
    if (unlikely(!c)) {
        COLLECTOR_UNLOCK;
        info("DUMPING ALL COLLECTORS -- nothing found");
        return;
    }

    for (; c; c = c->next) {
        info(
            "COLLECTOR %s : [%s:%s] count = %u", c->hostname,
            c->plugin_name ? c->plugin_name : "",
            c->module_name ? c->module_name : "", c->count);
    }

    info("DUMPING ALL COLLECTORS DONE");
    COLLECTOR_UNLOCK;
}
#endif
  440. /*
  441. * This will cleanup the collector list
  442. *
  443. */
  444. static void _reset_collector_list()
  445. {
  446. struct _collector *tmp_collector, *next_collector;
  447. COLLECTOR_LOCK;
  448. if (unlikely(!collector_list || !collector_list->next)) {
  449. COLLECTOR_UNLOCK;
  450. return;
  451. }
  452. // Note that the first entry is "dummy"
  453. tmp_collector = collector_list->next;
  454. collector_list->count = 0;
  455. collector_list->next = NULL;
  456. // We broke the link; we can unlock
  457. COLLECTOR_UNLOCK;
  458. while (tmp_collector) {
  459. next_collector = tmp_collector->next;
  460. _free_collector(tmp_collector);
  461. tmp_collector = next_collector;
  462. }
  463. }
  464. /*
  465. * Find a collector (if it exists)
  466. * Must lock before calling this
  467. * If last_collector is not null, it will return the previous collector in the linked
  468. * list (used in collector delete)
  469. */
  470. static struct _collector *_find_collector(
  471. const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
  472. {
  473. struct _collector *tmp_collector, *prev_collector;
  474. uint32_t plugin_hash;
  475. uint32_t module_hash;
  476. uint32_t hostname_hash;
  477. if (unlikely(!collector_list)) {
  478. collector_list = callocz(1, sizeof(struct _collector));
  479. return NULL;
  480. }
  481. if (unlikely(!collector_list->next))
  482. return NULL;
  483. plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  484. module_hash = module_name ? simple_hash(module_name) : 1;
  485. hostname_hash = simple_hash(hostname);
  486. // Note that the first entry is "dummy"
  487. tmp_collector = collector_list->next;
  488. prev_collector = collector_list;
  489. while (tmp_collector) {
  490. if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
  491. hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
  492. (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
  493. (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
  494. if (unlikely(last_collector))
  495. *last_collector = prev_collector;
  496. return tmp_collector;
  497. }
  498. prev_collector = tmp_collector;
  499. tmp_collector = tmp_collector->next;
  500. }
  501. return tmp_collector;
  502. }
  503. /*
  504. * Called to delete a collector
  505. * It will reduce the count (chart_count) and will remove it
  506. * from the linked list if the count reaches zero
  507. * The structure will be returned to the caller to free
  508. * the resources
  509. *
  510. */
  511. static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  512. {
  513. struct _collector *tmp_collector, *prev_collector = NULL;
  514. tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
  515. if (likely(tmp_collector)) {
  516. --tmp_collector->count;
  517. if (unlikely(!tmp_collector->count))
  518. prev_collector->next = tmp_collector->next;
  519. }
  520. return tmp_collector;
  521. }
  522. /*
  523. * Add a new collector (plugin / module) to the list
  524. * If it already exists just update the chart count
  525. *
  526. * Lock before calling
  527. */
  528. static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  529. {
  530. struct _collector *tmp_collector;
  531. tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
  532. if (unlikely(!tmp_collector)) {
  533. tmp_collector = callocz(1, sizeof(struct _collector));
  534. tmp_collector->hostname_hash = simple_hash(hostname);
  535. tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  536. tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
  537. tmp_collector->hostname = strdupz(hostname);
  538. tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
  539. tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
  540. tmp_collector->next = collector_list->next;
  541. collector_list->next = tmp_collector;
  542. }
  543. tmp_collector->count++;
  544. debug(
  545. D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
  546. module_name ? module_name : "*", tmp_collector->count);
  547. return tmp_collector;
  548. }
  549. /*
  550. * Add a new collector to the list
  551. * If it exists, update the chart count
  552. */
  553. void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  554. {
  555. struct _collector *tmp_collector;
  556. COLLECTOR_LOCK;
  557. tmp_collector = _add_collector(hostname, plugin_name, module_name);
  558. if (unlikely(tmp_collector->count != 1)) {
  559. COLLECTOR_UNLOCK;
  560. return;
  561. }
  562. if (unlikely(agent_state == AGENT_INITIALIZING))
  563. last_init_sequence = now_realtime_sec();
  564. else {
  565. if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  566. debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
  567. }
  568. COLLECTOR_UNLOCK;
  569. }
  570. /*
  571. * Delete a collector from the list
  572. * If the chart count reaches zero the collector will be removed
  573. * from the list by calling del_collector.
  574. *
  575. * This function will release the memory used and schedule
  576. * a cloud update
  577. */
  578. void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  579. {
  580. struct _collector *tmp_collector;
  581. COLLECTOR_LOCK;
  582. tmp_collector = _del_collector(hostname, plugin_name, module_name);
  583. if (unlikely(!tmp_collector || tmp_collector->count)) {
  584. COLLECTOR_UNLOCK;
  585. return;
  586. }
  587. debug(
  588. D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
  589. tmp_collector->count);
  590. COLLECTOR_UNLOCK;
  591. if (unlikely(agent_state == AGENT_INITIALIZING))
  592. last_init_sequence = now_realtime_sec();
  593. else {
  594. if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  595. debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
  596. }
  597. _free_collector(tmp_collector);
  598. }
  /*
 * Encode the first content_size bytes of src so they can be embedded inside
 * a double-quoted JSON string:
 *   '\n'        -> the two characters "\n" when keep_newlines is set, dropped otherwise
 *   '\t'        -> dropped
 *   '"' , '\\'  -> backslash-escaped
 *   other bytes 0x00-0x1F -> "\u00XX" (lower-case hex)
 *   anything else -> copied verbatim
 *
 * Returns a newly allocated NUL-terminated string; the caller releases it
 * with freez().
 *
 * The buffer is sized for the worst case: every input byte can become a
 * 6-byte "\u00XX" escape, plus the terminating NUL. The old 2x buffer could
 * be overrun.
 */
static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines)
{
    static const char hex_digits[] = "0123456789abcdef";

    char *tmp_buffer = mallocz(content_size * 6 + 1);
    char *dst = tmp_buffer;

    for (size_t i = 0; i < content_size; i++) {
        unsigned char c = (unsigned char)src[i];

        switch (c) {
            case '\n':
                if (keep_newlines) {
                    *dst++ = '\\';
                    *dst++ = 'n';
                }
                break;

            case '\t':
                break;

            case '"':
            case '\\':
                // Both must be escaped, or the surrounding JSON string breaks.
                *dst++ = '\\';
                *dst++ = (char)c;
                break;

            default:
                if (c < 0x20) {
                    // Remaining control characters (including NUL, which would
                    // otherwise truncate the result) use the \u00XX form.
                    *dst++ = '\\';
                    *dst++ = 'u';
                    *dst++ = '0';
                    *dst++ = '0';
                    *dst++ = hex_digits[c >> 4];
                    *dst++ = hex_digits[c & 0x0F];
                } else {
                    *dst++ = (char)c;
                }
                break;
        }
    }

    *dst = '\0';
    return tmp_buffer;
}
  640. int aclk_execute_query(struct aclk_query *this_query)
  641. {
  642. if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
  643. struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
  644. w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  645. w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  646. w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  647. strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
  648. w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
  649. w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
  650. w->acl = 0x1f;
  651. char *mysep = strchr(this_query->query, '?');
  652. if (mysep) {
  653. strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
  654. *mysep = '\0';
  655. } else
  656. strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
  657. mysep = strrchr(this_query->query, '/');
  658. // TODO: handle bad response perhaps in a different way. For now it does to the payload
  659. w->response.code = web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
  660. now_realtime_timeval(&w->tv_ready);
  661. w->response.data->date = w->tv_ready.tv_sec;
  662. web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
  663. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  664. buffer_flush(local_buffer);
  665. local_buffer->contenttype = CT_APPLICATION_JSON;
  666. aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0);
  667. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  668. char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
  669. char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
  670. buffer_sprintf(
  671. local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}",
  672. w->response.code, encoded_response, encoded_header);
  673. buffer_sprintf(local_buffer, "\n}");
  674. debug(D_ACLK, "Response:%s", encoded_header);
  675. aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
  676. buffer_free(w->response.data);
  677. buffer_free(w->response.header);
  678. buffer_free(w->response.header_output);
  679. freez(w);
  680. buffer_free(local_buffer);
  681. freez(encoded_response);
  682. freez(encoded_header);
  683. return 0;
  684. }
  685. return 1;
  686. }
  687. /*
  688. * This function will fetch the next pending command and process it
  689. *
  690. */
  691. int aclk_process_query()
  692. {
  693. struct aclk_query *this_query;
  694. static long int query_count = 0;
  695. if (!aclk_connected)
  696. return 0;
  697. this_query = aclk_queue_pop();
  698. if (likely(!this_query)) {
  699. return 0;
  700. }
  701. if (unlikely(this_query->deleted)) {
  702. debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query);
  703. aclk_query_free(this_query);
  704. return 1;
  705. }
  706. query_count++;
  707. debug(
  708. D_ACLK, "Query #%ld (%s) size=%zu in queue %d seconds", query_count, this_query->topic,
  709. this_query->query ? strlen(this_query->query) : 0, (int)(now_realtime_sec() - this_query->created));
  710. switch (this_query->cmd) {
  711. case ACLK_CMD_ONCONNECT:
  712. debug(D_ACLK, "EXECUTING on connect metadata command");
  713. aclk_send_metadata();
  714. aclk_metadata_submitted = ACLK_METADATA_SENT;
  715. break;
  716. case ACLK_CMD_CHART:
  717. debug(D_ACLK, "EXECUTING a chart update command");
  718. aclk_send_single_chart(this_query->data, this_query->query);
  719. break;
  720. case ACLK_CMD_CHARTDEL:
  721. debug(D_ACLK, "EXECUTING a chart delete command");
  722. //TODO: This send the info metadata for now
  723. aclk_send_info_metadata();
  724. break;
  725. case ACLK_CMD_ALARM:
  726. debug(D_ACLK, "EXECUTING an alarm update command");
  727. aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
  728. break;
  729. case ACLK_CMD_CLOUD:
  730. debug(D_ACLK, "EXECUTING a cloud command");
  731. aclk_execute_query(this_query);
  732. break;
  733. default:
  734. break;
  735. }
  736. debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
  737. aclk_query_free(this_query);
  738. return 1;
  739. }
  740. /*
  741. * Process all pending queries
  742. * Return 0 if no queries were processed, 1 otherwise
  743. *
  744. */
  745. int aclk_process_queries()
  746. {
  747. if (unlikely(netdata_exit || !aclk_connected))
  748. return 0;
  749. if (likely(!aclk_queue.count))
  750. return 0;
  751. debug(D_ACLK, "Processing %d queries", (int)aclk_queue.count);
  752. //TODO: may consider possible throttling here
  753. while (aclk_process_query()) {
  754. // Process all commands
  755. };
  756. return 1;
  757. }
  758. static void aclk_query_thread_cleanup(void *ptr)
  759. {
  760. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  761. info("cleaning up...");
  762. _reset_collector_list();
  763. freez(collector_list);
  764. // Clean memory for pending queries if any
  765. struct aclk_query *this_query;
  766. do {
  767. this_query = aclk_queue_pop();
  768. aclk_query_free(this_query);
  769. } while (this_query);
  770. freez(static_thread->thread);
  771. freez(static_thread);
  772. }
  773. /**
  774. * Main query processing thread
  775. *
  776. * On startup wait for the agent collectors to initialize
  777. * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
  778. * of no new collectors coming in in order to mark the agent
  779. * as stable (set agent_state = AGENT_STABLE)
  780. */
  781. void *aclk_query_main_thread(void *ptr)
  782. {
  783. netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
  784. while (agent_state == AGENT_INITIALIZING && !netdata_exit) {
  785. time_t checkpoint;
  786. checkpoint = now_realtime_sec() - last_init_sequence;
  787. if (checkpoint > ACLK_STABLE_TIMEOUT) {
  788. agent_state = AGENT_STABLE;
  789. info("AGENT stable, last collector initialization activity was %ld seconds ago", checkpoint);
  790. #ifdef ACLK_DEBUG
  791. _dump_collector_list();
  792. #endif
  793. break;
  794. }
  795. info("Waiting for agent collectors to initialize. Last activity was %ld seconds ago" , checkpoint);
  796. sleep_usec(USEC_PER_SEC * 1);
  797. }
  798. while (!netdata_exit) {
  799. if (unlikely(!aclk_metadata_submitted)) {
  800. aclk_metadata_submitted = ACLK_METADATA_CMD_QUEUED;
  801. if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  802. errno = 0;
  803. error("ACLK failed to queue on_connect command");
  804. aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  805. }
  806. }
  807. aclk_process_queries();
  808. QUERY_THREAD_LOCK;
  809. // TODO: Need to check if there are queries awaiting already
  810. if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
  811. sleep_usec(USEC_PER_SEC * 1);
  812. QUERY_THREAD_UNLOCK;
  813. } // forever
  814. info("Shutting down query processing thread");
  815. netdata_thread_cleanup_pop(1);
  816. return NULL;
  817. }
  818. static void aclk_graceful_disconnect()
  819. {
  820. size_t write_q, write_q_bytes, read_q;
  821. time_t event_loop_timeout;
  822. // Send a graceful disconnect message
  823. BUFFER *b = buffer_create(512);
  824. aclk_create_header(b, "disconnect", NULL, 0, 0);
  825. buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
  826. aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
  827. buffer_free(b);
  828. event_loop_timeout = now_realtime_sec() + 5;
  829. write_q = 1;
  830. while (write_q && event_loop_timeout > now_realtime_sec()) {
  831. _link_event_loop();
  832. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  833. }
  834. aclk_shutting_down = 1;
  835. _link_shutdown();
  836. aclk_lws_wss_mqtt_layer_disconect_notif();
  837. write_q = 1;
  838. event_loop_timeout = now_realtime_sec() + 5;
  839. while (write_q && event_loop_timeout > now_realtime_sec()) {
  840. _link_event_loop();
  841. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  842. }
  843. aclk_shutting_down = 0;
  844. }
  845. // Thread cleanup
  846. static void aclk_main_cleanup(void *ptr)
  847. {
  848. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  849. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  850. info("cleaning up...");
  851. char *agent_id = is_agent_claimed();
  852. if (agent_id && aclk_connected) {
  853. freez(agent_id);
  854. // Wakeup thread to cleanup
  855. QUERY_THREAD_WAKEUP;
  856. aclk_graceful_disconnect();
  857. }
  858. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  859. }
  860. struct dictionary_singleton {
  861. char *key;
  862. char *result;
  863. };
  864. int json_extract_singleton(JSON_ENTRY *e)
  865. {
  866. struct dictionary_singleton *data = e->callback_data;
  867. switch (e->type) {
  868. case JSON_OBJECT:
  869. case JSON_ARRAY:
  870. break;
  871. case JSON_STRING:
  872. if (!strcmp(e->name, data->key)) {
  873. data->result = strdupz(e->data.string);
  874. break;
  875. }
  876. break;
  877. case JSON_NUMBER:
  878. case JSON_BOOLEAN:
  879. case JSON_NULL:
  880. break;
  881. }
  882. return 0;
  883. }
  884. // Base-64 decoder.
  885. // Note: This is non-validating, invalid input will be decoded without an error.
  886. // Challenges are packed into json strings so we don't skip newlines.
  887. // Size errors (i.e. invalid input size or insufficient output space) are caught.
  888. size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
  889. {
  890. static char lookup[256];
  891. static int first_time=1;
  892. if (first_time)
  893. {
  894. first_time = 0;
  895. for(int i=0; i<256; i++)
  896. lookup[i] = -1;
  897. for(int i='A'; i<='Z'; i++)
  898. lookup[i] = i-'A';
  899. for(int i='a'; i<='z'; i++)
  900. lookup[i] = i-'a' + 26;
  901. for(int i='0'; i<='9'; i++)
  902. lookup[i] = i-'0' + 52;
  903. lookup['+'] = 62;
  904. lookup['/'] = 63;
  905. }
  906. if ((input_size & 3) != 0)
  907. {
  908. error("Can't decode base-64 input length %zu", input_size);
  909. return 0;
  910. }
  911. size_t unpadded_size = (input_size/4) * 3;
  912. if ( unpadded_size > output_size )
  913. {
  914. error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
  915. return 0;
  916. }
  917. // Don't check padding within full quantums
  918. for (size_t i = 0 ; i < input_size-4 ; i+=4 )
  919. {
  920. uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]];
  921. output[0] = value >> 16;
  922. output[1] = value >> 8;
  923. output[2] = value;
  924. //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
  925. output += 3;
  926. input += 4;
  927. }
  928. // Handle padding only in last quantum
  929. if (input[2] == '=') {
  930. uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]];
  931. output[0] = value >> 4;
  932. //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]);
  933. return unpadded_size-2;
  934. }
  935. else if (input[3] == '=') {
  936. uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]];
  937. output[0] = value >> 10;
  938. output[1] = value >> 2;
  939. //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]);
  940. return unpadded_size-1;
  941. }
  942. else
  943. {
  944. uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3];
  945. output[0] = value >> 16;
  946. output[1] = value >> 8;
  947. output[2] = value;
  948. //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
  949. return unpadded_size;
  950. }
  951. }
  952. size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
  953. {
  954. uint32_t value;
  955. static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
  956. "abcdefghijklmnopqrstuvwxyz"
  957. "0123456789+/";
  958. if ((input_size/3+1)*4 >= output_size)
  959. {
  960. error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
  961. return 0;
  962. }
  963. size_t count = 0;
  964. while (input_size>3)
  965. {
  966. value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff;
  967. output[0] = lookup[value >> 18];
  968. output[1] = lookup[(value >> 12) & 0x3f];
  969. output[2] = lookup[(value >> 6) & 0x3f];
  970. output[3] = lookup[value & 0x3f];
  971. //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
  972. output += 4;
  973. input += 3;
  974. input_size -= 3;
  975. count += 4;
  976. }
  977. switch (input_size)
  978. {
  979. case 2:
  980. value = (input[0] << 10) + (input[1] << 2);
  981. output[0] = lookup[(value >> 12) & 0x3f];
  982. output[1] = lookup[(value >> 6) & 0x3f];
  983. output[2] = lookup[value & 0x3f];
  984. output[3] = '=';
  985. //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]);
  986. count += 4;
  987. break;
  988. case 1:
  989. value = input[0] << 4;
  990. output[0] = lookup[(value >> 6) & 0x3f];
  991. output[1] = lookup[value & 0x3f];
  992. output[2] = '=';
  993. output[3] = '=';
  994. //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
  995. count += 4;
  996. break;
  997. case 0:
  998. break;
  999. }
  1000. return count;
  1001. }
  1002. int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
  1003. {
  1004. int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);
  1005. if (result == -1) {
  1006. char err[512];
  1007. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  1008. error("Decryption of the challenge failed: %s", err);
  1009. }
  1010. return result;
  1011. }
  1012. char *extract_payload(BUFFER *b)
  1013. {
  1014. char *s = b->buffer;
  1015. unsigned int line_len=0;
  1016. for (size_t i=0; i<b->len; i++)
  1017. {
  1018. if (*s == 0 )
  1019. return NULL;
  1020. if (*s == '\n' ) {
  1021. if (line_len==0)
  1022. return s+1;
  1023. line_len = 0;
  1024. }
  1025. else if (*s == '\r') {
  1026. /* don't count */
  1027. }
  1028. else
  1029. line_len ++;
  1030. s++;
  1031. }
  1032. return NULL;
  1033. }
  1034. void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
  1035. {
  1036. char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1037. debug(D_ACLK, "Performing challenge-response sequence");
  1038. if (aclk_password != NULL)
  1039. {
  1040. freez(aclk_password);
  1041. aclk_password = NULL;
  1042. }
  1043. // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
  1044. // TODO - target host?
  1045. char *agent_id = is_agent_claimed();
  1046. if (agent_id == NULL)
  1047. {
  1048. error("Agent was not claimed - cannot perform challenge/response");
  1049. goto CLEANUP;
  1050. }
  1051. char url[1024];
  1052. sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
  1053. info("Retrieving challenge from cloud: %s %s %s", aclk_hostname, aclk_port, url);
  1054. if(aclk_send_https_request("GET", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
  1055. {
  1056. error("Challenge failed: %s", data_buffer);
  1057. goto CLEANUP;
  1058. }
  1059. struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
  1060. debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
  1061. if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
  1062. {
  1063. freez(challenge.result);
  1064. error("Could not parse the json response with the challenge: %s", data_buffer);
  1065. goto CLEANUP;
  1066. }
  1067. if (challenge.result == NULL ) {
  1068. error("Could not retrieve challenge from auth response: %s", data_buffer);
  1069. goto CLEANUP;
  1070. }
  1071. size_t challenge_len = strlen(challenge.result);
  1072. unsigned char decoded[512];
  1073. size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
  1074. unsigned char plaintext[4096]={};
  1075. int decrypted_length = private_decrypt(decoded, decoded_len, plaintext);
  1076. freez(challenge.result);
  1077. char encoded[512];
  1078. size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
  1079. encoded[encoded_len] = 0;
  1080. debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
  1081. char response_json[4096]={};
  1082. sprintf(response_json, "{\"response\":\"%s\"}", encoded);
  1083. debug(D_ACLK, "Password phase: %s",response_json);
  1084. // TODO - host
  1085. sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
  1086. if(aclk_send_https_request("POST", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
  1087. {
  1088. error("Challenge-response failed: %s", data_buffer);
  1089. goto CLEANUP;
  1090. }
  1091. debug(D_ACLK, "Password response from cloud: %s", data_buffer);
  1092. struct dictionary_singleton password = { .key = "password", .result = NULL };
  1093. if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
  1094. {
  1095. freez(password.result);
  1096. error("Could not parse the json response with the password: %s", data_buffer);
  1097. goto CLEANUP;
  1098. }
  1099. if (password.result == NULL ) {
  1100. error("Could not retrieve password from auth response");
  1101. goto CLEANUP;
  1102. }
  1103. if (aclk_password != NULL )
  1104. freez(aclk_password);
  1105. aclk_password = password.result;
  1106. if (aclk_username != NULL)
  1107. freez(aclk_username);
  1108. aclk_username = agent_id;
  1109. agent_id = NULL;
  1110. CLEANUP:
  1111. if (agent_id != NULL)
  1112. freez(agent_id);
  1113. freez(data_buffer);
  1114. return;
  1115. }
  1116. static void aclk_try_to_connect(char *hostname, char *port, int port_num)
  1117. {
  1118. if (!aclk_private_key) {
  1119. error("Cannot try to establish the agent cloud link - no private key available!");
  1120. return;
  1121. }
  1122. info("Attempting to establish the agent cloud link");
  1123. aclk_get_challenge(hostname, port);
  1124. if (aclk_password == NULL)
  1125. return;
  1126. int rc;
  1127. aclk_connecting = 1;
  1128. create_publish_base_topic();
  1129. rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password);
  1130. if (unlikely(rc)) {
  1131. error("Failed to initialize the agent cloud link library");
  1132. }
  1133. }
  1134. /**
  1135. * Main agent cloud link thread
  1136. *
  1137. * This thread will simply call the main event loop that handles
  1138. * pending requests - both inbound and outbound
  1139. *
  1140. * @param ptr is a pointer to the netdata_static_thread structure.
  1141. *
  1142. * @return It always returns NULL
  1143. */
  1144. void *aclk_main(void *ptr)
  1145. {
  1146. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  1147. struct netdata_static_thread *query_thread;
  1148. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  1149. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  1150. netdata_thread_disable_cancelability();
  1151. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK)
  1152. info("Killing ACLK thread -> cloud functionality has been disabled");
  1153. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  1154. return NULL;
  1155. #endif
  1156. info("Waiting for netdata to be ready");
  1157. while (!netdata_ready) {
  1158. sleep_usec(USEC_PER_MS * 300);
  1159. }
  1160. info("Waiting for Cloud to be enabled");
  1161. while (!netdata_cloud_setting) {
  1162. sleep_usec(USEC_PER_SEC * 1);
  1163. if (netdata_exit) {
  1164. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  1165. return NULL;
  1166. }
  1167. }
  1168. last_init_sequence = now_realtime_sec();
  1169. query_thread = NULL;
  1170. char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
  1171. char *aclk_port = NULL;
  1172. uint32_t port_num = 0;
  1173. info("Waiting for netdata to be claimed");
  1174. while(1) {
  1175. char *agent_id = is_agent_claimed();
  1176. while (likely(!agent_id)) {
  1177. sleep_usec(USEC_PER_SEC * 1);
  1178. if (netdata_exit)
  1179. goto exited;
  1180. agent_id = is_agent_claimed();
  1181. }
  1182. freez(agent_id);
  1183. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  1184. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  1185. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  1186. if (cloud_base_url == NULL) {
  1187. error("Do not move the cloud base url out of post_conf_load!!");
  1188. goto exited;
  1189. }
  1190. if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
  1191. error("Agent is claimed but the configuration is invalid, please fix");
  1192. }
  1193. else
  1194. {
  1195. port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value
  1196. if (!create_private_key() && !_mqtt_lib_init())
  1197. break;
  1198. }
  1199. for (int i=0; i<60; i++) {
  1200. if (netdata_exit)
  1201. goto exited;
  1202. sleep_usec(USEC_PER_SEC * 1);
  1203. }
  1204. }
  1205. usec_t reconnect_expiry = 0; // In usecs
  1206. while (!netdata_exit) {
  1207. static int first_init = 0;
  1208. /* size_t write_q, write_q_bytes, read_q;
  1209. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/
  1210. if (aclk_kill_link) { // User has reloaded the claiming state
  1211. aclk_kill_link = 0;
  1212. aclk_graceful_disconnect();
  1213. create_private_key();
  1214. continue;
  1215. }
  1216. if (aclk_force_reconnect) {
  1217. aclk_lws_wss_destroy_context();
  1218. aclk_force_reconnect = 0;
  1219. }
  1220. if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) {
  1221. if (unlikely(!first_init)) {
  1222. aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
  1223. first_init = 1;
  1224. } else {
  1225. if (aclk_connecting == 0) {
  1226. if (reconnect_expiry == 0) {
  1227. unsigned long int delay = aclk_reconnect_delay(1);
  1228. reconnect_expiry = now_realtime_usec() + delay * 1000;
  1229. info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
  1230. }
  1231. if (now_realtime_usec() >= reconnect_expiry) {
  1232. reconnect_expiry = 0;
  1233. aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
  1234. }
  1235. sleep_usec(USEC_PER_MS * 100);
  1236. }
  1237. }
  1238. if (aclk_connecting) {
  1239. _link_event_loop();
  1240. sleep_usec(USEC_PER_MS * 100);
  1241. }
  1242. continue;
  1243. }
  1244. _link_event_loop();
  1245. if (unlikely(!aclk_connected || aclk_force_reconnect))
  1246. continue;
  1247. /*static int stress_counter = 0;
  1248. if (write_q_bytes==0 && stress_counter ++ >5)
  1249. {
  1250. aclk_send_stress_test(8000000);
  1251. stress_counter = 0;
  1252. }*/
  1253. // TODO: Move to on-connect
  1254. if (unlikely(!aclk_subscribed)) {
  1255. aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
  1256. }
  1257. if (unlikely(!query_thread)) {
  1258. query_thread = callocz(1, sizeof(struct netdata_static_thread));
  1259. query_thread->thread = mallocz(sizeof(netdata_thread_t));
  1260. netdata_thread_create(
  1261. query_thread->thread, ACLK_THREAD_NAME, NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread,
  1262. query_thread);
  1263. }
  1264. } // forever
  1265. exited:
  1266. // Wakeup query thread to cleanup
  1267. QUERY_THREAD_WAKEUP;
  1268. freez(aclk_username);
  1269. freez(aclk_password);
  1270. freez(aclk_hostname);
  1271. freez(aclk_port);
  1272. if (aclk_private_key != NULL)
  1273. RSA_free(aclk_private_key);
  1274. aclk_main_cleanup(ptr);
  1275. return NULL;
  1276. }
  1277. /*
  1278. * Send a message to the cloud, using a base topic and sib_topic
  1279. * The final topic will be in the form <base_topic>/<sub_topic>
  1280. * If base_topic is missing then the global_base_topic will be used (if available)
  1281. *
  1282. */
  1283. int aclk_send_message(char *sub_topic, char *message, char *msg_id)
  1284. {
  1285. int rc;
  1286. int mid;
  1287. char topic[ACLK_MAX_TOPIC + 1];
  1288. char *final_topic;
  1289. UNUSED(msg_id);
  1290. if (!aclk_connected)
  1291. return 0;
  1292. if (unlikely(!message))
  1293. return 0;
  1294. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  1295. if (unlikely(!final_topic)) {
  1296. errno = 0;
  1297. error("Unable to build outgoing topic; truncated?");
  1298. return 1;
  1299. }
  1300. ACLK_LOCK;
  1301. rc = _link_send_message(final_topic, (unsigned char *)message, &mid);
  1302. // TODO: link the msg_id with the mid so we can trace it
  1303. ACLK_UNLOCK;
  1304. if (unlikely(rc)) {
  1305. errno = 0;
  1306. error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
  1307. }
  1308. return rc;
  1309. }
  1310. /*
  1311. * Subscribe to a topic in the cloud
  1312. * The final subscription will be in the form
  1313. * /agent/claim_id/<sub_topic>
  1314. */
  1315. int aclk_subscribe(char *sub_topic, int qos)
  1316. {
  1317. int rc;
  1318. char topic[ACLK_MAX_TOPIC + 1];
  1319. char *final_topic;
  1320. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  1321. if (unlikely(!final_topic)) {
  1322. errno = 0;
  1323. error("Unable to build outgoing topic; truncated?");
  1324. return 1;
  1325. }
  1326. if (!aclk_connected) {
  1327. error("Cannot subscribe to %s - not connected!", topic);
  1328. return 1;
  1329. }
  1330. ACLK_LOCK;
  1331. rc = _link_subscribe(final_topic, qos);
  1332. ACLK_UNLOCK;
  1333. // TODO: Add better handling -- error will flood the logfile here
  1334. if (unlikely(rc)) {
  1335. errno = 0;
  1336. error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
  1337. }
  1338. return rc;
  1339. }
  1340. // This is called from a callback when the link goes up
  1341. void aclk_connect()
  1342. {
  1343. info("Connection detected (%"PRIu64" queued queries)", aclk_queue.count);
  1344. aclk_connected = 1;
  1345. waiting_init = 0;
  1346. aclk_reconnect_delay(0);
  1347. QUERY_THREAD_WAKEUP;
  1348. return;
  1349. }
  1350. // This is called from a callback when the link goes down
  1351. void aclk_disconnect()
  1352. {
  1353. if (likely(aclk_connected))
  1354. info("Disconnect detected (%"PRIu64" queued queries)", aclk_queue.count);
  1355. aclk_subscribed = 0;
  1356. aclk_metadata_submitted = ACLK_METADATA_REQUIRED;
  1357. waiting_init = 1;
  1358. aclk_connected = 0;
  1359. aclk_connecting = 0;
  1360. aclk_force_reconnect = 1;
  1361. }
  1362. inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us)
  1363. {
  1364. uuid_t uuid;
  1365. char uuid_str[36 + 1];
  1366. if (unlikely(!msg_id)) {
  1367. uuid_generate(uuid);
  1368. uuid_unparse(uuid, uuid_str);
  1369. msg_id = uuid_str;
  1370. }
  1371. if (ts_secs == 0) {
  1372. ts_us = now_realtime_usec();
  1373. ts_secs = ts_us / USEC_PER_SEC;
  1374. ts_us = ts_us % USEC_PER_SEC;
  1375. }
  1376. buffer_sprintf(
  1377. dest,
  1378. "\t{\"type\": \"%s\",\n"
  1379. "\t\"msg-id\": \"%s\",\n"
  1380. "\t\"timestamp\": %ld,\n"
  1381. "\t\"timestamp-offset-usec\": %llu,\n"
  1382. "\t\"connect\": %ld,\n"
  1383. "\t\"connect-offset-usec\": %llu,\n"
  1384. "\t\"version\": %d",
  1385. type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, ACLK_VERSION);
  1386. debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, ts_secs);
  1387. }
  1388. /*
  1389. * This will send alarm information which includes
  1390. * configured alarms
  1391. * alarm_log
  1392. * active alarms
  1393. */
  1394. void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
  1395. void aclk_send_alarm_metadata()
  1396. {
  1397. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1398. char *msg_id = create_uuid();
  1399. buffer_flush(local_buffer);
  1400. local_buffer->contenttype = CT_APPLICATION_JSON;
  1401. debug(D_ACLK, "Metadata alarms start");
  1402. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1403. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1404. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1405. // session.
  1406. if (aclk_metadata_submitted == ACLK_METADATA_SENT)
  1407. aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0);
  1408. else
  1409. aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us);
  1410. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1411. buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
  1412. health_alarms2json(localhost, local_buffer, 1);
  1413. debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len);
  1414. // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
  1415. // health_alarm_log2json(localhost, local_buffer, 0);
  1416. // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
  1417. buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
  1418. health_active_log_alarms_2json(localhost, local_buffer);
  1419. //debug(D_ACLK, "Metadata message %s", local_buffer->buffer);
  1420. buffer_sprintf(local_buffer, "\n}\n}");
  1421. aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
  1422. freez(msg_id);
  1423. buffer_free(local_buffer);
  1424. }
  1425. /*
  1426. * This will send the agent metadata
  1427. * /api/v1/info
  1428. * charts
  1429. */
  1430. int aclk_send_info_metadata()
  1431. {
  1432. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1433. debug(D_ACLK, "Metadata /info start");
  1434. char *msg_id = create_uuid();
  1435. buffer_flush(local_buffer);
  1436. local_buffer->contenttype = CT_APPLICATION_JSON;
  1437. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1438. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1439. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1440. // session.
  1441. if (aclk_metadata_submitted == ACLK_METADATA_SENT)
  1442. aclk_create_header(local_buffer, "update", msg_id, 0, 0);
  1443. else
  1444. aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us);
  1445. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1446. buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
  1447. web_client_api_request_v1_info_fill_buffer(localhost, local_buffer);
  1448. debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
  1449. buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
  1450. charts2json(localhost, local_buffer, 1);
  1451. buffer_sprintf(local_buffer, "\n}\n}");
  1452. debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
  1453. aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
  1454. freez(msg_id);
  1455. buffer_free(local_buffer);
  1456. return 0;
  1457. }
  1458. void aclk_send_stress_test(size_t size)
  1459. {
  1460. char *buffer = mallocz(size);
  1461. if (buffer != NULL)
  1462. {
  1463. for(size_t i=0; i<size; i++)
  1464. buffer[i] = 'x';
  1465. buffer[size-1] = 0;
  1466. time_t time_created = now_realtime_sec();
  1467. sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
  1468. buffer[strlen(buffer)] = '"';
  1469. buffer[size-2] = '}';
  1470. buffer[size-3] = '"';
  1471. aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
  1472. error("Sending stress of size %zu at time %ld", size, time_created);
  1473. }
  1474. free(buffer);
  1475. }
  1476. // Send info metadata message to the cloud if the link is established
  1477. // or on request
  1478. int aclk_send_metadata()
  1479. {
  1480. aclk_send_info_metadata();
  1481. aclk_send_alarm_metadata();
  1482. return 0;
  1483. }
  1484. void aclk_single_update_disable()
  1485. {
  1486. aclk_disable_single_updates = 1;
  1487. }
  1488. void aclk_single_update_enable()
  1489. {
  1490. aclk_disable_single_updates = 0;
  1491. }
  1492. // Trigged by a health reload, sends the alarm metadata
  1493. void aclk_alarm_reload()
  1494. {
  1495. if (unlikely(agent_state == AGENT_INITIALIZING))
  1496. return;
  1497. if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  1498. if (likely(aclk_connected)) {
  1499. errno = 0;
  1500. error("ACLK failed to queue on_connect command on alarm reload");
  1501. }
  1502. }
  1503. }
  1504. //rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
  1505. int aclk_send_single_chart(char *hostname, char *chart)
  1506. {
  1507. RRDHOST *target_host;
  1508. target_host = rrdhost_find_by_hostname(hostname, 0);
  1509. if (!target_host)
  1510. return 1;
  1511. RRDSET *st = rrdset_find(target_host, chart);
  1512. if (!st)
  1513. st = rrdset_find_byname(target_host, chart);
  1514. if (!st) {
  1515. info("FAILED to find chart %s", chart);
  1516. return 1;
  1517. }
  1518. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1519. char *msg_id = create_uuid();
  1520. buffer_flush(local_buffer);
  1521. local_buffer->contenttype = CT_APPLICATION_JSON;
  1522. aclk_create_header(local_buffer, "chart", msg_id, 0, 0);
  1523. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1524. rrdset2json(st, local_buffer, NULL, NULL, 1);
  1525. buffer_sprintf(local_buffer, "\t\n}");
  1526. aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);
  1527. freez(msg_id);
  1528. buffer_free(local_buffer);
  1529. return 0;
  1530. }
  1531. int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
  1532. {
  1533. #ifndef ENABLE_ACLK
  1534. UNUSED(host);
  1535. UNUSED(chart_name);
  1536. return 0;
  1537. #else
  1538. if (!netdata_cloud_setting)
  1539. return 0;
  1540. if (host != localhost)
  1541. return 0;
  1542. if (unlikely(aclk_disable_single_updates))
  1543. return 0;
  1544. if (unlikely(agent_state == AGENT_INITIALIZING))
  1545. last_init_sequence = now_realtime_sec();
  1546. else {
  1547. if (unlikely(aclk_queue_query("_chart", host->hostname, NULL, chart_name, 0, 1, aclk_cmd))) {
  1548. if (likely(aclk_connected)) {
  1549. errno = 0;
  1550. error("ACLK failed to queue chart_update command");
  1551. }
  1552. }
  1553. }
  1554. return 0;
  1555. #endif
  1556. }
  1557. int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
  1558. {
  1559. BUFFER *local_buffer = NULL;
  1560. if (host != localhost)
  1561. return 0;
  1562. if (unlikely(agent_state == AGENT_INITIALIZING))
  1563. return 0;
  1564. /*
  1565. * Check if individual updates have been disabled
  1566. * This will be the case when we do health reload
  1567. * and all the alarms will be dropped and recreated.
  1568. * At the end of the health reload the complete alarm metadata
  1569. * info will be sent
  1570. */
  1571. if (unlikely(aclk_disable_single_updates))
  1572. return 0;
  1573. local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1574. char *msg_id = create_uuid();
  1575. buffer_flush(local_buffer);
  1576. aclk_create_header(local_buffer, "status-change", msg_id, 0, 0);
  1577. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1578. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  1579. health_alarm_entry2json_nolock(local_buffer, ae, host);
  1580. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  1581. buffer_sprintf(local_buffer, "\n}");
  1582. if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
  1583. if (likely(aclk_connected)) {
  1584. errno = 0;
  1585. error("ACLK failed to queue alarm_command on alarm_update");
  1586. }
  1587. }
  1588. freez(msg_id);
  1589. buffer_free(local_buffer);
  1590. return 0;
  1591. }
  1592. /*
  1593. * Parse the incoming payload and queue a command if valid
  1594. */
  1595. int aclk_handle_cloud_request(char *payload)
  1596. {
  1597. struct aclk_request cloud_to_agent = {
  1598. .type_id = NULL, .msg_id = NULL, .callback_topic = NULL, .payload = NULL, .version = 0
  1599. };
  1600. if (unlikely(agent_state == AGENT_INITIALIZING)) {
  1601. debug(D_ACLK, "Ignoring cloud request; agent not in stable state");
  1602. return 0;
  1603. }
  1604. if (unlikely(!payload)) {
  1605. debug(D_ACLK, "ACLK incoming message is empty");
  1606. return 0;
  1607. }
  1608. debug(D_ACLK, "ACLK incoming message (%s)", payload);
  1609. int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
  1610. if (unlikely(
  1611. JSON_OK != rc || !cloud_to_agent.payload || !cloud_to_agent.callback_topic || !cloud_to_agent.msg_id ||
  1612. !cloud_to_agent.type_id || cloud_to_agent.version > ACLK_VERSION ||
  1613. strcmp(cloud_to_agent.type_id, "http"))) {
  1614. if (JSON_OK != rc)
  1615. error("Malformed json request (%s)", payload);
  1616. if (cloud_to_agent.version > ACLK_VERSION)
  1617. error("Unsupported version in JSON request %d", cloud_to_agent.version);
  1618. if (cloud_to_agent.payload)
  1619. freez(cloud_to_agent.payload);
  1620. if (cloud_to_agent.type_id)
  1621. freez(cloud_to_agent.type_id);
  1622. if (cloud_to_agent.msg_id)
  1623. freez(cloud_to_agent.msg_id);
  1624. if (cloud_to_agent.callback_topic)
  1625. freez(cloud_to_agent.callback_topic);
  1626. return 1;
  1627. }
  1628. // Checked to be "http", not needed anymore
  1629. if (likely(cloud_to_agent.type_id)) {
  1630. freez(cloud_to_agent.type_id);
  1631. cloud_to_agent.type_id = NULL;
  1632. }
  1633. if (unlikely(aclk_submit_request(&cloud_to_agent)))
  1634. debug(D_ACLK, "ACLK failed to queue incoming message (%s)", payload);
  1635. // Note: the payload comes from the callback and it will be automatically freed
  1636. return 0;
  1637. }