functions.c 32 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. /** @file functions.c
  3. *
  4. * @brief This is the file containing the implementation of the
  5. * logs management functions API.
  6. */
  7. #include "functions.h"
  8. #include "helper.h"
  9. #include "query.h"
  10. #define LOGS_MANAG_MAX_PARAMS 100
  11. #define LOGS_MANAGEMENT_DEFAULT_QUERY_DURATION_IN_SEC 3600
  12. #define LOGS_MANAGEMENT_DEFAULT_ITEMS_PER_QUERY 200
  13. #define LOGS_MANAG_FUNC_PARAM_HELP "help"
  14. #define LOGS_MANAG_FUNC_PARAM_ANCHOR "anchor"
  15. #define LOGS_MANAG_FUNC_PARAM_LAST "last"
  16. #define LOGS_MANAG_FUNC_PARAM_QUERY "query"
  17. #define LOGS_MANAG_FUNC_PARAM_FACETS "facets"
  18. #define LOGS_MANAG_FUNC_PARAM_HISTOGRAM "histogram"
  19. #define LOGS_MANAG_FUNC_PARAM_DIRECTION "direction"
  20. #define LOGS_MANAG_FUNC_PARAM_IF_MODIFIED_SINCE "if_modified_since"
  21. #define LOGS_MANAG_FUNC_PARAM_DATA_ONLY "data_only"
  22. #define LOGS_MANAG_FUNC_PARAM_SOURCE "source"
  23. #define LOGS_MANAG_FUNC_PARAM_INFO "info"
  24. #define LOGS_MANAG_FUNC_PARAM_ID "id"
  25. #define LOGS_MANAG_FUNC_PARAM_PROGRESS "progress"
  26. #define LOGS_MANAG_FUNC_PARAM_SLICE "slice"
  27. #define LOGS_MANAG_FUNC_PARAM_DELTA "delta"
  28. #define LOGS_MANAG_FUNC_PARAM_TAIL "tail"
  29. #define LOGS_MANAG_DEFAULT_DIRECTION FACETS_ANCHOR_DIRECTION_BACKWARD
  30. #define FACET_MAX_VALUE_LENGTH 8192
  31. #define FUNCTION_LOGSMANAGEMENT_HELP_LONG \
  32. LOGS_MANAGEMENT_PLUGIN_STR " / " LOGS_MANAG_FUNC_NAME"\n" \
  33. "\n" \
  34. FUNCTION_LOGSMANAGEMENT_HELP_SHORT"\n" \
  35. "\n" \
  36. "The following parameters are supported::\n" \
  37. "\n" \
  38. " "LOGS_MANAG_FUNC_PARAM_HELP"\n" \
  39. " Shows this help message\n" \
  40. "\n" \
  41. " "LOGS_MANAG_FUNC_PARAM_INFO"\n" \
  42. " Request initial configuration information about the plugin.\n" \
  43. " The key entity returned is the required_params array, which includes\n" \
  44. " all the available "LOGS_MANAG_FUNC_NAME" sources.\n" \
  45. " When `"LOGS_MANAG_FUNC_PARAM_INFO"` is requested, all other parameters are ignored.\n" \
  46. "\n" \
  47. " "LOGS_MANAG_FUNC_PARAM_DATA_ONLY":true or "LOGS_MANAG_FUNC_PARAM_DATA_ONLY":false\n" \
  48. " Quickly respond with data requested, without generating a\n" \
  49. " `histogram`, `facets` counters and `items`.\n" \
  50. "\n" \
  51. " "LOGS_MANAG_FUNC_PARAM_SOURCE":SOURCE\n" \
  52. " Query only the specified "LOGS_MANAG_FUNC_NAME" sources.\n" \
  53. " Do an `"LOGS_MANAG_FUNC_PARAM_INFO"` query to find the sources.\n" \
  54. "\n" \
  55. " "LOGS_MANAG_FUNC_PARAM_BEFORE":TIMESTAMP_IN_SECONDS\n" \
  56. " Absolute or relative (to now) timestamp in seconds, to start the query.\n" \
  57. " The query is always executed from the most recent to the oldest log entry.\n" \
  58. " If not given the default is: now.\n" \
  59. "\n" \
  60. " "LOGS_MANAG_FUNC_PARAM_AFTER":TIMESTAMP_IN_SECONDS\n" \
  61. " Absolute or relative (to `before`) timestamp in seconds, to end the query.\n" \
  62. " If not given, the default is "LOGS_MANAG_STR(-LOGS_MANAGEMENT_DEFAULT_QUERY_DURATION_IN_SEC)".\n" \
  63. "\n" \
  64. " "LOGS_MANAG_FUNC_PARAM_LAST":ITEMS\n" \
  65. " The number of items to return.\n" \
  66. " The default is "LOGS_MANAG_STR(LOGS_MANAGEMENT_DEFAULT_ITEMS_PER_QUERY)".\n" \
  67. "\n" \
  68. " "LOGS_MANAG_FUNC_PARAM_ANCHOR":TIMESTAMP_IN_MICROSECONDS\n" \
  69. " Return items relative to this timestamp.\n" \
  70. " The exact items to be returned depend on the query `"LOGS_MANAG_FUNC_PARAM_DIRECTION"`.\n" \
  71. "\n" \
  72. " "LOGS_MANAG_FUNC_PARAM_DIRECTION":forward or "LOGS_MANAG_FUNC_PARAM_DIRECTION":backward\n" \
  73. " When set to `backward` (default) the items returned are the newest before the\n" \
  74. " `"LOGS_MANAG_FUNC_PARAM_ANCHOR"`, (or `"LOGS_MANAG_FUNC_PARAM_BEFORE"` if `"LOGS_MANAG_FUNC_PARAM_ANCHOR"` is not set)\n" \
  75. " When set to `forward` the items returned are the oldest after the\n" \
  76. " `"LOGS_MANAG_FUNC_PARAM_ANCHOR"`, (or `"LOGS_MANAG_FUNC_PARAM_AFTER"` if `"LOGS_MANAG_FUNC_PARAM_ANCHOR"` is not set)\n" \
  77. " The default is: backward\n" \
  78. "\n" \
  79. " "LOGS_MANAG_FUNC_PARAM_QUERY":SIMPLE_PATTERN\n" \
  80. " Do a full text search to find the log entries matching the pattern given.\n" \
  81. " The plugin is searching for matches on all fields of the database.\n" \
  82. "\n" \
  83. " "LOGS_MANAG_FUNC_PARAM_IF_MODIFIED_SINCE":TIMESTAMP_IN_MICROSECONDS\n" \
  84. " Each successful response, includes a `last_modified` field.\n" \
  85. " By providing the timestamp to the `"LOGS_MANAG_FUNC_PARAM_IF_MODIFIED_SINCE"` parameter,\n" \
  86. " the plugin will return 200 with a successful response, or 304 if the source has not\n" \
  87. " been modified since that timestamp.\n" \
  88. "\n" \
  89. " "LOGS_MANAG_FUNC_PARAM_HISTOGRAM":facet_id\n" \
  90. " Use the given `facet_id` for the histogram.\n" \
  91. " This parameter is ignored in `"LOGS_MANAG_FUNC_PARAM_DATA_ONLY"` mode.\n" \
  92. "\n" \
  93. " "LOGS_MANAG_FUNC_PARAM_FACETS":facet_id1,facet_id2,facet_id3,...\n" \
  94. " Add the given facets to the list of fields for which analysis is required.\n" \
  95. " The plugin will offer both a histogram and facet value counters for its values.\n" \
  96. " This parameter is ignored in `"LOGS_MANAG_FUNC_PARAM_DATA_ONLY"` mode.\n" \
  97. "\n" \
  98. " facet_id:value_id1,value_id2,value_id3,...\n" \
  99. " Apply filters to the query, based on the facet IDs returned.\n" \
  100. " Each `facet_id` can be given once, but multiple `facet_ids` can be given.\n" \
  101. "\n"
  102. extern netdata_mutex_t stdout_mut;
  103. static DICTIONARY *function_query_status_dict = NULL;
  104. static DICTIONARY *used_hashes_registry = NULL;
  105. typedef struct function_query_status {
  106. bool *cancelled; // a pointer to the cancelling boolean
  107. usec_t stop_monotonic_ut;
  108. usec_t started_monotonic_ut;
  109. // request
  110. STRING *source;
  111. usec_t after_ut;
  112. usec_t before_ut;
  113. struct {
  114. usec_t start_ut;
  115. usec_t stop_ut;
  116. } anchor;
  117. FACETS_ANCHOR_DIRECTION direction;
  118. size_t entries;
  119. usec_t if_modified_since;
  120. bool delta;
  121. bool tail;
  122. bool data_only;
  123. bool slice;
  124. size_t filters;
  125. usec_t last_modified;
  126. const char *query;
  127. const char *histogram;
  128. // per file progress info
  129. size_t cached_count;
  130. // progress statistics
  131. usec_t matches_setup_ut;
  132. size_t rows_useful;
  133. size_t rows_read;
  134. size_t bytes_read;
  135. size_t files_matched;
  136. size_t file_working;
  137. } FUNCTION_QUERY_STATUS;
  138. #define LOGS_MANAG_KEYS_INCLUDED_IN_FACETS \
  139. "log_source" \
  140. "|log_type" \
  141. "|filename" \
  142. "|basename" \
  143. "|chartname" \
  144. "|message" \
  145. ""
  146. static void logsmanagement_function_facets(const char *transaction, char *function, int timeout, bool *cancelled){
  147. struct rusage start, end;
  148. getrusage(RUSAGE_THREAD, &start);
  149. const logs_qry_res_err_t *ret = &logs_qry_res_err[LOGS_QRY_RES_ERR_CODE_SERVER_ERR];
  150. BUFFER *wb = buffer_create(0, NULL);
  151. buffer_flush(wb);
  152. buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
  153. usec_t now_monotonic_ut = now_monotonic_usec();
  154. FUNCTION_QUERY_STATUS tmp_fqs = {
  155. .cancelled = cancelled,
  156. .started_monotonic_ut = now_monotonic_ut,
  157. .stop_monotonic_ut = now_monotonic_ut + (timeout * USEC_PER_SEC),
  158. };
  159. FUNCTION_QUERY_STATUS *fqs = NULL;
  160. const DICTIONARY_ITEM *fqs_item = NULL;
  161. FACETS *facets = facets_create(50, FACETS_OPTION_ALL_KEYS_FTS,
  162. NULL,
  163. LOGS_MANAG_KEYS_INCLUDED_IN_FACETS,
  164. NULL);
  165. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_INFO);
  166. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_SOURCE);
  167. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_AFTER);
  168. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_BEFORE);
  169. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_ANCHOR);
  170. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_DIRECTION);
  171. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_LAST);
  172. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_QUERY);
  173. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_FACETS);
  174. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_HISTOGRAM);
  175. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_IF_MODIFIED_SINCE);
  176. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_DATA_ONLY);
  177. // facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_ID);
  178. // facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_PROGRESS);
  179. facets_accepted_param(facets, LOGS_MANAG_FUNC_PARAM_DELTA);
  180. // facets_accepted_param(facets, JOURNAL_PARAMETER_TAIL);
  181. // #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
  182. // facets_accepted_param(facets, JOURNAL_PARAMETER_SLICE);
  183. // #endif // HAVE_SD_JOURNAL_RESTART_FIELDS
  184. // register the fields in the order you want them on the dashboard
  185. facets_register_key_name(facets, "log_source", FACET_KEY_OPTION_FACET |
  186. FACET_KEY_OPTION_FTS);
  187. facets_register_key_name(facets, "log_type", FACET_KEY_OPTION_FACET |
  188. FACET_KEY_OPTION_FTS);
  189. facets_register_key_name(facets, "filename", FACET_KEY_OPTION_FACET |
  190. FACET_KEY_OPTION_FTS);
  191. facets_register_key_name(facets, "basename", FACET_KEY_OPTION_FACET |
  192. FACET_KEY_OPTION_FTS);
  193. facets_register_key_name(facets, "chartname", FACET_KEY_OPTION_VISIBLE |
  194. FACET_KEY_OPTION_FACET |
  195. FACET_KEY_OPTION_FTS);
  196. facets_register_key_name(facets, "message", FACET_KEY_OPTION_NEVER_FACET |
  197. FACET_KEY_OPTION_MAIN_TEXT |
  198. FACET_KEY_OPTION_VISIBLE |
  199. FACET_KEY_OPTION_FTS);
  200. bool info = false,
  201. data_only = false,
  202. progress = false,
  203. /* slice = true, */
  204. delta = false,
  205. tail = false;
  206. time_t after_s = 0, before_s = 0;
  207. usec_t anchor = 0;
  208. usec_t if_modified_since = 0;
  209. size_t last = 0;
  210. FACETS_ANCHOR_DIRECTION direction = LOGS_MANAG_DEFAULT_DIRECTION;
  211. const char *query = NULL;
  212. const char *chart = NULL;
  213. const char *source = NULL;
  214. const char *progress_id = NULL;
  215. // size_t filters = 0;
  216. buffer_json_member_add_object(wb, "_request");
  217. logs_query_params_t query_params = {0};
  218. unsigned long req_quota = 0;
  219. // unsigned int fn_off = 0, cn_off = 0;
  220. char *words[LOGS_MANAG_MAX_PARAMS] = { NULL };
  221. size_t num_words = quoted_strings_splitter_pluginsd(function, words, LOGS_MANAG_MAX_PARAMS);
  222. for(int i = 1; i < LOGS_MANAG_MAX_PARAMS ; i++) {
  223. char *keyword = get_word(words, num_words, i);
  224. if(!keyword) break;
  225. if(!strcmp(keyword, LOGS_MANAG_FUNC_PARAM_HELP)){
  226. BUFFER *wb = buffer_create(0, NULL);
  227. buffer_sprintf(wb, FUNCTION_LOGSMANAGEMENT_HELP_LONG);
  228. netdata_mutex_lock(&stdout_mut);
  229. pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
  230. netdata_mutex_unlock(&stdout_mut);
  231. buffer_free(wb);
  232. goto cleanup;
  233. }
  234. else if(!strcmp(keyword, LOGS_MANAG_FUNC_PARAM_INFO)){
  235. info = true;
  236. }
  237. else if(!strcmp(keyword, LOGS_MANAG_FUNC_PARAM_PROGRESS)){
  238. progress = true;
  239. }
  240. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_DELTA ":", sizeof(LOGS_MANAG_FUNC_PARAM_DELTA ":") - 1) == 0) {
  241. char *v = &keyword[sizeof(LOGS_MANAG_FUNC_PARAM_DELTA ":") - 1];
  242. if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  243. delta = false;
  244. else
  245. delta = true;
  246. }
  247. // else if(strncmp(keyword, JOURNAL_PARAMETER_TAIL ":", sizeof(JOURNAL_PARAMETER_TAIL ":") - 1) == 0) {
  248. // char *v = &keyword[sizeof(JOURNAL_PARAMETER_TAIL ":") - 1];
  249. // if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  250. // tail = false;
  251. // else
  252. // tail = true;
  253. // }
  254. else if(!strncmp( keyword,
  255. LOGS_MANAG_FUNC_PARAM_DATA_ONLY ":",
  256. sizeof(LOGS_MANAG_FUNC_PARAM_DATA_ONLY ":") - 1)) {
  257. char *v = &keyword[sizeof(LOGS_MANAG_FUNC_PARAM_DATA_ONLY ":") - 1];
  258. if(!strcmp(v, "false") || !strcmp(v, "no") || !strcmp(v, "0"))
  259. data_only = false;
  260. else
  261. data_only = true;
  262. }
  263. // else if(strncmp(keyword, JOURNAL_PARAMETER_SLICE ":", sizeof(JOURNAL_PARAMETER_SLICE ":") - 1) == 0) {
  264. // char *v = &keyword[sizeof(JOURNAL_PARAMETER_SLICE ":") - 1];
  265. // if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  266. // slice = false;
  267. // else
  268. // slice = true;
  269. // }
  270. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_ID ":", sizeof(LOGS_MANAG_FUNC_PARAM_ID ":") - 1) == 0) {
  271. char *id = &keyword[sizeof(LOGS_MANAG_FUNC_PARAM_ID ":") - 1];
  272. if(*id)
  273. progress_id = id;
  274. }
  275. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_SOURCE ":", sizeof(LOGS_MANAG_FUNC_PARAM_SOURCE ":") - 1) == 0) {
  276. source = !strcmp("all", &keyword[sizeof(LOGS_MANAG_FUNC_PARAM_SOURCE ":") - 1]) ?
  277. NULL : &keyword[sizeof(LOGS_MANAG_FUNC_PARAM_SOURCE ":") - 1];
  278. }
  279. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_AFTER ":", sizeof(LOGS_MANAG_FUNC_PARAM_AFTER ":") - 1) == 0) {
  280. after_s = str2l(&keyword[sizeof(LOGS_MANAG_FUNC_PARAM_AFTER ":") - 1]);
  281. }
  282. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_BEFORE ":", sizeof(LOGS_MANAG_FUNC_PARAM_BEFORE ":") - 1) == 0) {
  283. before_s = str2l(&keyword[sizeof(LOGS_MANAG_FUNC_PARAM_BEFORE ":") - 1]);
  284. }
  285. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_IF_MODIFIED_SINCE ":", sizeof(LOGS_MANAG_FUNC_PARAM_IF_MODIFIED_SINCE ":") - 1) == 0) {
  286. if_modified_since = str2ull(&keyword[sizeof(LOGS_MANAG_FUNC_PARAM_IF_MODIFIED_SINCE ":") - 1], NULL);
  287. }
  288. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_ANCHOR ":", sizeof(LOGS_MANAG_FUNC_PARAM_ANCHOR ":") - 1) == 0) {
  289. anchor = str2ull(&keyword[sizeof(LOGS_MANAG_FUNC_PARAM_ANCHOR ":") - 1], NULL);
  290. }
  291. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_DIRECTION ":", sizeof(LOGS_MANAG_FUNC_PARAM_DIRECTION ":") - 1) == 0) {
  292. direction = !strcasecmp(&keyword[sizeof(LOGS_MANAG_FUNC_PARAM_DIRECTION ":") - 1], "forward") ?
  293. FACETS_ANCHOR_DIRECTION_FORWARD : FACETS_ANCHOR_DIRECTION_BACKWARD;
  294. }
  295. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_LAST ":", sizeof(LOGS_MANAG_FUNC_PARAM_LAST ":") - 1) == 0) {
  296. last = str2ul(&keyword[sizeof(LOGS_MANAG_FUNC_PARAM_LAST ":") - 1]);
  297. }
  298. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_QUERY ":", sizeof(LOGS_MANAG_FUNC_PARAM_QUERY ":") - 1) == 0) {
  299. query= &keyword[sizeof(LOGS_MANAG_FUNC_PARAM_QUERY ":") - 1];
  300. }
  301. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_HISTOGRAM ":", sizeof(LOGS_MANAG_FUNC_PARAM_HISTOGRAM ":") - 1) == 0) {
  302. chart = &keyword[sizeof(LOGS_MANAG_FUNC_PARAM_HISTOGRAM ":") - 1];
  303. }
  304. else if(strncmp(keyword, LOGS_MANAG_FUNC_PARAM_FACETS ":", sizeof(LOGS_MANAG_FUNC_PARAM_FACETS ":") - 1) == 0) {
  305. char *value = &keyword[sizeof(LOGS_MANAG_FUNC_PARAM_FACETS ":") - 1];
  306. if(*value) {
  307. buffer_json_member_add_array(wb, LOGS_MANAG_FUNC_PARAM_FACETS);
  308. while(value) {
  309. char *sep = strchr(value, ',');
  310. if(sep)
  311. *sep++ = '\0';
  312. facets_register_facet_id(facets, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER);
  313. buffer_json_add_array_item_string(wb, value);
  314. value = sep;
  315. }
  316. buffer_json_array_close(wb); // LOGS_MANAG_FUNC_PARAM_FACETS
  317. }
  318. }
  319. else {
  320. char *value = strchr(keyword, ':');
  321. if(value) {
  322. *value++ = '\0';
  323. buffer_json_member_add_array(wb, keyword);
  324. while(value) {
  325. char *sep = strchr(value, ',');
  326. if(sep)
  327. *sep++ = '\0';
  328. facets_register_facet_id_filter(facets, keyword, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER);
  329. buffer_json_add_array_item_string(wb, value);
  330. // filters++;
  331. value = sep;
  332. }
  333. buffer_json_array_close(wb); // keyword
  334. }
  335. }
  336. }
  337. // ------------------------------------------------------------------------
  338. // put this request into the progress db
  339. if(progress_id && *progress_id) {
  340. fqs_item = dictionary_set_and_acquire_item(function_query_status_dict, progress_id, &tmp_fqs, sizeof(tmp_fqs));
  341. fqs = dictionary_acquired_item_value(fqs_item);
  342. }
  343. else {
  344. // no progress id given, proceed without registering our progress in the dictionary
  345. fqs = &tmp_fqs;
  346. fqs_item = NULL;
  347. }
  348. // ------------------------------------------------------------------------
  349. // validate parameters
  350. time_t now_s = now_realtime_sec();
  351. time_t expires = now_s + 1;
  352. if(!after_s && !before_s) {
  353. before_s = now_s;
  354. after_s = before_s - LOGS_MANAGEMENT_DEFAULT_QUERY_DURATION_IN_SEC;
  355. }
  356. else
  357. rrdr_relative_window_to_absolute(&after_s, &before_s, now_s);
  358. if(after_s > before_s) {
  359. time_t tmp = after_s;
  360. after_s = before_s;
  361. before_s = tmp;
  362. }
  363. if(after_s == before_s)
  364. after_s = before_s - LOGS_MANAGEMENT_DEFAULT_QUERY_DURATION_IN_SEC;
  365. if(!last)
  366. last = LOGS_MANAGEMENT_DEFAULT_ITEMS_PER_QUERY;
  367. // ------------------------------------------------------------------------
  368. // set query time-frame, anchors and direction
  369. fqs->after_ut = after_s * USEC_PER_SEC;
  370. fqs->before_ut = (before_s * USEC_PER_SEC) + USEC_PER_SEC - 1;
  371. fqs->if_modified_since = if_modified_since;
  372. fqs->data_only = data_only;
  373. fqs->delta = (fqs->data_only) ? delta : false;
  374. fqs->tail = (fqs->data_only && fqs->if_modified_since) ? tail : false;
  375. fqs->source = string_strdupz(source);
  376. fqs->entries = last;
  377. fqs->last_modified = 0;
  378. // fqs->filters = filters;
  379. fqs->query = (query && *query) ? query : NULL;
  380. fqs->histogram = (chart && *chart) ? chart : NULL;
  381. fqs->direction = direction;
  382. fqs->anchor.start_ut = anchor;
  383. fqs->anchor.stop_ut = 0;
  384. if(fqs->anchor.start_ut && fqs->tail) {
  385. // a tail request
  386. // we need the top X entries from BEFORE
  387. // but, we need to calculate the facets and the
  388. // histogram up to the anchor
  389. fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
  390. fqs->anchor.start_ut = 0;
  391. fqs->anchor.stop_ut = anchor;
  392. }
  393. if(anchor && anchor < fqs->after_ut) {
  394. // log_fqs(fqs, "received anchor is too small for query timeframe, ignoring anchor");
  395. anchor = 0;
  396. fqs->anchor.start_ut = 0;
  397. fqs->anchor.stop_ut = 0;
  398. fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
  399. }
  400. else if(anchor > fqs->before_ut) {
  401. // log_fqs(fqs, "received anchor is too big for query timeframe, ignoring anchor");
  402. anchor = 0;
  403. fqs->anchor.start_ut = 0;
  404. fqs->anchor.stop_ut = 0;
  405. fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
  406. }
  407. facets_set_anchor(facets, fqs->anchor.start_ut, fqs->anchor.stop_ut, fqs->direction);
  408. facets_set_additional_options(facets,
  409. ((fqs->data_only) ? FACETS_OPTION_DATA_ONLY : 0) |
  410. ((fqs->delta) ? FACETS_OPTION_SHOW_DELTAS : 0));
  411. // ------------------------------------------------------------------------
  412. // set the rest of the query parameters
  413. facets_set_items(facets, fqs->entries);
  414. facets_set_query(facets, fqs->query);
  415. // #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
  416. // fqs->slice = slice;
  417. // if(slice)
  418. // facets_enable_slice_mode(facets);
  419. // #else
  420. // fqs->slice = false;
  421. // #endif
  422. if(fqs->histogram)
  423. facets_set_timeframe_and_histogram_by_id(facets, fqs->histogram, fqs->after_ut, fqs->before_ut);
  424. else
  425. facets_set_timeframe_and_histogram_by_name(facets, chart ? chart : "chartname", fqs->after_ut, fqs->before_ut);
  426. // ------------------------------------------------------------------------
  427. // complete the request object
  428. buffer_json_member_add_boolean(wb, LOGS_MANAG_FUNC_PARAM_INFO, false);
  429. buffer_json_member_add_boolean(wb, LOGS_MANAG_FUNC_PARAM_SLICE, fqs->slice);
  430. buffer_json_member_add_boolean(wb, LOGS_MANAG_FUNC_PARAM_DATA_ONLY, fqs->data_only);
  431. buffer_json_member_add_boolean(wb, LOGS_MANAG_FUNC_PARAM_PROGRESS, false);
  432. buffer_json_member_add_boolean(wb, LOGS_MANAG_FUNC_PARAM_DELTA, fqs->delta);
  433. buffer_json_member_add_boolean(wb, LOGS_MANAG_FUNC_PARAM_TAIL, fqs->tail);
  434. buffer_json_member_add_string(wb, LOGS_MANAG_FUNC_PARAM_ID, progress_id);
  435. buffer_json_member_add_string(wb, LOGS_MANAG_FUNC_PARAM_SOURCE, string2str(fqs->source));
  436. buffer_json_member_add_uint64(wb, LOGS_MANAG_FUNC_PARAM_AFTER, fqs->after_ut / USEC_PER_SEC);
  437. buffer_json_member_add_uint64(wb, LOGS_MANAG_FUNC_PARAM_BEFORE, fqs->before_ut / USEC_PER_SEC);
  438. buffer_json_member_add_uint64(wb, LOGS_MANAG_FUNC_PARAM_IF_MODIFIED_SINCE, fqs->if_modified_since);
  439. buffer_json_member_add_uint64(wb, LOGS_MANAG_FUNC_PARAM_ANCHOR, anchor);
  440. buffer_json_member_add_string(wb, LOGS_MANAG_FUNC_PARAM_DIRECTION,
  441. fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward");
  442. buffer_json_member_add_uint64(wb, LOGS_MANAG_FUNC_PARAM_LAST, fqs->entries);
  443. buffer_json_member_add_string(wb, LOGS_MANAG_FUNC_PARAM_QUERY, fqs->query);
  444. buffer_json_member_add_string(wb, LOGS_MANAG_FUNC_PARAM_HISTOGRAM, fqs->histogram);
  445. buffer_json_object_close(wb); // request
  446. // buffer_json_journal_versions(wb);
  447. // ------------------------------------------------------------------------
  448. // run the request
  449. if(info) {
  450. facets_accepted_parameters_to_json_array(facets, wb, false);
  451. buffer_json_member_add_array(wb, "required_params");
  452. {
  453. buffer_json_add_array_item_object(wb);
  454. {
  455. buffer_json_member_add_string(wb, "id", "source");
  456. buffer_json_member_add_string(wb, "name", "source");
  457. buffer_json_member_add_string(wb, "help", "Select the Logs Management source to query");
  458. buffer_json_member_add_string(wb, "type", "select");
  459. buffer_json_member_add_array(wb, "options");
  460. ret = fetch_log_sources(wb);
  461. buffer_json_array_close(wb); // options array
  462. }
  463. buffer_json_object_close(wb); // required params object
  464. }
  465. buffer_json_array_close(wb); // required_params array
  466. facets_table_config(wb);
  467. buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
  468. buffer_json_member_add_string(wb, "type", "table");
  469. buffer_json_member_add_string(wb, "help", FUNCTION_LOGSMANAGEMENT_HELP_SHORT);
  470. buffer_json_finalize(wb);
  471. goto output;
  472. }
  473. if(progress) {
  474. // TODO: Add progress function
  475. // function_logsmanagement_progress(wb, transaction, progress_id);
  476. goto cleanup;
  477. }
  478. if(!req_quota)
  479. query_params.quota = LOGS_MANAG_QUERY_QUOTA_DEFAULT;
  480. else if(req_quota > LOGS_MANAG_QUERY_QUOTA_MAX)
  481. query_params.quota = LOGS_MANAG_QUERY_QUOTA_MAX;
  482. else query_params.quota = req_quota;
  483. if(fqs->source)
  484. query_params.chartname[0] = (char *) string2str(fqs->source);
  485. query_params.order_by_asc = 0;
  486. // NOTE: Always perform descending timestamp query, req_from_ts >= req_to_ts.
  487. if(fqs->direction == FACETS_ANCHOR_DIRECTION_BACKWARD){
  488. query_params.req_from_ts =
  489. (fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut / USEC_PER_MS : before_s * MSEC_PER_SEC;
  490. query_params.req_to_ts =
  491. (fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut / USEC_PER_MS : after_s * MSEC_PER_SEC;
  492. }
  493. else{
  494. query_params.req_from_ts =
  495. (fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut / USEC_PER_MS : before_s * MSEC_PER_SEC;
  496. query_params.req_to_ts =
  497. (fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut / USEC_PER_MS : after_s * MSEC_PER_SEC;
  498. }
  499. query_params.cancelled = cancelled;
  500. query_params.stop_monotonic_ut = now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC;
  501. query_params.results_buff = buffer_create(query_params.quota, NULL);
  502. facets_rows_begin(facets);
  503. do{
  504. if(query_params.act_to_ts)
  505. query_params.req_from_ts = query_params.act_to_ts - 1000;
  506. ret = execute_logs_manag_query(&query_params);
  507. size_t res_off = 0;
  508. logs_query_res_hdr_t *p_res_hdr;
  509. while(query_params.results_buff->len - res_off > 0){
  510. p_res_hdr = (logs_query_res_hdr_t *) &query_params.results_buff->buffer[res_off];
  511. ssize_t remaining = p_res_hdr->text_size;
  512. char *ls = &query_params.results_buff->buffer[res_off] + sizeof(*p_res_hdr) + p_res_hdr->text_size - 1;
  513. *ls = '\0';
  514. int timestamp_off = p_res_hdr->matches;
  515. do{
  516. do{
  517. --remaining;
  518. --ls;
  519. } while(remaining > 0 && *ls != '\n');
  520. *ls = '\0';
  521. --remaining;
  522. --ls;
  523. usec_t timestamp = p_res_hdr->timestamp * USEC_PER_MS + --timestamp_off;
  524. if(unlikely(!fqs->last_modified)) {
  525. if(timestamp == if_modified_since){
  526. ret = &logs_qry_res_err[LOGS_QRY_RES_ERR_CODE_UNMODIFIED];
  527. goto output;
  528. }
  529. else
  530. fqs->last_modified = timestamp;
  531. }
  532. facets_add_key_value(facets, "log_source", p_res_hdr->log_source[0] ? p_res_hdr->log_source : "-");
  533. facets_add_key_value(facets, "log_type", p_res_hdr->log_type[0] ? p_res_hdr->log_type : "-");
  534. facets_add_key_value(facets, "filename", p_res_hdr->filename[0] ? p_res_hdr->filename : "-");
  535. facets_add_key_value(facets, "basename", p_res_hdr->basename[0] ? p_res_hdr->basename : "-");
  536. facets_add_key_value(facets, "chartname", p_res_hdr->chartname[0] ? p_res_hdr->chartname : "-");
  537. size_t ls_len = strlen(ls + 2);
  538. facets_add_key_value_length(facets, "message", sizeof("message") - 1,
  539. ls + 2, ls_len <= FACET_MAX_VALUE_LENGTH ? ls_len : FACET_MAX_VALUE_LENGTH);
  540. facets_row_finished(facets, timestamp);
  541. } while(remaining > 0);
  542. res_off += sizeof(*p_res_hdr) + p_res_hdr->text_size;
  543. }
  544. buffer_flush(query_params.results_buff);
  545. } while(query_params.act_to_ts > query_params.req_to_ts);
  546. m_assert(query_params.req_from_ts == query_params.act_from_ts, "query_params.req_from_ts != query_params.act_from_ts");
  547. m_assert(query_params.req_to_ts == query_params.act_to_ts , "query_params.req_to_ts != query_params.act_to_ts");
  548. getrusage(RUSAGE_THREAD, &end);
  549. time_t user_time = end.ru_utime.tv_sec * USEC_PER_SEC + end.ru_utime.tv_usec -
  550. start.ru_utime.tv_sec * USEC_PER_SEC - start.ru_utime.tv_usec;
  551. time_t sys_time = end.ru_stime.tv_sec * USEC_PER_SEC + end.ru_stime.tv_usec -
  552. start.ru_stime.tv_sec * USEC_PER_SEC - start.ru_stime.tv_usec;
  553. buffer_json_member_add_object(wb, "logs_management_meta");
  554. buffer_json_member_add_string(wb, "api_version", LOGS_QRY_VERSION);
  555. buffer_json_member_add_uint64(wb, "num_lines", query_params.num_lines);
  556. buffer_json_member_add_uint64(wb, "user_time", user_time);
  557. buffer_json_member_add_uint64(wb, "system_time", sys_time);
  558. buffer_json_member_add_uint64(wb, "total_time", user_time + sys_time);
  559. buffer_json_member_add_uint64(wb, "error_code", (uint64_t) ret->err_code);
  560. buffer_json_member_add_string(wb, "error_string", ret->err_str);
  561. buffer_json_object_close(wb); // logs_management_meta
  562. buffer_json_member_add_uint64(wb, "status", ret->http_code);
  563. buffer_json_member_add_boolean(wb, "partial", ret->http_code != HTTP_RESP_OK ||
  564. ret->err_code == LOGS_QRY_RES_ERR_CODE_TIMEOUT);
  565. buffer_json_member_add_string(wb, "type", "table");
  566. if(!fqs->data_only) {
  567. buffer_json_member_add_time_t(wb, "update_every", 1);
  568. buffer_json_member_add_string(wb, "help", FUNCTION_LOGSMANAGEMENT_HELP_SHORT);
  569. }
  570. if(!fqs->data_only || fqs->tail)
  571. buffer_json_member_add_uint64(wb, "last_modified", fqs->last_modified);
  572. facets_sort_and_reorder_keys(facets);
  573. facets_report(facets, wb, used_hashes_registry);
  574. buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + (fqs->data_only ? 3600 : 0));
  575. buffer_json_finalize(wb); // logs_management_meta
  576. // ------------------------------------------------------------------------
  577. // cleanup query params
  578. string_freez(fqs->source);
  579. fqs->source = NULL;
  580. // ------------------------------------------------------------------------
  581. // handle error response
  582. output:
  583. netdata_mutex_lock(&stdout_mut);
  584. if(ret->http_code != HTTP_RESP_OK)
  585. pluginsd_function_json_error_to_stdout(transaction, ret->http_code, ret->err_str);
  586. else
  587. pluginsd_function_result_to_stdout(transaction, ret->http_code, "application/json", expires, wb);
  588. netdata_mutex_unlock(&stdout_mut);
  589. cleanup:
  590. facets_destroy(facets);
  591. buffer_free(query_params.results_buff);
  592. buffer_free(wb);
  593. if(fqs_item) {
  594. dictionary_del(function_query_status_dict, dictionary_acquired_item_name(fqs_item));
  595. dictionary_acquired_item_release(function_query_status_dict, fqs_item);
  596. dictionary_garbage_collect(function_query_status_dict);
  597. }
  598. }
  599. struct functions_evloop_globals *logsmanagement_func_facets_init(bool *p_logsmanagement_should_exit){
  600. function_query_status_dict = dictionary_create_advanced(
  601. DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
  602. NULL, sizeof(FUNCTION_QUERY_STATUS));
  603. used_hashes_registry = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
  604. netdata_mutex_lock(&stdout_mut);
  605. fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\"\n",
  606. LOGS_MANAG_FUNC_NAME,
  607. LOGS_MANAG_QUERY_TIMEOUT_DEFAULT,
  608. FUNCTION_LOGSMANAGEMENT_HELP_SHORT);
  609. netdata_mutex_unlock(&stdout_mut);
  610. struct functions_evloop_globals *wg = functions_evloop_init(1, "LGSMNGM",
  611. &stdout_mut,
  612. p_logsmanagement_should_exit);
  613. functions_evloop_add_function( wg, LOGS_MANAG_FUNC_NAME,
  614. logsmanagement_function_facets,
  615. LOGS_MANAG_QUERY_TIMEOUT_DEFAULT);
  616. return wg;
  617. }