// systemd-journal-watcher.c
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "systemd-internals.h"
  3. #include <sys/inotify.h>
  // Size of the fixed part of an inotify event; the variable-length name (event->len bytes) follows it.
  #define EVENT_SIZE (sizeof(struct inotify_event))

  // Number of WatchEntry slots allocated when a watcher session starts.
  #define INITIAL_WATCHES 256

  // The inotify event mask requested for every directory we watch.
  #define WATCH_FOR                                          \
      (IN_CREATE | IN_MODIFY | IN_DELETE | IN_DELETE_SELF |  \
       IN_MOVED_FROM | IN_MOVED_TO | IN_UNMOUNT)
  // One slot of the watcher's table: a single inotify watch and the directory it covers.
  typedef struct watch_entry {
      int slot;                 // index of this entry in the watch list, assigned when the slot is first handed out
      int wd;                   // inotify watch descriptor; -1 while the slot is not in use
      char *path;               // heap-allocated copy of the watched directory path; NULL while not in use
      struct watch_entry *next; // next released slot, used only while this entry is on the free list
  } WatchEntry;
  13. typedef struct {
  14. WatchEntry *watchList;
  15. WatchEntry *freeList;
  16. int watchCount;
  17. int watchListSize;
  18. size_t errors;
  19. DICTIONARY *pending;
  20. } Watcher;
  21. static WatchEntry *get_slot(Watcher *watcher) {
  22. WatchEntry *t;
  23. if (watcher->freeList != NULL) {
  24. t = watcher->freeList;
  25. watcher->freeList = t->next;
  26. t->next = NULL;
  27. return t;
  28. }
  29. if (watcher->watchCount == watcher->watchListSize) {
  30. watcher->watchListSize *= 2;
  31. watcher->watchList = reallocz(watcher->watchList, watcher->watchListSize * sizeof(WatchEntry));
  32. }
  33. watcher->watchList[watcher->watchCount] = (WatchEntry){
  34. .slot = watcher->watchCount,
  35. .wd = -1,
  36. .path = NULL,
  37. .next = NULL,
  38. };
  39. t = &watcher->watchList[watcher->watchCount];
  40. watcher->watchCount++;
  41. return t;
  42. }
  43. static void free_slot(Watcher *watcher, WatchEntry *t) {
  44. t->wd = -1;
  45. freez(t->path);
  46. t->path = NULL;
  47. // link it to the free list
  48. t->next = watcher->freeList;
  49. watcher->freeList = t;
  50. }
  51. static int add_watch(Watcher *watcher, int inotifyFd, const char *path) {
  52. WatchEntry *t = get_slot(watcher);
  53. t->wd = inotify_add_watch(inotifyFd, path, WATCH_FOR);
  54. if (t->wd == -1) {
  55. nd_log(NDLS_COLLECTORS, NDLP_ERR,
  56. "JOURNAL WATCHER: cannot watch directory: '%s'",
  57. path);
  58. free_slot(watcher, t);
  59. struct stat info;
  60. if(stat(path, &info) == 0 && S_ISDIR(info.st_mode)) {
  61. // the directory exists, but we failed to add the watch
  62. // increase errors
  63. watcher->errors++;
  64. }
  65. }
  66. else {
  67. t->path = strdupz(path);
  68. nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
  69. "JOURNAL WATCHER: watching directory: '%s'",
  70. path);
  71. }
  72. return t->wd;
  73. }
  74. static void remove_watch(Watcher *watcher, int inotifyFd, int wd) {
  75. int i;
  76. for (i = 0; i < watcher->watchCount; ++i) {
  77. if (watcher->watchList[i].wd == wd) {
  78. nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
  79. "JOURNAL WATCHER: removing watch from directory: '%s'",
  80. watcher->watchList[i].path);
  81. inotify_rm_watch(inotifyFd, watcher->watchList[i].wd);
  82. free_slot(watcher, &watcher->watchList[i]);
  83. return;
  84. }
  85. }
  86. nd_log(NDLS_COLLECTORS, NDLP_WARNING,
  87. "JOURNAL WATCHER: cannot find directory watch %d to remove.",
  88. wd);
  89. }
  90. static void free_watches(Watcher *watcher, int inotifyFd) {
  91. for (int i = 0; i < watcher->watchCount; ++i) {
  92. if (watcher->watchList[i].wd != -1) {
  93. inotify_rm_watch(inotifyFd, watcher->watchList[i].wd);
  94. free_slot(watcher, &watcher->watchList[i]);
  95. }
  96. }
  97. freez(watcher->watchList);
  98. watcher->watchList = NULL;
  99. dictionary_destroy(watcher->pending);
  100. watcher->pending = NULL;
  101. }
  102. static char* get_path_from_wd(Watcher *watcher, int wd) {
  103. for (int i = 0; i < watcher->watchCount; ++i) {
  104. if (watcher->watchList[i].wd == wd)
  105. return watcher->watchList[i].path;
  106. }
  107. return NULL;
  108. }
  109. static bool is_directory_watched(Watcher *watcher, const char *path) {
  110. for (int i = 0; i < watcher->watchCount; ++i) {
  111. if (watcher->watchList[i].wd != -1 && strcmp(watcher->watchList[i].path, path) == 0) {
  112. return true;
  113. }
  114. }
  115. return false;
  116. }
  117. static void watch_directory_and_subdirectories(Watcher *watcher, int inotifyFd, const char *basePath) {
  118. DICTIONARY *dirs = dictionary_create(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE);
  119. journal_directory_scan_recursively(NULL, dirs, basePath, 0);
  120. void *x;
  121. dfe_start_read(dirs, x) {
  122. const char *dirname = x_dfe.name;
  123. // Check if this directory is already being watched
  124. if (!is_directory_watched(watcher, dirname)) {
  125. add_watch(watcher, inotifyFd, dirname);
  126. }
  127. }
  128. dfe_done(x);
  129. dictionary_destroy(dirs);
  130. }
  // Returns true when `subpath` is `path` itself or lies below it, i.e. when
  // `subpath` begins with `path` and the character right after that prefix is
  // either '/' or the end of the string. This keeps "/a/bc" from matching "/a/b".
  static bool is_subpath(const char *path, const char *subpath) {
      size_t prefix_len = strlen(path);

      if (strncmp(path, subpath, prefix_len) != 0)
          return false;

      // the prefix matched, so subpath has at least prefix_len characters
      char boundary = subpath[prefix_len];
      return boundary == '\0' || boundary == '/';
  }
  140. void remove_directory_watch(Watcher *watcher, int inotifyFd, const char *dirPath) {
  141. for (int i = 0; i < watcher->watchCount; ++i) {
  142. WatchEntry *t = &watcher->watchList[i];
  143. if (t->wd != -1 && is_subpath(t->path, dirPath)) {
  144. inotify_rm_watch(inotifyFd, t->wd);
  145. free_slot(watcher, t);
  146. }
  147. }
  148. struct journal_file *jf;
  149. dfe_start_write(journal_files_registry, jf) {
  150. if(is_subpath(jf->filename, dirPath))
  151. dictionary_del(journal_files_registry, jf->filename);
  152. }
  153. dfe_done(jf);
  154. dictionary_garbage_collect(journal_files_registry);
  155. }
  156. void process_event(Watcher *watcher, int inotifyFd, struct inotify_event *event) {
  157. if(!event->len) {
  158. nd_log(NDLS_COLLECTORS, NDLP_NOTICE
  159. , "JOURNAL WATCHER: received event with mask %u and len %u (this is zero) for path: '%s' - ignoring it."
  160. , event->mask, event->len, event->name);
  161. return;
  162. }
  163. char *dirPath = get_path_from_wd(watcher, event->wd);
  164. if(!dirPath) {
  165. nd_log(NDLS_COLLECTORS, NDLP_NOTICE,
  166. "JOURNAL WATCHER: received event with mask %u and len %u for path: '%s' - "
  167. "but we can't find its watch descriptor - ignoring it."
  168. , event->mask, event->len, event->name);
  169. return;
  170. }
  171. if(event->mask & IN_DELETE_SELF) {
  172. remove_watch(watcher, inotifyFd, event->wd);
  173. return;
  174. }
  175. static __thread char fullPath[PATH_MAX];
  176. snprintfz(fullPath, sizeof(fullPath), "%s/%s", dirPath, event->name);
  177. // fullPath contains the full path to the file
  178. size_t len = strlen(event->name);
  179. if(event->mask & IN_ISDIR) {
  180. if (event->mask & (IN_DELETE | IN_MOVED_FROM)) {
  181. // A directory is deleted or moved out
  182. nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
  183. "JOURNAL WATCHER: Directory deleted or moved out: '%s'",
  184. fullPath);
  185. // Remove the watch - implement this function based on how you manage your watches
  186. remove_directory_watch(watcher, inotifyFd, fullPath);
  187. }
  188. else if (event->mask & (IN_CREATE | IN_MOVED_TO)) {
  189. // A new directory is created or moved in
  190. nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
  191. "JOURNAL WATCHER: New directory created or moved in: '%s'",
  192. fullPath);
  193. // Start watching the new directory - recursive watch
  194. watch_directory_and_subdirectories(watcher, inotifyFd, fullPath);
  195. }
  196. else
  197. nd_log(NDLS_COLLECTORS, NDLP_WARNING,
  198. "JOURNAL WATCHER: Received unhandled event with mask %u for directory '%s'",
  199. event->mask, fullPath);
  200. }
  201. else if(len > sizeof(".journal") - 1 && strcmp(&event->name[len - (sizeof(".journal") - 1)], ".journal") == 0) {
  202. // It is a file that ends in .journal
  203. // add it to our pending list
  204. dictionary_set(watcher->pending, fullPath, NULL, 0);
  205. }
  206. else
  207. nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
  208. "JOURNAL WATCHER: ignoring event with mask %u for file '%s'",
  209. event->mask, fullPath);
  210. }
  211. static void process_pending(Watcher *watcher) {
  212. void *x;
  213. dfe_start_write(watcher->pending, x) {
  214. struct stat info;
  215. const char *fullPath = x_dfe.name;
  216. if(stat(fullPath, &info) != 0) {
  217. nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
  218. "JOURNAL WATCHER: file '%s' no longer exists, removing it from the registry",
  219. fullPath);
  220. dictionary_del(journal_files_registry, fullPath);
  221. }
  222. else if(S_ISREG(info.st_mode)) {
  223. nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
  224. "JOURNAL WATCHER: file '%s' has been added/updated, updating the registry",
  225. fullPath);
  226. struct journal_file t = {
  227. .file_last_modified_ut = info.st_mtim.tv_sec * USEC_PER_SEC +
  228. info.st_mtim.tv_nsec / NSEC_PER_USEC,
  229. .last_scan_monotonic_ut = now_monotonic_usec(),
  230. .size = info.st_size,
  231. .max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT,
  232. };
  233. struct journal_file *jf = dictionary_set(journal_files_registry, fullPath, &t, sizeof(t));
  234. journal_file_update_header(jf->filename, jf);
  235. }
  236. dictionary_del(watcher->pending, fullPath);
  237. }
  238. dfe_done(x);
  239. dictionary_garbage_collect(watcher->pending);
  240. }
  // Session id the watcher is expected to be running. journal_watcher_main()
  // leaves its event loop when this no longer matches the id it started with.
  size_t journal_watcher_wanted_session_id = 0;

  // Requests a restart of the watcher by advancing the wanted session id by one.
  void journal_watcher_restart(void) {
      (void)__atomic_fetch_add(&journal_watcher_wanted_session_id, 1, __ATOMIC_RELAXED);
  }
  245. void *journal_watcher_main(void *arg __maybe_unused) {
  246. while(1) {
  247. size_t journal_watcher_session_id = journal_watcher_wanted_session_id;
  248. Watcher watcher = {
  249. .watchList = mallocz(INITIAL_WATCHES * sizeof(WatchEntry)),
  250. .freeList = NULL,
  251. .watchCount = 0,
  252. .watchListSize = INITIAL_WATCHES,
  253. .pending = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE|DICT_OPTION_SINGLE_THREADED),
  254. .errors = 0,
  255. };
  256. int inotifyFd = inotify_init();
  257. if (inotifyFd < 0) {
  258. nd_log(NDLS_COLLECTORS, NDLP_ERR, "inotify_init() failed.");
  259. free_watches(&watcher, inotifyFd);
  260. return NULL;
  261. }
  262. for (unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES; i++) {
  263. if (!journal_directories[i].path) break;
  264. watch_directory_and_subdirectories(&watcher, inotifyFd, string2str(journal_directories[i].path));
  265. }
  266. usec_t last_headers_update_ut = now_monotonic_usec();
  267. struct buffered_reader reader;
  268. while (journal_watcher_session_id == __atomic_load_n(&journal_watcher_wanted_session_id, __ATOMIC_RELAXED)) {
  269. buffered_reader_ret_t rc = buffered_reader_read_timeout(
  270. &reader, inotifyFd, SYSTEMD_JOURNAL_EXECUTE_WATCHER_PENDING_EVERY_MS, false);
  271. if (rc != BUFFERED_READER_READ_OK && rc != BUFFERED_READER_READ_POLL_TIMEOUT) {
  272. nd_log(NDLS_COLLECTORS, NDLP_CRIT,
  273. "JOURNAL WATCHER: cannot read inotify events, buffered_reader_read_timeout() returned %d - "
  274. "restarting the watcher.",
  275. rc);
  276. break;
  277. }
  278. if(rc == BUFFERED_READER_READ_OK) {
  279. bool unmount_event = false;
  280. ssize_t i = 0;
  281. while (i < reader.read_len) {
  282. struct inotify_event *event = (struct inotify_event *) &reader.read_buffer[i];
  283. if(event->mask & IN_UNMOUNT) {
  284. unmount_event = true;
  285. break;
  286. }
  287. process_event(&watcher, inotifyFd, event);
  288. i += (ssize_t)EVENT_SIZE + event->len;
  289. }
  290. reader.read_buffer[0] = '\0';
  291. reader.read_len = 0;
  292. reader.pos = 0;
  293. if(unmount_event)
  294. break;
  295. }
  296. usec_t ut = now_monotonic_usec();
  297. if (dictionary_entries(watcher.pending) && (rc == BUFFERED_READER_READ_POLL_TIMEOUT ||
  298. last_headers_update_ut + (SYSTEMD_JOURNAL_EXECUTE_WATCHER_PENDING_EVERY_MS * USEC_PER_MS) <= ut)) {
  299. process_pending(&watcher);
  300. last_headers_update_ut = ut;
  301. }
  302. if(watcher.errors) {
  303. nd_log(NDLS_COLLECTORS, NDLP_NOTICE,
  304. "JOURNAL WATCHER: there were errors in setting up inotify watches - restarting the watcher.");
  305. }
  306. }
  307. close(inotifyFd);
  308. free_watches(&watcher, inotifyFd);
  309. // this will scan the directories and cleanup the registry
  310. journal_files_registry_update();
  311. sleep_usec(2 * USEC_PER_SEC);
  312. }
  313. return NULL;
  314. }