|
@@ -0,0 +1,155 @@
|
|
|
+// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
+
|
|
|
+#include "pubsub.h"
|
|
|
+
|
|
|
+/**
|
|
|
+ * Initialize Pub/Sub connector instance
|
|
|
+ *
|
|
|
+ * @param instance an instance data structure.
|
|
|
+ * @return Returns 0 on success, 1 on failure.
|
|
|
+ */
|
|
|
+int init_pubsub_instance(struct instance *instance)
|
|
|
+{
|
|
|
+ instance->worker = pubsub_connector_worker;
|
|
|
+
|
|
|
+ instance->start_batch_formatting = NULL;
|
|
|
+ instance->start_host_formatting = format_host_labels_json_plaintext;
|
|
|
+ instance->start_chart_formatting = NULL;
|
|
|
+
|
|
|
+
|
|
|
+ if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
|
|
|
+ instance->metric_formatting = format_dimension_collected_json_plaintext;
|
|
|
+ else
|
|
|
+ instance->metric_formatting = format_dimension_stored_json_plaintext;
|
|
|
+
|
|
|
+ instance->end_chart_formatting = NULL;
|
|
|
+ instance->end_host_formatting = flush_host_labels;
|
|
|
+ instance->end_batch_formatting = NULL;
|
|
|
+
|
|
|
+ instance->send_header = NULL;
|
|
|
+ instance->check_response = NULL;
|
|
|
+
|
|
|
+ instance->buffer = (void *)buffer_create(0);
|
|
|
+ if (!instance->buffer) {
|
|
|
+ error("EXPORTING: cannot create buffer for Pub/Sub exporting connector instance %s", instance->config.name);
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+ uv_mutex_init(&instance->mutex);
|
|
|
+ uv_cond_init(&instance->cond_var);
|
|
|
+
|
|
|
+ struct pubsub_specific_data *connector_specific_data = callocz(1, sizeof(struct pubsub_specific_data));
|
|
|
+ instance->connector_specific_data = (void *)connector_specific_data;
|
|
|
+
|
|
|
+ struct pubsub_specific_config *connector_specific_config =
|
|
|
+ (struct pubsub_specific_config *)instance->config.connector_specific_config;
|
|
|
+ char error_message[ERROR_LINE_MAX + 1] = "";
|
|
|
+ if (pubsub_init(
|
|
|
+ (void *)connector_specific_data, error_message, instance->config.destination,
|
|
|
+ connector_specific_config->credentials_file, connector_specific_config->project_id,
|
|
|
+ connector_specific_config->topic_id)) {
|
|
|
+ error(
|
|
|
+ "EXPORTING: Cannot initialize a Pub/Sub publisher for instance %s: %s",
|
|
|
+ instance->config.name, error_message);
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Pub/Sub connector worker
|
|
|
+ *
|
|
|
+ * Runs in a separate thread for every instance.
|
|
|
+ *
|
|
|
+ * @param instance_p an instance data structure.
|
|
|
+ */
|
|
|
+void pubsub_connector_worker(void *instance_p)
|
|
|
+{
|
|
|
+ struct instance *instance = (struct instance *)instance_p;
|
|
|
+ struct pubsub_specific_config *connector_specific_config = instance->config.connector_specific_config;
|
|
|
+ struct pubsub_specific_data *connector_specific_data = instance->connector_specific_data;
|
|
|
+
|
|
|
+ while (!netdata_exit) {
|
|
|
+ struct stats *stats = &instance->stats;
|
|
|
+ char error_message[ERROR_LINE_MAX + 1] = "";
|
|
|
+
|
|
|
+ uv_mutex_lock(&instance->mutex);
|
|
|
+ uv_cond_wait(&instance->cond_var, &instance->mutex);
|
|
|
+
|
|
|
+ // reset the monitoring chart counters
|
|
|
+ stats->received_bytes =
|
|
|
+ stats->sent_bytes =
|
|
|
+ stats->sent_metrics =
|
|
|
+ stats->lost_metrics =
|
|
|
+ stats->receptions =
|
|
|
+ stats->transmission_successes =
|
|
|
+ stats->transmission_failures =
|
|
|
+ stats->data_lost_events =
|
|
|
+ stats->lost_bytes =
|
|
|
+ stats->reconnects = 0;
|
|
|
+
|
|
|
+ BUFFER *buffer = (BUFFER *)instance->buffer;
|
|
|
+ size_t buffer_len = buffer_strlen(buffer);
|
|
|
+
|
|
|
+ stats->buffered_bytes = buffer_len;
|
|
|
+
|
|
|
+ if (pubsub_add_message(instance->connector_specific_data, (char *)buffer_tostring(buffer))) {
|
|
|
+ error("EXPORTING: Instance %s: Cannot add data to a message", instance->config.name);
|
|
|
+
|
|
|
+ stats->data_lost_events++;
|
|
|
+ stats->lost_metrics += stats->buffered_metrics;
|
|
|
+ stats->lost_bytes += buffer_len;
|
|
|
+
|
|
|
+ goto cleanup;
|
|
|
+ }
|
|
|
+
|
|
|
+ debug(
|
|
|
+ D_BACKEND, "EXPORTING: pubsub_publish(): project = %s, topic = %s, buffer = %zu",
|
|
|
+ connector_specific_config->project_id, connector_specific_config->topic_id, buffer_len);
|
|
|
+
|
|
|
+ if (pubsub_publish((void *)connector_specific_data, error_message, stats->buffered_metrics, buffer_len)) {
|
|
|
+ error("EXPORTING: Instance: %s: Cannot publish a message: %s", instance->config.name, error_message);
|
|
|
+
|
|
|
+ stats->transmission_failures++;
|
|
|
+ stats->data_lost_events++;
|
|
|
+ stats->lost_metrics += stats->buffered_metrics;
|
|
|
+ stats->lost_bytes += buffer_len;
|
|
|
+
|
|
|
+ goto cleanup;
|
|
|
+ }
|
|
|
+
|
|
|
+ stats->sent_bytes = buffer_len;
|
|
|
+ stats->transmission_successes++;
|
|
|
+
|
|
|
+ size_t sent_metrics = 0, lost_metrics = 0, sent_bytes = 0, lost_bytes = 0;
|
|
|
+
|
|
|
+ if (unlikely(pubsub_get_result(
|
|
|
+ connector_specific_data, error_message, &sent_metrics, &sent_bytes, &lost_metrics, &lost_bytes))) {
|
|
|
+ // oops! we couldn't send (all or some of the) data
|
|
|
+ error("EXPORTING: %s", error_message);
|
|
|
+ error(
|
|
|
+ "EXPORTING: failed to write data to service '%s'. Willing to write %zu bytes, wrote %zu bytes.",
|
|
|
+ instance->config.destination, lost_bytes, sent_bytes);
|
|
|
+
|
|
|
+ stats->transmission_failures++;
|
|
|
+ stats->data_lost_events++;
|
|
|
+ stats->lost_metrics += lost_metrics;
|
|
|
+ stats->lost_bytes += lost_bytes;
|
|
|
+ } else {
|
|
|
+ stats->receptions++;
|
|
|
+ stats->sent_metrics = sent_metrics;
|
|
|
+ }
|
|
|
+
|
|
|
+ cleanup:
|
|
|
+ send_internal_metrics(instance);
|
|
|
+
|
|
|
+ buffer_flush(buffer);
|
|
|
+ stats->buffered_metrics = 0;
|
|
|
+
|
|
|
+ uv_mutex_unlock(&instance->mutex);
|
|
|
+
|
|
|
+#ifdef UNIT_TESTING
|
|
|
+ break;
|
|
|
+#endif
|
|
|
+ }
|
|
|
+}
|