Browse Source

log2journal moved to collectors (#16481)

* log2journal moved to collectors

* split log2journal into multiple files

* update the path xxh headers

* json support for log2journal

* logfmt support

* fix warning

* fix logfmt prefix

* added support for UTF-8 escape sequences in json values
Costa Tsaousis 1 year ago
parent
commit
a64c8cdb43

+ 1 - 0
.gitignore

@@ -43,6 +43,7 @@ netdata
 netdatacli
 systemd-cat-native
 log2journal
+!log2journal/
 !netdata/
 upload/
 artifacts/

+ 7 - 1
Makefile.am

@@ -347,7 +347,13 @@ SYSTEMD_CAT_NATIVE_FILES = \
     $(NULL)
 
 LOG2JOURNAL_FILES = \
-    libnetdata/log/log2journal.c \
+    collectors/log2journal/log2journal.h \
+    collectors/log2journal/log2journal.c \
+    collectors/log2journal/log2journal-help.c \
+    collectors/log2journal/log2journal-yaml.c \
+    collectors/log2journal/log2journal-json.c \
+    collectors/log2journal/log2journal-logfmt.c \
+    collectors/log2journal/log2journal-params.c \
     $(NULL)
 
 

+ 1 - 0
collectors/Makefile.am

@@ -15,6 +15,7 @@ SUBDIRS = \
     freebsd.plugin \
     freeipmi.plugin \
     idlejitter.plugin \
+    log2journal \
     macos.plugin \
     nfacct.plugin \
     xenstat.plugin \

+ 12 - 0
collectors/log2journal/Makefile.am

@@ -0,0 +1,12 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+# Documentation: distributed with the sources, not installed (dist_ + noinst_).
+dist_noinst_DATA = \
+    README.md \
+    $(NULL)
+
+# Stock log2journal configurations, distributed and installed into "libconfigdir".
+# NOTE(review): libconfigdir is not defined in this file - presumably it comes
+# from the top-level build configuration; confirm.
+dist_libconfig_DATA = \
+    log2journal.d/nginx-combined.yaml \
+    $(NULL)

+ 0 - 0
libnetdata/log/log2journal.md → collectors/log2journal/README.md


+ 221 - 0
collectors/log2journal/log2journal-help.c

@@ -0,0 +1,221 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "log2journal.h"
+
+// Print the names of the YAML configurations found in LOG2JOURNAL_CONFIG_PATH,
+// without their ".yaml" extension, separated by spaces.
+//
+// Every output line (including the first one) is indented by 7 spaces, so the
+// list lines up with the rest of the help text, and lines are wrapped so that
+// indentation plus names stay within 80 columns.
+// When the directory cannot be opened, a message is logged and nothing is printed.
+static void config_dir_print_available(void) {
+    static const char indent[] = "       ";
+    const size_t indent_len = sizeof(indent) - 1;
+    const size_t column_width = 80;
+    const char *path = LOG2JOURNAL_CONFIG_PATH;
+
+    DIR *dir = opendir(path);
+    if (dir == NULL) {
+        log2stderr(" >>> Cannot open directory '%s'", path);
+        return;
+    }
+
+    size_t current_columns = 0; // columns already used on the current line
+    size_t names_on_line = 0;   // names already printed on the current line
+    struct dirent *entry;
+
+    while ((entry = readdir(dir))) {
+        // NOTE(review): only entries reported as regular files are listed;
+        // filesystems that report DT_UNKNOWN would need a stat() here - confirm
+        // whether that matters for the supported platforms.
+        if (entry->d_type != DT_REG)
+            continue;
+
+        const char *file_name = entry->d_name;
+        size_t len = strlen(file_name);
+        if (len < 5 || strcmp(file_name + len - 5, ".yaml") != 0)
+            continue;
+
+        // Remove the ".yaml" extension
+        len -= 5;
+
+        // Wrap when this name does not fit; a name alone on a line is never
+        // wrapped, otherwise an over-long name would produce empty lines.
+        if (names_on_line && current_columns + len + 1 > column_width) {
+            printf("\n");
+            current_columns = 0;
+            names_on_line = 0;
+        }
+
+        if (!names_on_line) {
+            printf("%s", indent);
+            current_columns = indent_len;
+        }
+
+        printf("%.*s ", (int)len, file_name); // Print the filename without extension
+        current_columns += len + 1;           // Add filename length and a space
+        names_on_line++;
+    }
+
+    closedir(dir);
+    printf("\n"); // Add a newline at the end
+}
+
+// Print the complete command line help of log2journal to stdout.
+//
+// name: the program name shown in the "Usage:" line.
+//
+// The limits shown in the text (MAX_KEY_DUPS, MAX_INJECTIONS, MAX_REWRITES, ...)
+// are taken from the compile-time constants, so the help always matches the binary.
+// When HAVE_LIBYAML is defined, the available internal configurations are listed too.
+void log2journal_command_line_help(const char *name) {
+    printf("\n");
+    printf("Netdata log2journal " PACKAGE_VERSION "\n");
+    printf("\n");
+    printf("Convert logs to systemd Journal Export Format.\n");
+    printf("\n");
+    printf(" - JSON logs: extracts all JSON fields.\n");
+    printf(" - logfmt logs: extracts all logfmt fields.\n");
+    printf(" - free-form logs: uses PCRE2 patterns to extract fields.\n");
+    printf("\n");
+    printf("Usage: %s [OPTIONS] PATTERN|json|logfmt\n", name);
+    printf("\n");
+    printf("Options:\n");
+    printf("\n");
+#ifdef HAVE_LIBYAML
+    printf("  --file /path/to/file.yaml\n");
+    printf("       Read yaml configuration file for instructions.\n");
+    printf("\n");
+    printf("  --config CONFIG_NAME\n");
+    printf("       Run with the internal configuration named CONFIG_NAME\n");
+    printf("       Available internal configs:\n");
+    printf("\n");
+    config_dir_print_available();
+    printf("\n");
+#else
+    printf("  IMPORTANT:\n");
+    printf("  YAML configuration parsing is not compiled in this binary.\n");
+    printf("\n");
+#endif
+    printf("  --show-config\n");
+    printf("       Show the configuration in YAML format before starting the job.\n");
+    printf("       This is also an easy way to convert command line parameters to yaml.\n");
+    printf("\n");
+    printf("  --filename-key KEY\n");
+    printf("       Add a field with KEY as the key and the current filename as value.\n");
+    printf("       Automatically detects filenames when piped after 'tail -F',\n");
+    printf("       and tail matches multiple filenames.\n");
+    printf("       To inject the filename when tailing a single file, use --inject.\n");
+    printf("\n");
+    printf("  --unmatched-key KEY\n");
+    printf("       Include unmatched log entries in the output with KEY as the field name.\n");
+    printf("       Use this to include unmatched entries to the output stream.\n");
+    printf("       Usually it should be set to --unmatched-key=MESSAGE so that the\n");
+    printf("       unmatched entry will appear as the log message in the journals.\n");
+    printf("       Use --inject-unmatched to inject additional fields to unmatched lines.\n");
+    printf("\n");
+    printf("  --duplicate TARGET=KEY1[,KEY2[,KEY3[,...]]]\n");
+    printf("       Create a new key called TARGET, duplicating the values of the keys\n");
+    printf("       given. Useful for further processing. When multiple keys are given,\n");
+    printf("       their values are separated by comma.\n");
+    printf("       Up to %d duplications can be given on the command line, and up to\n", MAX_KEY_DUPS);
+    printf("       %d keys per duplication command are allowed.\n", MAX_KEY_DUPS_KEYS);
+    printf("\n");
+    printf("  --inject LINE\n");
+    printf("       Inject constant fields to the output (both matched and unmatched logs).\n");
+    printf("       --inject entries are added to unmatched lines too, when their key is\n");
+    printf("       not used in --inject-unmatched (--inject-unmatched overrides --inject).\n");
+    printf("       Up to %d fields can be injected.\n", MAX_INJECTIONS);
+    printf("\n");
+    printf("  --inject-unmatched LINE\n");
+    printf("       Inject lines into the output for each unmatched log entry.\n");
+    printf("       Usually, --inject-unmatched=PRIORITY=3 is needed to mark the unmatched\n");
+    printf("       lines as errors, so that they can easily be spotted in the journals.\n");
+    printf("       Up to %d such lines can be injected.\n", MAX_INJECTIONS);
+    printf("\n");
+    printf("  --rewrite KEY=/SearchPattern/ReplacePattern\n");
+    printf("       Apply a rewrite rule to the values of a specific key.\n");
+    printf("       The first character after KEY= is the separator, which should also\n");
+    printf("       be used between the search pattern and the replacement pattern.\n");
+    printf("       The search pattern is a PCRE2 regular expression, and the replacement\n");
+    printf("       pattern supports literals and named capture groups from the search pattern.\n");
+    printf("       Example:\n");
+    printf("              --rewrite DATE=/^(?<year>\\d{4})-(?<month>\\d{2})-(?<day>\\d{2})$/\n");
+    printf("                             ${day}/${month}/${year}\n");
+    printf("       This will rewrite dates in the format YYYY-MM-DD to DD/MM/YYYY.\n");
+    printf("\n");
+    printf("       Only one rewrite rule is applied per key; the sequence of rewrites stops\n");
+    printf("       for the key once a rule matches it. This allows providing a sequence of\n");
+    printf("       independent rewriting rules for the same key, matching the different values\n");
+    printf("       the key may get, and also provide a catch-all rewrite rule at the end of the\n");
+    printf("       sequence for setting the key value if no other rule matched it.\n");
+    printf("\n");
+    printf("       The combination of duplicating keys with the values of multiple other keys\n");
+    printf("       combined with multiple rewrite rules, allows creating complex rules for\n");
+    printf("       rewriting key values.\n");
+    printf("       Up to %d rewriting rules are allowed.\n", MAX_REWRITES);
+    printf("\n");
+    printf("  --prefix PREFIX\n");
+    printf("       Prefix all JSON or logfmt fields with PREFIX.\n");
+    printf("\n");
+    printf("  --rename NEW=OLD\n");
+    printf("       Rename fields, before rewriting their values.\n");
+    printf("       Up to %d renaming rules are allowed.\n", MAX_RENAMES);
+    printf("\n");
+    printf("  -h, --help\n");
+    printf("       Display this help and exit.\n");
+    printf("\n");
+    printf("  PATTERN\n");
+    printf("       PATTERN should be a valid PCRE2 regular expression.\n");
+    printf("       RE2 regular expressions (like the ones usually used in Go applications),\n");
+    printf("       are usually valid PCRE2 patterns too.\n");
+    printf("       Regular expressions without named groups are evaluated but their matches\n");
+    printf("       are not added to the output.\n");
+    printf("\n");
+    printf("  JSON mode\n");
+    printf("       JSON mode is enabled when the pattern is set to: json\n");
+    printf("       Field names are extracted from the JSON logs and are converted to the\n");
+    printf("       format expected by Journal Export Format (all caps, only _ is allowed).\n");
+    printf("       Prefixing is enabled in this mode.\n");
+    printf("\n");
+    printf("  logfmt mode\n");
+    printf("       logfmt mode is enabled when the pattern is set to: logfmt\n");
+    printf("       Field names are extracted from the logfmt logs and are converted to the\n");
+    printf("       format expected by Journal Export Format (all caps, only _ is allowed).\n");
+    printf("       Prefixing is enabled in this mode.\n");
+    printf("\n");
+    printf("\n");
+    printf("The program accepts all parameters as both --option=value and --option value.\n");
+    printf("\n");
+    printf("The maximum line length accepted is %d characters.\n", MAX_LINE_LENGTH);
+    printf("The maximum number of fields in the PCRE2 pattern is %d.\n", OVECCOUNT / 3);
+    printf("\n");
+    printf("PIPELINE AND SEQUENCE OF PROCESSING\n");
+    printf("\n");
+    printf("This is a simple diagram of the pipeline taking place:\n");
+    printf("\n");
+    printf("           +---------------------------------------------------+\n");
+    printf("           |                       INPUT                       |\n");
+    printf("           +---------------------------------------------------+\n");
+    printf("                            v                          v\n");
+    printf("           +---------------------------------+         |\n");
+    printf("           |   EXTRACT FIELDS AND VALUES     |         |\n");
+    printf("           +---------------------------------+         |\n");
+    printf("                  v                  v                 |\n");
+    printf("           +---------------+  +--------------+         |\n");
+    printf("           |   DUPLICATE   |  |    RENAME    |         |\n");
+    printf("           | create fields |  |  change the  |         |\n");
+    printf("           |  with values  |  |  field name  |         |\n");
+    printf("           +---------------+  +--------------+         |\n");
+    printf("                  v                  v                 v\n");
+    printf("           +---------------------------------+  +--------------+\n");
+    printf("           |        REWRITE PIPELINES        |  |    INJECT    |\n");
+    printf("           |    altering keys and values     |  |   constants  |\n");
+    printf("           +---------------------------------+  +--------------+\n");
+    printf("                             v                          v\n");
+    printf("           +---------------------------------------------------+\n");
+    printf("           |                       OUTPUT                      |\n");
+    printf("           +---------------------------------------------------+\n");
+    printf("\n");
+    printf("JOURNAL FIELDS RULES (enforced by systemd-journald)\n");
+    printf("\n");
+    printf("     - field names can be up to 64 characters\n");
+    printf("     - the only allowed field characters are A-Z, 0-9 and underscore\n");
+    printf("     - the first character of fields cannot be a digit\n");
+    printf("     - protected journal fields start with underscore:\n");
+    printf("       * they are accepted by systemd-journal-remote\n");
+    printf("       * they are NOT accepted by a local systemd-journald\n");
+    printf("\n");
+    printf("     For best results, always include these fields:\n");
+    printf("\n");
+    printf("      MESSAGE=TEXT\n");
+    printf("      The MESSAGE is the body of the log entry.\n");
+    printf("      This field is what we usually see in our logs.\n");
+    printf("\n");
+    printf("      PRIORITY=NUMBER\n");
+    printf("      PRIORITY sets the severity of the log entry.\n");
+    printf("      0=emerg, 1=alert, 2=crit, 3=err, 4=warn, 5=notice, 6=info, 7=debug\n");
+    printf("      - Emergency events (0) are usually broadcast to all terminals.\n");
+    printf("      - Emergency, alert, critical, and error (0-3) are usually colored red.\n");
+    printf("      - Warning (4) entries are usually colored yellow.\n");
+    printf("      - Notice (5) entries are usually bold or have a brighter white color.\n");
+    printf("      - Info (6) entries are the default.\n");
+    printf("      - Debug (7) entries are usually grayed or dimmed.\n");
+    printf("\n");
+    printf("      SYSLOG_IDENTIFIER=NAME\n");
+    printf("      SYSLOG_IDENTIFIER sets the name of the application.\n");
+    printf("      Use something descriptive, like: SYSLOG_IDENTIFIER=nginx-logs\n");
+    printf("\n");
+    printf("You can find the most common fields at 'man systemd.journal-fields'.\n");
+    printf("\n");
+}

+ 633 - 0
collectors/log2journal/log2journal-json.c

@@ -0,0 +1,633 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "log2journal.h"
+
+#define ERROR_LINE_MAX 1024 // size of the error message buffer (msg)
+#define KEY_MAX 1024 // size of the buffer in which field names are composed (key)
+#define JSON_DEPTH_MAX 100 // number of slots in key_stack
+
+struct log_json_state { // state of the JSON parser; json_parse_document() re-initializes line, pos, msg and depth
+    const char *line; // the text being parsed, as given to json_parse_document()
+    size_t pos; // index in line of the next character to be parsed
+    char msg[ERROR_LINE_MAX]; // last error message, returned by json_parser_error()
+
+    char key[KEY_MAX]; // the job prefix (if any) followed by the key of the value currently parsed
+    char *key_stack[JSON_DEPTH_MAX]; // pointers into key: slot [depth] is where the key of the next level is appended
+    size_t depth; // number of key components currently pushed (index into key_stack)
+
+    struct log_job *jb; // the job that receives the extracted key-value pairs
+};
+
+static inline bool json_parse_object(LOG_JSON_STATE *js);
+static inline bool json_parse_array(LOG_JSON_STATE *js);
+
+// Pointer to the character of js->line at the current parsing position.
+// Fully parenthesized, so that the result can safely be indexed or dereferenced.
+#define json_current_pos(js) (&(js)->line[(js)->pos])
+// Advance the parsing position by one character; evaluates to the new position.
+#define json_consume_char(js) (++(js)->pos)
+
+// Hand one extracted value (len bytes at value) to the job, under the key
+// currently composed in js->key.
+static inline void json_process_key_value(LOG_JSON_STATE *js, const char *value, size_t len) {
+    struct log_job *job = js->jb;
+    char *current_key = js->key;
+
+    jb_send_extracted_key_value(job, current_key, value, len);
+}
+
+// Advance js->pos past any whitespace at the current position.
+// Stops at the first non-space character, which may be the terminating NUL.
+static inline void json_skip_spaces(LOG_JSON_STATE *js) {
+    const char *s = json_current_pos(js);
+    const char *start = s;
+
+    // <ctype.h> functions require a value representable as unsigned char;
+    // passing a negative plain char (bytes >= 0x80) is undefined behavior.
+    while(isspace((unsigned char)*s)) s++;
+
+    js->pos += (size_t)(s - start);
+}
+
+// Skip whitespace and check that the next character is one of the characters
+// listed in `expected`. The character is NOT consumed.
+// Returns true on a match; otherwise writes an error into js->msg and returns false.
+// The end of the line (NUL) never matches and is reported as '?'.
+static inline bool json_expect_char_after_white_space(LOG_JSON_STATE *js, const char *expected) {
+    json_skip_spaces(js);
+
+    const char found = *json_current_pos(js);
+
+    const char *candidate = expected;
+    while(*candidate) {
+        if(*candidate == found)
+            return true;
+
+        candidate++;
+    }
+
+    snprintf(js->msg, sizeof(js->msg),
+             "JSON PARSER: character '%c' is not one of the expected characters (%s), at pos %zu",
+             found ? found : '?', expected, js->pos);
+
+    return false;
+}
+
+// Parse the literal `null` at the current position and send it as the value
+// of the current key. On mismatch, js->msg is set and false is returned.
+static inline bool json_parse_null(LOG_JSON_STATE *js) {
+    const char *at = json_current_pos(js);
+
+    if (strncmp(at, "null", 4) != 0) {
+        snprintf(js->msg, sizeof(js->msg),
+                 "JSON PARSER: expected 'null', found '%.4s' at position %zu", at, js->pos);
+        return false;
+    }
+
+    json_process_key_value(js, "null", 4);
+    js->pos += 4;
+    return true;
+}
+
+// Parse the literal `true` at the current position and send it as the value
+// of the current key. On mismatch, js->msg is set and false is returned.
+static inline bool json_parse_true(LOG_JSON_STATE *js) {
+    const char *at = json_current_pos(js);
+
+    if (strncmp(at, "true", 4) != 0) {
+        snprintf(js->msg, sizeof(js->msg),
+                 "JSON PARSER: expected 'true', found '%.4s' at position %zu", at, js->pos);
+        return false;
+    }
+
+    json_process_key_value(js, "true", 4);
+    js->pos += 4;
+    return true;
+}
+
+// Parse the literal `false` at the current position and send it as the value
+// of the current key. On mismatch, js->msg is set and false is returned.
+static inline bool json_parse_false(LOG_JSON_STATE *js) {
+    const char *s = json_current_pos(js);
+    if (strncmp(s, "false", 5) == 0) {
+        json_process_key_value(js, "false", 5);
+        js->pos += 5;
+        return true;
+    }
+    else {
+        // show as many characters as the literal we expected (5, not 4)
+        snprintf(js->msg, sizeof(js->msg),
+                 "JSON PARSER: expected 'false', found '%.5s' at position %zu", s, js->pos);
+        return false;
+    }
+}
+
+// Parse a number ([-]digits[.digits][(e|E)[+|-]digits]) at the current
+// position and send its text, unchanged, as the value of the current key.
+//
+// Returns false, with js->msg set, when no character could be consumed or
+// when the number does not fit in the internal buffer.
+// Every character appended to the buffer is bounds-checked, including the
+// decimal point, the exponent marker and the exponent sign.
+// NOTE(review): each part is optional, so inputs like "-" or "1." are accepted
+// as numbers; this permissiveness is kept as-is.
+static inline bool json_parse_number(LOG_JSON_STATE *js) {
+    static __thread char value[8192];
+
+    value[0] = '\0';
+    char *d = value;
+    const char *s = json_current_pos(js);
+    size_t remaining = sizeof(value) - 1; // Reserve space for null terminator
+
+    // Optional minus sign (always fits, the buffer is still empty)
+    if (*s == '-') {
+        *d++ = *s++;
+        remaining--;
+    }
+
+    // Digits before decimal point
+    while (*s >= '0' && *s <= '9') {
+        if (remaining < 2) {
+            snprintf(js->msg, sizeof(js->msg), "JSON PARSER: truncated number value at pos %zu", js->pos);
+            return false;
+        }
+        *d++ = *s++;
+        remaining--;
+    }
+
+    // Decimal point and fractional part
+    if (*s == '.') {
+        if (remaining < 2) {
+            snprintf(js->msg, sizeof(js->msg), "JSON PARSER: truncated fractional part at pos %zu", js->pos);
+            return false;
+        }
+        *d++ = *s++;
+        remaining--;
+
+        while (*s >= '0' && *s <= '9') {
+            if (remaining < 2) {
+                snprintf(js->msg, sizeof(js->msg), "JSON PARSER: truncated fractional part at pos %zu", js->pos);
+                return false;
+            }
+            *d++ = *s++;
+            remaining--;
+        }
+    }
+
+    // Exponent part
+    if (*s == 'e' || *s == 'E') {
+        if (remaining < 2) {
+            snprintf(js->msg, sizeof(js->msg), "JSON PARSER: truncated exponent at pos %zu", js->pos);
+            return false;
+        }
+        *d++ = *s++;
+        remaining--;
+
+        // Optional sign in exponent
+        if (*s == '+' || *s == '-') {
+            if (remaining < 2) {
+                snprintf(js->msg, sizeof(js->msg), "JSON PARSER: truncated exponent at pos %zu", js->pos);
+                return false;
+            }
+            *d++ = *s++;
+            remaining--;
+        }
+
+        while (*s >= '0' && *s <= '9') {
+            if (remaining < 2) {
+                snprintf(js->msg, sizeof(js->msg), "JSON PARSER: truncated exponent at pos %zu", js->pos);
+                return false;
+            }
+            *d++ = *s++;
+            remaining--;
+        }
+    }
+
+    *d = '\0';
+    js->pos += (size_t)(d - value); // one input character was consumed per output character
+
+    if (d > value) {
+        json_process_key_value(js, value, (size_t)(d - value));
+        return true;
+    } else {
+        snprintf(js->msg, sizeof(js->msg), "JSON PARSER: invalid number format at pos %zu", js->pos);
+        return false;
+    }
+}
+
+// Encode a Unicode code point as UTF-8 at *d.
+//
+// codepoint: the code point to encode.
+// d:         in/out, the write position; advanced past the bytes written.
+// remaining: in/out, the bytes available at *d; one byte is always kept in
+//            reserve for the caller's null terminator.
+//
+// Returns false, writing nothing, when the code point is not a Unicode scalar
+// value (above U+10FFFF, or a UTF-16 surrogate U+D800..U+DFFF, which must
+// never appear in UTF-8) or when there is not enough space.
+static bool encode_utf8(unsigned codepoint, char **d, size_t *remaining) {
+    if (codepoint >= 0xD800 && codepoint <= 0xDFFF)
+        // Surrogates are not characters; encoding them yields invalid UTF-8
+        return false;
+
+    if (codepoint <= 0x7F) {
+        // 1-byte sequence
+        if (*remaining < 2) return false; // +1 for the null
+        *(*d)++ = (char)codepoint;
+        (*remaining)--;
+    }
+    else if (codepoint <= 0x7FF) {
+        // 2-byte sequence
+        if (*remaining < 3) return false; // +1 for the null
+        *(*d)++ = (char)(0xC0 | ((codepoint >> 6) & 0x1F));
+        *(*d)++ = (char)(0x80 | (codepoint & 0x3F));
+        (*remaining) -= 2;
+    }
+    else if (codepoint <= 0xFFFF) {
+        // 3-byte sequence
+        if (*remaining < 4) return false; // +1 for the null
+        *(*d)++ = (char)(0xE0 | ((codepoint >> 12) & 0x0F));
+        *(*d)++ = (char)(0x80 | ((codepoint >> 6) & 0x3F));
+        *(*d)++ = (char)(0x80 | (codepoint & 0x3F));
+        (*remaining) -= 3;
+    }
+    else if (codepoint <= 0x10FFFF) {
+        // 4-byte sequence
+        if (*remaining < 5) return false; // +1 for the null
+        *(*d)++ = (char)(0xF0 | ((codepoint >> 18) & 0x07));
+        *(*d)++ = (char)(0x80 | ((codepoint >> 12) & 0x3F));
+        *(*d)++ = (char)(0x80 | ((codepoint >> 6) & 0x3F));
+        *(*d)++ = (char)(0x80 | (codepoint & 0x3F));
+        (*remaining) -= 4;
+    }
+    else
+        // Invalid code point
+        return false;
+
+    return true;
+}
+
+// Read exactly 4 hexadecimal digits at s and store their value in *codepoint.
+// Stops at the first non-hex character, so it never reads past a terminating NUL.
+static inline bool json_parse_hex4(const char *s, unsigned *codepoint) {
+    unsigned v = 0;
+
+    for(int i = 0; i < 4; i++) {
+        unsigned char c = (unsigned char)s[i];
+        if(!isxdigit(c))
+            return false;
+
+        v = (v << 4) | (unsigned)(isdigit(c) ? (c - '0') : (tolower(c) - 'a' + 10));
+    }
+
+    *codepoint = v;
+    return true;
+}
+
+// Parse a JSON string at the current position, decode its escape sequences
+// and send the result as the value of the current key (empty strings are not sent).
+//
+//  - \n \t \b \f \r are decoded; any other escaped character stands for itself.
+//  - \uXXXX is decoded to UTF-8; a high + low surrogate pair (\uD83D\uDE00) is
+//    combined into a single code point first.
+//  - \u sequences that cannot be decoded (not 4 hex digits, unpaired surrogate)
+//    are copied to the value literally, including the backslash.
+//  - a backslash that is the last character of the line is reported as an
+//    unterminated string, without reading past the end of the line.
+//
+// Returns false, with js->msg set, on unterminated or too long strings.
+static inline bool json_parse_string(LOG_JSON_STATE *js) {
+    static __thread char value[MAX_VALUE_LEN];
+
+    if(!json_expect_char_after_white_space(js, "\""))
+        return false;
+
+    json_consume_char(js);
+
+    value[0] = '\0';
+    char *d = value;
+    const char *s = json_current_pos(js);
+    size_t remaining = sizeof(value);
+
+    while (*s && *s != '"') {
+        char c;
+
+        if (*s == '\\') {
+            s++;
+
+            if (*s == '\0')
+                // lone backslash at the end of the line: stay on the NUL,
+                // the closing quote check below reports the error
+                break;
+
+            switch (*s) {
+                case 'n':
+                    c = '\n';
+                    s++;
+                    break;
+                case 't':
+                    c = '\t';
+                    s++;
+                    break;
+                case 'b':
+                    c = '\b';
+                    s++;
+                    break;
+                case 'f':
+                    c = '\f';
+                    s++;
+                    break;
+                case 'r':
+                    c = '\r';
+                    s++;
+                    break;
+                case 'u': {
+                    unsigned codepoint = 0;
+                    size_t consumed = 0; // input characters to skip, 0 = cannot decode
+
+                    if(json_parse_hex4(&s[1], &codepoint)) {
+                        consumed = 5; // 'u' + 4 hex digits
+
+                        if(codepoint >= 0xD800 && codepoint <= 0xDBFF) {
+                            // high surrogate: valid only when followed by \uDC00..\uDFFF
+                            // (s[1..4] are hex digits, so s[5] is within the line)
+                            unsigned low = 0;
+                            if(s[5] == '\\' && s[6] == 'u' && json_parse_hex4(&s[7], &low) &&
+                               low >= 0xDC00 && low <= 0xDFFF) {
+                                codepoint = 0x10000 + ((codepoint - 0xD800) << 10) + (low - 0xDC00);
+                                consumed = 11; // both \uXXXX sequences, minus the first backslash
+                            }
+                            else
+                                consumed = 0; // unpaired high surrogate
+                        }
+                        else if(codepoint >= 0xDC00 && codepoint <= 0xDFFF)
+                            consumed = 0; // unpaired low surrogate
+                    }
+
+                    if(consumed && encode_utf8(codepoint, &d, &remaining)) {
+                        s += consumed;
+                        continue;
+                    }
+
+                    // cannot decode it: keep the sequence literally
+                    if(remaining < 2) {
+                        snprintf(js->msg, sizeof(js->msg),
+                                 "JSON PARSER: truncated string value at pos %zu", js->pos);
+                        return false;
+                    }
+                    *d++ = '\\';
+                    remaining--;
+                    c = *s++;
+                    break;
+                }
+
+                default:
+                    c = *s++;
+                    break;
+            }
+        }
+        else
+            c = *s++;
+
+        if(remaining < 2) {
+            snprintf(js->msg, sizeof(js->msg),
+                     "JSON PARSER: truncated string value at pos %zu", js->pos);
+            return false;
+        }
+        else {
+            *d++ = c;
+            remaining--;
+        }
+    }
+    *d = '\0';
+    js->pos += (size_t)(s - json_current_pos(js));
+
+    if(!json_expect_char_after_white_space(js, "\""))
+        return false;
+
+    json_consume_char(js);
+
+    if(d > value)
+        json_process_key_value(js, value, (size_t)(d - value));
+
+    return true;
+}
+
+// Map a byte of a JSON key to the character used in journal field names:
+// digits and capital letters are kept, lowercase letters become uppercase,
+// NUL stays NUL and every other byte becomes an underscore.
+// Explicit ASCII ranges are used, so the result does not depend on the locale.
+static inline char json_journal_key_char(unsigned char c) {
+    if(c == '\0')
+        return '\0';
+
+    if((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z'))
+        return (char)c;
+
+    if(c >= 'a' && c <= 'z')
+        return (char)(c - 'a' + 'A');
+
+    return '_';
+}
+
+// Parse an object key at the current position, append its journal-compatible
+// form to js->key (after a '_' separator when nested) and push it on the key stack.
+//
+//  - characters are converted with json_journal_key_char()
+//  - consecutive underscores inside the key are collapsed to one
+//  - an escaped character is converted like a plain one; a \uXXXX sequence
+//    becomes a single underscore
+//
+// Returns false, with js->msg set, when the key is not a quoted string, is
+// unterminated, nests too deep, or does not fit in the key buffer.
+// No character past the end of the line is read, even when the line ends
+// in the middle of an escape sequence.
+static inline bool json_parse_key_and_push(LOG_JSON_STATE *js) {
+    if (!json_expect_char_after_white_space(js, "\""))
+        return false;
+
+    if(js->depth >= JSON_DEPTH_MAX - 1) {
+        snprintf(js->msg, sizeof(js->msg),
+                 "JSON PARSER: object too deep, at pos %zu", js->pos);
+        return false;
+    }
+
+    json_consume_char(js);
+
+    char *d = js->key_stack[js->depth];
+    size_t remaining = sizeof(js->key) - (size_t)(d - js->key);
+
+    if(js->depth) {
+        // the separator needs space too, plus the null terminator
+        if(remaining < 2) {
+            snprintf(js->msg, sizeof(js->msg),
+                     "JSON PARSER: key buffer full - keys are too long, at pos %zu", js->pos);
+            return false;
+        }
+        *d++ = '_';
+        remaining--;
+    }
+
+    const char *s = json_current_pos(js);
+    char last_c = '\0';
+    while(*s && *s != '\"') {
+        char c;
+
+        if (*s == '\\') {
+            s++;
+
+            if(*s == '\0')
+                // lone backslash at the end of the line: stay on the NUL,
+                // the closing quote check below reports the error
+                break;
+
+            if(*s == 'u') {
+                // skip 'u' and up to 4 hex digits, never crossing the end of the line
+                c = '_';
+                s++;
+                for(int i = 0; i < 4 && isxdigit((unsigned char)*s); i++)
+                    s++;
+            }
+            else
+                c = json_journal_key_char((unsigned char)*s++);
+        }
+        else
+            c = json_journal_key_char((unsigned char)*s++);
+
+        if(c == '_' && last_c == '_')
+            continue;
+        else {
+            if(remaining < 2) {
+                snprintf(js->msg, sizeof(js->msg),
+                         "JSON PARSER: key buffer full - keys are too long, at pos %zu", js->pos);
+                return false;
+            }
+            *d++ = c;
+            remaining--;
+        }
+
+        last_c = c;
+    }
+    *d = '\0';
+    js->pos += (size_t)(s - json_current_pos(js));
+
+    if (!json_expect_char_after_white_space(js, "\""))
+        return false;
+
+    json_consume_char(js);
+
+    js->key_stack[++js->depth] = d;
+
+    return true;
+}
+
+// Drop the most recently pushed key component: js->key is cut at the end of
+// that component and the depth is decreased by one.
+// Returns false, with js->msg set, when nothing has been pushed.
+static inline bool json_key_pop(LOG_JSON_STATE *js) {
+    if(js->depth == 0) {
+        snprintf(js->msg, sizeof(js->msg),
+                 "JSON PARSER: cannot pop a key at depth %zu, at pos %zu", js->depth, js->pos);
+        return false;
+    }
+
+    const size_t level = js->depth;
+    js->depth = level - 1;
+    *js->key_stack[level] = '\0';
+
+    return true;
+}
+
+// Parse any JSON value at the current position, dispatching on its first
+// non-space character: number, true, false, null, string, object or array.
+// Returns what the selected parser returns; on an unexpected first character
+// js->msg is set and false is returned.
+static inline bool json_parse_value(LOG_JSON_STATE *js) {
+    // The list must match the cases below. '.' is not listed: no branch
+    // handles a number starting with a decimal point.
+    if(!json_expect_char_after_white_space(js, "-0123456789tfn\"{["))
+        return false;
+
+    const char *s = json_current_pos(js);
+    switch(*s) {
+        case '-':
+        case '0':
+        case '1':
+        case '2':
+        case '3':
+        case '4':
+        case '5':
+        case '6':
+        case '7':
+        case '8':
+        case '9':
+            return json_parse_number(js);
+
+        case 't':
+            return json_parse_true(js);
+
+        case 'f':
+            return json_parse_false(js);
+
+        case 'n':
+            return json_parse_null(js);
+
+        case '"':
+            return json_parse_string(js);
+
+        case '{':
+            return json_parse_object(js);
+
+        case '[':
+            return json_parse_array(js);
+
+        default:
+            break;
+    }
+
+    snprintf(js->msg, sizeof(js->msg),
+             "JSON PARSER: unexpected character at pos %zu", js->pos);
+    return false;
+}
+
+// Append the decimal form of an array index to js->key (after a '_' separator
+// when nested) and push it on the key stack, so that array items get keys
+// like PARENT_0, PARENT_1, ...
+//
+// Returns false, with js->msg set, when the key stack is full or the index
+// does not fit in the key buffer.
+static inline bool json_key_index_and_push(LOG_JSON_STATE *js, size_t index) {
+    // the push below writes to key_stack[depth + 1]
+    if(js->depth >= JSON_DEPTH_MAX - 1) {
+        snprintf(js->msg, sizeof(js->msg),
+                 "JSON PARSER: array too deep, at pos %zu", js->pos);
+        return false;
+    }
+
+    char *d = js->key_stack[js->depth];
+    size_t remaining = sizeof(js->key) - (size_t)(d - js->key);
+
+    if(js->depth > 0) {
+        // the separator needs space too, plus the null terminator
+        if(remaining < 2) {
+            snprintf(js->msg, sizeof(js->msg),
+                     "JSON PARSER: key buffer full - keys are too long, at pos %zu", js->pos);
+            return false;
+        }
+        *d++ = '_';
+        remaining--;
+    }
+
+    // Convert index to string (32 bytes hold any 64-bit decimal number)
+    char temp[32];
+    snprintf(temp, sizeof(temp), "%zu", index);
+
+    // Append the index to the key
+    for(const char *t = temp; *t ; t++) {
+        if(remaining < 2) {
+            snprintf(js->msg, sizeof(js->msg),
+                     "JSON PARSER: key buffer full - keys are too long, at pos %zu", js->pos);
+            return false;
+        }
+
+        *d++ = *t;
+        remaining--;
+    }
+
+    *d = '\0'; // Null-terminate the key
+    js->key_stack[++js->depth] = d;
+
+    return true;
+}
+
+// Parse a JSON array at the current position. Each item is parsed as a value
+// whose key is the key of the array followed by the zero-based item index.
+// An empty array ([]) is valid and produces no fields.
+// Returns false, with js->msg set by the failing step, on malformed input.
+static inline bool json_parse_array(LOG_JSON_STATE *js) {
+    if(!json_expect_char_after_white_space(js, "["))
+        return false;
+
+    json_consume_char(js);
+
+    // empty array
+    json_skip_spaces(js);
+    if(*json_current_pos(js) == ']') {
+        json_consume_char(js);
+        return true;
+    }
+
+    for(size_t index = 0; ; index++) {
+        if(!json_key_index_and_push(js, index))
+            return false;
+
+        if(!json_parse_value(js))
+            return false;
+
+        if(!json_key_pop(js))
+            return false;
+
+        if(!json_expect_char_after_white_space(js, ",]"))
+            return false;
+
+        const char separator = *json_current_pos(js);
+        json_consume_char(js);
+
+        if(separator != ',')
+            break; // ]
+    }
+
+    return true;
+}
+
+// Parse a JSON object at the current position. For every member, the key is
+// pushed on the key stack, the value is parsed (and sent to the job by the
+// value parsers) and the key is popped again.
+// An empty object ({}) is valid and produces no fields.
+// Returns false, with js->msg set by the failing step, on malformed input.
+static inline bool json_parse_object(LOG_JSON_STATE *js) {
+    if(!json_expect_char_after_white_space(js, "{"))
+        return false;
+
+    json_consume_char(js);
+
+    // empty object
+    json_skip_spaces(js);
+    if(*json_current_pos(js) == '}') {
+        json_consume_char(js);
+        return true;
+    }
+
+    for(;;) {
+        // fails with an error when the next character is not a double quote
+        if(!json_parse_key_and_push(js))
+            return false;
+
+        if(!json_expect_char_after_white_space(js, ":"))
+            return false;
+
+        json_consume_char(js);
+
+        if(!json_parse_value(js))
+            return false;
+
+        if(!json_key_pop(js))
+            return false;
+
+        if(!json_expect_char_after_white_space(js, ",}"))
+            return false;
+
+        const char separator = *json_current_pos(js);
+        json_consume_char(js);
+
+        if(separator != ',')
+            break; // }
+    }
+
+    return true;
+}
+
+// Allocate a zeroed parser state bound to the given job.
+// When the job has a prefix, it is copied to the beginning of the key buffer,
+// and the first key stack slot points right after it.
+// The returned state is released with json_parser_destroy().
+LOG_JSON_STATE *json_parser_create(struct log_job *jb) {
+    LOG_JSON_STATE *state = mallocz(sizeof(LOG_JSON_STATE));
+    memset(state, 0, sizeof(LOG_JSON_STATE));
+
+    state->jb = jb;
+
+    if(jb->prefix) {
+        size_t prefix_len = strlen(state->jb->prefix);
+        copy_to_buffer(state->key, sizeof(state->key), state->jb->prefix, prefix_len);
+    }
+
+    // keys are appended where the (possibly empty) prefix ends
+    size_t used = strlen(state->key);
+    state->key_stack[0] = state->key + used;
+
+    return state;
+}
+
+// Release a parser state created by json_parser_create(). NULL is ignored.
+void json_parser_destroy(LOG_JSON_STATE *js) {
+    if(!js)
+        return;
+
+    freez(js);
+}
+
+const char *json_parser_error(LOG_JSON_STATE *js) {
+    return js->msg;
+}
+
+// Parse one JSON document (a single object) from txt.
+// The parser state is reset first: position, depth, error message, and the
+// key is cut back to the prefix. Only whitespace may follow the object.
+// Returns false on error; the reason is available via json_parser_error().
+bool json_parse_document(LOG_JSON_STATE *js, const char *txt) {
+    js->line = txt;
+    js->pos = 0;
+    js->depth = 0;
+    js->msg[0] = '\0';
+    *js->key_stack[0] = '\0';
+
+    if(!json_parse_object(js))
+        return false;
+
+    json_skip_spaces(js);
+
+    if(*json_current_pos(js) == '\0')
+        return true;
+
+    snprintf(js->msg, sizeof(js->msg),
+             "JSON PARSER: excess characters found after document is finished, at pos %zu", js->pos);
+    return false;
+}
+
+// Smoke test: parse one document whose value mixes a bare "\u" with valid
+// \uXXXX escape sequences. The result of the parsing is not checked.
+// NOTE(review): the prefix "NIGNX_" looks like a typo of "NGINX_" - confirm.
+void json_test(void) {
+    static const char document[] =
+            "{\"value\":\"\\u\\u039A\\u03B1\\u03BB\\u03B7\\u03BC\\u03AD\\u03C1\\u03B1\"}";
+
+    struct log_job jb = { .prefix = "NIGNX_" };
+    LOG_JSON_STATE *parser = json_parser_create(&jb);
+
+    json_parse_document(parser, document);
+
+    json_parser_destroy(parser);
+}

+ 268 - 0
collectors/log2journal/log2journal-logfmt.c

@@ -0,0 +1,268 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "log2journal.h"
+
+// size of the error message buffer of the parser
+#define ERROR_LINE_MAX 1024
+// size of the key buffer (prefix + key name + terminator)
+#define KEY_MAX 1024
+
+// State of the logfmt parser.
+struct logfmt_state {
+    const char *line;           // the text being parsed (set by logfmt_parse_document())
+    size_t pos;                 // offset of the next character to examine in line
+    char msg[ERROR_LINE_MAX];   // the last error message, empty when there is none
+
+    char key[KEY_MAX];          // the job prefix followed by the current key name
+    size_t key_start;           // offset in key where the key name starts (value returned by copy_to_buffer() for the prefix)
+
+    struct log_job *jb;         // the job that receives the extracted key-value pairs
+};
+
+// Pointer to the character at the current parsing position of the line.
+// The expansion is fully parenthesized so it is safe inside any expression.
+#define logfmt_current_pos(lfs) (&(lfs)->line[(lfs)->pos])
+// Advance the parsing position by one character.
+#define logfmt_consume_char(lfs) (++(lfs)->pos)
+
+// Hand the current key (lfs->key, prefix included) together with the given
+// value of len bytes over to the job.
+static inline void logfmt_process_key_value(LOGFMT_STATE *lfs, const char *value, size_t len) {
+    struct log_job *job = lfs->jb;
+
+    jb_send_extracted_key_value(job, lfs->key, value, len);
+}
+
+// Advance the parsing position past any white space at the current position.
+static inline void logfmt_skip_spaces(LOGFMT_STATE *lfs) {
+    const char *s = logfmt_current_pos(lfs);
+    const char *start = s;
+
+    // <ctype.h> functions are only defined for values representable as
+    // unsigned char (or EOF); a plain char may be negative for bytes >= 0x80
+    while(isspace((unsigned char)*s)) s++;
+
+    lfs->pos += s - start;
+}
+
+// Parse the value found at the current position (right after the '=' of a
+// key) and send the key-value pair to the job.
+//
+// The value may be enclosed in single or double quotes; an unquoted value
+// ends at the first space. The escapes \n, \t, \b, \f and \r are translated;
+// any other escaped character is copied as-is. A backslash that is the last
+// character of the line is kept as a literal backslash.
+// Empty values are not sent to the job.
+//
+// Returns false, with lfs->msg set, when the value does not fit in the value
+// buffer or when the closing quote is missing.
+static inline bool logftm_parse_value(LOGFMT_STATE *lfs) {
+    static __thread char value[MAX_VALUE_LEN];
+
+    char quote = '\0';
+    const char *s = logfmt_current_pos(lfs);
+    if(*s == '\"' || *s == '\'') {
+        quote = *s;
+        logfmt_consume_char(lfs);
+    }
+
+    value[0] = '\0';
+    char *d = value;
+    s = logfmt_current_pos(lfs);
+    size_t remaining = sizeof(value);
+
+    char end_char = (char)(quote == '\0' ? ' ' : quote);
+    while (*s && *s != end_char) {
+        char c;
+
+        if (*s == '\\') {
+            s++;
+
+            switch (*s) {
+                case '\0':
+                    // the backslash is the last character of the line:
+                    // keep it as a literal and do NOT step over the terminator
+                    c = '\\';
+                    break;
+                case 'n':
+                    c = '\n';
+                    s++;
+                    break;
+                case 't':
+                    c = '\t';
+                    s++;
+                    break;
+                case 'b':
+                    c = '\b';
+                    s++;
+                    break;
+                case 'f':
+                    c = '\f';
+                    s++;
+                    break;
+                case 'r':
+                    c = '\r';
+                    s++;
+                    break;
+                default:
+                    c = *s++;
+                    break;
+            }
+        }
+        else
+            c = *s++;
+
+        // always keep room for the string terminator
+        if(remaining < 2) {
+            snprintf(lfs->msg, sizeof(lfs->msg),
+                     "LOGFMT PARSER: truncated string value at pos %zu", lfs->pos);
+            return false;
+        }
+        else {
+            *d++ = c;
+            remaining--;
+        }
+    }
+    *d = '\0';
+    lfs->pos += s - logfmt_current_pos(lfs);
+
+    s = logfmt_current_pos(lfs);
+
+    if(quote != '\0') {
+        if (*s != quote) {
+            snprintf(lfs->msg, sizeof(lfs->msg),
+                     "LOGFMT PARSER: missing quote at pos %zu: '%s'",
+                     lfs->pos, s);
+            return false;
+        }
+        else
+            logfmt_consume_char(lfs);
+    }
+
+    if(d > value)
+        logfmt_process_key_value(lfs, value, d - value);
+
+    return true;
+}
+
+// Parse the key found at the current position (after skipping white space)
+// into lfs->key, right after the job prefix, and consume the '=' after it.
+//
+// Every character is translated through a lookup table: lowercase letters
+// become uppercase, uppercase letters and digits are kept, everything else
+// becomes an underscore. Consecutive underscores are collapsed into one.
+// A backslash makes the character that follows it part of the key (still
+// translated through the table).
+//
+// Returns false, with lfs->msg set, when the key does not fit in the key
+// buffer or when it is not followed by an equal sign.
+static inline bool logfmt_parse_key(LOGFMT_STATE *lfs) {
+    static const char valid_journal_key_chars[256] = {
+            // control characters
+            [0] = '\0', [1] = '_', [2] = '_', [3] = '_', [4] = '_', [5] = '_', [6] = '_', [7] = '_',
+            [8] = '_', [9] = '_', [10] = '_', [11] = '_', [12] = '_', [13] = '_', [14] = '_', [15] = '_',
+            [16] = '_', [17] = '_', [18] = '_', [19] = '_', [20] = '_', [21] = '_', [22] = '_', [23] = '_',
+            [24] = '_', [25] = '_', [26] = '_', [27] = '_', [28] = '_', [29] = '_', [30] = '_', [31] = '_',
+
+            // symbols
+            [' '] = '_', ['!'] = '_', ['"'] = '_', ['#'] = '_', ['$'] = '_', ['%'] = '_', ['&'] = '_', ['\''] = '_',
+            ['('] = '_', [')'] = '_', ['*'] = '_', ['+'] = '_', [','] = '_', ['-'] = '_', ['.'] = '_', ['/'] = '_',
+
+            // numbers
+            ['0'] = '0', ['1'] = '1', ['2'] = '2', ['3'] = '3', ['4'] = '4', ['5'] = '5', ['6'] = '6', ['7'] = '7',
+            ['8'] = '8', ['9'] = '9',
+
+            // symbols
+            [':'] = '_', [';'] = '_', ['<'] = '_', ['='] = '_', ['>'] = '_', ['?'] = '_', ['@'] = '_',
+
+            // capitals
+            ['A'] = 'A', ['B'] = 'B', ['C'] = 'C', ['D'] = 'D', ['E'] = 'E', ['F'] = 'F', ['G'] = 'G', ['H'] = 'H',
+            ['I'] = 'I', ['J'] = 'J', ['K'] = 'K', ['L'] = 'L', ['M'] = 'M', ['N'] = 'N', ['O'] = 'O', ['P'] = 'P',
+            ['Q'] = 'Q', ['R'] = 'R', ['S'] = 'S', ['T'] = 'T', ['U'] = 'U', ['V'] = 'V', ['W'] = 'W', ['X'] = 'X',
+            ['Y'] = 'Y', ['Z'] = 'Z',
+
+            // symbols
+            ['['] = '_', ['\\'] = '_', [']'] = '_', ['^'] = '_', ['_'] = '_', ['`'] = '_',
+
+            // lower to upper
+            ['a'] = 'A', ['b'] = 'B', ['c'] = 'C', ['d'] = 'D', ['e'] = 'E', ['f'] = 'F', ['g'] = 'G', ['h'] = 'H',
+            ['i'] = 'I', ['j'] = 'J', ['k'] = 'K', ['l'] = 'L', ['m'] = 'M', ['n'] = 'N', ['o'] = 'O', ['p'] = 'P',
+            ['q'] = 'Q', ['r'] = 'R', ['s'] = 'S', ['t'] = 'T', ['u'] = 'U', ['v'] = 'V', ['w'] = 'W', ['x'] = 'X',
+            ['y'] = 'Y', ['z'] = 'Z',
+
+            // symbols
+            ['{'] = '_', ['|'] = '_', ['}'] = '_', ['~'] = '_', [127] = '_', // Delete (DEL)
+
+            // Extended ASCII characters (128-255) set to underscore
+            [128] = '_', [129] = '_', [130] = '_', [131] = '_', [132] = '_', [133] = '_', [134] = '_', [135] = '_',
+            [136] = '_', [137] = '_', [138] = '_', [139] = '_', [140] = '_', [141] = '_', [142] = '_', [143] = '_',
+            [144] = '_', [145] = '_', [146] = '_', [147] = '_', [148] = '_', [149] = '_', [150] = '_', [151] = '_',
+            [152] = '_', [153] = '_', [154] = '_', [155] = '_', [156] = '_', [157] = '_', [158] = '_', [159] = '_',
+            [160] = '_', [161] = '_', [162] = '_', [163] = '_', [164] = '_', [165] = '_', [166] = '_', [167] = '_',
+            [168] = '_', [169] = '_', [170] = '_', [171] = '_', [172] = '_', [173] = '_', [174] = '_', [175] = '_',
+            [176] = '_', [177] = '_', [178] = '_', [179] = '_', [180] = '_', [181] = '_', [182] = '_', [183] = '_',
+            [184] = '_', [185] = '_', [186] = '_', [187] = '_', [188] = '_', [189] = '_', [190] = '_', [191] = '_',
+            [192] = '_', [193] = '_', [194] = '_', [195] = '_', [196] = '_', [197] = '_', [198] = '_', [199] = '_',
+            [200] = '_', [201] = '_', [202] = '_', [203] = '_', [204] = '_', [205] = '_', [206] = '_', [207] = '_',
+            [208] = '_', [209] = '_', [210] = '_', [211] = '_', [212] = '_', [213] = '_', [214] = '_', [215] = '_',
+            [216] = '_', [217] = '_', [218] = '_', [219] = '_', [220] = '_', [221] = '_', [222] = '_', [223] = '_',
+            [224] = '_', [225] = '_', [226] = '_', [227] = '_', [228] = '_', [229] = '_', [230] = '_', [231] = '_',
+            [232] = '_', [233] = '_', [234] = '_', [235] = '_', [236] = '_', [237] = '_', [238] = '_', [239] = '_',
+            [240] = '_', [241] = '_', [242] = '_', [243] = '_', [244] = '_', [245] = '_', [246] = '_', [247] = '_',
+            [248] = '_', [249] = '_', [250] = '_', [251] = '_', [252] = '_', [253] = '_', [254] = '_', [255] = '_',
+    };
+
+    logfmt_skip_spaces(lfs);
+
+    // the key name is written right after the prefix
+    char *d = &lfs->key[lfs->key_start];
+
+    size_t remaining = sizeof(lfs->key) - (d - lfs->key);
+
+    const char *s = logfmt_current_pos(lfs);
+    char last_c = '\0';
+    while(*s && *s != '=') {
+        char c;
+
+        if (*s == '\\') {
+            s++;
+
+            // a backslash at the very end of the line escapes nothing;
+            // stop here instead of reading past the string terminator
+            if(!*s)
+                break;
+        }
+
+        c = valid_journal_key_chars[(unsigned char)*s++];
+
+        // collapse runs of underscores into a single one
+        if(c == '_' && last_c == '_')
+            continue;
+        else {
+            // always keep room for the string terminator
+            if(remaining < 2) {
+                snprintf(lfs->msg, sizeof(lfs->msg),
+                         "LOGFMT PARSER: key buffer full - keys are too long, at pos %zu", lfs->pos);
+                return false;
+            }
+            *d++ = c;
+            remaining--;
+        }
+
+        last_c = c;
+    }
+    *d = '\0';
+    lfs->pos += s - logfmt_current_pos(lfs);
+
+    s = logfmt_current_pos(lfs);
+    if(*s != '=') {
+        snprintf(lfs->msg, sizeof(lfs->msg),
+                 "LOGFMT PARSER: key is missing the equal sign, at pos %zu", lfs->pos);
+        return false;
+    }
+
+    logfmt_consume_char(lfs);
+
+    return true;
+}
+
+// Allocate a zero-filled logfmt parser state bound to the given job.
+// When the job defines a key prefix, it is copied to the start of the key
+// buffer and key_start records what copy_to_buffer() returned for it.
+LOGFMT_STATE *logfmt_parser_create(struct log_job *jb) {
+    LOGFMT_STATE *state = mallocz(sizeof(*state));
+    memset(state, 0, sizeof(*state));
+
+    state->jb = jb;
+
+    if(jb->prefix) {
+        size_t prefix_len = strlen(jb->prefix);
+        state->key_start = copy_to_buffer(state->key, sizeof(state->key), jb->prefix, prefix_len);
+    }
+
+    return state;
+}
+
+// Release a parser state obtained from logfmt_parser_create(); NULL is accepted.
+void logfmt_parser_destroy(LOGFMT_STATE *lfs) {
+    if(!lfs)
+        return;
+
+    freez(lfs);
+}
+
+// Return the message buffer of the parser; logfmt_parse_document() empties
+// it when it starts, and the parsing helpers fill it when they fail.
+const char *logfmt_parser_error(LOGFMT_STATE *lfs) {
+    const char *message = lfs->msg;
+    return message;
+}
+
+// Parse a whole logfmt line: a sequence of key=value pairs separated by
+// white space. Returns false as soon as a key or a value cannot be parsed.
+bool logfmt_parse_document(LOGFMT_STATE *lfs, const char *txt) {
+    // reset the per-line state
+    lfs->line = txt;
+    lfs->pos = 0;
+    lfs->msg[0] = '\0';
+
+    for(;;) {
+        if(!logfmt_parse_key(lfs) || !logftm_parse_value(lfs))
+            return false;
+
+        logfmt_skip_spaces(lfs);
+
+        // stop when the whole line has been consumed
+        const char *rest = logfmt_current_pos(lfs);
+        if(*rest == '\0')
+            break;
+    }
+
+    return true;
+}
+
+
+// Developer smoke test: run the logfmt parser once over a sample line.
+// The parsing result is not checked.
+void logfmt_test(void) {
+    const char *document = "x=1 y=2 z=\"3 \\ 4\" 5  ";
+    struct log_job job = { .prefix = "NIGNX_" };
+
+    LOGFMT_STATE *parser = logfmt_parser_create(&job);
+    logfmt_parse_document(parser, document);
+    logfmt_parser_destroy(parser);
+}

+ 519 - 0
collectors/log2journal/log2journal-params.c

@@ -0,0 +1,519 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "log2journal.h"
+
+static bool parse_replacement_pattern(struct key_rewrite *rw);
+
+// ----------------------------------------------------------------------------
+
+// Release the memory owned by the job's injections, duplications, renames
+// and rewrite rules, then reset the whole job structure to zero.
+void nd_log_destroy(struct log_job *jb) {
+    for(size_t i = 0; i < jb->injections.used ;i++) {
+        if(jb->injections.keys[i].value.s)
+            freez(jb->injections.keys[i].value.s);
+    }
+
+    for(size_t i = 0; i < jb->unmatched.injections.used ;i++) {
+        if(jb->unmatched.injections.keys[i].value.s)
+            freez(jb->unmatched.injections.keys[i].value.s);
+    }
+
+    for(size_t i = 0; i < jb->dups.used ;i++) {
+        struct key_dup *kd = &jb->dups.array[i];
+
+        if(kd->target)
+            freez(kd->target);
+
+        for(size_t j = 0; j < kd->used ; j++) {
+            if (kd->keys[j])
+                freez(kd->keys[j]);
+
+            if (kd->values[j].s)
+                freez(kd->values[j].s);
+        }
+    }
+
+    // both keys of a rename are copies made by log_job_add_rename()
+    for(size_t i = 0; i < jb->renames.used; i++) {
+        struct key_rename *rn = &jb->renames.array[i];
+
+        if (rn->new_key)
+            freez((void *)rn->new_key);
+
+        if (rn->old_key)
+            freez((void *)rn->old_key);
+    }
+
+    for(size_t i = 0; i < jb->rewrites.used; i++) {
+        struct key_rewrite *rw = &jb->rewrites.array[i];
+
+        if (rw->key)
+            freez(rw->key);
+
+        if (rw->search_pattern)
+            freez(rw->search_pattern);
+
+        if (rw->replace_pattern)
+            freez(rw->replace_pattern);
+
+        if(rw->match_data)
+            pcre2_match_data_free(rw->match_data);
+
+        if (rw->re)
+            pcre2_code_free(rw->re);
+
+        // Cleanup for replacement nodes linked list
+        struct replacement_node *current = rw->nodes;
+        while (current != NULL) {
+            struct replacement_node *next = current->next;
+
+            if (current->s)
+                freez((void *)current->s);
+
+            freez(current);
+            current = next;
+        }
+    }
+
+    // leave no dangling pointers behind
+    memset(jb, 0, sizeof(*jb));
+}
+
+// ----------------------------------------------------------------------------
+
+// Store a copy of the first key_len bytes of key as the filename key of the
+// job, replacing any previous one. Fails when key is NULL or empty.
+bool log_job_add_filename_key(struct log_job *jb, const char *key, size_t key_len) {
+    bool is_empty = (key == NULL || *key == '\0');
+    if(is_empty) {
+        log2stderr("filename key cannot be empty.");
+        return false;
+    }
+
+    // drop the previously configured key before storing the new copy
+    if(jb->filename.key != NULL)
+        freez((char*)jb->filename.key);
+
+    jb->filename.key = strndupz(key, key_len);
+    return true;
+}
+
+// Store a copy of the first prefix_len bytes of prefix as the key prefix of
+// the job, replacing any previous one. Fails when prefix is NULL or empty.
+bool log_job_add_key_prefix(struct log_job *jb, const char *prefix, size_t prefix_len) {
+    if(!prefix || !*prefix) {
+        log2stderr("prefix cannot be empty.");
+        return false;
+    }
+
+    // drop the previously configured prefix before storing the new copy
+    if(jb->prefix)
+        freez((char*)jb->prefix);
+
+    jb->prefix = strndupz(prefix, prefix_len);
+
+    return true;
+}
+
+// Append a key=value injection to the job: to the list used for unmatched
+// lines when unmatched is true, to the regular list otherwise.
+// Fails when the selected list already holds MAX_INJECTIONS entries.
+bool log_job_add_injection(struct log_job *jb, const char *key, size_t key_len, const char *value, size_t value_len, bool unmatched) {
+    if (!unmatched) {
+        if (jb->injections.used >= MAX_INJECTIONS) {
+            log2stderr("Error: too many injections. You can inject up to %d lines.", MAX_INJECTIONS);
+            return false;
+        }
+
+        key_value_replace(&jb->injections.keys[jb->injections.used++],
+                key, key_len,
+                value, value_len);
+
+        return true;
+    }
+
+    if (jb->unmatched.injections.used >= MAX_INJECTIONS) {
+        log2stderr("Error: too many unmatched injections. You can inject up to %d lines.", MAX_INJECTIONS);
+        return false;
+    }
+
+    key_value_replace(&jb->unmatched.injections.keys[jb->unmatched.injections.used++],
+            key, key_len,
+            value, value_len);
+
+    return true;
+}
+
+// Register a rename rule (old_key -> new_key) in the job. Both keys are
+// copied and hashed. Fails when MAX_RENAMES rules already exist.
+bool log_job_add_rename(struct log_job *jb, const char *new_key, size_t new_key_len, const char *old_key, size_t old_key_len) {
+    if(jb->renames.used >= MAX_RENAMES) {
+        log2stderr("Error: too many renames. You can rename up to %d fields.", MAX_RENAMES);
+        return false;
+    }
+
+    size_t slot = jb->renames.used++;
+    struct key_rename *rn = &jb->renames.array[slot];
+
+    rn->new_key = strndupz(new_key, new_key_len);
+    rn->old_key = strndupz(old_key, old_key_len);
+
+    rn->new_hash = XXH3_64bits(rn->new_key, strlen(rn->new_key));
+    rn->old_hash = XXH3_64bits(rn->old_key, strlen(rn->old_key));
+
+    return true;
+}
+
+// Register a rewrite rule for key: values matching search_pattern (a PCRE2
+// expression) are rewritten according to replace_pattern.
+// All three strings are copied. Fails when MAX_REWRITES rules already exist,
+// when the search pattern does not compile, or when the replacement pattern
+// cannot be parsed; on failure the slot is left empty and reusable.
+bool log_job_add_rewrite(struct log_job *jb, const char *key, const char *search_pattern, const char *replace_pattern) {
+    if(jb->rewrites.used >= MAX_REWRITES) {
+        log2stderr("Error: too many rewrites. You can add up to %d rewrite rules.", MAX_REWRITES);
+        return false;
+    }
+
+    pcre2_code *re = jb_compile_pcre2_pattern(search_pattern);
+    if (!re) {
+        return false;
+    }
+
+    struct key_rewrite *rw = &jb->rewrites.array[jb->rewrites.used++];
+    rw->key = strdupz(key);
+    rw->hash = XXH3_64bits(rw->key, strlen(rw->key));
+    rw->search_pattern = strdupz(search_pattern);
+    rw->replace_pattern = strdupz(replace_pattern);
+    rw->re = re;
+    rw->match_data = pcre2_match_data_create_from_pattern(rw->re, NULL);
+
+    // Parse the replacement pattern and create the linked list
+    if (!parse_replacement_pattern(rw)) {
+        // release the nodes created before the parsing error, so they are
+        // neither leaked nor inherited by the next rule that uses this slot
+        struct replacement_node *node = rw->nodes;
+        while (node != NULL) {
+            struct replacement_node *next = node->next;
+
+            if (node->s)
+                freez((void *)node->s);
+
+            freez(node);
+            node = next;
+        }
+
+        pcre2_match_data_free(rw->match_data);
+        pcre2_code_free(rw->re);
+        freez(rw->key);
+        freez(rw->search_pattern);
+        freez(rw->replace_pattern);
+
+        // give the slot back in a clean state (no dangling pointers)
+        memset(rw, 0, sizeof(*rw));
+        jb->rewrites.used--;
+        return false;
+    }
+
+    return true;
+}
+
+// ----------------------------------------------------------------------------
+
+// Reserve a new duplication entry in the job for the given target key and
+// return it with no source keys attached yet.
+// Returns NULL when MAX_KEY_DUPS entries already exist.
+struct key_dup *log_job_add_duplication_to_job(struct log_job *jb, const char *target, size_t target_len) {
+    if (jb->dups.used >= MAX_KEY_DUPS) {
+        log2stderr("Error: Too many duplicates defined. Maximum allowed is %d.", MAX_KEY_DUPS);
+        return NULL;
+    }
+
+    size_t slot = jb->dups.used++;
+    struct key_dup *kd = &jb->dups.array[slot];
+
+    kd->used = 0;
+    kd->exposed = false;
+    kd->target = strndupz(target, target_len);
+    kd->hash = XXH3_64bits(kd->target, target_len);
+
+    // start with no collected values
+    for (size_t i = MAX_KEY_DUPS_KEYS; i-- > 0; ) {
+        kd->values[i].s = NULL;
+        kd->values[i].size = 0;
+    }
+
+    return kd;
+}
+
+// Append a copy of the first key_len bytes of key to the source keys of a
+// duplication. Fails when it already holds MAX_KEY_DUPS_KEYS keys.
+bool log_job_add_key_to_duplication(struct key_dup *kd, const char *key, size_t key_len) {
+    if (kd->used < MAX_KEY_DUPS_KEYS) {
+        kd->keys[kd->used] = strndupz(key, key_len);
+        kd->used++;
+        return true;
+    }
+
+    log2stderr("Error: Too many keys in duplication of target '%s'.", kd->target);
+    return false;
+}
+
+// ----------------------------------------------------------------------------
+// command line params
+
+// Create a replacement node and append it at the end of the list at *head.
+// The node stores the text pointer itself (no copy is made).
+// Returns the new node, or NULL when it cannot be allocated.
+struct replacement_node *add_replacement_node(struct replacement_node **head, bool is_variable, const char *text) {
+    struct replacement_node *node = mallocz(sizeof(*node));
+    if (node == NULL)
+        return NULL;
+
+    node->is_variable = is_variable;
+    node->s = text;
+    node->len = strlen(text);
+    node->next = NULL;
+
+    // walk to the link that terminates the list (the head itself when the list is empty)
+    struct replacement_node **link = head;
+    while (*link != NULL)
+        link = &(*link)->next;
+
+    *link = node;
+
+    return node;
+}
+
+// Split rw->replace_pattern into a list of nodes appended to rw->nodes:
+// one variable node per ${name} reference and one literal node per run of
+// text between references. Returns false when a reference is not closed or
+// when a node cannot be created.
+static bool parse_replacement_pattern(struct key_rewrite *rw) {
+    const char *p = rw->replace_pattern;
+
+    while (*p != '\0') {
+        bool at_variable = (p[0] == '$' && p[1] == '{');
+
+        if (at_variable) {
+            const char *closing = strchr(p, '}');
+            if (closing == NULL) {
+                log2stderr("Error: Missing closing brace in replacement pattern: %s", rw->replace_pattern);
+                return false;
+            }
+
+            // the name lies between "${" and "}"
+            const char *name = p + 2;
+            char *variable_name = strndupz(name, (size_t)(closing - name));
+            if (variable_name == NULL) {
+                log2stderr("Error: Memory allocation failed for variable name.");
+                return false;
+            }
+
+            if (add_replacement_node(&(rw->nodes), true, variable_name) == NULL) {
+                freez(variable_name);
+                log2stderr("Error: Failed to add replacement node for variable.");
+                return false;
+            }
+
+            p = closing + 1; // continue after the closing brace
+            continue;
+        }
+
+        // literal text: it runs up to the next "${" or the end of the pattern
+        const char *start = p;
+        do {
+            p++;
+        } while (*p != '\0' && !(p[0] == '$' && p[1] == '{'));
+
+        char *text = strndupz(start, (size_t)(p - start));
+        if (text == NULL) {
+            log2stderr("Error: Memory allocation failed for literal text.");
+            return false;
+        }
+
+        if (add_replacement_node(&(rw->nodes), false, text) == NULL) {
+            freez(text);
+            log2stderr("Error: Failed to add replacement node for text.");
+            return false;
+        }
+    }
+
+    return true;
+}
+
+// Parse a rename parameter of the form NEW_KEY=OLD_KEY and register it in
+// the job. Fails when there is no '=' or when the new key is empty.
+static bool parse_rename(struct log_job *jb, const char *param) {
+    const char *separator = strchr(param, '=');
+
+    bool malformed = (separator == NULL || separator == param);
+    if (malformed) {
+        log2stderr("Error: Invalid rename format, '=' not found in %s", param);
+        return false;
+    }
+
+    // everything before the '=' is the new key, everything after it the old one
+    const char *old_key = separator + 1;
+
+    return log_job_add_rename(jb, param, (size_t)(separator - param), old_key, strlen(old_key));
+}
+
+// Return true when c is neither a letter, nor a digit, nor a control character.
+static bool is_symbol(char c) {
+    // <ctype.h> functions are only defined for values representable as
+    // unsigned char (or EOF); a plain char may be negative for bytes >= 0x80
+    unsigned char uc = (unsigned char)c;
+
+    return !isalpha(uc) && !isdigit(uc) && !iscntrl(uc);
+}
+
+// Parse a rewrite parameter of the form KEY=<sep>SEARCH<sep>REPLACE, where
+// <sep> is the symbol found right after the '=', and register the rule in
+// the job. Fails when the format is invalid, when the search or the
+// replacement pattern is empty, or when the rule cannot be added.
+static bool parse_rewrite(struct log_job *jb, const char *param) {
+    // Search for '=' in param
+    const char *equal_sign = strchr(param, '=');
+    if (!equal_sign || equal_sign == param) {
+        log2stderr("Error: Invalid rewrite format, '=' not found in %s", param);
+        return false;
+    }
+
+    // Get the next character as the separator
+    char separator = *(equal_sign + 1);
+    if (!separator || !is_symbol(separator)) {
+        log2stderr("Error: rewrite separator not found after '=', or is not one of /\\|-# in: %s", param);
+        return false;
+    }
+
+    // Find the next occurrence of the separator
+    const char *search_start = equal_sign + 2;
+    const char *second_separator = strchr(search_start, separator);
+    if (!second_separator) {
+        log2stderr("Error: rewrite second separator not found in: %s", param);
+        return false;
+    }
+
+    // Check if the search pattern is empty: the second separator
+    // immediately follows the first one
+    if (second_separator == search_start) {
+        log2stderr("Error: rewrite search pattern is empty in: %s", param);
+        return false;
+    }
+
+    // Check if the replacement pattern is empty
+    if (*(second_separator + 1) == '\0') {
+        log2stderr("Error: rewrite replacement pattern is empty in: %s", param);
+        return false;
+    }
+
+    // Reserve a slot in rewrites
+    if (jb->rewrites.used >= MAX_REWRITES) {
+        log2stderr("Error: Exceeded maximum number of rewrite rules, while processing: %s", param);
+        return false;
+    }
+
+    // Extract key, search pattern, and replacement pattern
+    char *key = strndupz(param, equal_sign - param);
+    char *search_pattern = strndupz(search_start, second_separator - search_start);
+    char *replace_pattern = strdupz(second_separator + 1);
+
+    bool ret = log_job_add_rewrite(jb, key, search_pattern, replace_pattern);
+
+    // log_job_add_rewrite() keeps its own copies
+    freez(key);
+    freez(search_pattern);
+    freez(replace_pattern);
+
+    return ret;
+}
+
+// Parse an injection parameter of the form KEY=VALUE and add it to the job
+// (to the list for unmatched lines when unmatched is true).
+// Fails when there is no '=' or when the injection cannot be added.
+static bool parse_inject(struct log_job *jb, const char *value, bool unmatched) {
+    const char *equal = strchr(value, '=');
+    if (!equal) {
+        log2stderr("Error: injection '%s' does not have an equal sign.", value);
+        return false;
+    }
+
+    const char *key = value;
+    const char *val = equal + 1;
+
+    // propagate the failure (e.g. too many injections) to the caller
+    return log_job_add_injection(jb, key, equal - key, val, strlen(val), unmatched);
+}
+
+// Parse a duplication parameter of the form TARGET=KEY1,KEY2,... and
+// register it in the job. Fails when there is no '=', when the target is
+// empty, when the duplication cannot be created, or when it has too many keys.
+static bool parse_duplicate(struct log_job *jb, const char *value) {
+    const char *equal_sign = strchr(value, '=');
+    if (equal_sign == NULL || equal_sign == value) {
+        log2stderr("Error: Invalid duplicate format, '=' not found or at the start in %s", value);
+        return false;
+    }
+
+    // everything before the '=' is the target key
+    struct key_dup *kd = log_job_add_duplication_to_job(jb, value, (size_t)(equal_sign - value));
+    if (kd == NULL)
+        return false;
+
+    // the comma separated source keys follow the '='
+    for (const char *key = equal_sign + 1; ; ) {
+        if (kd->used >= MAX_KEY_DUPS_KEYS) {
+            log2stderr("Error: too many keys in duplication of target '%s'.", kd->target);
+            return false;
+        }
+
+        const char *comma = strchr(key, ',');
+        if (comma == NULL) {
+            // the last key runs to the end of the string
+            log_job_add_key_to_duplication(kd, key, strlen(key));
+            break;
+        }
+
+        log_job_add_key_to_duplication(kd, key, (size_t)(comma - key));
+        key = comma + 1;
+    }
+
+    return true;
+}
+
+// Parse the command line of log2journal into the job.
+//
+// Options are accepted both as "--option=value" and as "--option value".
+// Any argument that is not a known option is taken as the pattern; exactly
+// one pattern must be given.
+// Returns false (after logging the reason) on any error.
+bool parse_log2journal_parameters(struct log_job *jb, int argc, char **argv) {
+    for (int i = 1; i < argc; i++) {
+        char *arg = argv[i];
+        if (strcmp(arg, "--help") == 0 || strcmp(arg, "-h") == 0) {
+            log2journal_command_line_help(argv[0]);
+            exit(0);
+        }
+#if defined(NETDATA_DEV_MODE) || defined(NETDATA_INTERNAL_CHECKS)
+        else if(strcmp(arg, "--test") == 0) {
+            // logfmt_test();
+            json_test();
+            exit(1);
+        }
+#endif
+        else if (strcmp(arg, "--show-config") == 0) {
+            jb->show_config = true;
+        }
+        else {
+            char buffer[1024];
+            char *param = NULL;
+            char *value = NULL;
+
+            // true only when the value was taken from the next argument,
+            // so that we know whether there is something to give back
+            bool value_is_next_arg = false;
+
+            char *equal_sign = strchr(arg, '=');
+            if (equal_sign) {
+                copy_to_buffer(buffer, sizeof(buffer), arg, equal_sign - arg);
+                param = buffer;
+                value = equal_sign + 1;
+            }
+            else {
+                param = arg;
+                if (i + 1 < argc) {
+                    value = argv[++i];
+                    value_is_next_arg = true;
+                }
+                else {
+                    // the last argument, without a value: it can only be the pattern
+                    if (!jb->pattern) {
+                        jb->pattern = arg;
+                        continue;
+                    } else {
+                        log2stderr("Error: Multiple patterns detected. Specify only one pattern. The first is '%s', the second is '%s'", jb->pattern, arg);
+                        return false;
+                    }
+                }
+            }
+
+            if (strcmp(param, "--filename-key") == 0) {
+                if(!log_job_add_filename_key(jb, value, value ? strlen(value) : 0))
+                    return false;
+            }
+            else if (strcmp(param, "--prefix") == 0) {
+                if(!log_job_add_key_prefix(jb, value, value ? strlen(value) : 0))
+                    return false;
+            }
+#ifdef HAVE_LIBYAML
+            else if (strcmp(param, "-f") == 0 || strcmp(param, "--file") == 0) {
+                if (!yaml_parse_file(value, jb))
+                    return false;
+            }
+            else if (strcmp(param, "--config") == 0) {
+                if (!yaml_parse_config(value, jb))
+                    return false;
+            }
+#endif
+            else if (strcmp(param, "--unmatched-key") == 0)
+                jb->unmatched.key = value;
+            else if (strcmp(param, "--duplicate") == 0) {
+                if (!parse_duplicate(jb, value))
+                    return false;
+            }
+            else if (strcmp(param, "--inject") == 0) {
+                if (!parse_inject(jb, value, false))
+                    return false;
+            }
+            else if (strcmp(param, "--inject-unmatched") == 0) {
+                if (!parse_inject(jb, value, true))
+                    return false;
+            }
+            else if (strcmp(param, "--rewrite") == 0) {
+                if (!parse_rewrite(jb, value))
+                    return false;
+            }
+            else if (strcmp(param, "--rename") == 0) {
+                if (!parse_rename(jb, value))
+                    return false;
+            }
+            else {
+                // not a known option: the whole argument is the pattern.
+                // Give back the next argument only when it was consumed as
+                // a value; otherwise this same argument would be parsed twice.
+                if (value_is_next_arg)
+                    i--;
+
+                if (!jb->pattern) {
+                    jb->pattern = arg;
+                    continue;
+                } else {
+                    log2stderr("Error: Multiple patterns detected. Specify only one pattern. The first is '%s', the second is '%s'", jb->pattern, arg);
+                    return false;
+                }
+            }
+        }
+    }
+
+    // Check if a pattern is set and exactly one pattern is specified
+    if (!jb->pattern) {
+        log2stderr("Error: Pattern not specified.");
+        log2journal_command_line_help(argv[0]);
+        return false;
+    }
+
+    return true;
+}

+ 838 - 0
collectors/log2journal/log2journal-yaml.c

@@ -0,0 +1,838 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "log2journal.h"
+
+// ----------------------------------------------------------------------------
+// yaml configuration file
+
+#ifdef HAVE_LIBYAML
+
+// Translate a libyaml event type into its symbolic name, for diagnostics.
+// Any type that has no entry in the table is reported as "UNKNOWN".
+static const char *yaml_event_name(yaml_event_type_t type) {
+    static const char *const names[] = {
+        [YAML_NO_EVENT]             = "YAML_NO_EVENT",
+        [YAML_SCALAR_EVENT]         = "YAML_SCALAR_EVENT",
+        [YAML_ALIAS_EVENT]          = "YAML_ALIAS_EVENT",
+        [YAML_MAPPING_START_EVENT]  = "YAML_MAPPING_START_EVENT",
+        [YAML_MAPPING_END_EVENT]    = "YAML_MAPPING_END_EVENT",
+        [YAML_SEQUENCE_START_EVENT] = "YAML_SEQUENCE_START_EVENT",
+        [YAML_SEQUENCE_END_EVENT]   = "YAML_SEQUENCE_END_EVENT",
+        [YAML_STREAM_START_EVENT]   = "YAML_STREAM_START_EVENT",
+        [YAML_STREAM_END_EVENT]     = "YAML_STREAM_END_EVENT",
+        [YAML_DOCUMENT_START_EVENT] = "YAML_DOCUMENT_START_EVENT",
+        [YAML_DOCUMENT_END_EVENT]   = "YAML_DOCUMENT_END_EVENT",
+    };
+
+    // out-of-range values (including negative ones, after the cast) and
+    // holes in the table both fall back to "UNKNOWN"
+    size_t idx = (size_t)type;
+    if(idx >= sizeof(names) / sizeof(names[0]) || !names[idx])
+        return "UNKNOWN";
+
+    return names[idx];
+}
+
+// yaml_error() reports a problem together with the source location of the call site.
+#define yaml_error(parser, event, fmt, args...) yaml_error_with_trace(parser, event, __LINE__, __FUNCTION__, __FILE__, fmt, ##args)
+static void yaml_error_with_trace(yaml_parser_t *parser, yaml_event_t *event, size_t line, const char *function, const char *file, const char *format, ...) __attribute__ ((format(__printf__, 6, 7)));
+
+// Print one diagnostic line to stderr:
+//   - the C source location given by line/function/file,
+//   - the parser position (1-based line and column),
+//   - the event type and, for scalars and aliases, the text it carries,
+//   - the caller's printf-style message.
+// `event` may be NULL, in which case no event information is printed.
+static void yaml_error_with_trace(yaml_parser_t *parser, yaml_event_t *event, size_t line, const char *function, const char *file, const char *format, ...) {
+    char excerpt[1024] = "";        // text of the offending event, when it has any
+    const char *event_name = "";
+
+    if(event) {
+        event_name = yaml_event_name(event->type);
+
+        if(event->type == YAML_SCALAR_EVENT)
+            copy_to_buffer(excerpt, sizeof(excerpt), (char *)event->data.scalar.value, event->data.scalar.length);
+        else if(event->type == YAML_ALIAS_EVENT)
+            snprintf(excerpt, sizeof(excerpt), "%s", event->data.alias.anchor);
+    }
+
+    const int yaml_line = (int)(parser->mark.line + 1);
+    const int yaml_column = (int)(parser->mark.column + 1);
+    const char *separator = excerpt[0] ? ", near " : "";
+
+    fprintf(stderr, "YAML %zu@%s, %s(): (line %d, column %d, %s%s%s): ",
+            line, file, function,
+            yaml_line, yaml_column,
+            event_name, separator, excerpt);
+
+    va_list ap;
+    va_start(ap, format);
+    vfprintf(stderr, format, ap);
+    va_end(ap);
+
+    fputc('\n', stderr);
+}
+
+#define yaml_parse(parser, event) yaml_parse_with_trace(parser, event, __LINE__, __FUNCTION__, __FILE__)
+static bool yaml_parse_with_trace(yaml_parser_t *parser, yaml_event_t *event, size_t line, const char *function, const char *file) {
+    if (!yaml_parser_parse(parser, event)) {
+        yaml_error(parser, NULL, "YAML parser error %d", parser->error);
+        return false;
+    }
+
+//    fprintf(stderr, ">>> %s >>> %.*s\n",
+//            yaml_event_name(event->type),
+//            event->type == YAML_SCALAR_EVENT ? event->data.scalar.length : 0,
+//            event->type == YAML_SCALAR_EVENT ? (char *)event->data.scalar.value : "");
+
+    return true;
+}
+
+// yaml_parse_expect_event() consumes one event and checks its type,
+// attributing any mismatch to the call site.
+#define yaml_parse_expect_event(parser, type) yaml_parse_expect_event_with_trace(parser, type, __LINE__, __FUNCTION__, __FILE__)
+
+// Consume the next event and verify that it is of the given type.
+// The event is always released before returning.
+// Returns true on a match; false when parsing failed or the type differs
+// (a mismatch is reported to stderr with the caller's location).
+static bool yaml_parse_expect_event_with_trace(yaml_parser_t *parser, yaml_event_type_t type, size_t line, const char *function, const char *file) {
+    yaml_event_t next;
+    if (!yaml_parse(parser, &next))
+        return false;
+
+    const bool as_expected = (next.type == type);
+    if(!as_expected)
+        yaml_error_with_trace(parser, &next, line, function, file, "unexpected event - expecting: %s", yaml_event_name(type));
+
+    yaml_event_delete(&next);
+    return as_expected;
+}
+
+#define yaml_scalar_matches(event, s, len) yaml_scalar_matches_with_trace(event, s, len, __LINE__, __FUNCTION__, __FILE__)
+
+// True when `event` is a scalar whose length equals `len` and whose text
+// compares equal to `s`. The trace arguments are accepted but not used.
+static bool yaml_scalar_matches_with_trace(yaml_event_t *event, const char *s, size_t len, size_t line __maybe_unused, const char *function __maybe_unused, const char *file __maybe_unused) {
+    return event->type == YAML_SCALAR_EVENT
+        && event->data.scalar.length == len
+        && strcmp((char *)event->data.scalar.value, s) == 0;
+}
+
+// ----------------------------------------------------------------------------
+
+// Read the scalar that follows a 'key' entry of the duplicate section and
+// register it on the job via log_job_add_duplication_to_job().
+//
+// Returns the value returned by log_job_add_duplication_to_job(), or NULL
+// when the parser failed or the next event was not a scalar (the latter is
+// reported to stderr).
+static struct key_dup *yaml_parse_duplicate_key(struct log_job *jb, yaml_parser_t *parser) {
+    yaml_event_t event;
+
+    if (!yaml_parse(parser, &event))
+        return NULL;
+
+    struct key_dup *kd = NULL;
+    if(event.type == YAML_SCALAR_EVENT)
+        kd = log_job_add_duplication_to_job(jb, (char *) event.data.scalar.value, event.data.scalar.length);
+    else
+        yaml_error(parser, &event, "duplicate key must be a scalar.");
+
+    yaml_event_delete(&event);
+    return kd;
+}
+
+// Parse the value of a 'values_of' entry: either a single scalar or a
+// sequence of scalars. Each scalar is added to `kd` with
+// log_job_add_key_to_duplication().
+//
+// Returns the number of errors found (0 on success). Parser failures,
+// a false return from log_job_add_key_to_duplication(), and events of an
+// unexpected type are all counted.
+static size_t yaml_parse_duplicate_from(struct log_job *jb, yaml_parser_t *parser, struct key_dup *kd) {
+    (void)jb; // not used here; kept so the signature stays the same for callers
+
+    size_t errors = 0;
+    yaml_event_t event;
+
+    if (!yaml_parse(parser, &event))
+        return 1;
+
+    if(event.type == YAML_SCALAR_EVENT) {
+        if(!log_job_add_key_to_duplication(kd, (char *) event.data.scalar.value, event.data.scalar.length))
+            errors++;
+    }
+    else if(event.type == YAML_SEQUENCE_START_EVENT) {
+        bool finished = false;
+        while(!errors && !finished) {
+            yaml_event_t sub_event;
+            if (!yaml_parse(parser, &sub_event)) {
+                // fall through to the common exit so that `event` is released
+                errors++;
+                break;
+            }
+
+            if (sub_event.type == YAML_SCALAR_EVENT) {
+                if(!log_job_add_key_to_duplication(kd, (char *) sub_event.data.scalar.value, sub_event.data.scalar.length))
+                    errors++;
+            }
+            else if (sub_event.type == YAML_SEQUENCE_END_EVENT)
+                finished = true;
+            else {
+                yaml_error(parser, &sub_event, "expected a scalar in the 'values_of' list");
+                errors++;
+            }
+
+            yaml_event_delete(&sub_event);
+        }
+    }
+    else {
+        yaml_error(parser, &event, "not expected event type");
+        errors++;
+    }
+
+    yaml_event_delete(&event);
+    return errors;
+}
+
+// Parse the 'filename' section, which must be a mapping holding exactly one
+// entry, 'key', whose scalar value is passed to log_job_add_filename_key().
+//
+// Returns the number of errors found (0 on success).
+static size_t yaml_parse_filename_injection(yaml_parser_t *parser, struct log_job *jb) {
+    yaml_event_t event;
+    size_t errors = 0;
+
+    if(!yaml_parse_expect_event(parser, YAML_MAPPING_START_EVENT))
+        return 1;
+
+    if (!yaml_parse(parser, &event))
+        return 1;
+
+    if (yaml_scalar_matches(&event, "key", strlen("key"))) {
+        yaml_event_t sub_event;
+        if (!yaml_parse(parser, &sub_event))
+            errors++;
+
+        else {
+            // the type of the VALUE event decides whether its scalar payload may be read
+            if (sub_event.type == YAML_SCALAR_EVENT) {
+                if(!log_job_add_filename_key(jb, (char *)sub_event.data.scalar.value, sub_event.data.scalar.length))
+                    errors++;
+            }
+
+            else {
+                yaml_error(parser, &sub_event, "expected the filename as %s", yaml_event_name(YAML_SCALAR_EVENT));
+                errors++;
+            }
+
+            yaml_event_delete(&sub_event);
+        }
+    }
+    else {
+        yaml_error(parser, &event, "expected 'key' in the filename mapping");
+        errors++;
+    }
+
+    // only look for the end of the mapping when everything before it was
+    // understood, otherwise a second, misleading error would be printed
+    if(!errors && !yaml_parse_expect_event(parser, YAML_MAPPING_END_EVENT))
+        errors++;
+
+    yaml_event_delete(&event);
+    return errors;
+}
+
+// Parse the 'duplicate' section: a sequence of mappings, each one starting
+// with 'key' (the target) followed by any number of 'values_of' entries.
+//
+// Returns the number of errors found (0 on success).
+static size_t yaml_parse_duplicates_injection(yaml_parser_t *parser, struct log_job *jb) {
+    if (!yaml_parse_expect_event(parser, YAML_SEQUENCE_START_EVENT))
+        return 1;
+
+    bool finished = false;
+    size_t errors = 0;
+    while (!errors && !finished) {
+        yaml_event_t event;
+        if (!yaml_parse(parser, &event)) {
+            errors++;
+            break;
+        }
+
+        if (event.type == YAML_SEQUENCE_END_EVENT) {
+            finished = true;
+        }
+        else if (event.type == YAML_MAPPING_START_EVENT) {
+            ; // start of one duplicate definition; its 'key' scalar comes next
+        }
+        else if (event.type == YAML_SCALAR_EVENT) {
+            if (!yaml_scalar_matches(&event, "key", strlen("key"))) {
+                yaml_error(parser, &event, "unknown scalar");
+                errors++;
+            }
+            else {
+                struct key_dup *kd = yaml_parse_duplicate_key(jb, parser);
+                if (!kd)
+                    errors++;
+
+                // consume the remaining entries of this mapping, up to and
+                // including its MAPPING_END event
+                bool mapping_finished = false;
+                while (!errors && !mapping_finished) {
+                    yaml_event_t sub_event;
+                    if (!yaml_parse(parser, &sub_event)) {
+                        errors++;
+                        break;
+                    }
+
+                    if (sub_event.type == YAML_MAPPING_END_EVENT) {
+                        mapping_finished = true;
+                    }
+                    else if (sub_event.type == YAML_SCALAR_EVENT) {
+                        if (yaml_scalar_matches(&sub_event, "values_of", strlen("values_of")))
+                            errors += yaml_parse_duplicate_from(jb, parser, kd);
+                        else {
+                            yaml_error(parser, &sub_event, "unknown scalar");
+                            errors++;
+                        }
+                    }
+                    else {
+                        yaml_error(parser, &sub_event, "unexpected event type");
+                        errors++;
+                    }
+
+                    yaml_event_delete(&sub_event);
+                }
+            }
+        }
+        else {
+            yaml_error(parser, &event, "unexpected event type");
+            errors++;
+        }
+
+        yaml_event_delete(&event);
+    }
+
+    return errors;
+}
+
+// Parse the remainder of one injection entry. The caller has already
+// consumed the 'key' scalar; this function reads, in order:
+//   <key text>, the literal scalar 'value', <value text>
+// and registers the pair with log_job_add_injection().
+//
+// NOTE: despite the bool return type, the result is an ERROR COUNT, because
+// the caller adds it to its error counter: true (1) means failure and
+// false (0) means success.
+static bool yaml_parse_constant_field_injection(yaml_parser_t *parser, struct log_job *jb, bool unmatched) {
+    yaml_event_t event;
+    bool have_event = false;    // true while `event` holds an event that must be released
+    bool failed = true;
+    char *key = NULL;
+    char *value = NULL;
+
+    // the text of the key
+    if (!yaml_parse(parser, &event))
+        goto cleanup;
+    have_event = true;
+
+    if (event.type != YAML_SCALAR_EVENT) {
+        yaml_error(parser, &event, "Expected scalar for constant field injection key");
+        goto cleanup;
+    }
+
+    key = strndupz((char *)event.data.scalar.value, event.data.scalar.length);
+    yaml_event_delete(&event);
+    have_event = false;
+
+    // the literal 'value'
+    if (!yaml_parse(parser, &event))
+        goto cleanup;
+    have_event = true;
+
+    if(!yaml_scalar_matches(&event, "value", strlen("value"))) {
+        yaml_error(parser, &event, "Expected scalar 'value'");
+        goto cleanup;
+    }
+
+    yaml_event_delete(&event);
+    have_event = false;
+
+    // the text of the value
+    if (!yaml_parse(parser, &event))
+        goto cleanup;
+    have_event = true;
+
+    if (event.type != YAML_SCALAR_EVENT) {
+        yaml_error(parser, &event, "Expected scalar for constant field injection value");
+        goto cleanup;
+    }
+
+    value = strndupz((char *)event.data.scalar.value, event.data.scalar.length);
+
+    failed = !log_job_add_injection(jb, key, strlen(key), value, strlen(value), unmatched);
+
+cleanup:
+    if(have_event)
+        yaml_event_delete(&event);
+
+    freez(key);
+    freez(value);
+    return failed;
+}
+
+// Parse the entries of one injection mapping, up to and including its
+// MAPPING_END event. Every 'key' scalar hands control to
+// yaml_parse_constant_field_injection(); anything else is an error.
+// Returns true when no error was found.
+static bool yaml_parse_injection_mapping(yaml_parser_t *parser, struct log_job *jb, bool unmatched) {
+    size_t failures = 0;
+
+    for (bool done = false; !failures && !done; ) {
+        yaml_event_t ev;
+        if (!yaml_parse(parser, &ev)) {
+            failures++;
+            break;
+        }
+
+        if (ev.type == YAML_MAPPING_END_EVENT)
+            done = true;
+
+        else if (ev.type != YAML_SCALAR_EVENT) {
+            yaml_error(parser, &ev, "Unexpected event in injection mapping");
+            failures++;
+        }
+
+        else if (yaml_scalar_matches(&ev, "key", strlen("key")))
+            failures += yaml_parse_constant_field_injection(parser, jb, unmatched);
+
+        else {
+            yaml_error(parser, &ev, "Unexpected scalar in injection mapping");
+            failures++;
+        }
+
+        yaml_event_delete(&ev);
+    }
+
+    return failures == 0;
+}
+
+// Parse an 'inject' section: a sequence whose items are mappings handled by
+// yaml_parse_injection_mapping(). `unmatched` is forwarded unchanged.
+// Returns the number of errors found (0 on success).
+static size_t yaml_parse_injections(yaml_parser_t *parser, struct log_job *jb, bool unmatched) {
+    if (!yaml_parse_expect_event(parser, YAML_SEQUENCE_START_EVENT))
+        return 1;
+
+    size_t failures = 0;
+
+    for (bool done = false; !failures && !done; ) {
+        yaml_event_t ev;
+        if (!yaml_parse(parser, &ev)) {
+            failures++;
+            break;
+        }
+
+        if (ev.type == YAML_SEQUENCE_END_EVENT)
+            done = true;
+
+        else if (ev.type == YAML_MAPPING_START_EVENT) {
+            if (!yaml_parse_injection_mapping(parser, jb, unmatched))
+                failures++;
+        }
+
+        else {
+            yaml_error(parser, &ev, "Unexpected event in injections sequence");
+            failures++;
+        }
+
+        yaml_event_delete(&ev);
+    }
+
+    return failures;
+}
+
+// Parse the 'unmatched' section: a mapping that may contain
+//   key:    a scalar, copied into jb->unmatched.key
+//   inject: a list of injections, parsed with unmatched = true
+// Returns the number of errors found (0 on success).
+static size_t yaml_parse_unmatched(yaml_parser_t *parser, struct log_job *jb) {
+    if (!yaml_parse_expect_event(parser, YAML_MAPPING_START_EVENT))
+        return 1;
+
+    size_t failures = 0;
+
+    for (bool done = false; !failures && !done; ) {
+        yaml_event_t ev;
+        if (!yaml_parse(parser, &ev)) {
+            failures++;
+            break;
+        }
+
+        if (ev.type == YAML_MAPPING_END_EVENT)
+            done = true;
+
+        else if (ev.type != YAML_SCALAR_EVENT) {
+            yaml_error(parser, &ev, "Unexpected event in unmatched section");
+            failures++;
+        }
+
+        else if (yaml_scalar_matches(&ev, "key", strlen("key"))) {
+            yaml_event_t value_ev;
+
+            if (!yaml_parse(parser, &value_ev))
+                failures++;
+            else {
+                if (value_ev.type != YAML_SCALAR_EVENT) {
+                    yaml_error(parser, &value_ev, "expected a scalar value for 'key'");
+                    failures++;
+                }
+                else
+                    jb->unmatched.key = strndupz((char *)value_ev.data.scalar.value, value_ev.data.scalar.length);
+
+                yaml_event_delete(&value_ev);
+            }
+        }
+
+        else if (yaml_scalar_matches(&ev, "inject", strlen("inject")))
+            failures += yaml_parse_injections(parser, jb, true);
+
+        else {
+            yaml_error(parser, &ev, "Unexpected scalar in unmatched section");
+            failures++;
+        }
+
+        yaml_event_delete(&ev);
+    }
+
+    return failures;
+}
+
+// Read the next event, which must be a scalar, and return a heap copy of its
+// text. On a parser failure or a non-scalar event the problem is reported
+// (using `error_message` for the latter) and NULL is returned.
+// The event is always released before returning.
+// NOTE(review): assumes strndupz() never returns NULL - confirm.
+static char *yaml_parse_rewrite_scalar(yaml_parser_t *parser, const char *error_message) {
+    yaml_event_t event;
+    if (!yaml_parse(parser, &event))
+        return NULL;
+
+    char *copy = NULL;
+    if (event.type == YAML_SCALAR_EVENT)
+        copy = strndupz((char *)event.data.scalar.value, event.data.scalar.length);
+    else
+        yaml_error(parser, &event, "%s", error_message);
+
+    yaml_event_delete(&event);
+    return copy;
+}
+
+// Parse the 'rewrite' section: a sequence of mappings, each providing
+// 'key', 'search' and 'replace' scalars. Every complete mapping is
+// registered with log_job_add_rewrite(); an incomplete one is an error.
+//
+// Returns the number of errors found (0 on success).
+static size_t yaml_parse_rewrites(yaml_parser_t *parser, struct log_job *jb) {
+    size_t errors = 0;
+
+    if (!yaml_parse_expect_event(parser, YAML_SEQUENCE_START_EVENT))
+        return 1;
+
+    bool finished = false;
+    while (!errors && !finished) {
+        yaml_event_t event;
+        if (!yaml_parse(parser, &event)) {
+            errors++;
+            break;
+        }
+
+        switch (event.type) {
+            case YAML_MAPPING_START_EVENT:
+            {
+                // temporary copies of the three fields; released below on every path
+                struct key_rewrite rw = {0};
+
+                bool mapping_finished = false;
+                while (!errors && !mapping_finished) {
+                    yaml_event_t sub_event;
+                    if (!yaml_parse(parser, &sub_event)) {
+                        errors++;
+                        break;
+                    }
+
+                    switch (sub_event.type) {
+                        case YAML_SCALAR_EVENT:
+                            // the value is read into its own event, so that the
+                            // name event in sub_event is released normally below
+                            if (yaml_scalar_matches(&sub_event, "key", strlen("key"))) {
+                                char *v = yaml_parse_rewrite_scalar(parser, "Expected scalar for rewrite key");
+                                if (!v)
+                                    errors++;
+                                else {
+                                    freez(rw.key);
+                                    rw.key = v;
+                                }
+                            } else if (yaml_scalar_matches(&sub_event, "search", strlen("search"))) {
+                                char *v = yaml_parse_rewrite_scalar(parser, "Expected scalar for rewrite search pattern");
+                                if (!v)
+                                    errors++;
+                                else {
+                                    freez(rw.search_pattern);
+                                    rw.search_pattern = v;
+                                }
+                            } else if (yaml_scalar_matches(&sub_event, "replace", strlen("replace"))) {
+                                char *v = yaml_parse_rewrite_scalar(parser, "Expected scalar for rewrite replace pattern");
+                                if (!v)
+                                    errors++;
+                                else {
+                                    freez(rw.replace_pattern);
+                                    rw.replace_pattern = v;
+                                }
+                            } else {
+                                yaml_error(parser, &sub_event, "Unexpected scalar in rewrite mapping");
+                                errors++;
+                            }
+                            break;
+
+                        case YAML_MAPPING_END_EVENT:
+                            if(rw.key && rw.search_pattern && rw.replace_pattern) {
+                                if (!log_job_add_rewrite(jb, rw.key, rw.search_pattern, rw.replace_pattern))
+                                    errors++;
+                            }
+                            else {
+                                yaml_error(parser, &sub_event, "Incomplete rewrite: 'key', 'search' and 'replace' are all required");
+                                errors++;
+                            }
+
+                            mapping_finished = true;
+                            break;
+
+                        default:
+                            yaml_error(parser, &sub_event, "Unexpected event in rewrite mapping");
+                            errors++;
+                            break;
+                    }
+
+                    yaml_event_delete(&sub_event);
+                }
+
+                freez(rw.key);
+                freez(rw.search_pattern);
+                freez(rw.replace_pattern);
+            }
+                break;
+
+            case YAML_SEQUENCE_END_EVENT:
+                finished = true;
+                break;
+
+            default:
+                yaml_error(parser, &event, "Unexpected event in rewrites sequence");
+                errors++;
+                break;
+        }
+
+        yaml_event_delete(&event);
+    }
+
+    return errors;
+}
+
+// Read the scalar value of the 'pattern' entry and store a copy of it in
+// jb->pattern. Returns 0 on success, 1 when the parser failed or the value
+// is not a scalar.
+static size_t yaml_parse_pattern(yaml_parser_t *parser, struct log_job *jb) {
+    yaml_event_t ev;
+    if (!yaml_parse(parser, &ev))
+        return 1;
+
+    const bool is_scalar = (ev.type == YAML_SCALAR_EVENT);
+
+    if(is_scalar)
+        jb->pattern = strndupz((char *)ev.data.scalar.value, ev.data.scalar.length);
+    else
+        yaml_error(parser, &ev, "unexpected event type");
+
+    yaml_event_delete(&ev);
+    return is_scalar ? 0 : 1;
+}
+
+// Parse a whole configuration document from an already initialized parser.
+//
+// The document must be a single mapping. Its recognised top-level entries
+// are: pattern, filename, duplicate, inject, unmatched and rewrite; any
+// other entry is an error.
+//
+// Returns the number of errors found (0 on success). Parsing stops at the
+// first error: the end-of-document checks are only made after the mapping
+// has been read cleanly, so one problem does not produce follow-up errors.
+static size_t yaml_parse_initialized(yaml_parser_t *parser, struct log_job *jb) {
+    if(!yaml_parse_expect_event(parser, YAML_STREAM_START_EVENT))
+        return 1;
+
+    if(!yaml_parse_expect_event(parser, YAML_DOCUMENT_START_EVENT))
+        return 1;
+
+    if(!yaml_parse_expect_event(parser, YAML_MAPPING_START_EVENT))
+        return 1;
+
+    size_t errors = 0;
+    bool finished = false;
+    while (!errors && !finished) {
+        yaml_event_t event;
+        if(!yaml_parse(parser, &event)) {
+            errors++;
+            break;
+        }
+
+        switch(event.type) {
+            case YAML_MAPPING_END_EVENT:
+                finished = true;
+                break;
+
+            case YAML_SCALAR_EVENT:
+                if (yaml_scalar_matches(&event, "pattern", strlen("pattern")))
+                    errors += yaml_parse_pattern(parser, jb);
+
+                else if (yaml_scalar_matches(&event, "filename", strlen("filename")))
+                    errors += yaml_parse_filename_injection(parser, jb);
+
+                else if (yaml_scalar_matches(&event, "duplicate", strlen("duplicate")))
+                    errors += yaml_parse_duplicates_injection(parser, jb);
+
+                else if (yaml_scalar_matches(&event, "inject", strlen("inject")))
+                    errors += yaml_parse_injections(parser, jb, false);
+
+                else if (yaml_scalar_matches(&event, "unmatched", strlen("unmatched")))
+                    errors += yaml_parse_unmatched(parser, jb);
+
+                else if (yaml_scalar_matches(&event, "rewrite", strlen("rewrite")))
+                    errors += yaml_parse_rewrites(parser, jb);
+
+                else {
+                    yaml_error(parser, &event, "unexpected scalar");
+                    errors++;
+                }
+                break;
+
+            default:
+                yaml_error(parser, &event, "unexpected type");
+                errors++;
+                break;
+        }
+
+        yaml_event_delete(&event);
+    }
+
+    if(errors)
+        return errors;
+
+    if(!yaml_parse_expect_event(parser, YAML_DOCUMENT_END_EVENT))
+        return 1;
+
+    if(!yaml_parse_expect_event(parser, YAML_STREAM_END_EVENT))
+        return 1;
+
+    return 0;
+}
+
+// Load the yaml configuration stored in `config_file_path` into `jb`.
+//
+// Returns true when the file was opened and parsed without errors.
+// Returns false (after printing a message) when the path is NULL or empty,
+// the file cannot be opened, the yaml parser cannot be initialized, or the
+// content has errors.
+bool yaml_parse_file(const char *config_file_path, struct log_job *jb) {
+    if(!config_file_path || !*config_file_path) {
+        log2stderr("yaml configuration filename cannot be empty.");
+        return false;
+    }
+
+    FILE *fp = fopen(config_file_path, "r");
+    if (!fp) {
+        log2stderr("Error opening config file: %s", config_file_path);
+        return false;
+    }
+
+    yaml_parser_t parser;
+    if (!yaml_parser_initialize(&parser)) {
+        // the parser object is not usable in this case, so it must be
+        // neither used nor deleted
+        log2stderr("Error initializing the yaml parser for config file: %s", config_file_path);
+        fclose(fp);
+        return false;
+    }
+
+    yaml_parser_set_input_file(&parser, fp);
+
+    size_t errors = yaml_parse_initialized(&parser, jb);
+
+    yaml_parser_delete(&parser);
+    fclose(fp);
+    return errors == 0;
+}
+
+// Load the configuration called `config_name`, i.e. the file
+// LOG2JOURNAL_CONFIG_PATH/<config_name>.yaml, into `jb`.
+//
+// Returns false (after printing a message) when the name is NULL or empty,
+// when the resulting path does not fit the filename buffer, or when
+// yaml_parse_file() fails.
+bool yaml_parse_config(const char *config_name, struct log_job *jb) {
+    if(!config_name || !*config_name) {
+        log2stderr("yaml configuration name cannot be empty.");
+        return false;
+    }
+
+    char filename[FILENAME_MAX + 1];
+
+    // a truncated path would silently point to a different file
+    int written = snprintf(filename, sizeof(filename), "%s/%s.yaml", LOG2JOURNAL_CONFIG_PATH, config_name);
+    if(written < 0 || (size_t)written >= sizeof(filename)) {
+        log2stderr("yaml configuration name is too long: %s", config_name);
+        return false;
+    }
+
+    return yaml_parse_file(filename, jb);
+}
+
+#endif // HAVE_LIBYAML
+
+// ----------------------------------------------------------------------------
+// printing yaml
+
+// Print a multi-line value to stderr, one line at a time, each line
+// indented by depth * 2 spaces (at most 20, the length of the padding
+// string). A NULL value is printed as an empty line. A newline is appended
+// when the last line does not end with one.
+static void yaml_print_multiline_value(const char *s, size_t depth) {
+    if (!s)
+        s = "";
+
+    do {
+        // `next` points just past the newline that ends the current line,
+        // or is NULL when this is the last line and it has no newline
+        const char *next = strchr(s, '\n');
+        if(next) next++;
+
+        size_t len = next ? (size_t)(next - s) : strlen(s);
+
+        fprintf(stderr, "%.*s", (int)(depth * 2), "                    ");
+
+        // write the line straight from the source string: no temporary
+        // copy is needed, so a very long line cannot exhaust the stack
+        fwrite(s, 1, len, stderr);
+
+        if(!next)
+            fputc('\n', stderr);
+
+        s = next;
+    } while(s && *s);
+}
+
+// True when `str` contains at least one of the characters that this file
+// treats as special in yaml, so that the value has to be printed quoted.
+static bool needs_quotes_in_yaml(const char *str) {
+    static const char special_chars[] = ":{}[],&*!|>'\"%@`^";
+
+    return strpbrk(str, special_chars) != NULL;
+}
+
+// Print one yaml node to stderr.
+//
+//   key   - name of the node, or NULL for a bare list item
+//   value - value of the node, or NULL for none
+//   depth - indentation level (2 spaces each, capped at 10 levels)
+//   dash  - when true the node is prefixed with "- " (list item)
+//
+// A value containing a newline is printed as a literal block ("|") with its
+// lines on the following rows. A value for which needs_quotes_in_yaml() is
+// true is printed in single quotes, with every embedded single quote
+// doubled; in that style no other character needs escaping, so backslashes
+// and double quotes pass through unchanged.
+static void yaml_print_node(const char *key, const char *value, size_t depth, bool dash) {
+    if(depth > 10) depth = 10;
+
+    const char *second_line = NULL;
+    bool quoted = false;
+
+    if(value && strchr(value, '\n')) {
+        second_line = value;
+        value = "|";
+    }
+    else if(value && needs_quotes_in_yaml(value))
+        quoted = true;
+
+    fprintf(stderr, "%.*s%s%s%s",
+            (int)(depth * 2), "                    ", dash ? "- ": "",
+            key ? key : "", key ? ": " : "");
+
+    if(quoted) {
+        fputc('\'', stderr);
+
+        for(const char *c = value; *c ; c++) {
+            if(*c == '\'')
+                fputc('\'', stderr);
+
+            fputc(*c, stderr);
+        }
+
+        fputs("'\n", stderr);
+    }
+    else
+        fprintf(stderr, "%s\n", value ? value : "");
+
+    if(second_line) {
+        yaml_print_multiline_value(second_line, depth + 1);
+    }
+}
+
+// Dump the configuration held in `jb` to stderr, formatted as yaml.
+// Sections are printed in this order, each one only when it has content:
+// pattern, prefix, filename, duplicate, inject, rewrite, rename, unmatched.
+// Every section after the pattern is preceded by an empty line.
+void log_job_to_yaml(struct log_job *jb) {
+    if(jb->pattern)
+        yaml_print_node("pattern", jb->pattern, 0, false);
+
+    if(jb->prefix) {
+        fputc('\n', stderr);
+        yaml_print_node("prefix", jb->prefix, 0, false);
+    }
+
+    if(jb->filename.key) {
+        fputc('\n', stderr);
+        yaml_print_node("filename", NULL, 0, false);
+        yaml_print_node("key", jb->filename.key, 1, false);
+    }
+
+    if(jb->dups.used) {
+        fputc('\n', stderr);
+        yaml_print_node("duplicate", NULL, 0, false);
+
+        for(size_t d = 0; d < jb->dups.used ; d++) {
+            struct key_dup *dup = &jb->dups.array[d];
+
+            // one list item per target, followed by the list of its sources
+            yaml_print_node("key", dup->target, 1, true);
+            yaml_print_node("values_of", NULL, 2, false);
+
+            for(size_t src = 0; src < dup->used ; src++)
+                yaml_print_node(NULL, dup->keys[src], 3, true);
+        }
+    }
+
+    if(jb->injections.used) {
+        fputc('\n', stderr);
+        yaml_print_node("inject", NULL, 0, false);
+
+        for(size_t n = 0; n < jb->injections.used ; n++) {
+            yaml_print_node("key", jb->injections.keys[n].key, 1, true);
+            yaml_print_node("value", jb->injections.keys[n].value.s, 2, false);
+        }
+    }
+
+    if(jb->rewrites.used) {
+        fputc('\n', stderr);
+        yaml_print_node("rewrite", NULL, 0, false);
+
+        for(size_t n = 0; n < jb->rewrites.used ; n++) {
+            yaml_print_node("key", jb->rewrites.array[n].key, 1, true);
+            yaml_print_node("search", jb->rewrites.array[n].search_pattern, 2, false);
+            yaml_print_node("replace", jb->rewrites.array[n].replace_pattern, 2, false);
+        }
+    }
+
+    if(jb->renames.used) {
+        fputc('\n', stderr);
+        yaml_print_node("rename", NULL, 0, false);
+
+        for(size_t n = 0; n < jb->renames.used ; n++) {
+            yaml_print_node("new_key", jb->renames.array[n].new_key, 1, true);
+            yaml_print_node("old_key", jb->renames.array[n].old_key, 2, false);
+        }
+    }
+
+    if(jb->unmatched.key || jb->unmatched.injections.used) {
+        fputc('\n', stderr);
+        yaml_print_node("unmatched", NULL, 0, false);
+
+        if(jb->unmatched.key)
+            yaml_print_node("key", jb->unmatched.key, 1, false);
+
+        if(jb->unmatched.injections.used) {
+            fputc('\n', stderr);
+            yaml_print_node("inject", NULL, 1, false);
+
+            for(size_t n = 0; n < jb->unmatched.injections.used ; n++) {
+                yaml_print_node("key", jb->unmatched.injections.keys[n].key, 2, true);
+                yaml_print_node("value", jb->unmatched.injections.keys[n].value.s, 3, false);
+            }
+        }
+    }
+}

Some files were not shown because too many files changed in this diff