agent_cloud_link.c 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "libnetdata/libnetdata.h"
  3. #include "agent_cloud_link.h"
  4. // Read from the config file -- new section [agent_cloud_link]
  5. // Defaults are supplied
  6. int aclk_recv_maximum = 0; // default 20
  7. int aclk_send_maximum = 0; // default 20
  8. int aclk_port = 0; // default 1883
  9. char *aclk_hostname = NULL; //default localhost
  10. int aclk_subscribed = 0;
  11. int aclk_metadata_submitted = 0;
  12. int waiting_init = 1;
  13. int cmdpause = 0; // Used to pause query processing
  14. BUFFER *aclk_buffer = NULL;
  15. char *global_base_topic = NULL;
  16. int cloud_to_agent_parse(JSON_ENTRY *e)
  17. {
  18. struct aclk_request *data = e->callback_data;
  19. switch(e->type) {
  20. case JSON_OBJECT:
  21. e->callback_function = cloud_to_agent_parse;
  22. break;
  23. case JSON_ARRAY:
  24. e->callback_function = cloud_to_agent_parse;
  25. break;
  26. case JSON_STRING:
  27. if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) {
  28. data->msg_id = strdupz(e->data.string);
  29. break;
  30. }
  31. if (!strcmp(e->name, ACLK_JSON_IN_TYPE)) {
  32. data->type_id = strdupz(e->data.string);
  33. break;
  34. }
  35. if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) {
  36. data->topic = strdupz(e->data.string);
  37. break;
  38. }
  39. if (!strcmp(e->name, ACLK_JSON_IN_URL)) {
  40. data->url = strdupz(e->data.string);
  41. break;
  42. }
  43. break;
  44. case JSON_NUMBER:
  45. if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) {
  46. data->version = atol(e->data.string);
  47. break;
  48. }
  49. break;
  50. case JSON_BOOLEAN:
  51. break;
  52. case JSON_NULL:
  53. break;
  54. }
  55. return 0;
  56. }
  57. //char *send_http_request(char *host, char *port, char *url, BUFFER *b)
  58. //{
  59. // struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
  60. //
  61. // buffer_flush(b);
  62. // buffer_sprintf(
  63. // b,
  64. // "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n",
  65. // url, host);
  66. // int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout);
  67. //
  68. // if (unlikely(sock == -1)) {
  69. // error("Handshake failed");
  70. // return NULL;
  71. // }
  72. //
  73. // SSL_CTX *ctx = security_initialize_openssl_client();
  74. // // Certificate chain: not updating the stores - do we need private CA roots?
  75. // // Calls to SSL_CTX_load_verify_locations would go here.
  76. // SSL *ssl = SSL_new(ctx);
  77. // SSL_set_fd(ssl, sock);
  78. // int err = SSL_connect(ssl);
  79. // SSL_write(ssl, b->buffer, b->len); // Timeout options?
  80. // int bytes_read = SSL_read(ssl, b->buffer, b->len);
  81. // SSL_shutdown(ssl);
  82. // close(sock);
  83. //}
  84. // Set when we have connection up and running from the connection callback
  85. int aclk_connection_initialized = 0;
  86. static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
  87. static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
  88. #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
  89. #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
  90. #define QUERY_LOCK netdata_mutex_lock(&query_mutex)
  91. #define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
  92. pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
  93. pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
  94. #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait);
  95. #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
  96. #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
  97. struct aclk_query {
  98. time_t created;
  99. time_t run_after; // Delay run until after this time
  100. char *topic; // Topic to respond to
  101. char *data; // Internal data (NULL if request from the cloud)
  102. char *msg_id; // msg_id generated by the cloud (NULL if internal)
  103. char *query; // The actual query
  104. u_char deleted; // Mark deleted for garbage collect
  105. struct aclk_query *next;
  106. };
  107. struct aclk_query_queue {
  108. struct aclk_query *aclk_query_head;
  109. struct aclk_query *aclk_query_tail;
  110. u_int64_t count;
  111. } aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
  112. /*
  113. * Free a query structure when done
  114. */
  115. void aclk_query_free(struct aclk_query *this_query)
  116. {
  117. if (unlikely(!this_query))
  118. return;
  119. freez(this_query->topic);
  120. freez(this_query->query);
  121. if (this_query->data)
  122. freez(this_query->data);
  123. if (this_query->msg_id)
  124. freez(this_query->msg_id);
  125. freez(this_query);
  126. return;
  127. }
  128. // Returns the entry after which we need to create a new entry to run at the specified time
  129. // If NULL is returned we need to add to HEAD
  130. // Called with locked entries
  131. struct aclk_query *aclk_query_find_position(time_t time_to_run)
  132. {
  133. struct aclk_query *tmp_query, *last_query;
  134. last_query = NULL;
  135. tmp_query = aclk_queue.aclk_query_head;
  136. while (tmp_query) {
  137. if (tmp_query->run_after > time_to_run)
  138. return last_query;
  139. last_query = tmp_query;
  140. tmp_query = tmp_query->next;
  141. }
  142. return last_query;
  143. }
  144. // Need to have a lock before calling this
  145. struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query)
  146. {
  147. struct aclk_query *tmp_query;
  148. tmp_query = aclk_queue.aclk_query_head;
  149. while (tmp_query) {
  150. if (likely(!tmp_query->deleted)) {
  151. if (strcmp(tmp_query->topic, topic) == 0 && (strcmp(tmp_query->query, query) == 0)) {
  152. if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
  153. (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0)))
  154. return tmp_query;
  155. }
  156. }
  157. tmp_query = tmp_query->next;
  158. }
  159. return NULL;
  160. }
  161. /*
  162. * Add a query to execute, the result will be send to the specified topic
  163. */
  164. int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal)
  165. {
  166. struct aclk_query *new_query, *tmp_query;
  167. // Ignore all commands while we wait for the agent to initialize
  168. if (unlikely(waiting_init))
  169. return 0;
  170. run_after = now_realtime_sec() + run_after;
  171. QUERY_LOCK;
  172. tmp_query = aclk_query_find(topic, data, msg_id, query);
  173. if (unlikely(tmp_query)) {
  174. if (tmp_query->run_after == run_after) {
  175. QUERY_UNLOCK;
  176. QUERY_THREAD_WAKEUP;
  177. return 0;
  178. }
  179. tmp_query->deleted = 1;
  180. }
  181. new_query = callocz(1, sizeof(struct aclk_query));
  182. if (internal) {
  183. new_query->topic = strdupz(topic);
  184. new_query->query = strdupz(query);
  185. } else {
  186. new_query->topic = topic;
  187. new_query->query = query;
  188. new_query->msg_id = msg_id;
  189. }
  190. if (data)
  191. new_query->data = strdupz(data);
  192. new_query->next = NULL;
  193. new_query->created = now_realtime_sec();
  194. new_query->run_after = run_after;
  195. info("Added query (%s) (%s)", topic, query);
  196. tmp_query = aclk_query_find_position(run_after);
  197. if (tmp_query) {
  198. new_query->next = tmp_query->next;
  199. tmp_query->next = new_query;
  200. if (tmp_query == aclk_queue.aclk_query_tail)
  201. aclk_queue.aclk_query_tail = new_query;
  202. aclk_queue.count++;
  203. QUERY_UNLOCK;
  204. QUERY_THREAD_WAKEUP;
  205. return 0;
  206. }
  207. new_query->next = aclk_queue.aclk_query_head;
  208. aclk_queue.aclk_query_head = new_query;
  209. aclk_queue.count++;
  210. QUERY_UNLOCK;
  211. QUERY_THREAD_WAKEUP;
  212. return 0;
  213. // if (likely(aclk_queue.aclk_query_tail)) {
  214. // aclk_queue.aclk_query_tail->next = new_query;
  215. // aclk_queue.aclk_query_tail = new_query;
  216. // aclk_queue.count++;
  217. // QUERY_UNLOCK;
  218. // return 0;
  219. // }
  220. //
  221. // if (likely(!aclk_queue.aclk_query_head)) {
  222. // aclk_queue.aclk_query_head = new_query;
  223. // aclk_queue.aclk_query_tail = new_query;
  224. // aclk_queue.count++;
  225. // QUERY_UNLOCK;
  226. // return 0;
  227. // }
  228. // QUERY_UNLOCK;
  229. // return 0;
  230. }
  231. inline int aclk_submit_request(struct aclk_request *request)
  232. {
  233. return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0);
  234. }
  235. /*
  236. * Get the next query to process - NULL if nothing there
  237. * The caller needs to free memory by calling aclk_query_free()
  238. *
  239. * topic
  240. * query
  241. * The structure itself
  242. *
  243. */
  244. struct aclk_query *aclk_queue_pop()
  245. {
  246. struct aclk_query *this_query;
  247. QUERY_LOCK;
  248. if (likely(!aclk_queue.aclk_query_head)) {
  249. QUERY_UNLOCK;
  250. return NULL;
  251. }
  252. this_query = aclk_queue.aclk_query_head;
  253. if (this_query->run_after > now_realtime_sec()) {
  254. info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
  255. QUERY_UNLOCK;
  256. return NULL;
  257. }
  258. aclk_queue.count--;
  259. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  260. if (likely(!aclk_queue.aclk_query_head)) {
  261. aclk_queue.aclk_query_tail = NULL;
  262. }
  263. QUERY_UNLOCK;
  264. return this_query;
  265. }
  266. // This will give the base topic that the agent will publish messages.
  267. // subtopics will be sent under the base topic e.g. base_topic/subtopic
  268. // This is called by aclk_init(), to compute the base topic once and have
  269. // it stored internally.
  270. // Need to check if additional logic should be added to make sure that there
  271. // is enough information to determine the base topic at init time
  272. // TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch
  273. // that on the fly
  274. char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action)
  275. {
  276. static char *topic = NULL;
  277. if (unlikely(!is_agent_claimed()))
  278. return NULL;
  279. ACLK_LOCK;
  280. if (unlikely(action == PUBLICH_TOPIC_FREE)) {
  281. if (likely(topic)) {
  282. freez(topic);
  283. topic = NULL;
  284. }
  285. ACLK_UNLOCK;
  286. return NULL;
  287. }
  288. if (unlikely(action == PUBLICH_TOPIC_REBUILD)) {
  289. ACLK_UNLOCK;
  290. get_publish_base_topic(PUBLICH_TOPIC_FREE);
  291. return get_publish_base_topic(PUBLICH_TOPIC_GET);
  292. }
  293. if (unlikely(!topic)) {
  294. char tmp_topic[ACLK_MAX_TOPIC + 1];
  295. sprintf(tmp_topic, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
  296. topic = strdupz(tmp_topic);
  297. }
  298. ACLK_UNLOCK;
  299. return topic;
  300. }
  301. char *get_topic(char *sub_topic, char *final_topic, int max_size)
  302. {
  303. if (unlikely(!global_base_topic))
  304. global_base_topic = GET_PUBLISH_BASE_TOPIC;
  305. if (unlikely(!global_base_topic))
  306. return sub_topic;
  307. snprintfz(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
  308. return final_topic;
  309. }
  310. // Wait for ACLK connection to be established
  311. int aclk_wait_for_initialization()
  312. {
  313. if (unlikely(!aclk_connection_initialized)) {
  314. time_t now = now_realtime_sec();
  315. while (!aclk_connection_initialized && (now_realtime_sec() - now) < ACLK_INITIALIZATION_WAIT) {
  316. sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT);
  317. _link_event_loop(0);
  318. }
  319. if (unlikely(!aclk_connection_initialized)) {
  320. error("ACLK connection cannot be established");
  321. return 1;
  322. }
  323. }
  324. return 0;
  325. }
  326. /*
  327. * This function will fetch the next pending command and process it
  328. *
  329. */
  330. int aclk_process_query()
  331. {
  332. struct aclk_query *this_query;
  333. static u_int64_t query_count = 0;
  334. //int rc;
  335. if (unlikely(cmdpause))
  336. return 0;
  337. if (!aclk_connection_initialized)
  338. return 0;
  339. this_query = aclk_queue_pop();
  340. if (likely(!this_query)) {
  341. //info("No pending queries");
  342. return 0;
  343. }
  344. if (unlikely(this_query->deleted)) {
  345. info("Garbage collect query %s:%s", this_query->topic, this_query->query);
  346. aclk_query_free(this_query);
  347. return 1;
  348. }
  349. query_count++;
  350. info(
  351. "Query #%d (%s) (%s) in queue %d seconds", (int) query_count, this_query->topic, this_query->query,
  352. (int) (now_realtime_sec() - this_query->created));
  353. if (strncmp((char *)this_query->query, "/api/v1/", 8) == 0) {
  354. struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
  355. w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  356. strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
  357. w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
  358. w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
  359. w->acl = 0x1f;
  360. char *mysep = strchr(this_query->query, '?');
  361. if (mysep) {
  362. strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
  363. *mysep = '\0';
  364. } else
  365. strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
  366. mysep = strrchr(this_query->query, '/');
  367. // TODO: ignore return code for now
  368. web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
  369. //TODO: handle bad response perhaps in a different way. For now it does to the payload
  370. //if (rc == HTTP_RESP_OK || 1) {
  371. buffer_flush(aclk_buffer);
  372. aclk_create_metadata_message(aclk_buffer, mysep ? mysep + 1 : "noop", this_query->msg_id, w->response.data);
  373. aclk_buffer->contenttype = CT_APPLICATION_JSON;
  374. aclk_send_message(this_query->topic, aclk_buffer->buffer);
  375. //} else
  376. // error("Query RESP: %s", w->response.data->buffer);
  377. buffer_free(w->response.data);
  378. freez(w);
  379. aclk_query_free(this_query);
  380. return 1;
  381. }
  382. if (strcmp((char *)this_query->topic, "_chart") == 0) {
  383. aclk_send_single_chart(this_query->data, this_query->query);
  384. }
  385. aclk_query_free(this_query);
  386. return 1;
  387. }
  388. // Launch a query processing thread
  389. /*
  390. * Process all pending queries
  391. * Return 0 if no queries were processed, 1 otherwise
  392. *
  393. */
  394. int aclk_process_queries()
  395. {
  396. if (unlikely(cmdpause))
  397. return 0;
  398. // Return if no queries pending
  399. if (likely(!aclk_queue.count))
  400. return 0;
  401. info("Processing %d queries", (int ) aclk_queue.count);
  402. while (aclk_process_query()) {
  403. //rc = _link_event_loop(0);
  404. };
  405. return 1;
  406. }
  407. static void aclk_query_thread_cleanup(void *ptr)
  408. {
  409. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  410. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  411. info("cleaning up...");
  412. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  413. }
  414. /**
  415. * MAin query processing thread
  416. *
  417. */
  418. void *aclk_query_main_thread(void *ptr)
  419. {
  420. netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
  421. while (!netdata_exit) {
  422. QUERY_THREAD_LOCK;
  423. if (unlikely(!aclk_metadata_submitted)) {
  424. aclk_send_metadata();
  425. aclk_metadata_submitted = 1;
  426. }
  427. if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
  428. sleep_usec(USEC_PER_SEC * 1);
  429. if (likely(aclk_connection_initialized && !netdata_exit)) {
  430. while (aclk_process_queries()) {
  431. // Sleep for a few ms and retry maybe we have something to process
  432. // before going to sleep
  433. // TODO: This needs improvement to avoid missed queries
  434. sleep_usec(USEC_PER_MS * 100);
  435. }
  436. }
  437. QUERY_THREAD_UNLOCK;
  438. } // forever
  439. info("Shutting down query processing thread");
  440. netdata_thread_cleanup_pop(1);
  441. return NULL;
  442. }
  443. // Thread cleanup
  444. static void aclk_main_cleanup(void *ptr)
  445. {
  446. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  447. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  448. info("cleaning up...");
  449. QUERY_THREAD_WAKEUP;
  450. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  451. }
  452. /**
  453. * Main agent cloud link thread
  454. *
  455. * This thread will simply call the main event loop that handles
  456. * pending requests - both inbound and outbound
  457. *
  458. * @param ptr is a pointer to the netdata_static_thread structure.
  459. *
  460. * @return It always returns NULL
  461. */
  462. void *aclk_main(void *ptr)
  463. {
  464. //netdata_thread_t *query_thread;
  465. struct netdata_static_thread query_thread;
  466. memset(&query_thread, 0, sizeof(query_thread));
  467. netdata_thread_cleanup_push(aclk_main_cleanup, ptr);
  468. if (unlikely(!aclk_buffer))
  469. aclk_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  470. assert(aclk_buffer != NULL);
  471. //netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
  472. //netdata_thread_create(&query_thread.thread , "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
  473. info("Waiting for netdata to be ready");
  474. while (!netdata_ready) {
  475. sleep_usec(USEC_PER_MS * 300);
  476. }
  477. info("Waiting %d seconds for the agent to initialize", ACLK_STARTUP_WAIT);
  478. sleep_usec(USEC_PER_SEC * ACLK_STARTUP_WAIT);
  479. // Ok mark we are ready to accept incoming requests
  480. waiting_init = 0;
  481. while (!netdata_exit) {
  482. // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds
  483. // TODO: Handle the unclaim command as well -- we may need to shutdown the connection
  484. if (likely(!is_agent_claimed())) {
  485. sleep_usec(USEC_PER_SEC * 60);
  486. info("Checking agent claiming status");
  487. continue;
  488. }
  489. if (unlikely(!aclk_connection_initialized)) {
  490. static int initializing = 0;
  491. if (likely(initializing)) {
  492. _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
  493. continue;
  494. }
  495. initializing = 1;
  496. info("Initializing connection");
  497. //send_http_request(aclk_hostname, "443", "/auth/challenge?id=blah", aclk_buffer);
  498. if (unlikely(aclk_init(ACLK_INIT))) {
  499. // TODO: TBD how to handle. We are claimed and we cant init the connection. For now keep trying.
  500. sleep_usec(USEC_PER_SEC * 60);
  501. continue;
  502. } else {
  503. sleep_usec(USEC_PER_SEC * 1);
  504. }
  505. _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
  506. continue;
  507. }
  508. if (unlikely(!aclk_subscribed)) {
  509. aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2);
  510. }
  511. if (unlikely(!query_thread.thread)) {
  512. query_thread.thread = mallocz(sizeof(netdata_thread_t));
  513. netdata_thread_create(
  514. query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
  515. }
  516. //TODO: Check if there is a return code
  517. _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
  518. } // forever
  519. aclk_shutdown();
  520. netdata_thread_cleanup_pop(1);
  521. return NULL;
  522. }
  523. /*
  524. * Send a message to the cloud, using a base topic and sib_topic
  525. * The final topic will be in the form <base_topic>/<sub_topic>
  526. * If base_topic is missing then the global_base_topic will be used (if available)
  527. *
  528. */
  529. int aclk_send_message(char *sub_topic, char *message)
  530. {
  531. int rc;
  532. static int skip_due_to_shutdown = 0;
  533. char topic[ACLK_MAX_TOPIC + 1];
  534. char *final_topic;
  535. if (!aclk_connection_initialized)
  536. return 0;
  537. if (unlikely(netdata_exit)) {
  538. if (unlikely(!aclk_connection_initialized))
  539. return 1;
  540. ++skip_due_to_shutdown;
  541. if (unlikely(!(skip_due_to_shutdown % 100)))
  542. info("%d messages not sent -- shutdown in progress", skip_due_to_shutdown);
  543. return 1;
  544. }
  545. if (unlikely(!message))
  546. return 0;
  547. if (unlikely(aclk_wait_for_initialization()))
  548. return 1;
  549. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  550. ACLK_LOCK;
  551. rc = _link_send_message(final_topic, message);
  552. ACLK_UNLOCK;
  553. // TODO: Add better handling -- error will flood the logfile here
  554. if (unlikely(rc))
  555. error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
  556. return rc;
  557. }
  558. /*
  559. * Subscribe to a topic in the cloud
  560. * The final subscription will be in the form
  561. * /agent/claim_id/<sub_topic>
  562. */
  563. int aclk_subscribe(char *sub_topic, int qos)
  564. {
  565. int rc;
  566. //static char *global_base_topic = NULL;
  567. char topic[ACLK_MAX_TOPIC + 1];
  568. char *final_topic;
  569. if (!aclk_connection_initialized)
  570. return 0;
  571. if (unlikely(netdata_exit)) {
  572. return 1;
  573. }
  574. if (unlikely(aclk_wait_for_initialization()))
  575. return 1;
  576. final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
  577. ACLK_LOCK;
  578. rc = _link_subscribe(final_topic, qos);
  579. ACLK_UNLOCK;
  580. // TODO: Add better handling -- error will flood the logfile here
  581. if (unlikely(rc))
  582. error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
  583. return rc;
  584. }
  /*
   * Link-up callback (registered through _link_lib_init() in aclk_init()).
   * Only logs the event; ptr is unused.
   */
  void aclk_connect(void *ptr)
  {
      (void) ptr;

      info("Connection detected");
  }
  592. // This is called from a callback when the link goes down
  593. void aclk_disconnect(void *ptr)
  594. {
  595. (void) ptr;
  596. info("Disconnect detected");
  597. aclk_subscribed = 0;
  598. aclk_metadata_submitted = 0;
  599. }
  600. void aclk_shutdown()
  601. {
  602. info("Shutdown initiated");
  603. aclk_connection_initialized = 0;
  604. _link_shutdown();
  605. info("Shutdown complete");
  606. }
  607. int aclk_init(ACLK_INIT_ACTION action)
  608. {
  609. (void) action;
  610. static int init = 0;
  611. int rc;
  612. if (likely(init))
  613. return 0;
  614. aclk_send_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link send maximum", 20);
  615. aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20);
  616. aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost");
  617. aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883);
  618. info("Maximum parallel outgoing messages %d", aclk_send_maximum);
  619. info("Maximum parallel incoming messages %d", aclk_recv_maximum);
  620. // This will setup the base publish topic internally
  621. //get_publish_base_topic(PUBLICH_TOPIC_GET);
  622. // initialize the low level link to the cloud
  623. rc = _link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect);
  624. if (unlikely(rc)) {
  625. error("Failed to initialize the agent cloud link library");
  626. return 1;
  627. }
  628. global_base_topic = GET_PUBLISH_BASE_TOPIC;
  629. init = 1;
  630. return 0;
  631. }
  632. // Use this to disable encoding of quotes and newlines so that
  633. // MQTT subscriber can display more readable data on screen
  634. void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
  635. {
  636. uuid_t uuid;
  637. char uuid_str[36 + 1];
  638. if (unlikely(!msg_id)) {
  639. uuid_generate(uuid);
  640. uuid_unparse(uuid, uuid_str);
  641. msg_id = uuid_str;
  642. }
  643. buffer_sprintf(
  644. dest,
  645. "\t{\"type\": \"%s\",\n"
  646. "\t\"msg-id\": \"%s\",\n"
  647. "\t\"version\": %s,\n"
  648. "\t\"payload\": ",
  649. type, msg_id, ACLK_VERSION);
  650. }
  651. #define EYE_FRIENDLY 1
  652. // encapsulate contents into metadata message as per ACLK documentation
  653. void aclk_create_metadata_message(BUFFER *dest, char *type, char *msg_id, BUFFER *contents)
  654. {
  655. #ifndef EYE_FRIENDLY
  656. char *tmp_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  657. char *src, *dst;
  658. #endif
  659. buffer_sprintf(
  660. dest,
  661. "\t{\"type\": \"%s\",\n"
  662. "\t\"msg-id\": \"%s\",\n"
  663. "\t\"payload\": %s\n\t}",
  664. type, msg_id ? msg_id : "", contents->buffer);
  665. #ifndef EYE_FRIENDLY
  666. //TODO: this is the initial escaping, It will expanded
  667. src = dest->buffer;
  668. dst = tmp_buffer;
  669. while (*src) {
  670. switch (*src) {
  671. case '0x0a':
  672. case '\n':
  673. *dst++ = '\\';
  674. *dst++ = 'n';
  675. break;
  676. case '\"':
  677. *dst++ = '\\';
  678. *dst++ = '\"';
  679. break;
  680. case '\'':
  681. *dst++ = '\\';
  682. *dst++ = '\"';
  683. break;
  684. default:
  685. *dst++ = *src;
  686. }
  687. src++;
  688. }
  689. *dst = '\0';
  690. buffer_flush(dest);
  691. buffer_sprintf(dest, "%s", tmp_buffer);
  692. freez(tmp_buffer);
  693. #endif
  694. return;
  695. }
  696. //TODO: this has been changed in the latest specs. We need to pack the data in one MQTT
  697. //message with a payload and has a list of json objects
  698. int aclk_send_alarm_metadata()
  699. {
  700. //TODO: improve locking on the buffer -- same lock is used for the message send
  701. //improve error handling
  702. ACLK_LOCK;
  703. buffer_flush(aclk_buffer);
  704. // Alarms configuration
  705. aclk_create_header(aclk_buffer, "alarms", NULL);
  706. health_alarms2json(localhost, aclk_buffer, 1);
  707. buffer_sprintf(aclk_buffer,"\n}");
  708. ACLK_UNLOCK;
  709. aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer);
  710. // Alarms log
  711. ACLK_LOCK;
  712. buffer_flush(aclk_buffer);
  713. aclk_create_header(aclk_buffer, "alarms_log", NULL);
  714. health_alarm_log2json(localhost, aclk_buffer, 0);
  715. buffer_sprintf(aclk_buffer,"\n}");
  716. ACLK_UNLOCK;
  717. aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer);
  718. return 0;
  719. }
  720. // Send info metadata message to the cloud if the link is established
  721. // or on request
  722. int aclk_send_metadata()
  723. {
  724. ACLK_LOCK;
  725. buffer_flush(aclk_buffer);
  726. aclk_create_header(aclk_buffer, "connect", NULL);
  727. buffer_sprintf(aclk_buffer,"{\n\t \"info\" : ");
  728. web_client_api_request_v1_info_fill_buffer(localhost, aclk_buffer);
  729. buffer_sprintf(aclk_buffer,", \n\t \"charts\" : ");
  730. charts2json(localhost, aclk_buffer);
  731. buffer_sprintf(aclk_buffer,"\n}\n}");
  732. aclk_buffer->contenttype = CT_APPLICATION_JSON;
  733. ACLK_UNLOCK;
  734. aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer);
  735. aclk_send_alarm_metadata();
  736. return 0;
  737. }
  738. //rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
  739. int aclk_send_single_chart(char *hostname, char *chart)
  740. {
  741. RRDHOST *target_host;
  742. ACLK_LOCK;
  743. buffer_flush(aclk_buffer);
  744. target_host = rrdhost_find_by_hostname(hostname, 0);
  745. if (!target_host)
  746. return 1;
  747. RRDSET *st = rrdset_find(target_host, chart);
  748. if (!st)
  749. st = rrdset_find_byname(target_host, chart);
  750. if (!st) {
  751. info("FAILED to find chart %s", chart);
  752. return 1;
  753. }
  754. aclk_buffer->contenttype = CT_APPLICATION_JSON;
  755. buffer_flush(aclk_buffer);
  756. aclk_create_header(aclk_buffer, "chart", NULL);
  757. rrdset2json(st, aclk_buffer, NULL, NULL);
  758. buffer_sprintf(aclk_buffer,"\n}\n}");
  759. ACLK_UNLOCK;
  760. aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer);
  761. return 0;
  762. }
  763. int aclk_update_chart(RRDHOST *host, char *chart_name)
  764. {
  765. (void) host;
  766. (void) chart_name;
  767. #ifndef ENABLE_ACLK
  768. return 0;
  769. #else
  770. if (host != localhost)
  771. return 0;
  772. aclk_queue_query("_chart", host->hostname, NULL, chart_name, 2, 1);
  773. return 0;
  774. #endif
  775. }
  776. int aclk_update_alarm(RRDHOST *host, char *alarm_name)
  777. {
  778. if (host != localhost)
  779. return 0;
  780. aclk_queue_query("_alarm", host->hostname, NULL, alarm_name, 2, 1);
  781. return 0;
  782. }
  783. //TODO: add and check the incoming type e.g http
  784. int aclk_handle_cloud_request(char *payload)
  785. {
  786. struct aclk_request cloud_to_agent = { .msg_id = NULL, .topic = NULL, .url = NULL, .version = 1};
  787. int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
  788. if (unlikely(JSON_OK != rc)) {
  789. error("Malformed json request (%s)", payload);
  790. return 1;
  791. }
  792. if (unlikely(!cloud_to_agent.url || !cloud_to_agent.topic)) {
  793. return 1;
  794. }
  795. aclk_submit_request(&cloud_to_agent);
  796. return 0;
  797. }