rrdpush.h 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifndef NETDATA_RRDPUSH_H
  3. #define NETDATA_RRDPUSH_H 1
  4. #include "libnetdata/libnetdata.h"
  5. #include "daemon/common.h"
  6. #include "web/server/web_client.h"
  7. #include "database/rrdfunctions.h"
  8. #include "database/rrd.h"
  9. #define CONNECTED_TO_SIZE 100
  10. #define CBUFFER_INITIAL_SIZE (16 * 1024)
  11. #define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
  12. // ----------------------------------------------------------------------------
  13. // obsolete versions - do not use anymore
  14. #define STREAM_OLD_VERSION_CLAIM 3
  15. #define STREAM_OLD_VERSION_CLABELS 4
  16. #define STREAM_OLD_VERSION_LZ4 5
  17. // ----------------------------------------------------------------------------
  18. // capabilities negotiation
  19. typedef enum {
  20. STREAM_CAP_NONE = 0,
  21. // do not use the first 3 bits
  22. // they used to be versions 1, 2 and 3
  23. // before we introduce capabilities
  24. STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol
  25. STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels)
  26. STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol)
  27. // v3 = claiming supported
  28. // v4 = chart labels supported
  29. // v5 = lz4 compression supported
  30. STREAM_CAP_VCAPS = (1 << 6), // capabilities negotiation supported
  31. STREAM_CAP_HLABELS = (1 << 7), // host labels supported
  32. STREAM_CAP_CLAIM = (1 << 8), // claiming supported
  33. STREAM_CAP_CLABELS = (1 << 9), // chart labels supported
  34. STREAM_CAP_LZ4 = (1 << 10), // lz4 compression supported
  35. STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
  36. STREAM_CAP_REPLICATION = (1 << 12), // replication supported
  37. STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
  38. STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
  39. STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
  40. STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
  41. // STREAM_CAP_DYNCFG = (1 << 17), // leave this unused for as long as possible
  42. STREAM_CAP_SLOTS = (1 << 18), // the sender can appoint a unique slot for each chart
  43. STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported
  44. STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported
  45. STREAM_CAP_BROTLI = (1 << 21), // BROTLI compression supported
  46. STREAM_CAP_PROGRESS = (1 << 22), // Functions PROGRESS support
  47. STREAM_CAP_DYNCFG = (1 << 23), // support for DYNCFG
  48. STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
  49. // this must be signed int, so don't use the last bit
  50. // needed for negotiating errors between parent and child
  51. } STREAM_CAPABILITIES;
  52. #ifdef ENABLE_LZ4
  53. #define STREAM_CAP_LZ4_AVAILABLE STREAM_CAP_LZ4
  54. #else
  55. #define STREAM_CAP_LZ4_AVAILABLE 0
  56. #endif // ENABLE_LZ4
  57. #ifdef ENABLE_ZSTD
  58. #define STREAM_CAP_ZSTD_AVAILABLE STREAM_CAP_ZSTD
  59. #else
  60. #define STREAM_CAP_ZSTD_AVAILABLE 0
  61. #endif // ENABLE_ZSTD
  62. #ifdef ENABLE_BROTLI
  63. #define STREAM_CAP_BROTLI_AVAILABLE STREAM_CAP_BROTLI
  64. #else
  65. #define STREAM_CAP_BROTLI_AVAILABLE 0
  66. #endif // ENABLE_BROTLI
  67. #define STREAM_CAP_COMPRESSIONS_AVAILABLE (STREAM_CAP_LZ4_AVAILABLE|STREAM_CAP_ZSTD_AVAILABLE|STREAM_CAP_BROTLI_AVAILABLE|STREAM_CAP_GZIP)
  68. extern STREAM_CAPABILITIES globally_disabled_capabilities;
  69. STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
  70. #define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
  71. static inline bool stream_has_more_than_one_capability_of(STREAM_CAPABILITIES caps, STREAM_CAPABILITIES mask) {
  72. STREAM_CAPABILITIES common = (STREAM_CAPABILITIES)(caps & mask);
  73. return (common & (common - 1)) != 0 && common != 0;
  74. }
  75. // ----------------------------------------------------------------------------
  76. // stream handshake
  77. #define HTTP_HEADER_SIZE 8192
  78. #define STREAMING_PROTOCOL_VERSION "1.1"
  79. #define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
  80. #define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
  81. #define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
  82. #define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
  83. #define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
  84. #define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
  85. #define START_STREAMING_ERROR_BUSY_TRY_LATER "The server is too busy now to accept this request. Try later."
  86. #define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later."
  87. #define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
  88. #define RRDPUSH_STATUS_CONNECTED "CONNECTED"
  89. #define RRDPUSH_STATUS_ALREADY_CONNECTED "ALREADY CONNECTED"
  90. #define RRDPUSH_STATUS_DISCONNECTED "DISCONNECTED"
  91. #define RRDPUSH_STATUS_RATE_LIMIT "RATE LIMIT TRY LATER"
  92. #define RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS "INITIALIZATION IN PROGRESS RETRY LATER"
  93. #define RRDPUSH_STATUS_INTERNAL_SERVER_ERROR "INTERNAL SERVER ERROR DROPPING CONNECTION"
  94. #define RRDPUSH_STATUS_DUPLICATE_RECEIVER "DUPLICATE RECEIVER DROPPING CONNECTION"
  95. #define RRDPUSH_STATUS_CANT_REPLY "CANT REPLY DROPPING CONNECTION"
  96. #define RRDPUSH_STATUS_NO_HOSTNAME "NO HOSTNAME PERMISSION DENIED"
  97. #define RRDPUSH_STATUS_NO_API_KEY "NO API KEY PERMISSION DENIED"
  98. #define RRDPUSH_STATUS_INVALID_API_KEY "INVALID API KEY PERMISSION DENIED"
  99. #define RRDPUSH_STATUS_NO_MACHINE_GUID "NO MACHINE GUID PERMISSION DENIED"
  100. #define RRDPUSH_STATUS_MACHINE_GUID_DISABLED "MACHINE GUID DISABLED PERMISSION DENIED"
  101. #define RRDPUSH_STATUS_INVALID_MACHINE_GUID "INVALID MACHINE GUID PERMISSION DENIED"
  102. #define RRDPUSH_STATUS_API_KEY_DISABLED "API KEY DISABLED PERMISSION DENIED"
  103. #define RRDPUSH_STATUS_NOT_ALLOWED_IP "NOT ALLOWED IP PERMISSION DENIED"
  104. #define RRDPUSH_STATUS_LOCALHOST "LOCALHOST PERMISSION DENIED"
  105. #define RRDPUSH_STATUS_PERMISSION_DENIED "PERMISSION DENIED"
  106. #define RRDPUSH_STATUS_BAD_HANDSHAKE "BAD HANDSHAKE"
  107. #define RRDPUSH_STATUS_TIMEOUT "TIMEOUT"
  108. #define RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION "CANT UPGRADE CONNECTION"
  109. #define RRDPUSH_STATUS_SSL_ERROR "SSL ERROR"
  110. #define RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE "INVALID SSL CERTIFICATE"
  111. #define RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION "CANT ESTABLISH SSL CONNECTION"
  112. typedef enum {
  113. STREAM_HANDSHAKE_OK_V3 = 3, // v3+
  114. STREAM_HANDSHAKE_OK_V2 = 2, // v2
  115. STREAM_HANDSHAKE_OK_V1 = 1, // v1
  116. STREAM_HANDSHAKE_NEVER = 0, // never tried to connect
  117. STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
  118. STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
  119. STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
  120. STREAM_HANDSHAKE_ERROR_DENIED = -4,
  121. STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
  122. STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
  123. STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
  124. STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
  125. STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9,
  126. STREAM_HANDSHAKE_BUSY_TRY_LATER = -10,
  127. STREAM_HANDSHAKE_INTERNAL_ERROR = -11,
  128. STREAM_HANDSHAKE_INITIALIZATION = -12,
  129. STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP = -13,
  130. STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER = -14,
  131. STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15,
  132. STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16,
  133. STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17,
  134. STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR = -18,
  135. STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19,
  136. STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20,
  137. STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21,
  138. STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22,
  139. STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER = -23,
  140. STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF = -24,
  141. STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED = -25,
  142. STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT = -26,
  143. STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE = -27,
  144. } STREAM_HANDSHAKE;
  145. // ----------------------------------------------------------------------------
  146. typedef struct {
  147. char *os_name;
  148. char *os_id;
  149. char *os_version;
  150. char *kernel_name;
  151. char *kernel_version;
  152. } stream_encoded_t;
  153. #include "compression.h"
  154. // Thread-local storage
  155. // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
  156. typedef enum __attribute__((packed)) {
  157. STREAM_TRAFFIC_TYPE_REPLICATION = 0,
  158. STREAM_TRAFFIC_TYPE_FUNCTIONS,
  159. STREAM_TRAFFIC_TYPE_METADATA,
  160. STREAM_TRAFFIC_TYPE_DATA,
  161. STREAM_TRAFFIC_TYPE_DYNCFG,
  162. // terminator
  163. STREAM_TRAFFIC_TYPE_MAX,
  164. } STREAM_TRAFFIC_TYPE;
  165. typedef enum __attribute__((packed)) {
  166. SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
  167. } SENDER_FLAGS;
  168. struct sender_state {
  169. RRDHOST *host;
  170. pid_t tid; // the thread id of the sender, from gettid()
  171. SENDER_FLAGS flags;
  172. int timeout;
  173. int default_port;
  174. uint32_t reconnect_delay;
  175. char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
  176. size_t begin;
  177. size_t reconnects_counter;
  178. size_t sent_bytes;
  179. size_t sent_bytes_on_this_connection;
  180. size_t send_attempts;
  181. time_t last_traffic_seen_t;
  182. time_t last_state_since_t; // the timestamp of the last state (online/offline) change
  183. size_t not_connected_loops;
  184. // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
  185. // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
  186. SPINLOCK spinlock;
  187. struct circular_buffer *buffer;
  188. char read_buffer[PLUGINSD_LINE_MAX + 1];
  189. ssize_t read_len;
  190. STREAM_CAPABILITIES capabilities;
  191. STREAM_CAPABILITIES disabled_capabilities;
  192. size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
  193. int rrdpush_sender_pipe[2]; // collector to sender thread signaling
  194. int rrdpush_sender_socket;
  195. uint16_t hops;
  196. struct line_splitter line;
  197. struct compressor_state compressor;
  198. #ifdef NETDATA_LOG_STREAM_SENDER
  199. FILE *stream_log_fp;
  200. #endif
  201. #ifdef ENABLE_HTTPS
  202. NETDATA_SSL ssl; // structure used to encrypt the connection
  203. #endif
  204. struct {
  205. bool shutdown;
  206. STREAM_HANDSHAKE reason;
  207. } exit;
  208. struct {
  209. DICTIONARY *requests; // de-duplication of replication requests, per chart
  210. time_t oldest_request_after_t; // the timestamp of the oldest replication request
  211. time_t latest_completed_before_t; // the timestamp of the latest replication request
  212. struct {
  213. size_t pending_requests; // the currently outstanding replication requests
  214. size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
  215. bool reached_max; // true when the sender buffer should not get more replication responses
  216. } atomic;
  217. } replication;
  218. struct {
  219. bool pending_data;
  220. size_t buffer_used_percentage; // the current utilization of the sending buffer
  221. usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
  222. time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
  223. } atomic;
  224. struct {
  225. bool intercept_input;
  226. const char *transaction;
  227. const char *timeout_s;
  228. const char *function;
  229. const char *source;
  230. BUFFER *payload;
  231. } functions;
  232. int parent_using_h2o;
  233. };
  234. #define sender_lock(sender) spinlock_lock(&(sender)->spinlock)
  235. #define sender_unlock(sender) spinlock_unlock(&(sender)->spinlock)
  236. #define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
  237. #define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
  238. #define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)
  239. #define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
  240. #define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
  241. #define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
  242. #define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
  243. #define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED)
  244. #define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)
  245. #define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
  246. #define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)
  247. #define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
  248. #define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
  249. #define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
  250. #define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)
  251. #define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
  252. #define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
  253. #define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
  254. #define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
  255. /*
  256. typedef enum {
  257. STREAM_NODE_INSTANCE_FEATURE_CLOUD_ONLINE = (1 << 0),
  258. STREAM_NODE_INSTANCE_FEATURE_VIRTUAL_HOST = (1 << 1),
  259. STREAM_NODE_INSTANCE_FEATURE_HEALTH_ENABLED = (1 << 2),
  260. STREAM_NODE_INSTANCE_FEATURE_ML_SELF = (1 << 3),
  261. STREAM_NODE_INSTANCE_FEATURE_ML_RECEIVED = (1 << 4),
  262. STREAM_NODE_INSTANCE_FEATURE_SSL = (1 << 5),
  263. } STREAM_NODE_INSTANCE_FEATURES;
  264. typedef struct stream_node_instance {
  265. uuid_t uuid;
  266. STRING *agent;
  267. STREAM_NODE_INSTANCE_FEATURES features;
  268. uint32_t hops;
  269. // receiver information on that agent
  270. int32_t capabilities;
  271. uint32_t local_port;
  272. uint32_t remote_port;
  273. STRING *local_ip;
  274. STRING *remote_ip;
  275. } STREAM_NODE_INSTANCE;
  276. */
  277. struct receiver_state {
  278. RRDHOST *host;
  279. pid_t tid;
  280. netdata_thread_t thread;
  281. int fd;
  282. char *key;
  283. char *hostname;
  284. char *registry_hostname;
  285. char *machine_guid;
  286. char *os;
  287. char *timezone; // Unused?
  288. char *abbrev_timezone;
  289. int32_t utc_offset;
  290. char *tags;
  291. char *client_ip; // Duplicated in pluginsd
  292. char *client_port; // Duplicated in pluginsd
  293. char *program_name; // Duplicated in pluginsd
  294. char *program_version;
  295. struct rrdhost_system_info *system_info;
  296. STREAM_CAPABILITIES capabilities;
  297. time_t last_msg_t;
  298. struct buffered_reader reader;
  299. uint16_t hops;
  300. struct {
  301. bool shutdown; // signal the streaming parser to exit
  302. STREAM_HANDSHAKE reason;
  303. } exit;
  304. struct {
  305. RRD_MEMORY_MODE mode;
  306. int history;
  307. int update_every;
  308. int health_enabled; // CONFIG_BOOLEAN_YES, CONFIG_BOOLEAN_NO, CONFIG_BOOLEAN_AUTO
  309. time_t alarms_delay;
  310. uint32_t alarms_history;
  311. int rrdpush_enabled;
  312. char *rrdpush_api_key; // DONT FREE - it is allocated in appconfig
  313. char *rrdpush_send_charts_matching; // DONT FREE - it is allocated in appconfig
  314. bool rrdpush_enable_replication;
  315. time_t rrdpush_seconds_to_replicate;
  316. time_t rrdpush_replication_step;
  317. char *rrdpush_destination; // DONT FREE - it is allocated in appconfig
  318. unsigned int rrdpush_compression;
  319. STREAM_CAPABILITIES compression_priorities[COMPRESSION_ALGORITHM_MAX];
  320. } config;
  321. #ifdef ENABLE_HTTPS
  322. NETDATA_SSL ssl;
  323. #endif
  324. time_t replication_first_time_t;
  325. struct decompressor_state decompressor;
  326. /*
  327. struct {
  328. uint32_t count;
  329. STREAM_NODE_INSTANCE *array;
  330. } instances;
  331. */
  332. #ifdef ENABLE_H2O
  333. void *h2o_ctx;
  334. #endif
  335. };
  336. #ifdef ENABLE_H2O
  337. #define is_h2o_rrdpush(x) ((x)->h2o_ctx != NULL)
  338. #define unless_h2o_rrdpush(x) if(!is_h2o_rrdpush(x))
  339. #endif
  340. struct rrdpush_destinations {
  341. STRING *destination;
  342. bool ssl;
  343. uint32_t attempts;
  344. time_t since;
  345. time_t postpone_reconnection_until;
  346. STREAM_HANDSHAKE reason;
  347. struct rrdpush_destinations *prev;
  348. struct rrdpush_destinations *next;
  349. };
  350. extern unsigned int default_rrdpush_enabled;
  351. extern unsigned int default_rrdpush_compression_enabled;
  352. extern char *default_rrdpush_destination;
  353. extern char *default_rrdpush_api_key;
  354. extern char *default_rrdpush_send_charts_matching;
  355. extern bool default_rrdpush_enable_replication;
  356. extern time_t default_rrdpush_seconds_to_replicate;
  357. extern time_t default_rrdpush_replication_step;
  358. extern unsigned int remote_clock_resync_iterations;
  359. void rrdpush_destinations_init(RRDHOST *host);
  360. void rrdpush_destinations_free(RRDHOST *host);
  361. BUFFER *sender_start(struct sender_state *s);
  362. void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type);
  363. int rrdpush_init();
  364. bool rrdpush_receiver_needs_dbengine();
  365. int configured_as_parent();
  366. typedef struct rrdset_stream_buffer {
  367. STREAM_CAPABILITIES capabilities;
  368. bool v2;
  369. bool begin_v2_added;
  370. time_t wall_clock_time;
  371. uint64_t rrdset_flags; // RRDSET_FLAGS
  372. time_t last_point_end_time_s;
  373. BUFFER *wb;
  374. } RRDSET_STREAM_BUFFER;
  375. RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time);
  376. void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
  377. void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
  378. void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);
  379. bool rrdset_push_chart_definition_now(RRDSET *st);
  380. void *rrdpush_sender_thread(void *ptr);
  381. void rrdpush_send_host_labels(RRDHOST *host);
  382. void rrdpush_send_claimed_id(RRDHOST *host);
  383. void rrdpush_send_global_functions(RRDHOST *host);
  384. int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx);
  385. void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);
  386. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
  387. int connect_to_one_of_destinations(
  388. RRDHOST *host,
  389. int default_port,
  390. struct timeval *timeout,
  391. size_t *reconnects_counter,
  392. char *connected_to,
  393. size_t connected_to_size,
  394. struct rrdpush_destinations **destination);
  395. void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
  396. void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
  397. const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
  398. void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
  399. void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority);
  400. void log_receiver_capabilities(struct receiver_state *rpt);
  401. void log_sender_capabilities(struct sender_state *s);
  402. STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender);
  403. int32_t stream_capabilities_to_vn(uint32_t caps);
  404. void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps);
  405. void receiver_state_free(struct receiver_state *rpt);
  406. bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason);
  407. void sender_thread_buffer_free(void);
  408. #include "replication.h"
  409. typedef enum __attribute__((packed)) {
  410. RRDHOST_DB_STATUS_INITIALIZING = 0,
  411. RRDHOST_DB_STATUS_QUERYABLE,
  412. } RRDHOST_DB_STATUS;
  413. static inline const char *rrdhost_db_status_to_string(RRDHOST_DB_STATUS status) {
  414. switch(status) {
  415. default:
  416. case RRDHOST_DB_STATUS_INITIALIZING:
  417. return "initializing";
  418. case RRDHOST_DB_STATUS_QUERYABLE:
  419. return "online";
  420. }
  421. }
  422. typedef enum __attribute__((packed)) {
  423. RRDHOST_DB_LIVENESS_STALE = 0,
  424. RRDHOST_DB_LIVENESS_LIVE,
  425. } RRDHOST_DB_LIVENESS;
  426. static inline const char *rrdhost_db_liveness_to_string(RRDHOST_DB_LIVENESS status) {
  427. switch(status) {
  428. default:
  429. case RRDHOST_DB_LIVENESS_STALE:
  430. return "stale";
  431. case RRDHOST_DB_LIVENESS_LIVE:
  432. return "live";
  433. }
  434. }
  435. typedef enum __attribute__((packed)) {
  436. RRDHOST_INGEST_STATUS_ARCHIVED = 0,
  437. RRDHOST_INGEST_STATUS_INITIALIZING,
  438. RRDHOST_INGEST_STATUS_REPLICATING,
  439. RRDHOST_INGEST_STATUS_ONLINE,
  440. RRDHOST_INGEST_STATUS_OFFLINE,
  441. } RRDHOST_INGEST_STATUS;
  442. static inline const char *rrdhost_ingest_status_to_string(RRDHOST_INGEST_STATUS status) {
  443. switch(status) {
  444. case RRDHOST_INGEST_STATUS_ARCHIVED:
  445. return "archived";
  446. case RRDHOST_INGEST_STATUS_INITIALIZING:
  447. return "initializing";
  448. case RRDHOST_INGEST_STATUS_REPLICATING:
  449. return "replicating";
  450. case RRDHOST_INGEST_STATUS_ONLINE:
  451. return "online";
  452. default:
  453. case RRDHOST_INGEST_STATUS_OFFLINE:
  454. return "offline";
  455. }
  456. }
  457. typedef enum __attribute__((packed)) {
  458. RRDHOST_INGEST_TYPE_LOCALHOST = 0,
  459. RRDHOST_INGEST_TYPE_VIRTUAL,
  460. RRDHOST_INGEST_TYPE_CHILD,
  461. RRDHOST_INGEST_TYPE_ARCHIVED,
  462. } RRDHOST_INGEST_TYPE;
  463. static inline const char *rrdhost_ingest_type_to_string(RRDHOST_INGEST_TYPE type) {
  464. switch(type) {
  465. case RRDHOST_INGEST_TYPE_LOCALHOST:
  466. return "localhost";
  467. case RRDHOST_INGEST_TYPE_VIRTUAL:
  468. return "virtual";
  469. case RRDHOST_INGEST_TYPE_CHILD:
  470. return "child";
  471. default:
  472. case RRDHOST_INGEST_TYPE_ARCHIVED:
  473. return "archived";
  474. }
  475. }
  476. typedef enum __attribute__((packed)) {
  477. RRDHOST_STREAM_STATUS_DISABLED = 0,
  478. RRDHOST_STREAM_STATUS_REPLICATING,
  479. RRDHOST_STREAM_STATUS_ONLINE,
  480. RRDHOST_STREAM_STATUS_OFFLINE,
  481. } RRDHOST_STREAMING_STATUS;
  482. static inline const char *rrdhost_streaming_status_to_string(RRDHOST_STREAMING_STATUS status) {
  483. switch(status) {
  484. case RRDHOST_STREAM_STATUS_DISABLED:
  485. return "disabled";
  486. case RRDHOST_STREAM_STATUS_REPLICATING:
  487. return "replicating";
  488. case RRDHOST_STREAM_STATUS_ONLINE:
  489. return "online";
  490. default:
  491. case RRDHOST_STREAM_STATUS_OFFLINE:
  492. return "offline";
  493. }
  494. }
  495. typedef enum __attribute__((packed)) {
  496. RRDHOST_ML_STATUS_DISABLED = 0,
  497. RRDHOST_ML_STATUS_OFFLINE,
  498. RRDHOST_ML_STATUS_RUNNING,
  499. } RRDHOST_ML_STATUS;
  500. static inline const char *rrdhost_ml_status_to_string(RRDHOST_ML_STATUS status) {
  501. switch(status) {
  502. case RRDHOST_ML_STATUS_RUNNING:
  503. return "online";
  504. case RRDHOST_ML_STATUS_OFFLINE:
  505. return "offline";
  506. default:
  507. case RRDHOST_ML_STATUS_DISABLED:
  508. return "disabled";
  509. }
  510. }
  511. typedef enum __attribute__((packed)) {
  512. RRDHOST_ML_TYPE_DISABLED = 0,
  513. RRDHOST_ML_TYPE_SELF,
  514. RRDHOST_ML_TYPE_RECEIVED,
  515. } RRDHOST_ML_TYPE;
  516. static inline const char *rrdhost_ml_type_to_string(RRDHOST_ML_TYPE type) {
  517. switch(type) {
  518. case RRDHOST_ML_TYPE_SELF:
  519. return "self";
  520. case RRDHOST_ML_TYPE_RECEIVED:
  521. return "received";
  522. default:
  523. case RRDHOST_ML_TYPE_DISABLED:
  524. return "disabled";
  525. }
  526. }
  527. typedef enum __attribute__((packed)) {
  528. RRDHOST_HEALTH_STATUS_DISABLED = 0,
  529. RRDHOST_HEALTH_STATUS_INITIALIZING,
  530. RRDHOST_HEALTH_STATUS_RUNNING,
  531. } RRDHOST_HEALTH_STATUS;
  532. static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS status) {
  533. switch(status) {
  534. default:
  535. case RRDHOST_HEALTH_STATUS_DISABLED:
  536. return "disabled";
  537. case RRDHOST_HEALTH_STATUS_INITIALIZING:
  538. return "initializing";
  539. case RRDHOST_HEALTH_STATUS_RUNNING:
  540. return "online";
  541. }
  542. }
  543. typedef enum __attribute__((packed)) {
  544. RRDHOST_DYNCFG_STATUS_UNAVAILABLE = 0,
  545. RRDHOST_DYNCFG_STATUS_AVAILABLE,
  546. } RRDHOST_DYNCFG_STATUS;
  547. static inline const char *rrdhost_dyncfg_status_to_string(RRDHOST_DYNCFG_STATUS status) {
  548. switch(status) {
  549. default:
  550. case RRDHOST_DYNCFG_STATUS_UNAVAILABLE:
  551. return "unavailable";
  552. case RRDHOST_DYNCFG_STATUS_AVAILABLE:
  553. return "online";
  554. }
  555. }
  556. typedef struct rrdhost_status {
  557. RRDHOST *host;
  558. time_t now;
  559. struct {
  560. RRDHOST_DYNCFG_STATUS status;
  561. } dyncfg;
  562. struct {
  563. RRDHOST_DB_STATUS status;
  564. RRDHOST_DB_LIVENESS liveness;
  565. RRD_MEMORY_MODE mode;
  566. time_t first_time_s;
  567. time_t last_time_s;
  568. size_t metrics;
  569. size_t instances;
  570. size_t contexts;
  571. } db;
  572. struct {
  573. RRDHOST_ML_STATUS status;
  574. RRDHOST_ML_TYPE type;
  575. struct ml_metrics_statistics metrics;
  576. } ml;
  577. struct {
  578. size_t hops;
  579. RRDHOST_INGEST_TYPE type;
  580. RRDHOST_INGEST_STATUS status;
  581. SOCKET_PEERS peers;
  582. bool ssl;
  583. STREAM_CAPABILITIES capabilities;
  584. uint32_t id;
  585. time_t since;
  586. STREAM_HANDSHAKE reason;
  587. struct {
  588. bool in_progress;
  589. NETDATA_DOUBLE completion;
  590. size_t instances;
  591. } replication;
  592. } ingest;
  593. struct {
  594. size_t hops;
  595. RRDHOST_STREAMING_STATUS status;
  596. SOCKET_PEERS peers;
  597. bool ssl;
  598. bool compression;
  599. STREAM_CAPABILITIES capabilities;
  600. uint32_t id;
  601. time_t since;
  602. STREAM_HANDSHAKE reason;
  603. struct {
  604. bool in_progress;
  605. NETDATA_DOUBLE completion;
  606. size_t instances;
  607. } replication;
  608. size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
  609. } stream;
  610. struct {
  611. RRDHOST_HEALTH_STATUS status;
  612. struct {
  613. uint32_t undefined;
  614. uint32_t uninitialized;
  615. uint32_t clear;
  616. uint32_t warning;
  617. uint32_t critical;
  618. } alerts;
  619. } health;
  620. } RRDHOST_STATUS;
  621. void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
  622. bool rrdhost_state_cloud_emulation(RRDHOST *host);
  623. bool rrdpush_compression_initialize(struct sender_state *s);
  624. bool rrdpush_decompression_initialize(struct receiver_state *rpt);
  625. void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order);
  626. void rrdpush_select_receiver_compression_algorithm(struct receiver_state *rpt);
  627. void rrdpush_compression_deactivate(struct sender_state *s);
  628. #endif //NETDATA_RRDPUSH_H