rrdpush.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifndef NETDATA_RRDPUSH_H
  3. #define NETDATA_RRDPUSH_H 1
  4. #include "libnetdata/libnetdata.h"
  5. #include "daemon/common.h"
  6. #include "web/server/web_client.h"
  7. #include "database/rrd.h"
  8. #define CONNECTED_TO_SIZE 100
  9. #define CBUFFER_INITIAL_SIZE (16 * 1024)
  10. #define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
  11. // ----------------------------------------------------------------------------
  12. // obsolete versions - do not use anymore
  13. #define STREAM_OLD_VERSION_CLAIM 3
  14. #define STREAM_OLD_VERSION_CLABELS 4
  15. #define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
  16. // ----------------------------------------------------------------------------
  17. // capabilities negotiation
  18. typedef enum {
  19. // do not use the first 3 bits
  20. // they used to be versions 1, 2 and 3
  21. // before we introduce capabilities
  22. STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol
  23. STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels)
  24. STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol)
  25. // v3 = claiming supported
  26. // v4 = chart labels supported
  27. // v5 = lz4 compression supported
  28. STREAM_CAP_VCAPS = (1 << 6), // capabilities negotiation supported
  29. STREAM_CAP_HLABELS = (1 << 7), // host labels supported
  30. STREAM_CAP_CLAIM = (1 << 8), // claiming supported
  31. STREAM_CAP_CLABELS = (1 << 9), // chart labels supported
  32. STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported
  33. STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
  34. STREAM_CAP_REPLICATION = (1 << 12), // replication supported
  35. STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
  36. STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
  37. STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
  38. STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
  39. // this must be signed int, so don't use the last bit
  40. // needed for negotiating errors between parent and child
  41. } STREAM_CAPABILITIES;
  42. #ifdef ENABLE_COMPRESSION
  43. #define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
  44. #else
  45. #define STREAM_HAS_COMPRESSION 0
  46. #endif // ENABLE_COMPRESSION
  47. STREAM_CAPABILITIES stream_our_capabilities();
  48. #define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
  49. // ----------------------------------------------------------------------------
  50. // stream handshake
  51. #define HTTP_HEADER_SIZE 8192
  52. #define STREAMING_PROTOCOL_VERSION "1.1"
  53. #define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
  54. #define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
  55. #define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
  56. #define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
  57. #define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
  58. #define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
  59. typedef enum {
  60. STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
  61. STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
  62. STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
  63. STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
  64. STREAM_HANDSHAKE_OK_V1 = 1,
  65. STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
  66. STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
  67. STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
  68. STREAM_HANDSHAKE_ERROR_DENIED = -4,
  69. STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
  70. STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
  71. STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
  72. STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
  73. STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9
  74. } STREAM_HANDSHAKE;
  75. // ----------------------------------------------------------------------------
  76. typedef struct {
  77. char *os_name;
  78. char *os_id;
  79. char *os_version;
  80. char *kernel_name;
  81. char *kernel_version;
  82. } stream_encoded_t;
  83. #ifdef ENABLE_COMPRESSION
  84. struct compressor_state {
  85. char *compression_result_buffer;
  86. size_t compression_result_buffer_size;
  87. struct compressor_data *data; // Compression API specific data
  88. void (*reset)(struct compressor_state *state);
  89. size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
  90. void (*destroy)(struct compressor_state **state);
  91. };
  92. struct decompressor_state {
  93. size_t signature_size;
  94. size_t total_compressed;
  95. size_t total_uncompressed;
  96. size_t packet_count;
  97. struct decompressor_stream *stream; // Decompression API specific data
  98. void (*reset)(struct decompressor_state *state);
  99. size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
  100. size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
  101. size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
  102. size_t (*get)(struct decompressor_state *state, char *data, size_t size);
  103. void (*destroy)(struct decompressor_state **state);
  104. };
  105. #endif
  106. // Thread-local storage
  107. // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
  108. typedef enum {
  109. SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
  110. SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
  111. } SENDER_FLAGS;
  112. struct sender_state {
  113. RRDHOST *host;
  114. pid_t tid; // the thread id of the sender, from gettid()
  115. SENDER_FLAGS flags;
  116. int timeout;
  117. int default_port;
  118. usec_t reconnect_delay;
  119. char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
  120. size_t begin;
  121. size_t reconnects_counter;
  122. size_t sent_bytes;
  123. size_t sent_bytes_on_this_connection;
  124. size_t send_attempts;
  125. time_t last_traffic_seen_t;
  126. size_t not_connected_loops;
  127. // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
  128. // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
  129. netdata_mutex_t mutex;
  130. struct circular_buffer *buffer;
  131. char read_buffer[PLUGINSD_LINE_MAX + 1];
  132. int read_len;
  133. STREAM_CAPABILITIES capabilities;
  134. int rrdpush_sender_pipe[2]; // collector to sender thread signaling
  135. int rrdpush_sender_socket;
  136. uint16_t hops;
  137. #ifdef ENABLE_COMPRESSION
  138. struct compressor_state *compressor;
  139. #endif
  140. #ifdef ENABLE_HTTPS
  141. struct netdata_ssl ssl; // structure used to encrypt the connection
  142. #endif
  143. struct {
  144. bool shutdown;
  145. const char *reason;
  146. } exit;
  147. struct {
  148. DICTIONARY *requests; // de-duplication of replication requests, per chart
  149. struct {
  150. size_t pending_requests; // the currently outstanding replication requests
  151. size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
  152. bool reached_max; // true when the sender buffer should not get more replication responses
  153. } atomic;
  154. } replication;
  155. struct {
  156. bool pending_data;
  157. size_t buffer_used_percentage; // the current utilization of the sending buffer
  158. usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
  159. time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
  160. } atomic;
  161. };
  162. #define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
  163. #define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
  164. #define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)
  165. #define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
  166. #define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
  167. #define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
  168. #define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
  169. #define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED)
  170. #define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)
  171. #define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
  172. #define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)
  173. #define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
  174. #define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
  175. #define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
  176. #define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)
  177. #define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
  178. #define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
  179. #define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
  180. #define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
  181. struct receiver_state {
  182. RRDHOST *host;
  183. netdata_thread_t thread;
  184. int fd;
  185. char *key;
  186. char *hostname;
  187. char *registry_hostname;
  188. char *machine_guid;
  189. char *os;
  190. char *timezone; // Unused?
  191. char *abbrev_timezone;
  192. int32_t utc_offset;
  193. char *tags;
  194. char *client_ip; // Duplicated in pluginsd
  195. char *client_port; // Duplicated in pluginsd
  196. char *program_name; // Duplicated in pluginsd
  197. char *program_version;
  198. struct rrdhost_system_info *system_info;
  199. STREAM_CAPABILITIES capabilities;
  200. time_t last_msg_t;
  201. char read_buffer[PLUGINSD_LINE_MAX + 1];
  202. int read_len;
  203. uint16_t hops;
  204. struct {
  205. bool shutdown; // signal the streaming parser to exit
  206. const char *reason; // the reason of disconnection to log
  207. } exit;
  208. struct {
  209. RRD_MEMORY_MODE mode;
  210. int history;
  211. int update_every;
  212. int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO
  213. time_t alarms_delay;
  214. int rrdpush_enabled;
  215. char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig
  216. char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig
  217. bool rrdpush_enable_replication;
  218. time_t rrdpush_seconds_to_replicate;
  219. time_t rrdpush_replication_step;
  220. char *rrdpush_destination; // DONT FREE - it is allocated in appconfig
  221. unsigned int rrdpush_compression;
  222. } config;
  223. #ifdef ENABLE_HTTPS
  224. struct netdata_ssl ssl;
  225. #endif
  226. #ifdef ENABLE_COMPRESSION
  227. unsigned int rrdpush_compression;
  228. struct decompressor_state *decompressor;
  229. #endif
  230. time_t replication_first_time_t;
  231. };
  232. struct rrdpush_destinations {
  233. STRING *destination;
  234. const char *last_error;
  235. time_t postpone_reconnection_until;
  236. STREAM_HANDSHAKE last_handshake;
  237. struct rrdpush_destinations *prev;
  238. struct rrdpush_destinations *next;
  239. };
  240. extern unsigned int default_rrdpush_enabled;
  241. #ifdef ENABLE_COMPRESSION
  242. extern unsigned int default_compression_enabled;
  243. #endif
  244. extern char *default_rrdpush_destination;
  245. extern char *default_rrdpush_api_key;
  246. extern char *default_rrdpush_send_charts_matching;
  247. extern bool default_rrdpush_enable_replication;
  248. extern time_t default_rrdpush_seconds_to_replicate;
  249. extern time_t default_rrdpush_replication_step;
  250. extern unsigned int remote_clock_resync_iterations;
  251. void rrdpush_destinations_init(RRDHOST *host);
  252. void rrdpush_destinations_free(RRDHOST *host);
  253. BUFFER *sender_start(struct sender_state *s);
  254. void sender_commit(struct sender_state *s, BUFFER *wb);
  255. int rrdpush_init();
  256. bool rrdpush_receiver_needs_dbengine();
  257. int configured_as_parent();
  258. typedef struct rrdset_stream_buffer {
  259. STREAM_CAPABILITIES capabilities;
  260. bool v2;
  261. bool begin_v2_added;
  262. time_t wall_clock_time;
  263. uint64_t rrdset_flags; // RRDSET_FLAGS
  264. time_t last_point_end_time_s;
  265. BUFFER *wb;
  266. } RRDSET_STREAM_BUFFER;
  267. RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time);
  268. void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
  269. void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
  270. void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);
  271. bool rrdset_push_chart_definition_now(RRDSET *st);
  272. void *rrdpush_sender_thread(void *ptr);
  273. void rrdpush_send_host_labels(RRDHOST *host);
  274. void rrdpush_claimed_id(RRDHOST *host);
  275. #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
  276. #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
  277. int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
  278. void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);
  279. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
  280. void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
  281. int connect_to_one_of_destinations(
  282. RRDHOST *host,
  283. int default_port,
  284. struct timeval *timeout,
  285. size_t *reconnects_counter,
  286. char *connected_to,
  287. size_t connected_to_size,
  288. struct rrdpush_destinations **destination);
  289. void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
  290. #ifdef ENABLE_COMPRESSION
  291. struct compressor_state *create_compressor();
  292. struct decompressor_state *create_decompressor();
  293. #endif
  294. void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
  295. void log_receiver_capabilities(struct receiver_state *rpt);
  296. void log_sender_capabilities(struct sender_state *s);
  297. STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
  298. int32_t stream_capabilities_to_vn(uint32_t caps);
  299. void receiver_state_free(struct receiver_state *rpt);
  300. bool stop_streaming_receiver(RRDHOST *host, const char *reason);
  301. void sender_thread_buffer_free(void);
  302. #include "replication.h"
  303. #endif //NETDATA_RRDPUSH_H