// SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifndef NETDATA_RRDPUSH_H
  3. #define NETDATA_RRDPUSH_H 1
  4. #include "database/rrd.h"
  5. #include "libnetdata/libnetdata.h"
  6. #include "web/server/web_client.h"
  7. #include "daemon/common.h"
  // Capacity of sender_state.connected_to, not counting the terminating NUL
  // (the array is declared with CONNECTED_TO_SIZE + 1 elements).
  #define CONNECTED_TO_SIZE 100

  // Streaming protocol version numbers.
  // NOTE(review): each name suggests the feature introduced at that version
  // (claiming, chart labels, compression, gap filling) - confirm against the
  // sender/receiver implementation, which is not part of this header.
  #define STREAM_VERSION_CLAIM 3
  #define STREAM_VERSION_CLABELS 4
  #define STREAM_VERSION_COMPRESSION 5
  #define VERSION_GAP_FILLING 6

  // Protocol version selected at build time: STREAM_VERSION_COMPRESSION when
  // ENABLE_COMPRESSION is defined, STREAM_VERSION_CLABELS otherwise.
  // The whole expansion is parenthesized so that the cast cannot combine with
  // operators at the use site (e.g. `sizeof STREAMING_PROTOCOL_CURRENT_VERSION`).
  #ifdef ENABLE_COMPRESSION
  #define STREAMING_PROTOCOL_CURRENT_VERSION ((uint32_t)(STREAM_VERSION_COMPRESSION))
  #else
  #define STREAMING_PROTOCOL_CURRENT_VERSION ((uint32_t)(STREAM_VERSION_CLABELS))
  #endif //ENABLE_COMPRESSION

  #define STREAMING_PROTOCOL_VERSION "1.1"

  // Prompt strings.
  // NOTE(review): presumably exchanged during the streaming handshake; the _VN
  // variant ends in "version=" so a version number is expected to be appended
  // by the code that uses it - confirm at the call sites.
  #define START_STREAMING_PROMPT "Hit me baby, push them over..."
  #define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
  #define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="

  // Size in bytes of an HTTP header buffer (not used inside this header).
  #define HTTP_HEADER_SIZE 8192
  // Selector with two alternatives for handling multiple connections.
  // NOTE(review): the meaning is inferred from the enumerator names only
  // (allow them / deny the new one); the code that acts on this value is not
  // part of this header - confirm there.
  typedef enum {
      RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,     // numeric value 0
      RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW   // numeric value 1
  } RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
  // Bundle of five string pointers describing the operating system and kernel.
  // NOTE(review): the type name suggests the strings are kept in an encoded
  // form for streaming; who allocates and frees them is not visible in this
  // header - confirm at the users of this type.
  typedef struct {
      char *os_name;
      char *os_id;
      char *os_version;
      char *kernel_name;
      char *kernel_version;
  } stream_encoded_t;
  34. #ifdef ENABLE_COMPRESSION
  // Compressor object: a buffer, opaque backend data and a small table of
  // operations reached through function pointers. Only part of the build when
  // ENABLE_COMPRESSION is defined (see the enclosing #ifdef).
  struct compressor_state {
      char *buffer;
      size_t buffer_size;
      struct compressor_data *data; // Compression API specific data

      // Operations; every one receives the state it belongs to.
      void (*reset)(struct compressor_state *state);
      // Takes `size` bytes at `data`; hands a buffer back through `buffer` and
      // returns a size_t. NOTE(review): presumably the compressed length, with
      // *buffer pointing into state-owned memory - confirm in the implementation.
      size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
      // Receives the address of the owning pointer (double pointer).
      void (*destroy)(struct compressor_state **state);
  };
  // Decompressor object: input and output buffers with their cursors, running
  // counters, opaque backend data and a table of operations reached through
  // function pointers. Only part of the build when ENABLE_COMPRESSION is
  // defined (see the enclosing #ifdef).
  struct decompressor_state {
      // Buffer with its capacity, fill length and position.
      // NOTE(review): presumably the compressed input side - confirm.
      char *buffer;
      size_t buffer_size;
      size_t buffer_len;
      size_t buffer_pos;

      // Output buffer with its fill length and position.
      char *out_buffer;
      size_t out_buffer_len;
      size_t out_buffer_pos;

      // Running counters.
      size_t total_compressed;
      size_t total_uncompressed;
      size_t packet_count;

      struct decompressor_data *data; // Decompression API specific data

      // Operations; every one receives the state it belongs to.
      void (*reset)(struct decompressor_state *state);
      size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
      size_t (*put)(struct decompressor_state *state, const char *data, size_t size);
      size_t (*decompress)(struct decompressor_state *state);
      size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
      size_t (*get)(struct decompressor_state *state, char *data, size_t size);
      // Receives the address of the owning pointer (double pointer).
      void (*destroy)(struct decompressor_state **state);
  };
  63. #endif
  64. // Thread-local storage
  65. // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
  66. struct sender_state {
  67. RRDHOST *host;
  68. pid_t task_id;
  69. unsigned int overflow:1;
  70. int timeout, default_port;
  71. usec_t reconnect_delay;
  72. char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
  73. size_t begin;
  74. size_t reconnects_counter;
  75. size_t sent_bytes;
  76. size_t sent_bytes_on_this_connection;
  77. size_t send_attempts;
  78. time_t last_sent_t;
  79. size_t not_connected_loops;
  80. // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
  81. // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
  82. netdata_mutex_t mutex;
  83. struct circular_buffer *buffer;
  84. BUFFER *build;
  85. char read_buffer[512];
  86. int read_len;
  87. int32_t version;
  88. #ifdef ENABLE_COMPRESSION
  89. unsigned int rrdpush_compression;
  90. struct compressor_state *compressor;
  91. #endif
  92. };
  93. struct receiver_state {
  94. RRDHOST *host;
  95. netdata_thread_t thread;
  96. int fd;
  97. char *key;
  98. char *hostname;
  99. char *registry_hostname;
  100. char *machine_guid;
  101. char *os;
  102. char *timezone; // Unused?
  103. char *abbrev_timezone;
  104. int32_t utc_offset;
  105. char *tags;
  106. char *client_ip; // Duplicated in pluginsd
  107. char *client_port; // Duplicated in pluginsd
  108. char *program_name; // Duplicated in pluginsd
  109. char *program_version;
  110. struct rrdhost_system_info *system_info;
  111. int update_every;
  112. uint32_t stream_version;
  113. time_t last_msg_t;
  114. char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields
  115. int read_len;
  116. unsigned int shutdown:1; // Tell the thread to exit
  117. unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
  118. #ifdef ENABLE_HTTPS
  119. struct netdata_ssl ssl;
  120. #endif
  121. #ifdef ENABLE_COMPRESSION
  122. unsigned int rrdpush_compression;
  123. struct decompressor_state *decompressor;
  124. #endif
  125. };
  126. extern unsigned int default_rrdpush_enabled;
  127. #ifdef ENABLE_COMPRESSION
  128. extern unsigned int default_compression_enabled;
  129. #endif
  130. extern char *default_rrdpush_destination;
  131. extern char *default_rrdpush_api_key;
  132. extern char *default_rrdpush_send_charts_matching;
  133. extern unsigned int remote_clock_resync_iterations;
  134. extern void sender_init(struct sender_state *s, RRDHOST *parent);
  135. void sender_start(struct sender_state *s);
  136. void sender_commit(struct sender_state *s);
  137. extern int rrdpush_init();
  138. extern int configured_as_parent();
  139. extern void rrdset_done_push(RRDSET *st);
  140. extern void rrdset_push_chart_definition_now(RRDSET *st);
  141. extern void *rrdpush_sender_thread(void *ptr);
  142. extern void rrdpush_send_labels(RRDHOST *host);
  143. extern void rrdpush_claimed_id(RRDHOST *host);
  144. extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
  145. extern void rrdpush_sender_thread_stop(RRDHOST *host);
  146. extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv);
  147. extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
  148. #ifdef ENABLE_COMPRESSION
  149. struct compressor_state *create_compressor();
  150. struct decompressor_state *create_decompressor();
  151. size_t is_compressed_data(const char *data, size_t data_size);
  152. #endif
  153. #endif //NETDATA_RRDPUSH_H