// rrdpush.h
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifndef NETDATA_RRDPUSH_H
  3. #define NETDATA_RRDPUSH_H 1
  4. #include "libnetdata/libnetdata.h"
  5. #include "daemon/common.h"
  6. #include "web/server/web_client.h"
  7. #include "database/rrd.h"
  // size of sender_state.connected_to (the array is declared with one extra byte)
  #define CONNECTED_TO_SIZE 100

  // initial buffer sizes: the thread buffer starts at half the size of the cbuffer
  #define CBUFFER_INITIAL_SIZE (16 * 1024)
  #define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
  // ----------------------------------------------------------------------------
  // obsolete versions - do not use anymore
  // (numeric protocol versions that predate capabilities negotiation)

  #define STREAM_OLD_VERSION_CLAIM 3
  #define STREAM_OLD_VERSION_CLABELS 4
  #define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
  // ----------------------------------------------------------------------------
  // capabilities negotiation

  // Bitmask of streaming protocol features. Each enumerator is a distinct bit,
  // so several capabilities can be OR-ed together into a single value.
  typedef enum {
      // do not use the first 3 bits
      // they used to be versions 1, 2 and 3
      // before we introduce capabilities

      STREAM_CAP_V1           = (1 << 3),  // v1 = the oldest protocol
      STREAM_CAP_V2           = (1 << 4),  // v2 = the second version of the protocol (with host labels)
      STREAM_CAP_VN           = (1 << 5),  // version negotiation supported (for versions 3, 4, 5 of the protocol)
                                           // v3 = claiming supported
                                           // v4 = chart labels supported
                                           // v5 = lz4 compression supported
      STREAM_CAP_VCAPS        = (1 << 6),  // capabilities negotiation supported
      STREAM_CAP_HLABELS      = (1 << 7),  // host labels supported
      STREAM_CAP_CLAIM        = (1 << 8),  // claiming supported
      STREAM_CAP_CLABELS      = (1 << 9),  // chart labels supported
      STREAM_CAP_COMPRESSION  = (1 << 10), // lz4 compression supported
      STREAM_CAP_FUNCTIONS    = (1 << 11), // plugin functions supported
      STREAM_CAP_REPLICATION  = (1 << 12), // replication supported
      STREAM_CAP_BINARY       = (1 << 13), // streaming supports binary data
      STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
      STREAM_CAP_IEEE754      = (1 << 15), // streaming supports binary/hex transfer of double values

      STREAM_CAP_INVALID      = (1 << 30), // used as an invalid value for capabilities when this is set
      // this must be signed int, so don't use the last bit
      // needed for negotiating errors between parent and child
  } STREAM_CAPABILITIES;
  // STREAM_HAS_COMPRESSION expands to the compression capability bit when the
  // build defines ENABLE_COMPRESSION, and to 0 (no bit) otherwise.
  #ifdef ENABLE_COMPRESSION
  #define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
  #else
  #define STREAM_HAS_COMPRESSION 0
  #endif // ENABLE_COMPRESSION
  47. STREAM_CAPABILITIES stream_our_capabilities();
  // Evaluates to non-zero when `rpt` is not NULL and ALL the bits of `capability`
  // are set in rpt->capabilities. Works with any struct pointer that has a
  // `capabilities` member.
  // NOTE: the arguments are evaluated more than once - do not pass expressions with side effects.
  #define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
  // ----------------------------------------------------------------------------
  // stream handshake

  #define HTTP_HEADER_SIZE 8192
  #define STREAMING_PROTOCOL_VERSION "1.1"

  // prompt strings, one per protocol generation
  // (VN ends with "version=", so a version number is expected to follow it)
  #define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
  #define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
  #define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="

  // error strings for rejected streaming requests
  #define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
  #define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
  #define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
  #define START_STREAMING_ERROR_BUSY_TRY_LATER "The server is too busy now to accept this request. Try later."
  #define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later."
  #define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
  // Result of a streaming handshake.
  // Positive values are the OK_Vx results (one per old protocol version),
  // negative values are failures.
  typedef enum {
      STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
      STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
      STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
      STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
      STREAM_HANDSHAKE_OK_V1 = 1,
      STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
      STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
      STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
      STREAM_HANDSHAKE_ERROR_DENIED = -4,
      STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
      STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
      STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
      STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
      STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9,
      STREAM_HANDSHAKE_BUSY_TRY_LATER = -10,
      STREAM_HANDSHAKE_INTERNAL_ERROR = -11,
      STREAM_HANDSHAKE_INITIALIZATION = -12,
  } STREAM_HANDSHAKE;
  // ----------------------------------------------------------------------------

  // Kinds of traffic sent over a stream. The values are consecutive from zero,
  // so STREAM_TRAFFIC_TYPE_MAX is the number of kinds and can size per-type
  // arrays (see sender_state.sent_bytes_on_this_connection_per_type).
  typedef enum __attribute__((packed)) {
      STREAM_TRAFFIC_TYPE_REPLICATION,
      STREAM_TRAFFIC_TYPE_FUNCTIONS,
      STREAM_TRAFFIC_TYPE_METADATA,
      STREAM_TRAFFIC_TYPE_DATA,

      // terminator
      STREAM_TRAFFIC_TYPE_MAX,
  } STREAM_TRAFFIC_TYPE;
  // ----------------------------------------------------------------------------

  // A set of OS / kernel identification strings.
  // NOTE(review): the name suggests these hold encoded copies of the values - confirm at the call sites.
  typedef struct {
      char *os_name;
      char *os_id;
      char *os_version;
      char *kernel_name;
      char *kernel_version;
  } stream_encoded_t;
  #ifdef ENABLE_COMPRESSION
  // Compression side: state plus a table of function pointers.
  // Every callback receives the state it belongs to.
  struct compressor_state {
      char *compression_result_buffer;
      size_t compression_result_buffer_size;
      struct compressor_data *data; // Compression API specific data

      void (*reset)(struct compressor_state *state);
      size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
      void (*destroy)(struct compressor_state **state); // takes the address of the caller's pointer
  };

  // Decompression side: counters plus a table of function pointers.
  struct decompressor_state {
      size_t signature_size;
      size_t total_compressed;
      size_t total_uncompressed;
      size_t packet_count;
      struct decompressor_stream *stream; // Decompression API specific data

      void (*reset)(struct decompressor_state *state);
      size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
      size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
      size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
      size_t (*get)(struct decompressor_state *state, char *data, size_t size);
      void (*destroy)(struct decompressor_state **state); // takes the address of the caller's pointer
  };
  #endif
  // Thread-local storage
  // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.

  // Bit flags kept in sender_state.flags
  typedef enum {
      SENDER_FLAG_OVERFLOW    = (1 << 0), // The buffer has been overflown
      SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
  } SENDER_FLAGS;
  127. struct sender_state {
  128. RRDHOST *host;
  129. pid_t tid; // the thread id of the sender, from gettid()
  130. SENDER_FLAGS flags;
  131. int timeout;
  132. int default_port;
  133. usec_t reconnect_delay;
  134. char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
  135. size_t begin;
  136. size_t reconnects_counter;
  137. size_t sent_bytes;
  138. size_t sent_bytes_on_this_connection;
  139. size_t send_attempts;
  140. time_t last_traffic_seen_t;
  141. time_t last_state_since_t; // the timestamp of the last state (online/offline) change
  142. size_t not_connected_loops;
  143. // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
  144. // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
  145. netdata_mutex_t mutex;
  146. struct circular_buffer *buffer;
  147. char read_buffer[PLUGINSD_LINE_MAX + 1];
  148. int read_len;
  149. STREAM_CAPABILITIES capabilities;
  150. size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
  151. int rrdpush_sender_pipe[2]; // collector to sender thread signaling
  152. int rrdpush_sender_socket;
  153. uint16_t hops;
  154. #ifdef ENABLE_COMPRESSION
  155. struct compressor_state *compressor;
  156. #endif
  157. #ifdef ENABLE_HTTPS
  158. NETDATA_SSL ssl; // structure used to encrypt the connection
  159. #endif
  160. struct {
  161. bool shutdown;
  162. const char *reason;
  163. } exit;
  164. struct {
  165. DICTIONARY *requests; // de-duplication of replication requests, per chart
  166. time_t oldest_request_after_t; // the timestamp of the oldest replication request
  167. time_t latest_completed_before_t; // the timestamp of the latest replication request
  168. struct {
  169. size_t pending_requests; // the currently outstanding replication requests
  170. size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
  171. bool reached_max; // true when the sender buffer should not get more replication responses
  172. } atomic;
  173. } replication;
  174. struct {
  175. bool pending_data;
  176. size_t buffer_used_percentage; // the current utilization of the sending buffer
  177. usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
  178. time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
  179. } atomic;
  180. };
  // ----------------------------------------------------------------------------
  // Atomic accessors for the `atomic` members of struct sender_state.
  // `sender` is a pointer to the state. All of them use relaxed memory ordering,
  // except the replication "buffer full" flag, which uses sequentially consistent ordering.

  // sender->atomic.pending_data
  #define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
  #define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
  #define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)

  // sender->atomic.last_buffer_recreate_s
  #define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
  #define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)

  // sender->replication.atomic.reached_max (sequentially consistent)
  #define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
  #define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)

  // sender->atomic.buffer_used_percentage
  #define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED)
  #define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)

  // sender->atomic.last_flush_time_ut - the setter stores now_realtime_usec()
  #define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
  #define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)

  // sender->replication.atomic.charts_replicating
  // (the plus_one / minus_one variants evaluate to the value AFTER the update)
  #define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
  #define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)

  // sender->replication.atomic.pending_requests
  // (the plus_one / minus_one variants evaluate to the value AFTER the update)
  #define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
  #define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
  200. struct receiver_state {
  201. RRDHOST *host;
  202. pid_t tid;
  203. netdata_thread_t thread;
  204. int fd;
  205. char *key;
  206. char *hostname;
  207. char *registry_hostname;
  208. char *machine_guid;
  209. char *os;
  210. char *timezone; // Unused?
  211. char *abbrev_timezone;
  212. int32_t utc_offset;
  213. char *tags;
  214. char *client_ip; // Duplicated in pluginsd
  215. char *client_port; // Duplicated in pluginsd
  216. char *program_name; // Duplicated in pluginsd
  217. char *program_version;
  218. struct rrdhost_system_info *system_info;
  219. STREAM_CAPABILITIES capabilities;
  220. time_t last_msg_t;
  221. char read_buffer[PLUGINSD_LINE_MAX + 1];
  222. int read_len;
  223. uint16_t hops;
  224. struct {
  225. bool shutdown; // signal the streaming parser to exit
  226. const char *reason; // the reason of disconnection to log
  227. } exit;
  228. struct {
  229. RRD_MEMORY_MODE mode;
  230. int history;
  231. int update_every;
  232. int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO
  233. time_t alarms_delay;
  234. int rrdpush_enabled;
  235. char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig
  236. char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig
  237. bool rrdpush_enable_replication;
  238. time_t rrdpush_seconds_to_replicate;
  239. time_t rrdpush_replication_step;
  240. char *rrdpush_destination; // DONT FREE - it is allocated in appconfig
  241. unsigned int rrdpush_compression;
  242. } config;
  243. #ifdef ENABLE_HTTPS
  244. NETDATA_SSL ssl;
  245. #endif
  246. #ifdef ENABLE_COMPRESSION
  247. unsigned int rrdpush_compression;
  248. struct decompressor_state *decompressor;
  249. #endif
  250. time_t replication_first_time_t;
  251. };
  252. struct rrdpush_destinations {
  253. STRING *destination;
  254. bool ssl;
  255. const char *last_error;
  256. time_t last_attempt;
  257. time_t postpone_reconnection_until;
  258. STREAM_HANDSHAKE last_handshake;
  259. struct rrdpush_destinations *prev;
  260. struct rrdpush_destinations *next;
  261. };
  // Global streaming defaults - declared here, defined elsewhere.
  extern unsigned int default_rrdpush_enabled;
  #ifdef ENABLE_COMPRESSION
  extern unsigned int default_compression_enabled;
  #endif
  extern char *default_rrdpush_destination;
  extern char *default_rrdpush_api_key;
  extern char *default_rrdpush_send_charts_matching;
  extern bool default_rrdpush_enable_replication;
  extern time_t default_rrdpush_seconds_to_replicate;
  extern time_t default_rrdpush_replication_step;
  extern unsigned int remote_clock_resync_iterations;
  273. void rrdpush_destinations_init(RRDHOST *host);
  274. void rrdpush_destinations_free(RRDHOST *host);
  275. BUFFER *sender_start(struct sender_state *s);
  276. void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type);
  277. int rrdpush_init();
  278. bool rrdpush_receiver_needs_dbengine();
  279. int configured_as_parent();
  280. typedef struct rrdset_stream_buffer {
  281. STREAM_CAPABILITIES capabilities;
  282. bool v2;
  283. bool begin_v2_added;
  284. time_t wall_clock_time;
  285. uint64_t rrdset_flags; // RRDSET_FLAGS
  286. time_t last_point_end_time_s;
  287. BUFFER *wb;
  288. } RRDSET_STREAM_BUFFER;
  289. RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time);
  290. void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
  291. void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
  292. void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);
  293. bool rrdset_push_chart_definition_now(RRDSET *st);
  294. void *rrdpush_sender_thread(void *ptr);
  295. void rrdpush_send_host_labels(RRDHOST *host);
  296. void rrdpush_claimed_id(RRDHOST *host);
  // thread name prefixes of the streaming threads
  #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
  #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
  299. int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
  300. void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);
  301. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
  302. void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
  303. int connect_to_one_of_destinations(
  304. RRDHOST *host,
  305. int default_port,
  306. struct timeval *timeout,
  307. size_t *reconnects_counter,
  308. char *connected_to,
  309. size_t connected_to_size,
  310. struct rrdpush_destinations **destination);
  311. void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
  #ifdef ENABLE_COMPRESSION
  // constructors of the compression / decompression states; they take no arguments
  struct compressor_state *create_compressor(void);
  struct decompressor_state *create_decompressor(void);
  #endif
  316. void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
  317. const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
  318. void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
  319. void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
  320. void log_receiver_capabilities(struct receiver_state *rpt);
  321. void log_sender_capabilities(struct sender_state *s);
  322. STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
  323. int32_t stream_capabilities_to_vn(uint32_t caps);
  324. void receiver_state_free(struct receiver_state *rpt);
  325. bool stop_streaming_receiver(RRDHOST *host, const char *reason);
  326. void sender_thread_buffer_free(void);
  327. void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
  328. void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
  329. #include "replication.h"
  330. #endif //NETDATA_RRDPUSH_H