123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- // SPDX-License-Identifier: GPL-3.0-or-later
- #define EXPORTING_INTERNALS
- #include "mongodb.h"
- #define CONFIG_FILE_LINE_MAX ((CONFIG_MAX_NAME + CONFIG_MAX_VALUE + 1024) * 2)
- /**
- * Initialize MongoDB connector specific data, including a ring buffer
- *
- * @param instance an instance data structure.
- * @return Returns 0 on success, 1 on failure.
- */
- int mongodb_init(struct instance *instance)
- {
- struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config;
- mongoc_uri_t *uri;
- bson_error_t bson_error;
- if (unlikely(!connector_specific_config->collection || !*connector_specific_config->collection)) {
- error("EXPORTING: collection name is a mandatory MongoDB parameter, but it is not configured");
- return 1;
- }
- uri = mongoc_uri_new_with_error(instance->config.destination, &bson_error);
- if (unlikely(!uri)) {
- error(
- "EXPORTING: failed to parse URI: %s. Error message: %s", instance->config.destination, bson_error.message);
- return 1;
- }
- int32_t socket_timeout =
- mongoc_uri_get_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, instance->config.timeoutms);
- if (!mongoc_uri_set_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout)) {
- error("EXPORTING: failed to set %s to the value %d", MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout);
- return 1;
- };
- struct mongodb_specific_data *connector_specific_data =
- (struct mongodb_specific_data *)instance->connector_specific_data;
- connector_specific_data->client = mongoc_client_new_from_uri(uri);
- if (unlikely(!connector_specific_data->client)) {
- error("EXPORTING: failed to create a new client");
- return 1;
- }
- if (!mongoc_client_set_appname(connector_specific_data->client, "netdata")) {
- error("EXPORTING: failed to set client appname");
- };
- connector_specific_data->collection = mongoc_client_get_collection(
- connector_specific_data->client, connector_specific_config->database, connector_specific_config->collection);
- mongoc_uri_destroy(uri);
- // create a ring buffer
- struct bson_buffer *first_buffer = NULL;
- if (instance->config.buffer_on_failures < 2)
- instance->config.buffer_on_failures = 1;
- else
- instance->config.buffer_on_failures -= 1;
- for (int i = 0; i < instance->config.buffer_on_failures; i++) {
- struct bson_buffer *current_buffer = callocz(1, sizeof(struct bson_buffer));
- if (!connector_specific_data->first_buffer)
- first_buffer = current_buffer;
- else
- current_buffer->next = connector_specific_data->first_buffer;
- connector_specific_data->first_buffer = current_buffer;
- }
- first_buffer->next = connector_specific_data->first_buffer;
- connector_specific_data->last_buffer = connector_specific_data->first_buffer;
- return 0;
- }
- /**
- * Initialize a MongoDB connector instance
- *
- * @param instance an instance data structure.
- * @return Returns 0 on success, 1 on failure.
- */
- int init_mongodb_instance(struct instance *instance)
- {
- instance->worker = mongodb_connector_worker;
- instance->start_batch_formatting = NULL;
- instance->start_host_formatting = format_host_labels_json_plaintext;
- instance->start_chart_formatting = NULL;
- if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
- instance->metric_formatting = format_dimension_collected_json_plaintext;
- else
- instance->metric_formatting = format_dimension_stored_json_plaintext;
- instance->end_chart_formatting = NULL;
- instance->variables_formatting = NULL;
- instance->end_host_formatting = flush_host_labels;
- instance->end_batch_formatting = format_batch_mongodb;
- instance->prepare_header = NULL;
- instance->check_response = NULL;
- instance->buffer = (void *)buffer_create(0);
- if (!instance->buffer) {
- error("EXPORTING: cannot create buffer for MongoDB exporting connector instance %s", instance->config.name);
- return 1;
- }
- if (uv_mutex_init(&instance->mutex))
- return 1;
- if (uv_cond_init(&instance->cond_var))
- return 1;
- struct mongodb_specific_data *connector_specific_data = callocz(1, sizeof(struct mongodb_specific_data));
- instance->connector_specific_data = (void *)connector_specific_data;
- instance->config.timeoutms =
- (instance->config.update_every >= 2) ? (instance->engine->config.update_every * MSEC_PER_SEC - 500) : 1000;
- if (!instance->engine->mongoc_initialized) {
- mongoc_init();
- instance->engine->mongoc_initialized = 1;
- }
- if (unlikely(mongodb_init(instance))) {
- error("EXPORTING: cannot initialize MongoDB exporting connector");
- return 1;
- }
- return 0;
- }
- /**
- * Free an array of BSON structures
- *
- * @param insert an array of documents.
- * @param documents_inserted the number of documents inserted.
- */
- void free_bson(bson_t **insert, size_t documents_inserted)
- {
- size_t i;
- for (i = 0; i < documents_inserted; i++)
- bson_destroy(insert[i]);
- freez(insert);
- }
- /**
- * Format a batch for the MongoDB connector
- *
- * @param instance an instance data structure.
- * @return Returns 0 on success, 1 on failure.
- */
- int format_batch_mongodb(struct instance *instance)
- {
- struct mongodb_specific_data *connector_specific_data =
- (struct mongodb_specific_data *)instance->connector_specific_data;
- struct stats *stats = &instance->stats;
- bson_t **insert = connector_specific_data->last_buffer->insert;
- if (insert) {
- // ring buffer is full, reuse the oldest element
- connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
- free_bson(insert, connector_specific_data->last_buffer->documents_inserted);
- connector_specific_data->total_documents_inserted -= connector_specific_data->last_buffer->documents_inserted;
- stats->buffered_bytes -= connector_specific_data->last_buffer->buffered_bytes;
- }
- insert = callocz((size_t)stats->buffered_metrics, sizeof(bson_t *));
- connector_specific_data->last_buffer->insert = insert;
- BUFFER *buffer = (BUFFER *)instance->buffer;
- char *start = (char *)buffer_tostring(buffer);
- char *end = start;
- size_t documents_inserted = 0;
- while (*end && documents_inserted <= (size_t)stats->buffered_metrics) {
- while (*end && *end != '\n')
- end++;
- if (likely(*end)) {
- *end = '\0';
- end++;
- } else {
- break;
- }
- bson_error_t bson_error;
- insert[documents_inserted] = bson_new_from_json((const uint8_t *)start, -1, &bson_error);
- if (unlikely(!insert[documents_inserted])) {
- error(
- "EXPORTING: Failed creating a BSON document from a JSON string \"%s\" : %s", start, bson_error.message);
- free_bson(insert, documents_inserted);
- return 1;
- }
- start = end;
- documents_inserted++;
- }
- stats->buffered_bytes += connector_specific_data->last_buffer->buffered_bytes = buffer_strlen(buffer);
- buffer_flush(buffer);
- // The stats->buffered_metrics is used in the MongoDB batch formatting as a variable for the number
- // of metrics, added in the current iteration, so we are clearing it here. We will use the
- // connector_specific_data->total_documents_inserted in the worker to show the statistics.
- stats->buffered_metrics = 0;
- connector_specific_data->total_documents_inserted += documents_inserted;
- connector_specific_data->last_buffer->documents_inserted = documents_inserted;
- connector_specific_data->last_buffer = connector_specific_data->last_buffer->next;
- return 0;
- }
- /**
- * Clean a MongoDB connector instance up
- *
- * @param instance an instance data structure.
- */
- void mongodb_cleanup(struct instance *instance)
- {
- info("EXPORTING: cleaning up instance %s ...", instance->config.name);
- struct mongodb_specific_data *connector_specific_data =
- (struct mongodb_specific_data *)instance->connector_specific_data;
- mongoc_collection_destroy(connector_specific_data->collection);
- mongoc_client_destroy(connector_specific_data->client);
- if (instance->engine->mongoc_initialized) {
- mongoc_cleanup();
- instance->engine->mongoc_initialized = 0;
- }
- buffer_free(instance->buffer);
- struct bson_buffer *next_buffer = connector_specific_data->first_buffer;
- for (int i = 0; i < instance->config.buffer_on_failures; i++) {
- struct bson_buffer *current_buffer = next_buffer;
- next_buffer = next_buffer->next;
- if (current_buffer->insert)
- free_bson(current_buffer->insert, current_buffer->documents_inserted);
- freez(current_buffer);
- }
- freez(connector_specific_data);
- struct mongodb_specific_config *connector_specific_config =
- (struct mongodb_specific_config *)instance->config.connector_specific_config;
- freez(connector_specific_config->database);
- freez(connector_specific_config->collection);
- freez(connector_specific_config);
- info("EXPORTING: instance %s exited", instance->config.name);
- instance->exited = 1;
- return;
- }
- /**
- * MongoDB connector worker
- *
- * Runs in a separate thread for every instance.
- *
- * @param instance_p an instance data structure.
- */
- void mongodb_connector_worker(void *instance_p)
- {
- struct instance *instance = (struct instance *)instance_p;
- #ifdef NETDATA_INTERNAL_CHECKS
- struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config;
- #endif
- struct mongodb_specific_data *connector_specific_data =
- (struct mongodb_specific_data *)instance->connector_specific_data;
- while (!instance->engine->exit) {
- struct stats *stats = &instance->stats;
- uv_mutex_lock(&instance->mutex);
- if (!connector_specific_data->first_buffer->insert ||
- !connector_specific_data->first_buffer->documents_inserted) {
- while (!instance->data_is_ready)
- uv_cond_wait(&instance->cond_var, &instance->mutex);
- instance->data_is_ready = 0;
- }
- if (unlikely(instance->engine->exit)) {
- uv_mutex_unlock(&instance->mutex);
- break;
- }
- // reset the monitoring chart counters
- stats->received_bytes =
- stats->sent_bytes =
- stats->sent_metrics =
- stats->lost_metrics =
- stats->receptions =
- stats->transmission_successes =
- stats->transmission_failures =
- stats->data_lost_events =
- stats->lost_bytes =
- stats->reconnects = 0;
- bson_t **insert = connector_specific_data->first_buffer->insert;
- size_t documents_inserted = connector_specific_data->first_buffer->documents_inserted;
- size_t buffered_bytes = connector_specific_data->first_buffer->buffered_bytes;
- connector_specific_data->first_buffer->insert = NULL;
- connector_specific_data->first_buffer->documents_inserted = 0;
- connector_specific_data->first_buffer->buffered_bytes = 0;
- connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
- uv_mutex_unlock(&instance->mutex);
- size_t data_size = 0;
- for (size_t i = 0; i < documents_inserted; i++) {
- data_size += insert[i]->len;
- }
- debug(
- D_EXPORTING,
- "EXPORTING: mongodb_insert(): destination = %s, database = %s, collection = %s, data size = %zu",
- instance->config.destination,
- connector_specific_config->database,
- connector_specific_config->collection,
- data_size);
- if (likely(documents_inserted != 0)) {
- bson_error_t bson_error;
- if (likely(mongoc_collection_insert_many(
- connector_specific_data->collection,
- (const bson_t **)insert,
- documents_inserted,
- NULL,
- NULL,
- &bson_error))) {
- stats->sent_metrics = documents_inserted;
- stats->sent_bytes += data_size;
- stats->transmission_successes++;
- stats->receptions++;
- } else {
- // oops! we couldn't send (all or some of the) data
- error("EXPORTING: %s", bson_error.message);
- error(
- "EXPORTING: failed to write data to the database '%s'. "
- "Willing to write %zu bytes, wrote %zu bytes.",
- instance->config.destination, data_size, 0UL);
- stats->transmission_failures++;
- stats->data_lost_events++;
- stats->lost_bytes += buffered_bytes;
- stats->lost_metrics += documents_inserted;
- }
- }
- free_bson(insert, documents_inserted);
- if (unlikely(instance->engine->exit))
- break;
- uv_mutex_lock(&instance->mutex);
- stats->buffered_metrics = connector_specific_data->total_documents_inserted;
- send_internal_metrics(instance);
- connector_specific_data->total_documents_inserted -= documents_inserted;
- stats->buffered_metrics = 0;
- stats->buffered_bytes -= buffered_bytes;
- uv_mutex_unlock(&instance->mutex);
- #ifdef UNIT_TESTING
- return;
- #endif
- }
- mongodb_cleanup(instance);
- }
|