exporting_engine.c 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "exporting_engine.h"
  3. static struct engine *engine = NULL;
  4. void analytics_exporting_connectors_ssl(BUFFER *b)
  5. {
  6. #ifdef ENABLE_HTTPS
  7. if (netdata_exporting_ctx) {
  8. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  9. struct simple_connector_data *connector_specific_data = instance->connector_specific_data;
  10. if (connector_specific_data->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
  11. buffer_strcat(b, "exporting");
  12. break;
  13. }
  14. }
  15. }
  16. #endif
  17. buffer_strcat(b, "|");
  18. }
  19. void analytics_exporting_connectors(BUFFER *b)
  20. {
  21. if (!engine)
  22. return;
  23. uint8_t count = 0;
  24. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  25. if (count)
  26. buffer_strcat(b, "|");
  27. switch (instance->config.type) {
  28. case EXPORTING_CONNECTOR_TYPE_GRAPHITE:
  29. buffer_strcat(b, "Graphite");
  30. break;
  31. case EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP:
  32. buffer_strcat(b, "GraphiteHTTP");
  33. break;
  34. case EXPORTING_CONNECTOR_TYPE_JSON:
  35. buffer_strcat(b, "JSON");
  36. break;
  37. case EXPORTING_CONNECTOR_TYPE_JSON_HTTP:
  38. buffer_strcat(b, "JSONHTTP");
  39. break;
  40. case EXPORTING_CONNECTOR_TYPE_OPENTSDB:
  41. buffer_strcat(b, "OpenTSDB");
  42. break;
  43. case EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP:
  44. buffer_strcat(b, "OpenTSDBHTTP");
  45. break;
  46. case EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE:
  47. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  48. buffer_strcat(b, "PrometheusRemoteWrite");
  49. #endif
  50. break;
  51. case EXPORTING_CONNECTOR_TYPE_KINESIS:
  52. #if HAVE_KINESIS
  53. buffer_strcat(b, "Kinesis");
  54. #endif
  55. break;
  56. case EXPORTING_CONNECTOR_TYPE_PUBSUB:
  57. #if ENABLE_EXPORTING_PUBSUB
  58. buffer_strcat(b, "Pubsub");
  59. #endif
  60. break;
  61. case EXPORTING_CONNECTOR_TYPE_MONGODB:
  62. #if HAVE_MONGOC
  63. buffer_strcat(b, "MongoDB");
  64. #endif
  65. break;
  66. default:
  67. buffer_strcat(b, "Unknown");
  68. }
  69. count++;
  70. }
  71. }
  72. /**
  73. * Exporting Clean Engine
  74. *
  75. * Clean all variables allocated inside engine structure
  76. *
  77. * @param en a pointer to the structure that will be cleaned.
  78. */
  79. static void exporting_clean_engine()
  80. {
  81. if (!engine)
  82. return;
  83. #if HAVE_KINESIS
  84. if (engine->aws_sdk_initialized)
  85. aws_sdk_shutdown();
  86. #endif
  87. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  88. if (engine->protocol_buffers_initialized)
  89. protocol_buffers_shutdown();
  90. #endif
  91. //Cleanup web api
  92. prometheus_clean_server_root();
  93. for (struct instance *instance = engine->instance_root; instance;) {
  94. struct instance *current_instance = instance;
  95. instance = instance->next;
  96. clean_instance(current_instance);
  97. }
  98. freez((void *)engine->config.hostname);
  99. freez(engine);
  100. }
  101. /**
  102. * Clean up the main exporting thread and all connector workers on Netdata exit
  103. *
  104. * @param ptr thread data.
  105. */
  106. static void exporting_main_cleanup(void *ptr)
  107. {
  108. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  109. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  110. info("cleaning up...");
  111. if (!engine) {
  112. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  113. return;
  114. }
  115. engine->exit = 1;
  116. int found = 0;
  117. usec_t max = 2 * USEC_PER_SEC, step = 50000;
  118. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  119. if (!instance->exited) {
  120. found++;
  121. info("stopping worker for instance %s", instance->config.name);
  122. uv_mutex_unlock(&instance->mutex);
  123. instance->data_is_ready = 1;
  124. uv_cond_signal(&instance->cond_var);
  125. } else
  126. info("found stopped worker for instance %s", instance->config.name);
  127. }
  128. while (found && max > 0) {
  129. max -= step;
  130. info("Waiting %d exporting connectors to finish...", found);
  131. sleep_usec(step);
  132. found = 0;
  133. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  134. if (!instance->exited)
  135. found++;
  136. }
  137. }
  138. exporting_clean_engine();
  139. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  140. }
  141. /**
  142. * Exporting engine main
  143. *
  144. * The main thread used to control the exporting engine.
  145. *
  146. * @param ptr a pointer to netdata_static_structure.
  147. *
  148. * @return It always returns NULL.
  149. */
  150. void *exporting_main(void *ptr)
  151. {
  152. netdata_thread_cleanup_push(exporting_main_cleanup, ptr);
  153. engine = read_exporting_config();
  154. if (!engine) {
  155. info("EXPORTING: no exporting connectors configured");
  156. goto cleanup;
  157. }
  158. if (init_connectors(engine) != 0) {
  159. error("EXPORTING: cannot initialize exporting connectors");
  160. send_statistics("EXPORTING_START", "FAIL", "-");
  161. goto cleanup;
  162. }
  163. RRDSET *st_main_rusage = NULL;
  164. RRDDIM *rd_main_user = NULL;
  165. RRDDIM *rd_main_system = NULL;
  166. create_main_rusage_chart(&st_main_rusage, &rd_main_user, &rd_main_system);
  167. usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
  168. heartbeat_t hb;
  169. heartbeat_init(&hb);
  170. while (!netdata_exit) {
  171. heartbeat_next(&hb, step_ut);
  172. engine->now = now_realtime_sec();
  173. if (mark_scheduled_instances(engine))
  174. prepare_buffers(engine);
  175. send_main_rusage(st_main_rusage, rd_main_user, rd_main_system);
  176. #ifdef UNIT_TESTING
  177. return NULL;
  178. #endif
  179. }
  180. cleanup:
  181. netdata_thread_cleanup_pop(1);
  182. return NULL;
  183. }