Browse Source

log2journal now uses libnetdata (#18919)

Costa Tsaousis 4 months ago
parent
commit
e8d8e24ed1

+ 4 - 1
CMakeLists.txt

@@ -2557,13 +2557,16 @@ if(PCRE2_FOUND)
                 src/collectors/log2journal/log2journal-replace.c
                 src/collectors/log2journal/log2journal-rename.c
                 src/collectors/log2journal/log2journal-rewrite.c
+                src/collectors/log2journal/log2journal-txt.h
+                src/collectors/log2journal/log2journal-hashed-key.h
         )
 
         add_executable(log2journal ${LOG2JOURNAL_FILES})
         target_include_directories(log2journal BEFORE PUBLIC ${CONFIG_H_DIR} ${CMAKE_SOURCE_DIR}/src ${PCRE2_INCLUDE_DIRS})
         target_compile_options(log2journal PUBLIC ${PCRE2_CFLAGS_OTHER})
-        target_link_libraries(log2journal PUBLIC "${PCRE2_LDFLAGS}")
 
+        target_link_libraries(log2journal PUBLIC libnetdata)
+        target_link_libraries(log2journal PUBLIC "${PCRE2_LDFLAGS}")
         netdata_add_libyaml_to_target(log2journal)
 
         install(TARGETS log2journal

+ 80 - 0
src/collectors/log2journal/log2journal-hashed-key.h

@@ -0,0 +1,80 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_LOG2JOURNAL_HASHED_KEY_H
+#define NETDATA_LOG2JOURNAL_HASHED_KEY_H
+
+#include "log2journal.h"
+
+typedef enum __attribute__((__packed__)) { // bit flags describing the state of a HASHED_KEY
+    HK_NONE                 = 0,

+    // permanent flags - they are set once, to optimize various decisions and lookups

+    HK_HASHTABLE_ALLOCATED  = (1 << 0), // this object is the key allocated in the hashtable
+                                        // objects without this flag hold a pointer to a key in the hashtable
+                                        // objects with this flag have their value allocated

+    HK_FILTERED             = (1 << 1), // we have already checked once whether this key is filtered
+    HK_FILTERED_INCLUDED    = (1 << 2), // the result of the filtering was to include it in the output

+    HK_COLLISION_CHECKED    = (1 << 3), // we have already run the collision check for this key once

+    HK_RENAMES_CHECKED      = (1 << 4), // we have already checked once whether there are renames on this key
+    HK_HAS_RENAMES          = (1 << 5), // and we found there is a rename rule related to it

+    // ephemeral flags - they are unset at the end of each log line

+    HK_VALUE_FROM_LOG       = (1 << 14), // the value of this key has been read from the log (or from injection, duplication)
+    HK_VALUE_REWRITTEN      = (1 << 15), // the value of this key has been rewritten by one of our rewrite rules

+} HASHED_KEY_FLAGS;
+
+typedef struct hashed_key { // a key name with its cached length and hash, plus either a value or a link to the hashtable key
+    const char *key; // key name; a private copy made by hashed_key_set() and freed by hashed_key_cleanup()
+    uint32_t len; // length of the key name, as stored by hashed_key_set()
+    HASHED_KEY_FLAGS flags; // combination of HK_* flags
+    XXH64_hash_t hash; // XXH3_64bits() over the first len bytes of key
+    union { // HK_HASHTABLE_ALLOCATED in flags selects the valid member
+        struct hashed_key *hashtable_ptr;   // HK_HASHTABLE_ALLOCATED is not set
+        TXT_L2J value;                      // HK_HASHTABLE_ALLOCATED is set
+    };
+} HASHED_KEY;
+
+// Release everything a HASHED_KEY owns and reset it to the empty (HK_NONE) state.
+// Keys flagged HK_HASHTABLE_ALLOCATED own their value buffer, so it is cleaned up;
+// all other keys only reference a hashtable key, so that reference is dropped.
+static inline void hashed_key_cleanup(HASHED_KEY *k) {
+    const bool owns_value = (k->flags & HK_HASHTABLE_ALLOCATED) != 0;
+
+    if(!owns_value)
+        k->hashtable_ptr = NULL;
+    else
+        txt_l2j_cleanup(&k->value);
+
+    // the key name is a private copy, so it is released here
+    freez((void *)k->key);
+
+    k->flags = HK_NONE;
+    k->hash = 0;
+    k->len = 0;
+    k->key = NULL;
+}
+
+// Replace the name of a HASHED_KEY with a private copy of `name`.
+//
+//   k    - the key to update; whatever it held before is released first
+//   name - the new key name (must not be NULL)
+//   len  - number of bytes of `name` to use, or a negative value (-1 by
+//          convention) when `name` is NUL-terminated and must be used whole
+//
+// The stored length and hash always describe the NUL-terminated copy, so they
+// stay consistent with the strcmp() based comparison in hashed_keys_match().
+static inline void hashed_key_set(HASHED_KEY *k, const char *name, int32_t len) {
+    // releases the previous name/value and resets len, hash and flags
+    hashed_key_cleanup(k);
+
+    if(len < 0)
+        k->key = strdupz(name);
+    else
+        k->key = strndupz(name, (size_t)len);
+
+    // measure the copy instead of trusting the caller: if `name` is shorter
+    // than `len`, hashing `len` bytes would cover bytes that are not part of
+    // the key and could make equal keys hash differently
+    k->len = (uint32_t)strlen(k->key);
+    k->hash = XXH3_64bits(k->key, k->len);
+    k->flags = HK_NONE;
+}
+
+// Two keys match when they are the same object, or when their hashes are
+// equal and their names compare equal with strcmp().
+static inline bool hashed_keys_match(HASHED_KEY *k1, HASHED_KEY *k2) {
+    if(k1 == k2)
+        return true;
+
+    // cheap hash comparison first; strcmp() only runs when the hashes agree
+    if(k1->hash != k2->hash)
+        return false;
+
+    return strcmp(k1->key, k2->key) == 0;
+}
+
+// Order two keys by name; the result has strcmp() semantics.
+static inline int compare_keys(struct hashed_key *k1, struct hashed_key *k2) {
+    const char *left = k1->key;
+    const char *right = k2->key;
+
+    return strcmp(left, right);
+}
+
+#endif //NETDATA_LOG2JOURNAL_HASHED_KEY_H

+ 1 - 1
src/collectors/log2journal/log2journal-help.c

@@ -10,7 +10,7 @@ static void config_dir_print_available(void) {
     dir = opendir(path);
 
     if (dir == NULL) {
-        log2stderr("       >>> Cannot open directory:\n       %s", path);
+        l2j_log("       >>> Cannot open directory:\n       %s", path);
         return;
     }
 

+ 6 - 5
src/collectors/log2journal/log2journal-inject.c

@@ -9,12 +9,13 @@ void injection_cleanup(INJECTION *inj) {
 
 static inline bool log_job_injection_replace(INJECTION *inj, const char *key, size_t key_len, const char *value, size_t value_len) {
     if(key_len > JOURNAL_MAX_KEY_LEN)
-        log2stderr("WARNING: injection key '%.*s' is too long for journal. Will be truncated.", (int)key_len, key);
+        l2j_log("WARNING: injection key '%.*s' is too long for journal. Will be truncated.", (int)key_len, key);
 
     if(value_len > JOURNAL_MAX_VALUE_LEN)
-        log2stderr("WARNING: injection value of key '%.*s' is too long for journal. Will be truncated.", (int)key_len, key);
+        l2j_log(
+            "WARNING: injection value of key '%.*s' is too long for journal. Will be truncated.", (int)key_len, key);
 
-    hashed_key_len_set(&inj->key, key, key_len);
+    hashed_key_set(&inj->key, key, key_len);
     char *v = strndupz(value, value_len);
     bool ret = replace_pattern_set(&inj->value, v);
     freez(v);
@@ -25,13 +26,13 @@ static inline bool log_job_injection_replace(INJECTION *inj, const char *key, si
 bool log_job_injection_add(LOG_JOB *jb, const char *key, size_t key_len, const char *value, size_t value_len, bool unmatched) {
     if (unmatched) {
         if (jb->unmatched.injections.used >= MAX_INJECTIONS) {
-            log2stderr("Error: too many unmatched injections. You can inject up to %d lines.", MAX_INJECTIONS);
+            l2j_log("Error: too many unmatched injections. You can inject up to %d lines.", MAX_INJECTIONS);
             return false;
         }
     }
     else {
         if (jb->injections.used >= MAX_INJECTIONS) {
-            log2stderr("Error: too many injections. You can inject up to %d lines.", MAX_INJECTIONS);
+            l2j_log("Error: too many injections. You can inject up to %d lines.", MAX_INJECTIONS);
             return false;
         }
     }

+ 29 - 23
src/collectors/log2journal/log2journal-params.c

@@ -7,7 +7,7 @@
 void log_job_init(LOG_JOB *jb) {
     memset(jb, 0, sizeof(*jb));
     simple_hashtable_init_KEY(&jb->hashtable, 32);
-    hashed_key_set(&jb->line.key, "LINE");
+    hashed_key_set(&jb->line.key, "LINE", -1);
 }
 
 static void simple_hashtable_cleanup_allocated_keys(SIMPLE_HASHTABLE_KEY *ht) {
@@ -53,8 +53,8 @@ void log_job_cleanup(LOG_JOB *jb) {
     hashed_key_cleanup(&jb->filename.key);
     hashed_key_cleanup(&jb->unmatched.key);
 
-    txt_cleanup(&jb->rewrites.tmp);
-    txt_cleanup(&jb->filename.current);
+    txt_l2j_cleanup(&jb->rewrites.tmp);
+    txt_l2j_cleanup(&jb->filename.current);
 
     simple_hashtable_cleanup_allocated_keys(&jb->hashtable);
     simple_hashtable_destroy_KEY(&jb->hashtable);
@@ -67,18 +67,18 @@ void log_job_cleanup(LOG_JOB *jb) {
 
 bool log_job_filename_key_set(LOG_JOB *jb, const char *key, size_t key_len) {
     if(!key || !*key) {
-        log2stderr("filename key cannot be empty.");
+        l2j_log("filename key cannot be empty.");
         return false;
     }
 
-    hashed_key_len_set(&jb->filename.key, key, key_len);
+    hashed_key_set(&jb->filename.key, key, key_len);
 
     return true;
 }
 
 bool log_job_key_prefix_set(LOG_JOB *jb, const char *prefix, size_t prefix_len) {
     if(!prefix || !*prefix) {
-        log2stderr("filename key cannot be empty.");
+        l2j_log("filename key cannot be empty.");
         return false;
     }
 
@@ -92,7 +92,7 @@ bool log_job_key_prefix_set(LOG_JOB *jb, const char *prefix, size_t prefix_len)
 
 bool log_job_pattern_set(LOG_JOB *jb, const char *pattern, size_t pattern_len) {
     if(!pattern || !*pattern) {
-        log2stderr("filename key cannot be empty.");
+        l2j_log("filename key cannot be empty.");
         return false;
     }
 
@@ -106,12 +106,12 @@ bool log_job_pattern_set(LOG_JOB *jb, const char *pattern, size_t pattern_len) {
 
 bool log_job_include_pattern_set(LOG_JOB *jb, const char *pattern, size_t pattern_len) {
     if(jb->filter.include.re) {
-        log2stderr("FILTER INCLUDE: there is already an include filter set");
+        l2j_log("FILTER INCLUDE: there is already an include filter set");
         return false;
     }
 
     if(!search_pattern_set(&jb->filter.include, pattern, pattern_len)) {
-        log2stderr("FILTER INCLUDE: failed: %s", jb->filter.include.error.txt);
+        l2j_log("FILTER INCLUDE: failed: %s", jb->filter.include.error.txt);
         return false;
     }
 
@@ -120,12 +120,12 @@ bool log_job_include_pattern_set(LOG_JOB *jb, const char *pattern, size_t patter
 
 bool log_job_exclude_pattern_set(LOG_JOB *jb, const char *pattern, size_t pattern_len) {
     if(jb->filter.exclude.re) {
-        log2stderr("FILTER INCLUDE: there is already an exclude filter set");
+        l2j_log("FILTER INCLUDE: there is already an exclude filter set");
         return false;
     }
 
     if(!search_pattern_set(&jb->filter.exclude, pattern, pattern_len)) {
-        log2stderr("FILTER EXCLUDE: failed: %s", jb->filter.exclude.error.txt);
+        l2j_log("FILTER EXCLUDE: failed: %s", jb->filter.exclude.error.txt);
         return false;
     }
 
@@ -138,7 +138,7 @@ static bool parse_rename(LOG_JOB *jb, const char *param) {
     // Search for '=' in param
     const char *equal_sign = strchr(param, '=');
     if (!equal_sign || equal_sign == param) {
-        log2stderr("Error: Invalid rename format, '=' not found in %s", param);
+        l2j_log("Error: Invalid rename format, '=' not found in %s", param);
         return false;
     }
 
@@ -216,7 +216,7 @@ RW_FLAGS parse_rewrite_flags(const char *options) {
         }
 
         if(!found)
-            log2stderr("Warning: rewrite options '%s' is not understood.", token);
+            l2j_log("Warning: rewrite options '%s' is not understood.", token);
 
         // Get the next token
         token = strtok(NULL, ",");
@@ -232,33 +232,33 @@ static bool parse_rewrite(LOG_JOB *jb, const char *param) {
     // Search for '=' in param
     const char *equal_sign = strchr(param, '=');
     if (!equal_sign || equal_sign == param) {
-        log2stderr("Error: Invalid rewrite format, '=' not found in %s", param);
+        l2j_log("Error: Invalid rewrite format, '=' not found in %s", param);
         return false;
     }
 
     // Get the next character as the separator
     char separator = *(equal_sign + 1);
     if (!separator || !is_symbol(separator)) {
-        log2stderr("Error: rewrite separator not found after '=', or is not one of /\\|-# in: %s", param);
+        l2j_log("Error: rewrite separator not found after '=', or is not one of /\\|-# in: %s", param);
         return false;
     }
 
     // Find the next occurrence of the separator
     const char *second_separator = strchr(equal_sign + 2, separator);
     if (!second_separator) {
-        log2stderr("Error: rewrite second separator not found in: %s", param);
+        l2j_log("Error: rewrite second separator not found in: %s", param);
         return false;
     }
 
     // Check if the search pattern is empty
     if (equal_sign + 1 == second_separator) {
-        log2stderr("Error: rewrite search pattern is empty in: %s", param);
+        l2j_log("Error: rewrite search pattern is empty in: %s", param);
         return false;
     }
 
     // Check if the replacement pattern is empty
     if (*(second_separator + 1) == '\0') {
-        log2stderr("Error: rewrite replacement pattern is empty in: %s", param);
+        l2j_log("Error: rewrite replacement pattern is empty in: %s", param);
         return false;
     }
 
@@ -287,7 +287,7 @@ static bool parse_rewrite(LOG_JOB *jb, const char *param) {
 static bool parse_inject(LOG_JOB *jb, const char *value, bool unmatched) {
     const char *equal = strchr(value, '=');
     if (!equal) {
-        log2stderr("Error: injection '%s' does not have an equal sign.", value);
+        l2j_log("Error: injection '%s' does not have an equal sign.", value);
         return false;
     }
 
@@ -336,7 +336,10 @@ bool log_job_command_line_parse_parameters(LOG_JOB *jb, int argc, char **argv) {
                         log_job_pattern_set(jb, arg, strlen(arg));
                         continue;
                     } else {
-                        log2stderr("Error: Multiple patterns detected. Specify only one pattern. The first is '%s', the second is '%s'", jb->pattern, arg);
+                        l2j_log(
+                            "Error: Multiple patterns detected. Specify only one pattern. The first is '%s', the second is '%s'",
+                            jb->pattern,
+                            arg);
                         return false;
                     }
                 }
@@ -361,7 +364,7 @@ bool log_job_command_line_parse_parameters(LOG_JOB *jb, int argc, char **argv) {
             }
 #endif
             else if (strcmp(param, "--unmatched-key") == 0)
-                hashed_key_set(&jb->unmatched.key, value);
+                hashed_key_set(&jb->unmatched.key, value, -1);
             else if (strcmp(param, "--inject") == 0) {
                 if (!parse_inject(jb, value, false))
                     return false;
@@ -392,7 +395,10 @@ bool log_job_command_line_parse_parameters(LOG_JOB *jb, int argc, char **argv) {
                     log_job_pattern_set(jb, arg, strlen(arg));
                     continue;
                 } else {
-                    log2stderr("Error: Multiple patterns detected. Specify only one pattern. The first is '%s', the second is '%s'", jb->pattern, arg);
+                    l2j_log(
+                        "Error: Multiple patterns detected. Specify only one pattern. The first is '%s', the second is '%s'",
+                        jb->pattern,
+                        arg);
                     return false;
                 }
             }
@@ -401,7 +407,7 @@ bool log_job_command_line_parse_parameters(LOG_JOB *jb, int argc, char **argv) {
 
     // Check if a pattern is set and exactly one pattern is specified
     if (!jb->pattern) {
-        log2stderr("Warning: pattern not specified. Try the default config with: -c default");
+        l2j_log("Warning: pattern not specified. Try the default config with: -c default");
         log_job_command_line_help(argv[0]);
         return false;
     }

+ 2 - 2
src/collectors/log2journal/log2journal-pattern.c

@@ -18,13 +18,13 @@ void search_pattern_cleanup(SEARCH_PATTERN *sp) {
         sp->match_data = NULL;
     }
 
-    txt_cleanup(&sp->error);
+    txt_l2j_cleanup(&sp->error);
 }
 
 static void pcre2_error_message(SEARCH_PATTERN *sp, int rc, int pos) {
     char msg[1024];
     pcre2_get_error_in_buffer(msg, sizeof(msg), rc, pos);
-    txt_replace(&sp->error, msg, strlen(msg));
+    txt_l2j_set(&sp->error, msg, strlen(msg));
 }
 
 static inline bool compile_pcre2(SEARCH_PATTERN *sp) {

+ 3 - 3
src/collectors/log2journal/log2journal-rename.c

@@ -9,13 +9,13 @@ void rename_cleanup(RENAME *rn) {
 
 bool log_job_rename_add(LOG_JOB *jb, const char *new_key, size_t new_key_len, const char *old_key, size_t old_key_len) {
     if(jb->renames.used >= MAX_RENAMES) {
-        log2stderr("Error: too many renames. You can rename up to %d fields.", MAX_RENAMES);
+        l2j_log("Error: too many renames. You can rename up to %d fields.", MAX_RENAMES);
         return false;
     }
 
     RENAME *rn = &jb->renames.array[jb->renames.used++];
-    hashed_key_len_set(&rn->new_key, new_key, new_key_len);
-    hashed_key_len_set(&rn->old_key, old_key, old_key_len);
+    hashed_key_set(&rn->new_key, new_key, new_key_len);
+    hashed_key_set(&rn->old_key, old_key, old_key_len);
 
     return true;
 }

+ 6 - 6
src/collectors/log2journal/log2journal-replace.c

@@ -26,7 +26,7 @@ static REPLACE_NODE *replace_pattern_add_node(REPLACE_NODE **head, bool is_varia
     if (!new_node)
         return NULL;
 
-    hashed_key_set(&new_node->name, text);
+    hashed_key_set(&new_node->name, text, -1);
     new_node->is_variable = is_variable;
     new_node->next = NULL;
 
@@ -57,21 +57,21 @@ bool replace_pattern_set(REPLACE_PATTERN *rp, const char *pattern) {
             // Start of a variable
             const char *end = strchr(current, '}');
             if (!end) {
-                log2stderr("Error: Missing closing brace in replacement pattern: %s", rp->pattern);
+                l2j_log("Error: Missing closing brace in replacement pattern: %s", rp->pattern);
                 return false;
             }
 
             size_t name_length = end - current - 2; // Length of the variable name
             char *variable_name = strndupz(current + 2, name_length);
             if (!variable_name) {
-                log2stderr("Error: Memory allocation failed for variable name.");
+                l2j_log("Error: Memory allocation failed for variable name.");
                 return false;
             }
 
             REPLACE_NODE *node = replace_pattern_add_node(&(rp->nodes), true, variable_name);
             if (!node) {
                 freez(variable_name);
-                log2stderr("Error: Failed to add replacement node for variable.");
+                l2j_log("Error: Failed to add replacement node for variable.");
                 return false;
             }
             freez(variable_name);
@@ -88,14 +88,14 @@ bool replace_pattern_set(REPLACE_PATTERN *rp, const char *pattern) {
             size_t text_length = current - start;
             char *text = strndupz(start, text_length);
             if (!text) {
-                log2stderr("Error: Memory allocation failed for literal text.");
+                l2j_log("Error: Memory allocation failed for literal text.");
                 return false;
             }
 
             REPLACE_NODE *node = replace_pattern_add_node(&(rp->nodes), false, text);
             if (!node) {
                 freez(text);
-                log2stderr("Error: Failed to add replacement node for text.");
+                l2j_log("Error: Failed to add replacement node for text.");
                 return false;
             }
             freez(text);

+ 3 - 3
src/collectors/log2journal/log2journal-rewrite.c

@@ -17,19 +17,19 @@ void rewrite_cleanup(REWRITE *rw) {
 
 bool log_job_rewrite_add(LOG_JOB *jb, const char *key, RW_FLAGS flags, const char *search_pattern, const char *replace_pattern) {
     if(jb->rewrites.used >= MAX_REWRITES) {
-        log2stderr("Error: too many rewrites. You can add up to %d rewrite rules.", MAX_REWRITES);
+        l2j_log("Error: too many rewrites. You can add up to %d rewrite rules.", MAX_REWRITES);
         return false;
     }
 
     if((flags & (RW_MATCH_PCRE2|RW_MATCH_NON_EMPTY)) && (!search_pattern || !*search_pattern)) {
-        log2stderr("Error: rewrite for key '%s' does not specify a search pattern.", key);
+        l2j_log("Error: rewrite for key '%s' does not specify a search pattern.", key);
         return false;
     }
 
     REWRITE *rw = &jb->rewrites.array[jb->rewrites.used++];
     rw->flags = flags;
 
-    hashed_key_set(&rw->key, key);
+    hashed_key_set(&rw->key, key, -1);
 
     if((flags & RW_MATCH_PCRE2) && !search_pattern_set(&rw->match_pcre2, search_pattern, strlen(search_pattern))) {
         rewrite_cleanup(rw);

+ 90 - 0
src/collectors/log2journal/log2journal-txt.h

@@ -0,0 +1,90 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_LOG2JOURNAL_TXT_H
+#define NETDATA_LOG2JOURNAL_TXT_H
+
+#include "log2journal.h"
+
+// ----------------------------------------------------------------------------
+// A dynamically sized, reusable text buffer,
+// allowing us to be fast (no allocations during iterations) while having the
+// smallest possible allocations.
+
+typedef struct txt_l2j { // growable, reusable text buffer
+    char *txt; // heap buffer holding a NUL-terminated string; NULL when nothing is allocated
+    uint32_t size; // bytes currently allocated for txt
+    uint32_t len; // length of the string in txt, not counting the terminator
+} TXT_L2J;
+
+// Free the buffer of a TXT_L2J (if it has one) and reset it to the empty state.
+// A NULL pointer is accepted and ignored.
+static inline void txt_l2j_cleanup(TXT_L2J *t) {
+    if(t) {
+        if(t->txt != NULL)
+            freez(t->txt);
+
+        t->len = 0;
+        t->size = 0;
+        t->txt = NULL;
+    }
+}
+
+#define TXT_L2J_ALLOC_ALIGN 1024
+
+// Pick the allocation size for a buffer that must hold required_size bytes:
+// required_size rounded up to a multiple of TXT_L2J_ALLOC_ALIGN, but never
+// less than twice old_size.
+static inline size_t txt_l2j_compute_new_size(size_t old_size, size_t required_size) {
+    size_t aligned = required_size;
+    const size_t remainder = required_size % TXT_L2J_ALLOC_ALIGN;
+
+    // bump up to the next alignment boundary when not already on one
+    if(remainder)
+        aligned += TXT_L2J_ALLOC_ALIGN - remainder;
+
+    const size_t doubled = old_size * 2;
+
+    return (aligned < doubled) ? doubled : aligned;
+}
+
+// Make sure dst can hold at least required_size bytes.
+// Nothing happens when the buffer is already big enough. When it has to grow,
+// the current contents survive only if `keep` is true and a buffer exists;
+// otherwise the old buffer is released, a new one is allocated and dst->len
+// is reset to 0.
+static inline void txt_l2j_resize(TXT_L2J *dst, size_t required_size, bool keep) {
+    if(dst->size >= required_size)
+        return;
+
+    const size_t grown = txt_l2j_compute_new_size(dst->size, required_size);
+    const bool preserve = keep && dst->txt;
+
+    if(!preserve) {
+        txt_l2j_cleanup(dst);
+        dst->txt = mallocz(grown);
+        dst->len = 0;
+    }
+    else
+        dst->txt = reallocz(dst->txt, grown);
+
+    dst->size = grown;
+}
+
+// Replace the contents of dst with a copy of `s`.
+//
+//   dst - the buffer to overwrite; it grows as needed, old contents are discarded
+//   s   - the text to copy; NULL or an empty string leaves dst holding ""
+//   len - number of bytes of `s` to copy, 0 for an empty result, or a negative
+//         value (-1 by convention) to copy `s` up to its NUL terminator
+//
+// dst->txt is always NUL-terminated on return and dst->len excludes the terminator.
+static inline void txt_l2j_set(TXT_L2J *dst, const char *s, int32_t len) {
+    if(!s || !*s || len == 0) {
+        s = "";
+        len = 0;
+    }
+    else if(len < 0)
+        // any negative length means "measure it"; letting one reach memcpy()
+        // would turn it into an enormous size_t
+        len = (int32_t)strlen(s);
+
+    txt_l2j_resize(dst, (size_t)len + 1, false);
+    memcpy(dst->txt, s, (size_t)len);
+    dst->txt[len] = '\0';
+    dst->len = (uint32_t)len;
+}
+
+// Append `s` to the text already held in dst.
+//
+//   dst - the buffer to extend; when it is unallocated or empty this behaves
+//         exactly like txt_l2j_set()
+//   s   - the text to append; NULL or an empty string appends nothing
+//   len - number of bytes of `s` to append, or a negative value (-1 by
+//         convention) to append `s` up to its NUL terminator
+//
+// dst->txt stays NUL-terminated and dst->len excludes the terminator.
+static inline void txt_l2j_append(TXT_L2J *dst, const char *s, int32_t len) {
+    if(!dst->txt || !dst->len) {
+        txt_l2j_set(dst, s, len);
+        return;
+    }
+
+    // same notion of "empty input" as txt_l2j_set(); this also keeps a NULL
+    // `s` away from strlen() and memcpy()
+    if(!s || !*s || len == 0)
+        return;
+
+    if(len < 0)
+        len = (int32_t)strlen(s);
+
+    // grow in place, preserving what is already stored
+    txt_l2j_resize(dst, (size_t)dst->len + (size_t)len + 1, true);
+    memcpy(&dst->txt[dst->len], s, (size_t)len);
+    dst->len += (uint32_t)len;
+    dst->txt[dst->len] = '\0';
+}
+
+#endif //NETDATA_LOG2JOURNAL_TXT_H

Some files were not shown because too many files changed in this diff