@@ -79,22 +79,6 @@ int mongodb_init(struct instance *instance)
     return 0;
 }
 
-/**
- * Clean a MongoDB connector instance up
- *
- * @param instance an instance data structure.
- */
-void mongodb_cleanup(struct instance *instance)
-{
-    struct mongodb_specific_data *connector_specific_data =
-        (struct mongodb_specific_data *)instance->connector_specific_data;
-
-    mongoc_collection_destroy(connector_specific_data->collection);
-    mongoc_client_destroy(connector_specific_data->client);
-
-    return;
-}
-
 /**
  * Initialize a MongoDB connector instance
  *
@@ -237,6 +221,51 @@ int format_batch_mongodb(struct instance *instance)
     return 0;
 }
 
+/**
+ * Clean a MongoDB connector instance up
+ *
+ * @param instance an instance data structure.
+ */
+void mongodb_cleanup(struct instance *instance)
+{
+    info("EXPORTING: cleaning up instance %s ...", instance->config.name);
+
+    struct mongodb_specific_data *connector_specific_data =
+        (struct mongodb_specific_data *)instance->connector_specific_data;
+
+    mongoc_collection_destroy(connector_specific_data->collection);
+    mongoc_client_destroy(connector_specific_data->client);
+    if (instance->engine->mongoc_initialized) {
+        mongoc_cleanup();
+        instance->engine->mongoc_initialized = 0;
+    }
+
+    buffer_free(instance->buffer);
+
+    struct bson_buffer *next_buffer = connector_specific_data->first_buffer;
+    for (int i = 0; i < instance->config.buffer_on_failures; i++) {
+        struct bson_buffer *current_buffer = next_buffer;
+        next_buffer = next_buffer->next;
+
+        if (current_buffer->insert)
+            free_bson(current_buffer->insert, current_buffer->documents_inserted);
+        freez(current_buffer);
+    }
+
+    freez(connector_specific_data);
+
+    struct mongodb_specific_config *connector_specific_config =
+        (struct mongodb_specific_config *)instance->config.connector_specific_config;
+    freez(connector_specific_config->database);
+    freez(connector_specific_config->collection);
+    freez(connector_specific_config);
+
+    info("EXPORTING: instance %s exited", instance->config.name);
+    instance->exited = 1;
+
+    return;
+}
+
 /**
  * MongoDB connector worker
  *
@@ -251,12 +280,17 @@ void mongodb_connector_worker(void *instance_p)
     struct mongodb_specific_data *connector_specific_data =
         (struct mongodb_specific_data *)instance->connector_specific_data;
 
-    while (!netdata_exit) {
+    while (!instance->engine->exit) {
         struct stats *stats = &instance->stats;
 
         uv_mutex_lock(&instance->mutex);
         uv_cond_wait(&instance->cond_var, &instance->mutex);
 
+        if (unlikely(instance->engine->exit)) {
+            uv_mutex_unlock(&instance->mutex);
+            break;
+        }
+
         // reset the monitoring chart counters
         stats->received_bytes =
         stats->sent_bytes =
@@ -293,38 +327,37 @@ void mongodb_connector_worker(void *instance_p)
             connector_specific_config->collection,
             data_size);
 
-        if (unlikely(documents_inserted == 0))
-            continue;
-
-        bson_error_t bson_error;
-        if (likely(mongoc_collection_insert_many(
-                connector_specific_data->collection,
-                (const bson_t **)insert,
-                documents_inserted,
-                NULL,
-                NULL,
-                &bson_error))) {
-            stats->sent_metrics = documents_inserted;
-            stats->sent_bytes += data_size;
-            stats->transmission_successes++;
-            stats->receptions++;
-        } else {
-            // oops! we couldn't send (all or some of the) data
-            error("EXPORTING: %s", bson_error.message);
-            error(
-                "EXPORTING: failed to write data to the database '%s'. "
-                "Willing to write %zu bytes, wrote %zu bytes.",
-                instance->config.destination, data_size, 0UL);
-
-            stats->transmission_failures++;
-            stats->data_lost_events++;
-            stats->lost_bytes += buffered_bytes;
-            stats->lost_metrics += documents_inserted;
+        if (likely(documents_inserted != 0)) {
+            bson_error_t bson_error;
+            if (likely(mongoc_collection_insert_many(
+                    connector_specific_data->collection,
+                    (const bson_t **)insert,
+                    documents_inserted,
+                    NULL,
+                    NULL,
+                    &bson_error))) {
+                stats->sent_metrics = documents_inserted;
+                stats->sent_bytes += data_size;
+                stats->transmission_successes++;
+                stats->receptions++;
+            } else {
+                // oops! we couldn't send (all or some of the) data
+                error("EXPORTING: %s", bson_error.message);
+                error(
+                    "EXPORTING: failed to write data to the database '%s'. "
+                    "Willing to write %zu bytes, wrote %zu bytes.",
+                    instance->config.destination, data_size, 0UL);
+
+                stats->transmission_failures++;
+                stats->data_lost_events++;
+                stats->lost_bytes += buffered_bytes;
+                stats->lost_metrics += documents_inserted;
+            }
         }
 
         free_bson(insert, documents_inserted);
 
-        if (unlikely(netdata_exit))
+        if (unlikely(instance->engine->exit))
             break;
 
         uv_mutex_lock(&instance->mutex);
@@ -341,7 +374,7 @@ void mongodb_connector_worker(void *instance_p)
         uv_mutex_unlock(&instance->mutex);
 
 #ifdef UNIT_TESTING
-        break;
+        return;
 #endif
     }
 
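Note on the shutdown flow above: the worker no longer polls the global netdata_exit flag but the per-engine instance->engine->exit flag, and it re-checks that flag immediately after uv_cond_wait() returns, so a shutdown signal wakes the thread, releases the mutex, and leaves the loop instead of formatting another batch. The standalone sketch below shows that wake-and-re-check pattern with the same libuv primitives. It is not code from this patch: shared_state, batch_ready, and exit_requested are invented names, and the inner predicate loop is an extra safeguard against missed wakeups added only for the sketch.

/* sketch.c - illustrative only; compile with: cc sketch.c -luv */
#include <stdio.h>
#include <uv.h>

struct shared_state {
    uv_mutex_t mutex;
    uv_cond_t cond_var;
    int batch_ready;      /* a batch is waiting to be flushed */
    int exit_requested;   /* plays the role of instance->engine->exit */
};

static void worker(void *arg)
{
    struct shared_state *state = arg;

    for (;;) {
        uv_mutex_lock(&state->mutex);
        while (!state->batch_ready && !state->exit_requested)
            uv_cond_wait(&state->cond_var, &state->mutex);

        /* Re-check the exit flag after waking, as the patched worker does,
         * and leave with the mutex released instead of processing a batch. */
        if (state->exit_requested) {
            uv_mutex_unlock(&state->mutex);
            break;
        }

        state->batch_ready = 0;
        uv_mutex_unlock(&state->mutex);
        printf("worker: flushing one batch\n");
    }
}

int main(void)
{
    struct shared_state state = {0};
    uv_mutex_init(&state.mutex);
    uv_cond_init(&state.cond_var);

    uv_thread_t thread;
    uv_thread_create(&thread, worker, &state);

    /* Hand the worker one batch, then request shutdown. The flags are set
     * under the mutex before signalling so the wakeup cannot be missed. */
    uv_mutex_lock(&state.mutex);
    state.batch_ready = 1;
    uv_cond_signal(&state.cond_var);
    uv_mutex_unlock(&state.mutex);

    uv_mutex_lock(&state.mutex);
    state.exit_requested = 1;
    uv_cond_signal(&state.cond_var);
    uv_mutex_unlock(&state.mutex);

    uv_thread_join(&thread);
    uv_mutex_destroy(&state.mutex);
    uv_cond_destroy(&state.cond_var);
    return 0;
}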