aws_kinesis.c 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aws_kinesis.h"
  3. /**
  4. * Clean AWS Kinesis *
  5. */
  6. void aws_kinesis_cleanup(struct instance *instance)
  7. {
  8. netdata_log_info("EXPORTING: cleaning up instance %s ...", instance->config.name);
  9. kinesis_shutdown(instance->connector_specific_data);
  10. freez(instance->connector_specific_data);
  11. struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config;
  12. if (connector_specific_config) {
  13. freez(connector_specific_config->auth_key_id);
  14. freez(connector_specific_config->secure_key);
  15. freez(connector_specific_config->stream_name);
  16. freez(connector_specific_config);
  17. }
  18. netdata_log_info("EXPORTING: instance %s exited", instance->config.name);
  19. instance->exited = 1;
  20. }
  21. /**
  22. * Initialize AWS Kinesis connector instance
  23. *
  24. * @param instance an instance data structure.
  25. * @return Returns 0 on success, 1 on failure.
  26. */
  27. int init_aws_kinesis_instance(struct instance *instance)
  28. {
  29. instance->worker = aws_kinesis_connector_worker;
  30. instance->start_batch_formatting = NULL;
  31. instance->start_host_formatting = format_host_labels_json_plaintext;
  32. instance->start_chart_formatting = NULL;
  33. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
  34. instance->metric_formatting = format_dimension_collected_json_plaintext;
  35. else
  36. instance->metric_formatting = format_dimension_stored_json_plaintext;
  37. instance->end_chart_formatting = NULL;
  38. instance->variables_formatting = NULL;
  39. instance->end_host_formatting = flush_host_labels;
  40. instance->end_batch_formatting = NULL;
  41. instance->prepare_header = NULL;
  42. instance->check_response = NULL;
  43. instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters);
  44. if (!instance->buffer) {
  45. netdata_log_error("EXPORTING: cannot create buffer for AWS Kinesis exporting connector instance %s",
  46. instance->config.name);
  47. return 1;
  48. }
  49. if (uv_mutex_init(&instance->mutex))
  50. return 1;
  51. if (uv_cond_init(&instance->cond_var))
  52. return 1;
  53. if (!instance->engine->aws_sdk_initialized) {
  54. aws_sdk_init();
  55. instance->engine->aws_sdk_initialized = 1;
  56. }
  57. struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config;
  58. struct aws_kinesis_specific_data *connector_specific_data = callocz(1, sizeof(struct aws_kinesis_specific_data));
  59. instance->connector_specific_data = (void *)connector_specific_data;
  60. if (!strcmp(connector_specific_config->stream_name, "")) {
  61. netdata_log_error("stream name is a mandatory Kinesis parameter but it is not configured");
  62. return 1;
  63. }
  64. kinesis_init(
  65. (void *)connector_specific_data,
  66. instance->config.destination,
  67. connector_specific_config->auth_key_id,
  68. connector_specific_config->secure_key,
  69. instance->config.timeoutms);
  70. return 0;
  71. }
  72. /**
  73. * AWS Kinesis connector worker
  74. *
  75. * Runs in a separate thread for every instance.
  76. *
  77. * @param instance_p an instance data structure.
  78. */
  79. void aws_kinesis_connector_worker(void *instance_p)
  80. {
  81. struct instance *instance = (struct instance *)instance_p;
  82. struct aws_kinesis_specific_config *connector_specific_config = instance->config.connector_specific_config;
  83. struct aws_kinesis_specific_data *connector_specific_data = instance->connector_specific_data;
  84. while (!instance->engine->exit) {
  85. unsigned long long partition_key_seq = 0;
  86. struct stats *stats = &instance->stats;
  87. uv_mutex_lock(&instance->mutex);
  88. while (!instance->data_is_ready)
  89. uv_cond_wait(&instance->cond_var, &instance->mutex);
  90. instance->data_is_ready = 0;
  91. if (unlikely(instance->engine->exit)) {
  92. uv_mutex_unlock(&instance->mutex);
  93. break;
  94. }
  95. // reset the monitoring chart counters
  96. stats->received_bytes =
  97. stats->sent_bytes =
  98. stats->sent_metrics =
  99. stats->lost_metrics =
  100. stats->receptions =
  101. stats->transmission_successes =
  102. stats->transmission_failures =
  103. stats->data_lost_events =
  104. stats->lost_bytes =
  105. stats->reconnects = 0;
  106. BUFFER *buffer = (BUFFER *)instance->buffer;
  107. size_t buffer_len = buffer_strlen(buffer);
  108. stats->buffered_bytes = buffer_len;
  109. size_t sent = 0;
  110. while (sent < buffer_len) {
  111. char partition_key[KINESIS_PARTITION_KEY_MAX + 1];
  112. snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++);
  113. size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX);
  114. const char *first_char = buffer_tostring(buffer) + sent;
  115. size_t record_len = 0;
  116. // split buffer into chunks of maximum allowed size
  117. if (buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) {
  118. record_len = buffer_len - sent;
  119. } else {
  120. record_len = KINESIS_RECORD_MAX - partition_key_len;
  121. while (record_len && *(first_char + record_len - 1) != '\n')
  122. record_len--;
  123. }
  124. char error_message[ERROR_LINE_MAX + 1] = "";
  125. netdata_log_debug(D_EXPORTING,
  126. "EXPORTING: kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, "
  127. "buffer = %zu, record = %zu",
  128. instance->config.destination,
  129. connector_specific_config->auth_key_id,
  130. connector_specific_config->secure_key,
  131. connector_specific_config->stream_name,
  132. partition_key,
  133. buffer_len,
  134. record_len);
  135. kinesis_put_record(
  136. connector_specific_data, connector_specific_config->stream_name, partition_key, first_char, record_len);
  137. sent += record_len;
  138. stats->transmission_successes++;
  139. size_t sent_bytes = 0, lost_bytes = 0;
  140. if (unlikely(kinesis_get_result(
  141. connector_specific_data->request_outcomes, error_message, &sent_bytes, &lost_bytes))) {
  142. // oops! we couldn't send (all or some of the) data
  143. netdata_log_error("EXPORTING: %s", error_message);
  144. netdata_log_error("EXPORTING: failed to write data to external database '%s'. Willing to write %zu bytes, wrote %zu bytes.",
  145. instance->config.destination,
  146. sent_bytes,
  147. sent_bytes - lost_bytes);
  148. stats->transmission_failures++;
  149. stats->data_lost_events++;
  150. stats->lost_bytes += lost_bytes;
  151. // estimate the number of lost metrics
  152. stats->lost_metrics += (collected_number)(
  153. stats->buffered_metrics *
  154. (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1));
  155. break;
  156. } else {
  157. stats->receptions++;
  158. }
  159. if (unlikely(instance->engine->exit))
  160. break;
  161. }
  162. stats->sent_bytes += sent;
  163. if (likely(sent == buffer_len))
  164. stats->sent_metrics = stats->buffered_metrics;
  165. buffer_flush(buffer);
  166. send_internal_metrics(instance);
  167. stats->buffered_metrics = 0;
  168. uv_mutex_unlock(&instance->mutex);
  169. #ifdef UNIT_TESTING
  170. return;
  171. #endif
  172. }
  173. aws_kinesis_cleanup(instance);
  174. }