systemd-cat-native.c 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "systemd-cat-native.h"
  3. #include "../required_dummies.h"
  4. #ifdef __FreeBSD__
  5. #include <sys/endian.h>
  6. #endif
  7. #ifdef __APPLE__
  8. #include <machine/endian.h>
  9. #endif
  10. static void log_message_to_stderr(BUFFER *msg) {
  11. CLEAN_BUFFER *tmp = buffer_create(0, NULL);
  12. for(size_t i = 0; i < msg->len ;i++) {
  13. if(isprint(msg->buffer[i]))
  14. buffer_putc(tmp, msg->buffer[i]);
  15. else {
  16. buffer_putc(tmp, '[');
  17. buffer_print_uint64_hex(tmp, msg->buffer[i]);
  18. buffer_putc(tmp, ']');
  19. }
  20. }
  21. fprintf(stderr, "SENDING: %s\n", buffer_tostring(tmp));
  22. }
  23. static inline buffered_reader_ret_t get_next_line(struct buffered_reader *reader, BUFFER *line, int timeout_ms) {
  24. while(true) {
  25. if(unlikely(!buffered_reader_next_line(reader, line))) {
  26. buffered_reader_ret_t ret = buffered_reader_read_timeout(reader, STDIN_FILENO, timeout_ms, false);
  27. if(unlikely(ret != BUFFERED_READER_READ_OK))
  28. return ret;
  29. continue;
  30. }
  31. else {
  32. // make sure the buffer is NULL terminated
  33. line->buffer[line->len] = '\0';
  34. // remove the trailing newlines
  35. while(line->len && line->buffer[line->len - 1] == '\n')
  36. line->buffer[--line->len] = '\0';
  37. return BUFFERED_READER_READ_OK;
  38. }
  39. }
  40. }
  /*
   * Copy src into dst, replacing every occurrence of the string `newline`
   * with a single '\n' character.
   *
   * dst      - destination buffer (always NUL terminated when dst_len > 0)
   * dst_len  - total size of dst in bytes, including room for the NUL
   * src      - source text; it must be NUL terminated because it is searched
   *            with strstr()
   * src_len  - number of bytes of src to process
   * newline  - the separator to replace; NULL or "" disables replacing
   *
   * Returns the number of bytes written to dst, not counting the NUL.
   * Nothing is ever written beyond dst[dst_len - 1]: when the output does not
   * fit, it is truncated.
   */
  static inline size_t copy_replacing_newlines(char *dst, size_t dst_len, const char *src, size_t src_len, const char *newline) {
      // with dst_len == 0 there is no room even for the terminating NUL
      if (!dst || !src || !dst_len) return 0;

      const char *current_src = src;
      const char *src_end = src + src_len; // one past the last byte to process
      char *current_dst = dst;
      size_t remaining_dst_len = dst_len;
      size_t newline_len = newline && *newline ? strlen(newline) : 0;
      size_t bytes_copied = 0;

      while (remaining_dst_len > 1 && current_src < src_end) {
          const char *found = newline_len ? strstr(current_src, newline) : NULL;

          // only accept a separator that lies entirely inside [src, src_end)
          if (found && (found >= src_end || (size_t)(src_end - found) < newline_len))
              found = NULL;

          // the text up to the separator, or up to the end of the input
          size_t copy_len = (size_t)((found ? found : src_end) - current_src);

          // besides the text we need 1 byte for the NUL, plus 1 byte for the
          // '\n' when a separator is being replaced
          size_t extra = found ? 2 : 1;
          if (copy_len + extra > remaining_dst_len) {
              // it does not fit: keep what fits (leaving room for the NUL)
              // and stop after this chunk
              if (copy_len > remaining_dst_len - 1)
                  copy_len = remaining_dst_len - 1;

              found = NULL;
          }

          memcpy(current_dst, current_src, copy_len);
          current_dst += copy_len;
          remaining_dst_len -= copy_len;
          bytes_copied += copy_len;

          if (!found)
              break;

          *current_dst++ = '\n';
          remaining_dst_len--;
          bytes_copied++;
          current_src = found + newline_len;
      }

      // at least one byte of dst is always left for the terminator
      *current_dst = '\0';

      return bytes_copied;
  }
  77. static inline void buffer_memcat_replacing_newlines(BUFFER *wb, const char *src, size_t src_len, const char *newline) {
  78. if(!src) return;
  79. const char *equal;
  80. if(!newline || !*newline || !strstr(src, newline) || !(equal = strchr(src, '='))) {
  81. buffer_memcat(wb, src, src_len);
  82. buffer_putc(wb, '\n');
  83. return;
  84. }
  85. size_t key_len = equal - src;
  86. buffer_memcat(wb, src, key_len);
  87. buffer_putc(wb, '\n');
  88. char *length_ptr = &wb->buffer[wb->len];
  89. uint64_t le_size = 0;
  90. buffer_memcat(wb, &le_size, sizeof(le_size));
  91. const char *value = ++equal;
  92. size_t value_len = src_len - key_len - 1;
  93. buffer_need_bytes(wb, value_len + 1);
  94. size_t size = copy_replacing_newlines(&wb->buffer[wb->len], value_len + 1, value, value_len, newline);
  95. wb->len += size;
  96. buffer_putc(wb, '\n');
  97. le_size = htole64(size);
  98. memcpy(length_ptr, &le_size, sizeof(le_size));
  99. }
  100. // ----------------------------------------------------------------------------
  101. // log to a systemd-journal-remote
  102. #ifdef HAVE_CURL
  103. #include <curl/curl.h>
  104. #ifndef HOST_NAME_MAX
  105. #define HOST_NAME_MAX 256
  106. #endif
  107. char global_hostname[HOST_NAME_MAX] = "";
  108. char global_boot_id[UUID_COMPACT_STR_LEN] = "";
  109. char global_machine_id[UUID_COMPACT_STR_LEN] = "";
  110. char global_stream_id[UUID_COMPACT_STR_LEN] = "";
  111. char global_namespace[1024] = "";
  112. char global_systemd_invocation_id[1024] = "";
  113. #define BOOT_ID_PATH "/proc/sys/kernel/random/boot_id"
  114. #define MACHINE_ID_PATH "/etc/machine-id"
  115. #define DEFAULT_PRIVATE_KEY "/etc/ssl/private/journal-upload.pem"
  116. #define DEFAULT_PUBLIC_KEY "/etc/ssl/certs/journal-upload.pem"
  117. #define DEFAULT_CA_CERT "/etc/ssl/ca/trusted.pem"
  // The part of a payload that has not been handed over to libcurl yet.
  struct upload_data {
      char *data;     // next byte to be sent
      size_t length;  // number of bytes still to be sent
  };
  122. static size_t systemd_journal_remote_read_callback(void *ptr, size_t size, size_t nmemb, void *userp) {
  123. struct upload_data *upload = (struct upload_data *)userp;
  124. size_t buffer_size = size * nmemb;
  125. if (upload->length) {
  126. size_t copy_size = upload->length < buffer_size ? upload->length : buffer_size;
  127. memcpy(ptr, upload->data, copy_size);
  128. upload->data += copy_size;
  129. upload->length -= copy_size;
  130. return copy_size;
  131. }
  132. return 0;
  133. }
  134. CURL* initialize_connection_to_systemd_journal_remote(const char* url, const char* private_key, const char* public_key, const char* ca_cert, struct curl_slist **headers) {
  135. CURL *curl = curl_easy_init();
  136. if (!curl) {
  137. fprintf(stderr, "Failed to initialize curl\n");
  138. return NULL;
  139. }
  140. *headers = curl_slist_append(*headers, "Content-Type: application/vnd.fdo.journal");
  141. *headers = curl_slist_append(*headers, "Transfer-Encoding: chunked");
  142. curl_easy_setopt(curl, CURLOPT_HTTPHEADER, *headers);
  143. curl_easy_setopt(curl, CURLOPT_URL, url);
  144. curl_easy_setopt(curl, CURLOPT_POST, 1L);
  145. curl_easy_setopt(curl, CURLOPT_READFUNCTION, systemd_journal_remote_read_callback);
  146. if (strncmp(url, "https://", 8) == 0) {
  147. if (private_key) curl_easy_setopt(curl, CURLOPT_SSLKEY, private_key);
  148. if (public_key) curl_easy_setopt(curl, CURLOPT_SSLCERT, public_key);
  149. if (strcmp(ca_cert, "all") != 0) {
  150. curl_easy_setopt(curl, CURLOPT_CAINFO, ca_cert);
  151. } else {
  152. curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
  153. }
  154. }
  155. // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); // Remove for less verbose output
  156. return curl;
  157. }
  158. static void journal_remote_complete_event(BUFFER *msg, usec_t *monotonic_ut) {
  159. usec_t ut = now_monotonic_usec();
  160. if(monotonic_ut)
  161. *monotonic_ut = ut;
  162. buffer_sprintf(msg,
  163. ""
  164. "__REALTIME_TIMESTAMP=%llu\n"
  165. "__MONOTONIC_TIMESTAMP=%llu\n"
  166. "_MACHINE_ID=%s\n"
  167. "_BOOT_ID=%s\n"
  168. "_HOSTNAME=%s\n"
  169. "_TRANSPORT=stdout\n"
  170. "_LINE_BREAK=nul\n"
  171. "_STREAM_ID=%s\n"
  172. "_RUNTIME_SCOPE=system\n"
  173. "%s%s\n"
  174. , now_realtime_usec()
  175. , ut
  176. , global_machine_id
  177. , global_boot_id
  178. , global_hostname
  179. , global_stream_id
  180. , global_namespace
  181. , global_systemd_invocation_id
  182. );
  183. }
  184. static CURLcode journal_remote_send_buffer(CURL* curl, BUFFER *msg) {
  185. // log_message_to_stderr(msg);
  186. struct upload_data upload = {0};
  187. if (!curl || !buffer_strlen(msg))
  188. return CURLE_FAILED_INIT;
  189. upload.data = (char *) buffer_tostring(msg);
  190. upload.length = buffer_strlen(msg);
  191. curl_easy_setopt(curl, CURLOPT_READDATA, &upload);
  192. curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)upload.length);
  193. return curl_easy_perform(curl);
  194. }
  // Result codes of log_input_to_journal_remote().
  typedef enum {
      LOG_TO_JOURNAL_REMOTE_OK = 0,                   // names the literal 0 such variables are initialized with
      LOG_TO_JOURNAL_REMOTE_BAD_PARAMS = -1,          // no URL was given
      LOG_TO_JOURNAL_REMOTE_CANNOT_INITIALIZE = -2,   // the curl handle could not be created
      LOG_TO_JOURNAL_REMOTE_CANNOT_SEND = -3,         // sending a batch of events failed
      LOG_TO_JOURNAL_REMOTE_CANNOT_READ = -4,         // no more input could be read
  } log_to_journal_remote_ret_t;
  201. static log_to_journal_remote_ret_t log_input_to_journal_remote(const char *url, const char *key, const char *cert, const char *trust, const char *newline, int timeout_ms) {
  202. if(!url || !*url) {
  203. fprintf(stderr, "No URL is given.\n");
  204. return LOG_TO_JOURNAL_REMOTE_BAD_PARAMS;
  205. }
  206. if(timeout_ms < 10)
  207. timeout_ms = 10;
  208. global_boot_id[0] = '\0';
  209. char buffer[1024];
  210. if(read_file(BOOT_ID_PATH, buffer, sizeof(buffer)) == 0) {
  211. uuid_t uuid;
  212. if(uuid_parse_flexi(buffer, uuid) == 0)
  213. uuid_unparse_lower_compact(uuid, global_boot_id);
  214. else
  215. fprintf(stderr, "WARNING: cannot parse the UUID found in '%s'.\n", BOOT_ID_PATH);
  216. }
  217. if(global_boot_id[0] == '\0') {
  218. fprintf(stderr, "WARNING: cannot read '%s'. Will generate a random _BOOT_ID.\n", BOOT_ID_PATH);
  219. uuid_t uuid;
  220. uuid_generate_random(uuid);
  221. uuid_unparse_lower_compact(uuid, global_boot_id);
  222. }
  223. if(read_file(MACHINE_ID_PATH, buffer, sizeof(buffer)) == 0) {
  224. uuid_t uuid;
  225. if(uuid_parse_flexi(buffer, uuid) == 0)
  226. uuid_unparse_lower_compact(uuid, global_machine_id);
  227. else
  228. fprintf(stderr, "WARNING: cannot parse the UUID found in '%s'.\n", MACHINE_ID_PATH);
  229. }
  230. if(global_machine_id[0] == '\0') {
  231. fprintf(stderr, "WARNING: cannot read '%s'. Will generate a random _MACHINE_ID.\n", MACHINE_ID_PATH);
  232. uuid_t uuid;
  233. uuid_generate_random(uuid);
  234. uuid_unparse_lower_compact(uuid, global_boot_id);
  235. }
  236. if(global_stream_id[0] == '\0') {
  237. uuid_t uuid;
  238. uuid_generate_random(uuid);
  239. uuid_unparse_lower_compact(uuid, global_stream_id);
  240. }
  241. if(global_hostname[0] == '\0') {
  242. if(gethostname(global_hostname, sizeof(global_hostname)) != 0) {
  243. fprintf(stderr, "WARNING: cannot get system's hostname. Will use internal default.\n");
  244. snprintfz(global_hostname, sizeof(global_hostname), "systemd-cat-native-unknown-hostname");
  245. }
  246. }
  247. if(global_systemd_invocation_id[0] == '\0' && getenv("INVOCATION_ID"))
  248. snprintfz(global_systemd_invocation_id, sizeof(global_systemd_invocation_id), "_SYSTEMD_INVOCATION_ID=%s\n", getenv("INVOCATION_ID"));
  249. if(!key)
  250. key = DEFAULT_PRIVATE_KEY;
  251. if(!cert)
  252. cert = DEFAULT_PUBLIC_KEY;
  253. if(!trust)
  254. trust = DEFAULT_CA_CERT;
  255. char full_url[4096];
  256. snprintfz(full_url, sizeof(full_url), "%s/upload", url);
  257. CURL *curl;
  258. CURLcode res = CURLE_OK;
  259. struct curl_slist *headers = NULL;
  260. curl_global_init(CURL_GLOBAL_ALL);
  261. curl = initialize_connection_to_systemd_journal_remote(full_url, key, cert, trust, &headers);
  262. if(!curl)
  263. return LOG_TO_JOURNAL_REMOTE_CANNOT_INITIALIZE;
  264. struct buffered_reader reader;
  265. buffered_reader_init(&reader);
  266. CLEAN_BUFFER *line = buffer_create(sizeof(reader.read_buffer), NULL);
  267. CLEAN_BUFFER *msg = buffer_create(sizeof(reader.read_buffer), NULL);
  268. size_t msg_full_events = 0;
  269. size_t msg_partial_fields = 0;
  270. usec_t msg_started_ut = 0;
  271. size_t failures = 0;
  272. size_t messages_logged = 0;
  273. log_to_journal_remote_ret_t ret = 0;
  274. while(true) {
  275. buffered_reader_ret_t rc = get_next_line(&reader, line, timeout_ms);
  276. if(rc == BUFFERED_READER_READ_POLL_TIMEOUT) {
  277. if(msg_full_events && !msg_partial_fields) {
  278. res = journal_remote_send_buffer(curl, msg);
  279. if(res != CURLE_OK) {
  280. fprintf(stderr, "journal_remote_send_buffer() failed: %s\n", curl_easy_strerror(res));
  281. failures++;
  282. ret = LOG_TO_JOURNAL_REMOTE_CANNOT_SEND;
  283. goto cleanup;
  284. }
  285. else
  286. messages_logged++;
  287. msg_full_events = 0;
  288. buffer_flush(msg);
  289. }
  290. }
  291. else if(rc == BUFFERED_READER_READ_OK) {
  292. if(!line->len) {
  293. // an empty line - we are done for this message
  294. if(msg_partial_fields) {
  295. msg_partial_fields = 0;
  296. usec_t ut;
  297. journal_remote_complete_event(msg, &ut);
  298. if(!msg_full_events)
  299. msg_started_ut = ut;
  300. msg_full_events++;
  301. if(ut - msg_started_ut >= USEC_PER_SEC / 2) {
  302. res = journal_remote_send_buffer(curl, msg);
  303. if(res != CURLE_OK) {
  304. fprintf(stderr, "journal_remote_send_buffer() failed: %s\n", curl_easy_strerror(res));
  305. failures++;
  306. ret = LOG_TO_JOURNAL_REMOTE_CANNOT_SEND;
  307. goto cleanup;
  308. }
  309. else
  310. messages_logged++;
  311. msg_full_events = 0;
  312. buffer_flush(msg);
  313. }
  314. }
  315. }
  316. else {
  317. buffer_memcat_replacing_newlines(msg, line->buffer, line->len, newline);
  318. msg_partial_fields++;
  319. }
  320. buffer_flush(line);
  321. }
  322. else {
  323. fprintf(stderr, "cannot read input data, failed with code %d\n", rc);
  324. ret = LOG_TO_JOURNAL_REMOTE_CANNOT_READ;
  325. break;
  326. }
  327. }
  328. if (msg_full_events || msg_partial_fields) {
  329. if(msg_partial_fields) {
  330. msg_partial_fields = 0;
  331. msg_full_events++;
  332. journal_remote_complete_event(msg, NULL);
  333. }
  334. if(msg_full_events) {
  335. res = journal_remote_send_buffer(curl, msg);
  336. if(res != CURLE_OK) {
  337. fprintf(stderr, "journal_remote_send_buffer() failed: %s\n", curl_easy_strerror(res));
  338. failures++;
  339. }
  340. else
  341. messages_logged++;
  342. msg_full_events = 0;
  343. buffer_flush(msg);
  344. }
  345. }
  346. cleanup:
  347. curl_easy_cleanup(curl);
  348. curl_slist_free_all(headers);
  349. curl_global_cleanup();
  350. return ret;
  351. }
  352. #endif
  353. static int help(void) {
  354. fprintf(stderr,
  355. "\n"
  356. "Netdata systemd-cat-native " PACKAGE_VERSION "\n"
  357. "\n"
  358. "This program reads from its standard input, lines in the format:\n"
  359. "\n"
  360. "KEY1=VALUE1\\n\n"
  361. "KEY2=VALUE2\\n\n"
  362. "KEYN=VALUEN\\n\n"
  363. "\\n\n"
  364. "\n"
  365. "and sends them to systemd-journal.\n"
  366. "\n"
  367. " - Binary journal fields are not accepted at its input\n"
  368. " - Binary journal fields can be generated after newline processing\n"
  369. " - Messages have to be separated by an empty line\n"
  370. " - Keys starting with underscore are not accepted (by journald)\n"
  371. " - Other rules imposed by systemd-journald are imposed (by journald)\n"
  372. "\n"
  373. "Usage:\n"
  374. "\n"
  375. " %s\n"
  376. " [--newline=STRING]\n"
  377. " [--log-as-netdata|-N]\n"
  378. " [--namespace=NAMESPACE] [--socket=PATH]\n"
  379. #ifdef HAVE_CURL
  380. " [--url=URL [--key=FILENAME] [--cert=FILENAME] [--trust=FILENAME|all]]\n"
  381. #endif
  382. "\n"
  383. "The program has the following modes of logging:\n"
  384. "\n"
  385. " * Log to a local systemd-journald or stderr\n"
  386. "\n"
  387. " This is the default mode. If systemd-journald is available, logs will be\n"
  388. " sent to systemd, otherwise logs will be printed on stderr, using logfmt\n"
  389. " formatting. Options --socket and --namespace are available to configure\n"
  390. " the journal destination:\n"
  391. "\n"
  392. " --socket=PATH\n"
  393. " The path of a systemd-journald UNIX socket.\n"
  394. " The program will use the default systemd-journald socket when this\n"
  395. " option is not used.\n"
  396. "\n"
  397. " --namespace=NAMESPACE\n"
  398. " The name of a configured and running systemd-journald namespace.\n"
  399. " The program will produce the socket path based on its internal\n"
  400. " defaults, to send the messages to the systemd journal namespace.\n"
  401. "\n"
  402. " * Log as Netdata, enabled with --log-as-netdata or -N\n"
  403. "\n"
  404. " In this mode the program uses environment variables set by Netdata for\n"
  405. " the log destination. Only log fields defined by Netdata are accepted.\n"
  406. " If the environment variables expected by Netdata are not found, it\n"
  407. " falls back to stderr logging in logfmt format.\n"
  408. #ifdef HAVE_CURL
  409. "\n"
  410. " * Log to a systemd-journal-remote TCP socket, enabled with --url=URL\n"
  411. "\n"
  412. " In this mode, the program will directly sent logs to a remote systemd\n"
  413. " journal (systemd-journal-remote expected at the destination)\n"
  414. " This mode is available even when the local system does not support\n"
  415. " systemd, or even it is not Linux, allowing a remote Linux systemd\n"
  416. " journald to become the logs database of the local system.\n"
  417. "\n"
  418. " Unfortunately systemd-journal-remote does not accept compressed\n"
  419. " data over the network, so the stream will be uncompressed.\n"
  420. "\n"
  421. " --url=URL\n"
  422. " The destination systemd-journal-remote address and port, similarly\n"
  423. " to what /etc/systemd/journal-upload.conf accepts.\n"
  424. " Usually it is in the form: https://ip.address:19532\n"
  425. " Both http and https URLs are accepted. When using https, the\n"
  426. " following additional options are accepted:\n"
  427. "\n"
  428. " --key=FILENAME\n"
  429. " The filename of the private key of the server.\n"
  430. " The default is: " DEFAULT_PRIVATE_KEY "\n"
  431. "\n"
  432. " --cert=FILENAME\n"
  433. " The filename of the public key of the server.\n"
  434. " The default is: " DEFAULT_PUBLIC_KEY "\n"
  435. "\n"
  436. " --trust=FILENAME | all\n"
  437. " The filename of the trusted CA public key.\n"
  438. " The default is: " DEFAULT_CA_CERT "\n"
  439. " The keyword 'all' can be used to trust all CAs.\n"
  440. "\n"
  441. " --namespace=NAMESPACE\n"
  442. " Set the namespace of the messages sent.\n"
  443. "\n"
  444. " --keep-trying\n"
  445. " Keep trying to send the message, if the remote journal is not there.\n"
  446. #endif
  447. "\n"
  448. " NEWLINES PROCESSING\n"
  449. " systemd-journal logs entries may have newlines in them. However the\n"
  450. " Journal Export Format uses binary formatted data to achieve this,\n"
  451. " making it hard for text processing.\n"
  452. "\n"
  453. " To overcome this limitation, this program allows single-line text\n"
  454. " formatted values at its input, to be binary formatted multi-line Journal\n"
  455. " Export Format at its output.\n"
  456. "\n"
  457. " To achieve that it allows replacing a given string to a newline.\n"
  458. " The parameter --newline=STRING allows setting the string to be replaced\n"
  459. " with newlines.\n"
  460. "\n"
  461. " For example by setting --newline='--NEWLINE--', the program will replace\n"
  462. " all occurrences of --NEWLINE-- with the newline character, within each\n"
  463. " VALUE of the KEY=VALUE lines. Once this this done, the program will\n"
  464. " switch the field to the binary Journal Export Format before sending the\n"
  465. " log event to systemd-journal.\n"
  466. "\n",
  467. program_name);
  468. return 1;
  469. }
  470. // ----------------------------------------------------------------------------
  471. // log as Netdata
  472. static void lgs_reset(struct log_stack_entry *lgs) {
  473. for(size_t i = 1; i < _NDF_MAX ;i++) {
  474. if(lgs[i].type == NDFT_TXT && lgs[i].set && lgs[i].txt)
  475. freez((void *)lgs[i].txt);
  476. lgs[i] = ND_LOG_FIELD_TXT(i, NULL);
  477. }
  478. lgs[0] = ND_LOG_FIELD_TXT(NDF_MESSAGE, NULL);
  479. lgs[_NDF_MAX] = ND_LOG_FIELD_END();
  480. }
  /*
   * Duplicate src, replacing every occurrence of `newline` with '\n'.
   *
   * A NULL src is treated as the empty string. The copy is allocated with
   * mallocz(); its size is that of the original string plus the terminator.
   */
  static const char *strdupz_replacing_newlines(const char *src, const char *newline) {
      const char *text = src ? src : "";
      size_t text_len = strlen(text);
      size_t capacity = text_len + 1; // room for the terminating NUL

      char *copy = mallocz(capacity);
      copy_replacing_newlines(copy, capacity, text, text_len, newline);

      return copy;
  }
  488. static int log_input_as_netdata(const char *newline, int timeout_ms) {
  489. struct buffered_reader reader;
  490. buffered_reader_init(&reader);
  491. CLEAN_BUFFER *line = buffer_create(sizeof(reader.read_buffer), NULL);
  492. ND_LOG_STACK lgs[_NDF_MAX + 1] = { 0 };
  493. ND_LOG_STACK_PUSH(lgs);
  494. lgs_reset(lgs);
  495. size_t fields_added = 0;
  496. size_t messages_logged = 0;
  497. ND_LOG_FIELD_PRIORITY priority = NDLP_INFO;
  498. while(get_next_line(&reader, line, timeout_ms) == BUFFERED_READER_READ_OK) {
  499. if(!line->len) {
  500. // an empty line - we are done for this message
  501. nd_log(NDLS_HEALTH, priority,
  502. "added %d fields", // if the user supplied a MESSAGE, this will be ignored
  503. fields_added);
  504. lgs_reset(lgs);
  505. fields_added = 0;
  506. messages_logged++;
  507. }
  508. else {
  509. char *equal = strchr(line->buffer, '=');
  510. if(equal) {
  511. const char *field = line->buffer;
  512. size_t field_len = equal - line->buffer;
  513. ND_LOG_FIELD_ID id = nd_log_field_id_by_name(field, field_len);
  514. if(id != NDF_STOP) {
  515. const char *value = ++equal;
  516. if(lgs[id].txt)
  517. freez((void *) lgs[id].txt);
  518. lgs[id].txt = strdupz_replacing_newlines(value, newline);
  519. lgs[id].set = true;
  520. fields_added++;
  521. if(id == NDF_PRIORITY)
  522. priority = nd_log_priority2id(value);
  523. }
  524. else {
  525. struct log_stack_entry backup = lgs[NDF_MESSAGE];
  526. lgs[NDF_MESSAGE] = ND_LOG_FIELD_TXT(NDF_MESSAGE, NULL);
  527. nd_log(NDLS_COLLECTORS, NDLP_ERR,
  528. "Field '%.*s' is not a Netdata field. Ignoring it.",
  529. field_len, field);
  530. lgs[NDF_MESSAGE] = backup;
  531. }
  532. }
  533. else {
  534. struct log_stack_entry backup = lgs[NDF_MESSAGE];
  535. lgs[NDF_MESSAGE] = ND_LOG_FIELD_TXT(NDF_MESSAGE, NULL);
  536. nd_log(NDLS_COLLECTORS, NDLP_ERR,
  537. "Line does not contain an = sign; ignoring it: %s",
  538. line->buffer);
  539. lgs[NDF_MESSAGE] = backup;
  540. }
  541. }
  542. buffer_flush(line);
  543. }
  544. if(fields_added) {
  545. nd_log(NDLS_HEALTH, priority, "added %d fields", fields_added);
  546. messages_logged++;
  547. }
  548. return messages_logged ? 0 : 1;
  549. }
  550. // ----------------------------------------------------------------------------
  551. // log to a local systemd-journald
  552. static bool journal_local_send_buffer(int fd, BUFFER *msg) {
  553. // log_message_to_stderr(msg);
  554. bool ret = journal_direct_send(fd, msg->buffer, msg->len);
  555. if (!ret)
  556. fprintf(stderr, "Cannot send message to systemd journal.\n");
  557. return ret;
  558. }
  559. static int log_input_to_journal(const char *socket, const char *namespace, const char *newline, int timeout_ms) {
  560. char path[FILENAME_MAX + 1];
  561. int fd = -1;
  562. if(socket)
  563. snprintfz(path, sizeof(path), "%s", socket);
  564. else
  565. journal_construct_path(path, sizeof(path), NULL, namespace);
  566. fd = journal_direct_fd(path);
  567. if (fd == -1) {
  568. fprintf(stderr, "Cannot open '%s' as a UNIX socket (errno = %d)\n",
  569. path, errno);
  570. return 1;
  571. }
  572. struct buffered_reader reader;
  573. buffered_reader_init(&reader);
  574. CLEAN_BUFFER *line = buffer_create(sizeof(reader.read_buffer), NULL);
  575. CLEAN_BUFFER *msg = buffer_create(sizeof(reader.read_buffer), NULL);
  576. size_t messages_logged = 0;
  577. size_t failed_messages = 0;
  578. while(get_next_line(&reader, line, timeout_ms) == BUFFERED_READER_READ_OK) {
  579. if (!line->len) {
  580. // an empty line - we are done for this message
  581. if (msg->len) {
  582. if(journal_local_send_buffer(fd, msg))
  583. messages_logged++;
  584. else {
  585. failed_messages++;
  586. goto cleanup;
  587. }
  588. }
  589. buffer_flush(msg);
  590. }
  591. else
  592. buffer_memcat_replacing_newlines(msg, line->buffer, line->len, newline);
  593. buffer_flush(line);
  594. }
  595. if (msg && msg->len) {
  596. if(journal_local_send_buffer(fd, msg))
  597. messages_logged++;
  598. else
  599. failed_messages++;
  600. }
  601. cleanup:
  602. return !failed_messages && messages_logged ? 0 : 1;
  603. }
  /*
   * Entry point: parse the command line and run the selected logging mode.
   *
   * Modes, in order of precedence:
   *   --log-as-netdata / -N : log through the Netdata logger
   *   --url=URL             : upload to systemd-journal-remote (HAVE_CURL only)
   *   otherwise             : send to a local journal socket
   *
   * Returns 0 on success and 1 on failure or invalid usage.
   */
  int main(int argc, char *argv[]) {
      clocks_init();
      nd_log_initialize_for_external_plugins(argv[0]);

      int timeout_ms = -1; // wait forever
      bool log_as_netdata = false;
      const char *newline = NULL;
      const char *namespace = NULL;
      const char *socket = getenv("NETDATA_SYSTEMD_JOURNAL_PATH");
  #ifdef HAVE_CURL
      const char *url = NULL;
      const char *key = NULL;
      const char *cert = NULL;
      const char *trust = NULL;
      bool keep_trying = false;
  #endif

      for(int i = 1; i < argc ;i++) {
          const char *k = argv[i];

          if(strcmp(k, "--help") == 0 || strcmp(k, "-h") == 0)
              return help();

          else if(strcmp(k, "--log-as-netdata") == 0 || strcmp(k, "-N") == 0)
              log_as_netdata = true;

          else if(strncmp(k, "--namespace=", 12) == 0)
              namespace = &k[12];

          else if(strncmp(k, "--socket=", 9) == 0)
              socket = &k[9];

          else if(strncmp(k, "--newline=", 10) == 0)
              newline = &k[10];

  #ifdef HAVE_CURL
          else if (strncmp(k, "--url=", 6) == 0)
              url = &k[6];

          else if (strncmp(k, "--key=", 6) == 0)
              key = &k[6];

          else if (strncmp(k, "--cert=", 7) == 0)
              cert = &k[7];

          else if (strncmp(k, "--trust=", 8) == 0)
              trust = &k[8];

          else if (strcmp(k, "--keep-trying") == 0)
              keep_trying = true;
  #endif
          else {
              fprintf(stderr, "Unknown parameter '%s'\n", k);
              return 1;
          }
      }

  #ifdef HAVE_CURL
      if(log_as_netdata && url) {
          fprintf(stderr, "Cannot log to a systemd-journal-remote URL as Netdata. "
                          "Please either give --url or --log-as-netdata, not both.\n");
          return 1;
      }

      if(socket && url) {
          fprintf(stderr, "Cannot log to a systemd-journal-remote URL using a UNIX socket. "
                          "Please either give --url or --socket, not both.\n");
          return 1;
      }
  #endif

      if(log_as_netdata && namespace) {
          fprintf(stderr, "Cannot log as netdata using a namespace. "
                          "Please either give --log-as-netdata or --namespace, not both.\n");
          return 1;
      }

      if(log_as_netdata)
          return log_input_as_netdata(newline, timeout_ms);

  #ifdef HAVE_CURL
      if(url) {
          if(namespace && *namespace)
              snprintfz(global_namespace, sizeof(global_namespace), "_NAMESPACE=%s\n", namespace);

          log_to_journal_remote_ret_t rc;
          do {
              rc = log_input_to_journal_remote(url, key, cert, trust, newline, timeout_ms);
          } while(keep_trying && rc == LOG_TO_JOURNAL_REMOTE_CANNOT_SEND);

          // Remote logging is a mode of its own: return here instead of
          // falling through to the local journal code below, which would
          // try to read the already consumed input again and report failure.
          //
          // log_input_to_journal_remote() leaves its read loop with
          // LOG_TO_JOURNAL_REMOTE_CANNOT_READ whenever the input can no
          // longer be read, so that code is what the end of the input looks
          // like and it is not treated as an error.
          return (rc == LOG_TO_JOURNAL_REMOTE_BAD_PARAMS ||
                  rc == LOG_TO_JOURNAL_REMOTE_CANNOT_INITIALIZE ||
                  rc == LOG_TO_JOURNAL_REMOTE_CANNOT_SEND) ? 1 : 0;
      }
  #endif

      return log_input_to_journal(socket, namespace, newline, timeout_ms);
  }