systemd-journal.c 111 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. /*
  3. * netdata systemd-journal.plugin
  4. * Copyright (C) 2023 Netdata Inc.
  5. * GPL v3+
  6. */
  7. #include "collectors/all.h"
  8. #include "libnetdata/libnetdata.h"
  9. #include "libnetdata/required_dummies.h"
  10. #include <linux/capability.h>
  11. #include <systemd/sd-journal.h>
  12. #include <syslog.h>
  13. /*
  14. * TODO
  15. *
  16. * _UDEV_DEVLINK is frequently set more than once per field - support multi-value faces
  17. *
  18. */
  19. // ----------------------------------------------------------------------------
  20. // fstat64 overloading to speed up libsystemd
  21. // https://github.com/systemd/systemd/pull/29261
  22. #define ND_SD_JOURNAL_OPEN_FLAGS (0)
  23. #include <dlfcn.h>
  24. #include <sys/stat.h>
  25. #define FSTAT_CACHE_MAX 1024
  26. struct fdstat64_cache_entry {
  27. bool enabled;
  28. bool updated;
  29. int err_no;
  30. struct stat64 stat;
  31. int ret;
  32. size_t cached_count;
  33. size_t session;
  34. };
  35. struct fdstat64_cache_entry fstat64_cache[FSTAT_CACHE_MAX] = {0 };
  36. static __thread size_t fstat_thread_calls = 0;
  37. static __thread size_t fstat_thread_cached_responses = 0;
  38. static __thread bool enable_thread_fstat = false;
  39. static __thread size_t fstat_caching_thread_session = 0;
  40. static size_t fstat_caching_global_session = 0;
  41. static void fstat_cache_enable_on_thread(void) {
  42. fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_ACQUIRE);
  43. enable_thread_fstat = true;
  44. }
  45. static void fstat_cache_disable_on_thread(void) {
  46. fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_RELEASE);
  47. enable_thread_fstat = false;
  48. }
  49. int fstat64(int fd, struct stat64 *buf) {
  50. static int (*real_fstat)(int, struct stat64 *) = NULL;
  51. if (!real_fstat)
  52. real_fstat = dlsym(RTLD_NEXT, "fstat64");
  53. fstat_thread_calls++;
  54. if(fd >= 0 && fd < FSTAT_CACHE_MAX) {
  55. if(enable_thread_fstat && fstat64_cache[fd].session != fstat_caching_thread_session) {
  56. fstat64_cache[fd].session = fstat_caching_thread_session;
  57. fstat64_cache[fd].enabled = true;
  58. fstat64_cache[fd].updated = false;
  59. }
  60. if(fstat64_cache[fd].enabled && fstat64_cache[fd].updated && fstat64_cache[fd].session == fstat_caching_thread_session) {
  61. fstat_thread_cached_responses++;
  62. errno = fstat64_cache[fd].err_no;
  63. *buf = fstat64_cache[fd].stat;
  64. fstat64_cache[fd].cached_count++;
  65. return fstat64_cache[fd].ret;
  66. }
  67. }
  68. int ret = real_fstat(fd, buf);
  69. if(fd >= 0 && fd < FSTAT_CACHE_MAX && fstat64_cache[fd].enabled) {
  70. fstat64_cache[fd].ret = ret;
  71. fstat64_cache[fd].updated = true;
  72. fstat64_cache[fd].err_no = errno;
  73. fstat64_cache[fd].stat = *buf;
  74. fstat64_cache[fd].session = fstat_caching_thread_session;
  75. }
  76. return ret;
  77. }
  78. // ----------------------------------------------------------------------------
  79. #define FACET_MAX_VALUE_LENGTH 8192
  80. #define SYSTEMD_JOURNAL_MAX_SOURCE_LEN 64
  81. #define SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION "View, search and analyze systemd journal entries."
  82. #define SYSTEMD_JOURNAL_FUNCTION_NAME "systemd-journal"
  83. #define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 60
  84. #define SYSTEMD_JOURNAL_MAX_PARAMS 100
  85. #define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (1 * 3600)
  86. #define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200
  87. #define SYSTEMD_JOURNAL_WORKER_THREADS 5
  88. #define JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT (5 * USEC_PER_SEC) // assume always 5 seconds latency
  89. #define JOURNAL_VS_REALTIME_DELTA_MAX_UT (2 * 60 * USEC_PER_SEC) // up to 2 minutes latency
  90. #define JOURNAL_PARAMETER_HELP "help"
  91. #define JOURNAL_PARAMETER_AFTER "after"
  92. #define JOURNAL_PARAMETER_BEFORE "before"
  93. #define JOURNAL_PARAMETER_ANCHOR "anchor"
  94. #define JOURNAL_PARAMETER_LAST "last"
  95. #define JOURNAL_PARAMETER_QUERY "query"
  96. #define JOURNAL_PARAMETER_FACETS "facets"
  97. #define JOURNAL_PARAMETER_HISTOGRAM "histogram"
  98. #define JOURNAL_PARAMETER_DIRECTION "direction"
  99. #define JOURNAL_PARAMETER_IF_MODIFIED_SINCE "if_modified_since"
  100. #define JOURNAL_PARAMETER_DATA_ONLY "data_only"
  101. #define JOURNAL_PARAMETER_SOURCE "source"
  102. #define JOURNAL_PARAMETER_INFO "info"
  103. #define JOURNAL_PARAMETER_ID "id"
  104. #define JOURNAL_PARAMETER_PROGRESS "progress"
  105. #define JOURNAL_PARAMETER_SLICE "slice"
  106. #define JOURNAL_PARAMETER_DELTA "delta"
  107. #define JOURNAL_PARAMETER_TAIL "tail"
  108. #define JOURNAL_KEY_ND_JOURNAL_FILE "ND_JOURNAL_FILE"
  109. #define JOURNAL_KEY_ND_JOURNAL_PROCESS "ND_JOURNAL_PROCESS"
  110. #define JOURNAL_DEFAULT_SLICE_MODE true
  111. #define JOURNAL_DEFAULT_DIRECTION FACETS_ANCHOR_DIRECTION_BACKWARD
  112. #define SYSTEMD_ALWAYS_VISIBLE_KEYS NULL
  113. #define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS \
  114. "*MESSAGE*" \
  115. "|*_RAW" \
  116. "|*_USEC" \
  117. "|*_NSEC" \
  118. "|*TIMESTAMP*" \
  119. "|*_ID" \
  120. "|*_ID_*" \
  121. "|__*" \
  122. ""
  123. #define SYSTEMD_KEYS_INCLUDED_IN_FACETS \
  124. \
  125. /* --- USER JOURNAL FIELDS --- */ \
  126. \
  127. /* "|MESSAGE" */ \
  128. /* "|MESSAGE_ID" */ \
  129. "|PRIORITY" \
  130. "|CODE_FILE" \
  131. /* "|CODE_LINE" */ \
  132. "|CODE_FUNC" \
  133. "|ERRNO" \
  134. /* "|INVOCATION_ID" */ \
  135. /* "|USER_INVOCATION_ID" */ \
  136. "|SYSLOG_FACILITY" \
  137. "|SYSLOG_IDENTIFIER" \
  138. /* "|SYSLOG_PID" */ \
  139. /* "|SYSLOG_TIMESTAMP" */ \
  140. /* "|SYSLOG_RAW" */ \
  141. /* "!DOCUMENTATION" */ \
  142. /* "|TID" */ \
  143. "|UNIT" \
  144. "|USER_UNIT" \
  145. "|UNIT_RESULT" /* undocumented */ \
  146. \
  147. \
  148. /* --- TRUSTED JOURNAL FIELDS --- */ \
  149. \
  150. /* "|_PID" */ \
  151. "|_UID" \
  152. "|_GID" \
  153. "|_COMM" \
  154. "|_EXE" \
  155. /* "|_CMDLINE" */ \
  156. "|_CAP_EFFECTIVE" \
  157. /* "|_AUDIT_SESSION" */ \
  158. "|_AUDIT_LOGINUID" \
  159. "|_SYSTEMD_CGROUP" \
  160. "|_SYSTEMD_SLICE" \
  161. "|_SYSTEMD_UNIT" \
  162. "|_SYSTEMD_USER_UNIT" \
  163. "|_SYSTEMD_USER_SLICE" \
  164. "|_SYSTEMD_SESSION" \
  165. "|_SYSTEMD_OWNER_UID" \
  166. "|_SELINUX_CONTEXT" \
  167. /* "|_SOURCE_REALTIME_TIMESTAMP" */ \
  168. "|_BOOT_ID" \
  169. "|_MACHINE_ID" \
  170. /* "|_SYSTEMD_INVOCATION_ID" */ \
  171. "|_HOSTNAME" \
  172. "|_TRANSPORT" \
  173. "|_STREAM_ID" \
  174. /* "|LINE_BREAK" */ \
  175. "|_NAMESPACE" \
  176. "|_RUNTIME_SCOPE" \
  177. \
  178. \
  179. /* --- KERNEL JOURNAL FIELDS --- */ \
  180. \
  181. /* "|_KERNEL_DEVICE" */ \
  182. "|_KERNEL_SUBSYSTEM" \
  183. /* "|_UDEV_SYSNAME" */ \
  184. "|_UDEV_DEVNODE" \
  185. /* "|_UDEV_DEVLINK" */ \
  186. \
  187. \
  188. /* --- LOGGING ON BEHALF --- */ \
  189. \
  190. "|OBJECT_UID" \
  191. "|OBJECT_GID" \
  192. "|OBJECT_COMM" \
  193. "|OBJECT_EXE" \
  194. /* "|OBJECT_CMDLINE" */ \
  195. /* "|OBJECT_AUDIT_SESSION" */ \
  196. "|OBJECT_AUDIT_LOGINUID" \
  197. "|OBJECT_SYSTEMD_CGROUP" \
  198. "|OBJECT_SYSTEMD_SESSION" \
  199. "|OBJECT_SYSTEMD_OWNER_UID" \
  200. "|OBJECT_SYSTEMD_UNIT" \
  201. "|OBJECT_SYSTEMD_USER_UNIT" \
  202. \
  203. \
  204. /* --- CORE DUMPS --- */ \
  205. \
  206. "|COREDUMP_COMM" \
  207. "|COREDUMP_UNIT" \
  208. "|COREDUMP_USER_UNIT" \
  209. "|COREDUMP_SIGNAL_NAME" \
  210. "|COREDUMP_CGROUP" \
  211. \
  212. \
  213. /* --- DOCKER --- */ \
  214. \
  215. "|CONTAINER_ID" \
  216. /* "|CONTAINER_ID_FULL" */ \
  217. "|CONTAINER_NAME" \
  218. "|CONTAINER_TAG" \
  219. "|IMAGE_NAME" /* undocumented */ \
  220. /* "|CONTAINER_PARTIAL_MESSAGE" */ \
  221. \
  222. ""
  223. static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER;
  224. static bool plugin_should_exit = false;
  225. // ----------------------------------------------------------------------------
  226. typedef enum {
  227. ND_SD_JOURNAL_NO_FILE_MATCHED,
  228. ND_SD_JOURNAL_FAILED_TO_OPEN,
  229. ND_SD_JOURNAL_FAILED_TO_SEEK,
  230. ND_SD_JOURNAL_TIMED_OUT,
  231. ND_SD_JOURNAL_OK,
  232. ND_SD_JOURNAL_NOT_MODIFIED,
  233. ND_SD_JOURNAL_CANCELLED,
  234. } ND_SD_JOURNAL_STATUS;
  235. typedef enum {
  236. SDJF_ALL = 0,
  237. SDJF_LOCAL = (1 << 0),
  238. SDJF_REMOTE = (1 << 1),
  239. SDJF_SYSTEM = (1 << 2),
  240. SDJF_USER = (1 << 3),
  241. SDJF_NAMESPACE = (1 << 4),
  242. SDJF_OTHER = (1 << 5),
  243. } SD_JOURNAL_FILE_SOURCE_TYPE;
  244. typedef struct function_query_status {
  245. bool *cancelled; // a pointer to the cancelling boolean
  246. usec_t stop_monotonic_ut;
  247. usec_t started_monotonic_ut;
  248. // request
  249. SD_JOURNAL_FILE_SOURCE_TYPE source_type;
  250. STRING *source;
  251. usec_t after_ut;
  252. usec_t before_ut;
  253. struct {
  254. usec_t start_ut;
  255. usec_t stop_ut;
  256. } anchor;
  257. FACETS_ANCHOR_DIRECTION direction;
  258. size_t entries;
  259. usec_t if_modified_since;
  260. bool delta;
  261. bool tail;
  262. bool data_only;
  263. bool slice;
  264. size_t filters;
  265. usec_t last_modified;
  266. const char *query;
  267. const char *histogram;
  268. // per file progress info
  269. size_t cached_count;
  270. // progress statistics
  271. usec_t matches_setup_ut;
  272. size_t rows_useful;
  273. size_t rows_read;
  274. size_t bytes_read;
  275. size_t files_matched;
  276. size_t file_working;
  277. } FUNCTION_QUERY_STATUS;
  278. struct journal_file {
  279. const char *filename;
  280. size_t filename_len;
  281. STRING *source;
  282. SD_JOURNAL_FILE_SOURCE_TYPE source_type;
  283. usec_t file_last_modified_ut;
  284. usec_t msg_first_ut;
  285. usec_t msg_last_ut;
  286. usec_t last_scan_ut;
  287. size_t size;
  288. bool logged_failure;
  289. usec_t max_journal_vs_realtime_delta_ut;
  290. };
  291. static void log_fqs(FUNCTION_QUERY_STATUS *fqs, const char *msg) {
  292. netdata_log_error("ERROR: %s, on query "
  293. "timeframe [%"PRIu64" - %"PRIu64"], "
  294. "anchor [%"PRIu64" - %"PRIu64"], "
  295. "if_modified_since %"PRIu64", "
  296. "data_only:%s, delta:%s, tail:%s, direction:%s"
  297. , msg
  298. , fqs->after_ut, fqs->before_ut
  299. , fqs->anchor.start_ut, fqs->anchor.stop_ut
  300. , fqs->if_modified_since
  301. , fqs->data_only ? "true" : "false"
  302. , fqs->delta ? "true" : "false"
  303. , fqs->tail ? "tail" : "false"
  304. , fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward");
  305. }
  306. static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) {
  307. if(sd_journal_seek_realtime_usec(j, timestamp) < 0) {
  308. netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, timestamp);
  309. if(sd_journal_seek_tail(j) < 0) {
  310. netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail");
  311. return false;
  312. }
  313. }
  314. return true;
  315. }
  316. #define JD_SOURCE_REALTIME_TIMESTAMP "_SOURCE_REALTIME_TIMESTAMP"
  317. static inline bool parse_journal_field(const char *data, size_t data_length, const char **key, size_t *key_length, const char **value, size_t *value_length) {
  318. const char *k = data;
  319. const char *equal = strchr(k, '=');
  320. if(unlikely(!equal))
  321. return false;
  322. size_t kl = equal - k;
  323. const char *v = ++equal;
  324. size_t vl = data_length - kl - 1;
  325. *key = k;
  326. *key_length = kl;
  327. *value = v;
  328. *value_length = vl;
  329. return true;
  330. }
  331. static inline size_t netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets, struct journal_file *jf, usec_t *msg_ut) {
  332. const void *data;
  333. size_t length, bytes = 0;
  334. facets_add_key_value_length(facets, JOURNAL_KEY_ND_JOURNAL_FILE, sizeof(JOURNAL_KEY_ND_JOURNAL_FILE) - 1, jf->filename, jf->filename_len);
  335. SD_JOURNAL_FOREACH_DATA(j, data, length) {
  336. const char *key, *value;
  337. size_t key_length, value_length;
  338. if(!parse_journal_field(data, length, &key, &key_length, &value, &value_length))
  339. continue;
  340. #ifdef NETDATA_INTERNAL_CHECKS
  341. usec_t origin_journal_ut = *msg_ut;
  342. #endif
  343. if(unlikely(key_length == sizeof(JD_SOURCE_REALTIME_TIMESTAMP) - 1 &&
  344. memcmp(key, JD_SOURCE_REALTIME_TIMESTAMP, sizeof(JD_SOURCE_REALTIME_TIMESTAMP) - 1) == 0)) {
  345. usec_t ut = str2ull(value, NULL);
  346. if(ut && ut < *msg_ut) {
  347. usec_t delta = *msg_ut - ut;
  348. *msg_ut = ut;
  349. if(delta > JOURNAL_VS_REALTIME_DELTA_MAX_UT)
  350. delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT;
  351. // update max_journal_vs_realtime_delta_ut if the delta increased
  352. usec_t expected = jf->max_journal_vs_realtime_delta_ut;
  353. do {
  354. if(delta <= expected)
  355. break;
  356. } while(!__atomic_compare_exchange_n(&jf->max_journal_vs_realtime_delta_ut, &expected, delta, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
  357. internal_error(delta > expected,
  358. "increased max_journal_vs_realtime_delta_ut from %"PRIu64" to %"PRIu64", "
  359. "journal %"PRIu64", actual %"PRIu64" (delta %"PRIu64")"
  360. , expected, delta, origin_journal_ut, *msg_ut, origin_journal_ut - (*msg_ut));
  361. }
  362. }
  363. bytes += length;
  364. facets_add_key_value_length(facets, key, key_length, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH);
  365. }
  366. return bytes;
  367. }
  368. #define FUNCTION_PROGRESS_UPDATE_ROWS(rows_read, rows) __atomic_fetch_add(&(rows_read), rows, __ATOMIC_RELAXED)
  369. #define FUNCTION_PROGRESS_UPDATE_BYTES(bytes_read, bytes) __atomic_fetch_add(&(bytes_read), bytes, __ATOMIC_RELAXED)
  370. #define FUNCTION_PROGRESS_EVERY_ROWS (1ULL << 13)
  371. #define FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS (1ULL << 7)
  372. static inline ND_SD_JOURNAL_STATUS check_stop(const bool *cancelled, const usec_t *stop_monotonic_ut) {
  373. if(cancelled && __atomic_load_n(cancelled, __ATOMIC_RELAXED)) {
  374. internal_error(true, "Function has been cancelled");
  375. return ND_SD_JOURNAL_CANCELLED;
  376. }
  377. if(now_monotonic_usec() > __atomic_load_n(stop_monotonic_ut, __ATOMIC_RELAXED)) {
  378. internal_error(true, "Function timed out");
  379. return ND_SD_JOURNAL_TIMED_OUT;
  380. }
  381. return ND_SD_JOURNAL_OK;
  382. }
  383. ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward(
  384. sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
  385. struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
  386. usec_t anchor_delta = __atomic_load_n(&jf->max_journal_vs_realtime_delta_ut, __ATOMIC_RELAXED);
  387. usec_t start_ut = ((fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut : fqs->before_ut) + anchor_delta;
  388. usec_t stop_ut = (fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->after_ut;
  389. bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut);
  390. if(!netdata_systemd_journal_seek_to(j, start_ut))
  391. return ND_SD_JOURNAL_FAILED_TO_SEEK;
  392. size_t errors_no_timestamp = 0;
  393. usec_t earliest_msg_ut = 0;
  394. size_t row_counter = 0, last_row_counter = 0, rows_useful = 0;
  395. size_t bytes = 0, last_bytes = 0;
  396. usec_t last_usec_from = 0;
  397. usec_t last_usec_to = 0;
  398. ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
  399. facets_rows_begin(facets);
  400. while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) {
  401. usec_t msg_ut = 0;
  402. if(sd_journal_get_realtime_usec(j, &msg_ut) < 0 || !msg_ut) {
  403. errors_no_timestamp++;
  404. continue;
  405. }
  406. if(unlikely(msg_ut > earliest_msg_ut))
  407. earliest_msg_ut = msg_ut;
  408. if (unlikely(msg_ut > start_ut))
  409. continue;
  410. if (unlikely(msg_ut < stop_ut))
  411. break;
  412. bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut);
  413. // make sure each line gets a unique timestamp
  414. if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to))
  415. msg_ut = --last_usec_from;
  416. else
  417. last_usec_from = last_usec_to = msg_ut;
  418. if(facets_row_finished(facets, msg_ut))
  419. rows_useful++;
  420. row_counter++;
  421. if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 &&
  422. stop_when_full &&
  423. facets_rows(facets) >= fqs->entries)) {
  424. // stop the data only query
  425. usec_t oldest = facets_row_oldest_ut(facets);
  426. if(oldest && msg_ut < (oldest - anchor_delta))
  427. break;
  428. }
  429. if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) {
  430. FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
  431. last_row_counter = row_counter;
  432. FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
  433. last_bytes = bytes;
  434. status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
  435. }
  436. }
  437. FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
  438. FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
  439. fqs->rows_useful += rows_useful;
  440. if(errors_no_timestamp)
  441. netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
  442. if(earliest_msg_ut > fqs->last_modified)
  443. fqs->last_modified = earliest_msg_ut;
  444. return status;
  445. }
  446. ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward(
  447. sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
  448. struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
  449. usec_t anchor_delta = __atomic_load_n(&jf->max_journal_vs_realtime_delta_ut, __ATOMIC_RELAXED);
  450. usec_t start_ut = (fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut : fqs->after_ut;
  451. usec_t stop_ut = ((fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->before_ut) + anchor_delta;
  452. bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut);
  453. if(!netdata_systemd_journal_seek_to(j, start_ut))
  454. return ND_SD_JOURNAL_FAILED_TO_SEEK;
  455. size_t errors_no_timestamp = 0;
  456. usec_t earliest_msg_ut = 0;
  457. size_t row_counter = 0, last_row_counter = 0, rows_useful = 0;
  458. size_t bytes = 0, last_bytes = 0;
  459. usec_t last_usec_from = 0;
  460. usec_t last_usec_to = 0;
  461. ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
  462. facets_rows_begin(facets);
  463. while (status == ND_SD_JOURNAL_OK && sd_journal_next(j) > 0) {
  464. usec_t msg_ut = 0;
  465. if(sd_journal_get_realtime_usec(j, &msg_ut) < 0 || !msg_ut) {
  466. errors_no_timestamp++;
  467. continue;
  468. }
  469. if(likely(msg_ut > earliest_msg_ut))
  470. earliest_msg_ut = msg_ut;
  471. if (unlikely(msg_ut < start_ut))
  472. continue;
  473. if (unlikely(msg_ut > stop_ut))
  474. break;
  475. bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut);
  476. // make sure each line gets a unique timestamp
  477. if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to))
  478. msg_ut = ++last_usec_to;
  479. else
  480. last_usec_from = last_usec_to = msg_ut;
  481. if(facets_row_finished(facets, msg_ut))
  482. rows_useful++;
  483. row_counter++;
  484. if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 &&
  485. stop_when_full &&
  486. facets_rows(facets) >= fqs->entries)) {
  487. // stop the data only query
  488. usec_t newest = facets_row_newest_ut(facets);
  489. if(newest && msg_ut > (newest + anchor_delta))
  490. break;
  491. }
  492. if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) {
  493. FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
  494. last_row_counter = row_counter;
  495. FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
  496. last_bytes = bytes;
  497. status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
  498. }
  499. }
  500. FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
  501. FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
  502. fqs->rows_useful += rows_useful;
  503. if(errors_no_timestamp)
  504. netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
  505. if(earliest_msg_ut > fqs->last_modified)
  506. fqs->last_modified = earliest_msg_ut;
  507. return status;
  508. }
  509. bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) {
  510. // return true, if data have been modified since the timestamp
  511. if(!last_modified || !seek_to)
  512. return false;
  513. if(!netdata_systemd_journal_seek_to(j, seek_to))
  514. return false;
  515. usec_t first_msg_ut = 0;
  516. while (sd_journal_previous(j) > 0) {
  517. usec_t msg_ut;
  518. if(sd_journal_get_realtime_usec(j, &msg_ut) < 0)
  519. continue;
  520. first_msg_ut = msg_ut;
  521. break;
  522. }
  523. return first_msg_ut != last_modified;
  524. }
  525. #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
  526. static bool netdata_systemd_filtering_by_journal(sd_journal *j, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) {
  527. const char *field = NULL;
  528. const void *data = NULL;
  529. size_t data_length;
  530. size_t added_keys = 0;
  531. size_t failures = 0;
  532. size_t filters_added = 0;
  533. SD_JOURNAL_FOREACH_FIELD(j, field) { // for each key
  534. bool interesting;
  535. if(fqs->data_only)
  536. interesting = facets_key_name_is_filter(facets, field);
  537. else
  538. interesting = facets_key_name_is_facet(facets, field);
  539. if(interesting) {
  540. if(sd_journal_query_unique(j, field) >= 0) {
  541. bool added_this_key = false;
  542. size_t added_values = 0;
  543. SD_JOURNAL_FOREACH_UNIQUE(j, data, data_length) { // for each value of the key
  544. const char *key, *value;
  545. size_t key_length, value_length;
  546. if(!parse_journal_field(data, data_length, &key, &key_length, &value, &value_length))
  547. continue;
  548. facets_add_possible_value_name_to_key(facets, key, key_length, value, value_length);
  549. if(!facets_key_name_value_length_is_selected(facets, key, key_length, value, value_length))
  550. continue;
  551. if(added_keys && !added_this_key) {
  552. if(sd_journal_add_conjunction(j) < 0) // key AND key AND key
  553. failures++;
  554. added_this_key = true;
  555. added_keys++;
  556. }
  557. else if(added_values)
  558. if(sd_journal_add_disjunction(j) < 0) // value OR value OR value
  559. failures++;
  560. if(sd_journal_add_match(j, data, data_length) < 0)
  561. failures++;
  562. if(!added_keys) {
  563. added_keys++;
  564. added_this_key = true;
  565. }
  566. added_values++;
  567. filters_added++;
  568. }
  569. }
  570. }
  571. }
  572. if(failures) {
  573. log_fqs(fqs, "failed to setup journal filter, will run the full query.");
  574. sd_journal_flush_matches(j);
  575. return true;
  576. }
  577. return filters_added ? true : false;
  578. }
  579. #endif // HAVE_SD_JOURNAL_RESTART_FIELDS
  580. static ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_one_file(
  581. const char *filename, BUFFER *wb, FACETS *facets,
  582. struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
  583. sd_journal *j = NULL;
  584. errno = 0;
  585. fstat_cache_enable_on_thread();
  586. const char *paths[2] = {
  587. [0] = filename,
  588. [1] = NULL,
  589. };
  590. if(sd_journal_open_files(&j, paths, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) {
  591. fstat_cache_disable_on_thread();
  592. return ND_SD_JOURNAL_FAILED_TO_OPEN;
  593. }
  594. ND_SD_JOURNAL_STATUS status;
  595. bool matches_filters = true;
  596. #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
  597. if(fqs->slice) {
  598. usec_t started = now_monotonic_usec();
  599. matches_filters = netdata_systemd_filtering_by_journal(j, facets, fqs) || !fqs->filters;
  600. usec_t ended = now_monotonic_usec();
  601. fqs->matches_setup_ut += (ended - started);
  602. }
  603. #endif // HAVE_SD_JOURNAL_RESTART_FIELDS
  604. if(matches_filters) {
  605. if(fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD)
  606. status = netdata_systemd_journal_query_forward(j, wb, facets, jf, fqs);
  607. else
  608. status = netdata_systemd_journal_query_backward(j, wb, facets, jf, fqs);
  609. }
  610. else
  611. status = ND_SD_JOURNAL_NO_FILE_MATCHED;
  612. sd_journal_close(j);
  613. fstat_cache_disable_on_thread();
  614. return status;
  615. }
  616. // ----------------------------------------------------------------------------
  617. // journal files registry
  618. #define VAR_LOG_JOURNAL_MAX_DEPTH 10
  619. #define MAX_JOURNAL_DIRECTORIES 100
  620. struct journal_directory {
  621. char *path;
  622. bool logged_failure;
  623. };
  624. static struct journal_directory journal_directories[MAX_JOURNAL_DIRECTORIES] = { 0 };
  625. static DICTIONARY *journal_files_registry = NULL;
  626. static DICTIONARY *used_hashes_registry = NULL;
  627. static usec_t systemd_journal_session = 0;
  628. static void buffer_json_journal_versions(BUFFER *wb) {
  629. buffer_json_member_add_object(wb, "versions");
  630. {
  631. buffer_json_member_add_uint64(wb, "sources",
  632. systemd_journal_session + dictionary_version(journal_files_registry));
  633. }
  634. buffer_json_object_close(wb);
  635. }
  636. static void journal_file_update_msg_ut(const char *filename, struct journal_file *jf) {
  637. fstat_cache_enable_on_thread();
  638. const char *files[2] = {
  639. [0] = filename,
  640. [1] = NULL,
  641. };
  642. sd_journal *j = NULL;
  643. if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) {
  644. fstat_cache_disable_on_thread();
  645. if(!jf->logged_failure) {
  646. netdata_log_error("cannot open journal file '%s', using file timestamps to understand time-frame.", filename);
  647. jf->logged_failure = true;
  648. }
  649. jf->msg_first_ut = 0;
  650. jf->msg_last_ut = jf->file_last_modified_ut;
  651. return;
  652. }
  653. usec_t first_ut = 0, last_ut = 0;
  654. if(sd_journal_seek_head(j) < 0 || sd_journal_next(j) < 0 || sd_journal_get_realtime_usec(j, &first_ut) < 0 || !first_ut) {
  655. internal_error(true, "cannot find the timestamp of the first message in '%s'", filename);
  656. first_ut = 0;
  657. }
  658. if(sd_journal_seek_tail(j) < 0 || sd_journal_previous(j) < 0 || sd_journal_get_realtime_usec(j, &last_ut) < 0 || !last_ut) {
  659. internal_error(true, "cannot find the timestamp of the last message in '%s'", filename);
  660. last_ut = jf->file_last_modified_ut;
  661. }
  662. sd_journal_close(j);
  663. fstat_cache_disable_on_thread();
  664. if(first_ut > last_ut) {
  665. internal_error(true, "timestamps are flipped in file '%s'", filename);
  666. usec_t t = first_ut;
  667. first_ut = last_ut;
  668. last_ut = t;
  669. }
  670. jf->msg_first_ut = first_ut;
  671. jf->msg_last_ut = last_ut;
  672. }
  673. static STRING *string_strdupz_source(const char *s, const char *e, size_t max_len, const char *prefix) {
  674. char buf[max_len];
  675. size_t len;
  676. char *dst = buf;
  677. if(prefix) {
  678. len = strlen(prefix);
  679. memcpy(buf, prefix, len);
  680. dst = &buf[len];
  681. max_len -= len;
  682. }
  683. len = e - s;
  684. if(len >= max_len)
  685. len = max_len - 1;
  686. memcpy(dst, s, len);
  687. dst[len] = '\0';
  688. buf[max_len - 1] = '\0';
  689. for(size_t i = 0; buf[i] ;i++)
  690. if(!isalnum(buf[i]) && buf[i] != '-' && buf[i] != '.' && buf[i] != ':')
  691. buf[i] = '_';
  692. return string_strdupz(buf);
  693. }
  694. static void files_registry_insert_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) {
  695. struct journal_file *jf = value;
  696. jf->filename = dictionary_acquired_item_name(item);
  697. jf->filename_len = strlen(jf->filename);
  698. // based on the filename
  699. // decide the source to show to the user
  700. const char *s = strrchr(jf->filename, '/');
  701. if(s) {
  702. if(strstr(jf->filename, "/remote/"))
  703. jf->source_type = SDJF_REMOTE;
  704. else {
  705. const char *t = s - 1;
  706. while(t >= jf->filename && *t != '.' && *t != '/')
  707. t--;
  708. if(t >= jf->filename && *t == '.') {
  709. jf->source_type = SDJF_NAMESPACE;
  710. jf->source = string_strdupz_source(t + 1, s, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "namespace-");
  711. }
  712. else
  713. jf->source_type = SDJF_LOCAL;
  714. }
  715. if(strncmp(s, "/system", 7) == 0)
  716. jf->source_type |= SDJF_SYSTEM;
  717. else if(strncmp(s, "/user", 5) == 0)
  718. jf->source_type |= SDJF_USER;
  719. else if(strncmp(s, "/remote-", 8) == 0) {
  720. jf->source_type |= SDJF_REMOTE;
  721. s = &s[8]; // skip "/remote-"
  722. char *e = strchr(s, '@');
  723. if(!e)
  724. e = strstr(s, ".journal");
  725. if(e) {
  726. const char *d = s;
  727. for(; d < e && (isdigit(*d) || *d == '.' || *d == ':') ; d++) ;
  728. if(d == e) {
  729. // a valid IP address
  730. char ip[e - s + 1];
  731. memcpy(ip, s, e - s);
  732. ip[e - s] = '\0';
  733. char buf[SYSTEMD_JOURNAL_MAX_SOURCE_LEN];
  734. if(ip_to_hostname(ip, buf, sizeof(buf)))
  735. jf->source = string_strdupz_source(buf, &buf[strlen(buf)], SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-");
  736. else {
  737. internal_error(true, "Cannot find the hostname for IP '%s'", ip);
  738. jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-");
  739. }
  740. }
  741. else
  742. jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-");
  743. }
  744. else
  745. jf->source_type |= SDJF_OTHER;
  746. }
  747. else
  748. jf->source_type |= SDJF_OTHER;
  749. }
  750. else
  751. jf->source_type = SDJF_LOCAL | SDJF_OTHER;
  752. journal_file_update_msg_ut(jf->filename, jf);
  753. internal_error(true,
  754. "found journal file '%s', type %d, source '%s', "
  755. "file modified: %"PRIu64", "
  756. "msg {first: %"PRIu64", last: %"PRIu64"}",
  757. jf->filename, jf->source_type, jf->source ? string2str(jf->source) : "<unset>",
  758. jf->file_last_modified_ut,
  759. jf->msg_first_ut, jf->msg_last_ut);
  760. }
  761. static bool files_registry_conflict_cb(const DICTIONARY_ITEM *item, void *old_value, void *new_value, void *data __maybe_unused) {
  762. struct journal_file *jf = old_value;
  763. struct journal_file *njf = new_value;
  764. if(njf->last_scan_ut > jf->last_scan_ut)
  765. jf->last_scan_ut = njf->last_scan_ut;
  766. if(njf->file_last_modified_ut > jf->file_last_modified_ut) {
  767. jf->file_last_modified_ut = njf->file_last_modified_ut;
  768. jf->size = njf->size;
  769. const char *filename = dictionary_acquired_item_name(item);
  770. journal_file_update_msg_ut(filename, jf);
  771. // internal_error(true,
  772. // "updated journal file '%s', type %d, "
  773. // "file modified: %"PRIu64", "
  774. // "msg {first: %"PRIu64", last: %"PRIu64"}",
  775. // filename, jf->source_type,
  776. // jf->file_last_modified_ut,
  777. // jf->msg_first_ut, jf->msg_last_ut);
  778. }
  779. return false;
  780. }
  781. #define SDJF_SOURCE_ALL_NAME "all"
  782. #define SDJF_SOURCE_LOCAL_NAME "all-local-logs"
  783. #define SDJF_SOURCE_LOCAL_SYSTEM_NAME "all-local-system-logs"
  784. #define SDJF_SOURCE_LOCAL_USERS_NAME "all-local-user-logs"
  785. #define SDJF_SOURCE_LOCAL_OTHER_NAME "all-uncategorized"
  786. #define SDJF_SOURCE_NAMESPACES_NAME "all-local-namespaces"
  787. #define SDJF_SOURCE_REMOTES_NAME "all-remote-systems"
  788. struct journal_file_source {
  789. usec_t first_ut;
  790. usec_t last_ut;
  791. size_t count;
  792. uint64_t size;
  793. };
  794. static void human_readable_size_ib(uint64_t size, char *dst, size_t dst_len) {
  795. if(size > 1024ULL * 1024 * 1024 * 1024)
  796. snprintfz(dst, dst_len, "%0.2f TiB", (double)size / 1024.0 / 1024.0 / 1024.0 / 1024.0);
  797. else if(size > 1024ULL * 1024 * 1024)
  798. snprintfz(dst, dst_len, "%0.2f GiB", (double)size / 1024.0 / 1024.0 / 1024.0);
  799. else if(size > 1024ULL * 1024)
  800. snprintfz(dst, dst_len, "%0.2f MiB", (double)size / 1024.0 / 1024.0);
  801. else if(size > 1024ULL)
  802. snprintfz(dst, dst_len, "%0.2f KiB", (double)size / 1024.0);
  803. else
  804. snprintfz(dst, dst_len, "%"PRIu64" B", size);
  805. }
  806. #define print_duration(dst, dst_len, pos, remaining, duration, one, many, printed) do { \
  807. if((remaining) > (duration)) { \
  808. uint64_t _count = (remaining) / (duration); \
  809. uint64_t _rem = (remaining) - (_count * (duration)); \
  810. (pos) += snprintfz(&(dst)[pos], (dst_len) - (pos), "%s%s%"PRIu64" %s", (printed) ? ", " : "", _rem ? "" : "and ", _count, _count > 1 ? (many) : (one)); \
  811. (remaining) = _rem; \
  812. (printed) = true; \
  813. } \
  814. } while(0)
  815. static void human_readable_duration_s(time_t duration_s, char *dst, size_t dst_len) {
  816. if(duration_s < 0)
  817. duration_s = -duration_s;
  818. size_t pos = 0;
  819. dst[0] = 0 ;
  820. bool printed = false;
  821. print_duration(dst, dst_len, pos, duration_s, 86400 * 365, "year", "years", printed);
  822. print_duration(dst, dst_len, pos, duration_s, 86400 * 30, "month", "months", printed);
  823. print_duration(dst, dst_len, pos, duration_s, 86400 * 1, "day", "days", printed);
  824. print_duration(dst, dst_len, pos, duration_s, 3600 * 1, "hour", "hours", printed);
  825. print_duration(dst, dst_len, pos, duration_s, 60 * 1, "min", "mins", printed);
  826. print_duration(dst, dst_len, pos, duration_s, 1, "sec", "secs", printed);
  827. }
  828. static int journal_file_to_json_array_cb(const DICTIONARY_ITEM *item, void *entry, void *data) {
  829. struct journal_file_source *jfs = entry;
  830. BUFFER *wb = data;
  831. const char *name = dictionary_acquired_item_name(item);
  832. buffer_json_add_array_item_object(wb);
  833. {
  834. char size_for_humans[100];
  835. human_readable_size_ib(jfs->size, size_for_humans, sizeof(size_for_humans));
  836. char duration_for_humans[1024];
  837. human_readable_duration_s((time_t)((jfs->last_ut - jfs->first_ut) / USEC_PER_SEC),
  838. duration_for_humans, sizeof(duration_for_humans));
  839. char info[1024];
  840. snprintfz(info, sizeof(info), "%zu files, with a total size of %s, covering %s",
  841. jfs->count, size_for_humans, duration_for_humans);
  842. buffer_json_member_add_string(wb, "id", name);
  843. buffer_json_member_add_string(wb, "name", name);
  844. buffer_json_member_add_string(wb, "pill", size_for_humans);
  845. buffer_json_member_add_string(wb, "info", info);
  846. }
  847. buffer_json_object_close(wb); // options object
  848. return 1;
  849. }
  850. static bool journal_file_merge_sizes(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value , void *data __maybe_unused) {
  851. struct journal_file_source *jfs = old_value, *njfs = new_value;
  852. jfs->count += njfs->count;
  853. jfs->size += njfs->size;
  854. if(njfs->first_ut && njfs->first_ut < jfs->first_ut)
  855. jfs->first_ut = njfs->first_ut;
  856. if(njfs->last_ut && njfs->last_ut > jfs->last_ut)
  857. jfs->last_ut = njfs->last_ut;
  858. return false;
  859. }
  860. static void available_journal_file_sources_to_json_array(BUFFER *wb) {
  861. DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_NAME_LINK_DONT_CLONE|DICT_OPTION_DONT_OVERWRITE_VALUE);
  862. dictionary_register_conflict_callback(dict, journal_file_merge_sizes, NULL);
  863. struct journal_file_source t = { 0 };
  864. struct journal_file *jf;
  865. dfe_start_read(journal_files_registry, jf) {
  866. t.first_ut = jf->msg_first_ut;
  867. t.last_ut = jf->msg_last_ut;
  868. t.count = 1;
  869. t.size = jf->size;
  870. dictionary_set(dict, SDJF_SOURCE_ALL_NAME, &t, sizeof(t));
  871. if((jf->source_type & (SDJF_LOCAL)) == (SDJF_LOCAL))
  872. dictionary_set(dict, SDJF_SOURCE_LOCAL_NAME, &t, sizeof(t));
  873. if((jf->source_type & (SDJF_LOCAL | SDJF_SYSTEM)) == (SDJF_LOCAL | SDJF_SYSTEM))
  874. dictionary_set(dict, SDJF_SOURCE_LOCAL_SYSTEM_NAME, &t, sizeof(t));
  875. if((jf->source_type & (SDJF_LOCAL | SDJF_USER)) == (SDJF_LOCAL | SDJF_USER))
  876. dictionary_set(dict, SDJF_SOURCE_LOCAL_USERS_NAME, &t, sizeof(t));
  877. if((jf->source_type & (SDJF_LOCAL | SDJF_OTHER)) == (SDJF_LOCAL | SDJF_OTHER))
  878. dictionary_set(dict, SDJF_SOURCE_LOCAL_OTHER_NAME, &t, sizeof(t));
  879. if((jf->source_type & (SDJF_NAMESPACE)) == (SDJF_NAMESPACE))
  880. dictionary_set(dict, SDJF_SOURCE_NAMESPACES_NAME, &t, sizeof(t));
  881. if((jf->source_type & (SDJF_REMOTE)) == (SDJF_REMOTE))
  882. dictionary_set(dict, SDJF_SOURCE_REMOTES_NAME, &t, sizeof(t));
  883. if(jf->source)
  884. dictionary_set(dict, string2str(jf->source), &t, sizeof(t));
  885. }
  886. dfe_done(jf);
  887. dictionary_sorted_walkthrough_read(dict, journal_file_to_json_array_cb, wb);
  888. dictionary_destroy(dict);
  889. }
  890. static void files_registry_delete_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) {
  891. struct journal_file *jf = value; (void)jf;
  892. const char *filename = dictionary_acquired_item_name(item); (void)filename;
  893. string_freez(jf->source);
  894. internal_error(true, "removed journal file '%s'", filename);
  895. }
  896. void journal_directory_scan(const char *dirname, int depth, usec_t last_scan_ut) {
  897. static const char *ext = ".journal";
  898. static const size_t ext_len = sizeof(".journal") - 1;
  899. if (depth > VAR_LOG_JOURNAL_MAX_DEPTH)
  900. return;
  901. DIR *dir;
  902. struct dirent *entry;
  903. struct stat info;
  904. char absolute_path[FILENAME_MAX];
  905. // Open the directory.
  906. if ((dir = opendir(dirname)) == NULL) {
  907. if(errno != ENOENT && errno != ENOTDIR)
  908. netdata_log_error("Cannot opendir() '%s'", dirname);
  909. return;
  910. }
  911. // Read each entry in the directory.
  912. while ((entry = readdir(dir)) != NULL) {
  913. snprintfz(absolute_path, sizeof(absolute_path), "%s/%s", dirname, entry->d_name);
  914. if (stat(absolute_path, &info) != 0) {
  915. netdata_log_error("Failed to stat() '%s", absolute_path);
  916. continue;
  917. }
  918. if (S_ISDIR(info.st_mode)) {
  919. // If entry is a directory, call traverse recursively.
  920. if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0)
  921. journal_directory_scan(absolute_path, depth + 1, last_scan_ut);
  922. }
  923. else if (S_ISREG(info.st_mode)) {
  924. // If entry is a regular file, check if it ends with .journal.
  925. char *filename = entry->d_name;
  926. size_t len = strlen(filename);
  927. if (len > ext_len && strcmp(filename + len - ext_len, ext) == 0) {
  928. struct journal_file t = {
  929. .file_last_modified_ut = info.st_mtim.tv_sec * USEC_PER_SEC + info.st_mtim.tv_nsec / NSEC_PER_USEC,
  930. .last_scan_ut = last_scan_ut,
  931. .size = info.st_size,
  932. .max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT,
  933. };
  934. dictionary_set(journal_files_registry, absolute_path, &t, sizeof(t));
  935. }
  936. }
  937. }
  938. closedir(dir);
  939. }
  940. static void journal_files_registry_update() {
  941. usec_t scan_ut = now_monotonic_usec();
  942. for(unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES ;i++) {
  943. if(!journal_directories[i].path)
  944. break;
  945. journal_directory_scan(journal_directories[i].path, 0, scan_ut);
  946. }
  947. struct journal_file *jf;
  948. dfe_start_write(journal_files_registry, jf) {
  949. if(jf->last_scan_ut < scan_ut)
  950. dictionary_del(journal_files_registry, jf_dfe.name);
  951. }
  952. dfe_done(jf);
  953. }
  954. // ----------------------------------------------------------------------------
  955. static bool jf_is_mine(struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
  956. if((fqs->source_type == SDJF_ALL || (jf->source_type & fqs->source_type) == fqs->source_type) &&
  957. (!fqs->source || fqs->source == jf->source)) {
  958. usec_t anchor_delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT;
  959. usec_t first_ut = jf->msg_first_ut;
  960. usec_t last_ut = jf->msg_last_ut + anchor_delta;
  961. if(last_ut >= fqs->after_ut && first_ut <= fqs->before_ut)
  962. return true;
  963. }
  964. return false;
  965. }
  966. static int journal_file_dict_items_backward_compar(const void *a, const void *b) {
  967. const DICTIONARY_ITEM **ad = (const DICTIONARY_ITEM **)a, **bd = (const DICTIONARY_ITEM **)b;
  968. struct journal_file *jfa = dictionary_acquired_item_value(*ad);
  969. struct journal_file *jfb = dictionary_acquired_item_value(*bd);
  970. if(jfa->msg_last_ut < jfb->msg_last_ut)
  971. return 1;
  972. if(jfa->msg_last_ut > jfb->msg_last_ut)
  973. return -1;
  974. if(jfa->msg_first_ut < jfb->msg_first_ut)
  975. return 1;
  976. if(jfa->msg_first_ut > jfb->msg_first_ut)
  977. return -1;
  978. return 0;
  979. }
  980. static int journal_file_dict_items_forward_compar(const void *a, const void *b) {
  981. return -journal_file_dict_items_backward_compar(a, b);
  982. }
  983. static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) {
  984. ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_NO_FILE_MATCHED;
  985. struct journal_file *jf;
  986. fqs->files_matched = 0;
  987. fqs->file_working = 0;
  988. fqs->rows_useful = 0;
  989. fqs->rows_read = 0;
  990. fqs->bytes_read = 0;
  991. size_t files_used = 0;
  992. size_t files_max = dictionary_entries(journal_files_registry);
  993. const DICTIONARY_ITEM *file_items[files_max];
  994. // count the files
  995. bool files_are_newer = false;
  996. dfe_start_read(journal_files_registry, jf) {
  997. if(!jf_is_mine(jf, fqs))
  998. continue;
  999. file_items[files_used++] = dictionary_acquired_item_dup(journal_files_registry, jf_dfe.item);
  1000. if(jf->msg_last_ut > fqs->if_modified_since)
  1001. files_are_newer = true;
  1002. }
  1003. dfe_done(jf);
  1004. fqs->files_matched = files_used;
  1005. if(fqs->if_modified_since && !files_are_newer) {
  1006. buffer_flush(wb);
  1007. return HTTP_RESP_NOT_MODIFIED;
  1008. }
  1009. // sort the files, so that they are optimal for facets
  1010. if(files_used >= 2) {
  1011. if (fqs->direction == FACETS_ANCHOR_DIRECTION_BACKWARD)
  1012. qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *),
  1013. journal_file_dict_items_backward_compar);
  1014. else
  1015. qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *),
  1016. journal_file_dict_items_forward_compar);
  1017. }
  1018. bool partial = false;
  1019. usec_t started_ut;
  1020. usec_t ended_ut = now_monotonic_usec();
  1021. buffer_json_member_add_array(wb, "_journal_files");
  1022. for(size_t f = 0; f < files_used ;f++) {
  1023. const char *filename = dictionary_acquired_item_name(file_items[f]);
  1024. jf = dictionary_acquired_item_value(file_items[f]);
  1025. if(!jf_is_mine(jf, fqs))
  1026. continue;
  1027. fqs->file_working++;
  1028. fqs->cached_count = 0;
  1029. size_t fs_calls = fstat_thread_calls;
  1030. size_t fs_cached = fstat_thread_cached_responses;
  1031. size_t rows_useful = fqs->rows_useful;
  1032. size_t rows_read = fqs->rows_read;
  1033. size_t bytes_read = fqs->bytes_read;
  1034. size_t matches_setup_ut = fqs->matches_setup_ut;
  1035. ND_SD_JOURNAL_STATUS tmp_status = netdata_systemd_journal_query_one_file(filename, wb, facets, jf, fqs);
  1036. rows_useful = fqs->rows_useful - rows_useful;
  1037. rows_read = fqs->rows_read - rows_read;
  1038. bytes_read = fqs->bytes_read - bytes_read;
  1039. matches_setup_ut = fqs->matches_setup_ut - matches_setup_ut;
  1040. fs_calls = fstat_thread_calls - fs_calls;
  1041. fs_cached = fstat_thread_cached_responses - fs_cached;
  1042. started_ut = ended_ut;
  1043. ended_ut = now_monotonic_usec();
  1044. usec_t duration_ut = ended_ut - started_ut;
  1045. buffer_json_add_array_item_object(wb); // journal file
  1046. {
  1047. // information about the file
  1048. buffer_json_member_add_string(wb, "_filename", filename);
  1049. buffer_json_member_add_uint64(wb, "_source_type", jf->source_type);
  1050. buffer_json_member_add_string(wb, "_source", string2str(jf->source));
  1051. buffer_json_member_add_uint64(wb, "_last_modified_ut", jf->file_last_modified_ut);
  1052. buffer_json_member_add_uint64(wb, "_msg_first_ut", jf->msg_first_ut);
  1053. buffer_json_member_add_uint64(wb, "_msg_last_ut", jf->msg_last_ut);
  1054. buffer_json_member_add_uint64(wb, "_journal_vs_realtime_delta_ut", jf->max_journal_vs_realtime_delta_ut);
  1055. // information about the current use of the file
  1056. buffer_json_member_add_uint64(wb, "duration_ut", ended_ut - started_ut);
  1057. buffer_json_member_add_uint64(wb, "rows_read", rows_read);
  1058. buffer_json_member_add_uint64(wb, "rows_useful", rows_useful);
  1059. buffer_json_member_add_double(wb, "rows_per_second", (double) rows_read / (double) duration_ut * (double) USEC_PER_SEC);
  1060. buffer_json_member_add_uint64(wb, "bytes_read", bytes_read);
  1061. buffer_json_member_add_double(wb, "bytes_per_second", (double) bytes_read / (double) duration_ut * (double) USEC_PER_SEC);
  1062. buffer_json_member_add_uint64(wb, "duration_matches_ut", matches_setup_ut);
  1063. buffer_json_member_add_uint64(wb, "fstat_query_calls", fs_calls);
  1064. buffer_json_member_add_uint64(wb, "fstat_query_cached_responses", fs_cached);
  1065. }
  1066. buffer_json_object_close(wb); // journal file
  1067. bool stop = false;
  1068. switch(tmp_status) {
  1069. case ND_SD_JOURNAL_OK:
  1070. case ND_SD_JOURNAL_NO_FILE_MATCHED:
  1071. status = (status == ND_SD_JOURNAL_OK) ? ND_SD_JOURNAL_OK : tmp_status;
  1072. break;
  1073. case ND_SD_JOURNAL_FAILED_TO_OPEN:
  1074. case ND_SD_JOURNAL_FAILED_TO_SEEK:
  1075. partial = true;
  1076. if(status == ND_SD_JOURNAL_NO_FILE_MATCHED)
  1077. status = tmp_status;
  1078. break;
  1079. case ND_SD_JOURNAL_CANCELLED:
  1080. case ND_SD_JOURNAL_TIMED_OUT:
  1081. partial = true;
  1082. stop = true;
  1083. status = tmp_status;
  1084. break;
  1085. case ND_SD_JOURNAL_NOT_MODIFIED:
  1086. internal_fatal(true, "this should never be returned here");
  1087. break;
  1088. }
  1089. if(stop)
  1090. break;
  1091. }
  1092. buffer_json_array_close(wb); // _journal_files
  1093. // release the files
  1094. for(size_t f = 0; f < files_used ;f++)
  1095. dictionary_acquired_item_release(journal_files_registry, file_items[f]);
  1096. switch (status) {
  1097. case ND_SD_JOURNAL_OK:
  1098. if(fqs->if_modified_since && !fqs->rows_useful) {
  1099. buffer_flush(wb);
  1100. return HTTP_RESP_NOT_MODIFIED;
  1101. }
  1102. break;
  1103. case ND_SD_JOURNAL_TIMED_OUT:
  1104. case ND_SD_JOURNAL_NO_FILE_MATCHED:
  1105. break;
  1106. case ND_SD_JOURNAL_CANCELLED:
  1107. buffer_flush(wb);
  1108. return HTTP_RESP_CLIENT_CLOSED_REQUEST;
  1109. case ND_SD_JOURNAL_NOT_MODIFIED:
  1110. buffer_flush(wb);
  1111. return HTTP_RESP_NOT_MODIFIED;
  1112. default:
  1113. case ND_SD_JOURNAL_FAILED_TO_OPEN:
  1114. case ND_SD_JOURNAL_FAILED_TO_SEEK:
  1115. buffer_flush(wb);
  1116. return HTTP_RESP_INTERNAL_SERVER_ERROR;
  1117. }
  1118. buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
  1119. buffer_json_member_add_boolean(wb, "partial", partial);
  1120. buffer_json_member_add_string(wb, "type", "table");
  1121. if(!fqs->data_only) {
  1122. buffer_json_member_add_time_t(wb, "update_every", 1);
  1123. buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
  1124. }
  1125. if(!fqs->data_only || fqs->tail)
  1126. buffer_json_member_add_uint64(wb, "last_modified", fqs->last_modified);
  1127. facets_sort_and_reorder_keys(facets);
  1128. facets_report(facets, wb, used_hashes_registry);
  1129. buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + (fqs->data_only ? 3600 : 0));
  1130. buffer_json_member_add_object(wb, "_fstat_caching");
  1131. {
  1132. buffer_json_member_add_uint64(wb, "calls", fstat_thread_calls);
  1133. buffer_json_member_add_uint64(wb, "cached", fstat_thread_cached_responses);
  1134. }
  1135. buffer_json_object_close(wb); // _fstat_caching
  1136. buffer_json_finalize(wb);
  1137. return HTTP_RESP_OK;
  1138. }
  1139. static void netdata_systemd_journal_function_help(const char *transaction) {
  1140. BUFFER *wb = buffer_create(0, NULL);
  1141. buffer_sprintf(wb,
  1142. "%s / %s\n"
  1143. "\n"
  1144. "%s\n"
  1145. "\n"
  1146. "The following parameters are supported:\n"
  1147. "\n"
  1148. " "JOURNAL_PARAMETER_HELP"\n"
  1149. " Shows this help message.\n"
  1150. "\n"
  1151. " "JOURNAL_PARAMETER_INFO"\n"
  1152. " Request initial configuration information about the plugin.\n"
  1153. " The key entity returned is the required_params array, which includes\n"
  1154. " all the available systemd journal sources.\n"
  1155. " When `"JOURNAL_PARAMETER_INFO"` is requested, all other parameters are ignored.\n"
  1156. "\n"
  1157. " "JOURNAL_PARAMETER_ID":STRING\n"
  1158. " Caller supplied unique ID of the request.\n"
  1159. " This can be used later to request a progress report of the query.\n"
  1160. " Optional, but if omitted no `"JOURNAL_PARAMETER_PROGRESS"` can be requested.\n"
  1161. "\n"
  1162. " "JOURNAL_PARAMETER_PROGRESS"\n"
  1163. " Request a progress report (the `id` of a running query is required).\n"
  1164. " When `"JOURNAL_PARAMETER_PROGRESS"` is requested, only parameter `"JOURNAL_PARAMETER_ID"` is used.\n"
  1165. "\n"
  1166. " "JOURNAL_PARAMETER_DATA_ONLY":true or "JOURNAL_PARAMETER_DATA_ONLY":false\n"
  1167. " Quickly respond with data requested, without generating a\n"
  1168. " `histogram`, `facets` counters and `items`.\n"
  1169. "\n"
  1170. " "JOURNAL_PARAMETER_DELTA":true or "JOURNAL_PARAMETER_DELTA":false\n"
  1171. " When doing data only queries, include deltas for histogram, facets and items.\n"
  1172. "\n"
  1173. " "JOURNAL_PARAMETER_TAIL":true or "JOURNAL_PARAMETER_TAIL":false\n"
  1174. " When doing data only queries, respond with the newest messages,\n"
  1175. " and up to the anchor, but calculate deltas (if requested) for\n"
  1176. " the duration [anchor - before].\n"
  1177. "\n"
  1178. " "JOURNAL_PARAMETER_SLICE":true or "JOURNAL_PARAMETER_SLICE":false\n"
  1179. " When it is turned on, the plugin is executing filtering via libsystemd,\n"
  1180. " utilizing all the available indexes of the journal files.\n"
  1181. " When it is off, only the time constraint is handled by libsystemd and\n"
  1182. " all filtering is done by the plugin.\n"
  1183. " The default is: %s\n"
  1184. "\n"
  1185. " "JOURNAL_PARAMETER_SOURCE":SOURCE\n"
  1186. " Query only the specified journal sources.\n"
  1187. " Do an `"JOURNAL_PARAMETER_INFO"` query to find the sources.\n"
  1188. "\n"
  1189. " "JOURNAL_PARAMETER_BEFORE":TIMESTAMP_IN_SECONDS\n"
  1190. " Absolute or relative (to now) timestamp in seconds, to start the query.\n"
  1191. " The query is always executed from the most recent to the oldest log entry.\n"
  1192. " If not given the default is: now.\n"
  1193. "\n"
  1194. " "JOURNAL_PARAMETER_AFTER":TIMESTAMP_IN_SECONDS\n"
  1195. " Absolute or relative (to `before`) timestamp in seconds, to end the query.\n"
  1196. " If not given, the default is %d.\n"
  1197. "\n"
  1198. " "JOURNAL_PARAMETER_LAST":ITEMS\n"
  1199. " The number of items to return.\n"
  1200. " The default is %d.\n"
  1201. "\n"
  1202. " "JOURNAL_PARAMETER_ANCHOR":TIMESTAMP_IN_MICROSECONDS\n"
  1203. " Return items relative to this timestamp.\n"
  1204. " The exact items to be returned depend on the query `"JOURNAL_PARAMETER_DIRECTION"`.\n"
  1205. "\n"
  1206. " "JOURNAL_PARAMETER_DIRECTION":forward or "JOURNAL_PARAMETER_DIRECTION":backward\n"
  1207. " When set to `backward` (default) the items returned are the newest before the\n"
  1208. " `"JOURNAL_PARAMETER_ANCHOR"`, (or `"JOURNAL_PARAMETER_BEFORE"` if `"JOURNAL_PARAMETER_ANCHOR"` is not set)\n"
  1209. " When set to `forward` the items returned are the oldest after the\n"
  1210. " `"JOURNAL_PARAMETER_ANCHOR"`, (or `"JOURNAL_PARAMETER_AFTER"` if `"JOURNAL_PARAMETER_ANCHOR"` is not set)\n"
  1211. " The default is: %s\n"
  1212. "\n"
  1213. " "JOURNAL_PARAMETER_QUERY":SIMPLE_PATTERN\n"
  1214. " Do a full text search to find the log entries matching the pattern given.\n"
  1215. " The plugin is searching for matches on all fields of the database.\n"
  1216. "\n"
  1217. " "JOURNAL_PARAMETER_IF_MODIFIED_SINCE":TIMESTAMP_IN_MICROSECONDS\n"
  1218. " Each successful response, includes a `last_modified` field.\n"
  1219. " By providing the timestamp to the `"JOURNAL_PARAMETER_IF_MODIFIED_SINCE"` parameter,\n"
  1220. " the plugin will return 200 with a successful response, or 304 if the source has not\n"
  1221. " been modified since that timestamp.\n"
  1222. "\n"
  1223. " "JOURNAL_PARAMETER_HISTOGRAM":facet_id\n"
  1224. " Use the given `facet_id` for the histogram.\n"
  1225. " This parameter is ignored in `"JOURNAL_PARAMETER_DATA_ONLY"` mode.\n"
  1226. "\n"
  1227. " "JOURNAL_PARAMETER_FACETS":facet_id1,facet_id2,facet_id3,...\n"
  1228. " Add the given facets to the list of fields for which analysis is required.\n"
  1229. " The plugin will offer both a histogram and facet value counters for its values.\n"
  1230. " This parameter is ignored in `"JOURNAL_PARAMETER_DATA_ONLY"` mode.\n"
  1231. "\n"
  1232. " facet_id:value_id1,value_id2,value_id3,...\n"
  1233. " Apply filters to the query, based on the facet IDs returned.\n"
  1234. " Each `facet_id` can be given once, but multiple `facet_ids` can be given.\n"
  1235. "\n"
  1236. , program_name
  1237. , SYSTEMD_JOURNAL_FUNCTION_NAME
  1238. , SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION
  1239. , JOURNAL_DEFAULT_SLICE_MODE ? "true" : "false" // slice
  1240. , -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION
  1241. , SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY
  1242. , JOURNAL_DEFAULT_DIRECTION == FACETS_ANCHOR_DIRECTION_BACKWARD ? "backward" : "forward"
  1243. );
  1244. netdata_mutex_lock(&stdout_mutex);
  1245. pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
  1246. netdata_mutex_unlock(&stdout_mutex);
  1247. buffer_free(wb);
  1248. }
  1249. const char *errno_map[] = {
  1250. [1] = "1 (EPERM)", // "Operation not permitted",
  1251. [2] = "2 (ENOENT)", // "No such file or directory",
  1252. [3] = "3 (ESRCH)", // "No such process",
  1253. [4] = "4 (EINTR)", // "Interrupted system call",
  1254. [5] = "5 (EIO)", // "Input/output error",
  1255. [6] = "6 (ENXIO)", // "No such device or address",
  1256. [7] = "7 (E2BIG)", // "Argument list too long",
  1257. [8] = "8 (ENOEXEC)", // "Exec format error",
  1258. [9] = "9 (EBADF)", // "Bad file descriptor",
  1259. [10] = "10 (ECHILD)", // "No child processes",
  1260. [11] = "11 (EAGAIN)", // "Resource temporarily unavailable",
  1261. [12] = "12 (ENOMEM)", // "Cannot allocate memory",
  1262. [13] = "13 (EACCES)", // "Permission denied",
  1263. [14] = "14 (EFAULT)", // "Bad address",
  1264. [15] = "15 (ENOTBLK)", // "Block device required",
  1265. [16] = "16 (EBUSY)", // "Device or resource busy",
  1266. [17] = "17 (EEXIST)", // "File exists",
  1267. [18] = "18 (EXDEV)", // "Invalid cross-device link",
  1268. [19] = "19 (ENODEV)", // "No such device",
  1269. [20] = "20 (ENOTDIR)", // "Not a directory",
  1270. [21] = "21 (EISDIR)", // "Is a directory",
  1271. [22] = "22 (EINVAL)", // "Invalid argument",
  1272. [23] = "23 (ENFILE)", // "Too many open files in system",
  1273. [24] = "24 (EMFILE)", // "Too many open files",
  1274. [25] = "25 (ENOTTY)", // "Inappropriate ioctl for device",
  1275. [26] = "26 (ETXTBSY)", // "Text file busy",
  1276. [27] = "27 (EFBIG)", // "File too large",
  1277. [28] = "28 (ENOSPC)", // "No space left on device",
  1278. [29] = "29 (ESPIPE)", // "Illegal seek",
  1279. [30] = "30 (EROFS)", // "Read-only file system",
  1280. [31] = "31 (EMLINK)", // "Too many links",
  1281. [32] = "32 (EPIPE)", // "Broken pipe",
  1282. [33] = "33 (EDOM)", // "Numerical argument out of domain",
  1283. [34] = "34 (ERANGE)", // "Numerical result out of range",
  1284. [35] = "35 (EDEADLK)", // "Resource deadlock avoided",
  1285. [36] = "36 (ENAMETOOLONG)", // "File name too long",
  1286. [37] = "37 (ENOLCK)", // "No locks available",
  1287. [38] = "38 (ENOSYS)", // "Function not implemented",
  1288. [39] = "39 (ENOTEMPTY)", // "Directory not empty",
  1289. [40] = "40 (ELOOP)", // "Too many levels of symbolic links",
  1290. [42] = "42 (ENOMSG)", // "No message of desired type",
  1291. [43] = "43 (EIDRM)", // "Identifier removed",
  1292. [44] = "44 (ECHRNG)", // "Channel number out of range",
  1293. [45] = "45 (EL2NSYNC)", // "Level 2 not synchronized",
  1294. [46] = "46 (EL3HLT)", // "Level 3 halted",
  1295. [47] = "47 (EL3RST)", // "Level 3 reset",
  1296. [48] = "48 (ELNRNG)", // "Link number out of range",
  1297. [49] = "49 (EUNATCH)", // "Protocol driver not attached",
  1298. [50] = "50 (ENOCSI)", // "No CSI structure available",
  1299. [51] = "51 (EL2HLT)", // "Level 2 halted",
  1300. [52] = "52 (EBADE)", // "Invalid exchange",
  1301. [53] = "53 (EBADR)", // "Invalid request descriptor",
  1302. [54] = "54 (EXFULL)", // "Exchange full",
  1303. [55] = "55 (ENOANO)", // "No anode",
  1304. [56] = "56 (EBADRQC)", // "Invalid request code",
  1305. [57] = "57 (EBADSLT)", // "Invalid slot",
  1306. [59] = "59 (EBFONT)", // "Bad font file format",
  1307. [60] = "60 (ENOSTR)", // "Device not a stream",
  1308. [61] = "61 (ENODATA)", // "No data available",
  1309. [62] = "62 (ETIME)", // "Timer expired",
  1310. [63] = "63 (ENOSR)", // "Out of streams resources",
  1311. [64] = "64 (ENONET)", // "Machine is not on the network",
  1312. [65] = "65 (ENOPKG)", // "Package not installed",
  1313. [66] = "66 (EREMOTE)", // "Object is remote",
  1314. [67] = "67 (ENOLINK)", // "Link has been severed",
  1315. [68] = "68 (EADV)", // "Advertise error",
  1316. [69] = "69 (ESRMNT)", // "Srmount error",
  1317. [70] = "70 (ECOMM)", // "Communication error on send",
  1318. [71] = "71 (EPROTO)", // "Protocol error",
  1319. [72] = "72 (EMULTIHOP)", // "Multihop attempted",
  1320. [73] = "73 (EDOTDOT)", // "RFS specific error",
  1321. [74] = "74 (EBADMSG)", // "Bad message",
  1322. [75] = "75 (EOVERFLOW)", // "Value too large for defined data type",
  1323. [76] = "76 (ENOTUNIQ)", // "Name not unique on network",
  1324. [77] = "77 (EBADFD)", // "File descriptor in bad state",
  1325. [78] = "78 (EREMCHG)", // "Remote address changed",
  1326. [79] = "79 (ELIBACC)", // "Can not access a needed shared library",
  1327. [80] = "80 (ELIBBAD)", // "Accessing a corrupted shared library",
  1328. [81] = "81 (ELIBSCN)", // ".lib section in a.out corrupted",
  1329. [82] = "82 (ELIBMAX)", // "Attempting to link in too many shared libraries",
  1330. [83] = "83 (ELIBEXEC)", // "Cannot exec a shared library directly",
  1331. [84] = "84 (EILSEQ)", // "Invalid or incomplete multibyte or wide character",
  1332. [85] = "85 (ERESTART)", // "Interrupted system call should be restarted",
  1333. [86] = "86 (ESTRPIPE)", // "Streams pipe error",
  1334. [87] = "87 (EUSERS)", // "Too many users",
  1335. [88] = "88 (ENOTSOCK)", // "Socket operation on non-socket",
  1336. [89] = "89 (EDESTADDRREQ)", // "Destination address required",
  1337. [90] = "90 (EMSGSIZE)", // "Message too long",
  1338. [91] = "91 (EPROTOTYPE)", // "Protocol wrong type for socket",
  1339. [92] = "92 (ENOPROTOOPT)", // "Protocol not available",
  1340. [93] = "93 (EPROTONOSUPPORT)", // "Protocol not supported",
  1341. [94] = "94 (ESOCKTNOSUPPORT)", // "Socket type not supported",
  1342. [95] = "95 (ENOTSUP)", // "Operation not supported",
  1343. [96] = "96 (EPFNOSUPPORT)", // "Protocol family not supported",
  1344. [97] = "97 (EAFNOSUPPORT)", // "Address family not supported by protocol",
  1345. [98] = "98 (EADDRINUSE)", // "Address already in use",
  1346. [99] = "99 (EADDRNOTAVAIL)", // "Cannot assign requested address",
  1347. [100] = "100 (ENETDOWN)", // "Network is down",
  1348. [101] = "101 (ENETUNREACH)", // "Network is unreachable",
  1349. [102] = "102 (ENETRESET)", // "Network dropped connection on reset",
  1350. [103] = "103 (ECONNABORTED)", // "Software caused connection abort",
  1351. [104] = "104 (ECONNRESET)", // "Connection reset by peer",
  1352. [105] = "105 (ENOBUFS)", // "No buffer space available",
  1353. [106] = "106 (EISCONN)", // "Transport endpoint is already connected",
  1354. [107] = "107 (ENOTCONN)", // "Transport endpoint is not connected",
  1355. [108] = "108 (ESHUTDOWN)", // "Cannot send after transport endpoint shutdown",
  1356. [109] = "109 (ETOOMANYREFS)", // "Too many references: cannot splice",
  1357. [110] = "110 (ETIMEDOUT)", // "Connection timed out",
  1358. [111] = "111 (ECONNREFUSED)", // "Connection refused",
  1359. [112] = "112 (EHOSTDOWN)", // "Host is down",
  1360. [113] = "113 (EHOSTUNREACH)", // "No route to host",
  1361. [114] = "114 (EALREADY)", // "Operation already in progress",
  1362. [115] = "115 (EINPROGRESS)", // "Operation now in progress",
  1363. [116] = "116 (ESTALE)", // "Stale file handle",
  1364. [117] = "117 (EUCLEAN)", // "Structure needs cleaning",
  1365. [118] = "118 (ENOTNAM)", // "Not a XENIX named type file",
  1366. [119] = "119 (ENAVAIL)", // "No XENIX semaphores available",
  1367. [120] = "120 (EISNAM)", // "Is a named type file",
  1368. [121] = "121 (EREMOTEIO)", // "Remote I/O error",
  1369. [122] = "122 (EDQUOT)", // "Disk quota exceeded",
  1370. [123] = "123 (ENOMEDIUM)", // "No medium found",
  1371. [124] = "124 (EMEDIUMTYPE)", // "Wrong medium type",
  1372. [125] = "125 (ECANCELED)", // "Operation canceled",
  1373. [126] = "126 (ENOKEY)", // "Required key not available",
  1374. [127] = "127 (EKEYEXPIRED)", // "Key has expired",
  1375. [128] = "128 (EKEYREVOKED)", // "Key has been revoked",
  1376. [129] = "129 (EKEYREJECTED)", // "Key was rejected by service",
  1377. [130] = "130 (EOWNERDEAD)", // "Owner died",
  1378. [131] = "131 (ENOTRECOVERABLE)", // "State not recoverable",
  1379. [132] = "132 (ERFKILL)", // "Operation not possible due to RF-kill",
  1380. [133] = "133 (EHWPOISON)", // "Memory page has hardware error",
  1381. };
  1382. static const char *syslog_facility_to_name(int facility) {
  1383. switch (facility) {
  1384. case LOG_FAC(LOG_KERN): return "kern";
  1385. case LOG_FAC(LOG_USER): return "user";
  1386. case LOG_FAC(LOG_MAIL): return "mail";
  1387. case LOG_FAC(LOG_DAEMON): return "daemon";
  1388. case LOG_FAC(LOG_AUTH): return "auth";
  1389. case LOG_FAC(LOG_SYSLOG): return "syslog";
  1390. case LOG_FAC(LOG_LPR): return "lpr";
  1391. case LOG_FAC(LOG_NEWS): return "news";
  1392. case LOG_FAC(LOG_UUCP): return "uucp";
  1393. case LOG_FAC(LOG_CRON): return "cron";
  1394. case LOG_FAC(LOG_AUTHPRIV): return "authpriv";
  1395. case LOG_FAC(LOG_FTP): return "ftp";
  1396. case LOG_FAC(LOG_LOCAL0): return "local0";
  1397. case LOG_FAC(LOG_LOCAL1): return "local1";
  1398. case LOG_FAC(LOG_LOCAL2): return "local2";
  1399. case LOG_FAC(LOG_LOCAL3): return "local3";
  1400. case LOG_FAC(LOG_LOCAL4): return "local4";
  1401. case LOG_FAC(LOG_LOCAL5): return "local5";
  1402. case LOG_FAC(LOG_LOCAL6): return "local6";
  1403. case LOG_FAC(LOG_LOCAL7): return "local7";
  1404. default: return NULL;
  1405. }
  1406. }
  1407. static const char *syslog_priority_to_name(int priority) {
  1408. switch (priority) {
  1409. case LOG_ALERT: return "alert";
  1410. case LOG_CRIT: return "critical";
  1411. case LOG_DEBUG: return "debug";
  1412. case LOG_EMERG: return "panic";
  1413. case LOG_ERR: return "error";
  1414. case LOG_INFO: return "info";
  1415. case LOG_NOTICE: return "notice";
  1416. case LOG_WARNING: return "warning";
  1417. default: return NULL;
  1418. }
  1419. }
  1420. static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(FACETS *facets __maybe_unused, FACET_ROW *row, void *data __maybe_unused) {
  1421. // same to
  1422. // https://github.com/systemd/systemd/blob/aab9e4b2b86905a15944a1ac81e471b5b7075932/src/basic/terminal-util.c#L1501
  1423. // function get_log_colors()
  1424. FACET_ROW_KEY_VALUE *priority_rkv = dictionary_get(row->dict, "PRIORITY");
  1425. if(!priority_rkv || priority_rkv->empty)
  1426. return FACET_ROW_SEVERITY_NORMAL;
  1427. int priority = str2i(buffer_tostring(priority_rkv->wb));
  1428. if(priority <= LOG_ERR)
  1429. return FACET_ROW_SEVERITY_CRITICAL;
  1430. else if (priority <= LOG_WARNING)
  1431. return FACET_ROW_SEVERITY_WARNING;
  1432. else if(priority <= LOG_NOTICE)
  1433. return FACET_ROW_SEVERITY_NOTICE;
  1434. else if(priority >= LOG_DEBUG)
  1435. return FACET_ROW_SEVERITY_DEBUG;
  1436. return FACET_ROW_SEVERITY_NORMAL;
  1437. }
  1438. static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
  1439. static __thread char tmp[1024 + 1];
  1440. struct passwd pw, *result = NULL;
  1441. if (getpwuid_r(uid, &pw, tmp, sizeof(tmp), &result) != 0 || !result || !pw.pw_name || !(*pw.pw_name))
  1442. snprintfz(buffer, buffer_size - 1, "%u", uid);
  1443. else
  1444. snprintfz(buffer, buffer_size - 1, "%u (%s)", uid, pw.pw_name);
  1445. return buffer;
  1446. }
  1447. static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) {
  1448. static __thread char tmp[1024];
  1449. struct group grp, *result = NULL;
  1450. if (getgrgid_r(gid, &grp, tmp, sizeof(tmp), &result) != 0 || !result || !grp.gr_name || !(*grp.gr_name))
  1451. snprintfz(buffer, buffer_size - 1, "%u", gid);
  1452. else
  1453. snprintfz(buffer, buffer_size - 1, "%u (%s)", gid, grp.gr_name);
  1454. return buffer;
  1455. }
  1456. static void netdata_systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
  1457. const char *v = buffer_tostring(wb);
  1458. if(*v && isdigit(*v)) {
  1459. int facility = str2i(buffer_tostring(wb));
  1460. const char *name = syslog_facility_to_name(facility);
  1461. if (name) {
  1462. buffer_flush(wb);
  1463. buffer_strcat(wb, name);
  1464. }
  1465. }
  1466. }
  1467. static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
  1468. if(scope == FACETS_TRANSFORM_FACET_SORT)
  1469. return;
  1470. const char *v = buffer_tostring(wb);
  1471. if(*v && isdigit(*v)) {
  1472. int priority = str2i(buffer_tostring(wb));
  1473. const char *name = syslog_priority_to_name(priority);
  1474. if (name) {
  1475. buffer_flush(wb);
  1476. buffer_strcat(wb, name);
  1477. }
  1478. }
  1479. }
  1480. static void netdata_systemd_journal_transform_errno(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
  1481. if(scope == FACETS_TRANSFORM_FACET_SORT)
  1482. return;
  1483. const char *v = buffer_tostring(wb);
  1484. if(*v && isdigit(*v)) {
  1485. unsigned err_no = str2u(buffer_tostring(wb));
  1486. if(err_no > 0 && err_no < sizeof(errno_map) / sizeof(*errno_map)) {
  1487. const char *name = errno_map[err_no];
  1488. if(name) {
  1489. buffer_flush(wb);
  1490. buffer_strcat(wb, name);
  1491. }
  1492. }
  1493. }
  1494. }
  1495. // ----------------------------------------------------------------------------
  1496. // UID and GID transformation
  1497. #define UID_GID_HASHTABLE_SIZE 10000
  1498. struct word_t2str_hashtable_entry {
  1499. struct word_t2str_hashtable_entry *next;
  1500. Word_t hash;
  1501. size_t len;
  1502. char str[];
  1503. };
  1504. struct word_t2str_hashtable {
  1505. SPINLOCK spinlock;
  1506. size_t size;
  1507. struct word_t2str_hashtable_entry *hashtable[UID_GID_HASHTABLE_SIZE];
  1508. };
  1509. struct word_t2str_hashtable uid_hashtable = {
  1510. .size = UID_GID_HASHTABLE_SIZE,
  1511. };
  1512. struct word_t2str_hashtable gid_hashtable = {
  1513. .size = UID_GID_HASHTABLE_SIZE,
  1514. };
  1515. struct word_t2str_hashtable_entry **word_t2str_hashtable_slot(struct word_t2str_hashtable *ht, Word_t hash) {
  1516. size_t slot = hash % ht->size;
  1517. struct word_t2str_hashtable_entry **e = &ht->hashtable[slot];
  1518. while(*e && (*e)->hash != hash)
  1519. e = &((*e)->next);
  1520. return e;
  1521. }
  1522. const char *uid_to_username_cached(uid_t uid, size_t *length) {
  1523. spinlock_lock(&uid_hashtable.spinlock);
  1524. struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&uid_hashtable, uid);
  1525. if(!(*e)) {
  1526. static __thread char buf[1024];
  1527. const char *name = uid_to_username(uid, buf, sizeof(buf));
  1528. size_t size = strlen(name) + 1;
  1529. *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size);
  1530. (*e)->len = size - 1;
  1531. (*e)->hash = uid;
  1532. memcpy((*e)->str, name, size);
  1533. }
  1534. spinlock_unlock(&uid_hashtable.spinlock);
  1535. *length = (*e)->len;
  1536. return (*e)->str;
  1537. }
  1538. const char *gid_to_groupname_cached(gid_t gid, size_t *length) {
  1539. spinlock_lock(&gid_hashtable.spinlock);
  1540. struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&gid_hashtable, gid);
  1541. if(!(*e)) {
  1542. static __thread char buf[1024];
  1543. const char *name = gid_to_groupname(gid, buf, sizeof(buf));
  1544. size_t size = strlen(name) + 1;
  1545. *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size);
  1546. (*e)->len = size - 1;
  1547. (*e)->hash = gid;
  1548. memcpy((*e)->str, name, size);
  1549. }
  1550. spinlock_unlock(&gid_hashtable.spinlock);
  1551. *length = (*e)->len;
  1552. return (*e)->str;
  1553. }
  1554. DICTIONARY *boot_ids_to_first_ut = NULL;
  1555. static void netdata_systemd_journal_transform_boot_id(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
  1556. const char *boot_id = buffer_tostring(wb);
  1557. if(*boot_id && isxdigit(*boot_id)) {
  1558. usec_t ut = UINT64_MAX;
  1559. usec_t *p_ut = dictionary_get(boot_ids_to_first_ut, boot_id);
  1560. if(!p_ut) {
  1561. struct journal_file *jf;
  1562. dfe_start_read(journal_files_registry, jf) {
  1563. const char *files[2] = {
  1564. [0] = jf_dfe.name,
  1565. [1] = NULL,
  1566. };
  1567. sd_journal *j = NULL;
  1568. if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j)
  1569. continue;
  1570. char m[100];
  1571. size_t len = snprintfz(m, sizeof(m), "_BOOT_ID=%s", boot_id);
  1572. usec_t t_ut = 0;
  1573. if(sd_journal_add_match(j, m, len) < 0 ||
  1574. sd_journal_seek_head(j) < 0 ||
  1575. sd_journal_next(j) < 0 ||
  1576. sd_journal_get_realtime_usec(j, &t_ut) < 0 || !t_ut) {
  1577. sd_journal_close(j);
  1578. continue;
  1579. }
  1580. if(t_ut < ut)
  1581. ut = t_ut;
  1582. sd_journal_close(j);
  1583. }
  1584. dfe_done(jf);
  1585. dictionary_set(boot_ids_to_first_ut, boot_id, &ut, sizeof(ut));
  1586. }
  1587. else
  1588. ut = *p_ut;
  1589. if(ut != UINT64_MAX) {
  1590. time_t timestamp_sec = (time_t)(ut / USEC_PER_SEC);
  1591. struct tm tm;
  1592. char buffer[30];
  1593. gmtime_r(&timestamp_sec, &tm);
  1594. strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm);
  1595. switch(scope) {
  1596. default:
  1597. case FACETS_TRANSFORM_DATA:
  1598. case FACETS_TRANSFORM_VALUE:
  1599. buffer_sprintf(wb, " (%s UTC) ", buffer);
  1600. break;
  1601. case FACETS_TRANSFORM_FACET:
  1602. case FACETS_TRANSFORM_FACET_SORT:
  1603. case FACETS_TRANSFORM_HISTOGRAM:
  1604. buffer_flush(wb);
  1605. buffer_sprintf(wb, "%s UTC", buffer);
  1606. break;
  1607. }
  1608. }
  1609. }
  1610. }
  1611. static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
  1612. if(scope == FACETS_TRANSFORM_FACET_SORT)
  1613. return;
  1614. const char *v = buffer_tostring(wb);
  1615. if(*v && isdigit(*v)) {
  1616. uid_t uid = str2i(buffer_tostring(wb));
  1617. size_t len;
  1618. const char *name = uid_to_username_cached(uid, &len);
  1619. buffer_contents_replace(wb, name, len);
  1620. }
  1621. }
  1622. static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
  1623. if(scope == FACETS_TRANSFORM_FACET_SORT)
  1624. return;
  1625. const char *v = buffer_tostring(wb);
  1626. if(*v && isdigit(*v)) {
  1627. gid_t gid = str2i(buffer_tostring(wb));
  1628. size_t len;
  1629. const char *name = gid_to_groupname_cached(gid, &len);
  1630. buffer_contents_replace(wb, name, len);
  1631. }
  1632. }
  1633. const char *linux_capabilities[] = {
  1634. [CAP_CHOWN] = "CHOWN",
  1635. [CAP_DAC_OVERRIDE] = "DAC_OVERRIDE",
  1636. [CAP_DAC_READ_SEARCH] = "DAC_READ_SEARCH",
  1637. [CAP_FOWNER] = "FOWNER",
  1638. [CAP_FSETID] = "FSETID",
  1639. [CAP_KILL] = "KILL",
  1640. [CAP_SETGID] = "SETGID",
  1641. [CAP_SETUID] = "SETUID",
  1642. [CAP_SETPCAP] = "SETPCAP",
  1643. [CAP_LINUX_IMMUTABLE] = "LINUX_IMMUTABLE",
  1644. [CAP_NET_BIND_SERVICE] = "NET_BIND_SERVICE",
  1645. [CAP_NET_BROADCAST] = "NET_BROADCAST",
  1646. [CAP_NET_ADMIN] = "NET_ADMIN",
  1647. [CAP_NET_RAW] = "NET_RAW",
  1648. [CAP_IPC_LOCK] = "IPC_LOCK",
  1649. [CAP_IPC_OWNER] = "IPC_OWNER",
  1650. [CAP_SYS_MODULE] = "SYS_MODULE",
  1651. [CAP_SYS_RAWIO] = "SYS_RAWIO",
  1652. [CAP_SYS_CHROOT] = "SYS_CHROOT",
  1653. [CAP_SYS_PTRACE] = "SYS_PTRACE",
  1654. [CAP_SYS_PACCT] = "SYS_PACCT",
  1655. [CAP_SYS_ADMIN] = "SYS_ADMIN",
  1656. [CAP_SYS_BOOT] = "SYS_BOOT",
  1657. [CAP_SYS_NICE] = "SYS_NICE",
  1658. [CAP_SYS_RESOURCE] = "SYS_RESOURCE",
  1659. [CAP_SYS_TIME] = "SYS_TIME",
  1660. [CAP_SYS_TTY_CONFIG] = "SYS_TTY_CONFIG",
  1661. [CAP_MKNOD] = "MKNOD",
  1662. [CAP_LEASE] = "LEASE",
  1663. [CAP_AUDIT_WRITE] = "AUDIT_WRITE",
  1664. [CAP_AUDIT_CONTROL] = "AUDIT_CONTROL",
  1665. [CAP_SETFCAP] = "SETFCAP",
  1666. [CAP_MAC_OVERRIDE] = "MAC_OVERRIDE",
  1667. [CAP_MAC_ADMIN] = "MAC_ADMIN",
  1668. [CAP_SYSLOG] = "SYSLOG",
  1669. [CAP_WAKE_ALARM] = "WAKE_ALARM",
  1670. [CAP_BLOCK_SUSPEND] = "BLOCK_SUSPEND",
  1671. [37 /*CAP_AUDIT_READ*/] = "AUDIT_READ",
  1672. [38 /*CAP_PERFMON*/] = "PERFMON",
  1673. [39 /*CAP_BPF*/] = "BPF",
  1674. [40 /* CAP_CHECKPOINT_RESTORE */] = "CHECKPOINT_RESTORE",
  1675. };
  1676. static void netdata_systemd_journal_transform_cap_effective(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
  1677. if(scope == FACETS_TRANSFORM_FACET_SORT)
  1678. return;
  1679. const char *v = buffer_tostring(wb);
  1680. if(*v && isdigit(*v)) {
  1681. uint64_t cap = strtoul(buffer_tostring(wb), NULL, 16);
  1682. if(cap) {
  1683. buffer_fast_strcat(wb, " (", 2);
  1684. for (size_t i = 0, added = 0; i < sizeof(linux_capabilities) / sizeof(linux_capabilities[0]); i++) {
  1685. if (linux_capabilities[i] && (cap & (1ULL << i))) {
  1686. if (added)
  1687. buffer_fast_strcat(wb, " | ", 3);
  1688. buffer_strcat(wb, linux_capabilities[i]);
  1689. added++;
  1690. }
  1691. }
  1692. buffer_fast_strcat(wb, ")", 1);
  1693. }
  1694. }
  1695. }
  1696. static void netdata_systemd_journal_transform_timestamp_usec(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
  1697. if(scope == FACETS_TRANSFORM_FACET_SORT)
  1698. return;
  1699. const char *v = buffer_tostring(wb);
  1700. if(*v && isdigit(*v)) {
  1701. uint64_t ut = str2ull(buffer_tostring(wb), NULL);
  1702. if(ut) {
  1703. time_t timestamp_sec = ut / USEC_PER_SEC;
  1704. struct tm tm;
  1705. char buffer[30];
  1706. gmtime_r(&timestamp_sec, &tm);
  1707. strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm);
  1708. buffer_sprintf(wb, " (%s.%06llu UTC)", buffer, ut % USEC_PER_SEC);
  1709. }
  1710. }
  1711. }
  1712. // ----------------------------------------------------------------------------
  1713. static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
  1714. FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID");
  1715. const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET;
  1716. const char *identifier = NULL;
  1717. FACET_ROW_KEY_VALUE *container_name_rkv = dictionary_get(row->dict, "CONTAINER_NAME");
  1718. if(container_name_rkv && !container_name_rkv->empty)
  1719. identifier = buffer_tostring(container_name_rkv->wb);
  1720. if(!identifier) {
  1721. FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER");
  1722. if(syslog_identifier_rkv && !syslog_identifier_rkv->empty)
  1723. identifier = buffer_tostring(syslog_identifier_rkv->wb);
  1724. if(!identifier) {
  1725. FACET_ROW_KEY_VALUE *comm_rkv = dictionary_get(row->dict, "_COMM");
  1726. if(comm_rkv && !comm_rkv->empty)
  1727. identifier = buffer_tostring(comm_rkv->wb);
  1728. }
  1729. }
  1730. buffer_flush(rkv->wb);
  1731. if(!identifier)
  1732. buffer_strcat(rkv->wb, FACET_VALUE_UNSET);
  1733. else
  1734. buffer_sprintf(rkv->wb, "%s[%s]", identifier, pid);
  1735. buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb));
  1736. }
  1737. static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row __maybe_unused, void *data __maybe_unused) {
  1738. buffer_json_add_array_item_object(json_array);
  1739. buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb));
  1740. buffer_json_object_close(json_array);
  1741. }
  1742. DICTIONARY *function_query_status_dict = NULL;
  1743. static void function_systemd_journal_progress(BUFFER *wb, const char *transaction, const char *progress_id) {
  1744. if(!progress_id || !(*progress_id)) {
  1745. netdata_mutex_lock(&stdout_mutex);
  1746. pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, "missing progress id");
  1747. netdata_mutex_unlock(&stdout_mutex);
  1748. return;
  1749. }
  1750. const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(function_query_status_dict, progress_id);
  1751. if(!item) {
  1752. netdata_mutex_lock(&stdout_mutex);
  1753. pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, "progress id is not found here");
  1754. netdata_mutex_unlock(&stdout_mutex);
  1755. return;
  1756. }
  1757. FUNCTION_QUERY_STATUS *fqs = dictionary_acquired_item_value(item);
  1758. usec_t now_monotonic_ut = now_monotonic_usec();
  1759. if(now_monotonic_ut + 10 * USEC_PER_SEC > fqs->stop_monotonic_ut)
  1760. fqs->stop_monotonic_ut = now_monotonic_ut + 10 * USEC_PER_SEC;
  1761. usec_t duration_ut = now_monotonic_ut - fqs->started_monotonic_ut;
  1762. size_t files_matched = fqs->files_matched;
  1763. size_t file_working = fqs->file_working;
  1764. if(file_working > files_matched)
  1765. files_matched = file_working;
  1766. size_t rows_read = __atomic_load_n(&fqs->rows_read, __ATOMIC_RELAXED);
  1767. size_t bytes_read = __atomic_load_n(&fqs->bytes_read, __ATOMIC_RELAXED);
  1768. buffer_flush(wb);
  1769. buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
  1770. buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
  1771. buffer_json_member_add_string(wb, "type", "table");
  1772. buffer_json_member_add_uint64(wb, "running_duration_usec", duration_ut);
  1773. buffer_json_member_add_double(wb, "progress", (double)file_working * 100.0 / (double)files_matched);
  1774. char msg[1024 + 1];
  1775. snprintfz(msg, 1024,
  1776. "Read %zu rows (%0.0f rows/s), "
  1777. "data %0.1f MB (%0.1f MB/s), "
  1778. "file %zu of %zu",
  1779. rows_read, (double)rows_read / (double)duration_ut * (double)USEC_PER_SEC,
  1780. (double)bytes_read / 1024.0 / 1024.0, ((double)bytes_read / (double)duration_ut * (double)USEC_PER_SEC) / 1024.0 / 1024.0,
  1781. file_working, files_matched
  1782. );
  1783. buffer_json_member_add_string(wb, "message", msg);
  1784. buffer_json_finalize(wb);
  1785. netdata_mutex_lock(&stdout_mutex);
  1786. pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_realtime_sec() + 1, wb);
  1787. netdata_mutex_unlock(&stdout_mutex);
  1788. dictionary_acquired_item_release(function_query_status_dict, item);
  1789. }
  1790. static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) {
  1791. fstat_thread_calls = 0;
  1792. fstat_thread_cached_responses = 0;
  1793. journal_files_registry_update();
  1794. BUFFER *wb = buffer_create(0, NULL);
  1795. buffer_flush(wb);
  1796. buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
  1797. usec_t now_monotonic_ut = now_monotonic_usec();
  1798. FUNCTION_QUERY_STATUS tmp_fqs = {
  1799. .cancelled = cancelled,
  1800. .started_monotonic_ut = now_monotonic_ut,
  1801. .stop_monotonic_ut = now_monotonic_ut + (timeout * USEC_PER_SEC),
  1802. };
  1803. FUNCTION_QUERY_STATUS *fqs = NULL;
  1804. const DICTIONARY_ITEM *fqs_item = NULL;
  1805. FACETS *facets = facets_create(50, FACETS_OPTION_ALL_KEYS_FTS,
  1806. SYSTEMD_ALWAYS_VISIBLE_KEYS,
  1807. SYSTEMD_KEYS_INCLUDED_IN_FACETS,
  1808. SYSTEMD_KEYS_EXCLUDED_FROM_FACETS);
  1809. facets_accepted_param(facets, JOURNAL_PARAMETER_INFO);
  1810. facets_accepted_param(facets, JOURNAL_PARAMETER_SOURCE);
  1811. facets_accepted_param(facets, JOURNAL_PARAMETER_AFTER);
  1812. facets_accepted_param(facets, JOURNAL_PARAMETER_BEFORE);
  1813. facets_accepted_param(facets, JOURNAL_PARAMETER_ANCHOR);
  1814. facets_accepted_param(facets, JOURNAL_PARAMETER_DIRECTION);
  1815. facets_accepted_param(facets, JOURNAL_PARAMETER_LAST);
  1816. facets_accepted_param(facets, JOURNAL_PARAMETER_QUERY);
  1817. facets_accepted_param(facets, JOURNAL_PARAMETER_FACETS);
  1818. facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM);
  1819. facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE);
  1820. facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY);
  1821. facets_accepted_param(facets, JOURNAL_PARAMETER_ID);
  1822. facets_accepted_param(facets, JOURNAL_PARAMETER_PROGRESS);
  1823. facets_accepted_param(facets, JOURNAL_PARAMETER_DELTA);
  1824. facets_accepted_param(facets, JOURNAL_PARAMETER_TAIL);
  1825. #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
  1826. facets_accepted_param(facets, JOURNAL_PARAMETER_SLICE);
  1827. #endif // HAVE_SD_JOURNAL_RESTART_FIELDS
  1828. // register the fields in the order you want them on the dashboard
  1829. facets_register_row_severity(facets, syslog_priority_to_facet_severity, NULL);
  1830. facets_register_key_name(facets, "_HOSTNAME",
  1831. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS);
  1832. facets_register_dynamic_key_name(facets, JOURNAL_KEY_ND_JOURNAL_PROCESS,
  1833. FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS,
  1834. netdata_systemd_journal_dynamic_row_id, NULL);
  1835. facets_register_key_name(facets, "MESSAGE",
  1836. FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT |
  1837. FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS);
  1838. // facets_register_dynamic_key_name(facets, "MESSAGE",
  1839. // FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_RICH_TEXT |
  1840. // FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS,
  1841. // netdata_systemd_journal_rich_message, NULL);
  1842. facets_register_key_name_transformation(facets, "PRIORITY",
  1843. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1844. netdata_systemd_journal_transform_priority, NULL);
  1845. facets_register_key_name_transformation(facets, "SYSLOG_FACILITY",
  1846. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1847. netdata_systemd_journal_transform_syslog_facility, NULL);
  1848. facets_register_key_name_transformation(facets, "ERRNO",
  1849. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1850. netdata_systemd_journal_transform_errno, NULL);
  1851. facets_register_key_name(facets, JOURNAL_KEY_ND_JOURNAL_FILE,
  1852. FACET_KEY_OPTION_NEVER_FACET);
  1853. facets_register_key_name(facets, "SYSLOG_IDENTIFIER",
  1854. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
  1855. facets_register_key_name(facets, "UNIT",
  1856. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
  1857. facets_register_key_name(facets, "USER_UNIT",
  1858. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
  1859. facets_register_key_name_transformation(facets, "_BOOT_ID",
  1860. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1861. netdata_systemd_journal_transform_boot_id, NULL);
  1862. facets_register_key_name_transformation(facets, "_SYSTEMD_OWNER_UID",
  1863. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1864. netdata_systemd_journal_transform_uid, NULL);
  1865. facets_register_key_name_transformation(facets, "_UID",
  1866. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1867. netdata_systemd_journal_transform_uid, NULL);
  1868. facets_register_key_name_transformation(facets, "OBJECT_SYSTEMD_OWNER_UID",
  1869. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1870. netdata_systemd_journal_transform_uid, NULL);
  1871. facets_register_key_name_transformation(facets, "OBJECT_UID",
  1872. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1873. netdata_systemd_journal_transform_uid, NULL);
  1874. facets_register_key_name_transformation(facets, "_GID",
  1875. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1876. netdata_systemd_journal_transform_gid, NULL);
  1877. facets_register_key_name_transformation(facets, "OBJECT_GID",
  1878. FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1879. netdata_systemd_journal_transform_gid, NULL);
  1880. facets_register_key_name_transformation(facets, "_CAP_EFFECTIVE",
  1881. FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1882. netdata_systemd_journal_transform_cap_effective, NULL);
  1883. facets_register_key_name_transformation(facets, "_AUDIT_LOGINUID",
  1884. FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1885. netdata_systemd_journal_transform_uid, NULL);
  1886. facets_register_key_name_transformation(facets, "OBJECT_AUDIT_LOGINUID",
  1887. FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1888. netdata_systemd_journal_transform_uid, NULL);
  1889. facets_register_key_name_transformation(facets, "_SOURCE_REALTIME_TIMESTAMP",
  1890. FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
  1891. netdata_systemd_journal_transform_timestamp_usec, NULL);
  1892. // ------------------------------------------------------------------------
  1893. // parse the parameters
  1894. bool info = false, data_only = false, progress = false, slice = JOURNAL_DEFAULT_SLICE_MODE, delta = false, tail = false;
  1895. time_t after_s = 0, before_s = 0;
  1896. usec_t anchor = 0;
  1897. usec_t if_modified_since = 0;
  1898. size_t last = 0;
  1899. FACETS_ANCHOR_DIRECTION direction = JOURNAL_DEFAULT_DIRECTION;
  1900. const char *query = NULL;
  1901. const char *chart = NULL;
  1902. const char *source = NULL;
  1903. const char *progress_id = NULL;
  1904. SD_JOURNAL_FILE_SOURCE_TYPE source_type = SDJF_ALL;
  1905. size_t filters = 0;
  1906. buffer_json_member_add_object(wb, "_request");
  1907. char *words[SYSTEMD_JOURNAL_MAX_PARAMS] = { NULL };
  1908. size_t num_words = quoted_strings_splitter_pluginsd(function, words, SYSTEMD_JOURNAL_MAX_PARAMS);
  1909. for(int i = 1; i < SYSTEMD_JOURNAL_MAX_PARAMS ;i++) {
  1910. char *keyword = get_word(words, num_words, i);
  1911. if(!keyword) break;
  1912. if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) {
  1913. netdata_systemd_journal_function_help(transaction);
  1914. goto cleanup;
  1915. }
  1916. else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) {
  1917. info = true;
  1918. }
  1919. else if(strcmp(keyword, JOURNAL_PARAMETER_PROGRESS) == 0) {
  1920. progress = true;
  1921. }
  1922. else if(strncmp(keyword, JOURNAL_PARAMETER_DELTA ":", sizeof(JOURNAL_PARAMETER_DELTA ":") - 1) == 0) {
  1923. char *v = &keyword[sizeof(JOURNAL_PARAMETER_DELTA ":") - 1];
  1924. if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  1925. delta = false;
  1926. else
  1927. delta = true;
  1928. }
  1929. else if(strncmp(keyword, JOURNAL_PARAMETER_TAIL ":", sizeof(JOURNAL_PARAMETER_TAIL ":") - 1) == 0) {
  1930. char *v = &keyword[sizeof(JOURNAL_PARAMETER_TAIL ":") - 1];
  1931. if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  1932. tail = false;
  1933. else
  1934. tail = true;
  1935. }
  1936. else if(strncmp(keyword, JOURNAL_PARAMETER_DATA_ONLY ":", sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1) == 0) {
  1937. char *v = &keyword[sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1];
  1938. if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  1939. data_only = false;
  1940. else
  1941. data_only = true;
  1942. }
  1943. else if(strncmp(keyword, JOURNAL_PARAMETER_SLICE ":", sizeof(JOURNAL_PARAMETER_SLICE ":") - 1) == 0) {
  1944. char *v = &keyword[sizeof(JOURNAL_PARAMETER_SLICE ":") - 1];
  1945. if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
  1946. slice = false;
  1947. else
  1948. slice = true;
  1949. }
  1950. else if(strncmp(keyword, JOURNAL_PARAMETER_ID ":", sizeof(JOURNAL_PARAMETER_ID ":") - 1) == 0) {
  1951. char *id = &keyword[sizeof(JOURNAL_PARAMETER_ID ":") - 1];
  1952. if(*id)
  1953. progress_id = id;
  1954. }
  1955. else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) {
  1956. source = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1];
  1957. if(strcmp(source, SDJF_SOURCE_ALL_NAME) == 0) {
  1958. source_type = SDJF_ALL;
  1959. source = NULL;
  1960. }
  1961. else if(strcmp(source, SDJF_SOURCE_LOCAL_NAME) == 0) {
  1962. source_type = SDJF_LOCAL;
  1963. source = NULL;
  1964. }
  1965. else if(strcmp(source, SDJF_SOURCE_REMOTES_NAME) == 0) {
  1966. source_type = SDJF_REMOTE;
  1967. source = NULL;
  1968. }
  1969. else if(strcmp(source, SDJF_SOURCE_NAMESPACES_NAME) == 0) {
  1970. source_type = SDJF_NAMESPACE;
  1971. source = NULL;
  1972. }
  1973. else if(strcmp(source, SDJF_SOURCE_LOCAL_SYSTEM_NAME) == 0) {
  1974. source_type = SDJF_LOCAL | SDJF_SYSTEM;
  1975. source = NULL;
  1976. }
  1977. else if(strcmp(source, SDJF_SOURCE_LOCAL_USERS_NAME) == 0) {
  1978. source_type = SDJF_LOCAL | SDJF_USER;
  1979. source = NULL;
  1980. }
  1981. else if(strcmp(source, SDJF_SOURCE_LOCAL_OTHER_NAME) == 0) {
  1982. source_type = SDJF_LOCAL | SDJF_OTHER;
  1983. source = NULL;
  1984. }
  1985. else {
  1986. source_type = SDJF_ALL;
  1987. // else, match the source, whatever it is
  1988. }
  1989. }
  1990. else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", sizeof(JOURNAL_PARAMETER_AFTER ":") - 1) == 0) {
  1991. after_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_AFTER ":") - 1]);
  1992. }
  1993. else if(strncmp(keyword, JOURNAL_PARAMETER_BEFORE ":", sizeof(JOURNAL_PARAMETER_BEFORE ":") - 1) == 0) {
  1994. before_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_BEFORE ":") - 1]);
  1995. }
  1996. else if(strncmp(keyword, JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":", sizeof(JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":") - 1) == 0) {
  1997. if_modified_since = str2ull(&keyword[sizeof(JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":") - 1], NULL);
  1998. }
  1999. else if(strncmp(keyword, JOURNAL_PARAMETER_ANCHOR ":", sizeof(JOURNAL_PARAMETER_ANCHOR ":") - 1) == 0) {
  2000. anchor = str2ull(&keyword[sizeof(JOURNAL_PARAMETER_ANCHOR ":") - 1], NULL);
  2001. }
  2002. else if(strncmp(keyword, JOURNAL_PARAMETER_DIRECTION ":", sizeof(JOURNAL_PARAMETER_DIRECTION ":") - 1) == 0) {
  2003. direction = strcasecmp(&keyword[sizeof(JOURNAL_PARAMETER_DIRECTION ":") - 1], "forward") == 0 ? FACETS_ANCHOR_DIRECTION_FORWARD : FACETS_ANCHOR_DIRECTION_BACKWARD;
  2004. }
  2005. else if(strncmp(keyword, JOURNAL_PARAMETER_LAST ":", sizeof(JOURNAL_PARAMETER_LAST ":") - 1) == 0) {
  2006. last = str2ul(&keyword[sizeof(JOURNAL_PARAMETER_LAST ":") - 1]);
  2007. }
  2008. else if(strncmp(keyword, JOURNAL_PARAMETER_QUERY ":", sizeof(JOURNAL_PARAMETER_QUERY ":") - 1) == 0) {
  2009. query= &keyword[sizeof(JOURNAL_PARAMETER_QUERY ":") - 1];
  2010. }
  2011. else if(strncmp(keyword, JOURNAL_PARAMETER_HISTOGRAM ":", sizeof(JOURNAL_PARAMETER_HISTOGRAM ":") - 1) == 0) {
  2012. chart = &keyword[sizeof(JOURNAL_PARAMETER_HISTOGRAM ":") - 1];
  2013. }
  2014. else if(strncmp(keyword, JOURNAL_PARAMETER_FACETS ":", sizeof(JOURNAL_PARAMETER_FACETS ":") - 1) == 0) {
  2015. char *value = &keyword[sizeof(JOURNAL_PARAMETER_FACETS ":") - 1];
  2016. if(*value) {
  2017. buffer_json_member_add_array(wb, JOURNAL_PARAMETER_FACETS);
  2018. while(value) {
  2019. char *sep = strchr(value, ',');
  2020. if(sep)
  2021. *sep++ = '\0';
  2022. facets_register_facet_id(facets, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER);
  2023. buffer_json_add_array_item_string(wb, value);
  2024. value = sep;
  2025. }
  2026. buffer_json_array_close(wb); // JOURNAL_PARAMETER_FACETS
  2027. }
  2028. }
  2029. else {
  2030. char *value = strchr(keyword, ':');
  2031. if(value) {
  2032. *value++ = '\0';
  2033. buffer_json_member_add_array(wb, keyword);
  2034. while(value) {
  2035. char *sep = strchr(value, ',');
  2036. if(sep)
  2037. *sep++ = '\0';
  2038. facets_register_facet_id_filter(facets, keyword, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER);
  2039. buffer_json_add_array_item_string(wb, value);
  2040. filters++;
  2041. value = sep;
  2042. }
  2043. buffer_json_array_close(wb); // keyword
  2044. }
  2045. }
  2046. }
  2047. // ------------------------------------------------------------------------
  2048. // put this request into the progress db
  2049. if(progress_id && *progress_id) {
  2050. fqs_item = dictionary_set_and_acquire_item(function_query_status_dict, progress_id, &tmp_fqs, sizeof(tmp_fqs));
  2051. fqs = dictionary_acquired_item_value(fqs_item);
  2052. }
  2053. else {
  2054. // no progress id given, proceed without registering our progress in the dictionary
  2055. fqs = &tmp_fqs;
  2056. fqs_item = NULL;
  2057. }
  2058. // ------------------------------------------------------------------------
  2059. // validate parameters
  2060. time_t now_s = now_realtime_sec();
  2061. time_t expires = now_s + 1;
  2062. if(!after_s && !before_s) {
  2063. before_s = now_s;
  2064. after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION;
  2065. }
  2066. else
  2067. rrdr_relative_window_to_absolute(&after_s, &before_s, now_s);
  2068. if(after_s > before_s) {
  2069. time_t tmp = after_s;
  2070. after_s = before_s;
  2071. before_s = tmp;
  2072. }
  2073. if(after_s == before_s)
  2074. after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION;
  2075. if(!last)
  2076. last = SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY;
  2077. // ------------------------------------------------------------------------
  2078. // set query time-frame, anchors and direction
  2079. fqs->after_ut = after_s * USEC_PER_SEC;
  2080. fqs->before_ut = (before_s * USEC_PER_SEC) + USEC_PER_SEC - 1;
  2081. fqs->if_modified_since = if_modified_since;
  2082. fqs->data_only = data_only;
  2083. fqs->delta = (fqs->data_only) ? delta : false;
  2084. fqs->tail = (fqs->data_only && fqs->if_modified_since) ? tail : false;
  2085. fqs->source = string_strdupz(source);
  2086. fqs->source_type = source_type;
  2087. fqs->entries = last;
  2088. fqs->last_modified = 0;
  2089. fqs->filters = filters;
  2090. fqs->query = (query && *query) ? query : NULL;
  2091. fqs->histogram = (chart && *chart) ? chart : NULL;
  2092. fqs->direction = direction;
  2093. fqs->anchor.start_ut = anchor;
  2094. fqs->anchor.stop_ut = 0;
  2095. if(fqs->anchor.start_ut && fqs->tail) {
  2096. // a tail request
  2097. // we need the top X entries from BEFORE
  2098. // but, we need to calculate the facets and the
  2099. // histogram up to the anchor
  2100. fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
  2101. fqs->anchor.start_ut = 0;
  2102. fqs->anchor.stop_ut = anchor;
  2103. }
  2104. if(anchor && anchor < fqs->after_ut) {
  2105. log_fqs(fqs, "received anchor is too small for query timeframe, ignoring anchor");
  2106. anchor = 0;
  2107. fqs->anchor.start_ut = 0;
  2108. fqs->anchor.stop_ut = 0;
  2109. fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
  2110. }
  2111. else if(anchor > fqs->before_ut) {
  2112. log_fqs(fqs, "received anchor is too big for query timeframe, ignoring anchor");
  2113. anchor = 0;
  2114. fqs->anchor.start_ut = 0;
  2115. fqs->anchor.stop_ut = 0;
  2116. fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
  2117. }
  2118. facets_set_anchor(facets, fqs->anchor.start_ut, fqs->anchor.stop_ut, fqs->direction);
  2119. facets_set_additional_options(facets,
  2120. ((fqs->data_only) ? FACETS_OPTION_DATA_ONLY : 0) |
  2121. ((fqs->delta) ? FACETS_OPTION_SHOW_DELTAS : 0));
  2122. // ------------------------------------------------------------------------
  2123. // set the rest of the query parameters
  2124. facets_set_items(facets, fqs->entries);
  2125. facets_set_query(facets, fqs->query);
  2126. #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
  2127. fqs->slice = slice;
  2128. if(slice)
  2129. facets_enable_slice_mode(facets);
  2130. #else
  2131. fqs->slice = false;
  2132. #endif
  2133. if(fqs->histogram)
  2134. facets_set_timeframe_and_histogram_by_id(facets, fqs->histogram, fqs->after_ut, fqs->before_ut);
  2135. else
  2136. facets_set_timeframe_and_histogram_by_name(facets, "PRIORITY", fqs->after_ut, fqs->before_ut);
  2137. // ------------------------------------------------------------------------
  2138. // complete the request object
  2139. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_INFO, false);
  2140. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_SLICE, fqs->slice);
  2141. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DATA_ONLY, fqs->data_only);
  2142. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_PROGRESS, false);
  2143. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DELTA, fqs->delta);
  2144. buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_TAIL, fqs->tail);
  2145. buffer_json_member_add_string(wb, JOURNAL_PARAMETER_ID, progress_id);
  2146. buffer_json_member_add_string(wb, JOURNAL_PARAMETER_SOURCE, string2str(fqs->source));
  2147. buffer_json_member_add_uint64(wb, "source_type", fqs->source_type);
  2148. buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_AFTER, fqs->after_ut / USEC_PER_SEC);
  2149. buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_BEFORE, fqs->before_ut / USEC_PER_SEC);
  2150. buffer_json_member_add_uint64(wb, "if_modified_since", fqs->if_modified_since);
  2151. buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_ANCHOR, anchor);
  2152. buffer_json_member_add_string(wb, JOURNAL_PARAMETER_DIRECTION, fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward");
  2153. buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_LAST, fqs->entries);
  2154. buffer_json_member_add_string(wb, JOURNAL_PARAMETER_QUERY, fqs->query);
  2155. buffer_json_member_add_string(wb, JOURNAL_PARAMETER_HISTOGRAM, fqs->histogram);
  2156. buffer_json_object_close(wb); // request
  2157. buffer_json_journal_versions(wb);
  2158. // ------------------------------------------------------------------------
  2159. // run the request
  2160. int response;
  2161. if(info) {
  2162. facets_accepted_parameters_to_json_array(facets, wb, false);
  2163. buffer_json_member_add_array(wb, "required_params");
  2164. {
  2165. buffer_json_add_array_item_object(wb);
  2166. {
  2167. buffer_json_member_add_string(wb, "id", "source");
  2168. buffer_json_member_add_string(wb, "name", "source");
  2169. buffer_json_member_add_string(wb, "help", "Select the SystemD Journal source to query");
  2170. buffer_json_member_add_string(wb, "type", "select");
  2171. buffer_json_member_add_array(wb, "options");
  2172. {
  2173. available_journal_file_sources_to_json_array(wb);
  2174. }
  2175. buffer_json_array_close(wb); // options array
  2176. }
  2177. buffer_json_object_close(wb); // required params object
  2178. }
  2179. buffer_json_array_close(wb); // required_params array
  2180. facets_table_config(wb);
  2181. buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
  2182. buffer_json_member_add_string(wb, "type", "table");
  2183. buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
  2184. buffer_json_finalize(wb);
  2185. response = HTTP_RESP_OK;
  2186. goto output;
  2187. }
  2188. if(progress) {
  2189. function_systemd_journal_progress(wb, transaction, progress_id);
  2190. goto cleanup;
  2191. }
  2192. response = netdata_systemd_journal_query(wb, facets, fqs);
  2193. // ------------------------------------------------------------------------
  2194. // cleanup query params
  2195. string_freez(fqs->source);
  2196. fqs->source = NULL;
  2197. // ------------------------------------------------------------------------
  2198. // handle error response
  2199. if(response != HTTP_RESP_OK) {
  2200. netdata_mutex_lock(&stdout_mutex);
  2201. pluginsd_function_json_error_to_stdout(transaction, response, "failed");
  2202. netdata_mutex_unlock(&stdout_mutex);
  2203. goto cleanup;
  2204. }
  2205. output:
  2206. netdata_mutex_lock(&stdout_mutex);
  2207. pluginsd_function_result_to_stdout(transaction, response, "application/json", expires, wb);
  2208. netdata_mutex_unlock(&stdout_mutex);
  2209. cleanup:
  2210. facets_destroy(facets);
  2211. buffer_free(wb);
  2212. if(fqs_item) {
  2213. dictionary_del(function_query_status_dict, dictionary_acquired_item_name(fqs_item));
  2214. dictionary_acquired_item_release(function_query_status_dict, fqs_item);
  2215. dictionary_garbage_collect(function_query_status_dict);
  2216. }
  2217. }
  2218. // ----------------------------------------------------------------------------
  2219. int main(int argc __maybe_unused, char **argv __maybe_unused) {
  2220. stderror = stderr;
  2221. clocks_init();
  2222. program_name = "systemd-journal.plugin";
  2223. // disable syslog
  2224. error_log_syslog = 0;
  2225. // set errors flood protection to 100 logs per hour
  2226. error_log_errors_per_period = 100;
  2227. error_log_throttle_period = 3600;
  2228. log_set_global_severity_for_external_plugins();
  2229. netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX");
  2230. if(verify_netdata_host_prefix() == -1) exit(1);
  2231. // ------------------------------------------------------------------------
  2232. // setup the journal directories
  2233. unsigned d = 0;
  2234. journal_directories[d++].path = strdupz("/var/log/journal");
  2235. journal_directories[d++].path = strdupz("/run/log/journal");
  2236. if(*netdata_configured_host_prefix) {
  2237. char path[PATH_MAX];
  2238. snprintfz(path, sizeof(path), "%s/var/log/journal", netdata_configured_host_prefix);
  2239. journal_directories[d++].path = strdupz(path);
  2240. snprintfz(path, sizeof(path), "%s/run/log/journal", netdata_configured_host_prefix);
  2241. journal_directories[d++].path = strdupz(path);
  2242. }
  2243. // terminate the list
  2244. journal_directories[d].path = NULL;
  2245. // ------------------------------------------------------------------------
  2246. function_query_status_dict = dictionary_create_advanced(
  2247. DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
  2248. NULL, sizeof(FUNCTION_QUERY_STATUS));
  2249. // ------------------------------------------------------------------------
  2250. // initialize the used hashes files registry
  2251. used_hashes_registry = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
  2252. // ------------------------------------------------------------------------
  2253. // initialize the journal files registry
  2254. systemd_journal_session = (now_realtime_usec() / USEC_PER_SEC) * USEC_PER_SEC;
  2255. journal_files_registry = dictionary_create_advanced(
  2256. DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
  2257. NULL, sizeof(struct journal_file));
  2258. dictionary_register_insert_callback(journal_files_registry, files_registry_insert_cb, NULL);
  2259. dictionary_register_delete_callback(journal_files_registry, files_registry_delete_cb, NULL);
  2260. dictionary_register_conflict_callback(journal_files_registry, files_registry_conflict_cb, NULL);
  2261. boot_ids_to_first_ut = dictionary_create_advanced(
  2262. DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
  2263. NULL, sizeof(usec_t));
  2264. journal_files_registry_update();
  2265. // ------------------------------------------------------------------------
  2266. // debug
  2267. if(argc == 2 && strcmp(argv[1], "debug") == 0) {
  2268. bool cancelled = false;
  2269. char buf[] = "systemd-journal after:-16000000 before:0 last:1";
  2270. // char buf[] = "systemd-journal after:1695332964 before:1695937764 direction:backward last:100 slice:true source:all DHKucpqUoe1:PtVoyIuX.MU";
  2271. // char buf[] = "systemd-journal after:1694511062 before:1694514662 anchor:1694514122024403";
  2272. function_systemd_journal("123", buf, 600, &cancelled);
  2273. exit(1);
  2274. }
  2275. // ------------------------------------------------------------------------
  2276. // the event loop for functions
  2277. struct functions_evloop_globals *wg =
  2278. functions_evloop_init(SYSTEMD_JOURNAL_WORKER_THREADS, "SDJ", &stdout_mutex, &plugin_should_exit);
  2279. functions_evloop_add_function(wg, SYSTEMD_JOURNAL_FUNCTION_NAME, function_systemd_journal,
  2280. SYSTEMD_JOURNAL_DEFAULT_TIMEOUT);
  2281. // ------------------------------------------------------------------------
  2282. time_t started_t = now_monotonic_sec();
  2283. size_t iteration = 0;
  2284. usec_t step = 1000 * USEC_PER_MS;
  2285. bool tty = isatty(fileno(stderr)) == 1;
  2286. netdata_mutex_lock(&stdout_mutex);
  2287. fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\"\n",
  2288. SYSTEMD_JOURNAL_FUNCTION_NAME, SYSTEMD_JOURNAL_DEFAULT_TIMEOUT, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
  2289. heartbeat_t hb;
  2290. heartbeat_init(&hb);
  2291. while(!plugin_should_exit) {
  2292. iteration++;
  2293. netdata_mutex_unlock(&stdout_mutex);
  2294. heartbeat_next(&hb, step);
  2295. netdata_mutex_lock(&stdout_mutex);
  2296. if(!tty)
  2297. fprintf(stdout, "\n");
  2298. fflush(stdout);
  2299. time_t now = now_monotonic_sec();
  2300. if(now - started_t > 86400)
  2301. break;
  2302. }
  2303. exit(0);
  2304. }