agent_cloud_link.c 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "libnetdata/libnetdata.h"
  3. #include "agent_cloud_link.h"
  4. #include "aclk_lws_https_client.h"
  5. #include "aclk_query.h"
  6. #include "aclk_common.h"
  7. #include "aclk_stats.h"
  8. #ifdef ENABLE_ACLK
  9. #include <libwebsockets.h>
  10. #endif
  11. int aclk_shutting_down = 0;
  12. // Other global state
  13. static int aclk_subscribed = 0;
  14. static int aclk_disable_single_updates = 0;
  15. static char *aclk_username = NULL;
  16. static char *aclk_password = NULL;
  17. static char *global_base_topic = NULL;
  18. static int aclk_connecting = 0;
  19. int aclk_force_reconnect = 0; // Indication from lower layers
  20. usec_t aclk_session_us = 0; // Used by the mqtt layer
  21. time_t aclk_session_sec = 0; // Used by the mqtt layer
  22. static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
  23. static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
  24. #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
  25. #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
  26. #define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
  27. #define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
  28. void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
  29. void aclk_lws_wss_destroy_context();
  /*
   * Bookkeeping for collectors and the number of charts each one owns.
   * Once every chart of a collector has been deleted,
   * a new metadata dataset must be sent to the cloud.
   */
  struct _collector {
      time_t created;          // not written by the list helpers in this file (stays zero from callocz)
      uint32_t count;          // chart count
      uint32_t hostname_hash;  // simple_hash() of hostname
      uint32_t plugin_hash;    // simple_hash() of plugin_name, or 1 when the name is NULL
      uint32_t module_hash;    // simple_hash() of module_name, or 1 when the name is NULL
      char *hostname;          // owned copy, released by _free_collector()
      char *plugin_name;       // owned copy or NULL
      char *module_name;       // owned copy or NULL
      struct _collector *next; // singly linked list
  };

  // Head of the list; the first entry is a "dummy" node that never describes a collector.
  struct _collector *collector_list = NULL;
  48. char *create_uuid()
  49. {
  50. uuid_t uuid;
  51. char *uuid_str = mallocz(36 + 1);
  52. uuid_generate(uuid);
  53. uuid_unparse(uuid, uuid_str);
  54. return uuid_str;
  55. }
  56. int cloud_to_agent_parse(JSON_ENTRY *e)
  57. {
  58. struct aclk_request *data = e->callback_data;
  59. switch (e->type) {
  60. case JSON_OBJECT:
  61. case JSON_ARRAY:
  62. break;
  63. case JSON_STRING:
  64. if (!strcmp(e->name, "msg-id")) {
  65. data->msg_id = strdupz(e->data.string);
  66. break;
  67. }
  68. if (!strcmp(e->name, "type")) {
  69. data->type_id = strdupz(e->data.string);
  70. break;
  71. }
  72. if (!strcmp(e->name, "callback-topic")) {
  73. data->callback_topic = strdupz(e->data.string);
  74. break;
  75. }
  76. if (!strcmp(e->name, "payload")) {
  77. if (likely(e->data.string)) {
  78. size_t len = strlen(e->data.string);
  79. data->payload = mallocz(len+1);
  80. if (!url_decode_r(data->payload, e->data.string, len + 1))
  81. strcpy(data->payload, e->data.string);
  82. }
  83. break;
  84. }
  85. break;
  86. case JSON_NUMBER:
  87. if (!strcmp(e->name, "version")) {
  88. data->version = e->data.number;
  89. break;
  90. }
  91. if (!strcmp(e->name, "min-version")) {
  92. data->min_version = e->data.number;
  93. break;
  94. }
  95. if (!strcmp(e->name, "max-version")) {
  96. data->max_version = e->data.number;
  97. break;
  98. }
  99. break;
  100. case JSON_BOOLEAN:
  101. break;
  102. case JSON_NULL:
  103. break;
  104. }
  105. return 0;
  106. }
  107. static RSA *aclk_private_key = NULL;
  108. static int create_private_key()
  109. {
  110. if (aclk_private_key != NULL)
  111. RSA_free(aclk_private_key);
  112. aclk_private_key = NULL;
  113. char filename[FILENAME_MAX + 1];
  114. snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);
  115. long bytes_read;
  116. char *private_key = read_by_filename(filename, &bytes_read);
  117. if (!private_key) {
  118. error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
  119. return 1;
  120. }
  121. debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);
  122. BIO *key_bio = BIO_new_mem_buf(private_key, -1);
  123. if (key_bio==NULL) {
  124. error("Claimed agent cannot establish ACLK - failed to create BIO for key");
  125. goto biofailed;
  126. }
  127. aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
  128. BIO_free(key_bio);
  129. if (aclk_private_key!=NULL)
  130. {
  131. freez(private_key);
  132. return 0;
  133. }
  134. char err[512];
  135. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  136. error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);
  137. biofailed:
  138. freez(private_key);
  139. return 1;
  140. }
  141. /*
  142. * After a connection failure -- delay in milliseconds
  143. * When a connection is established, the delay function
  144. * should be called with
  145. *
  146. * mode 0 to reset the delay
  147. * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
  148. *
  149. */
  150. unsigned long int aclk_reconnect_delay(int mode)
  151. {
  152. static int fail = -1;
  153. unsigned long int delay;
  154. if (!mode || fail == -1) {
  155. srandom(time(NULL));
  156. fail = mode - 1;
  157. return 0;
  158. }
  159. delay = (1 << fail);
  160. if (delay >= ACLK_MAX_BACKOFF_DELAY) {
  161. delay = ACLK_MAX_BACKOFF_DELAY * 1000;
  162. } else {
  163. fail++;
  164. delay = (delay * 1000) + (random() % 1000);
  165. }
  166. return delay;
  167. }
  168. // This will give the base topic that the agent will publish messages.
  169. // subtopics will be sent under the base topic e.g. base_topic/subtopic
  170. // This is called during the connection, we delete any previous topic
  171. // in-case the user has changed the agent id and reclaimed.
  172. char *create_publish_base_topic()
  173. {
  174. char *agent_id = is_agent_claimed();
  175. if (unlikely(!agent_id))
  176. return NULL;
  177. ACLK_LOCK;
  178. if (global_base_topic)
  179. freez(global_base_topic);
  180. char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;
  181. snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id);
  182. tmp = strchr(tmp_topic, '\n');
  183. if (unlikely(tmp))
  184. *tmp = '\0';
  185. global_base_topic = strdupz(tmp_topic);
  186. ACLK_UNLOCK;
  187. freez(agent_id);
  188. return global_base_topic;
  189. }
  190. /*
  191. * Build a topic based on sub_topic and final_topic
  192. * if the sub topic starts with / assume that is an absolute topic
  193. *
  194. */
  195. char *get_topic(char *sub_topic, char *final_topic, int max_size)
  196. {
  197. int rc;
  198. if (likely(sub_topic && sub_topic[0] == '/'))
  199. return sub_topic;
  200. if (unlikely(!global_base_topic))
  201. return sub_topic;
  202. rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
  203. if (unlikely(rc >= max_size))
  204. debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);
  205. return final_topic;
  206. }
  207. #ifndef __GNUC__
  208. #pragma region ACLK Internal Collector Tracking
  209. #endif
  210. /*
  211. * Free a collector structure
  212. */
  213. static void _free_collector(struct _collector *collector)
  214. {
  215. if (likely(collector->plugin_name))
  216. freez(collector->plugin_name);
  217. if (likely(collector->module_name))
  218. freez(collector->module_name);
  219. if (likely(collector->hostname))
  220. freez(collector->hostname);
  221. freez(collector);
  222. }
  /*
   * Write the whole collector list to the log (ACLK_DEBUG builds only)
   */
  #ifdef ACLK_DEBUG
  static void _dump_collector_list()
  {
      COLLECTOR_LOCK;

      info("DUMPING ALL COLLECTORS");

      if (unlikely(!collector_list || !collector_list->next)) {
          COLLECTOR_UNLOCK;
          info("DUMPING ALL COLLECTORS -- nothing found");
          return;
      }

      // Note that the first entry is "dummy"
      for (struct _collector *entry = collector_list->next; entry; entry = entry->next) {
          info(
              "COLLECTOR %s : [%s:%s] count = %u", entry->hostname,
              entry->plugin_name ? entry->plugin_name : "",
              entry->module_name ? entry->module_name : "", entry->count);
      }

      info("DUMPING ALL COLLECTORS DONE");
      COLLECTOR_UNLOCK;
  }
  #endif
  251. /*
  252. * This will cleanup the collector list
  253. *
  254. */
  255. static void _reset_collector_list()
  256. {
  257. struct _collector *tmp_collector, *next_collector;
  258. COLLECTOR_LOCK;
  259. if (unlikely(!collector_list || !collector_list->next)) {
  260. COLLECTOR_UNLOCK;
  261. return;
  262. }
  263. // Note that the first entry is "dummy"
  264. tmp_collector = collector_list->next;
  265. collector_list->count = 0;
  266. collector_list->next = NULL;
  267. // We broke the link; we can unlock
  268. COLLECTOR_UNLOCK;
  269. while (tmp_collector) {
  270. next_collector = tmp_collector->next;
  271. _free_collector(tmp_collector);
  272. tmp_collector = next_collector;
  273. }
  274. }
  275. /*
  276. * Find a collector (if it exists)
  277. * Must lock before calling this
  278. * If last_collector is not null, it will return the previous collector in the linked
  279. * list (used in collector delete)
  280. */
  281. static struct _collector *_find_collector(
  282. const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
  283. {
  284. struct _collector *tmp_collector, *prev_collector;
  285. uint32_t plugin_hash;
  286. uint32_t module_hash;
  287. uint32_t hostname_hash;
  288. if (unlikely(!collector_list)) {
  289. collector_list = callocz(1, sizeof(struct _collector));
  290. return NULL;
  291. }
  292. if (unlikely(!collector_list->next))
  293. return NULL;
  294. plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  295. module_hash = module_name ? simple_hash(module_name) : 1;
  296. hostname_hash = simple_hash(hostname);
  297. // Note that the first entry is "dummy"
  298. tmp_collector = collector_list->next;
  299. prev_collector = collector_list;
  300. while (tmp_collector) {
  301. if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
  302. hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
  303. (!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
  304. (!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
  305. if (unlikely(last_collector))
  306. *last_collector = prev_collector;
  307. return tmp_collector;
  308. }
  309. prev_collector = tmp_collector;
  310. tmp_collector = tmp_collector->next;
  311. }
  312. return tmp_collector;
  313. }
  314. /*
  315. * Called to delete a collector
  316. * It will reduce the count (chart_count) and will remove it
  317. * from the linked list if the count reaches zero
  318. * The structure will be returned to the caller to free
  319. * the resources
  320. *
  321. */
  322. static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  323. {
  324. struct _collector *tmp_collector, *prev_collector = NULL;
  325. tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
  326. if (likely(tmp_collector)) {
  327. --tmp_collector->count;
  328. if (unlikely(!tmp_collector->count))
  329. prev_collector->next = tmp_collector->next;
  330. }
  331. return tmp_collector;
  332. }
  333. /*
  334. * Add a new collector (plugin / module) to the list
  335. * If it already exists just update the chart count
  336. *
  337. * Lock before calling
  338. */
  339. static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  340. {
  341. struct _collector *tmp_collector;
  342. tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
  343. if (unlikely(!tmp_collector)) {
  344. tmp_collector = callocz(1, sizeof(struct _collector));
  345. tmp_collector->hostname_hash = simple_hash(hostname);
  346. tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
  347. tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
  348. tmp_collector->hostname = strdupz(hostname);
  349. tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
  350. tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
  351. tmp_collector->next = collector_list->next;
  352. collector_list->next = tmp_collector;
  353. }
  354. tmp_collector->count++;
  355. debug(
  356. D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
  357. module_name ? module_name : "*", tmp_collector->count);
  358. return tmp_collector;
  359. }
  360. #ifndef __GNUC__
  361. #pragma endregion
  362. #endif
  363. inline static int aclk_popcorn_check_bump()
  364. {
  365. ACLK_SHARED_STATE_LOCK;
  366. if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
  367. aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
  368. ACLK_SHARED_STATE_UNLOCK;
  369. return 1;
  370. }
  371. ACLK_SHARED_STATE_UNLOCK;
  372. return 0;
  373. }
  374. /*
  375. * Add a new collector to the list
  376. * If it exists, update the chart count
  377. */
  378. void aclk_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
  379. {
  380. struct _collector *tmp_collector;
  381. if (unlikely(!netdata_ready)) {
  382. return;
  383. }
  384. COLLECTOR_LOCK;
  385. tmp_collector = _add_collector(hostname, plugin_name, module_name);
  386. if (unlikely(tmp_collector->count != 1)) {
  387. COLLECTOR_UNLOCK;
  388. return;
  389. }
  390. COLLECTOR_UNLOCK;
  391. if(aclk_popcorn_check_bump())
  392. return;
  393. if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  394. debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
  395. }
  396. /*
  397. * Delete a collector from the list
  398. * If the chart count reaches zero the collector will be removed
  399. * from the list by calling del_collector.
  400. *
  401. * This function will release the memory used and schedule
  402. * a cloud update
  403. */
  404. void aclk_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
  405. {
  406. struct _collector *tmp_collector;
  407. if (unlikely(!netdata_ready)) {
  408. return;
  409. }
  410. COLLECTOR_LOCK;
  411. tmp_collector = _del_collector(hostname, plugin_name, module_name);
  412. if (unlikely(!tmp_collector || tmp_collector->count)) {
  413. COLLECTOR_UNLOCK;
  414. return;
  415. }
  416. debug(
  417. D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
  418. tmp_collector->count);
  419. COLLECTOR_UNLOCK;
  420. _free_collector(tmp_collector);
  421. if (aclk_popcorn_check_bump())
  422. return;
  423. if (unlikely(aclk_queue_query("collector", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
  424. debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
  425. }
  426. static void aclk_graceful_disconnect()
  427. {
  428. size_t write_q, write_q_bytes, read_q;
  429. time_t event_loop_timeout;
  430. // Send a graceful disconnect message
  431. BUFFER *b = buffer_create(512);
  432. aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg);
  433. buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}\n");
  434. aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
  435. buffer_free(b);
  436. event_loop_timeout = now_realtime_sec() + 5;
  437. write_q = 1;
  438. while (write_q && event_loop_timeout > now_realtime_sec()) {
  439. _link_event_loop();
  440. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  441. }
  442. aclk_shutting_down = 1;
  443. _link_shutdown();
  444. aclk_lws_wss_mqtt_layer_disconect_notif();
  445. write_q = 1;
  446. event_loop_timeout = now_realtime_sec() + 5;
  447. while (write_q && event_loop_timeout > now_realtime_sec()) {
  448. _link_event_loop();
  449. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
  450. }
  451. aclk_shutting_down = 0;
  452. }
  453. #ifndef __GNUC__
  454. #pragma region Incoming Msg Parsing
  455. #endif
  456. struct dictionary_singleton {
  457. char *key;
  458. char *result;
  459. };
  460. int json_extract_singleton(JSON_ENTRY *e)
  461. {
  462. struct dictionary_singleton *data = e->callback_data;
  463. switch (e->type) {
  464. case JSON_OBJECT:
  465. case JSON_ARRAY:
  466. break;
  467. case JSON_STRING:
  468. if (!strcmp(e->name, data->key)) {
  469. data->result = strdupz(e->data.string);
  470. break;
  471. }
  472. break;
  473. case JSON_NUMBER:
  474. case JSON_BOOLEAN:
  475. case JSON_NULL:
  476. break;
  477. }
  478. return 0;
  479. }
  480. #ifndef __GNUC__
  481. #pragma endregion
  482. #endif
  483. #ifndef __GNUC__
  484. #pragma region Challenge Response
  485. #endif
  // Map one base-64 alphabet character to its 6-bit value.
  // Characters outside the alphabet (including '=') map to 0, which keeps the
  // decoder non-validating without shifting negative values.
  static inline uint32_t base64_sextet(unsigned char c)
  {
      if (c >= 'A' && c <= 'Z')
          return (uint32_t)(c - 'A');
      if (c >= 'a' && c <= 'z')
          return (uint32_t)(c - 'a') + 26;
      if (c >= '0' && c <= '9')
          return (uint32_t)(c - '0') + 52;
      if (c == '+')
          return 62;
      if (c == '/')
          return 63;
      return 0;
  }

  // Base-64 decoder.
  // Note: This is non-validating, invalid input will be decoded without an error.
  // Challenges are packed into json strings so we don't skip newlines.
  // Size errors (i.e. invalid input size or insufficient output space) are caught.
  //
  // input/input_size   base-64 text; the size must be a multiple of 4
  // output/output_size destination; must hold (input_size / 4) * 3 bytes
  //
  // Returns the number of decoded bytes, or 0 on a size error or empty input.
  size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
  {
      // Nothing to decode; this also keeps the "all but the last quantum" loop
      // below from running on an empty buffer.
      if (input_size == 0)
          return 0;

      if ((input_size & 3) != 0)
      {
          error("Can't decode base-64 input length %zu", input_size);
          return 0;
      }

      size_t unpadded_size = (input_size / 4) * 3;
      if (unpadded_size > output_size)
      {
          error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
          return 0;
      }

      // Don't check padding within full quantums
      for (size_t i = 0; i + 4 < input_size; i += 4)
      {
          uint32_t value = (base64_sextet(input[0]) << 18) | (base64_sextet(input[1]) << 12) |
                           (base64_sextet(input[2]) << 6) | base64_sextet(input[3]);
          output[0] = (unsigned char)(value >> 16);
          output[1] = (unsigned char)(value >> 8);
          output[2] = (unsigned char)value;
          output += 3;
          input += 4;
      }

      // Handle padding only in last quantum
      if (input[2] == '=')
      {
          // "xx==": 12 bits of data, the top 8 form the single output byte
          uint32_t value = (base64_sextet(input[0]) << 6) | base64_sextet(input[1]);
          output[0] = (unsigned char)(value >> 4);
          return unpadded_size - 2;
      }

      if (input[3] == '=')
      {
          // "xxx=": 18 bits of data, the top 16 form two output bytes
          uint32_t value = (base64_sextet(input[0]) << 12) | (base64_sextet(input[1]) << 6) | base64_sextet(input[2]);
          output[0] = (unsigned char)(value >> 10);
          output[1] = (unsigned char)(value >> 2);
          return unpadded_size - 1;
      }

      // Unpadded last quantum: decode it exactly like the quantums before it
      uint32_t value = (base64_sextet(input[0]) << 18) | (base64_sextet(input[1]) << 12) |
                       (base64_sextet(input[2]) << 6) | base64_sextet(input[3]);
      output[0] = (unsigned char)(value >> 16);
      output[1] = (unsigned char)(value >> 8);
      output[2] = (unsigned char)value;
      return unpadded_size;
  }
  // Base-64 encoder.
  //
  // input/input_size   bytes to encode
  // output/output_size destination; it must be strictly larger than
  //                    (input_size / 3 + 1) * 4 bytes, which always leaves room
  //                    for a terminator the caller may append
  //
  // Returns the number of characters written (always a multiple of 4), or 0
  // when the output buffer is too small. The output is NOT NUL-terminated.
  size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
  {
      static const char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                                   "abcdefghijklmnopqrstuvwxyz"
                                   "0123456789+/";

      // Same test as (input_size / 3 + 1) * 4 >= output_size, expressed in
      // 4-character quantums so that it cannot overflow for huge input sizes.
      size_t quantums_needed = input_size / 3 + 1;
      size_t quantums_available = output_size / 4 + (output_size % 4 != 0);
      if (quantums_needed >= quantums_available)
      {
          error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
          return 0;
      }

      uint32_t value;
      size_t count = 0;

      // Full 3-byte groups, including a final group of exactly three bytes
      while (input_size >= 3)
      {
          value = ((uint32_t)input[0] << 16) | ((uint32_t)input[1] << 8) | input[2];
          output[0] = lookup[value >> 18];
          output[1] = lookup[(value >> 12) & 0x3f];
          output[2] = lookup[(value >> 6) & 0x3f];
          output[3] = lookup[value & 0x3f];
          output += 4;
          input += 3;
          input_size -= 3;
          count += 4;
      }

      // Trailing one or two bytes get '=' padding
      switch (input_size)
      {
          case 2:
              // 16 data bits, left-aligned into 18 bits (three characters)
              value = ((uint32_t)input[0] << 10) | ((uint32_t)input[1] << 2);
              output[0] = lookup[(value >> 12) & 0x3f];
              output[1] = lookup[(value >> 6) & 0x3f];
              output[2] = lookup[value & 0x3f];
              output[3] = '=';
              count += 4;
              break;

          case 1:
              // 8 data bits, left-aligned into 12 bits (two characters)
              value = (uint32_t)input[0] << 4;
              output[0] = lookup[(value >> 6) & 0x3f];
              output[1] = lookup[value & 0x3f];
              output[2] = '=';
              output[3] = '=';
              count += 4;
              break;

          default:
              break;
      }

      return count;
  }
  604. int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
  605. {
  606. int result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);
  607. if (result == -1) {
  608. char err[512];
  609. ERR_error_string_n(ERR_get_error(), err, sizeof(err));
  610. error("Decryption of the challenge failed: %s", err);
  611. }
  612. return result;
  613. }
  614. void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
  615. {
  616. char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  617. debug(D_ACLK, "Performing challenge-response sequence");
  618. if (aclk_password != NULL)
  619. {
  620. freez(aclk_password);
  621. aclk_password = NULL;
  622. }
  623. // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
  624. // TODO - target host?
  625. char *agent_id = is_agent_claimed();
  626. if (agent_id == NULL)
  627. {
  628. error("Agent was not claimed - cannot perform challenge/response");
  629. goto CLEANUP;
  630. }
  631. char url[1024];
  632. sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
  633. info("Retrieving challenge from cloud: %s %s %s", aclk_hostname, aclk_port, url);
  634. if(aclk_send_https_request("GET", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
  635. {
  636. error("Challenge failed: %s", data_buffer);
  637. goto CLEANUP;
  638. }
  639. struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };
  640. debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
  641. if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
  642. {
  643. freez(challenge.result);
  644. error("Could not parse the json response with the challenge: %s", data_buffer);
  645. goto CLEANUP;
  646. }
  647. if (challenge.result == NULL ) {
  648. error("Could not retrieve challenge from auth response: %s", data_buffer);
  649. goto CLEANUP;
  650. }
  651. size_t challenge_len = strlen(challenge.result);
  652. unsigned char decoded[512];
  653. size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));
  654. unsigned char plaintext[4096]={};
  655. int decrypted_length = private_decrypt(decoded, decoded_len, plaintext);
  656. freez(challenge.result);
  657. char encoded[512];
  658. size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
  659. encoded[encoded_len] = 0;
  660. debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);
  661. char response_json[4096]={};
  662. sprintf(response_json, "{\"response\":\"%s\"}", encoded);
  663. debug(D_ACLK, "Password phase: %s",response_json);
  664. // TODO - host
  665. sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
  666. if(aclk_send_https_request("POST", aclk_hostname, aclk_port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
  667. {
  668. error("Challenge-response failed: %s", data_buffer);
  669. goto CLEANUP;
  670. }
  671. debug(D_ACLK, "Password response from cloud: %s", data_buffer);
  672. struct dictionary_singleton password = { .key = "password", .result = NULL };
  673. if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
  674. {
  675. freez(password.result);
  676. error("Could not parse the json response with the password: %s", data_buffer);
  677. goto CLEANUP;
  678. }
  679. if (password.result == NULL ) {
  680. error("Could not retrieve password from auth response");
  681. goto CLEANUP;
  682. }
  683. if (aclk_password != NULL )
  684. freez(aclk_password);
  685. aclk_password = password.result;
  686. if (aclk_username != NULL)
  687. freez(aclk_username);
  688. aclk_username = agent_id;
  689. agent_id = NULL;
  690. CLEANUP:
  691. if (agent_id != NULL)
  692. freez(agent_id);
  693. freez(data_buffer);
  694. return;
  695. }
  696. #ifndef __GNUC__
  697. #pragma endregion
  698. #endif
  699. static void aclk_try_to_connect(char *hostname, char *port, int port_num)
  700. {
  701. if (!aclk_private_key) {
  702. error("Cannot try to establish the agent cloud link - no private key available!");
  703. return;
  704. }
  705. info("Attempting to establish the agent cloud link");
  706. aclk_get_challenge(hostname, port);
  707. if (aclk_password == NULL)
  708. return;
  709. int rc;
  710. aclk_connecting = 1;
  711. create_publish_base_topic();
  712. ACLK_SHARED_STATE_LOCK;
  713. aclk_shared_state.version_neg = 0;
  714. aclk_shared_state.version_neg_wait_till = 0;
  715. ACLK_SHARED_STATE_UNLOCK;
  716. rc = mqtt_attempt_connection(hostname, port_num, aclk_username, aclk_password);
  717. if (unlikely(rc)) {
  718. error("Failed to initialize the agent cloud link library");
  719. }
  720. }
  721. // Sends "hello" message to negotiate ACLK version with cloud
  722. static inline void aclk_hello_msg()
  723. {
  724. BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  725. char *msg_id = create_uuid();
  726. ACLK_SHARED_STATE_LOCK;
  727. aclk_shared_state.version_neg = 0;
  728. aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
  729. ACLK_SHARED_STATE_UNLOCK;
  730. //Hello message is versioned separatelly from the rest of the protocol
  731. aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
  732. buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX);
  733. aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id);
  734. freez(msg_id);
  735. buffer_free(buf);
  736. }
  737. /**
  738. * Main agent cloud link thread
  739. *
  740. * This thread will simply call the main event loop that handles
  741. * pending requests - both inbound and outbound
  742. *
  743. * @param ptr is a pointer to the netdata_static_thread structure.
  744. *
  745. * @return It always returns NULL
  746. */
  747. void *aclk_main(void *ptr)
  748. {
  749. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  750. struct aclk_query_threads query_threads;
  751. struct aclk_stats_thread *stats_thread = NULL;
  752. query_threads.thread_list = NULL;
  753. // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
  754. // as it must notify the far end that it shutdown gracefully and avoid the LWT.
  755. netdata_thread_disable_cancelability();
  756. #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK)
  757. info("Killing ACLK thread -> cloud functionality has been disabled");
  758. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  759. return NULL;
  760. #endif
  761. #ifndef LWS_WITH_SOCKS5
  762. ACLK_PROXY_TYPE proxy_type;
  763. aclk_get_proxy(&proxy_type);
  764. if(proxy_type == PROXY_TYPE_SOCKS5) {
  765. error("Disabling ACLK due to requested SOCKS5 proxy.");
  766. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  767. return NULL;
  768. }
  769. #endif
  770. info("Waiting for netdata to be ready");
  771. while (!netdata_ready) {
  772. sleep_usec(USEC_PER_MS * 300);
  773. }
  774. info("Waiting for Cloud to be enabled");
  775. while (!netdata_cloud_setting) {
  776. sleep_usec(USEC_PER_SEC * 1);
  777. if (netdata_exit) {
  778. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  779. return NULL;
  780. }
  781. }
  782. query_threads.count = MIN(processors/2, 6);
  783. query_threads.count = MAX(query_threads.count, 2);
  784. query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
  785. if(query_threads.count < 1) {
  786. error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count);
  787. query_threads.count = 1;
  788. config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
  789. }
  790. aclk_shared_state.last_popcorn_interrupt = now_realtime_sec(); // without mutex here because threads are not yet started
  791. aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
  792. if (aclk_stats_enabled) {
  793. stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
  794. stats_thread->thread = mallocz(sizeof(netdata_thread_t));
  795. stats_thread->query_thread_count = query_threads.count;
  796. netdata_thread_create(
  797. stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
  798. stats_thread);
  799. }
  800. char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
  801. char *aclk_port = NULL;
  802. uint32_t port_num = 0;
  803. info("Waiting for netdata to be claimed");
  804. while(1) {
  805. char *agent_id = is_agent_claimed();
  806. while (likely(!agent_id)) {
  807. sleep_usec(USEC_PER_SEC * 1);
  808. if (netdata_exit)
  809. goto exited;
  810. agent_id = is_agent_claimed();
  811. }
  812. freez(agent_id);
  813. // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
  814. // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
  815. char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
  816. if (cloud_base_url == NULL) {
  817. error("Do not move the cloud base url out of post_conf_load!!");
  818. goto exited;
  819. }
  820. if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &aclk_port)) {
  821. error("Agent is claimed but the configuration is invalid, please fix");
  822. }
  823. else
  824. {
  825. port_num = atoi(aclk_port); // SSL library uses the string, MQTT uses the numeric value
  826. if (!create_private_key() && !_mqtt_lib_init())
  827. break;
  828. }
  829. for (int i=0; i<60; i++) {
  830. if (netdata_exit)
  831. goto exited;
  832. sleep_usec(USEC_PER_SEC * 1);
  833. }
  834. }
  835. usec_t reconnect_expiry = 0; // In usecs
  836. while (!netdata_exit) {
  837. static int first_init = 0;
  838. /* size_t write_q, write_q_bytes, read_q;
  839. lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/
  840. if (aclk_disable_runtime && !aclk_connected) {
  841. sleep(1);
  842. continue;
  843. }
  844. if (aclk_kill_link) { // User has reloaded the claiming state
  845. aclk_kill_link = 0;
  846. aclk_graceful_disconnect();
  847. create_private_key();
  848. continue;
  849. }
  850. if (aclk_force_reconnect) {
  851. aclk_lws_wss_destroy_context();
  852. aclk_force_reconnect = 0;
  853. }
  854. if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) {
  855. if (unlikely(!first_init)) {
  856. aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
  857. first_init = 1;
  858. } else {
  859. if (aclk_connecting == 0) {
  860. if (reconnect_expiry == 0) {
  861. unsigned long int delay = aclk_reconnect_delay(1);
  862. reconnect_expiry = now_realtime_usec() + delay * 1000;
  863. info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
  864. }
  865. if (now_realtime_usec() >= reconnect_expiry) {
  866. reconnect_expiry = 0;
  867. aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
  868. }
  869. sleep_usec(USEC_PER_MS * 100);
  870. }
  871. }
  872. if (aclk_connecting) {
  873. _link_event_loop();
  874. sleep_usec(USEC_PER_MS * 100);
  875. }
  876. continue;
  877. }
  878. _link_event_loop();
  879. if (unlikely(!aclk_connected || aclk_force_reconnect))
  880. continue;
  881. /*static int stress_counter = 0;
  882. if (write_q_bytes==0 && stress_counter ++ >5)
  883. {
  884. aclk_send_stress_test(8000000);
  885. stress_counter = 0;
  886. }*/
  887. if (unlikely(!aclk_subscribed)) {
  888. aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
  889. aclk_hello_msg();
  890. }
  891. if (unlikely(!query_threads.thread_list)) {
  892. aclk_query_threads_start(&query_threads);
  893. }
  894. } // forever
  895. exited:
  896. // Wakeup query thread to cleanup
  897. QUERY_THREAD_WAKEUP_ALL;
  898. freez(aclk_username);
  899. freez(aclk_password);
  900. freez(aclk_hostname);
  901. freez(aclk_port);
  902. if (aclk_private_key != NULL)
  903. RSA_free(aclk_private_key);
  904. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  905. char *agent_id = is_agent_claimed();
  906. if (agent_id && aclk_connected) {
  907. freez(agent_id);
  908. // Wakeup thread to cleanup
  909. QUERY_THREAD_WAKEUP;
  910. aclk_graceful_disconnect();
  911. }
  912. aclk_query_threads_cleanup(&query_threads);
  913. _reset_collector_list();
  914. freez(collector_list);
  915. if(aclk_stats_enabled) {
  916. netdata_thread_join(*stats_thread->thread, NULL);
  917. aclk_stats_thread_cleanup();
  918. freez(stats_thread->thread);
  919. freez(stats_thread);
  920. }
  921. /*
  922. * this must be last -> if all static threads signal
  923. * THREAD_EXITED rrdengine will dealloc the RRDSETs
  924. * and RRDDIMs that are used by still runing stat thread.
  925. * see netdata_cleanup_and_exit() for reference
  926. */
  927. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  928. return NULL;
  929. }
  930. /*
  931. * Send a message to the cloud, using a base topic and sib_topic
  932. * The final topic will be in the form <base_topic>/<sub_topic>
  933. * If base_topic is missing then the global_base_topic will be used (if available)
  934. *
  935. */
  936. int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id)
  937. {
  938. int rc;
  939. int mid;
  940. char topic[ACLK_MAX_TOPIC + 1];
  941. char *final_topic;
  942. UNUSED(msg_id);
  943. if (!aclk_connected)
  944. return 0;
  945. if (unlikely(!message))
  946. return 0;
  947. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  948. if (unlikely(!final_topic)) {
  949. errno = 0;
  950. error("Unable to build outgoing topic; truncated?");
  951. return 1;
  952. }
  953. ACLK_LOCK;
  954. rc = _link_send_message(final_topic, message, len, &mid);
  955. // TODO: link the msg_id with the mid so we can trace it
  956. ACLK_UNLOCK;
  957. if (unlikely(rc)) {
  958. errno = 0;
  959. error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
  960. }
  961. return rc;
  962. }
  /*
   * Send a NUL-terminated text message to the cloud on sub_topic.
   *
   * Thin wrapper around aclk_send_message_bin() that uses strlen(message) as
   * the payload length. Returns 0 when message is NULL, otherwise whatever
   * aclk_send_message_bin() returns.
   */
  int aclk_send_message(char *sub_topic, char *message, char *msg_id)
  {
      // aclk_send_message_bin() treats a NULL message as "nothing to send" and returns 0.
      // Apply the same rule here, before strlen() would dereference the NULL pointer.
      if (!message)
          return 0;

      return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id);
  }
  967. /*
  968. * Subscribe to a topic in the cloud
  969. * The final subscription will be in the form
  970. * /agent/claim_id/<sub_topic>
  971. */
  972. int aclk_subscribe(char *sub_topic, int qos)
  973. {
  974. int rc;
  975. char topic[ACLK_MAX_TOPIC + 1];
  976. char *final_topic;
  977. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  978. if (unlikely(!final_topic)) {
  979. errno = 0;
  980. error("Unable to build outgoing topic; truncated?");
  981. return 1;
  982. }
  983. if (!aclk_connected) {
  984. error("Cannot subscribe to %s - not connected!", topic);
  985. return 1;
  986. }
  987. ACLK_LOCK;
  988. rc = _link_subscribe(final_topic, qos);
  989. ACLK_UNLOCK;
  990. // TODO: Add better handling -- error will flood the logfile here
  991. if (unlikely(rc)) {
  992. errno = 0;
  993. error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
  994. }
  995. return rc;
  996. }
  997. // This is called from a callback when the link goes up
  998. void aclk_connect()
  999. {
  1000. info("Connection detected (%u queued queries)", aclk_query_size());
  1001. aclk_stats_upd_online(1);
  1002. aclk_connected = 1;
  1003. aclk_reconnect_delay(0);
  1004. QUERY_THREAD_WAKEUP;
  1005. return;
  1006. }
  1007. // This is called from a callback when the link goes down
  1008. void aclk_disconnect()
  1009. {
  1010. if (likely(aclk_connected))
  1011. info("Disconnect detected (%u queued queries)", aclk_query_size());
  1012. aclk_stats_upd_online(0);
  1013. aclk_subscribed = 0;
  1014. ACLK_SHARED_STATE_LOCK;
  1015. aclk_shared_state.metadata_submitted = ACLK_METADATA_REQUIRED;
  1016. ACLK_SHARED_STATE_UNLOCK;
  1017. aclk_connected = 0;
  1018. aclk_connecting = 0;
  1019. aclk_force_reconnect = 1;
  1020. }
  1021. inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version)
  1022. {
  1023. uuid_t uuid;
  1024. char uuid_str[36 + 1];
  1025. if (unlikely(!msg_id)) {
  1026. uuid_generate(uuid);
  1027. uuid_unparse(uuid, uuid_str);
  1028. msg_id = uuid_str;
  1029. }
  1030. if (ts_secs == 0) {
  1031. ts_us = now_realtime_usec();
  1032. ts_secs = ts_us / USEC_PER_SEC;
  1033. ts_us = ts_us % USEC_PER_SEC;
  1034. }
  1035. buffer_sprintf(
  1036. dest,
  1037. "{\t\"type\": \"%s\",\n"
  1038. "\t\"msg-id\": \"%s\",\n"
  1039. "\t\"timestamp\": %ld,\n"
  1040. "\t\"timestamp-offset-usec\": %llu,\n"
  1041. "\t\"connect\": %ld,\n"
  1042. "\t\"connect-offset-usec\": %llu,\n"
  1043. "\t\"version\": %d",
  1044. type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version);
  1045. debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs);
  1046. }
  1047. /*
  1048. * This will send alarm information which includes
  1049. * configured alarms
  1050. * alarm_log
  1051. * active alarms
  1052. */
  1053. void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
  1054. void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
  1055. {
  1056. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1057. char *msg_id = create_uuid();
  1058. buffer_flush(local_buffer);
  1059. local_buffer->contenttype = CT_APPLICATION_JSON;
  1060. debug(D_ACLK, "Metadata alarms start");
  1061. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1062. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1063. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1064. // session.
  1065. if (metadata_submitted == ACLK_METADATA_SENT)
  1066. aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
  1067. else
  1068. aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
  1069. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1070. buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
  1071. health_alarms2json(localhost, local_buffer, 1);
  1072. debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len);
  1073. // buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
  1074. // health_alarm_log2json(localhost, local_buffer, 0);
  1075. // debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
  1076. buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
  1077. health_active_log_alarms_2json(localhost, local_buffer);
  1078. //debug(D_ACLK, "Metadata message %s", local_buffer->buffer);
  1079. buffer_sprintf(local_buffer, "\n}\n}");
  1080. aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
  1081. freez(msg_id);
  1082. buffer_free(local_buffer);
  1083. }
  1084. /*
  1085. * This will send the agent metadata
  1086. * /api/v1/info
  1087. * charts
  1088. */
  1089. int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted)
  1090. {
  1091. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1092. debug(D_ACLK, "Metadata /info start");
  1093. char *msg_id = create_uuid();
  1094. buffer_flush(local_buffer);
  1095. local_buffer->contenttype = CT_APPLICATION_JSON;
  1096. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  1097. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  1098. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  1099. // session.
  1100. if (metadata_submitted == ACLK_METADATA_SENT)
  1101. aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg);
  1102. else
  1103. aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
  1104. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1105. buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
  1106. web_client_api_request_v1_info_fill_buffer(localhost, local_buffer);
  1107. debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);
  1108. buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
  1109. charts2json(localhost, local_buffer, 1, 0);
  1110. buffer_sprintf(local_buffer, "\n}\n}");
  1111. debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
  1112. aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
  1113. freez(msg_id);
  1114. buffer_free(local_buffer);
  1115. return 0;
  1116. }
  1117. void aclk_send_stress_test(size_t size)
  1118. {
  1119. char *buffer = mallocz(size);
  1120. if (buffer != NULL)
  1121. {
  1122. for(size_t i=0; i<size; i++)
  1123. buffer[i] = 'x';
  1124. buffer[size-1] = 0;
  1125. time_t time_created = now_realtime_sec();
  1126. sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
  1127. buffer[strlen(buffer)] = '"';
  1128. buffer[size-2] = '}';
  1129. buffer[size-3] = '"';
  1130. aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
  1131. error("Sending stress of size %zu at time %ld", size, time_created);
  1132. }
  1133. free(buffer);
  1134. }
  1135. // Send info metadata message to the cloud if the link is established
  1136. // or on request
  1137. int aclk_send_metadata(ACLK_METADATA_STATE state)
  1138. {
  1139. aclk_send_info_metadata(state);
  1140. aclk_send_alarm_metadata(state);
  1141. return 0;
  1142. }
  1143. void aclk_single_update_disable()
  1144. {
  1145. aclk_disable_single_updates = 1;
  1146. }
  1147. void aclk_single_update_enable()
  1148. {
  1149. aclk_disable_single_updates = 0;
  1150. }
  1151. // Trigged by a health reload, sends the alarm metadata
  1152. void aclk_alarm_reload()
  1153. {
  1154. ACLK_SHARED_STATE_LOCK;
  1155. if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
  1156. ACLK_SHARED_STATE_UNLOCK;
  1157. return;
  1158. }
  1159. ACLK_SHARED_STATE_UNLOCK;
  1160. if (unlikely(aclk_queue_query("on_connect", NULL, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  1161. if (likely(aclk_connected)) {
  1162. errno = 0;
  1163. error("ACLK failed to queue on_connect command on alarm reload");
  1164. }
  1165. }
  1166. }
  1167. //rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
  1168. int aclk_send_single_chart(char *hostname, char *chart)
  1169. {
  1170. RRDHOST *target_host;
  1171. target_host = rrdhost_find_by_hostname(hostname, 0);
  1172. if (!target_host)
  1173. return 1;
  1174. RRDSET *st = rrdset_find(target_host, chart);
  1175. if (!st)
  1176. st = rrdset_find_byname(target_host, chart);
  1177. if (!st) {
  1178. info("FAILED to find chart %s", chart);
  1179. return 1;
  1180. }
  1181. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1182. char *msg_id = create_uuid();
  1183. buffer_flush(local_buffer);
  1184. local_buffer->contenttype = CT_APPLICATION_JSON;
  1185. aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg);
  1186. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1187. rrdset2json(st, local_buffer, NULL, NULL, 1);
  1188. buffer_sprintf(local_buffer, "\t\n}");
  1189. aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);
  1190. freez(msg_id);
  1191. buffer_free(local_buffer);
  1192. return 0;
  1193. }
  1194. int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
  1195. {
  1196. #ifndef ENABLE_ACLK
  1197. UNUSED(host);
  1198. UNUSED(chart_name);
  1199. return 0;
  1200. #else
  1201. if (unlikely(!netdata_ready))
  1202. return 0;
  1203. if (!netdata_cloud_setting)
  1204. return 0;
  1205. if (host != localhost)
  1206. return 0;
  1207. if (unlikely(aclk_disable_single_updates))
  1208. return 0;
  1209. if (aclk_popcorn_check_bump())
  1210. return 0;
  1211. if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) {
  1212. if (likely(aclk_connected)) {
  1213. errno = 0;
  1214. error("ACLK failed to queue chart_update command");
  1215. }
  1216. }
  1217. return 0;
  1218. #endif
  1219. }
  1220. int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
  1221. {
  1222. BUFFER *local_buffer = NULL;
  1223. if (unlikely(!netdata_ready))
  1224. return 0;
  1225. if (host != localhost)
  1226. return 0;
  1227. ACLK_SHARED_STATE_LOCK;
  1228. if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
  1229. ACLK_SHARED_STATE_UNLOCK;
  1230. return 0;
  1231. }
  1232. ACLK_SHARED_STATE_UNLOCK;
  1233. /*
  1234. * Check if individual updates have been disabled
  1235. * This will be the case when we do health reload
  1236. * and all the alarms will be dropped and recreated.
  1237. * At the end of the health reload the complete alarm metadata
  1238. * info will be sent
  1239. */
  1240. if (unlikely(aclk_disable_single_updates))
  1241. return 0;
  1242. local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  1243. char *msg_id = create_uuid();
  1244. buffer_flush(local_buffer);
  1245. aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg);
  1246. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  1247. netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
  1248. health_alarm_entry2json_nolock(local_buffer, ae, host);
  1249. netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
  1250. buffer_sprintf(local_buffer, "\n}");
  1251. if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
  1252. if (likely(aclk_connected)) {
  1253. errno = 0;
  1254. error("ACLK failed to queue alarm_command on alarm_update");
  1255. }
  1256. }
  1257. freez(msg_id);
  1258. buffer_free(local_buffer);
  1259. return 0;
  1260. }