mongodb.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #define EXPORTING_INTERNALS
  3. #include "mongodb.h"
  4. #define CONFIG_FILE_LINE_MAX ((CONFIG_MAX_NAME + CONFIG_MAX_VALUE + 1024) * 2)
  5. /**
  6. * Initialize MongoDB connector specific data, including a ring buffer
  7. *
  8. * @param instance an instance data structure.
  9. * @return Returns 0 on success, 1 on failure.
  10. */
  11. int mongodb_init(struct instance *instance)
  12. {
  13. struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config;
  14. mongoc_uri_t *uri;
  15. bson_error_t bson_error;
  16. if (unlikely(!connector_specific_config->collection || !*connector_specific_config->collection)) {
  17. netdata_log_error("EXPORTING: collection name is a mandatory MongoDB parameter, but it is not configured");
  18. return 1;
  19. }
  20. uri = mongoc_uri_new_with_error(instance->config.destination, &bson_error);
  21. if (unlikely(!uri)) {
  22. netdata_log_error("EXPORTING: failed to parse URI: %s. Error message: %s",
  23. instance->config.destination,
  24. bson_error.message);
  25. return 1;
  26. }
  27. int32_t socket_timeout =
  28. mongoc_uri_get_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, instance->config.timeoutms);
  29. if (!mongoc_uri_set_option_as_int32(uri, MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout)) {
  30. netdata_log_error("EXPORTING: failed to set %s to the value %d", MONGOC_URI_SOCKETTIMEOUTMS, socket_timeout);
  31. return 1;
  32. };
  33. struct mongodb_specific_data *connector_specific_data =
  34. (struct mongodb_specific_data *)instance->connector_specific_data;
  35. connector_specific_data->client = mongoc_client_new_from_uri(uri);
  36. if (unlikely(!connector_specific_data->client)) {
  37. netdata_log_error("EXPORTING: failed to create a new client");
  38. return 1;
  39. }
  40. if (!mongoc_client_set_appname(connector_specific_data->client, "netdata")) {
  41. netdata_log_error("EXPORTING: failed to set client appname");
  42. };
  43. connector_specific_data->collection = mongoc_client_get_collection(
  44. connector_specific_data->client, connector_specific_config->database, connector_specific_config->collection);
  45. mongoc_uri_destroy(uri);
  46. // create a ring buffer
  47. struct bson_buffer *first_buffer = NULL;
  48. if (instance->config.buffer_on_failures < 2)
  49. instance->config.buffer_on_failures = 1;
  50. else
  51. instance->config.buffer_on_failures -= 1;
  52. for (int i = 0; i < instance->config.buffer_on_failures; i++) {
  53. struct bson_buffer *current_buffer = callocz(1, sizeof(struct bson_buffer));
  54. if (!connector_specific_data->first_buffer)
  55. first_buffer = current_buffer;
  56. else
  57. current_buffer->next = connector_specific_data->first_buffer;
  58. connector_specific_data->first_buffer = current_buffer;
  59. }
  60. first_buffer->next = connector_specific_data->first_buffer;
  61. connector_specific_data->last_buffer = connector_specific_data->first_buffer;
  62. return 0;
  63. }
  64. /**
  65. * Initialize a MongoDB connector instance
  66. *
  67. * @param instance an instance data structure.
  68. * @return Returns 0 on success, 1 on failure.
  69. */
  70. int init_mongodb_instance(struct instance *instance)
  71. {
  72. instance->worker = mongodb_connector_worker;
  73. instance->start_batch_formatting = NULL;
  74. instance->start_host_formatting = format_host_labels_json_plaintext;
  75. instance->start_chart_formatting = NULL;
  76. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
  77. instance->metric_formatting = format_dimension_collected_json_plaintext;
  78. else
  79. instance->metric_formatting = format_dimension_stored_json_plaintext;
  80. instance->end_chart_formatting = NULL;
  81. instance->variables_formatting = NULL;
  82. instance->end_host_formatting = flush_host_labels;
  83. instance->end_batch_formatting = format_batch_mongodb;
  84. instance->prepare_header = NULL;
  85. instance->check_response = NULL;
  86. instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters);
  87. if (!instance->buffer) {
  88. netdata_log_error("EXPORTING: cannot create buffer for MongoDB exporting connector instance %s",
  89. instance->config.name);
  90. return 1;
  91. }
  92. if (uv_mutex_init(&instance->mutex))
  93. return 1;
  94. if (uv_cond_init(&instance->cond_var))
  95. return 1;
  96. struct mongodb_specific_data *connector_specific_data = callocz(1, sizeof(struct mongodb_specific_data));
  97. instance->connector_specific_data = (void *)connector_specific_data;
  98. instance->config.timeoutms =
  99. (instance->config.update_every >= 2) ? (instance->engine->config.update_every * MSEC_PER_SEC - 500) : 1000;
  100. if (!instance->engine->mongoc_initialized) {
  101. mongoc_init();
  102. instance->engine->mongoc_initialized = 1;
  103. }
  104. if (unlikely(mongodb_init(instance))) {
  105. netdata_log_error("EXPORTING: cannot initialize MongoDB exporting connector");
  106. return 1;
  107. }
  108. return 0;
  109. }
  110. /**
  111. * Free an array of BSON structures
  112. *
  113. * @param insert an array of documents.
  114. * @param documents_inserted the number of documents inserted.
  115. */
  116. void free_bson(bson_t **insert, size_t documents_inserted)
  117. {
  118. size_t i;
  119. for (i = 0; i < documents_inserted; i++)
  120. bson_destroy(insert[i]);
  121. freez(insert);
  122. }
  123. /**
  124. * Format a batch for the MongoDB connector
  125. *
  126. * @param instance an instance data structure.
  127. * @return Returns 0 on success, 1 on failure.
  128. */
  129. int format_batch_mongodb(struct instance *instance)
  130. {
  131. struct mongodb_specific_data *connector_specific_data =
  132. (struct mongodb_specific_data *)instance->connector_specific_data;
  133. struct stats *stats = &instance->stats;
  134. bson_t **insert = connector_specific_data->last_buffer->insert;
  135. if (insert) {
  136. // ring buffer is full, reuse the oldest element
  137. connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
  138. free_bson(insert, connector_specific_data->last_buffer->documents_inserted);
  139. connector_specific_data->total_documents_inserted -= connector_specific_data->last_buffer->documents_inserted;
  140. stats->buffered_bytes -= connector_specific_data->last_buffer->buffered_bytes;
  141. }
  142. insert = callocz((size_t)stats->buffered_metrics, sizeof(bson_t *));
  143. connector_specific_data->last_buffer->insert = insert;
  144. BUFFER *buffer = (BUFFER *)instance->buffer;
  145. char *start = (char *)buffer_tostring(buffer);
  146. char *end = start;
  147. size_t documents_inserted = 0;
  148. while (*end && documents_inserted <= (size_t)stats->buffered_metrics) {
  149. while (*end && *end != '\n')
  150. end++;
  151. if (likely(*end)) {
  152. *end = '\0';
  153. end++;
  154. } else {
  155. break;
  156. }
  157. bson_error_t bson_error;
  158. insert[documents_inserted] = bson_new_from_json((const uint8_t *)start, -1, &bson_error);
  159. if (unlikely(!insert[documents_inserted])) {
  160. netdata_log_error(
  161. "EXPORTING: Failed creating a BSON document from a JSON string \"%s\" : %s", start, bson_error.message);
  162. free_bson(insert, documents_inserted);
  163. return 1;
  164. }
  165. start = end;
  166. documents_inserted++;
  167. }
  168. stats->buffered_bytes += connector_specific_data->last_buffer->buffered_bytes = buffer_strlen(buffer);
  169. buffer_flush(buffer);
  170. // The stats->buffered_metrics is used in the MongoDB batch formatting as a variable for the number
  171. // of metrics, added in the current iteration, so we are clearing it here. We will use the
  172. // connector_specific_data->total_documents_inserted in the worker to show the statistics.
  173. stats->buffered_metrics = 0;
  174. connector_specific_data->total_documents_inserted += documents_inserted;
  175. connector_specific_data->last_buffer->documents_inserted = documents_inserted;
  176. connector_specific_data->last_buffer = connector_specific_data->last_buffer->next;
  177. return 0;
  178. }
  179. /**
  180. * Clean a MongoDB connector instance up
  181. *
  182. * @param instance an instance data structure.
  183. */
  184. void mongodb_cleanup(struct instance *instance)
  185. {
  186. netdata_log_info("EXPORTING: cleaning up instance %s ...", instance->config.name);
  187. struct mongodb_specific_data *connector_specific_data =
  188. (struct mongodb_specific_data *)instance->connector_specific_data;
  189. mongoc_collection_destroy(connector_specific_data->collection);
  190. mongoc_client_destroy(connector_specific_data->client);
  191. if (instance->engine->mongoc_initialized) {
  192. mongoc_cleanup();
  193. instance->engine->mongoc_initialized = 0;
  194. }
  195. buffer_free(instance->buffer);
  196. struct bson_buffer *next_buffer = connector_specific_data->first_buffer;
  197. for (int i = 0; i < instance->config.buffer_on_failures; i++) {
  198. struct bson_buffer *current_buffer = next_buffer;
  199. next_buffer = next_buffer->next;
  200. if (current_buffer->insert)
  201. free_bson(current_buffer->insert, current_buffer->documents_inserted);
  202. freez(current_buffer);
  203. }
  204. freez(connector_specific_data);
  205. struct mongodb_specific_config *connector_specific_config =
  206. (struct mongodb_specific_config *)instance->config.connector_specific_config;
  207. freez(connector_specific_config->database);
  208. freez(connector_specific_config->collection);
  209. freez(connector_specific_config);
  210. netdata_log_info("EXPORTING: instance %s exited", instance->config.name);
  211. instance->exited = 1;
  212. return;
  213. }
  214. /**
  215. * MongoDB connector worker
  216. *
  217. * Runs in a separate thread for every instance.
  218. *
  219. * @param instance_p an instance data structure.
  220. */
  221. void mongodb_connector_worker(void *instance_p)
  222. {
  223. struct instance *instance = (struct instance *)instance_p;
  224. #ifdef NETDATA_INTERNAL_CHECKS
  225. struct mongodb_specific_config *connector_specific_config = instance->config.connector_specific_config;
  226. #endif
  227. struct mongodb_specific_data *connector_specific_data =
  228. (struct mongodb_specific_data *)instance->connector_specific_data;
  229. while (!instance->engine->exit) {
  230. struct stats *stats = &instance->stats;
  231. uv_mutex_lock(&instance->mutex);
  232. if (!connector_specific_data->first_buffer->insert ||
  233. !connector_specific_data->first_buffer->documents_inserted) {
  234. while (!instance->data_is_ready)
  235. uv_cond_wait(&instance->cond_var, &instance->mutex);
  236. instance->data_is_ready = 0;
  237. }
  238. if (unlikely(instance->engine->exit)) {
  239. uv_mutex_unlock(&instance->mutex);
  240. break;
  241. }
  242. // reset the monitoring chart counters
  243. stats->received_bytes =
  244. stats->sent_bytes =
  245. stats->sent_metrics =
  246. stats->lost_metrics =
  247. stats->receptions =
  248. stats->transmission_successes =
  249. stats->transmission_failures =
  250. stats->data_lost_events =
  251. stats->lost_bytes =
  252. stats->reconnects = 0;
  253. bson_t **insert = connector_specific_data->first_buffer->insert;
  254. size_t documents_inserted = connector_specific_data->first_buffer->documents_inserted;
  255. size_t buffered_bytes = connector_specific_data->first_buffer->buffered_bytes;
  256. connector_specific_data->first_buffer->insert = NULL;
  257. connector_specific_data->first_buffer->documents_inserted = 0;
  258. connector_specific_data->first_buffer->buffered_bytes = 0;
  259. connector_specific_data->first_buffer = connector_specific_data->first_buffer->next;
  260. uv_mutex_unlock(&instance->mutex);
  261. size_t data_size = 0;
  262. for (size_t i = 0; i < documents_inserted; i++) {
  263. data_size += insert[i]->len;
  264. }
  265. netdata_log_debug(
  266. D_EXPORTING,
  267. "EXPORTING: mongodb_insert(): destination = %s, database = %s, collection = %s, data size = %zu",
  268. instance->config.destination,
  269. connector_specific_config->database,
  270. connector_specific_config->collection,
  271. data_size);
  272. if (likely(documents_inserted != 0)) {
  273. bson_error_t bson_error;
  274. if (likely(mongoc_collection_insert_many(
  275. connector_specific_data->collection,
  276. (const bson_t **)insert,
  277. documents_inserted,
  278. NULL,
  279. NULL,
  280. &bson_error))) {
  281. stats->sent_metrics = documents_inserted;
  282. stats->sent_bytes += data_size;
  283. stats->transmission_successes++;
  284. stats->receptions++;
  285. } else {
  286. // oops! we couldn't send (all or some of the) data
  287. netdata_log_error("EXPORTING: %s", bson_error.message);
  288. netdata_log_error(
  289. "EXPORTING: failed to write data to the database '%s'. "
  290. "Willing to write %zu bytes, wrote %zu bytes.",
  291. instance->config.destination, data_size, 0UL);
  292. stats->transmission_failures++;
  293. stats->data_lost_events++;
  294. stats->lost_bytes += buffered_bytes;
  295. stats->lost_metrics += documents_inserted;
  296. }
  297. }
  298. free_bson(insert, documents_inserted);
  299. if (unlikely(instance->engine->exit))
  300. break;
  301. uv_mutex_lock(&instance->mutex);
  302. stats->buffered_metrics = connector_specific_data->total_documents_inserted;
  303. send_internal_metrics(instance);
  304. connector_specific_data->total_documents_inserted -= documents_inserted;
  305. stats->buffered_metrics = 0;
  306. stats->buffered_bytes -= buffered_bytes;
  307. uv_mutex_unlock(&instance->mutex);
  308. #ifdef UNIT_TESTING
  309. return;
  310. #endif
  311. }
  312. mongodb_cleanup(instance);
  313. }