rrdpush.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifndef NETDATA_RRDPUSH_H
  3. #define NETDATA_RRDPUSH_H 1
  4. #include "libnetdata/libnetdata.h"
  5. #include "daemon/common.h"
  6. #include "web/server/web_client.h"
  7. #include "database/rrd.h"
  // Capacity, excluding the terminating NUL, of the sender's `connected_to`
  // text buffer (declared as char[CONNECTED_TO_SIZE + 1] in struct sender_state).
  #define CONNECTED_TO_SIZE 100

  // 16 KiB. NOTE(review): by its name this is the starting size of the
  // sender's circular buffer - confirm against the circular buffer code.
  #define CBUFFER_INITIAL_SIZE (16 * 1024)

  // Half of CBUFFER_INITIAL_SIZE (8 KiB).
  #define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
  // ----------------------------------------------------------------------------
  // obsolete versions - do not use anymore
  // These are the numeric protocol versions that existed before capability
  // bits; each one is named after the feature it is associated with.

  #define STREAM_OLD_VERSION_CLAIM 3
  #define STREAM_OLD_VERSION_CLABELS 4
  #define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
  // ----------------------------------------------------------------------------
  // capabilities negotiation
  //
  // Bit set describing what a streaming peer supports. Each enumerator is a
  // single bit, so values can be OR-ed together and tested with a bitwise AND.

  typedef enum {
      // do not use the first 3 bits
      // they used to be versions 1, 2 and 3
      // before we introduce capabilities

      STREAM_CAP_V1           = (1 << 3),  // v1 = the oldest protocol
      STREAM_CAP_V2           = (1 << 4),  // v2 = the second version of the protocol (with host labels)
      STREAM_CAP_VN           = (1 << 5),  // version negotiation supported (for versions 3, 4, 5 of the protocol)
                                           // v3 = claiming supported
                                           // v4 = chart labels supported
                                           // v5 = lz4 compression supported
      STREAM_CAP_VCAPS        = (1 << 6),  // capabilities negotiation supported
      STREAM_CAP_HLABELS      = (1 << 7),  // host labels supported
      STREAM_CAP_CLAIM        = (1 << 8),  // claiming supported
      STREAM_CAP_CLABELS      = (1 << 9),  // chart labels supported
      STREAM_CAP_COMPRESSION  = (1 << 10), // lz4 compression supported
      STREAM_CAP_FUNCTIONS    = (1 << 11), // plugin functions supported
      STREAM_CAP_REPLICATION  = (1 << 12), // replication supported
      STREAM_CAP_BINARY       = (1 << 13), // streaming supports binary data

      STREAM_CAP_INVALID      = (1 << 30), // used as an invalid value for capabilities when this is set
      // this must be signed int, so don't use the last bit
      // needed for negotiating errors between parent and child
  } STREAM_CAPABILITIES;
  // Compression bit to advertise: STREAM_CAP_COMPRESSION when the build
  // defines ENABLE_COMPRESSION, otherwise 0 so that nothing is advertised.
  #ifdef ENABLE_COMPRESSION
  #define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
  #else
  #define STREAM_HAS_COMPRESSION 0
  #endif // ENABLE_COMPRESSION

  // OR of every capability bit listed here; compression is included only
  // through STREAM_HAS_COMPRESSION, i.e. only in builds that enable it.
  #define STREAM_OUR_CAPABILITIES ( \
      STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \
      STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \
      STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY )

  // Evaluates to 1 when `rpt` is a non-NULL pointer to an object whose
  // `capabilities` member has at least one of the `capability` bits set,
  // and to 0 otherwise (including when `rpt` is NULL).
  // `rpt` is evaluated twice, so do not pass an expression with side effects.
  #define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))
  // ----------------------------------------------------------------------------
  // stream handshake
  //
  // Sizes and fixed text messages of the streaming handshake. The strings are
  // part of the wire protocol - do not alter them.

  #define HTTP_HEADER_SIZE 8192
  #define STREAMING_PROTOCOL_VERSION "1.1"

  // Prompts for protocol v1, v2 and version negotiation. The VN prompt ends
  // with "version=", so a version value is expected to follow it.
  #define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
  #define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
  #define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="

  // Error texts; by name they correspond to the LOCALHOST, ALREADY_CONNECTED
  // and DENIED members of STREAM_HANDSHAKE.
  #define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
  #define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
  #define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
  // Outcome of a streaming handshake.
  // Positive values are the agreed protocol version (1 to 5); negative values
  // identify why the handshake failed.
  typedef enum {
      STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
      STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
      STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
      STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
      STREAM_HANDSHAKE_OK_V1 = 1,
      STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
      STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
      STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
      STREAM_HANDSHAKE_ERROR_DENIED = -4,
      STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
      STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
      STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
      STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
      STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9
  } STREAM_HANDSHAKE;
  // ----------------------------------------------------------------------------

  // Five strings describing the operating system and kernel of a host.
  // NOTE(review): the "encoded" in the type name suggests these hold an
  // encoded form of the values (e.g. for the handshake request) - confirm
  // against the code that fills them. Ownership of the strings is not
  // visible from this header.
  typedef struct {
      char *os_name;
      char *os_id;
      char *os_version;
      char *kernel_name;
      char *kernel_version;
  } stream_encoded_t;
  #ifdef ENABLE_COMPRESSION
  // Compression state plus the operations that act on it, stored as function
  // pointers so the concrete implementation stays behind `data`.
  struct compressor_state {
      char *compression_result_buffer;
      size_t compression_result_buffer_size;
      struct compressor_data *data; // Compression API specific data

      void (*reset)(struct compressor_state *state);
      // NOTE(review): presumably returns the compressed size and points
      // *buffer at the result - confirm against the implementation.
      size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
      // takes the address of the state pointer
      void (*destroy)(struct compressor_state **state);
  };

  // Decompression state, its counters and the operations that act on it.
  struct decompressor_state {
      size_t signature_size;
      size_t total_compressed;
      size_t total_uncompressed;
      size_t packet_count;
      struct decompressor_stream *stream; // Decompression API specific data

      void (*reset)(struct decompressor_state *state);
      size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
      size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
      size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
      size_t (*get)(struct decompressor_state *state, char *data, size_t size);
      // takes the address of the state pointer
      void (*destroy)(struct decompressor_state **state);
  };
  #endif
  // Thread-local storage
  // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.

  // Bit flags kept in the `flags` member of struct sender_state.
  typedef enum {
      SENDER_FLAG_OVERFLOW    = (1 << 0), // The buffer has been overflown
      SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
  } SENDER_FLAGS;
  113. struct sender_state {
  114. RRDHOST *host;
  115. pid_t tid; // the thread id of the sender, from gettid()
  116. SENDER_FLAGS flags;
  117. int timeout;
  118. int default_port;
  119. usec_t reconnect_delay;
  120. char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
  121. size_t begin;
  122. size_t reconnects_counter;
  123. size_t sent_bytes;
  124. size_t sent_bytes_on_this_connection;
  125. size_t send_attempts;
  126. time_t last_traffic_seen_t;
  127. size_t not_connected_loops;
  128. // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
  129. // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
  130. netdata_mutex_t mutex;
  131. struct circular_buffer *buffer;
  132. char read_buffer[PLUGINSD_LINE_MAX + 1];
  133. int read_len;
  134. STREAM_CAPABILITIES capabilities;
  135. int rrdpush_sender_pipe[2]; // collector to sender thread signaling
  136. int rrdpush_sender_socket;
  137. uint16_t hops;
  138. #ifdef ENABLE_COMPRESSION
  139. struct compressor_state *compressor;
  140. #endif
  141. #ifdef ENABLE_HTTPS
  142. struct netdata_ssl ssl; // structure used to encrypt the connection
  143. #endif
  144. struct {
  145. bool shutdown;
  146. const char *reason;
  147. } exit;
  148. struct {
  149. DICTIONARY *requests; // de-duplication of replication requests, per chart
  150. struct {
  151. size_t pending_requests; // the currently outstanding replication requests
  152. size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
  153. bool reached_max; // true when the sender buffer should not get more replication responses
  154. } atomic;
  155. } replication;
  156. struct {
  157. size_t buffer_used_percentage; // the current utilization of the sending buffer
  158. usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
  159. time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
  160. } atomic;
  161. };
  // Atomic accessors for the `atomic` and `replication.atomic` members of
  // struct sender_state. `sender` is a pointer to the state.
  // All of them use relaxed ordering, except the replication "buffer full"
  // flag, which uses sequentially consistent loads and stores.
  // The *_plus_one / *_minus_one macros evaluate to the updated counter value.

  // last_buffer_recreate_s (time_t)
  #define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
  #define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, (value), __ATOMIC_RELAXED)

  // replication.atomic.reached_max (bool)
  #define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), (value), __ATOMIC_SEQ_CST)
  #define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)

  // atomic.buffer_used_percentage (size_t)
  #define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), (value), __ATOMIC_RELAXED)
  #define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)

  // atomic.last_flush_time_ut; the setter stores the value of now_realtime_usec()
  #define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
  #define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)

  // replication.atomic.charts_replicating (size_t)
  #define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
  #define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)

  // replication.atomic.pending_requests (size_t)
  #define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
  #define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
  178. struct receiver_state {
  179. RRDHOST *host;
  180. netdata_thread_t thread;
  181. int fd;
  182. char *key;
  183. char *hostname;
  184. char *registry_hostname;
  185. char *machine_guid;
  186. char *os;
  187. char *timezone; // Unused?
  188. char *abbrev_timezone;
  189. int32_t utc_offset;
  190. char *tags;
  191. char *client_ip; // Duplicated in pluginsd
  192. char *client_port; // Duplicated in pluginsd
  193. char *program_name; // Duplicated in pluginsd
  194. char *program_version;
  195. struct rrdhost_system_info *system_info;
  196. STREAM_CAPABILITIES capabilities;
  197. time_t last_msg_t;
  198. char read_buffer[PLUGINSD_LINE_MAX + 1];
  199. int read_len;
  200. uint16_t hops;
  201. struct {
  202. bool shutdown; // signal the streaming parser to exit
  203. const char *reason; // the reason of disconnection to log
  204. } exit;
  205. struct {
  206. RRD_MEMORY_MODE mode;
  207. int history;
  208. int update_every;
  209. int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO
  210. time_t alarms_delay;
  211. int rrdpush_enabled;
  212. char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig
  213. char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig
  214. bool rrdpush_enable_replication;
  215. time_t rrdpush_seconds_to_replicate;
  216. time_t rrdpush_replication_step;
  217. char *rrdpush_destination; // DONT FREE - it is allocated in appconfig
  218. unsigned int rrdpush_compression;
  219. } config;
  220. #ifdef ENABLE_HTTPS
  221. struct netdata_ssl ssl;
  222. #endif
  223. #ifdef ENABLE_COMPRESSION
  224. unsigned int rrdpush_compression;
  225. struct decompressor_state *decompressor;
  226. #endif
  227. time_t replication_first_time_t;
  228. };
  229. struct rrdpush_destinations {
  230. STRING *destination;
  231. const char *last_error;
  232. time_t postpone_reconnection_until;
  233. STREAM_HANDSHAKE last_handshake;
  234. struct rrdpush_destinations *prev;
  235. struct rrdpush_destinations *next;
  236. };
  // Global streaming settings, defined in another translation unit.
  // The default_rrdpush_* names mirror the rrdpush_* members of
  // receiver_state.config.
  extern unsigned int default_rrdpush_enabled;
  #ifdef ENABLE_COMPRESSION
  extern unsigned int default_compression_enabled;
  #endif
  extern char *default_rrdpush_destination;
  extern char *default_rrdpush_api_key;
  extern char *default_rrdpush_send_charts_matching;
  extern bool default_rrdpush_enable_replication;
  extern time_t default_rrdpush_seconds_to_replicate;
  extern time_t default_rrdpush_replication_step;
  extern unsigned int remote_clock_resync_iterations;
  248. void rrdpush_destinations_init(RRDHOST *host);
  249. void rrdpush_destinations_free(RRDHOST *host);
  250. BUFFER *sender_start(struct sender_state *s);
  251. void sender_commit(struct sender_state *s, BUFFER *wb);
  252. int rrdpush_init();
  253. bool rrdpush_receiver_needs_dbengine();
  254. int configured_as_parent();
  255. void rrdset_done_push(RRDSET *st);
  256. bool rrdset_push_chart_definition_now(RRDSET *st);
  257. void *rrdpush_sender_thread(void *ptr);
  258. void rrdpush_send_host_labels(RRDHOST *host);
  259. void rrdpush_claimed_id(RRDHOST *host);
  260. #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
  261. #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
  262. int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
  263. void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);
  264. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
  265. void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
  266. int connect_to_one_of_destinations(
  267. RRDHOST *host,
  268. int default_port,
  269. struct timeval *timeout,
  270. size_t *reconnects_counter,
  271. char *connected_to,
  272. size_t connected_to_size,
  273. struct rrdpush_destinations **destination);
  274. void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
  275. #ifdef ENABLE_COMPRESSION
  276. struct compressor_state *create_compressor();
  277. struct decompressor_state *create_decompressor();
  278. #endif
  279. void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
  280. void log_receiver_capabilities(struct receiver_state *rpt);
  281. void log_sender_capabilities(struct sender_state *s);
  282. STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
  283. int32_t stream_capabilities_to_vn(uint32_t caps);
  284. void receiver_state_free(struct receiver_state *rpt);
  285. bool stop_streaming_receiver(RRDHOST *host, const char *reason);
  286. void sender_thread_buffer_free(void);
  287. #include "replication.h"
  288. #endif //NETDATA_RRDPUSH_H