Costa Tsaousis 1 год назад
Родитель
Сommit
ce75313de0

+ 16 - 0
Makefile.am

@@ -154,6 +154,8 @@ LIBNETDATA_FILES = \
     libnetdata/dictionary/dictionary.h \
     libnetdata/eval/eval.c \
     libnetdata/eval/eval.h \
+    libnetdata/facets/facets.c \
+    libnetdata/facets/facets.h \
     libnetdata/gorilla/gorilla.h \
     libnetdata/gorilla/gorilla.cc \
     libnetdata/inlined.h \
@@ -302,6 +304,11 @@ FREEIPMI_PLUGIN_FILES = \
     $(LIBNETDATA_FILES) \
     $(NULL)
 
+SYSTEMD_JOURNAL_PLUGIN_FILES = \
+    collectors/systemd-journal.plugin/systemd-journal.c \
+    $(LIBNETDATA_FILES) \
+    $(NULL)
+
 CUPS_PLUGIN_FILES = \
     collectors/cups.plugin/cups_plugin.c \
     $(LIBNETDATA_FILES) \
@@ -1232,6 +1239,15 @@ if ENABLE_PLUGIN_FREEIPMI
         $(NULL)
 endif
 
+if ENABLE_PLUGIN_SYSTEMD_JOURNAL
+    plugins_PROGRAMS += systemd-journal.plugin
+    systemd_journal_plugin_SOURCES = $(SYSTEMD_JOURNAL_PLUGIN_FILES)
+    systemd_journal_plugin_LDADD = \
+        $(NETDATA_COMMON_LIBS) \
+        $(OPTIONAL_SYSTEMD_LIBS) \
+        $(NULL)
+endif
+
 if ENABLE_PLUGIN_EBPF
     plugins_PROGRAMS += ebpf.plugin
     ebpf_plugin_SOURCES = $(EBPF_PLUGIN_FILES)

+ 2 - 2
claim/claim.c

@@ -356,7 +356,7 @@ int api_v2_claim(struct web_client *w, char *url) {
 
     BUFFER *wb = w->response.data;
     buffer_flush(wb);
-    buffer_json_initialize(wb, "\"", "\"", 0, true, false);
+    buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_DEFAULT);
 
     time_t now_s = now_realtime_sec();
     CLOUD_STATUS status = buffer_json_cloud_status(wb, now_s);
@@ -462,7 +462,7 @@ int api_v2_claim(struct web_client *w, char *url) {
         // our status may have changed
         // refresh the status in our output
         buffer_flush(wb);
-        buffer_json_initialize(wb, "\"", "\"", 0, true, false);
+        buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_DEFAULT);
         now_s = now_realtime_sec();
         buffer_json_cloud_status(wb, now_s);
 

+ 1 - 0
collectors/Makefile.am

@@ -25,6 +25,7 @@ SUBDIRS = \
     statsd.plugin \
     ebpf.plugin \
     tc.plugin \
+    systemd-journal.plugin \
     $(NULL)
 
 usercustompluginsconfigdir=$(configdir)/custom-plugins.d

+ 15 - 22
collectors/apps.plugin/apps_plugin.c

@@ -13,7 +13,7 @@
 #define APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION "Detailed information on the currently running processes."
 
 #define APPS_PLUGIN_FUNCTIONS() do { \
-        fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " \"processes\" 10 \"%s\"\n", APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION); \
+        fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " \"processes\" %d \"%s\"\n", PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT, APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION); \
     } while(0)
 
 
@@ -4572,7 +4572,7 @@ static int check_capabilities() {
 }
 #endif
 
-netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
 
 #define PROCESS_FILTER_CATEGORY "category:"
 #define PROCESS_FILTER_USER "user:"
@@ -4625,15 +4625,6 @@ static void get_MemTotal(void) {
 #endif
 }
 
-static void apps_plugin_function_error(const char *transaction, int code, const char *msg) {
-    char buffer[PLUGINSD_LINE_MAX + 1];
-    json_escape_string(buffer, msg, PLUGINSD_LINE_MAX);
-
-    pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
-    fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
-    pluginsd_function_result_end_to_stdout();
-}
-
 static void apps_plugin_function_processes_help(const char *transaction) {
     pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600);
     fprintf(stdout, "%s",
@@ -4681,7 +4672,7 @@ static void apps_plugin_function_processes_help(const char *transaction) {
     buffer_json_add_array_item_double(wb, _tmp);                                                                \
 } while(0)
 
-static void apps_plugin_function_processes(const char *transaction, char *function __maybe_unused, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
+static void function_processes(const char *transaction, char *function __maybe_unused, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
     struct pid_stat *p;
 
     char *words[PLUGINSD_MAX_WORDS] = { NULL };
@@ -4702,21 +4693,21 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
         if(!category && strncmp(keyword, PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) {
             category = find_target_by_name(apps_groups_root_target, &keyword[strlen(PROCESS_FILTER_CATEGORY)]);
             if(!category) {
-                apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No category with that name found.");
+                pluginsd_function_json_error(transaction, HTTP_RESP_BAD_REQUEST, "No category with that name found.");
                 return;
             }
         }
         else if(!user && strncmp(keyword, PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) {
             user = find_target_by_name(users_root_target, &keyword[strlen(PROCESS_FILTER_USER)]);
             if(!user) {
-                apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No user with that name found.");
+                pluginsd_function_json_error(transaction, HTTP_RESP_BAD_REQUEST, "No user with that name found.");
                 return;
             }
         }
         else if(strncmp(keyword, PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) {
             group = find_target_by_name(groups_root_target, &keyword[strlen(PROCESS_FILTER_GROUP)]);
             if(!group) {
-                apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No group with that name found.");
+                pluginsd_function_json_error(transaction, HTTP_RESP_BAD_REQUEST, "No group with that name found.");
                 return;
             }
         }
@@ -4742,7 +4733,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
         else {
             char msg[PLUGINSD_LINE_MAX];
             snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", keyword);
-            apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, msg);
+            pluginsd_function_json_error(transaction, HTTP_RESP_BAD_REQUEST, msg);
             return;
         }
     }
@@ -4755,7 +4746,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
     unsigned int io_divisor = 1024 * RATES_DETAIL;
 
     BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
-    buffer_json_initialize(wb, "\"", "\"", 0, true, false);
+    buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAYS);
     buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
     buffer_json_member_add_string(wb, "type", "table");
     buffer_json_member_add_time_t(wb, "update_every", update_every);
@@ -5232,7 +5223,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
                                     RRDF_FIELD_FILTER_RANGE,
                                     RRDF_FIELD_OPTS_VISIBLE, NULL);
         buffer_rrdf_table_add_field(wb, field_id++, "Uptime", "Uptime in seconds", RRDF_FIELD_TYPE_DURATION,
-                                    RRDF_FIELD_VISUAL_BAR, RRDF_FIELD_TRANSFORM_DURATION, 2,
+                                    RRDF_FIELD_VISUAL_BAR, RRDF_FIELD_TRANSFORM_DURATION_S, 2,
                                     "seconds", Uptime_max, RRDF_FIELD_SORT_DESCENDING, NULL, RRDF_FIELD_SUMMARY_MAX,
                                     RRDF_FIELD_FILTER_RANGE,
                                     RRDF_FIELD_OPTS_VISIBLE, NULL);
@@ -5532,9 +5523,9 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
     pluginsd_function_result_end_to_stdout();
 }
 
-bool apps_plugin_exit = false;
+static bool apps_plugin_exit = false;
 
-void *reader_main(void *arg __maybe_unused) {
+static void *reader_main(void *arg __maybe_unused) {
     char buffer[PLUGINSD_LINE_MAX + 1];
 
     char *s = NULL;
@@ -5566,9 +5557,9 @@ void *reader_main(void *arg __maybe_unused) {
                 netdata_mutex_lock(&mutex);
 
                 if(strncmp(function, "processes", strlen("processes")) == 0)
-                    apps_plugin_function_processes(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
+                    function_processes(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
                 else
-                    apps_plugin_function_error(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in apps.plugin.");
+                    pluginsd_function_json_error(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in apps.plugin.");
 
                 fflush(stdout);
                 netdata_mutex_unlock(&mutex);
@@ -5696,6 +5687,8 @@ int main(int argc, char **argv) {
     netdata_thread_create(&reader_thread, "APPS_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL);
     netdata_mutex_lock(&mutex);
 
+    APPS_PLUGIN_FUNCTIONS();
+
     usec_t step = update_every * USEC_PER_SEC;
     global_iterations_counter = 1;
     heartbeat_t hb;

+ 1 - 1
collectors/ebpf.plugin/ebpf_functions.c

@@ -206,7 +206,7 @@ static void ebpf_function_thread_manipulation(const char *transaction,
     time_t expires = now_realtime_sec() + em->update_every;
 
     BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
-    buffer_json_initialize(wb, "\"", "\"", 0, true, false);
+    buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAYS);
     buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
     buffer_json_member_add_string(wb, "type", "table");
     buffer_json_member_add_time_t(wb, "update_every", em->update_every);

+ 9 - 2
collectors/plugins.d/plugins_d.h

@@ -99,8 +99,6 @@ void pluginsd_process_thread_cleanup(void *ptr);
 
 size_t pluginsd_initialize_plugin_directories();
 
-
-
 #define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires)      \
     buffer_sprintf(wb                                                                               \
                     , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n"              \
@@ -125,4 +123,13 @@ size_t pluginsd_initialize_plugin_directories();
 #define pluginsd_function_result_end_to_stdout() \
     fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
 
+static inline void pluginsd_function_json_error(const char *transaction, int code, const char *msg) {
+    char buffer[PLUGINSD_LINE_MAX + 1];
+    json_escape_string(buffer, msg, PLUGINSD_LINE_MAX);
+
+    pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
+    fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
+    pluginsd_function_result_end_to_stdout();
+}
+
 #endif /* NETDATA_PLUGINS_D_H */

+ 0 - 0
collectors/systemd-journal.plugin/Makefile.am


+ 0 - 0
collectors/systemd-journal.plugin/README.md


+ 578 - 0
collectors/systemd-journal.plugin/systemd-journal.c

@@ -0,0 +1,578 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+/*
+ *  netdata systemd-journal.plugin
+ *  Copyright (C) 2023 Netdata Inc.
+ *  GPL v3+
+ */
+
+// TODO - 1) MARKDOC
+
+#include "collectors/all.h"
+#include "libnetdata/libnetdata.h"
+#include "libnetdata/required_dummies.h"
+
+#include <systemd/sd-journal.h>
+#include <syslog.h>
+
+#define FACET_MAX_VALUE_LENGTH                  8192
+
+#define SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION    "View, search and analyze systemd journal entries."
+#define SYSTEMD_JOURNAL_FUNCTION_NAME           "systemd-journal"
+#define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT         30
+#define SYSTEMD_JOURNAL_MAX_PARAMS              100
+#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION  (3 * 3600)
+#define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200
+
+#define JOURNAL_PARAMETER_HELP                  "help"
+#define JOURNAL_PARAMETER_AFTER                 "after"
+#define JOURNAL_PARAMETER_BEFORE                "before"
+#define JOURNAL_PARAMETER_ANCHOR                "anchor"
+#define JOURNAL_PARAMETER_LAST                  "last"
+#define JOURNAL_PARAMETER_QUERY                 "query"
+
+#define SYSTEMD_ALWAYS_VISIBLE_KEYS             NULL
+#define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS       NULL
+#define SYSTEMD_KEYS_INCLUDED_IN_FACETS         \
+    "_TRANSPORT"                                \
+    "|SYSLOG_IDENTIFIER"                        \
+    "|SYSLOG_FACILITY"                          \
+    "|PRIORITY"                                 \
+    "|_HOSTNAME"                                \
+    "|_RUNTIME_SCOPE"                           \
+    "|_PID"                                     \
+    "|_UID"                                     \
+    "|_GID"                                     \
+    "|_SYSTEMD_UNIT"                            \
+    "|_SYSTEMD_SLICE"                           \
+    "|_SYSTEMD_USER_SLICE"                      \
+    "|_COMM"                                    \
+    "|_EXE"                                     \
+    "|_SYSTEMD_CGROUP"                          \
+    "|_SYSTEMD_USER_UNIT"                       \
+    "|USER_UNIT"                                \
+    "|UNIT"                                     \
+    ""
+
+static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
+static bool plugin_should_exit = false;
+
+DICTIONARY *uids = NULL;
+DICTIONARY *gids = NULL;
+
+
+// ----------------------------------------------------------------------------
+
+int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t stop_monotonic_ut) {
+    sd_journal *j;
+    int r;
+
+    // Open the system journal for reading
+    r = sd_journal_open(&j, SD_JOURNAL_ALL_NAMESPACES);
+    if (r < 0)
+        return HTTP_RESP_INTERNAL_SERVER_ERROR;
+
+    facets_rows_begin(facets);
+
+    bool timed_out = false;
+    size_t row_counter = 0;
+    sd_journal_seek_realtime_usec(j, before_ut);
+    SD_JOURNAL_FOREACH_BACKWARDS(j) {
+            row_counter++;
+
+            uint64_t msg_ut;
+            sd_journal_get_realtime_usec(j, &msg_ut);
+            if (msg_ut < after_ut)
+                break;
+
+            const void *data;
+            size_t length;
+            SD_JOURNAL_FOREACH_DATA(j, data, length) {
+                const char *key = data;
+                const char *equal = strchr(key, '=');
+                if(unlikely(!equal))
+                    continue;
+
+                const char *value = ++equal;
+                size_t key_length = value - key; // including '\0'
+
+                char key_copy[key_length];
+                memcpy(key_copy, key, key_length - 1);
+                key_copy[key_length - 1] = '\0';
+
+                size_t value_length = length - key_length; // without '\0'
+                facets_add_key_value_length(facets, key_copy, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH);
+            }
+
+            facets_row_finished(facets, msg_ut);
+
+            if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
+                timed_out = true;
+                break;
+            }
+        }
+
+    sd_journal_close(j);
+
+    buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
+    buffer_json_member_add_boolean(wb, "partial", timed_out);
+    buffer_json_member_add_string(wb, "type", "table");
+    buffer_json_member_add_time_t(wb, "update_every", 1);
+    buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
+
+    facets_report(facets, wb);
+
+    buffer_json_member_add_time_t(wb, "expires", now_realtime_sec());
+    buffer_json_finalize(wb);
+
+    return HTTP_RESP_OK;
+}
+
+static void systemd_journal_function_help(const char *transaction) {
+    pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600);
+    fprintf(stdout,
+            "%s / %s\n"
+            "\n"
+            "%s\n"
+            "\n"
+            "The following filters are supported:\n"
+            "\n"
+            "   help\n"
+            "      Shows this help message.\n"
+            "\n"
+            "   before:TIMESTAMP\n"
+            "      Absolute or relative (to now) timestamp in seconds, to start the query.\n"
+            "      The query is always executed from the most recent to the oldest log entry.\n"
+            "      If not given the default is: now.\n"
+            "\n"
+            "   after:TIMESTAMP\n"
+            "      Absolute or relative (to `before`) timestamp in seconds, to end the query.\n"
+            "      If not given, the default is %d.\n"
+            "\n"
+            "   last:ITEMS\n"
+            "      The number of items to return.\n"
+            "      The default is %d.\n"
+            "\n"
+            "   anchor:NUMBER\n"
+            "      The `timestamp` of the item last received, to return log entries after that.\n"
+            "      If not given, the query will return the top `ITEMS` from the most recent.\n"
+            "\n"
+            "   facet_id:value_id1,value_id2,value_id3,...\n"
+            "      Apply filters to the query, based on the facet IDs returned.\n"
+            "      Each `facet_id` can be given once, but multiple `facet_ids` can be given.\n"
+            "\n"
+            "Filters can be combined. Each filter can be given only one time.\n"
+            , program_name
+            , SYSTEMD_JOURNAL_FUNCTION_NAME
+            , SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION
+            , -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION
+            , SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY
+    );
+    pluginsd_function_result_end_to_stdout();
+}
+
+static const char *syslog_facility_to_name(int facility) {
+    switch (facility) {
+        case LOG_FAC(LOG_KERN): return "kern";
+        case LOG_FAC(LOG_USER): return "user";
+        case LOG_FAC(LOG_MAIL): return "mail";
+        case LOG_FAC(LOG_DAEMON): return "daemon";
+        case LOG_FAC(LOG_AUTH): return "auth";
+        case LOG_FAC(LOG_SYSLOG): return "syslog";
+        case LOG_FAC(LOG_LPR): return "lpr";
+        case LOG_FAC(LOG_NEWS): return "news";
+        case LOG_FAC(LOG_UUCP): return "uucp";
+        case LOG_FAC(LOG_CRON): return "cron";
+        case LOG_FAC(LOG_AUTHPRIV): return "authpriv";
+        case LOG_FAC(LOG_FTP): return "ftp";
+        case LOG_FAC(LOG_LOCAL0): return "local0";
+        case LOG_FAC(LOG_LOCAL1): return "local1";
+        case LOG_FAC(LOG_LOCAL2): return "local2";
+        case LOG_FAC(LOG_LOCAL3): return "local3";
+        case LOG_FAC(LOG_LOCAL4): return "local4";
+        case LOG_FAC(LOG_LOCAL5): return "local5";
+        case LOG_FAC(LOG_LOCAL6): return "local6";
+        case LOG_FAC(LOG_LOCAL7): return "local7";
+        default: return NULL;
+    }
+}
+
+static const char *syslog_priority_to_name(int priority) {
+    switch (priority) {
+        case LOG_ALERT: return "alert";
+        case LOG_CRIT: return "critical";
+        case LOG_DEBUG: return "debug";
+        case LOG_EMERG: return "panic";
+        case LOG_ERR: return "error";
+        case LOG_INFO: return "info";
+        case LOG_NOTICE: return "notice";
+        case LOG_WARNING: return "warning";
+        default: return NULL;
+    }
+}
+
+static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
+    struct passwd pw, *result;
+    char tmp[1024 + 1];
+
+    if (getpwuid_r(uid, &pw, tmp, 1024, &result) != 0 || result == NULL)
+        return NULL;
+
+    strncpy(buffer, pw.pw_name, buffer_size - 1);
+    buffer[buffer_size - 1] = '\0'; // Null-terminate just in case
+    return buffer;
+}
+
+static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) {
+    struct group grp, *result;
+    char tmp[1024 + 1];
+
+    if (getgrgid_r(gid, &grp, tmp, 1024, &result) != 0 || result == NULL)
+        return NULL;
+
+    strncpy(buffer, grp.gr_name, buffer_size - 1);
+    buffer[buffer_size - 1] = '\0'; // Null-terminate just in case
+    return buffer;
+}
+
+static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+    const char *v = buffer_tostring(wb);
+    if(*v && isdigit(*v)) {
+        int facility = str2i(buffer_tostring(wb));
+        const char *name = syslog_facility_to_name(facility);
+        if (name) {
+            buffer_flush(wb);
+            buffer_json_add_array_item_string(wb, name);
+        }
+    }
+}
+
+static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+    const char *v = buffer_tostring(wb);
+    if(*v && isdigit(*v)) {
+        int priority = str2i(buffer_tostring(wb));
+        const char *name = syslog_priority_to_name(priority);
+        if (name) {
+            buffer_flush(wb);
+            buffer_json_add_array_item_string(wb, name);
+        }
+    }
+}
+
+static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
+    DICTIONARY *cache = data;
+    const char *v = buffer_tostring(wb);
+    if(*v && isdigit(*v)) {
+        const char *sv = dictionary_get(cache, v);
+        if(!sv) {
+            char buf[1024 + 1];
+            int uid = str2i(buffer_tostring(wb));
+            const char *name = uid_to_username(uid, buf, 1024);
+            if (!name)
+                name = v;
+
+            sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1);
+        }
+
+        buffer_flush(wb);
+        buffer_strcat(wb, sv);
+    }
+}
+
+static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
+    DICTIONARY *cache = data;
+    const char *v = buffer_tostring(wb);
+    if(*v && isdigit(*v)) {
+        const char *sv = dictionary_get(cache, v);
+        if(!sv) {
+            char buf[1024 + 1];
+            int gid = str2i(buffer_tostring(wb));
+            const char *name = gid_to_groupname(gid, buf, 1024);
+            if (!name)
+                name = v;
+
+            sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1);
+        }
+
+        buffer_flush(wb);
+        buffer_strcat(wb, sv);
+    }
+}
+
+static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *wb, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
+    FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER");
+    FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID");
+
+    const char *identifier = syslog_identifier_rkv ? buffer_tostring(syslog_identifier_rkv->wb) : "UNKNOWN";
+    const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : "UNKNOWN";
+
+    buffer_flush(rkv->wb);
+    buffer_sprintf(rkv->wb, "%s[%s]", identifier, pid);
+
+    buffer_json_add_array_item_string(wb, buffer_tostring(rkv->wb));
+}
+
+static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
+    char *words[SYSTEMD_JOURNAL_MAX_PARAMS] = { NULL };
+    size_t num_words = quoted_strings_splitter_pluginsd(function, words, SYSTEMD_JOURNAL_MAX_PARAMS);
+
+    BUFFER *wb = buffer_create(0, NULL);
+    buffer_flush(wb);
+    buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAYS);
+
+    FACETS *facets = facets_create(50, 0, FACETS_OPTION_ALL_KEYS_FTS,
+                                   SYSTEMD_ALWAYS_VISIBLE_KEYS,
+                                   SYSTEMD_KEYS_INCLUDED_IN_FACETS,
+                                   SYSTEMD_KEYS_EXCLUDED_FROM_FACETS);
+
+    facets_accepted_param(facets, JOURNAL_PARAMETER_AFTER);
+    facets_accepted_param(facets, JOURNAL_PARAMETER_BEFORE);
+    facets_accepted_param(facets, JOURNAL_PARAMETER_ANCHOR);
+    facets_accepted_param(facets, JOURNAL_PARAMETER_LAST);
+    facets_accepted_param(facets, JOURNAL_PARAMETER_QUERY);
+
+    // register the fields in the order you want them on the dashboard
+
+    facets_register_dynamic_key(facets, "ND_JOURNAL_PROCESS", FACET_KEY_OPTION_NO_FACET|FACET_KEY_OPTION_VISIBLE|FACET_KEY_OPTION_FTS,
+                                systemd_journal_dynamic_row_id, NULL);
+
+    facets_register_key(facets, "MESSAGE",
+                        FACET_KEY_OPTION_NO_FACET|FACET_KEY_OPTION_MAIN_TEXT|FACET_KEY_OPTION_VISIBLE|FACET_KEY_OPTION_FTS);
+
+    facets_register_key_transformation(facets, "PRIORITY", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
+                                       systemd_journal_transform_priority, NULL);
+
+    facets_register_key_transformation(facets, "SYSLOG_FACILITY", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
+                                       systemd_journal_transform_syslog_facility, NULL);
+
+    facets_register_key(facets, "SYSLOG_IDENTIFIER", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS);
+    facets_register_key(facets, "UNIT", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS);
+    facets_register_key(facets, "USER_UNIT", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS);
+
+    facets_register_key_transformation(facets, "_UID", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
+                                       systemd_journal_transform_uid, uids);
+
+    facets_register_key_transformation(facets, "_GID", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
+                                       systemd_journal_transform_gid, gids);
+
+    time_t after_s = 0, before_s = 0;
+    usec_t anchor = 0;
+    size_t last = 0;
+    const char *query = NULL;
+
+    buffer_json_member_add_object(wb, "request");
+    buffer_json_member_add_object(wb, "filters");
+
+    for(int i = 1; i < SYSTEMD_JOURNAL_MAX_PARAMS ;i++) {
+        const char *keyword = get_word(words, num_words, i);
+        if(!keyword) break;
+
+        if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) {
+            systemd_journal_function_help(transaction);
+            goto cleanup;
+        }
+        else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", strlen(JOURNAL_PARAMETER_AFTER ":")) == 0) {
+            after_s = str2l(&keyword[strlen(JOURNAL_PARAMETER_AFTER ":")]);
+        }
+        else if(strncmp(keyword, JOURNAL_PARAMETER_BEFORE ":", strlen(JOURNAL_PARAMETER_BEFORE ":")) == 0) {
+            before_s = str2l(&keyword[strlen(JOURNAL_PARAMETER_BEFORE ":")]);
+        }
+        else if(strncmp(keyword, JOURNAL_PARAMETER_ANCHOR ":", strlen(JOURNAL_PARAMETER_ANCHOR ":")) == 0) {
+            anchor = str2ull(&keyword[strlen(JOURNAL_PARAMETER_ANCHOR ":")], NULL);
+        }
+        else if(strncmp(keyword, JOURNAL_PARAMETER_LAST ":", strlen(JOURNAL_PARAMETER_LAST ":")) == 0) {
+            last = str2ul(&keyword[strlen(JOURNAL_PARAMETER_LAST ":")]);
+        }
+        else if(strncmp(keyword, JOURNAL_PARAMETER_QUERY ":", strlen(JOURNAL_PARAMETER_QUERY ":")) == 0) {
+            query= &keyword[strlen(JOURNAL_PARAMETER_QUERY ":")];
+        }
+        else {
+            char *value = strchr(keyword, ':');
+            if(value) {
+                *value++ = '\0';
+
+                buffer_json_member_add_array(wb, keyword);
+
+                while(value) {
+                    char *sep = strchr(value, ',');
+                    if(sep)
+                        *sep++ = '\0';
+
+                    facets_register_facet_filter(facets, keyword, value, FACET_KEY_OPTION_REORDER);
+                    buffer_json_add_array_item_string(wb, value);
+
+                    value = sep;
+                }
+
+                buffer_json_array_close(wb); // keyword
+            }
+        }
+    }
+
+    buffer_json_object_close(wb); // filters
+
+    time_t expires = now_realtime_sec() + 1;
+    time_t now_s;
+
+    if(!after_s && !before_s) {
+        now_s = now_realtime_sec();
+        before_s = now_s;
+        after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION;
+    }
+    else
+        rrdr_relative_window_to_absolute(&after_s, &before_s, &now_s, false);
+
+    if(after_s > before_s) {
+        time_t tmp = after_s;
+        after_s = before_s;
+        before_s = tmp;
+    }
+
+    if(after_s == before_s)
+        after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION;
+
+    if(!last)
+        last = SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY;
+
+    buffer_json_member_add_time_t(wb, "after", after_s);
+    buffer_json_member_add_time_t(wb, "before", before_s);
+    buffer_json_member_add_uint64(wb, "anchor", anchor);
+    buffer_json_member_add_uint64(wb, "last", last);
+    buffer_json_member_add_string(wb, "query", query);
+    buffer_json_member_add_time_t(wb, "timeout", timeout);
+    buffer_json_object_close(wb); // request
+
+    facets_set_items(facets, last);
+    facets_set_anchor(facets, anchor);
+    facets_set_query(facets, query);
+    int response = systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC,
+                                       now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC);
+
+    if(response != HTTP_RESP_OK) {
+        pluginsd_function_json_error(transaction, response, "failed");
+        goto cleanup;
+    }
+
+    pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires);
+    fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout);
+
+    pluginsd_function_result_end_to_stdout();
+
+cleanup:
+    facets_destroy(facets);
+    buffer_free(wb);
+}
+
+static void *reader_main(void *arg __maybe_unused) {
+    char buffer[PLUGINSD_LINE_MAX + 1];
+
+    char *s = NULL;
+    while(!plugin_should_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
+
+        char *words[PLUGINSD_MAX_WORDS] = { NULL };
+        size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS);
+
+        const char *keyword = get_word(words, num_words, 0);
+
+        if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+            char *transaction = get_word(words, num_words, 1);
+            char *timeout_s = get_word(words, num_words, 2);
+            char *function = get_word(words, num_words, 3);
+
+            if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+                netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+                                  keyword,
+                                  transaction?transaction:"(unset)",
+                                  timeout_s?timeout_s:"(unset)",
+                                  function?function:"(unset)");
+            }
+            else {
+                int timeout = str2i(timeout_s);
+                if(timeout <= 0) timeout = SYSTEMD_JOURNAL_DEFAULT_TIMEOUT;
+
+                netdata_mutex_lock(&mutex);
+
+                if(strncmp(function, SYSTEMD_JOURNAL_FUNCTION_NAME, strlen(SYSTEMD_JOURNAL_FUNCTION_NAME)) == 0)
+                    function_systemd_journal(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
+                else
+                    pluginsd_function_json_error(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in systemd-journal.plugin.");
+
+                fflush(stdout);
+                netdata_mutex_unlock(&mutex);
+            }
+        }
+        else
+            netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
+    }
+
+    if(!s || feof(stdin) || ferror(stdin)) {
+        plugin_should_exit = true;
+        netdata_log_error("Received error on stdin.");
+    }
+
+    exit(1);
+}
+
+int main(int argc __maybe_unused, char **argv __maybe_unused) {
+    stderror = stderr;
+    clocks_init();
+
+    program_name = "systemd-journal.plugin";
+
+    // disable syslog
+    error_log_syslog = 0;
+
+    // set errors flood protection to 100 logs per hour
+    error_log_errors_per_period = 100;
+    error_log_throttle_period = 3600;
+
+    uids = dictionary_create(0);
+    gids = dictionary_create(0);
+
+    // ------------------------------------------------------------------------
+    // debug
+
+    if(argc == 2 && strcmp(argv[1], "debug") == 0) {
+        char buf[] = "systemd-journal after:-86400 before:0 last:500";
+        function_systemd_journal("123", buf, "", 0, 30);
+        exit(1);
+    }
+
+    // ------------------------------------------------------------------------
+
+    netdata_thread_t reader_thread;
+    netdata_thread_create(&reader_thread, "SDJ_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL);
+
+    // ------------------------------------------------------------------------
+
+    time_t started_t = now_monotonic_sec();
+
+    size_t iteration;
+    usec_t step = 1000 * USEC_PER_MS;
+    bool tty = isatty(fileno(stderr)) == 1;
+
+    netdata_mutex_lock(&mutex);
+    fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " \"%s\" %d \"%s\"\n",
+            SYSTEMD_JOURNAL_FUNCTION_NAME, SYSTEMD_JOURNAL_DEFAULT_TIMEOUT, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
+
+    heartbeat_t hb;
+    heartbeat_init(&hb);
+    for(iteration = 0; 1 ; iteration++) {
+        netdata_mutex_unlock(&mutex);
+        heartbeat_next(&hb, step);
+        netdata_mutex_lock(&mutex);
+
+        if(!tty)
+            fprintf(stdout, "\n");
+
+        fflush(stdout);
+
+        time_t now = now_monotonic_sec();
+        if(now - started_t > 86400)
+            break;
+    }
+
+    dictionary_destroy(uids);
+    dictionary_destroy(gids);
+
+    exit(0);
+}

+ 42 - 0
configure.ac

@@ -69,6 +69,12 @@ AC_ARG_ENABLE(
     ,
     [enable_plugin_freeipmi="detect"]
 )
+AC_ARG_ENABLE(
+    [plugin-systemd-journal],
+    [AS_HELP_STRING([--enable-plugin-systemd-journal], [enable systemd-journal plugin @<:@default autodetect@:>@])],
+    ,
+    [enable_plugin_systemd_journal="detect"]
+)
 AC_ARG_ENABLE(
     [plugin-cups],
     [AS_HELP_STRING([--enable-plugin-cups], [enable cups plugin @<:@default autodetect@:>@])],
@@ -1106,6 +1112,39 @@ AC_MSG_RESULT([${enable_plugin_freeipmi}])
 AM_CONDITIONAL([ENABLE_PLUGIN_FREEIPMI], [test "${enable_plugin_freeipmi}" = "yes"])
 
 
+# -----------------------------------------------------------------------------
+# systemd-journal.plugin - systemd
+
+LIBS_BAK="${LIBS}"
+
+AC_CHECK_LIB([systemd], [sd_journal_open], [have_systemd_libs=yes], [have_systemd_libs=no])
+AC_CHECK_HEADERS([systemd/sd-journal.h], [have_systemd_journal_header=yes], [have_systemd_journal_header=no])
+
+if test "${have_systemd_libs}" = "yes" -a "${have_systemd_journal_header}" = "yes"; then
+    have_systemd="yes"
+else
+    have_systemd="no"
+fi
+
+test "${enable_plugin_systemd_journal}" = "yes" -a "${have_systemd}" != "yes" && \
+    AC_MSG_ERROR([systemd is required but not found. Try installing 'libsystemd-dev' or 'libsystemd-devel'])
+
+AC_MSG_CHECKING([if systemd-journal.plugin should be enabled])
+if test "${enable_plugin_systemd_journal}" != "no" -a "${have_systemd}" = "yes"; then
+    enable_plugin_systemd_journal="yes"
+    AC_DEFINE([HAVE_SYSTEMD], [1], [systemd usability])
+    OPTIONAL_SYSTEMD_CFLAGS="-I/usr/include"
+    OPTIONAL_SYSTEMD_LIBS="-lsystemd"
+else
+    enable_plugin_systemd_journal="no"
+fi
+AC_MSG_RESULT([${enable_plugin_systemd_journal}])
+AM_CONDITIONAL([ENABLE_PLUGIN_SYSTEMD_JOURNAL], [test "${enable_plugin_systemd_journal}" = "yes"])
+
+AC_MSG_NOTICE([OPTIONAL_SYSTEMD_LIBS is set to: ${OPTIONAL_SYSTEMD_LIBS}])
+
+LIBS="${LIBS_BAK}"
+
 # -----------------------------------------------------------------------------
 # cups.plugin - libcups
 
@@ -1874,6 +1913,7 @@ AC_SUBST([OPTIONAL_GTEST_CFLAGS])
 AC_SUBST([OPTIONAL_GTEST_LIBS])
 AC_SUBST([OPTIONAL_ML_CFLAGS])
 AC_SUBST([OPTIONAL_ML_LIBS])
+AC_SUBST(OPTIONAL_SYSTEMD_LIBS)
 
 # -----------------------------------------------------------------------------
 # Check if cmocka is available - needed for unit testing
@@ -1937,6 +1977,7 @@ AC_CONFIG_FILES([
     collectors/tc.plugin/Makefile
     collectors/xenstat.plugin/Makefile
     collectors/perf.plugin/Makefile
+    collectors/systemd-journal.plugin/Makefile
     daemon/Makefile
     database/Makefile
     database/contexts/Makefile
@@ -1968,6 +2009,7 @@ AC_CONFIG_FILES([
     libnetdata/dictionary/Makefile
     libnetdata/ebpf/Makefile
     libnetdata/eval/Makefile
+    libnetdata/facets/Makefile
     libnetdata/july/Makefile
     libnetdata/locks/Makefile
     libnetdata/log/Makefile

Некоторые файлы не были показаны из-за большого количества измененных файлов