pubsub.c 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "pubsub.h"
  3. /**
  4. * Initialize Pub/Sub connector instance
  5. *
  6. * @param instance an instance data structure.
  7. * @return Returns 0 on success, 1 on failure.
  8. */
  9. int init_pubsub_instance(struct instance *instance)
  10. {
  11. instance->worker = pubsub_connector_worker;
  12. instance->start_batch_formatting = NULL;
  13. instance->start_host_formatting = format_host_labels_json_plaintext;
  14. instance->start_chart_formatting = NULL;
  15. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
  16. instance->metric_formatting = format_dimension_collected_json_plaintext;
  17. else
  18. instance->metric_formatting = format_dimension_stored_json_plaintext;
  19. instance->end_chart_formatting = NULL;
  20. instance->end_host_formatting = flush_host_labels;
  21. instance->end_batch_formatting = NULL;
  22. instance->prepare_header = NULL;
  23. instance->check_response = NULL;
  24. instance->buffer = (void *)buffer_create(0);
  25. if (!instance->buffer) {
  26. error("EXPORTING: cannot create buffer for Pub/Sub exporting connector instance %s", instance->config.name);
  27. return 1;
  28. }
  29. uv_mutex_init(&instance->mutex);
  30. uv_cond_init(&instance->cond_var);
  31. struct pubsub_specific_data *connector_specific_data = callocz(1, sizeof(struct pubsub_specific_data));
  32. instance->connector_specific_data = (void *)connector_specific_data;
  33. struct pubsub_specific_config *connector_specific_config =
  34. (struct pubsub_specific_config *)instance->config.connector_specific_config;
  35. char error_message[ERROR_LINE_MAX + 1] = "";
  36. if (pubsub_init(
  37. (void *)connector_specific_data, error_message, instance->config.destination,
  38. connector_specific_config->credentials_file, connector_specific_config->project_id,
  39. connector_specific_config->topic_id)) {
  40. error(
  41. "EXPORTING: Cannot initialize a Pub/Sub publisher for instance %s: %s",
  42. instance->config.name, error_message);
  43. return 1;
  44. }
  45. return 0;
  46. }
  47. /**
  48. * Clean a PubSub connector instance
  49. *
  50. * @param instance an instance data structure.
  51. */
  52. void clean_pubsub_instance(struct instance *instance)
  53. {
  54. info("EXPORTING: cleaning up instance %s ...", instance->config.name);
  55. struct pubsub_specific_data *connector_specific_data =
  56. (struct pubsub_specific_data *)instance->connector_specific_data;
  57. pubsub_cleanup(connector_specific_data);
  58. freez(connector_specific_data);
  59. buffer_free(instance->buffer);
  60. struct pubsub_specific_config *connector_specific_config =
  61. (struct pubsub_specific_config *)instance->config.connector_specific_config;
  62. freez(connector_specific_config->credentials_file);
  63. freez(connector_specific_config->project_id);
  64. freez(connector_specific_config->topic_id);
  65. freez(connector_specific_config);
  66. info("EXPORTING: instance %s exited", instance->config.name);
  67. instance->exited = 1;
  68. return;
  69. }
  70. /**
  71. * Pub/Sub connector worker
  72. *
  73. * Runs in a separate thread for every instance.
  74. *
  75. * @param instance_p an instance data structure.
  76. */
  77. void pubsub_connector_worker(void *instance_p)
  78. {
  79. struct instance *instance = (struct instance *)instance_p;
  80. struct pubsub_specific_config *connector_specific_config = instance->config.connector_specific_config;
  81. struct pubsub_specific_data *connector_specific_data = instance->connector_specific_data;
  82. while (!instance->engine->exit) {
  83. struct stats *stats = &instance->stats;
  84. char error_message[ERROR_LINE_MAX + 1] = "";
  85. uv_mutex_lock(&instance->mutex);
  86. while (!instance->data_is_ready)
  87. uv_cond_wait(&instance->cond_var, &instance->mutex);
  88. instance->data_is_ready = 0;
  89. if (unlikely(instance->engine->exit)) {
  90. uv_mutex_unlock(&instance->mutex);
  91. break;
  92. }
  93. // reset the monitoring chart counters
  94. stats->received_bytes =
  95. stats->sent_bytes =
  96. stats->sent_metrics =
  97. stats->lost_metrics =
  98. stats->receptions =
  99. stats->transmission_successes =
  100. stats->transmission_failures =
  101. stats->data_lost_events =
  102. stats->lost_bytes =
  103. stats->reconnects = 0;
  104. BUFFER *buffer = (BUFFER *)instance->buffer;
  105. size_t buffer_len = buffer_strlen(buffer);
  106. stats->buffered_bytes = buffer_len;
  107. if (pubsub_add_message(instance->connector_specific_data, (char *)buffer_tostring(buffer))) {
  108. error("EXPORTING: Instance %s: Cannot add data to a message", instance->config.name);
  109. stats->data_lost_events++;
  110. stats->lost_metrics += stats->buffered_metrics;
  111. stats->lost_bytes += buffer_len;
  112. goto cleanup;
  113. }
  114. debug(
  115. D_EXPORTING, "EXPORTING: pubsub_publish(): project = %s, topic = %s, buffer = %zu",
  116. connector_specific_config->project_id, connector_specific_config->topic_id, buffer_len);
  117. if (pubsub_publish((void *)connector_specific_data, error_message, stats->buffered_metrics, buffer_len)) {
  118. error("EXPORTING: Instance: %s: Cannot publish a message: %s", instance->config.name, error_message);
  119. stats->transmission_failures++;
  120. stats->data_lost_events++;
  121. stats->lost_metrics += stats->buffered_metrics;
  122. stats->lost_bytes += buffer_len;
  123. goto cleanup;
  124. }
  125. stats->sent_bytes = buffer_len;
  126. stats->transmission_successes++;
  127. size_t sent_metrics = 0, lost_metrics = 0, sent_bytes = 0, lost_bytes = 0;
  128. if (unlikely(pubsub_get_result(
  129. connector_specific_data, error_message, &sent_metrics, &sent_bytes, &lost_metrics, &lost_bytes))) {
  130. // oops! we couldn't send (all or some of the) data
  131. error("EXPORTING: %s", error_message);
  132. error(
  133. "EXPORTING: failed to write data to service '%s'. Willing to write %zu bytes, wrote %zu bytes.",
  134. instance->config.destination, lost_bytes, sent_bytes);
  135. stats->transmission_failures++;
  136. stats->data_lost_events++;
  137. stats->lost_metrics += lost_metrics;
  138. stats->lost_bytes += lost_bytes;
  139. } else {
  140. stats->receptions++;
  141. stats->sent_metrics = sent_metrics;
  142. }
  143. cleanup:
  144. send_internal_metrics(instance);
  145. buffer_flush(buffer);
  146. stats->buffered_metrics = 0;
  147. uv_mutex_unlock(&instance->mutex);
  148. #ifdef UNIT_TESTING
  149. return;
  150. #endif
  151. }
  152. clean_pubsub_instance(instance);
  153. }