rrdpush.h 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifndef NETDATA_RRDPUSH_H
  3. #define NETDATA_RRDPUSH_H 1
  4. #include "database/rrd.h"
  5. #include "libnetdata/libnetdata.h"
  6. #include "web/server/web_client.h"
  7. #include "daemon/common.h"
  // Capacity (excluding the terminating NUL) of the "connected to" /
  // destination name buffers; the arrays in this header are declared with
  // CONNECTED_TO_SIZE + 1 elements.
  #define CONNECTED_TO_SIZE 100

  // Numeric streaming protocol versions, named after the capability each one
  // introduced.
  #define STREAM_VERSION_CLAIM 3
  #define STREAM_VERSION_CLABELS 4
  #define STREAM_VERSION_COMPRESSION 5
  // NOTE(review): defined here but never selected as the current version in
  // this header; it also lacks the STREAM_ prefix of its siblings. The name is
  // kept unchanged because code outside this header may reference it.
  #define VERSION_GAP_FILLING 6

  // Current streaming protocol version for this build: the compression version
  // when ENABLE_COMPRESSION is defined, the host-labels version otherwise.
  // The whole expansion is parenthesized so the macro behaves as a single
  // primary expression wherever it is used (e.g. after sizeof).
  #ifdef ENABLE_COMPRESSION
  #define STREAMING_PROTOCOL_CURRENT_VERSION ((uint32_t)(STREAM_VERSION_COMPRESSION))
  #else
  #define STREAMING_PROTOCOL_CURRENT_VERSION ((uint32_t)(STREAM_VERSION_CLABELS))
  #endif // ENABLE_COMPRESSION
  // Textual protocol version string.
  #define STREAMING_PROTOCOL_VERSION "1.1"

  // Handshake prompt strings. Judging by their names these are the replies that
  // tell a sender to start streaming (plain, with host labels, and with an
  // explicit "version=" suffix) -- confirm against the handshake code.
  #define START_STREAMING_PROMPT \
      "Hit me baby, push them over..."
  #define START_STREAMING_PROMPT_V2 \
      "Hit me baby, push them over and bring the host labels..."
  #define START_STREAMING_PROMPT_VN \
      "Hit me baby, push them over with the version="

  // Handshake error strings: streaming back to the same host, a GUID that is
  // already streaming, and a denied connection.
  #define START_STREAMING_ERROR_SAME_LOCALHOST \
      "Don't hit me baby, you are trying to stream my localhost back"
  #define START_STREAMING_ERROR_ALREADY_STREAMING \
      "This GUID is already streaming to this server"
  #define START_STREAMING_ERROR_NOT_PERMITTED \
      "You are not permitted to access this. Check the logs for more info."

  // Size, in bytes, used for HTTP header buffers.
  #define HTTP_HEADER_SIZE 8192
  // Strategy for handling more than one streaming connection. The code that
  // applies the strategy is not part of this header.
  typedef enum {
      RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW    = 0,
      RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW = 1,
  } RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
  // Bundle of operating-system and kernel identification strings. The type name
  // suggests these hold encoded copies for streaming -- confirm at the call
  // sites; ownership of the strings is not defined in this header.
  typedef struct {
      char *os_name;
      char *os_id;
      char *os_version;
      char *kernel_name;
      char *kernel_version;
  } stream_encoded_t;
  #ifdef ENABLE_COMPRESSION

  // Compressor object: a work buffer, backend-private data and a table of
  // operations supplied by the compression backend.
  struct compressor_state {
      char *buffer;
      size_t buffer_size;

      // Compression API specific data
      struct compressor_data *data;

      // Operations; each receives the state it acts on.
      void (*reset)(struct compressor_state *state);
      size_t (*compress)(struct compressor_state *state,
                         const char *data,
                         size_t size,
                         char **buffer);
      // Takes the address of the owning pointer.
      void (*destroy)(struct compressor_state **state);
  };

  // Decompressor object: input and output buffers with their cursors, running
  // counters, backend-private data and a table of operations.
  struct decompressor_state {
      // Input side
      char *buffer;
      size_t buffer_size;
      size_t buffer_len;
      size_t buffer_pos;

      // Output side
      char *out_buffer;
      size_t out_buffer_len;
      size_t out_buffer_pos;

      // Counters
      size_t total_compressed;
      size_t total_uncompressed;
      size_t packet_count;

      // Decompression API specific data
      struct decompressor_data *data;

      // Operations; each receives the state it acts on.
      void (*reset)(struct decompressor_state *state);
      size_t (*start)(struct decompressor_state *state,
                      const char *header,
                      size_t header_size);
      size_t (*put)(struct decompressor_state *state,
                    const char *data,
                    size_t size);
      size_t (*decompress)(struct decompressor_state *state);
      size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
      size_t (*get)(struct decompressor_state *state,
                    char *data,
                    size_t size);
      // Takes the address of the owning pointer.
      void (*destroy)(struct decompressor_state **state);
  };

  #endif // ENABLE_COMPRESSION
  67. // Thread-local storage
  68. // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
  69. struct sender_state {
  70. RRDHOST *host;
  71. pid_t task_id;
  72. unsigned int overflow:1;
  73. int timeout, default_port;
  74. usec_t reconnect_delay;
  75. char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
  76. size_t begin;
  77. size_t reconnects_counter;
  78. size_t sent_bytes;
  79. size_t sent_bytes_on_this_connection;
  80. size_t send_attempts;
  81. time_t last_sent_t;
  82. size_t not_connected_loops;
  83. // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
  84. // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
  85. netdata_mutex_t mutex;
  86. struct circular_buffer *buffer;
  87. BUFFER *build;
  88. char read_buffer[512];
  89. int read_len;
  90. int32_t version;
  91. #ifdef ENABLE_COMPRESSION
  92. unsigned int rrdpush_compression;
  93. struct compressor_state *compressor;
  94. #endif
  95. };
  96. struct receiver_state {
  97. RRDHOST *host;
  98. netdata_thread_t thread;
  99. int fd;
  100. char *key;
  101. char *hostname;
  102. char *registry_hostname;
  103. char *machine_guid;
  104. char *os;
  105. char *timezone; // Unused?
  106. char *abbrev_timezone;
  107. int32_t utc_offset;
  108. char *tags;
  109. char *client_ip; // Duplicated in pluginsd
  110. char *client_port; // Duplicated in pluginsd
  111. char *program_name; // Duplicated in pluginsd
  112. char *program_version;
  113. struct rrdhost_system_info *system_info;
  114. int update_every;
  115. uint32_t stream_version;
  116. time_t last_msg_t;
  117. char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields
  118. int read_len;
  119. unsigned int shutdown:1; // Tell the thread to exit
  120. unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
  121. #ifdef ENABLE_HTTPS
  122. struct netdata_ssl ssl;
  123. #endif
  124. #ifdef ENABLE_COMPRESSION
  125. unsigned int rrdpush_compression;
  126. struct decompressor_state *decompressor;
  127. #endif
  128. };
  129. struct rrdpush_destinations {
  130. char destination[CONNECTED_TO_SIZE + 1];
  131. int disabled_no_proper_reply;
  132. int disabled_because_of_localhost;
  133. time_t disabled_already_streaming;
  134. int disabled_because_of_denied_access;
  135. struct rrdpush_destinations *next;
  136. };
  // Streaming configuration globals. They are only declared here; the
  // definitions live in an implementation file.
  extern unsigned int default_rrdpush_enabled;

  #ifdef ENABLE_COMPRESSION
  extern unsigned int default_compression_enabled;
  #endif

  extern char *default_rrdpush_destination;
  extern char *default_rrdpush_api_key;
  extern char *default_rrdpush_send_charts_matching;

  extern unsigned int remote_clock_resync_iterations;
  145. extern void sender_init(RRDHOST *parent);
  146. extern struct rrdpush_destinations *destinations_init(const char *destinations);
  147. void sender_start(struct sender_state *s);
  148. void sender_commit(struct sender_state *s);
  // Both functions take no arguments. They are declared with (void) so these
  // are real prototypes: before C23 an empty "()" leaves the parameters
  // unspecified and the compiler does not check call arguments.
  // What the int return values mean is defined by the implementation, which is
  // not visible in this header.
  extern int rrdpush_init(void);
  extern int configured_as_parent(void);
  151. extern void rrdset_done_push(RRDSET *st);
  152. extern void rrdset_push_chart_definition_now(RRDSET *st);
  153. extern void *rrdpush_sender_thread(void *ptr);
  154. extern void rrdpush_send_labels(RRDHOST *host);
  155. extern void rrdpush_claimed_id(RRDHOST *host);
  156. extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
  157. extern void rrdpush_sender_thread_stop(RRDHOST *host);
  158. extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv);
  159. extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
  160. extern int connect_to_one_of_destinations(
  161. struct rrdpush_destinations *destinations,
  162. int default_port,
  163. struct timeval *timeout,
  164. size_t *reconnects_counter,
  165. char *connected_to,
  166. size_t connected_to_size,
  167. struct rrdpush_destinations **destination);
  #ifdef ENABLE_COMPRESSION
  // Factories for the compression objects. They take no arguments and are
  // declared with (void) so they are real prototypes rather than old-style
  // declarators with unspecified parameters.
  struct compressor_state *create_compressor(void);
  struct decompressor_state *create_decompressor(void);

  // Inspect data_size bytes at data. NOTE(review): the meaning of the size_t
  // result is not visible in this header -- confirm against the
  // implementation.
  size_t is_compressed_data(const char *data, size_t data_size);
  #endif // ENABLE_COMPRESSION
  173. #endif //NETDATA_RRDPUSH_H