// rrdpush.h
  // SPDX-License-Identifier: GPL-3.0-or-later
  #ifndef NETDATA_RRDPUSH_H
  #define NETDATA_RRDPUSH_H 1
  #include "libnetdata/libnetdata.h"
  #include "daemon/common.h"
  #include "web/server/web_client.h"
  #include "database/rrd.h"
  #include <string.h> // memcpy(), used by the inline decompressor helpers below
  // Capacity (excluding the terminating NUL) of sender_state.connected_to.
  #define CONNECTED_TO_SIZE 100
  // Initial buffer sizes, in bytes; the thread buffer starts at half the
  // circular buffer's initial size.
  #define CBUFFER_INITIAL_SIZE (16 * 1024)
  #define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
  // ----------------------------------------------------------------------------
  // obsolete versions - do not use anymore
  #define STREAM_OLD_VERSION_CLAIM 3
  #define STREAM_OLD_VERSION_CLABELS 4
  #define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
  // ----------------------------------------------------------------------------
  // capabilities negotiation

  // Bitmask of streaming protocol capabilities; values may be OR-ed together.
  typedef enum {
      // do not use the first 3 bits
      // they used to be versions 1, 2 and 3
      // before we introduce capabilities

      STREAM_CAP_V1           = (1 << 3),  // v1 = the oldest protocol
      STREAM_CAP_V2           = (1 << 4),  // v2 = the second version of the protocol (with host labels)
      STREAM_CAP_VN           = (1 << 5),  // version negotiation supported (for versions 3, 4, 5 of the protocol)
                                           // v3 = claiming supported
                                           // v4 = chart labels supported
                                           // v5 = lz4 compression supported
      STREAM_CAP_VCAPS        = (1 << 6),  // capabilities negotiation supported
      STREAM_CAP_HLABELS      = (1 << 7),  // host labels supported
      STREAM_CAP_CLAIM        = (1 << 8),  // claiming supported
      STREAM_CAP_CLABELS      = (1 << 9),  // chart labels supported
      STREAM_CAP_COMPRESSION  = (1 << 10), // lz4 compression supported
      STREAM_CAP_FUNCTIONS    = (1 << 11), // plugin functions supported
      STREAM_CAP_REPLICATION  = (1 << 12), // replication supported
      STREAM_CAP_BINARY       = (1 << 13), // streaming supports binary data
      STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
      STREAM_CAP_IEEE754      = (1 << 15), // streaming supports binary/hex transfer of double values
      STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit

      STREAM_CAP_INVALID      = (1 << 30), // used as an invalid value for capabilities when this is set
      // this must be signed int, so don't use the last bit
      // needed for negotiating errors between parent and child
  } STREAM_CAPABILITIES;
  // STREAM_HAS_COMPRESSION expands to the compression capability bit when the
  // build enables rrdpush compression, and to 0 (no bit) otherwise.
  #ifdef ENABLE_RRDPUSH_COMPRESSION
  #define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
  #else
  #define STREAM_HAS_COMPRESSION 0
  #endif // ENABLE_RRDPUSH_COMPRESSION
  48. STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
  // True when `rpt` is non-NULL and ALL bits of `capability` are set in
  // rpt->capabilities. Works on any struct pointer with a `capabilities` member.
  // Both arguments are evaluated more than once - avoid side effects in them.
  #define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
  // ----------------------------------------------------------------------------
  // stream handshake

  #define HTTP_HEADER_SIZE 8192
  #define STREAMING_PROTOCOL_VERSION "1.1"

  // Prompts and error texts exchanged during the handshake. They are part of
  // the wire protocol: do not alter a single character.
  #define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
  #define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
  #define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="

  #define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
  #define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
  #define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
  #define START_STREAMING_ERROR_BUSY_TRY_LATER "The server is too busy now to accept this request. Try later."
  #define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later."
  #define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
  // Handshake outcome / disconnect reason.
  // Positive values are successful handshakes, 0 means "never tried",
  // negative values are errors or disconnect reasons.
  typedef enum {
      STREAM_HANDSHAKE_OK_V3 = 3, // v3+
      STREAM_HANDSHAKE_OK_V2 = 2, // v2
      STREAM_HANDSHAKE_OK_V1 = 1, // v1
      STREAM_HANDSHAKE_NEVER = 0, // never tried to connect
      STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
      STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
      STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
      STREAM_HANDSHAKE_ERROR_DENIED = -4,
      STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
      STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
      STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
      STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
      STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9,
      STREAM_HANDSHAKE_BUSY_TRY_LATER = -10,
      STREAM_HANDSHAKE_INTERNAL_ERROR = -11,
      STREAM_HANDSHAKE_INITIALIZATION = -12,
      STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP = -13,
      STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER = -14,
      STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15,
      STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16,
      STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17,
      STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18,
      STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19,
      STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20,
      STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21,
      STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22,
  } STREAM_HANDSHAKE;
  // ----------------------------------------------------------------------------

  // A set of OS / kernel identification strings.
  // NOTE(review): the type name suggests these hold encoded (e.g. URL-encoded)
  // copies for the handshake - ownership and encoding are not visible here.
  typedef struct {
      char *os_name;
      char *os_id;
      char *os_version;
      char *kernel_name;
      char *kernel_version;
  } stream_encoded_t;
  #ifdef ENABLE_RRDPUSH_COMPRESSION
  // 4-byte header that precedes every compressed packet.
  // signature MUST end with a newline
  //
  // Every byte is widened to uint32_t BEFORE shifting: shifting 0xff (an int)
  // left by 24 would overflow a signed int, which is undefined behavior.
  #define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | ((uint32_t)0x80 << 8) | ((uint32_t)0x80 << 16) | ((uint32_t)'\n' << 24))
  #define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | ((uint32_t)0x80 << 8) | ((uint32_t)0x80 << 16) | ((uint32_t)0xff << 24))
  #define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4
  // State of the sending-side compressor.
  struct compressor_state {
      bool initialized;
      char *compression_result_buffer;
      size_t compression_result_buffer_size;
      struct {
          void *lz4_stream;               // opaque handle (typed void * here)
          char *input_ring_buffer;
          size_t input_ring_buffer_size;
          size_t input_ring_buffer_pos;
      } stream;
      // Function pointers carried with the state.
      // NOTE(review): `destroy` takes a pointer-to-pointer while
      // rrdpush_compressor_destroy() below takes a plain pointer - confirm
      // which one callers are expected to use.
      size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
      void (*destroy)(struct compressor_state **state);
  };

  void rrdpush_compressor_reset(struct compressor_state *state);
  void rrdpush_compressor_destroy(struct compressor_state *state);
  size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out);
  // State of the receiving-side decompressor.
  struct decompressor_state {
      bool initialized;
      size_t signature_size;
      size_t total_compressed;
      size_t total_uncompressed;
      size_t packet_count;
      struct {
          void *lz4_stream;   // opaque handle (typed void * here)
          char *buffer;       // decompressed bytes live here
          size_t size;
          size_t write_at;    // end of the decompressed data in `buffer`
          size_t read_at;     // next byte to hand out; must never exceed write_at
      } stream;
  };

  void rrdpush_decompressor_destroy(struct decompressor_state *state);
  void rrdpush_decompressor_reset(struct decompressor_state *state);
  size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
  137. static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) {
  138. if (unlikely(!data || !data_size))
  139. return 0;
  140. if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
  141. return 0;
  142. uint32_t sign = *(uint32_t *)data;
  143. if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
  144. return 0;
  145. size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
  146. return length;
  147. }
  148. static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
  149. if(unlikely(state->stream.read_at != state->stream.write_at))
  150. fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
  151. return rrdpush_decompress_decode_header(header, header_size);
  152. }
  153. static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
  154. if(unlikely(state->stream.read_at > state->stream.write_at))
  155. fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
  156. return state->stream.write_at - state->stream.read_at;
  157. }
  158. static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
  159. if (unlikely(!state || !size || !dst))
  160. return 0;
  161. size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
  162. if(unlikely(!remaining))
  163. return 0;
  164. size_t bytes_to_return = size;
  165. if(bytes_to_return > remaining)
  166. bytes_to_return = remaining;
  167. memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return);
  168. state->stream.read_at += bytes_to_return;
  169. if(unlikely(state->stream.read_at > state->stream.write_at))
  170. fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
  171. return bytes_to_return;
  172. }
  #endif // ENABLE_RRDPUSH_COMPRESSION
  // Thread-local storage
  // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.

  // Kinds of traffic a sender accounts for; STREAM_TRAFFIC_TYPE_MAX is the
  // number of kinds and is used to size per-type arrays.
  typedef enum __attribute__((packed)) {
      STREAM_TRAFFIC_TYPE_REPLICATION = 0,
      STREAM_TRAFFIC_TYPE_FUNCTIONS,
      STREAM_TRAFFIC_TYPE_METADATA,
      STREAM_TRAFFIC_TYPE_DATA,

      // terminator
      STREAM_TRAFFIC_TYPE_MAX,
  } STREAM_TRAFFIC_TYPE;
  // Bit flags kept in sender_state.flags.
  typedef enum __attribute__((packed)) {
      SENDER_FLAG_OVERFLOW    = (1 << 0), // The buffer has been overflown
      SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
  } SENDER_FLAGS;
  188. struct sender_state {
  189. RRDHOST *host;
  190. pid_t tid; // the thread id of the sender, from gettid()
  191. SENDER_FLAGS flags;
  192. int timeout;
  193. int default_port;
  194. uint32_t reconnect_delay;
  195. char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
  196. size_t begin;
  197. size_t reconnects_counter;
  198. size_t sent_bytes;
  199. size_t sent_bytes_on_this_connection;
  200. size_t send_attempts;
  201. time_t last_traffic_seen_t;
  202. time_t last_state_since_t; // the timestamp of the last state (online/offline) change
  203. size_t not_connected_loops;
  204. // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
  205. // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
  206. SPINLOCK spinlock;
  207. struct circular_buffer *buffer;
  208. char read_buffer[PLUGINSD_LINE_MAX + 1];
  209. ssize_t read_len;
  210. STREAM_CAPABILITIES capabilities;
  211. size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
  212. int rrdpush_sender_pipe[2]; // collector to sender thread signaling
  213. int rrdpush_sender_socket;
  214. uint16_t hops;
  215. #ifdef ENABLE_RRDPUSH_COMPRESSION
  216. struct compressor_state compressor;
  217. #endif // ENABLE_RRDPUSH_COMPRESSION
  218. #ifdef ENABLE_HTTPS
  219. NETDATA_SSL ssl; // structure used to encrypt the connection
  220. #endif
  221. struct {
  222. bool shutdown;
  223. STREAM_HANDSHAKE reason;
  224. } exit;
  225. struct {
  226. DICTIONARY *requests; // de-duplication of replication requests, per chart
  227. time_t oldest_request_after_t; // the timestamp of the oldest replication request
  228. time_t latest_completed_before_t; // the timestamp of the latest replication request
  229. struct {
  230. size_t pending_requests; // the currently outstanding replication requests
  231. size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
  232. bool reached_max; // true when the sender buffer should not get more replication responses
  233. } atomic;
  234. } replication;
  235. struct {
  236. bool pending_data;
  237. size_t buffer_used_percentage; // the current utilization of the sending buffer
  238. usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
  239. time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
  240. } atomic;
  241. };
  // Lock / unlock the sender's spinlock.
  #define sender_lock(sender) spinlock_lock(&(sender)->spinlock)
  #define sender_unlock(sender) spinlock_unlock(&(sender)->spinlock)

  // Atomic accessors for the `atomic` and `replication.atomic` members of a
  // sender. All use relaxed ordering, except the "replication buffer full"
  // pair which is sequentially consistent. `value` arguments are parenthesized
  // so any expression can be passed safely.
  #define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
  #define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
  #define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)

  #define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
  #define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, (value), __ATOMIC_RELAXED)

  #define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), (value), __ATOMIC_SEQ_CST)
  #define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)

  #define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), (value), __ATOMIC_RELAXED)
  #define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)

  // the flush time is always stamped with the current now_realtime_usec()
  #define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
  #define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)

  // the *_plus_one / *_minus_one forms evaluate to the NEW counter value
  #define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
  #define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)

  #define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
  #define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
  #define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
  /*
  typedef enum {
      STREAM_NODE_INSTANCE_FEATURE_CLOUD_ONLINE   = (1 << 0),
      STREAM_NODE_INSTANCE_FEATURE_VIRTUAL_HOST   = (1 << 1),
      STREAM_NODE_INSTANCE_FEATURE_HEALTH_ENABLED = (1 << 2),
      STREAM_NODE_INSTANCE_FEATURE_ML_SELF        = (1 << 3),
      STREAM_NODE_INSTANCE_FEATURE_ML_RECEIVED    = (1 << 4),
      STREAM_NODE_INSTANCE_FEATURE_SSL            = (1 << 5),
  } STREAM_NODE_INSTANCE_FEATURES;

  typedef struct stream_node_instance {
      uuid_t uuid;
      STRING *agent;
      STREAM_NODE_INSTANCE_FEATURES features;
      uint32_t hops;

      // receiver information on that agent
      int32_t capabilities;
      uint32_t local_port;
      uint32_t remote_port;
      STRING *local_ip;
      STRING *remote_ip;
  } STREAM_NODE_INSTANCE;
  */
  285. struct buffered_reader {
  286. ssize_t read_len;
  287. ssize_t pos;
  288. char read_buffer[PLUGINSD_LINE_MAX + 1];
  289. };
  290. char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size);
  291. static inline void buffered_reader_init(struct buffered_reader *reader) {
  292. reader->read_buffer[0] = '\0';
  293. reader->read_len = 0;
  294. reader->pos = 0;
  295. }
  296. struct receiver_state {
  297. RRDHOST *host;
  298. pid_t tid;
  299. netdata_thread_t thread;
  300. int fd;
  301. char *key;
  302. char *hostname;
  303. char *registry_hostname;
  304. char *machine_guid;
  305. char *os;
  306. char *timezone; // Unused?
  307. char *abbrev_timezone;
  308. int32_t utc_offset;
  309. char *tags;
  310. char *client_ip; // Duplicated in pluginsd
  311. char *client_port; // Duplicated in pluginsd
  312. char *program_name; // Duplicated in pluginsd
  313. char *program_version;
  314. struct rrdhost_system_info *system_info;
  315. STREAM_CAPABILITIES capabilities;
  316. time_t last_msg_t;
  317. struct buffered_reader reader;
  318. uint16_t hops;
  319. struct {
  320. bool shutdown; // signal the streaming parser to exit
  321. STREAM_HANDSHAKE reason;
  322. } exit;
  323. struct {
  324. RRD_MEMORY_MODE mode;
  325. int history;
  326. int update_every;
  327. int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO
  328. time_t alarms_delay;
  329. uint32_t alarms_history;
  330. int rrdpush_enabled;
  331. char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig
  332. char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig
  333. bool rrdpush_enable_replication;
  334. time_t rrdpush_seconds_to_replicate;
  335. time_t rrdpush_replication_step;
  336. char *rrdpush_destination; // DONT FREE - it is allocated in appconfig
  337. unsigned int rrdpush_compression;
  338. } config;
  339. #ifdef ENABLE_HTTPS
  340. NETDATA_SSL ssl;
  341. #endif
  342. time_t replication_first_time_t;
  343. #ifdef ENABLE_RRDPUSH_COMPRESSION
  344. struct decompressor_state decompressor;
  345. #endif // ENABLE_RRDPUSH_COMPRESSION
  346. /*
  347. struct {
  348. uint32_t count;
  349. STREAM_NODE_INSTANCE *array;
  350. } instances;
  351. */
  352. };
  353. struct rrdpush_destinations {
  354. STRING *destination;
  355. bool ssl;
  356. uint32_t attempts;
  357. time_t since;
  358. time_t postpone_reconnection_until;
  359. STREAM_HANDSHAKE reason;
  360. struct rrdpush_destinations *prev;
  361. struct rrdpush_destinations *next;
  362. };
  // Streaming defaults, defined in the implementation files.
  extern unsigned int default_rrdpush_enabled;
  #ifdef ENABLE_RRDPUSH_COMPRESSION
  extern unsigned int default_rrdpush_compression_enabled;
  #endif // ENABLE_RRDPUSH_COMPRESSION
  extern char *default_rrdpush_destination;
  extern char *default_rrdpush_api_key;
  extern char *default_rrdpush_send_charts_matching;
  extern bool default_rrdpush_enable_replication;
  extern time_t default_rrdpush_seconds_to_replicate;
  extern time_t default_rrdpush_replication_step;
  extern unsigned int remote_clock_resync_iterations;
  374. void rrdpush_destinations_init(RRDHOST *host);
  375. void rrdpush_destinations_free(RRDHOST *host);
  376. BUFFER *sender_start(struct sender_state *s);
  377. void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type);
  378. int rrdpush_init();
  379. bool rrdpush_receiver_needs_dbengine();
  380. int configured_as_parent();
  381. typedef struct rrdset_stream_buffer {
  382. STREAM_CAPABILITIES capabilities;
  383. bool v2;
  384. bool begin_v2_added;
  385. time_t wall_clock_time;
  386. uint64_t rrdset_flags; // RRDSET_FLAGS
  387. time_t last_point_end_time_s;
  388. BUFFER *wb;
  389. } RRDSET_STREAM_BUFFER;
  390. RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time);
  391. void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
  392. void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
  393. void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);
  394. bool rrdset_push_chart_definition_now(RRDSET *st);
  395. void *rrdpush_sender_thread(void *ptr);
  396. void rrdpush_send_host_labels(RRDHOST *host);
  397. void rrdpush_send_claimed_id(RRDHOST *host);
  398. void rrdpush_send_global_functions(RRDHOST *host);
  // Thread name prefixes for the streaming threads.
  #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
  #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
  401. int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string);
  402. void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);
  403. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
  404. void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
  405. int connect_to_one_of_destinations(
  406. RRDHOST *host,
  407. int default_port,
  408. struct timeval *timeout,
  409. size_t *reconnects_counter,
  410. char *connected_to,
  411. size_t connected_to_size,
  412. struct rrdpush_destinations **destination);
  413. void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
  #ifdef ENABLE_RRDPUSH_COMPRESSION
  // Declared with (void): an empty () would not be a prototype in C.
  struct compressor_state *create_compressor(void);
  #endif // ENABLE_RRDPUSH_COMPRESSION
  417. void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
  418. const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
  419. void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
  420. void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
  421. void log_receiver_capabilities(struct receiver_state *rpt);
  422. void log_sender_capabilities(struct sender_state *s);
  423. STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender);
  424. int32_t stream_capabilities_to_vn(uint32_t caps);
  425. void receiver_state_free(struct receiver_state *rpt);
  426. bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason);
  427. void sender_thread_buffer_free(void);
  428. #include "replication.h"
  typedef enum __attribute__((packed)) {
      RRDHOST_DB_STATUS_INITIALIZING = 0,
      RRDHOST_DB_STATUS_QUERYABLE,
  } RRDHOST_DB_STATUS;

  // Text form of a database status; unknown values map to "initializing".
  static inline const char *rrdhost_db_status_to_string(RRDHOST_DB_STATUS status) {
      switch(status) {
          default:
          case RRDHOST_DB_STATUS_INITIALIZING:
              return "initializing";

          case RRDHOST_DB_STATUS_QUERYABLE:
              return "online";
      }
  }
  typedef enum __attribute__((packed)) {
      RRDHOST_DB_LIVENESS_STALE = 0,
      RRDHOST_DB_LIVENESS_LIVE,
  } RRDHOST_DB_LIVENESS;

  // Text form of a database liveness value; unknown values map to "stale".
  static inline const char *rrdhost_db_liveness_to_string(RRDHOST_DB_LIVENESS status) {
      switch(status) {
          default:
          case RRDHOST_DB_LIVENESS_STALE:
              return "stale";

          case RRDHOST_DB_LIVENESS_LIVE:
              return "live";
      }
  }
  typedef enum __attribute__((packed)) {
      RRDHOST_INGEST_STATUS_ARCHIVED = 0,
      RRDHOST_INGEST_STATUS_INITIALIZING,
      RRDHOST_INGEST_STATUS_REPLICATING,
      RRDHOST_INGEST_STATUS_ONLINE,
      RRDHOST_INGEST_STATUS_OFFLINE,
  } RRDHOST_INGEST_STATUS;

  // Text form of an ingestion status; unknown values map to "offline".
  static inline const char *rrdhost_ingest_status_to_string(RRDHOST_INGEST_STATUS status) {
      switch(status) {
          case RRDHOST_INGEST_STATUS_ARCHIVED:
              return "archived";

          case RRDHOST_INGEST_STATUS_INITIALIZING:
              return "initializing";

          case RRDHOST_INGEST_STATUS_REPLICATING:
              return "replicating";

          case RRDHOST_INGEST_STATUS_ONLINE:
              return "online";

          default:
          case RRDHOST_INGEST_STATUS_OFFLINE:
              return "offline";
      }
  }
  typedef enum __attribute__((packed)) {
      RRDHOST_INGEST_TYPE_LOCALHOST = 0,
      RRDHOST_INGEST_TYPE_VIRTUAL,
      RRDHOST_INGEST_TYPE_CHILD,
      RRDHOST_INGEST_TYPE_ARCHIVED,
  } RRDHOST_INGEST_TYPE;

  // Text form of an ingestion type; unknown values map to "archived".
  static inline const char *rrdhost_ingest_type_to_string(RRDHOST_INGEST_TYPE type) {
      switch(type) {
          case RRDHOST_INGEST_TYPE_LOCALHOST:
              return "localhost";

          case RRDHOST_INGEST_TYPE_VIRTUAL:
              return "virtual";

          case RRDHOST_INGEST_TYPE_CHILD:
              return "child";

          default:
          case RRDHOST_INGEST_TYPE_ARCHIVED:
              return "archived";
      }
  }
  typedef enum __attribute__((packed)) {
      RRDHOST_STREAM_STATUS_DISABLED = 0,
      RRDHOST_STREAM_STATUS_REPLICATING,
      RRDHOST_STREAM_STATUS_ONLINE,
      RRDHOST_STREAM_STATUS_OFFLINE,
  } RRDHOST_STREAMING_STATUS;

  // Text form of a streaming status; unknown values map to "offline".
  static inline const char *rrdhost_streaming_status_to_string(RRDHOST_STREAMING_STATUS status) {
      switch(status) {
          case RRDHOST_STREAM_STATUS_DISABLED:
              return "disabled";

          case RRDHOST_STREAM_STATUS_REPLICATING:
              return "replicating";

          case RRDHOST_STREAM_STATUS_ONLINE:
              return "online";

          default:
          case RRDHOST_STREAM_STATUS_OFFLINE:
              return "offline";
      }
  }
  typedef enum __attribute__((packed)) {
      RRDHOST_ML_STATUS_DISABLED = 0,
      RRDHOST_ML_STATUS_OFFLINE,
      RRDHOST_ML_STATUS_RUNNING,
  } RRDHOST_ML_STATUS;

  // Text form of an ML status; RUNNING is reported as "online" and unknown
  // values map to "disabled".
  static inline const char *rrdhost_ml_status_to_string(RRDHOST_ML_STATUS status) {
      switch(status) {
          case RRDHOST_ML_STATUS_RUNNING:
              return "online";

          case RRDHOST_ML_STATUS_OFFLINE:
              return "offline";

          default:
          case RRDHOST_ML_STATUS_DISABLED:
              return "disabled";
      }
  }
  typedef enum __attribute__((packed)) {
      RRDHOST_ML_TYPE_DISABLED = 0,
      RRDHOST_ML_TYPE_SELF,
      RRDHOST_ML_TYPE_RECEIVED,
  } RRDHOST_ML_TYPE;

  // Text form of an ML type; unknown values map to "disabled".
  static inline const char *rrdhost_ml_type_to_string(RRDHOST_ML_TYPE type) {
      switch(type) {
          case RRDHOST_ML_TYPE_SELF:
              return "self";

          case RRDHOST_ML_TYPE_RECEIVED:
              return "received";

          default:
          case RRDHOST_ML_TYPE_DISABLED:
              return "disabled";
      }
  }
  typedef enum __attribute__((packed)) {
      RRDHOST_HEALTH_STATUS_DISABLED = 0,
      RRDHOST_HEALTH_STATUS_INITIALIZING,
      RRDHOST_HEALTH_STATUS_RUNNING,
  } RRDHOST_HEALTH_STATUS;

  // Text form of a health status; RUNNING is reported as "online" and unknown
  // values map to "disabled".
  static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS status) {
      switch(status) {
          default:
          case RRDHOST_HEALTH_STATUS_DISABLED:
              return "disabled";

          case RRDHOST_HEALTH_STATUS_INITIALIZING:
              return "initializing";

          case RRDHOST_HEALTH_STATUS_RUNNING:
              return "online";
      }
  }
  563. typedef struct rrdhost_status {
  564. RRDHOST *host;
  565. time_t now;
  566. struct {
  567. RRDHOST_DB_STATUS status;
  568. RRDHOST_DB_LIVENESS liveness;
  569. RRD_MEMORY_MODE mode;
  570. time_t first_time_s;
  571. time_t last_time_s;
  572. size_t metrics;
  573. size_t instances;
  574. size_t contexts;
  575. } db;
  576. struct {
  577. RRDHOST_ML_STATUS status;
  578. RRDHOST_ML_TYPE type;
  579. struct ml_metrics_statistics metrics;
  580. } ml;
  581. struct {
  582. size_t hops;
  583. RRDHOST_INGEST_TYPE type;
  584. RRDHOST_INGEST_STATUS status;
  585. SOCKET_PEERS peers;
  586. bool ssl;
  587. STREAM_CAPABILITIES capabilities;
  588. uint32_t id;
  589. time_t since;
  590. STREAM_HANDSHAKE reason;
  591. struct {
  592. bool in_progress;
  593. NETDATA_DOUBLE completion;
  594. size_t instances;
  595. } replication;
  596. } ingest;
  597. struct {
  598. size_t hops;
  599. RRDHOST_STREAMING_STATUS status;
  600. SOCKET_PEERS peers;
  601. bool ssl;
  602. bool compression;
  603. STREAM_CAPABILITIES capabilities;
  604. uint32_t id;
  605. time_t since;
  606. STREAM_HANDSHAKE reason;
  607. struct {
  608. bool in_progress;
  609. NETDATA_DOUBLE completion;
  610. size_t instances;
  611. } replication;
  612. size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
  613. } stream;
  614. struct {
  615. RRDHOST_HEALTH_STATUS status;
  616. struct {
  617. uint32_t undefined;
  618. uint32_t uninitialized;
  619. uint32_t clear;
  620. uint32_t warning;
  621. uint32_t critical;
  622. } alerts;
  623. } health;
  624. } RRDHOST_STATUS;
  625. void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
  626. bool rrdhost_state_cloud_emulation(RRDHOST *host);
  #endif //NETDATA_RRDPUSH_H