systemd-journal.c 89 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. /*
  3. * netdata systemd-journal.plugin
  4. * Copyright (C) 2023 Netdata Inc.
  5. * GPL v3+
  6. */
  7. #include "systemd-internals.h"
  8. /*
  9. * TODO
  10. *
  11. * _UDEV_DEVLINK is frequently set more than once per field - support multi-value faces
  12. *
  13. */
  14. #define FACET_MAX_VALUE_LENGTH 8192
  15. #define SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION "View, search and analyze systemd journal entries."
  16. #define SYSTEMD_JOURNAL_FUNCTION_NAME "systemd-journal"
  17. #define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 60
  18. #define SYSTEMD_JOURNAL_MAX_PARAMS 1000
  19. #define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (1 * 3600)
  20. #define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200
  21. #define SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING 1000000
  22. #define SYSTEMD_JOURNAL_SAMPLING_SLOTS 1000
  23. #define SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE 10000
  24. #define JOURNAL_PARAMETER_HELP "help"
  25. #define JOURNAL_PARAMETER_AFTER "after"
  26. #define JOURNAL_PARAMETER_BEFORE "before"
  27. #define JOURNAL_PARAMETER_ANCHOR "anchor"
  28. #define JOURNAL_PARAMETER_LAST "last"
  29. #define JOURNAL_PARAMETER_QUERY "query"
  30. #define JOURNAL_PARAMETER_FACETS "facets"
  31. #define JOURNAL_PARAMETER_HISTOGRAM "histogram"
  32. #define JOURNAL_PARAMETER_DIRECTION "direction"
  33. #define JOURNAL_PARAMETER_IF_MODIFIED_SINCE "if_modified_since"
  34. #define JOURNAL_PARAMETER_DATA_ONLY "data_only"
  35. #define JOURNAL_PARAMETER_SOURCE "source"
  36. #define JOURNAL_PARAMETER_INFO "info"
  37. #define JOURNAL_PARAMETER_ID "id"
  38. #define JOURNAL_PARAMETER_PROGRESS "progress"
  39. #define JOURNAL_PARAMETER_SLICE "slice"
  40. #define JOURNAL_PARAMETER_DELTA "delta"
  41. #define JOURNAL_PARAMETER_TAIL "tail"
  42. #define JOURNAL_PARAMETER_SAMPLING "sampling"
  43. #define JOURNAL_KEY_ND_JOURNAL_FILE "ND_JOURNAL_FILE"
  44. #define JOURNAL_KEY_ND_JOURNAL_PROCESS "ND_JOURNAL_PROCESS"
  45. #define JOURNAL_DEFAULT_SLICE_MODE true
  46. #define JOURNAL_DEFAULT_DIRECTION FACETS_ANCHOR_DIRECTION_BACKWARD
  47. #define SYSTEMD_ALWAYS_VISIBLE_KEYS NULL
  48. #define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS \
  49. "!MESSAGE_ID" \
  50. "|*MESSAGE*" \
  51. "|*_RAW" \
  52. "|*_USEC" \
  53. "|*_NSEC" \
  54. "|*TIMESTAMP*" \
  55. "|*_ID" \
  56. "|*_ID_*" \
  57. "|__*" \
  58. ""
  59. #define SYSTEMD_KEYS_INCLUDED_IN_FACETS \
  60. \
  61. /* --- USER JOURNAL FIELDS --- */ \
  62. \
  63. /* "|MESSAGE" */ \
  64. "|MESSAGE_ID" \
  65. "|PRIORITY" \
  66. "|CODE_FILE" \
  67. /* "|CODE_LINE" */ \
  68. "|CODE_FUNC" \
  69. "|ERRNO" \
  70. /* "|INVOCATION_ID" */ \
  71. /* "|USER_INVOCATION_ID" */ \
  72. "|SYSLOG_FACILITY" \
  73. "|SYSLOG_IDENTIFIER" \
  74. /* "|SYSLOG_PID" */ \
  75. /* "|SYSLOG_TIMESTAMP" */ \
  76. /* "|SYSLOG_RAW" */ \
  77. /* "!DOCUMENTATION" */ \
  78. /* "|TID" */ \
  79. "|UNIT" \
  80. "|USER_UNIT" \
  81. "|UNIT_RESULT" /* undocumented */ \
  82. \
  83. \
  84. /* --- TRUSTED JOURNAL FIELDS --- */ \
  85. \
  86. /* "|_PID" */ \
  87. "|_UID" \
  88. "|_GID" \
  89. "|_COMM" \
  90. "|_EXE" \
  91. /* "|_CMDLINE" */ \
  92. "|_CAP_EFFECTIVE" \
  93. /* "|_AUDIT_SESSION" */ \
  94. "|_AUDIT_LOGINUID" \
  95. "|_SYSTEMD_CGROUP" \
  96. "|_SYSTEMD_SLICE" \
  97. "|_SYSTEMD_UNIT" \
  98. "|_SYSTEMD_USER_UNIT" \
  99. "|_SYSTEMD_USER_SLICE" \
  100. "|_SYSTEMD_SESSION" \
  101. "|_SYSTEMD_OWNER_UID" \
  102. "|_SELINUX_CONTEXT" \
  103. /* "|_SOURCE_REALTIME_TIMESTAMP" */ \
  104. "|_BOOT_ID" \
  105. "|_MACHINE_ID" \
  106. /* "|_SYSTEMD_INVOCATION_ID" */ \
  107. "|_HOSTNAME" \
  108. "|_TRANSPORT" \
  109. "|_STREAM_ID" \
  110. /* "|LINE_BREAK" */ \
  111. "|_NAMESPACE" \
  112. "|_RUNTIME_SCOPE" \
  113. \
  114. \
  115. /* --- KERNEL JOURNAL FIELDS --- */ \
  116. \
  117. /* "|_KERNEL_DEVICE" */ \
  118. "|_KERNEL_SUBSYSTEM" \
  119. /* "|_UDEV_SYSNAME" */ \
  120. "|_UDEV_DEVNODE" \
  121. /* "|_UDEV_DEVLINK" */ \
  122. \
  123. \
  124. /* --- LOGGING ON BEHALF --- */ \
  125. \
  126. "|OBJECT_UID" \
  127. "|OBJECT_GID" \
  128. "|OBJECT_COMM" \
  129. "|OBJECT_EXE" \
  130. /* "|OBJECT_CMDLINE" */ \
  131. /* "|OBJECT_AUDIT_SESSION" */ \
  132. "|OBJECT_AUDIT_LOGINUID" \
  133. "|OBJECT_SYSTEMD_CGROUP" \
  134. "|OBJECT_SYSTEMD_SESSION" \
  135. "|OBJECT_SYSTEMD_OWNER_UID" \
  136. "|OBJECT_SYSTEMD_UNIT" \
  137. "|OBJECT_SYSTEMD_USER_UNIT" \
  138. \
  139. \
  140. /* --- CORE DUMPS --- */ \
  141. \
  142. "|COREDUMP_COMM" \
  143. "|COREDUMP_UNIT" \
  144. "|COREDUMP_USER_UNIT" \
  145. "|COREDUMP_SIGNAL_NAME" \
  146. "|COREDUMP_CGROUP" \
  147. \
  148. \
  149. /* --- DOCKER --- */ \
  150. \
  151. "|CONTAINER_ID" \
  152. /* "|CONTAINER_ID_FULL" */ \
  153. "|CONTAINER_NAME" \
  154. "|CONTAINER_TAG" \
  155. "|IMAGE_NAME" /* undocumented */ \
  156. /* "|CONTAINER_PARTIAL_MESSAGE" */ \
  157. \
  158. \
  159. /* --- NETDATA --- */ \
  160. \
  161. "|ND_NIDL_NODE" \
  162. "|ND_NIDL_CONTEXT" \
  163. "|ND_LOG_SOURCE" \
  164. /*"|ND_MODULE" */ \
  165. "|ND_ALERT_NAME" \
  166. "|ND_ALERT_CLASS" \
  167. "|ND_ALERT_COMPONENT" \
  168. "|ND_ALERT_TYPE" \
  169. \
  170. ""
  171. // ----------------------------------------------------------------------------
  172. typedef struct function_query_status {
  173. bool *cancelled; // a pointer to the cancelling boolean
  174. usec_t stop_monotonic_ut;
  175. usec_t started_monotonic_ut;
  176. // request
  177. SD_JOURNAL_FILE_SOURCE_TYPE source_type;
  178. SIMPLE_PATTERN *sources;
  179. usec_t after_ut;
  180. usec_t before_ut;
  181. struct {
  182. usec_t start_ut;
  183. usec_t stop_ut;
  184. } anchor;
  185. FACETS_ANCHOR_DIRECTION direction;
  186. size_t entries;
  187. usec_t if_modified_since;
  188. bool delta;
  189. bool tail;
  190. bool data_only;
  191. bool slice;
  192. size_t sampling;
  193. size_t filters;
  194. usec_t last_modified;
  195. const char *query;
  196. const char *histogram;
  197. struct {
  198. usec_t start_ut; // the starting time of the query - we start from this
  199. usec_t stop_ut; // the ending time of the query - we stop at this
  200. usec_t first_msg_ut;
  201. sd_id128_t first_msg_writer;
  202. uint64_t first_msg_seqnum;
  203. } query_file;
  204. struct {
  205. uint32_t enable_after_samples;
  206. uint32_t slots;
  207. uint32_t sampled;
  208. uint32_t unsampled;
  209. uint32_t estimated;
  210. } samples;
  211. struct {
  212. uint32_t enable_after_samples;
  213. uint32_t every;
  214. uint32_t skipped;
  215. uint32_t recalibrate;
  216. uint32_t sampled;
  217. uint32_t unsampled;
  218. uint32_t estimated;
  219. } samples_per_file;
  220. struct {
  221. usec_t start_ut;
  222. usec_t end_ut;
  223. usec_t step_ut;
  224. uint32_t enable_after_samples;
  225. uint32_t sampled[SYSTEMD_JOURNAL_SAMPLING_SLOTS];
  226. uint32_t unsampled[SYSTEMD_JOURNAL_SAMPLING_SLOTS];
  227. } samples_per_time_slot;
  228. // per file progress info
  229. // size_t cached_count;
  230. // progress statistics
  231. usec_t matches_setup_ut;
  232. size_t rows_useful;
  233. size_t rows_read;
  234. size_t bytes_read;
  235. size_t files_matched;
  236. size_t file_working;
  237. } FUNCTION_QUERY_STATUS;
  238. static void log_fqs(FUNCTION_QUERY_STATUS *fqs, const char *msg) {
  239. netdata_log_error("ERROR: %s, on query "
  240. "timeframe [%"PRIu64" - %"PRIu64"], "
  241. "anchor [%"PRIu64" - %"PRIu64"], "
  242. "if_modified_since %"PRIu64", "
  243. "data_only:%s, delta:%s, tail:%s, direction:%s"
  244. , msg
  245. , fqs->after_ut, fqs->before_ut
  246. , fqs->anchor.start_ut, fqs->anchor.stop_ut
  247. , fqs->if_modified_since
  248. , fqs->data_only ? "true" : "false"
  249. , fqs->delta ? "true" : "false"
  250. , fqs->tail ? "tail" : "false"
  251. , fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward");
  252. }
  253. static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) {
  254. if(sd_journal_seek_realtime_usec(j, timestamp) < 0) {
  255. netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, timestamp);
  256. if(sd_journal_seek_tail(j) < 0) {
  257. netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail");
  258. return false;
  259. }
  260. }
  261. return true;
  262. }
  263. #define JD_SOURCE_REALTIME_TIMESTAMP "_SOURCE_REALTIME_TIMESTAMP"
  264. // ----------------------------------------------------------------------------
  265. // sampling support
  266. static void sampling_query_init(FUNCTION_QUERY_STATUS *fqs, FACETS *facets) {
  267. if(!fqs->sampling)
  268. return;
  269. if(!fqs->slice) {
  270. // the user is doing a full data query
  271. // disable sampling
  272. fqs->sampling = 0;
  273. return;
  274. }
  275. if(fqs->data_only) {
  276. // the user is doing a data query
  277. // disable sampling
  278. fqs->sampling = 0;
  279. return;
  280. }
  281. if(!fqs->files_matched) {
  282. // no files have been matched
  283. // disable sampling
  284. fqs->sampling = 0;
  285. return;
  286. }
  287. fqs->samples.slots = facets_histogram_slots(facets);
  288. if(fqs->samples.slots < 2) fqs->samples.slots = 2;
  289. if(fqs->samples.slots > SYSTEMD_JOURNAL_SAMPLING_SLOTS)
  290. fqs->samples.slots = SYSTEMD_JOURNAL_SAMPLING_SLOTS;
  291. if(!fqs->after_ut || !fqs->before_ut || fqs->after_ut >= fqs->before_ut) {
  292. // we don't have enough information for sampling
  293. fqs->sampling = 0;
  294. return;
  295. }
  296. usec_t delta = fqs->before_ut - fqs->after_ut;
  297. usec_t step = delta / facets_histogram_slots(facets) - 1;
  298. if(step < 1) step = 1;
  299. fqs->samples_per_time_slot.start_ut = fqs->after_ut;
  300. fqs->samples_per_time_slot.end_ut = fqs->before_ut;
  301. fqs->samples_per_time_slot.step_ut = step;
  302. // the minimum number of rows to enable sampling
  303. fqs->samples.enable_after_samples = fqs->sampling / 2;
  304. size_t files_matched = fqs->files_matched;
  305. if(!files_matched)
  306. files_matched = 1;
  307. // the minimum number of rows per file to enable sampling
  308. fqs->samples_per_file.enable_after_samples = (fqs->sampling / 4) / files_matched;
  309. if(fqs->samples_per_file.enable_after_samples < fqs->entries)
  310. fqs->samples_per_file.enable_after_samples = fqs->entries;
  311. // the minimum number of rows per time slot to enable sampling
  312. fqs->samples_per_time_slot.enable_after_samples = (fqs->sampling / 4) / fqs->samples.slots;
  313. if(fqs->samples_per_time_slot.enable_after_samples < fqs->entries)
  314. fqs->samples_per_time_slot.enable_after_samples = fqs->entries;
  315. }
  316. static void sampling_file_init(FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf __maybe_unused) {
  317. fqs->samples_per_file.sampled = 0;
  318. fqs->samples_per_file.unsampled = 0;
  319. fqs->samples_per_file.estimated = 0;
  320. fqs->samples_per_file.every = 0;
  321. fqs->samples_per_file.skipped = 0;
  322. fqs->samples_per_file.recalibrate = 0;
  323. }
  324. static size_t sampling_file_lines_scanned_so_far(FUNCTION_QUERY_STATUS *fqs) {
  325. size_t sampled = fqs->samples_per_file.sampled + fqs->samples_per_file.unsampled;
  326. if(!sampled) sampled = 1;
  327. return sampled;
  328. }
  329. static void sampling_running_file_query_overlapping_timeframe_ut(
  330. FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction,
  331. usec_t msg_ut, usec_t *after_ut, usec_t *before_ut) {
  332. // find the overlap of the query and file timeframes
  333. // taking into account the first message we encountered
  334. usec_t oldest_ut, newest_ut;
  335. if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) {
  336. // the first message we know (oldest)
  337. oldest_ut = fqs->query_file.first_msg_ut ? fqs->query_file.first_msg_ut : jf->msg_first_ut;
  338. if(!oldest_ut) oldest_ut = fqs->query_file.start_ut;
  339. if(jf->msg_last_ut)
  340. newest_ut = MIN(fqs->query_file.stop_ut, jf->msg_last_ut);
  341. else if(jf->file_last_modified_ut)
  342. newest_ut = MIN(fqs->query_file.stop_ut, jf->file_last_modified_ut);
  343. else
  344. newest_ut = fqs->query_file.stop_ut;
  345. if(msg_ut < oldest_ut)
  346. oldest_ut = msg_ut - 1;
  347. }
  348. else /* BACKWARD */ {
  349. // the latest message we know (newest)
  350. newest_ut = fqs->query_file.first_msg_ut ? fqs->query_file.first_msg_ut : jf->msg_last_ut;
  351. if(!newest_ut) newest_ut = fqs->query_file.start_ut;
  352. if(jf->msg_first_ut)
  353. oldest_ut = MAX(fqs->query_file.stop_ut, jf->msg_first_ut);
  354. else
  355. oldest_ut = fqs->query_file.stop_ut;
  356. if(newest_ut < msg_ut)
  357. newest_ut = msg_ut + 1;
  358. }
  359. *after_ut = oldest_ut;
  360. *before_ut = newest_ut;
  361. }
  362. static double sampling_running_file_query_progress_by_time(FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf,
  363. FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
  364. usec_t after_ut, before_ut, elapsed_ut;
  365. sampling_running_file_query_overlapping_timeframe_ut(fqs, jf, direction, msg_ut, &after_ut, &before_ut);
  366. if(direction == FACETS_ANCHOR_DIRECTION_FORWARD)
  367. elapsed_ut = msg_ut - after_ut;
  368. else
  369. elapsed_ut = before_ut - msg_ut;
  370. usec_t total_ut = before_ut - after_ut;
  371. double progress = (double)elapsed_ut / (double)total_ut;
  372. return progress;
  373. }
  374. static usec_t sampling_running_file_query_remaining_time(FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf,
  375. FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut,
  376. usec_t *total_time_ut, usec_t *remaining_start_ut,
  377. usec_t *remaining_end_ut) {
  378. usec_t after_ut, before_ut;
  379. sampling_running_file_query_overlapping_timeframe_ut(fqs, jf, direction, msg_ut, &after_ut, &before_ut);
  380. // since we have a timestamp in msg_ut
  381. // this timestamp can extend the overlap
  382. if(msg_ut <= after_ut)
  383. after_ut = msg_ut - 1;
  384. if(msg_ut >= before_ut)
  385. before_ut = msg_ut + 1;
  386. // return the remaining duration
  387. usec_t remaining_from_ut, remaining_to_ut;
  388. if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) {
  389. remaining_from_ut = msg_ut;
  390. remaining_to_ut = before_ut;
  391. }
  392. else {
  393. remaining_from_ut = after_ut;
  394. remaining_to_ut = msg_ut;
  395. }
  396. usec_t remaining_ut = remaining_to_ut - remaining_from_ut;
  397. if(total_time_ut)
  398. *total_time_ut = (before_ut > after_ut) ? before_ut - after_ut : 1;
  399. if(remaining_start_ut)
  400. *remaining_start_ut = remaining_from_ut;
  401. if(remaining_end_ut)
  402. *remaining_end_ut = remaining_to_ut;
  403. return remaining_ut;
  404. }
  405. static size_t sampling_running_file_query_estimate_remaining_lines_by_time(FUNCTION_QUERY_STATUS *fqs,
  406. struct journal_file *jf,
  407. FACETS_ANCHOR_DIRECTION direction,
  408. usec_t msg_ut) {
  409. size_t scanned_lines = sampling_file_lines_scanned_so_far(fqs);
  410. // Calculate the proportion of time covered
  411. usec_t total_time_ut, remaining_start_ut, remaining_end_ut;
  412. usec_t remaining_time_ut = sampling_running_file_query_remaining_time(fqs, jf, direction, msg_ut, &total_time_ut,
  413. &remaining_start_ut, &remaining_end_ut);
  414. if (total_time_ut == 0) total_time_ut = 1;
  415. double proportion_by_time = (double) (total_time_ut - remaining_time_ut) / (double) total_time_ut;
  416. if (proportion_by_time == 0 || proportion_by_time > 1.0 || !isfinite(proportion_by_time))
  417. proportion_by_time = 1.0;
  418. // Estimate the total number of lines in the file
  419. size_t expected_matching_logs_by_time = (size_t)((double)scanned_lines / proportion_by_time);
  420. if(jf->messages_in_file && expected_matching_logs_by_time > jf->messages_in_file)
  421. expected_matching_logs_by_time = jf->messages_in_file;
  422. // Calculate the estimated number of remaining lines
  423. size_t remaining_logs_by_time = expected_matching_logs_by_time - scanned_lines;
  424. if (remaining_logs_by_time < 1) remaining_logs_by_time = 1;
  425. // nd_log(NDLS_COLLECTORS, NDLP_INFO,
  426. // "JOURNAL ESTIMATION: '%s' "
  427. // "scanned_lines=%zu [sampled=%zu, unsampled=%zu, estimated=%zu], "
  428. // "file [%"PRIu64" - %"PRIu64", duration %"PRId64", known lines in file %zu], "
  429. // "query [%"PRIu64" - %"PRIu64", duration %"PRId64"], "
  430. // "first message read from the file at %"PRIu64", current message at %"PRIu64", "
  431. // "proportion of time %.2f %%, "
  432. // "expected total lines in file %zu, "
  433. // "remaining lines %zu, "
  434. // "remaining time %"PRIu64" [%"PRIu64" - %"PRIu64", duration %"PRId64"]"
  435. // , jf->filename
  436. // , scanned_lines, fqs->samples_per_file.sampled, fqs->samples_per_file.unsampled, fqs->samples_per_file.estimated
  437. // , jf->msg_first_ut, jf->msg_last_ut, jf->msg_last_ut - jf->msg_first_ut, jf->messages_in_file
  438. // , fqs->query_file.start_ut, fqs->query_file.stop_ut, fqs->query_file.stop_ut - fqs->query_file.start_ut
  439. // , fqs->query_file.first_msg_ut, msg_ut
  440. // , proportion_by_time * 100.0
  441. // , expected_matching_logs_by_time
  442. // , remaining_logs_by_time
  443. // , remaining_time_ut, remaining_start_ut, remaining_end_ut, remaining_end_ut - remaining_start_ut
  444. // );
  445. return remaining_logs_by_time;
  446. }
  447. static size_t sampling_running_file_query_estimate_remaining_lines(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
  448. size_t expected_matching_logs_by_seqnum = 0;
  449. double proportion_by_seqnum = 0.0;
  450. size_t remaining_logs_by_seqnum = 0;
  451. #ifdef HAVE_SD_JOURNAL_GET_SEQNUM
  452. uint64_t current_msg_seqnum;
  453. sd_id128_t current_msg_writer;
  454. if(!fqs->query_file.first_msg_seqnum || sd_journal_get_seqnum(j, &current_msg_seqnum, &current_msg_writer) < 0) {
  455. fqs->query_file.first_msg_seqnum = 0;
  456. fqs->query_file.first_msg_writer = SD_ID128_NULL;
  457. }
  458. else if(jf->messages_in_file) {
  459. size_t scanned_lines = sampling_file_lines_scanned_so_far(fqs);
  460. double proportion_of_all_lines_so_far;
  461. if(direction == FACETS_ANCHOR_DIRECTION_FORWARD)
  462. proportion_of_all_lines_so_far = (double)scanned_lines / (double)(current_msg_seqnum - jf->first_seqnum);
  463. else
  464. proportion_of_all_lines_so_far = (double)scanned_lines / (double)(jf->last_seqnum - current_msg_seqnum);
  465. if(proportion_of_all_lines_so_far > 1.0)
  466. proportion_of_all_lines_so_far = 1.0;
  467. expected_matching_logs_by_seqnum = (size_t)(proportion_of_all_lines_so_far * (double)jf->messages_in_file);
  468. proportion_by_seqnum = (double)scanned_lines / (double)expected_matching_logs_by_seqnum;
  469. if (proportion_by_seqnum == 0 || proportion_by_seqnum > 1.0 || !isfinite(proportion_by_seqnum))
  470. proportion_by_seqnum = 1.0;
  471. remaining_logs_by_seqnum = expected_matching_logs_by_seqnum - scanned_lines;
  472. if(!remaining_logs_by_seqnum) remaining_logs_by_seqnum = 1;
  473. }
  474. #endif
  475. if(remaining_logs_by_seqnum)
  476. return remaining_logs_by_seqnum;
  477. return sampling_running_file_query_estimate_remaining_lines_by_time(fqs, jf, direction, msg_ut);
  478. }
  479. static void sampling_decide_file_sampling_every(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
  480. size_t files_matched = fqs->files_matched;
  481. if(!files_matched) files_matched = 1;
  482. size_t remaining_lines = sampling_running_file_query_estimate_remaining_lines(j, fqs, jf, direction, msg_ut);
  483. size_t wanted_samples = (fqs->sampling / 2) / files_matched;
  484. if(!wanted_samples) wanted_samples = 1;
  485. fqs->samples_per_file.every = remaining_lines / wanted_samples;
  486. if(fqs->samples_per_file.every < 1)
  487. fqs->samples_per_file.every = 1;
  488. }
  489. typedef enum {
  490. SAMPLING_STOP_AND_ESTIMATE = -1,
  491. SAMPLING_FULL = 0,
  492. SAMPLING_SKIP_FIELDS = 1,
  493. } sampling_t;
  494. static inline sampling_t is_row_in_sample(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, usec_t msg_ut, FACETS_ANCHOR_DIRECTION direction, bool candidate_to_keep) {
  495. if(!fqs->sampling || candidate_to_keep)
  496. return SAMPLING_FULL;
  497. if(unlikely(msg_ut < fqs->samples_per_time_slot.start_ut))
  498. msg_ut = fqs->samples_per_time_slot.start_ut;
  499. if(unlikely(msg_ut > fqs->samples_per_time_slot.end_ut))
  500. msg_ut = fqs->samples_per_time_slot.end_ut;
  501. size_t slot = (msg_ut - fqs->samples_per_time_slot.start_ut) / fqs->samples_per_time_slot.step_ut;
  502. if(slot >= fqs->samples.slots)
  503. slot = fqs->samples.slots - 1;
  504. bool should_sample = false;
  505. if(fqs->samples.sampled < fqs->samples.enable_after_samples ||
  506. fqs->samples_per_file.sampled < fqs->samples_per_file.enable_after_samples ||
  507. fqs->samples_per_time_slot.sampled[slot] < fqs->samples_per_time_slot.enable_after_samples)
  508. should_sample = true;
  509. else if(fqs->samples_per_file.recalibrate >= SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE || !fqs->samples_per_file.every) {
  510. // this is the first to be unsampled for this file
  511. sampling_decide_file_sampling_every(j, fqs, jf, direction, msg_ut);
  512. fqs->samples_per_file.recalibrate = 0;
  513. should_sample = true;
  514. }
  515. else {
  516. // we sample 1 every fqs->samples_per_file.every
  517. if(fqs->samples_per_file.skipped >= fqs->samples_per_file.every) {
  518. fqs->samples_per_file.skipped = 0;
  519. should_sample = true;
  520. }
  521. else
  522. fqs->samples_per_file.skipped++;
  523. }
  524. if(should_sample) {
  525. fqs->samples.sampled++;
  526. fqs->samples_per_file.sampled++;
  527. fqs->samples_per_time_slot.sampled[slot]++;
  528. return SAMPLING_FULL;
  529. }
  530. fqs->samples_per_file.recalibrate++;
  531. fqs->samples.unsampled++;
  532. fqs->samples_per_file.unsampled++;
  533. fqs->samples_per_time_slot.unsampled[slot]++;
  534. if(fqs->samples_per_file.unsampled > fqs->samples_per_file.sampled) {
  535. double progress_by_time = sampling_running_file_query_progress_by_time(fqs, jf, direction, msg_ut);
  536. if(progress_by_time > SYSTEMD_JOURNAL_ENABLE_ESTIMATIONS_FILE_PERCENTAGE)
  537. return SAMPLING_STOP_AND_ESTIMATE;
  538. }
  539. return SAMPLING_SKIP_FIELDS;
  540. }
  541. static void sampling_update_running_query_file_estimates(FACETS *facets, sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, usec_t msg_ut, FACETS_ANCHOR_DIRECTION direction) {
  542. usec_t total_time_ut, remaining_start_ut, remaining_end_ut;
  543. sampling_running_file_query_remaining_time(fqs, jf, direction, msg_ut, &total_time_ut, &remaining_start_ut,
  544. &remaining_end_ut);
  545. size_t remaining_lines = sampling_running_file_query_estimate_remaining_lines(j, fqs, jf, direction, msg_ut);
  546. facets_update_estimations(facets, remaining_start_ut, remaining_end_ut, remaining_lines);
  547. fqs->samples.estimated += remaining_lines;
  548. fqs->samples_per_file.estimated += remaining_lines;
  549. }
  550. // ----------------------------------------------------------------------------
  551. static inline size_t netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets, struct journal_file *jf, usec_t *msg_ut) {
  552. const void *data;
  553. size_t length, bytes = 0;
  554. facets_add_key_value_length(facets, JOURNAL_KEY_ND_JOURNAL_FILE, sizeof(JOURNAL_KEY_ND_JOURNAL_FILE) - 1, jf->filename, jf->filename_len);
  555. SD_JOURNAL_FOREACH_DATA(j, data, length) {
  556. const char *key, *value;
  557. size_t key_length, value_length;
  558. if(!parse_journal_field(data, length, &key, &key_length, &value, &value_length))
  559. continue;
  560. #ifdef NETDATA_INTERNAL_CHECKS
  561. usec_t origin_journal_ut = *msg_ut;
  562. #endif
  563. if(unlikely(key_length == sizeof(JD_SOURCE_REALTIME_TIMESTAMP) - 1 &&
  564. memcmp(key, JD_SOURCE_REALTIME_TIMESTAMP, sizeof(JD_SOURCE_REALTIME_TIMESTAMP) - 1) == 0)) {
  565. usec_t ut = str2ull(value, NULL);
  566. if(ut && ut < *msg_ut) {
  567. usec_t delta = *msg_ut - ut;
  568. *msg_ut = ut;
  569. if(delta > JOURNAL_VS_REALTIME_DELTA_MAX_UT)
  570. delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT;
  571. // update max_journal_vs_realtime_delta_ut if the delta increased
  572. usec_t expected = jf->max_journal_vs_realtime_delta_ut;
  573. do {
  574. if(delta <= expected)
  575. break;
  576. } while(!__atomic_compare_exchange_n(&jf->max_journal_vs_realtime_delta_ut, &expected, delta, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
  577. internal_error(delta > expected,
  578. "increased max_journal_vs_realtime_delta_ut from %"PRIu64" to %"PRIu64", "
  579. "journal %"PRIu64", actual %"PRIu64" (delta %"PRIu64")"
  580. , expected, delta, origin_journal_ut, *msg_ut, origin_journal_ut - (*msg_ut));
  581. }
  582. }
  583. bytes += length;
  584. facets_add_key_value_length(facets, key, key_length, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH);
  585. }
  586. return bytes;
  587. }
  588. #define FUNCTION_PROGRESS_UPDATE_ROWS(rows_read, rows) __atomic_fetch_add(&(rows_read), rows, __ATOMIC_RELAXED)
  589. #define FUNCTION_PROGRESS_UPDATE_BYTES(bytes_read, bytes) __atomic_fetch_add(&(bytes_read), bytes, __ATOMIC_RELAXED)
  590. #define FUNCTION_PROGRESS_EVERY_ROWS (1ULL << 13)
  591. #define FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS (1ULL << 7)
  592. static inline ND_SD_JOURNAL_STATUS check_stop(const bool *cancelled, const usec_t *stop_monotonic_ut) {
  593. if(cancelled && __atomic_load_n(cancelled, __ATOMIC_RELAXED)) {
  594. internal_error(true, "Function has been cancelled");
  595. return ND_SD_JOURNAL_CANCELLED;
  596. }
  597. if(now_monotonic_usec() > __atomic_load_n(stop_monotonic_ut, __ATOMIC_RELAXED)) {
  598. internal_error(true, "Function timed out");
  599. return ND_SD_JOURNAL_TIMED_OUT;
  600. }
  601. return ND_SD_JOURNAL_OK;
  602. }
  603. ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward(
  604. sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
  605. struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
  606. usec_t anchor_delta = __atomic_load_n(&jf->max_journal_vs_realtime_delta_ut, __ATOMIC_RELAXED);
  607. usec_t start_ut = ((fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut : fqs->before_ut) + anchor_delta;
  608. usec_t stop_ut = (fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->after_ut;
  609. bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut);
  610. fqs->query_file.start_ut = start_ut;
  611. fqs->query_file.stop_ut = stop_ut;
  612. if(!netdata_systemd_journal_seek_to(j, start_ut))
  613. return ND_SD_JOURNAL_FAILED_TO_SEEK;
  614. size_t errors_no_timestamp = 0;
  615. usec_t latest_msg_ut = 0; // the biggest timestamp we have seen so far
  616. usec_t first_msg_ut = 0; // the first message we got from the db
  617. size_t row_counter = 0, last_row_counter = 0, rows_useful = 0;
  618. size_t bytes = 0, last_bytes = 0;
  619. usec_t last_usec_from = 0;
  620. usec_t last_usec_to = 0;
  621. ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
  622. facets_rows_begin(facets);
  623. while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) {
  624. usec_t msg_ut = 0;
  625. if(sd_journal_get_realtime_usec(j, &msg_ut) < 0 || !msg_ut) {
  626. errors_no_timestamp++;
  627. continue;
  628. }
  629. if (unlikely(msg_ut > start_ut))
  630. continue;
  631. if (unlikely(msg_ut < stop_ut))
  632. break;
  633. if(unlikely(msg_ut > latest_msg_ut))
  634. latest_msg_ut = msg_ut;
  635. if(unlikely(!first_msg_ut)) {
  636. first_msg_ut = msg_ut;
  637. fqs->query_file.first_msg_ut = msg_ut;
  638. #ifdef HAVE_SD_JOURNAL_GET_SEQNUM
  639. if(sd_journal_get_seqnum(j, &fqs->query_file.first_msg_seqnum, &fqs->query_file.first_msg_writer) < 0) {
  640. fqs->query_file.first_msg_seqnum = 0;
  641. fqs->query_file.first_msg_writer = SD_ID128_NULL;
  642. }
  643. #endif
  644. }
  645. sampling_t sample = is_row_in_sample(j, fqs, jf, msg_ut,
  646. FACETS_ANCHOR_DIRECTION_BACKWARD,
  647. facets_row_candidate_to_keep(facets, msg_ut));
  648. if(sample == SAMPLING_FULL) {
  649. bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut);
  650. // make sure each line gets a unique timestamp
  651. if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to))
  652. msg_ut = --last_usec_from;
  653. else
  654. last_usec_from = last_usec_to = msg_ut;
  655. if(facets_row_finished(facets, msg_ut))
  656. rows_useful++;
  657. row_counter++;
  658. if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 &&
  659. stop_when_full &&
  660. facets_rows(facets) >= fqs->entries)) {
  661. // stop the data only query
  662. usec_t oldest = facets_row_oldest_ut(facets);
  663. if(oldest && msg_ut < (oldest - anchor_delta))
  664. break;
  665. }
  666. if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) {
  667. FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
  668. last_row_counter = row_counter;
  669. FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
  670. last_bytes = bytes;
  671. status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
  672. }
  673. }
  674. else if(sample == SAMPLING_SKIP_FIELDS)
  675. facets_row_finished_unsampled(facets, msg_ut);
  676. else {
  677. sampling_update_running_query_file_estimates(facets, j, fqs, jf, msg_ut, FACETS_ANCHOR_DIRECTION_BACKWARD);
  678. break;
  679. }
  680. }
  681. FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
  682. FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
  683. fqs->rows_useful += rows_useful;
  684. if(errors_no_timestamp)
  685. netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
  686. if(latest_msg_ut > fqs->last_modified)
  687. fqs->last_modified = latest_msg_ut;
  688. return status;
  689. }
  690. ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward(
  691. sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
  692. struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
  693. usec_t anchor_delta = __atomic_load_n(&jf->max_journal_vs_realtime_delta_ut, __ATOMIC_RELAXED);
  694. usec_t start_ut = (fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut : fqs->after_ut;
  695. usec_t stop_ut = ((fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->before_ut) + anchor_delta;
  696. bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut);
  697. fqs->query_file.start_ut = start_ut;
  698. fqs->query_file.stop_ut = stop_ut;
  699. if(!netdata_systemd_journal_seek_to(j, start_ut))
  700. return ND_SD_JOURNAL_FAILED_TO_SEEK;
  701. size_t errors_no_timestamp = 0;
  702. usec_t latest_msg_ut = 0; // the biggest timestamp we have seen so far
  703. usec_t first_msg_ut = 0; // the first message we got from the db
  704. size_t row_counter = 0, last_row_counter = 0, rows_useful = 0;
  705. size_t bytes = 0, last_bytes = 0;
  706. usec_t last_usec_from = 0;
  707. usec_t last_usec_to = 0;
  708. ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
  709. facets_rows_begin(facets);
  710. while (status == ND_SD_JOURNAL_OK && sd_journal_next(j) > 0) {
  711. usec_t msg_ut = 0;
  712. if(sd_journal_get_realtime_usec(j, &msg_ut) < 0 || !msg_ut) {
  713. errors_no_timestamp++;
  714. continue;
  715. }
  716. if (unlikely(msg_ut < start_ut))
  717. continue;
  718. if (unlikely(msg_ut > stop_ut))
  719. break;
  720. if(likely(msg_ut > latest_msg_ut))
  721. latest_msg_ut = msg_ut;
  722. if(unlikely(!first_msg_ut)) {
  723. first_msg_ut = msg_ut;
  724. fqs->query_file.first_msg_ut = msg_ut;
  725. }
  726. sampling_t sample = is_row_in_sample(j, fqs, jf, msg_ut,
  727. FACETS_ANCHOR_DIRECTION_FORWARD,
  728. facets_row_candidate_to_keep(facets, msg_ut));
  729. if(sample == SAMPLING_FULL) {
  730. bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut);
  731. // make sure each line gets a unique timestamp
  732. if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to))
  733. msg_ut = ++last_usec_to;
  734. else
  735. last_usec_from = last_usec_to = msg_ut;
  736. if(facets_row_finished(facets, msg_ut))
  737. rows_useful++;
  738. row_counter++;
  739. if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 &&
  740. stop_when_full &&
  741. facets_rows(facets) >= fqs->entries)) {
  742. // stop the data only query
  743. usec_t newest = facets_row_newest_ut(facets);
  744. if(newest && msg_ut > (newest + anchor_delta))
  745. break;
  746. }
  747. if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) {
  748. FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
  749. last_row_counter = row_counter;
  750. FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
  751. last_bytes = bytes;
  752. status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
  753. }
  754. }
  755. else if(sample == SAMPLING_SKIP_FIELDS)
  756. facets_row_finished_unsampled(facets, msg_ut);
  757. else {
  758. sampling_update_running_query_file_estimates(facets, j, fqs, jf, msg_ut, FACETS_ANCHOR_DIRECTION_FORWARD);
  759. break;
  760. }
  761. }
  762. FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
  763. FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
  764. fqs->rows_useful += rows_useful;
  765. if(errors_no_timestamp)
  766. netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
  767. if(latest_msg_ut > fqs->last_modified)
  768. fqs->last_modified = latest_msg_ut;
  769. return status;
  770. }
  771. bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) {
  772. // return true, if data have been modified since the timestamp
  773. if(!last_modified || !seek_to)
  774. return false;
  775. if(!netdata_systemd_journal_seek_to(j, seek_to))
  776. return false;
  777. usec_t first_msg_ut = 0;
  778. while (sd_journal_previous(j) > 0) {
  779. usec_t msg_ut;
  780. if(sd_journal_get_realtime_usec(j, &msg_ut) < 0)
  781. continue;
  782. first_msg_ut = msg_ut;
  783. break;
  784. }
  785. return first_msg_ut != last_modified;
  786. }
  787. #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
  788. static bool netdata_systemd_filtering_by_journal(sd_journal *j, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) {
  789. const char *field = NULL;
  790. const void *data = NULL;
  791. size_t data_length;
  792. size_t added_keys = 0;
  793. size_t failures = 0;
  794. size_t filters_added = 0;
  795. SD_JOURNAL_FOREACH_FIELD(j, field) { // for each key
  796. bool interesting;
  797. if(fqs->data_only)
  798. interesting = facets_key_name_is_filter(facets, field);
  799. else
  800. interesting = facets_key_name_is_facet(facets, field);
  801. if(interesting) {
  802. if(sd_journal_query_unique(j, field) >= 0) {
  803. bool added_this_key = false;
  804. size_t added_values = 0;
  805. SD_JOURNAL_FOREACH_UNIQUE(j, data, data_length) { // for each value of the key
  806. const char *key, *value;
  807. size_t key_length, value_length;
  808. if(!parse_journal_field(data, data_length, &key, &key_length, &value, &value_length))
  809. continue;
  810. facets_add_possible_value_name_to_key(facets, key, key_length, value, value_length);
  811. if(!facets_key_name_value_length_is_selected(facets, key, key_length, value, value_length))
  812. continue;
  813. if(added_keys && !added_this_key) {
  814. if(sd_journal_add_conjunction(j) < 0) // key AND key AND key
  815. failures++;
  816. added_this_key = true;
  817. added_keys++;
  818. }
  819. else if(added_values)
  820. if(sd_journal_add_disjunction(j) < 0) // value OR value OR value
  821. failures++;
  822. if(sd_journal_add_match(j, data, data_length) < 0)
  823. failures++;
  824. if(!added_keys) {
  825. added_keys++;
  826. added_this_key = true;
  827. }
  828. added_values++;
  829. filters_added++;
  830. }
  831. }
  832. }
  833. }
  834. if(failures) {
  835. log_fqs(fqs, "failed to setup journal filter, will run the full query.");
  836. sd_journal_flush_matches(j);
  837. return true;
  838. }
  839. return filters_added ? true : false;
  840. }
  841. #endif // HAVE_SD_JOURNAL_RESTART_FIELDS
  842. static ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_one_file(
  843. const char *filename, BUFFER *wb, FACETS *facets,
  844. struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
  845. sd_journal *j = NULL;
  846. errno = 0;
  847. fstat_cache_enable_on_thread();
  848. const char *paths[2] = {
  849. [0] = filename,
  850. [1] = NULL,
  851. };
  852. if(sd_journal_open_files(&j, paths, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) {
  853. netdata_log_error("JOURNAL: cannot open file '%s' for query", filename);
  854. fstat_cache_disable_on_thread();
  855. return ND_SD_JOURNAL_FAILED_TO_OPEN;
  856. }
  857. ND_SD_JOURNAL_STATUS status;
  858. bool matches_filters = true;
  859. #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
  860. if(fqs->slice) {
  861. usec_t started = now_monotonic_usec();
  862. matches_filters = netdata_systemd_filtering_by_journal(j, facets, fqs) || !fqs->filters;
  863. usec_t ended = now_monotonic_usec();
  864. fqs->matches_setup_ut += (ended - started);
  865. }
  866. #endif // HAVE_SD_JOURNAL_RESTART_FIELDS
  867. if(matches_filters) {
  868. if(fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD)
  869. status = netdata_systemd_journal_query_forward(j, wb, facets, jf, fqs);
  870. else
  871. status = netdata_systemd_journal_query_backward(j, wb, facets, jf, fqs);
  872. }
  873. else
  874. status = ND_SD_JOURNAL_NO_FILE_MATCHED;
  875. sd_journal_close(j);
  876. fstat_cache_disable_on_thread();
  877. return status;
  878. }
  879. static bool jf_is_mine(struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
  880. if((fqs->source_type == SDJF_NONE && !fqs->sources) || (jf->source_type & fqs->source_type) ||
  881. (fqs->sources && simple_pattern_matches(fqs->sources, string2str(jf->source)))) {
  882. if(!jf->msg_last_ut || !jf->msg_last_ut)
  883. // the file is not scanned yet, or the timestamps have not been updated,
  884. // so we don't know if it can contribute or not - let's add it.
  885. return true;
  886. usec_t anchor_delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT;
  887. usec_t first_ut = jf->msg_first_ut - anchor_delta;
  888. usec_t last_ut = jf->msg_last_ut + anchor_delta;
  889. if(last_ut >= fqs->after_ut && first_ut <= fqs->before_ut)
  890. return true;
  891. }
  892. return false;
  893. }
  894. static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) {
  895. ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_NO_FILE_MATCHED;
  896. struct journal_file *jf;
  897. fqs->files_matched = 0;
  898. fqs->file_working = 0;
  899. fqs->rows_useful = 0;
  900. fqs->rows_read = 0;
  901. fqs->bytes_read = 0;
  902. size_t files_used = 0;
  903. size_t files_max = dictionary_entries(journal_files_registry);
  904. const DICTIONARY_ITEM *file_items[files_max];
  905. // count the files
  906. bool files_are_newer = false;
  907. dfe_start_read(journal_files_registry, jf) {
  908. if(!jf_is_mine(jf, fqs))
  909. continue;
  910. file_items[files_used++] = dictionary_acquired_item_dup(journal_files_registry, jf_dfe.item);
  911. if(jf->msg_last_ut > fqs->if_modified_since)
  912. files_are_newer = true;
  913. }
  914. dfe_done(jf);
  915. fqs->files_matched = files_used;
  916. if(fqs->if_modified_since && !files_are_newer) {
  917. buffer_flush(wb);
  918. return HTTP_RESP_NOT_MODIFIED;
  919. }
  920. // sort the files, so that they are optimal for facets
  921. if(files_used >= 2) {
  922. if (fqs->direction == FACETS_ANCHOR_DIRECTION_BACKWARD)
  923. qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *),
  924. journal_file_dict_items_backward_compar);
  925. else
  926. qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *),
  927. journal_file_dict_items_forward_compar);
  928. }
  929. bool partial = false;
  930. usec_t query_started_ut = now_monotonic_usec();
  931. usec_t started_ut = query_started_ut;
  932. usec_t ended_ut = started_ut;
  933. usec_t duration_ut = 0, max_duration_ut = 0;
  934. sampling_query_init(fqs, facets);
  935. buffer_json_member_add_array(wb, "_journal_files");
  936. for(size_t f = 0; f < files_used ;f++) {
  937. const char *filename = dictionary_acquired_item_name(file_items[f]);
  938. jf = dictionary_acquired_item_value(file_items[f]);
  939. if(!jf_is_mine(jf, fqs))
  940. continue;
  941. started_ut = ended_ut;
  942. // do not even try to do the query if we expect it to pass the timeout
  943. if(ended_ut > (query_started_ut + (fqs->stop_monotonic_ut - query_started_ut) * 3 / 4) &&
  944. ended_ut + max_duration_ut * 2 >= fqs->stop_monotonic_ut) {
  945. partial = true;
  946. status = ND_SD_JOURNAL_TIMED_OUT;
  947. break;
  948. }
  949. fqs->file_working++;
  950. // fqs->cached_count = 0;
  951. size_t fs_calls = fstat_thread_calls;
  952. size_t fs_cached = fstat_thread_cached_responses;
  953. size_t rows_useful = fqs->rows_useful;
  954. size_t rows_read = fqs->rows_read;
  955. size_t bytes_read = fqs->bytes_read;
  956. size_t matches_setup_ut = fqs->matches_setup_ut;
  957. sampling_file_init(fqs, jf);
  958. ND_SD_JOURNAL_STATUS tmp_status = netdata_systemd_journal_query_one_file(filename, wb, facets, jf, fqs);
  959. // nd_log(NDLS_COLLECTORS, NDLP_INFO,
  960. // "JOURNAL ESTIMATION FINAL: '%s' "
  961. // "total lines %zu [sampled=%zu, unsampled=%zu, estimated=%zu], "
  962. // "file [%"PRIu64" - %"PRIu64", duration %"PRId64", known lines in file %zu], "
  963. // "query [%"PRIu64" - %"PRIu64", duration %"PRId64"], "
  964. // , jf->filename
  965. // , fqs->samples_per_file.sampled + fqs->samples_per_file.unsampled + fqs->samples_per_file.estimated
  966. // , fqs->samples_per_file.sampled, fqs->samples_per_file.unsampled, fqs->samples_per_file.estimated
  967. // , jf->msg_first_ut, jf->msg_last_ut, jf->msg_last_ut - jf->msg_first_ut, jf->messages_in_file
  968. // , fqs->query_file.start_ut, fqs->query_file.stop_ut, fqs->query_file.stop_ut - fqs->query_file.start_ut
  969. // );
  970. rows_useful = fqs->rows_useful - rows_useful;
  971. rows_read = fqs->rows_read - rows_read;
  972. bytes_read = fqs->bytes_read - bytes_read;
  973. matches_setup_ut = fqs->matches_setup_ut - matches_setup_ut;
  974. fs_calls = fstat_thread_calls - fs_calls;
  975. fs_cached = fstat_thread_cached_responses - fs_cached;
  976. ended_ut = now_monotonic_usec();
  977. duration_ut = ended_ut - started_ut;
  978. if(duration_ut > max_duration_ut)
  979. max_duration_ut = duration_ut;
  980. buffer_json_add_array_item_object(wb); // journal file
  981. {
  982. // information about the file
  983. buffer_json_member_add_string(wb, "_filename", filename);
  984. buffer_json_member_add_uint64(wb, "_source_type", jf->source_type);
  985. buffer_json_member_add_string(wb, "_source", string2str(jf->source));
  986. buffer_json_member_add_uint64(wb, "_last_modified_ut", jf->file_last_modified_ut);
  987. buffer_json_member_add_uint64(wb, "_msg_first_ut", jf->msg_first_ut);
  988. buffer_json_member_add_uint64(wb, "_msg_last_ut", jf->msg_last_ut);
  989. buffer_json_member_add_uint64(wb, "_journal_vs_realtime_delta_ut", jf->max_journal_vs_realtime_delta_ut);
  990. // information about the current use of the file
  991. buffer_json_member_add_uint64(wb, "duration_ut", ended_ut - started_ut);
  992. buffer_json_member_add_uint64(wb, "rows_read", rows_read);
  993. buffer_json_member_add_uint64(wb, "rows_useful", rows_useful);
  994. buffer_json_member_add_double(wb, "rows_per_second", (double) rows_read / (double) duration_ut * (double) USEC_PER_SEC);
  995. buffer_json_member_add_uint64(wb, "bytes_read", bytes_read);
  996. buffer_json_member_add_double(wb, "bytes_per_second", (double) bytes_read / (double) duration_ut * (double) USEC_PER_SEC);
  997. buffer_json_member_add_uint64(wb, "duration_matches_ut", matches_setup_ut);
  998. buffer_json_member_add_uint64(wb, "fstat_query_calls", fs_calls);
  999. buffer_json_member_add_uint64(wb, "fstat_query_cached_responses", fs_cached);
  1000. if(fqs->sampling) {
  1001. buffer_json_member_add_object(wb, "_sampling");
  1002. {
  1003. buffer_json_member_add_uint64(wb, "sampled", fqs->samples_per_file.sampled);
  1004. buffer_json_member_add_uint64(wb, "unsampled", fqs->samples_per_file.unsampled);
  1005. buffer_json_member_add_uint64(wb, "estimated", fqs->samples_per_file.estimated);
  1006. }
  1007. buffer_json_object_close(wb); // _sampling
  1008. }
  1009. }
  1010. buffer_json_object_close(wb); // journal file
  1011. bool stop = false;
  1012. switch(tmp_status) {
  1013. case ND_SD_JOURNAL_OK:
  1014. case ND_SD_JOURNAL_NO_FILE_MATCHED:
  1015. status = (status == ND_SD_JOURNAL_OK) ? ND_SD_JOURNAL_OK : tmp_status;
  1016. break;
  1017. case ND_SD_JOURNAL_FAILED_TO_OPEN:
  1018. case ND_SD_JOURNAL_FAILED_TO_SEEK:
  1019. partial = true;
  1020. if(status == ND_SD_JOURNAL_NO_FILE_MATCHED)
  1021. status = tmp_status;
  1022. break;
  1023. case ND_SD_JOURNAL_CANCELLED:
  1024. case ND_SD_JOURNAL_TIMED_OUT:
  1025. partial = true;
  1026. stop = true;
  1027. status = tmp_status;
  1028. break;
  1029. case ND_SD_JOURNAL_NOT_MODIFIED:
  1030. internal_fatal(true, "this should never be returned here");
  1031. break;
  1032. }
  1033. if(stop)
  1034. break;
  1035. }
  1036. buffer_json_array_close(wb); // _journal_files
  1037. // release the files
  1038. for(size_t f = 0; f < files_used ;f++)
  1039. dictionary_acquired_item_release(journal_files_registry, file_items[f]);
  1040. switch (status) {
  1041. case ND_SD_JOURNAL_OK:
  1042. if(fqs->if_modified_since && !fqs->rows_useful) {
  1043. buffer_flush(wb);
  1044. return HTTP_RESP_NOT_MODIFIED;
  1045. }
  1046. break;
  1047. case ND_SD_JOURNAL_TIMED_OUT:
  1048. case ND_SD_JOURNAL_NO_FILE_MATCHED:
  1049. break;
  1050. case ND_SD_JOURNAL_CANCELLED:
  1051. buffer_flush(wb);
  1052. return HTTP_RESP_CLIENT_CLOSED_REQUEST;
  1053. case ND_SD_JOURNAL_NOT_MODIFIED:
  1054. buffer_flush(wb);
  1055. return HTTP_RESP_NOT_MODIFIED;
  1056. default:
  1057. case ND_SD_JOURNAL_FAILED_TO_OPEN:
  1058. case ND_SD_JOURNAL_FAILED_TO_SEEK:
  1059. buffer_flush(wb);
  1060. return HTTP_RESP_INTERNAL_SERVER_ERROR;
  1061. }
  1062. buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
  1063. buffer_json_member_add_boolean(wb, "partial", partial);
  1064. buffer_json_member_add_string(wb, "type", "table");
  1065. // build a message for the query
  1066. if(!fqs->data_only) {
  1067. CLEAN_BUFFER *msg = buffer_create(0, NULL);
  1068. CLEAN_BUFFER *msg_description = buffer_create(0, NULL);
  1069. ND_LOG_FIELD_PRIORITY msg_priority = NDLP_INFO;
  1070. if(!journal_files_completed_once()) {
  1071. buffer_strcat(msg, "Journals are still being scanned. ");
  1072. buffer_strcat(msg_description
  1073. , "LIBRARY SCAN: The journal files are still being scanned, you are probably viewing incomplete data. ");
  1074. msg_priority = NDLP_WARNING;
  1075. }
  1076. if(partial) {
  1077. buffer_strcat(msg, "Query timed-out, incomplete data. ");
  1078. buffer_strcat(msg_description
  1079. , "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. ");
  1080. msg_priority = NDLP_WARNING;
  1081. }
  1082. if(fqs->samples.estimated || fqs->samples.unsampled) {
  1083. double percent = (double) (fqs->samples.sampled * 100.0 /
  1084. (fqs->samples.estimated + fqs->samples.unsampled + fqs->samples.sampled));
  1085. buffer_sprintf(msg, "%.2f%% real data", percent);
  1086. buffer_sprintf(msg_description, "ACTUAL DATA: The filters counters reflect %0.2f%% of the data. ", percent);
  1087. msg_priority = MIN(msg_priority, NDLP_NOTICE);
  1088. }
  1089. if(fqs->samples.unsampled) {
  1090. double percent = (double) (fqs->samples.unsampled * 100.0 /
  1091. (fqs->samples.estimated + fqs->samples.unsampled + fqs->samples.sampled));
  1092. buffer_sprintf(msg, ", %.2f%% unsampled", percent);
  1093. buffer_sprintf(msg_description
  1094. , "UNSAMPLED DATA: %0.2f%% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. "
  1095. , percent);
  1096. msg_priority = MIN(msg_priority, NDLP_NOTICE);
  1097. }
  1098. if(fqs->samples.estimated) {
  1099. double percent = (double) (fqs->samples.estimated * 100.0 /
  1100. (fqs->samples.estimated + fqs->samples.unsampled + fqs->samples.sampled));
  1101. buffer_sprintf(msg, ", %.2f%% estimated", percent);
  1102. buffer_sprintf(msg_description
  1103. , "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by %0.2f%%. "
  1104. , percent);
  1105. msg_priority = MIN(msg_priority, NDLP_NOTICE);
  1106. }
  1107. buffer_json_member_add_object(wb, "message");
  1108. if(buffer_tostring(msg)) {
  1109. buffer_json_member_add_string(wb, "title", buffer_tostring(msg));
  1110. buffer_json_member_add_string(wb, "description", buffer_tostring(msg_description));
  1111. buffer_json_member_add_string(wb, "status", nd_log_id2priority(msg_priority));
  1112. }
  1113. // else send an empty object if there is nothing to tell
  1114. buffer_json_object_close(wb); // message
  1115. }
  1116. if(!fqs->data_only) {
  1117. buffer_json_member_add_time_t(wb, "update_every", 1);
  1118. buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
  1119. }
  1120. if(!fqs->data_only || fqs->tail)
  1121. buffer_json_member_add_uint64(wb, "last_modified", fqs->last_modified);
  1122. facets_sort_and_reorder_keys(facets);
  1123. facets_report(facets, wb, used_hashes_registry);
  1124. buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + (fqs->data_only ? 3600 : 0));
  1125. buffer_json_member_add_object(wb, "_fstat_caching");
  1126. {
  1127. buffer_json_member_add_uint64(wb, "calls", fstat_thread_calls);
  1128. buffer_json_member_add_uint64(wb, "cached", fstat_thread_cached_responses);
  1129. }
  1130. buffer_json_object_close(wb); // _fstat_caching
  1131. if(fqs->sampling) {
  1132. buffer_json_member_add_object(wb, "_sampling");
  1133. {
  1134. buffer_json_member_add_uint64(wb, "sampled", fqs->samples.sampled);
  1135. buffer_json_member_add_uint64(wb, "unsampled", fqs->samples.unsampled);
  1136. buffer_json_member_add_uint64(wb, "estimated", fqs->samples.estimated);
  1137. }
  1138. buffer_json_object_close(wb); // _sampling
  1139. }
  1140. buffer_json_finalize(wb);
  1141. return HTTP_RESP_OK;
  1142. }
  1143. static void netdata_systemd_journal_function_help(const char *transaction) {
  1144. BUFFER *wb = buffer_create(0, NULL);
  1145. buffer_sprintf(wb,
  1146. "%s / %s\n"
  1147. "\n"
  1148. "%s\n"
  1149. "\n"
  1150. "The following parameters are supported:\n"
  1151. "\n"
  1152. " "JOURNAL_PARAMETER_HELP"\n"
  1153. " Shows this help message.\n"
  1154. "\n"
  1155. " "JOURNAL_PARAMETER_INFO"\n"
  1156. " Request initial configuration information about the plugin.\n"
  1157. " The key entity returned is the required_params array, which includes\n"
  1158. " all the available systemd journal sources.\n"
  1159. " When `"JOURNAL_PARAMETER_INFO"` is requested, all other parameters are ignored.\n"
  1160. "\n"
  1161. " "JOURNAL_PARAMETER_ID":STRING\n"
  1162. " Caller supplied unique ID of the request.\n"
  1163. " This can be used later to request a progress report of the query.\n"
  1164. " Optional, but if omitted no `"JOURNAL_PARAMETER_PROGRESS"` can be requested.\n"
  1165. "\n"
  1166. " "JOURNAL_PARAMETER_PROGRESS"\n"
  1167. " Request a progress report (the `id` of a running query is required).\n"
  1168. " When `"JOURNAL_PARAMETER_PROGRESS"` is requested, only parameter `"JOURNAL_PARAMETER_ID"` is used.\n"
  1169. "\n"
  1170. " "JOURNAL_PARAMETER_DATA_ONLY":true or "JOURNAL_PARAMETER_DATA_ONLY":false\n"
  1171. " Quickly respond with data requested, without generating a\n"
  1172. " `histogram`, `facets` counters and `items`.\n"
  1173. "\n"
  1174. " "JOURNAL_PARAMETER_DELTA":true or "JOURNAL_PARAMETER_DELTA":false\n"
  1175. " When doing data only queries, include deltas for histogram, facets and items.\n"
  1176. "\n"
  1177. " "JOURNAL_PARAMETER_TAIL":true or "JOURNAL_PARAMETER_TAIL":false\n"
  1178. " When doing data only queries, respond with the newest messages,\n"
  1179. " and up to the anchor, but calculate deltas (if requested) for\n"
  1180. " the duration [anchor - before].\n"
  1181. "\n"
  1182. " "JOURNAL_PARAMETER_SLICE":true or "JOURNAL_PARAMETER_SLICE":false\n"
  1183. " When it is turned on, the plugin is executing filtering via libsystemd,\n"
  1184. " utilizing all the available indexes of the journal files.\n"
  1185. " When it is off, only the time constraint is handled by libsystemd and\n"
  1186. " all filtering is done by the plugin.\n"
  1187. " The default is: %s\n"
  1188. "\n"
  1189. " "JOURNAL_PARAMETER_SOURCE":SOURCE\n"
  1190. " Query only the specified journal sources.\n"
  1191. " Do an `"JOURNAL_PARAMETER_INFO"` query to find the sources.\n"
  1192. "\n"
  1193. " "JOURNAL_PARAMETER_BEFORE":TIMESTAMP_IN_SECONDS\n"
  1194. " Absolute or relative (to now) timestamp in seconds, to start the query.\n"
  1195. " The query is always executed from the most recent to the oldest log entry.\n"
  1196. " If not given the default is: now.\n"
  1197. "\n"
  1198. " "JOURNAL_PARAMETER_AFTER":TIMESTAMP_IN_SECONDS\n"
  1199. " Absolute or relative (to `before`) timestamp in seconds, to end the query.\n"
  1200. " If not given, the default is %d.\n"
  1201. "\n"
  1202. " "JOURNAL_PARAMETER_LAST":ITEMS\n"
  1203. " The number of items to return.\n"
  1204. " The default is %d.\n"
  1205. "\n"
  1206. " "JOURNAL_PARAMETER_SAMPLING":ITEMS\n"
  1207. " The number of log entries to sample to estimate facets counters and histogram.\n"
  1208. " The default is %d.\n"
  1209. "\n"
  1210. " "JOURNAL_PARAMETER_ANCHOR":TIMESTAMP_IN_MICROSECONDS\n"
  1211. " Return items relative to this timestamp.\n"
  1212. " The exact items to be returned depend on the query `"JOURNAL_PARAMETER_DIRECTION"`.\n"
  1213. "\n"
  1214. " "JOURNAL_PARAMETER_DIRECTION":forward or "JOURNAL_PARAMETER_DIRECTION":backward\n"
  1215. " When set to `backward` (default) the items returned are the newest before the\n"
  1216. " `"JOURNAL_PARAMETER_ANCHOR"`, (or `"JOURNAL_PARAMETER_BEFORE"` if `"JOURNAL_PARAMETER_ANCHOR"` is not set)\n"
  1217. " When set to `forward` the items returned are the oldest after the\n"
  1218. " `"JOURNAL_PARAMETER_ANCHOR"`, (or `"JOURNAL_PARAMETER_AFTER"` if `"JOURNAL_PARAMETER_ANCHOR"` is not set)\n"
  1219. " The default is: %s\n"
  1220. "\n"
  1221. " "JOURNAL_PARAMETER_QUERY":SIMPLE_PATTERN\n"
  1222. " Do a full text search to find the log entries matching the pattern given.\n"
  1223. " The plugin is searching for matches on all fields of the database.\n"
  1224. "\n"
  1225. " "JOURNAL_PARAMETER_IF_MODIFIED_SINCE":TIMESTAMP_IN_MICROSECONDS\n"
  1226. " Each successful response, includes a `last_modified` field.\n"
  1227. " By providing the timestamp to the `"JOURNAL_PARAMETER_IF_MODIFIED_SINCE"` parameter,\n"
  1228. " the plugin will return 200 with a successful response, or 304 if the source has not\n"
  1229. " been modified since that timestamp.\n"
  1230. "\n"
  1231. " "JOURNAL_PARAMETER_HISTOGRAM":facet_id\n"
  1232. " Use the given `facet_id` for the histogram.\n"
  1233. " This parameter is ignored in `"JOURNAL_PARAMETER_DATA_ONLY"` mode.\n"
  1234. "\n"
  1235. " "JOURNAL_PARAMETER_FACETS":facet_id1,facet_id2,facet_id3,...\n"
  1236. " Add the given facets to the list of fields for which analysis is required.\n"
  1237. " The plugin will offer both a histogram and facet value counters for its values.\n"
  1238. " This parameter is ignored in `"JOURNAL_PARAMETER_DATA_ONLY"` mode.\n"
  1239. "\n"
  1240. " facet_id:value_id1,value_id2,value_id3,...\n"
  1241. " Apply filters to the query, based on the facet IDs returned.\n"
  1242. " Each `facet_id` can be given once, but multiple `facet_ids` can be given.\n"
  1243. "\n"
  1244. , program_name
  1245. , SYSTEMD_JOURNAL_FUNCTION_NAME
  1246. , SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION
  1247. , JOURNAL_DEFAULT_SLICE_MODE ? "true" : "false" // slice
  1248. , -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION
  1249. , SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY
  1250. , SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING
  1251. , JOURNAL_DEFAULT_DIRECTION == FACETS_ANCHOR_DIRECTION_BACKWARD ? "backward" : "forward"
  1252. );
  1253. netdata_mutex_lock(&stdout_mutex);
  1254. pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
  1255. netdata_mutex_unlock(&stdout_mutex);
  1256. buffer_free(wb);
  1257. }
  1258. DICTIONARY *function_query_status_dict = NULL;
  1259. static void function_systemd_journal_progress(BUFFER *wb, const char *transaction, const char *progress_id) {
  1260. if(!progress_id || !(*progress_id)) {
  1261. netdata_mutex_lock(&stdout_mutex);
  1262. pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, "missing progress id");
  1263. netdata_mutex_unlock(&stdout_mutex);
  1264. return;
  1265. }
  1266. const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(function_query_status_dict, progress_id);
  1267. if(!item) {
  1268. netdata_mutex_lock(&stdout_mutex);
  1269. pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, "progress id is not found here");
  1270. netdata_mutex_unlock(&stdout_mutex);
  1271. return;
  1272. }
  1273. FUNCTION_QUERY_STATUS *fqs = dictionary_acquired_item_value(item);
  1274. usec_t now_monotonic_ut = now_monotonic_usec();
  1275. if(now_monotonic_ut + 10 * USEC_PER_SEC > fqs->stop_monotonic_ut)
  1276. fqs->stop_monotonic_ut = now_monotonic_ut + 10 * USEC_PER_SEC;
  1277. usec_t duration_ut = now_monotonic_ut - fqs->started_monotonic_ut;
  1278. size_t files_matched = fqs->files_matched;
  1279. size_t file_working = fqs->file_working;
  1280. if(file_working > files_matched)
  1281. files_matched = file_working;
  1282. size_t rows_read = __atomic_load_n(&fqs->rows_read, __ATOMIC_RELAXED);
  1283. size_t bytes_read = __atomic_load_n(&fqs->bytes_read, __ATOMIC_RELAXED);
  1284. buffer_flush(wb);
  1285. buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
  1286. buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
  1287. buffer_json_member_add_string(wb, "type", "table");
  1288. buffer_json_member_add_uint64(wb, "running_duration_usec", duration_ut);
  1289. buffer_json_member_add_double(wb, "progress", (double)file_working * 100.0 / (double)files_matched);
  1290. char msg[1024 + 1];
  1291. snprintfz(msg, sizeof(msg) - 1,
  1292. "Read %zu rows (%0.0f rows/s), "
  1293. "data %0.1f MB (%0.1f MB/s), "
  1294. "file %zu of %zu",
  1295. rows_read, (double)rows_read / (double)duration_ut * (double)USEC_PER_SEC,
  1296. (double)bytes_read / 1024.0 / 1024.0, ((double)bytes_read / (double)duration_ut * (double)USEC_PER_SEC) / 1024.0 / 1024.0,
  1297. file_working, files_matched
  1298. );
  1299. buffer_json_member_add_string(wb, "message", msg);
  1300. buffer_json_finalize(wb);
  1301. netdata_mutex_lock(&stdout_mutex);
  1302. pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_realtime_sec() + 1, wb);
  1303. netdata_mutex_unlock(&stdout_mutex);
  1304. dictionary_acquired_item_release(function_query_status_dict, item);
  1305. }
  1306. void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) {
  1307. fstat_thread_calls = 0;
  1308. fstat_thread_cached_responses = 0;
  1309. BUFFER *wb = buffer_create(0, NULL);
  1310. buffer_flush(wb);
  1311. buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
  1312. usec_t now_monotonic_ut = now_monotonic_usec();
  1313. FUNCTION_QUERY_STATUS tmp_fqs = {
  1314. .cancelled = cancelled,
  1315. .started_monotonic_ut = now_monotonic_ut,
  1316. .stop_monotonic_ut = now_monotonic_ut + (timeout * USEC_PER_SEC),
  1317. };
  1318. FUNCTION_QUERY_STATUS *fqs = NULL;
  1319. const DICTIONARY_ITEM *fqs_item = NULL;
  1320. FACETS *facets = facets_create(50, FACETS_OPTION_ALL_KEYS_FTS,
  1321. SYSTEMD_ALWAYS_VISIBLE_KEYS,
  1322. SYSTEMD_KEYS_INCLUDED_IN_FACETS,
  1323. SYSTEMD_KEYS_EXCLUDED_FROM_FACETS);
  1324. facets_accepted_param(facets, JOURNAL_PARAMETER_INFO);
  1325. facets_accepted_param(facets, JOURNAL_PARAMETER_SOURCE);
  1326. facets_accepted_param(facets, JOURNAL_PARAMETER_AFTER);
  1327. facets_accepted_param(facets, JOURNAL_PARAMETER_BEFORE);
  1328. facets_accepted_param(facets, JOURNAL_PARAMETER_ANCHOR);
  1329. facets_accepted_param(facets, JOURNAL_PARAMETER_DIRECTION);
  1330. facets_accepted_param(facets, JOURNAL_PARAMETER_LAST);
  1331. facets_accepted_param(facets, JOURNAL_PARAMETER_QUERY);
  1332. facets_accepted_param(facets, JOURNAL_PARAMETER_FACETS);
  1333. facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM);
  1334. facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE);
  1335. facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY);
  1336. facets_accepted_param(facets, JOURNAL_PARAMETER_ID);
  1337. facets_accepted_param(facets, JOURNAL_PARAMETER_PROGRESS);
  1338. facets_accepted_param(facets, JOURNAL_PARAMETER_DELTA);
  1339. facets_accepted_param(facets, JOURNAL_PARAMETER_TAIL);
  1340. facets_accepted_param(facets, JOURNAL_PARAMETER_SAMPLING);
  1341. #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
  1342. facets_accepted_param(facets, JOURNAL_PARAMETER_SLICE);
  1343. #endif // HAVE_SD_JOURNAL_RESTART_FIELDS
  1344. // register the fields in the order you want them on the dashboard
  1345. facets_register_row_severity(facets, syslog_priority_to_facet_severity, NULL);
  1346. facets_register_key_name(facets, "_HOSTNAME",
  1347. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_VISIBLE);
  1348. facets_register_dynamic_key_name(facets, JOURNAL_KEY_ND_JOURNAL_PROCESS,
  1349. FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE,
  1350. netdata_systemd_journal_dynamic_row_id, NULL);
  1351. facets_register_key_name(facets, "MESSAGE",
  1352. FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT |
  1353. FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS);
  1354. // facets_register_dynamic_key_name(facets, "MESSAGE",
  1355. // FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_RICH_TEXT |
  1356. // FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS,
  1357. // netdata_systemd_journal_rich_message, NULL);
  1358. facets_register_key_name_transformation(facets, "PRIORITY",
  1359. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW |
  1360. FACET_KEY_OPTION_EXPANDED_FILTER,
  1361. netdata_systemd_journal_transform_priority, NULL);
  1362. facets_register_key_name_transformation(facets, "SYSLOG_FACILITY",
  1363. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW |
  1364. FACET_KEY_OPTION_EXPANDED_FILTER,
  1365. netdata_systemd_journal_transform_syslog_facility, NULL);
  1366. facets_register_key_name_transformation(facets, "ERRNO",
  1367. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1368. netdata_systemd_journal_transform_errno, NULL);
  1369. facets_register_key_name(facets, JOURNAL_KEY_ND_JOURNAL_FILE,
  1370. FACET_KEY_OPTION_NEVER_FACET);
  1371. facets_register_key_name(facets, "SYSLOG_IDENTIFIER",
  1372. FACET_KEY_OPTION_FACET);
  1373. facets_register_key_name(facets, "UNIT",
  1374. FACET_KEY_OPTION_FACET);
  1375. facets_register_key_name(facets, "USER_UNIT",
  1376. FACET_KEY_OPTION_FACET);
  1377. facets_register_key_name_transformation(facets, "MESSAGE_ID",
  1378. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW |
  1379. FACET_KEY_OPTION_EXPANDED_FILTER,
  1380. netdata_systemd_journal_transform_message_id, NULL);
  1381. facets_register_key_name_transformation(facets, "_BOOT_ID",
  1382. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1383. netdata_systemd_journal_transform_boot_id, NULL);
  1384. facets_register_key_name_transformation(facets, "_SYSTEMD_OWNER_UID",
  1385. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1386. netdata_systemd_journal_transform_uid, NULL);
  1387. facets_register_key_name_transformation(facets, "_UID",
  1388. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1389. netdata_systemd_journal_transform_uid, NULL);
  1390. facets_register_key_name_transformation(facets, "OBJECT_SYSTEMD_OWNER_UID",
  1391. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1392. netdata_systemd_journal_transform_uid, NULL);
  1393. facets_register_key_name_transformation(facets, "OBJECT_UID",
  1394. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1395. netdata_systemd_journal_transform_uid, NULL);
  1396. facets_register_key_name_transformation(facets, "_GID",
  1397. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1398. netdata_systemd_journal_transform_gid, NULL);
  1399. facets_register_key_name_transformation(facets, "OBJECT_GID",
  1400. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1401. netdata_systemd_journal_transform_gid, NULL);
  1402. facets_register_key_name_transformation(facets, "_CAP_EFFECTIVE",
  1403. FACET_KEY_OPTION_TRANSFORM_VIEW,
  1404. netdata_systemd_journal_transform_cap_effective, NULL);
  1405. facets_register_key_name_transformation(facets, "_AUDIT_LOGINUID",
  1406. FACET_KEY_OPTION_TRANSFORM_VIEW,
  1407. netdata_systemd_journal_transform_uid, NULL);
  1408. facets_register_key_name_transformation(facets, "OBJECT_AUDIT_LOGINUID",
  1409. FACET_KEY_OPTION_TRANSFORM_VIEW,
  1410. netdata_systemd_journal_transform_uid, NULL);
  1411. facets_register_key_name_transformation(facets, "_SOURCE_REALTIME_TIMESTAMP",
  1412. FACET_KEY_OPTION_TRANSFORM_VIEW,
  1413. netdata_systemd_journal_transform_timestamp_usec, NULL);
  1414. // ------------------------------------------------------------------------
  1415. // parse the parameters
  1416. bool info = false, data_only = false, progress = false, slice = JOURNAL_DEFAULT_SLICE_MODE, delta = false, tail = false;
  1417. time_t after_s = 0, before_s = 0;
  1418. usec_t anchor = 0;
  1419. usec_t if_modified_since = 0;
  1420. size_t last = 0;
  1421. FACETS_ANCHOR_DIRECTION direction = JOURNAL_DEFAULT_DIRECTION;
  1422. const char *query = NULL;
  1423. const char *chart = NULL;
  1424. SIMPLE_PATTERN *sources = NULL;
  1425. const char *progress_id = NULL;
  1426. SD_JOURNAL_FILE_SOURCE_TYPE source_type = SDJF_ALL;
  1427. size_t filters = 0;
  1428. size_t sampling = SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING;
  1429. buffer_json_member_add_object(wb, "_request");
  1430. char *words[SYSTEMD_JOURNAL_MAX_PARAMS] = { NULL };
  1431. size_t num_words = quoted_strings_splitter_pluginsd(function, words, SYSTEMD_JOURNAL_MAX_PARAMS);
  1432. for(int i = 1; i < SYSTEMD_JOURNAL_MAX_PARAMS ;i++) {
  1433. char *keyword = get_word(words, num_words, i);
  1434. if(!keyword) break;
  1435. if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) {
  1436. netdata_systemd_journal_function_help(transaction);
  1437. goto cleanup;
  1438. }
  1439. else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) {
  1440. info = true;
  1441. }
  1442. else if(strcmp(keyword, JOURNAL_PARAMETER_PROGRESS) == 0) {
  1443. progress = true;
  1444. }
  1445. else if(strncmp(keyword, JOURNAL_PARAMETER_DELTA ":", sizeof(JOURNAL_PARAMETER_DELTA ":") - 1) == 0) {
  1446. char *v = &keyword[sizeof(JOURNAL_PARAMETER_DELTA ":") - 1];
  1447. if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  1448. delta = false;
  1449. else
  1450. delta = true;
  1451. }
  1452. else if(strncmp(keyword, JOURNAL_PARAMETER_TAIL ":", sizeof(JOURNAL_PARAMETER_TAIL ":") - 1) == 0) {
  1453. char *v = &keyword[sizeof(JOURNAL_PARAMETER_TAIL ":") - 1];
  1454. if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  1455. tail = false;
  1456. else
  1457. tail = true;
  1458. }
  1459. else if(strncmp(keyword, JOURNAL_PARAMETER_SAMPLING ":", sizeof(JOURNAL_PARAMETER_SAMPLING ":") - 1) == 0) {
  1460. sampling = str2ul(&keyword[sizeof(JOURNAL_PARAMETER_SAMPLING ":") - 1]);
  1461. }
  1462. else if(strncmp(keyword, JOURNAL_PARAMETER_DATA_ONLY ":", sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1) == 0) {
  1463. char *v = &keyword[sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1];
  1464. if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  1465. data_only = false;
  1466. else
  1467. data_only = true;
  1468. }
  1469. else if(strncmp(keyword, JOURNAL_PARAMETER_SLICE ":", sizeof(JOURNAL_PARAMETER_SLICE ":") - 1) == 0) {
  1470. char *v = &keyword[sizeof(JOURNAL_PARAMETER_SLICE ":") - 1];
  1471. if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  1472. slice = false;
  1473. else
  1474. slice = true;
  1475. }
  1476. else if(strncmp(keyword, JOURNAL_PARAMETER_ID ":", sizeof(JOURNAL_PARAMETER_ID ":") - 1) == 0) {
  1477. char *id = &keyword[sizeof(JOURNAL_PARAMETER_ID ":") - 1];
  1478. if(*id)
  1479. progress_id = id;
  1480. }
  1481. else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) {
  1482. const char *value = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1];
  1483. buffer_json_member_add_array(wb, JOURNAL_PARAMETER_SOURCE);
  1484. BUFFER *sources_list = buffer_create(0, NULL);
  1485. source_type = SDJF_NONE;
  1486. while(value) {
  1487. char *sep = strchr(value, ',');
  1488. if(sep)
  1489. *sep++ = '\0';
  1490. buffer_json_add_array_item_string(wb, value);
  1491. if(strcmp(value, SDJF_SOURCE_ALL_NAME) == 0) {
  1492. source_type |= SDJF_ALL;
  1493. value = NULL;
  1494. }
  1495. else if(strcmp(value, SDJF_SOURCE_LOCAL_NAME) == 0) {
  1496. source_type |= SDJF_LOCAL_ALL;
  1497. value = NULL;
  1498. }
  1499. else if(strcmp(value, SDJF_SOURCE_REMOTES_NAME) == 0) {
  1500. source_type |= SDJF_REMOTE_ALL;
  1501. value = NULL;
  1502. }
  1503. else if(strcmp(value, SDJF_SOURCE_NAMESPACES_NAME) == 0) {
  1504. source_type |= SDJF_LOCAL_NAMESPACE;
  1505. value = NULL;
  1506. }
  1507. else if(strcmp(value, SDJF_SOURCE_LOCAL_SYSTEM_NAME) == 0) {
  1508. source_type |= SDJF_LOCAL_SYSTEM;
  1509. value = NULL;
  1510. }
  1511. else if(strcmp(value, SDJF_SOURCE_LOCAL_USERS_NAME) == 0) {
  1512. source_type |= SDJF_LOCAL_USER;
  1513. value = NULL;
  1514. }
  1515. else if(strcmp(value, SDJF_SOURCE_LOCAL_OTHER_NAME) == 0) {
  1516. source_type |= SDJF_LOCAL_OTHER;
  1517. value = NULL;
  1518. }
  1519. else {
  1520. // else, match the source, whatever it is
  1521. if(buffer_strlen(sources_list))
  1522. buffer_strcat(sources_list, ",");
  1523. buffer_strcat(sources_list, value);
  1524. }
  1525. value = sep;
  1526. }
  1527. if(buffer_strlen(sources_list)) {
  1528. simple_pattern_free(sources);
  1529. sources = simple_pattern_create(buffer_tostring(sources_list), ",", SIMPLE_PATTERN_EXACT, false);
  1530. }
  1531. buffer_free(sources_list);
  1532. buffer_json_array_close(wb); // source
  1533. }
  1534. else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", sizeof(JOURNAL_PARAMETER_AFTER ":") - 1) == 0) {
  1535. after_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_AFTER ":") - 1]);
  1536. }
  1537. else if(strncmp(keyword, JOURNAL_PARAMETER_BEFORE ":", sizeof(JOURNAL_PARAMETER_BEFORE ":") - 1) == 0) {
  1538. before_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_BEFORE ":") - 1]);
  1539. }
  1540. else if(strncmp(keyword, JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":", sizeof(JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":") - 1) == 0) {
  1541. if_modified_since = str2ull(&keyword[sizeof(JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":") - 1], NULL);
  1542. }
  1543. else if(strncmp(keyword, JOURNAL_PARAMETER_ANCHOR ":", sizeof(JOURNAL_PARAMETER_ANCHOR ":") - 1) == 0) {
  1544. anchor = str2ull(&keyword[sizeof(JOURNAL_PARAMETER_ANCHOR ":") - 1], NULL);
  1545. }
  1546. else if(strncmp(keyword, JOURNAL_PARAMETER_DIRECTION ":", sizeof(JOURNAL_PARAMETER_DIRECTION ":") - 1) == 0) {
  1547. direction = strcasecmp(&keyword[sizeof(JOURNAL_PARAMETER_DIRECTION ":") - 1], "forward") == 0 ? FACETS_ANCHOR_DIRECTION_FORWARD : FACETS_ANCHOR_DIRECTION_BACKWARD;
  1548. }
  1549. else if(strncmp(keyword, JOURNAL_PARAMETER_LAST ":", sizeof(JOURNAL_PARAMETER_LAST ":") - 1) == 0) {
  1550. last = str2ul(&keyword[sizeof(JOURNAL_PARAMETER_LAST ":") - 1]);
  1551. }
  1552. else if(strncmp(keyword, JOURNAL_PARAMETER_QUERY ":", sizeof(JOURNAL_PARAMETER_QUERY ":") - 1) == 0) {
  1553. query= &keyword[sizeof(JOURNAL_PARAMETER_QUERY ":") - 1];
  1554. }
  1555. else if(strncmp(keyword, JOURNAL_PARAMETER_HISTOGRAM ":", sizeof(JOURNAL_PARAMETER_HISTOGRAM ":") - 1) == 0) {
  1556. chart = &keyword[sizeof(JOURNAL_PARAMETER_HISTOGRAM ":") - 1];
  1557. }
  1558. else if(strncmp(keyword, JOURNAL_PARAMETER_FACETS ":", sizeof(JOURNAL_PARAMETER_FACETS ":") - 1) == 0) {
  1559. char *value = &keyword[sizeof(JOURNAL_PARAMETER_FACETS ":") - 1];
  1560. if(*value) {
  1561. buffer_json_member_add_array(wb, JOURNAL_PARAMETER_FACETS);
  1562. while(value) {
  1563. char *sep = strchr(value, ',');
  1564. if(sep)
  1565. *sep++ = '\0';
  1566. facets_register_facet_id(facets, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER);
  1567. buffer_json_add_array_item_string(wb, value);
  1568. value = sep;
  1569. }
  1570. buffer_json_array_close(wb); // JOURNAL_PARAMETER_FACETS
  1571. }
  1572. }
  1573. else {
  1574. char *value = strchr(keyword, ':');
  1575. if(value) {
  1576. *value++ = '\0';
  1577. buffer_json_member_add_array(wb, keyword);
  1578. while(value) {
  1579. char *sep = strchr(value, ',');
  1580. if(sep)
  1581. *sep++ = '\0';
  1582. facets_register_facet_id_filter(facets, keyword, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER);
  1583. buffer_json_add_array_item_string(wb, value);
  1584. filters++;
  1585. value = sep;
  1586. }
  1587. buffer_json_array_close(wb); // keyword
  1588. }
  1589. }
  1590. }
  1591. // ------------------------------------------------------------------------
  1592. // put this request into the progress db
  1593. if(progress_id && *progress_id) {
  1594. fqs_item = dictionary_set_and_acquire_item(function_query_status_dict, progress_id, &tmp_fqs, sizeof(tmp_fqs));
  1595. fqs = dictionary_acquired_item_value(fqs_item);
  1596. }
  1597. else {
  1598. // no progress id given, proceed without registering our progress in the dictionary
  1599. fqs = &tmp_fqs;
  1600. fqs_item = NULL;
  1601. }
  1602. // ------------------------------------------------------------------------
  1603. // validate parameters
  1604. time_t now_s = now_realtime_sec();
  1605. time_t expires = now_s + 1;
  1606. if(!after_s && !before_s) {
  1607. before_s = now_s;
  1608. after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION;
  1609. }
  1610. else
  1611. rrdr_relative_window_to_absolute(&after_s, &before_s, now_s);
  1612. if(after_s > before_s) {
  1613. time_t tmp = after_s;
  1614. after_s = before_s;
  1615. before_s = tmp;
  1616. }
  1617. if(after_s == before_s)
  1618. after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION;
  1619. if(!last)
  1620. last = SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY;
  1621. // ------------------------------------------------------------------------
  1622. // set query time-frame, anchors and direction
  1623. fqs->after_ut = after_s * USEC_PER_SEC;
  1624. fqs->before_ut = (before_s * USEC_PER_SEC) + USEC_PER_SEC - 1;
  1625. fqs->if_modified_since = if_modified_since;
  1626. fqs->data_only = data_only;
  1627. fqs->delta = (fqs->data_only) ? delta : false;
  1628. fqs->tail = (fqs->data_only && fqs->if_modified_since) ? tail : false;
  1629. fqs->sources = sources;
  1630. fqs->source_type = source_type;
  1631. fqs->entries = last;
  1632. fqs->last_modified = 0;
  1633. fqs->filters = filters;
  1634. fqs->query = (query && *query) ? query : NULL;
  1635. fqs->histogram = (chart && *chart) ? chart : NULL;
  1636. fqs->direction = direction;
  1637. fqs->anchor.start_ut = anchor;
  1638. fqs->anchor.stop_ut = 0;
  1639. fqs->sampling = sampling;
  1640. if(fqs->anchor.start_ut && fqs->tail) {
  1641. // a tail request
  1642. // we need the top X entries from BEFORE
  1643. // but, we need to calculate the facets and the
  1644. // histogram up to the anchor
  1645. fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
  1646. fqs->anchor.start_ut = 0;
  1647. fqs->anchor.stop_ut = anchor;
  1648. }
  1649. if(anchor && anchor < fqs->after_ut) {
  1650. log_fqs(fqs, "received anchor is too small for query timeframe, ignoring anchor");
  1651. anchor = 0;
  1652. fqs->anchor.start_ut = 0;
  1653. fqs->anchor.stop_ut = 0;
  1654. fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
  1655. }
  1656. else if(anchor > fqs->before_ut) {
  1657. log_fqs(fqs, "received anchor is too big for query timeframe, ignoring anchor");
  1658. anchor = 0;
  1659. fqs->anchor.start_ut = 0;
  1660. fqs->anchor.stop_ut = 0;
  1661. fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
  1662. }
  1663. facets_set_anchor(facets, fqs->anchor.start_ut, fqs->anchor.stop_ut, fqs->direction);
  1664. facets_set_additional_options(facets,
  1665. ((fqs->data_only) ? FACETS_OPTION_DATA_ONLY : 0) |
  1666. ((fqs->delta) ? FACETS_OPTION_SHOW_DELTAS : 0));
  1667. // ------------------------------------------------------------------------
  1668. // set the rest of the query parameters
  1669. facets_set_items(facets, fqs->entries);
  1670. facets_set_query(facets, fqs->query);
  1671. #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
  1672. fqs->slice = slice;
  1673. if(slice)
  1674. facets_enable_slice_mode(facets);
  1675. #else
  1676. fqs->slice = false;
  1677. #endif
  1678. if(fqs->histogram)
  1679. facets_set_timeframe_and_histogram_by_id(facets, fqs->histogram, fqs->after_ut, fqs->before_ut);
  1680. else
  1681. facets_set_timeframe_and_histogram_by_name(facets, "PRIORITY", fqs->after_ut, fqs->before_ut);
  1682. // ------------------------------------------------------------------------
  1683. // complete the request object
  1684. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_INFO, false);
  1685. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_SLICE, fqs->slice);
  1686. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DATA_ONLY, fqs->data_only);
  1687. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_PROGRESS, false);
  1688. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DELTA, fqs->delta);
  1689. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_TAIL, fqs->tail);
  1690. buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_SAMPLING, fqs->sampling);
  1691. buffer_json_member_add_string(wb, JOURNAL_PARAMETER_ID, progress_id);
  1692. buffer_json_member_add_uint64(wb, "source_type", fqs->source_type);
  1693. buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_AFTER, fqs->after_ut / USEC_PER_SEC);
  1694. buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_BEFORE, fqs->before_ut / USEC_PER_SEC);
  1695. buffer_json_member_add_uint64(wb, "if_modified_since", fqs->if_modified_since);
  1696. buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_ANCHOR, anchor);
  1697. buffer_json_member_add_string(wb, JOURNAL_PARAMETER_DIRECTION, fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward");
  1698. buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_LAST, fqs->entries);
  1699. buffer_json_member_add_string(wb, JOURNAL_PARAMETER_QUERY, fqs->query);
  1700. buffer_json_member_add_string(wb, JOURNAL_PARAMETER_HISTOGRAM, fqs->histogram);
  1701. buffer_json_object_close(wb); // request
  1702. buffer_json_journal_versions(wb);
  1703. // ------------------------------------------------------------------------
  1704. // run the request
  1705. int response;
  1706. if(info) {
  1707. facets_accepted_parameters_to_json_array(facets, wb, false);
  1708. buffer_json_member_add_array(wb, "required_params");
  1709. {
  1710. buffer_json_add_array_item_object(wb);
  1711. {
  1712. buffer_json_member_add_string(wb, "id", "source");
  1713. buffer_json_member_add_string(wb, "name", "source");
  1714. buffer_json_member_add_string(wb, "help", "Select the SystemD Journal source to query");
  1715. buffer_json_member_add_string(wb, "type", "multiselect");
  1716. buffer_json_member_add_array(wb, "options");
  1717. {
  1718. available_journal_file_sources_to_json_array(wb);
  1719. }
  1720. buffer_json_array_close(wb); // options array
  1721. }
  1722. buffer_json_object_close(wb); // required params object
  1723. }
  1724. buffer_json_array_close(wb); // required_params array
  1725. facets_table_config(wb);
  1726. buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
  1727. buffer_json_member_add_string(wb, "type", "table");
  1728. buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
  1729. buffer_json_finalize(wb);
  1730. response = HTTP_RESP_OK;
  1731. goto output;
  1732. }
  1733. if(progress) {
  1734. function_systemd_journal_progress(wb, transaction, progress_id);
  1735. goto cleanup;
  1736. }
  1737. response = netdata_systemd_journal_query(wb, facets, fqs);
  1738. // ------------------------------------------------------------------------
  1739. // handle error response
  1740. if(response != HTTP_RESP_OK) {
  1741. netdata_mutex_lock(&stdout_mutex);
  1742. pluginsd_function_json_error_to_stdout(transaction, response, "failed");
  1743. netdata_mutex_unlock(&stdout_mutex);
  1744. goto cleanup;
  1745. }
  1746. output:
  1747. netdata_mutex_lock(&stdout_mutex);
  1748. pluginsd_function_result_to_stdout(transaction, response, "application/json", expires, wb);
  1749. netdata_mutex_unlock(&stdout_mutex);
  1750. cleanup:
  1751. simple_pattern_free(sources);
  1752. facets_destroy(facets);
  1753. buffer_free(wb);
  1754. if(fqs_item) {
  1755. dictionary_del(function_query_status_dict, dictionary_acquired_item_name(fqs_item));
  1756. dictionary_acquired_item_release(function_query_status_dict, fqs_item);
  1757. dictionary_garbage_collect(function_query_status_dict);
  1758. }
  1759. }
  1760. void journal_init_query_status(void) {
  1761. function_query_status_dict = dictionary_create_advanced(
  1762. DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
  1763. NULL, sizeof(FUNCTION_QUERY_STATUS));
  1764. }