aclk_query.c 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843
  1. #include "aclk_common.h"
  2. #include "aclk_query.h"
  3. #include "aclk_stats.h"
  4. #include "aclk_rx_msgs.h"
/* Request header searched for in a v2 cloud request to decide whether the
 * response body may be gzip-compressed (see aclk_execute_query_v2). */
#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
/* Condition variable and mutex the query threads wait on once the queue has been
 * drained (see aclk_query_main_thread). */
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
/* Non-zero while ACLK is connected. While it is 0, aclk_queue_query() rejects new
 * work (returns 1) and aclk_process_query() does nothing (returns 0).
 * NOTE(review): volatile is not a thread-synchronization primitive; if this flag is
 * written from another thread consider an atomic — confirm against the writer. */
volatile int aclk_connected = 0;
#ifndef __GNUC__
#pragma region ACLK_QUEUE
#endif
/* Guards aclk_queue (head, tail, count) and the list links of its entries. */
static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_QUEUE_LOCK netdata_mutex_lock(&queue_mutex)
#define ACLK_QUEUE_UNLOCK netdata_mutex_unlock(&queue_mutex)
/* One unit of work for the ACLK query threads. Entries are linked into aclk_queue,
 * which is kept ordered by run_after (see aclk_query_find_position). */
struct aclk_query {
    usec_t created;            // wall-clock creation time in microseconds (derived from tv_in)
    struct timeval tv_in;      // wall-clock creation time, copied into the web_client for access logging
    usec_t created_boot_time;  // boottime-clock creation time, used for queue latency metrics
    time_t run_after; // Delay run until after this time (absolute, seconds)
    ACLK_CMD cmd; // What command is this
    char *topic; // Topic to respond to
    // Command-specific payload: an RRDHOST * for host commands, or a
    // struct aclk_cloud_req_v2 * (freed together with the query) for
    // ACLK_CMD_CLOUD_QUERY_2. May be NULL.
    char *data;
    char *msg_id; // msg_id generated by the cloud (NULL if internal)
    char *query; // The actual query (may be NULL for internal commands)
    u_char deleted; // Mark deleted for garbage collect
    int idx; // index of query thread
    struct aclk_query *next;
};
/* The single query queue shared by all query threads; guarded by queue_mutex. */
struct aclk_query_queue {
    struct aclk_query *aclk_query_head; // next entry to run (smallest run_after)
    struct aclk_query *aclk_query_tail; // last entry, used as the append fast path
    unsigned int count;                 // number of linked entries
} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
  36. unsigned int aclk_query_size()
  37. {
  38. int r;
  39. ACLK_QUEUE_LOCK;
  40. r = aclk_queue.count;
  41. ACLK_QUEUE_UNLOCK;
  42. return r;
  43. }
  44. /*
  45. * Free a query structure when done
  46. */
  47. static void aclk_query_free(struct aclk_query *this_query)
  48. {
  49. if (unlikely(!this_query))
  50. return;
  51. freez(this_query->topic);
  52. if (likely(this_query->query))
  53. freez(this_query->query);
  54. if(this_query->data && this_query->cmd == ACLK_CMD_CLOUD_QUERY_2) {
  55. struct aclk_cloud_req_v2 *del = (struct aclk_cloud_req_v2 *)this_query->data;
  56. freez(del->query_endpoint);
  57. freez(del->data);
  58. freez(del);
  59. }
  60. if (likely(this_query->msg_id))
  61. freez(this_query->msg_id);
  62. freez(this_query);
  63. }
  64. /*
  65. * Get the next query to process - NULL if nothing there
  66. * The caller needs to free memory by calling aclk_query_free()
  67. *
  68. * topic
  69. * query
  70. * The structure itself
  71. *
  72. */
  73. static struct aclk_query *aclk_queue_pop()
  74. {
  75. struct aclk_query *this_query;
  76. ACLK_QUEUE_LOCK;
  77. if (likely(!aclk_queue.aclk_query_head)) {
  78. ACLK_QUEUE_UNLOCK;
  79. return NULL;
  80. }
  81. this_query = aclk_queue.aclk_query_head;
  82. // Get rid of the deleted entries
  83. while (this_query && this_query->deleted) {
  84. aclk_queue.count--;
  85. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  86. if (likely(!aclk_queue.aclk_query_head)) {
  87. aclk_queue.aclk_query_tail = NULL;
  88. }
  89. aclk_query_free(this_query);
  90. this_query = aclk_queue.aclk_query_head;
  91. }
  92. if (likely(!this_query)) {
  93. ACLK_QUEUE_UNLOCK;
  94. return NULL;
  95. }
  96. if (!this_query->deleted && this_query->run_after > now_realtime_sec()) {
  97. info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
  98. ACLK_QUEUE_UNLOCK;
  99. return NULL;
  100. }
  101. aclk_queue.count--;
  102. aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
  103. if (likely(!aclk_queue.aclk_query_head)) {
  104. aclk_queue.aclk_query_tail = NULL;
  105. }
  106. ACLK_QUEUE_UNLOCK;
  107. return this_query;
  108. }
  109. // Returns the entry after which we need to create a new entry to run at the specified time
  110. // If NULL is returned we need to add to HEAD
  111. // Need to have a QUERY lock before calling this
  112. static struct aclk_query *aclk_query_find_position(time_t time_to_run)
  113. {
  114. struct aclk_query *tmp_query, *last_query;
  115. // Quick check if we will add to the end
  116. if (likely(aclk_queue.aclk_query_tail)) {
  117. if (aclk_queue.aclk_query_tail->run_after <= time_to_run)
  118. return aclk_queue.aclk_query_tail;
  119. }
  120. last_query = NULL;
  121. tmp_query = aclk_queue.aclk_query_head;
  122. while (tmp_query) {
  123. if (tmp_query->run_after > time_to_run)
  124. return last_query;
  125. last_query = tmp_query;
  126. tmp_query = tmp_query->next;
  127. }
  128. return last_query;
  129. }
  130. // Need to have a QUERY lock before calling this
  131. static struct aclk_query *
  132. aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query)
  133. {
  134. struct aclk_query *tmp_query, *prev_query;
  135. UNUSED(cmd);
  136. tmp_query = aclk_queue.aclk_query_head;
  137. prev_query = NULL;
  138. while (tmp_query) {
  139. if (likely(!tmp_query->deleted)) {
  140. if (strcmp(tmp_query->topic, topic) == 0 && (!query || strcmp(tmp_query->query, query) == 0)) {
  141. if ((!data || data == tmp_query->data) &&
  142. (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0))) {
  143. if (likely(last_query))
  144. *last_query = prev_query;
  145. return tmp_query;
  146. }
  147. }
  148. }
  149. prev_query = tmp_query;
  150. tmp_query = tmp_query->next;
  151. }
  152. return NULL;
  153. }
  154. /*
  155. * Add a query to execute, the result will be send to the specified topic
  156. */
  157. int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
  158. {
  159. struct aclk_query *new_query, *tmp_query;
  160. // Ignore all commands while we wait for the agent to initialize
  161. if (unlikely(!aclk_connected))
  162. return 1;
  163. run_after = now_realtime_sec() + run_after;
  164. ACLK_QUEUE_LOCK;
  165. struct aclk_query *last_query = NULL;
  166. tmp_query = aclk_query_find(topic, data, msg_id, query, aclk_cmd, &last_query);
  167. if (unlikely(tmp_query)) {
  168. if (tmp_query->run_after == run_after) {
  169. ACLK_QUEUE_UNLOCK;
  170. QUERY_THREAD_WAKEUP;
  171. return 0;
  172. }
  173. if (last_query)
  174. last_query->next = tmp_query->next;
  175. else
  176. aclk_queue.aclk_query_head = tmp_query->next;
  177. debug(D_ACLK, "Removing double entry");
  178. aclk_query_free(tmp_query);
  179. aclk_queue.count--;
  180. }
  181. if (aclk_stats_enabled) {
  182. ACLK_STATS_LOCK;
  183. aclk_metrics_per_sample.queries_queued++;
  184. ACLK_STATS_UNLOCK;
  185. }
  186. new_query = callocz(1, sizeof(struct aclk_query));
  187. new_query->cmd = aclk_cmd;
  188. if (internal) {
  189. new_query->topic = strdupz(topic);
  190. if (likely(query))
  191. new_query->query = strdupz(query);
  192. } else {
  193. new_query->topic = topic;
  194. new_query->query = query;
  195. new_query->msg_id = msg_id;
  196. }
  197. new_query->data = data;
  198. new_query->next = NULL;
  199. now_realtime_timeval(&new_query->tv_in);
  200. new_query->created = (new_query->tv_in.tv_sec * USEC_PER_SEC) + new_query->tv_in.tv_usec;
  201. new_query->created_boot_time = now_boottime_usec();
  202. new_query->run_after = run_after;
  203. debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
  204. tmp_query = aclk_query_find_position(run_after);
  205. if (tmp_query) {
  206. new_query->next = tmp_query->next;
  207. tmp_query->next = new_query;
  208. if (tmp_query == aclk_queue.aclk_query_tail)
  209. aclk_queue.aclk_query_tail = new_query;
  210. aclk_queue.count++;
  211. ACLK_QUEUE_UNLOCK;
  212. QUERY_THREAD_WAKEUP;
  213. return 0;
  214. }
  215. new_query->next = aclk_queue.aclk_query_head;
  216. aclk_queue.aclk_query_head = new_query;
  217. aclk_queue.count++;
  218. ACLK_QUEUE_UNLOCK;
  219. QUERY_THREAD_WAKEUP;
  220. return 0;
  221. }
  222. #ifndef __GNUC__
  223. #pragma endregion
  224. #endif
  225. #ifndef __GNUC__
  226. #pragma region Helper Functions
  227. #endif
  228. /*
  229. * Take a buffer, encode it and rewrite it
  230. *
  231. */
  232. static char *aclk_encode_response(char *src, size_t content_size, int keep_newlines)
  233. {
  234. char *tmp_buffer = mallocz(content_size * 2);
  235. char *dst = tmp_buffer;
  236. while (content_size > 0) {
  237. switch (*src) {
  238. case '\n':
  239. if (keep_newlines)
  240. {
  241. *dst++ = '\\';
  242. *dst++ = 'n';
  243. }
  244. break;
  245. case '\t':
  246. break;
  247. case 0x01 ... 0x08:
  248. case 0x0b ... 0x1F:
  249. *dst++ = '\\';
  250. *dst++ = 'u';
  251. *dst++ = '0';
  252. *dst++ = '0';
  253. *dst++ = (*src < 0x0F) ? '0' : '1';
  254. *dst++ = to_hex(*src);
  255. break;
  256. case '\"':
  257. *dst++ = '\\';
  258. *dst++ = *src;
  259. break;
  260. default:
  261. *dst++ = *src;
  262. }
  263. src++;
  264. content_size--;
  265. }
  266. *dst = '\0';
  267. return tmp_buffer;
  268. }
  269. #ifndef __GNUC__
  270. #pragma endregion
  271. #endif
  272. #ifndef __GNUC__
  273. #pragma region ACLK_QUERY
  274. #endif
  275. static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
  276. {
  277. usec_t t = now_boottime_usec();
  278. aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
  279. w->response.code = web_client_api_request_v1(host, w, url);
  280. t = now_boottime_usec() - t;
  281. aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t);
  282. return t;
  283. }
  284. static int aclk_execute_query(struct aclk_query *this_query)
  285. {
  286. if (strncmp(this_query->query, "/api/v1/", 8) == 0) {
  287. struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
  288. w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  289. w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  290. w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  291. strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
  292. w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
  293. w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
  294. w->acl = 0x1f;
  295. char *mysep = strchr(this_query->query, '?');
  296. if (mysep) {
  297. strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
  298. *mysep = '\0';
  299. } else
  300. strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
  301. mysep = strrchr(this_query->query, '/');
  302. // TODO: handle bad response perhaps in a different way. For now it does to the payload
  303. w->tv_in = this_query->tv_in;
  304. now_realtime_timeval(&w->tv_ready);
  305. aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
  306. size_t size = w->response.data->len;
  307. size_t sent = size;
  308. w->response.data->date = w->tv_ready.tv_sec;
  309. web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
  310. BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  311. buffer_flush(local_buffer);
  312. local_buffer->contenttype = CT_APPLICATION_JSON;
  313. aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
  314. buffer_strcat(local_buffer, ",\n\t\"payload\": ");
  315. char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
  316. char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
  317. buffer_sprintf(
  318. local_buffer, "{\n\"code\": %d,\n\"body\": \"%s\",\n\"headers\": \"%s\"\n}",
  319. w->response.code, encoded_response, encoded_header);
  320. buffer_sprintf(local_buffer, "\n}");
  321. debug(D_ACLK, "Response:%s", encoded_header);
  322. aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
  323. struct timeval tv;
  324. now_realtime_timeval(&tv);
  325. log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
  326. w->id
  327. , gettid()
  328. , this_query->idx
  329. , "DATA"
  330. , sent
  331. , size
  332. , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0)
  333. , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
  334. , dt_usec(&tv, &w->tv_ready) / 1000.0
  335. , dt_usec(&tv, &w->tv_in) / 1000.0
  336. , w->response.code
  337. , strip_control_characters(this_query->query)
  338. );
  339. buffer_free(w->response.data);
  340. buffer_free(w->response.header);
  341. buffer_free(w->response.header_output);
  342. freez(w);
  343. buffer_free(local_buffer);
  344. freez(encoded_response);
  345. freez(encoded_header);
  346. return 0;
  347. }
  348. return 1;
  349. }
  350. static int aclk_execute_query_v2(struct aclk_query *this_query)
  351. {
  352. int retval = 0;
  353. usec_t t;
  354. BUFFER *local_buffer = NULL;
  355. struct aclk_cloud_req_v2 *cloud_req = (struct aclk_cloud_req_v2 *)this_query->data;
  356. #ifdef NETDATA_WITH_ZLIB
  357. int z_ret;
  358. BUFFER *z_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  359. char *start, *end;
  360. #endif
  361. struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
  362. w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  363. w->response.header = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  364. w->response.header_output = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);
  365. strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
  366. w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
  367. w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
  368. w->acl = 0x1f;
  369. char *mysep = strchr(this_query->query, '?');
  370. if (mysep) {
  371. url_decode_r(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE + 1);
  372. *mysep = '\0';
  373. } else
  374. url_decode_r(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE + 1);
  375. mysep = strrchr(this_query->query, '/');
  376. // execute the query
  377. w->tv_in = this_query->tv_in;
  378. now_realtime_timeval(&w->tv_ready);
  379. t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
  380. size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
  381. size_t sent = size;
  382. #ifdef NETDATA_WITH_ZLIB
  383. // check if gzip encoding can and should be used
  384. if ((start = strstr(cloud_req->data, WEB_HDR_ACCEPT_ENC))) {
  385. start += strlen(WEB_HDR_ACCEPT_ENC);
  386. end = strstr(start, "\x0D\x0A");
  387. start = strstr(start, "gzip");
  388. if (start && start < end) {
  389. w->response.zstream.zalloc = Z_NULL;
  390. w->response.zstream.zfree = Z_NULL;
  391. w->response.zstream.opaque = Z_NULL;
  392. if(deflateInit2(&w->response.zstream, web_gzip_level, Z_DEFLATED, 15 + 16, 8, web_gzip_strategy) == Z_OK) {
  393. w->response.zinitialized = 1;
  394. w->response.zoutput = 1;
  395. } else
  396. error("Failed to initialize zlib. Proceeding without compression.");
  397. }
  398. }
  399. if (w->response.data->len && w->response.zinitialized) {
  400. w->response.zstream.next_in = (Bytef *)w->response.data->buffer;
  401. w->response.zstream.avail_in = w->response.data->len;
  402. do {
  403. w->response.zstream.avail_out = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE;
  404. w->response.zstream.next_out = w->response.zbuffer;
  405. z_ret = deflate(&w->response.zstream, Z_FINISH);
  406. if(z_ret < 0) {
  407. if(w->response.zstream.msg)
  408. error("Error compressing body. ZLIB error: \"%s\"", w->response.zstream.msg);
  409. else
  410. error("Unknown error during zlib compression.");
  411. retval = 1;
  412. goto cleanup;
  413. }
  414. int bytes_to_cpy = NETDATA_WEB_RESPONSE_ZLIB_CHUNK_SIZE - w->response.zstream.avail_out;
  415. buffer_need_bytes(z_buffer, bytes_to_cpy);
  416. memcpy(&z_buffer->buffer[z_buffer->len], w->response.zbuffer, bytes_to_cpy);
  417. z_buffer->len += bytes_to_cpy;
  418. } while(z_ret != Z_STREAM_END);
  419. // so that web_client_build_http_header
  420. // puts correct content lenght into header
  421. buffer_free(w->response.data);
  422. w->response.data = z_buffer;
  423. z_buffer = NULL;
  424. }
  425. #endif
  426. w->response.data->date = w->tv_ready.tv_sec;
  427. web_client_build_http_header(w);
  428. local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
  429. local_buffer->contenttype = CT_APPLICATION_JSON;
  430. aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
  431. buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code);
  432. buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A");
  433. buffer_strcat(local_buffer, w->response.header_output->buffer);
  434. if (w->response.data->len) {
  435. #ifdef NETDATA_WITH_ZLIB
  436. if (w->response.zinitialized) {
  437. buffer_need_bytes(local_buffer, w->response.data->len);
  438. memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
  439. local_buffer->len += w->response.data->len;
  440. sent = sent - size + w->response.data->len;
  441. } else {
  442. #endif
  443. buffer_strcat(local_buffer, w->response.data->buffer);
  444. #ifdef NETDATA_WITH_ZLIB
  445. }
  446. #endif
  447. }
  448. aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id);
  449. struct timeval tv;
  450. now_realtime_timeval(&tv);
  451. log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
  452. w->id
  453. , gettid()
  454. , this_query->idx
  455. , "DATA"
  456. , sent
  457. , size
  458. , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0)
  459. , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
  460. , dt_usec(&tv, &w->tv_ready) / 1000.0
  461. , dt_usec(&tv, &w->tv_in) / 1000.0
  462. , w->response.code
  463. , strip_control_characters(this_query->query)
  464. );
  465. cleanup:
  466. #ifdef NETDATA_WITH_ZLIB
  467. if(w->response.zinitialized)
  468. deflateEnd(&w->response.zstream);
  469. buffer_free(z_buffer);
  470. #endif
  471. buffer_free(w->response.data);
  472. buffer_free(w->response.header);
  473. buffer_free(w->response.header_output);
  474. freez(w);
  475. buffer_free(local_buffer);
  476. return retval;
  477. }
/*
 * For use inside the command switch of aclk_process_query(): when the local
 * variable `host` is NULL, log "<x> needs host pointer" and `break` out of the
 * current case. x must be a string literal (it is concatenated at compile time).
 * It is intentionally not wrapped in do { } while (0): the `break` has to leave
 * the enclosing switch, not a do/while.
 * errno is cleared first, presumably so error() does not report a stale errno.
 */
#define ACLK_HOST_PTR_COMPULSORY(x) \
    if (unlikely(!host)) { \
        errno = 0; \
        error(x " needs host pointer"); \
        break; \
    }
  484. /*
  485. * This function will fetch the next pending command and process it
  486. *
  487. */
  488. static int aclk_process_query(struct aclk_query_thread *t_info)
  489. {
  490. struct aclk_query *this_query;
  491. static long int query_count = 0;
  492. ACLK_METADATA_STATE meta_state;
  493. RRDHOST *host;
  494. if (!aclk_connected)
  495. return 0;
  496. this_query = aclk_queue_pop();
  497. if (likely(!this_query)) {
  498. return 0;
  499. }
  500. if (unlikely(this_query->deleted)) {
  501. debug(D_ACLK, "Garbage collect query %s:%s", this_query->topic, this_query->query);
  502. aclk_query_free(this_query);
  503. return 1;
  504. }
  505. query_count++;
  506. host = (RRDHOST*)this_query->data;
  507. this_query->idx = t_info->idx;
  508. debug(
  509. D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic,
  510. this_query->query ? strlen(this_query->query) : 0, (now_realtime_usec() - this_query->created)/USEC_PER_MS);
  511. switch (this_query->cmd) {
  512. case ACLK_CMD_ONCONNECT:
  513. ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT");
  514. #if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
  515. if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
  516. error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE);
  517. break;
  518. }
  519. #else
  520. #warning "This check became unnecessary. Remove"
  521. #endif
  522. debug(D_ACLK, "EXECUTING on connect metadata command for host \"%s\" GUID \"%s\"",
  523. host->hostname,
  524. host->machine_guid);
  525. rrdhost_aclk_state_lock(host);
  526. meta_state = host->aclk_state.metadata;
  527. host->aclk_state.metadata = ACLK_METADATA_SENT;
  528. rrdhost_aclk_state_unlock(host);
  529. aclk_send_metadata(meta_state, host);
  530. break;
  531. case ACLK_CMD_CHART:
  532. ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHART");
  533. debug(D_ACLK, "EXECUTING a chart update command");
  534. aclk_send_single_chart(host, this_query->query);
  535. break;
  536. case ACLK_CMD_CHARTDEL:
  537. ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHARTDEL");
  538. debug(D_ACLK, "EXECUTING a chart delete command");
  539. //TODO: This send the info metadata for now
  540. aclk_send_info_metadata(ACLK_METADATA_SENT, host);
  541. break;
  542. case ACLK_CMD_ALARM:
  543. debug(D_ACLK, "EXECUTING an alarm update command");
  544. aclk_send_message(this_query->topic, this_query->query, this_query->msg_id);
  545. break;
  546. case ACLK_CMD_CLOUD:
  547. debug(D_ACLK, "EXECUTING a cloud command");
  548. aclk_execute_query(this_query);
  549. break;
  550. case ACLK_CMD_CLOUD_QUERY_2:
  551. debug(D_ACLK, "EXECUTING Cloud Query v2");
  552. aclk_execute_query_v2(this_query);
  553. break;
  554. case ACLK_CMD_CHILD_CONNECT:
  555. case ACLK_CMD_CHILD_DISCONNECT:
  556. ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_CHILD_CONNECT/ACLK_CMD_CHILD_DISCONNECT");
  557. debug(
  558. D_ACLK, "Execution Child %s command",
  559. this_query->cmd == ACLK_CMD_CHILD_CONNECT ? "connect" : "disconnect");
  560. aclk_send_info_child_connection(host, this_query->cmd);
  561. break;
  562. default:
  563. errno = 0;
  564. error("Unknown ACLK Query Command");
  565. break;
  566. }
  567. debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
  568. if (aclk_stats_enabled) {
  569. ACLK_STATS_LOCK;
  570. aclk_metrics_per_sample.queries_dispatched++;
  571. aclk_queries_per_thread[t_info->idx]++;
  572. ACLK_STATS_UNLOCK;
  573. if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) {
  574. getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]);
  575. getrusage_called_this_tick[t_info->idx]++;
  576. }
  577. }
  578. aclk_query_free(this_query);
  579. return 1;
  580. }
  581. void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
  582. {
  583. if (query_threads && query_threads->thread_list) {
  584. for (int i = 0; i < query_threads->count; i++) {
  585. netdata_thread_join(query_threads->thread_list[i].thread, NULL);
  586. }
  587. freez(query_threads->thread_list);
  588. }
  589. struct aclk_query *this_query;
  590. do {
  591. this_query = aclk_queue_pop();
  592. aclk_query_free(this_query);
  593. } while (this_query);
  594. }
  595. #define TASK_LEN_MAX 16
  596. void aclk_query_threads_start(struct aclk_query_threads *query_threads)
  597. {
  598. info("Starting %d query threads.", query_threads->count);
  599. char thread_name[TASK_LEN_MAX];
  600. query_threads->thread_list = callocz(query_threads->count, sizeof(struct aclk_query_thread));
  601. for (int i = 0; i < query_threads->count; i++) {
  602. query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
  603. if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i) < 0))
  604. error("snprintf encoding error");
  605. netdata_thread_create(
  606. &query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
  607. &query_threads->thread_list[i]);
  608. }
  609. }
  610. /**
  611. * Checks and updates popcorning state of rrdhost
  612. * returns actual/updated popcorning state
  613. */
  614. ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
  615. {
  616. rrdhost_aclk_state_lock(host);
  617. ACLK_POPCORNING_STATE ret = host->aclk_state.state;
  618. if (host->aclk_state.state != ACLK_HOST_INITIALIZING){
  619. rrdhost_aclk_state_unlock(host);
  620. return ret;
  621. }
  622. if (!host->aclk_state.t_last_popcorn_update){
  623. rrdhost_aclk_state_unlock(host);
  624. return ret;
  625. }
  626. time_t t_diff = now_monotonic_sec() - host->aclk_state.t_last_popcorn_update;
  627. if (t_diff >= ACLK_STABLE_TIMEOUT) {
  628. host->aclk_state.state = ACLK_HOST_STABLE;
  629. host->aclk_state.t_last_popcorn_update = 0;
  630. rrdhost_aclk_state_unlock(host);
  631. info("Host \"%s\" stable, ACLK popcorning finished. Last interrupt was %ld seconds ago", host->hostname, t_diff);
  632. return ACLK_HOST_STABLE;
  633. }
  634. rrdhost_aclk_state_unlock(host);
  635. return ret;
  636. }
  637. /**
  638. * Main query processing thread
  639. *
  640. * On startup wait for the agent collectors to initialize
  641. * Expect at least a time of ACLK_STABLE_TIMEOUT seconds
  642. * of no new collectors coming in in order to mark the agent
  643. * as stable (set agent_state = AGENT_STABLE)
  644. */
  645. void *aclk_query_main_thread(void *ptr)
  646. {
  647. struct aclk_query_thread *info = ptr;
  648. while (!netdata_exit) {
  649. if(aclk_host_popcorn_check(localhost) == ACLK_HOST_STABLE) {
  650. #ifdef ACLK_DEBUG
  651. _dump_collector_list();
  652. #endif
  653. break;
  654. }
  655. sleep_usec(USEC_PER_SEC * 1);
  656. }
  657. while (!netdata_exit) {
  658. if(aclk_disable_runtime) {
  659. sleep(1);
  660. continue;
  661. }
  662. ACLK_SHARED_STATE_LOCK;
  663. if (unlikely(!aclk_shared_state.version_neg)) {
  664. if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
  665. ACLK_SHARED_STATE_UNLOCK;
  666. info("Waiting for ACLK Version Negotiation message from Cloud");
  667. sleep(1);
  668. continue;
  669. }
  670. errno = 0;
  671. error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
  672. " Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
  673. aclk_shared_state.version_neg = ACLK_VERSION_MIN;
  674. aclk_set_rx_handlers(aclk_shared_state.version_neg);
  675. }
  676. ACLK_SHARED_STATE_UNLOCK;
  677. rrdhost_aclk_state_lock(localhost);
  678. if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) {
  679. if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
  680. rrdhost_aclk_state_unlock(localhost);
  681. errno = 0;
  682. error("ACLK failed to queue on_connect command");
  683. sleep(1);
  684. continue;
  685. }
  686. localhost->aclk_state.metadata = ACLK_METADATA_CMD_QUEUED;
  687. }
  688. rrdhost_aclk_state_unlock(localhost);
  689. ACLK_SHARED_STATE_LOCK;
  690. if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
  691. aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
  692. aclk_shared_state.next_popcorn_host = NULL;
  693. aclk_update_next_child_to_popcorn();
  694. }
  695. ACLK_SHARED_STATE_UNLOCK;
  696. while (aclk_process_query(info)) {
  697. // Process all commands
  698. };
  699. QUERY_THREAD_LOCK;
  700. // TODO: Need to check if there are queries awaiting already
  701. if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
  702. sleep_usec(USEC_PER_SEC * 1);
  703. QUERY_THREAD_UNLOCK;
  704. }
  705. return NULL;
  706. }
  707. #ifndef __GNUC__
  708. #pragma endregion
  709. #endif