rrdpush.c 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdpush.h"
  3. /*
  4. * rrdpush
  5. *
  6. * 3 threads are involved for all stream operations
  7. *
  8. * 1. a random data collection thread, calling rrdset_done_push()
  9. * this is called for each chart.
  10. *
  11. * the output of this work is kept in a thread BUFFER
  12. * the sender thread is signalled via a pipe (in RRDHOST)
  13. *
  14. * 2. a sender thread running at the sending netdata
  15. * this is spawned automatically on the first chart to be pushed
  16. *
  17. * It tries to push the metrics to the remote netdata, as fast
  18. * as possible (i.e. immediately after they are collected).
  19. *
  20. * 3. a receiver thread, running at the receiving netdata
  21. * this is spawned automatically when the sender connects to
  22. * the receiver.
  23. *
  24. */
  25. struct config stream_config = {
  26. .first_section = NULL,
  27. .last_section = NULL,
  28. .mutex = NETDATA_MUTEX_INITIALIZER,
  29. .index = {
  30. .avl_tree = {
  31. .root = NULL,
  32. .compar = appconfig_section_compare
  33. },
  34. .rwlock = AVL_LOCK_INITIALIZER
  35. }
  36. };
  37. unsigned int default_rrdpush_enabled = 0;
  38. #ifdef ENABLE_COMPRESSION
  39. unsigned int default_compression_enabled = 1;
  40. #endif
  41. char *default_rrdpush_destination = NULL;
  42. char *default_rrdpush_api_key = NULL;
  43. char *default_rrdpush_send_charts_matching = NULL;
  44. bool default_rrdpush_enable_replication = true;
  45. time_t default_rrdpush_seconds_to_replicate = 86400;
  46. time_t default_rrdpush_replication_step = 600;
  47. #ifdef ENABLE_HTTPS
  48. int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL;
  49. char *netdata_ssl_ca_path = NULL;
  50. char *netdata_ssl_ca_file = NULL;
  51. #endif
  52. static void load_stream_conf() {
  53. errno = 0;
  54. char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
  55. if(!appconfig_load(&stream_config, filename, 0, NULL)) {
  56. info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
  57. freez(filename);
  58. filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
  59. if(!appconfig_load(&stream_config, filename, 0, NULL))
  60. info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
  61. }
  62. freez(filename);
  63. }
  64. STREAM_CAPABILITIES stream_our_capabilities() {
  65. return STREAM_CAP_V1 |
  66. STREAM_CAP_V2 |
  67. STREAM_CAP_VN |
  68. STREAM_CAP_VCAPS |
  69. STREAM_CAP_HLABELS |
  70. STREAM_CAP_CLAIM |
  71. STREAM_CAP_CLABELS |
  72. STREAM_CAP_FUNCTIONS |
  73. STREAM_CAP_REPLICATION |
  74. STREAM_CAP_BINARY |
  75. STREAM_CAP_INTERPOLATED |
  76. STREAM_HAS_COMPRESSION |
  77. (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
  78. 0;
  79. }
  80. bool rrdpush_receiver_needs_dbengine() {
  81. struct section *co;
  82. for(co = stream_config.first_section; co; co = co->next) {
  83. if(strcmp(co->name, "stream") == 0)
  84. continue; // the first section is not relevant
  85. char *s;
  86. s = appconfig_get_by_section(co, "enabled", NULL);
  87. if(!s || !appconfig_test_boolean_value(s))
  88. continue;
  89. s = appconfig_get_by_section(co, "default memory mode", NULL);
  90. if(s && strcmp(s, "dbengine") == 0)
  91. return true;
  92. s = appconfig_get_by_section(co, "memory mode", NULL);
  93. if(s && strcmp(s, "dbengine") == 0)
  94. return true;
  95. }
  96. return false;
  97. }
  98. int rrdpush_init() {
  99. // --------------------------------------------------------------------
  100. // load stream.conf
  101. load_stream_conf();
  102. default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
  103. default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
  104. default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
  105. default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
  106. default_rrdpush_enable_replication = config_get_boolean(CONFIG_SECTION_DB, "enable replication", default_rrdpush_enable_replication);
  107. default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate);
  108. default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step);
  109. rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s);
  110. #ifdef ENABLE_COMPRESSION
  111. default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
  112. "enable compression", default_compression_enabled);
  113. #endif
  114. if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
  115. error("STREAM [send]: cannot enable sending thread - information is missing.");
  116. default_rrdpush_enabled = 0;
  117. }
  118. #ifdef ENABLE_HTTPS
  119. if (netdata_use_ssl_on_stream == NETDATA_SSL_OPTIONAL) {
  120. if (default_rrdpush_destination){
  121. char *test = strstr(default_rrdpush_destination,":SSL");
  122. if(test){
  123. *test = 0X00;
  124. netdata_use_ssl_on_stream = NETDATA_SSL_FORCE;
  125. }
  126. }
  127. }
  128. bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO);
  129. if(invalid_certificate == CONFIG_BOOLEAN_YES){
  130. if(netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
  131. info("Netdata is configured to accept invalid SSL certificate.");
  132. netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
  133. }
  134. }
  135. netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
  136. netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
  137. #endif
  138. return default_rrdpush_enabled;
  139. }
  // Several data collection threads call rrdset_done(), which in turn calls
  // rrdset_done_push(). That path uses a pipe to tell the streaming thread that
  // more data is ready to be sent. These macros index the two ends of that pipe.
  #define PIPE_READ 0
  #define PIPE_WRITE 1

  // For the first iterations of each chart, the BEGIN line is sent without
  // microseconds, so that the remote netdata re-syncs the chart to its current
  // clock. This is how many iterations that lasts.
  unsigned int remote_clock_resync_iterations = 60;
  152. static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
  153. if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
  154. return false;
  155. if(unlikely(!(flags & (RRDSET_FLAG_UPSTREAM_SEND | RRDSET_FLAG_UPSTREAM_IGNORE)))) {
  156. RRDHOST *host = st->rrdhost;
  157. if (flags & RRDSET_FLAG_ANOMALY_DETECTION) {
  158. if(ml_streaming_enabled())
  159. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
  160. else
  161. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
  162. }
  163. else if(simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->id) ||
  164. simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->name))
  165. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
  166. else
  167. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
  168. // get the flags again, to know how to respond
  169. flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE);
  170. }
  171. return flags & RRDSET_FLAG_UPSTREAM_SEND;
  172. }
  173. int configured_as_parent() {
  174. struct section *section = NULL;
  175. int is_parent = 0;
  176. appconfig_wrlock(&stream_config);
  177. for (section = stream_config.first_section; section; section = section->next) {
  178. uuid_t uuid;
  179. if (uuid_parse(section->name, uuid) != -1 &&
  180. appconfig_get_boolean_by_section(section, "enabled", 0)) {
  181. is_parent = 1;
  182. break;
  183. }
  184. }
  185. appconfig_unlock(&stream_config);
  186. return is_parent;
  187. }
  188. // chart labels
  189. static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  190. BUFFER *wb = (BUFFER *)data;
  191. buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls);
  192. return 1;
  193. }
  194. static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) {
  195. if (st->rrdlabels) {
  196. if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0)
  197. buffer_sprintf(wb, "CLABEL_COMMIT\n");
  198. }
  199. }
  200. // Send the current chart definition.
  201. // Assumes that collector thread has already called sender_start for mutex / buffer state.
  202. static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
  203. bool replication_progress = false;
  204. RRDHOST *host = st->rrdhost;
  205. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
  206. // properly set the name for the remote end to parse it
  207. char *name = "";
  208. if(likely(st->name)) {
  209. if(unlikely(st->id != st->name)) {
  210. // they differ
  211. name = strchr(rrdset_name(st), '.');
  212. if(name)
  213. name++;
  214. else
  215. name = "";
  216. }
  217. }
  218. // send the chart
  219. buffer_sprintf(
  220. wb
  221. , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
  222. , rrdset_id(st)
  223. , name
  224. , rrdset_title(st)
  225. , rrdset_units(st)
  226. , rrdset_family(st)
  227. , rrdset_context(st)
  228. , rrdset_type_name(st->chart_type)
  229. , st->priority
  230. , st->update_every
  231. , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":""
  232. , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
  233. , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
  234. , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
  235. , rrdset_plugin_name(st)
  236. , rrdset_module_name(st)
  237. );
  238. // send the chart labels
  239. if (stream_has_capability(host->sender, STREAM_CAP_CLABELS))
  240. rrdpush_send_clabels(wb, st);
  241. // send the dimensions
  242. RRDDIM *rd;
  243. rrddim_foreach_read(rd, st) {
  244. buffer_sprintf(
  245. wb
  246. , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
  247. , rrddim_id(rd)
  248. , rrddim_name(rd)
  249. , rrd_algorithm_name(rd->algorithm)
  250. , rd->multiplier
  251. , rd->divisor
  252. , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
  253. , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
  254. , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
  255. );
  256. rd->exposed = 1;
  257. }
  258. rrddim_foreach_done(rd);
  259. // send the chart functions
  260. if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
  261. rrd_functions_expose_rrdpush(st, wb);
  262. // send the chart local custom variables
  263. rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
  264. if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
  265. time_t db_first_time_t, db_last_time_t;
  266. time_t now = now_realtime_sec();
  267. rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_time_t, &db_last_time_t, now, 0);
  268. buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n",
  269. (unsigned long long)db_first_time_t,
  270. (unsigned long long)db_last_time_t,
  271. (unsigned long long)now);
  272. if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
  273. rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
  274. rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
  275. rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
  276. }
  277. replication_progress = true;
  278. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  279. internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts",
  280. rrdhost_hostname(st->rrdhost), rrdset_id(st));
  281. #endif
  282. }
  283. st->upstream_resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
  284. return replication_progress;
  285. }
  286. // sends the current chart dimensions
  287. static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) {
  288. buffer_fast_strcat(wb, "BEGIN \"", 7);
  289. buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
  290. buffer_fast_strcat(wb, "\" ", 2);
  291. if(st->last_collected_time.tv_sec > st->upstream_resync_time_s)
  292. buffer_print_uint64(wb, st->usec_since_last_update);
  293. else
  294. buffer_fast_strcat(wb, "0", 1);
  295. buffer_fast_strcat(wb, "\n", 1);
  296. RRDDIM *rd;
  297. rrddim_foreach_read(rd, st) {
  298. if(unlikely(!rd->updated))
  299. continue;
  300. if(likely(rd->exposed)) {
  301. buffer_fast_strcat(wb, "SET \"", 5);
  302. buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
  303. buffer_fast_strcat(wb, "\" = ", 4);
  304. buffer_print_int64(wb, rd->collected_value);
  305. buffer_fast_strcat(wb, "\n", 1);
  306. }
  307. else {
  308. internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed",
  309. rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
  310. // we will include it in the next iteration
  311. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
  312. }
  313. }
  314. rrddim_foreach_done(rd);
  315. if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
  316. rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
  317. buffer_fast_strcat(wb, "END\n", 4);
  318. }
  319. static void rrdpush_sender_thread_spawn(RRDHOST *host);
  320. // Called from the internal collectors to mark a chart obsolete.
  321. bool rrdset_push_chart_definition_now(RRDSET *st) {
  322. RRDHOST *host = st->rrdhost;
  323. if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
  324. || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST))))
  325. return false;
  326. BUFFER *wb = sender_start(host->sender);
  327. rrdpush_send_chart_definition(wb, st);
  328. sender_commit(host->sender, wb);
  329. sender_thread_buffer_free();
  330. return true;
  331. }
  332. void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
  333. RRDHOST *host = st->rrdhost;
  334. rrdpush_send_chart_metrics(rsb->wb, st, host->sender, rsb->rrdset_flags);
  335. }
  336. void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
  337. if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags))
  338. return;
  339. NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  340. NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
  341. BUFFER *wb = rsb->wb;
  342. time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC);
  343. if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) {
  344. if(unlikely(rsb->begin_v2_added))
  345. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  346. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
  347. buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id));
  348. buffer_fast_strcat(wb, "' ", 2);
  349. buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every);
  350. buffer_fast_strcat(wb, " ", 1);
  351. buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s);
  352. buffer_fast_strcat(wb, " ", 1);
  353. if(point_end_time_s == rsb->wall_clock_time)
  354. buffer_fast_strcat(wb, "#", 1);
  355. else
  356. buffer_print_uint64_encoded(wb, integer_encoding, rsb->wall_clock_time);
  357. buffer_fast_strcat(wb, "\n", 1);
  358. rsb->last_point_end_time_s = point_end_time_s;
  359. rsb->begin_v2_added = true;
  360. }
  361. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
  362. buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
  363. buffer_fast_strcat(wb, "' ", 2);
  364. buffer_print_int64_encoded(wb, integer_encoding, rd->last_collected_value);
  365. buffer_fast_strcat(wb, " ", 1);
  366. if((NETDATA_DOUBLE)rd->last_collected_value == n)
  367. buffer_fast_strcat(wb, "#", 1);
  368. else
  369. buffer_print_netdata_double_encoded(wb, doubles_encoding, n);
  370. buffer_fast_strcat(wb, " ", 1);
  371. buffer_print_sn_flags(wb, flags, true);
  372. buffer_fast_strcat(wb, "\n", 1);
  373. }
  374. void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
  375. if(!rsb->wb)
  376. return;
  377. if(rsb->v2 && rsb->begin_v2_added) {
  378. if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
  379. rrdsetvar_print_to_streaming_custom_chart_variables(st, rsb->wb);
  380. buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  381. }
  382. sender_commit(st->rrdhost->sender, rsb->wb);
  383. *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
  384. }
  385. RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
  386. RRDHOST *host = st->rrdhost;
  387. // fetch the flags we need to check with one atomic operation
  388. RRDHOST_FLAGS host_flags = __atomic_load_n(&host->flags, __ATOMIC_SEQ_CST);
  389. // check if we are not connected
  390. if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
  391. if(unlikely(!(host_flags & (RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED))))
  392. rrdpush_sender_thread_spawn(host);
  393. if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
  394. rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
  395. error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
  396. }
  397. return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
  398. }
  399. else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
  400. info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
  401. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
  402. }
  403. RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
  404. bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
  405. bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
  406. if(unlikely((exposed_upstream && replication_in_progress) ||
  407. !should_send_chart_matching(st, rrdset_flags)))
  408. return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
  409. if(unlikely(!exposed_upstream)) {
  410. BUFFER *wb = sender_start(host->sender);
  411. replication_in_progress = rrdpush_send_chart_definition(wb, st);
  412. sender_commit(host->sender, wb);
  413. }
  414. if(replication_in_progress)
  415. return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
  416. return (RRDSET_STREAM_BUFFER) {
  417. .capabilities = host->sender->capabilities,
  418. .v2 = stream_has_capability(host->sender, STREAM_CAP_INTERPOLATED),
  419. .rrdset_flags = rrdset_flags,
  420. .wb = sender_start(host->sender),
  421. .wall_clock_time = wall_clock_time,
  422. };
  423. }
  424. // labels
  425. static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  426. BUFFER *wb = (BUFFER *)data;
  427. buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
  428. return 1;
  429. }
  430. void rrdpush_send_host_labels(RRDHOST *host) {
  431. if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
  432. || !stream_has_capability(host->sender, STREAM_CAP_HLABELS)))
  433. return;
  434. BUFFER *wb = sender_start(host->sender);
  435. rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb);
  436. buffer_sprintf(wb, "OVERWRITE %s\n", "labels");
  437. sender_commit(host->sender, wb);
  438. sender_thread_buffer_free();
  439. }
  440. void rrdpush_claimed_id(RRDHOST *host)
  441. {
  442. if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
  443. return;
  444. if(unlikely(!rrdhost_can_send_definitions_to_parent(host)))
  445. return;
  446. BUFFER *wb = sender_start(host->sender);
  447. rrdhost_aclk_state_lock(host);
  448. buffer_sprintf(wb, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
  449. rrdhost_aclk_state_unlock(host);
  450. sender_commit(host->sender, wb);
  451. sender_thread_buffer_free();
  452. }
  453. int connect_to_one_of_destinations(
  454. RRDHOST *host,
  455. int default_port,
  456. struct timeval *timeout,
  457. size_t *reconnects_counter,
  458. char *connected_to,
  459. size_t connected_to_size,
  460. struct rrdpush_destinations **destination)
  461. {
  462. int sock = -1;
  463. for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) {
  464. time_t now = now_realtime_sec();
  465. if(d->postpone_reconnection_until > now)
  466. continue;
  467. info(
  468. "STREAM %s: connecting to '%s' (default port: %d)...",
  469. rrdhost_hostname(host),
  470. string2str(d->destination),
  471. default_port);
  472. if (reconnects_counter)
  473. *reconnects_counter += 1;
  474. sock = connect_to_this(string2str(d->destination), default_port, timeout);
  475. if (sock != -1) {
  476. if (connected_to && connected_to_size)
  477. strncpyz(connected_to, string2str(d->destination), connected_to_size);
  478. *destination = d;
  479. // move the current item to the end of the list
  480. // without this, this destination will break the loop again and again
  481. // not advancing the destinations to find one that may work
  482. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, d, prev, next);
  483. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(host->destinations, d, prev, next);
  484. break;
  485. }
  486. }
  487. return sock;
  488. }
  489. struct destinations_init_tmp {
  490. RRDHOST *host;
  491. struct rrdpush_destinations *list;
  492. int count;
  493. };
  494. bool destinations_init_add_one(char *entry, void *data) {
  495. struct destinations_init_tmp *t = data;
  496. struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
  497. d->destination = string_strdupz(entry);
  498. __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
  499. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next);
  500. t->count++;
  501. info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
  502. return false; // we return false, so that we will get all defined destinations
  503. }
  504. void rrdpush_destinations_init(RRDHOST *host) {
  505. if(!host->rrdpush_send_destination) return;
  506. rrdpush_destinations_free(host);
  507. struct destinations_init_tmp t = {
  508. .host = host,
  509. .list = NULL,
  510. .count = 0,
  511. };
  512. foreach_entry_in_connection_string(host->rrdpush_send_destination, destinations_init_add_one, &t);
  513. host->destinations = t.list;
  514. }
  515. void rrdpush_destinations_free(RRDHOST *host) {
  516. while (host->destinations) {
  517. struct rrdpush_destinations *tmp = host->destinations;
  518. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, tmp, prev, next);
  519. string_freez(tmp->destination);
  520. freez(tmp);
  521. __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
  522. }
  523. host->destinations = NULL;
  524. }
  525. // ----------------------------------------------------------------------------
  526. // rrdpush sender thread
  527. // Either the receiver lost the connection or the host is being destroyed.
  528. // The sender mutex guards thread creation, any spurious data is wiped on reconnection.
  529. void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) {
  530. if (!host->sender)
  531. return;
  532. netdata_mutex_lock(&host->sender->mutex);
  533. if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
  534. host->sender->exit.shutdown = true;
  535. host->sender->exit.reason = reason;
  536. // signal it to cancel
  537. netdata_thread_cancel(host->rrdpush_sender_thread);
  538. }
  539. netdata_mutex_unlock(&host->sender->mutex);
  540. if(wait) {
  541. netdata_mutex_lock(&host->sender->mutex);
  542. while(host->sender->tid) {
  543. netdata_mutex_unlock(&host->sender->mutex);
  544. sleep_usec(10 * USEC_PER_MS);
  545. netdata_mutex_lock(&host->sender->mutex);
  546. }
  547. netdata_mutex_unlock(&host->sender->mutex);
  548. }
  549. }
  550. // ----------------------------------------------------------------------------
  551. // rrdpush receiver thread
  // Write one "STREAM:" line to the access log.
  // The line carries the calling thread id, the client address, the message and the
  // host / api key / machine guid of the stream connection.
  void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
      log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'",
                 gettid(),
                 client_ip, client_port,
                 msg,
                 host, api_key, machine_guid);
  }
  555. static void rrdpush_sender_thread_spawn(RRDHOST *host) {
  556. netdata_mutex_lock(&host->sender->mutex);
  557. if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
  558. char tag[NETDATA_THREAD_TAG_MAX + 1];
  559. snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host));
  560. if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
  561. error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
  562. else
  563. rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
  564. }
  565. netdata_mutex_unlock(&host->sender->mutex);
  566. }
  567. int rrdpush_receiver_permission_denied(struct web_client *w) {
  568. // we always respond with the same message and error code
  569. // to prevent an attacker from gaining info about the error
  570. buffer_flush(w->response.data);
  571. buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info.");
  572. return HTTP_RESP_UNAUTHORIZED;
  573. }
  574. int rrdpush_receiver_too_busy_now(struct web_client *w) {
  575. // we always respond with the same message and error code
  576. // to prevent an attacker from gaining info about the error
  577. buffer_flush(w->response.data);
  578. buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later.");
  579. return HTTP_RESP_SERVICE_UNAVAILABLE;
  580. }
  581. void *rrdpush_receiver_thread(void *ptr);
  582. int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) {
  583. if(!service_running(ABILITY_STREAMING_CONNECTIONS))
  584. return rrdpush_receiver_too_busy_now(w);
  585. struct receiver_state *rpt = callocz(1, sizeof(*rpt));
  586. rpt->last_msg_t = now_realtime_sec();
  587. rpt->capabilities = STREAM_CAP_INVALID;
  588. rpt->hops = 1;
  589. __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
  590. __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
  591. rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info));
  592. rpt->system_info->hops = rpt->hops;
  593. rpt->fd = w->ifd;
  594. rpt->client_ip = strdupz(w->client_ip);
  595. rpt->client_port = strdupz(w->client_port);
  596. rpt->config.update_every = default_rrd_update_every;
  597. #ifdef ENABLE_HTTPS
  598. rpt->ssl.conn = w->ssl.conn;
  599. rpt->ssl.flags = w->ssl.flags;
  600. w->ssl.conn = NULL;
  601. w->ssl.flags = NETDATA_SSL_START;
  602. #endif
  603. // parse the parameters and fill rpt and rpt->system_info
  604. while(decoded_query_string) {
  605. char *value = strsep_skip_consecutive_separators(&decoded_query_string, "&");
  606. if(!value || !*value) continue;
  607. char *name = strsep_skip_consecutive_separators(&value, "=");
  608. if(!name || !*name) continue;
  609. if(!value || !*value) continue;
  610. if(!strcmp(name, "key") && !rpt->key)
  611. rpt->key = strdupz(value);
  612. else if(!strcmp(name, "hostname") && !rpt->hostname)
  613. rpt->hostname = strdupz(value);
  614. else if(!strcmp(name, "registry_hostname") && !rpt->registry_hostname)
  615. rpt->registry_hostname = strdupz(value);
  616. else if(!strcmp(name, "machine_guid") && !rpt->machine_guid)
  617. rpt->machine_guid = strdupz(value);
  618. else if(!strcmp(name, "update_every"))
  619. rpt->config.update_every = (int)strtoul(value, NULL, 0);
  620. else if(!strcmp(name, "os") && !rpt->os)
  621. rpt->os = strdupz(value);
  622. else if(!strcmp(name, "timezone") && !rpt->timezone)
  623. rpt->timezone = strdupz(value);
  624. else if(!strcmp(name, "abbrev_timezone") && !rpt->abbrev_timezone)
  625. rpt->abbrev_timezone = strdupz(value);
  626. else if(!strcmp(name, "utc_offset"))
  627. rpt->utc_offset = (int32_t)strtol(value, NULL, 0);
  628. else if(!strcmp(name, "hops"))
  629. rpt->hops = rpt->system_info->hops = (uint16_t) strtoul(value, NULL, 0);
  630. else if(!strcmp(name, "ml_capable"))
  631. rpt->system_info->ml_capable = strtoul(value, NULL, 0);
  632. else if(!strcmp(name, "ml_enabled"))
  633. rpt->system_info->ml_enabled = strtoul(value, NULL, 0);
  634. else if(!strcmp(name, "mc_version"))
  635. rpt->system_info->mc_version = strtoul(value, NULL, 0);
  636. else if(!strcmp(name, "tags") && !rpt->tags)
  637. rpt->tags = strdupz(value);
  638. else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID))
  639. rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0));
  640. else {
  641. // An old Netdata child does not have a compatible streaming protocol, map to something sane.
  642. if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
  643. name = "NETDATA_HOST_OS_NAME";
  644. else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID"))
  645. name = "NETDATA_HOST_OS_ID";
  646. else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE"))
  647. name = "NETDATA_HOST_OS_ID_LIKE";
  648. else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION"))
  649. name = "NETDATA_HOST_OS_VERSION";
  650. else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID"))
  651. name = "NETDATA_HOST_OS_VERSION_ID";
  652. else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
  653. name = "NETDATA_HOST_OS_DETECTION";
  654. else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID))
  655. rpt->capabilities = convert_stream_version_to_capabilities(1);
  656. if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
  657. info("STREAM '%s' [receive from [%s]:%s]: "
  658. "request has parameter '%s' = '%s', which is not used."
  659. , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-"
  660. , rpt->client_ip, rpt->client_port
  661. , name, value);
  662. }
  663. }
  664. }
  665. if (rpt->capabilities & STREAM_CAP_INVALID)
  666. // no version is supplied, assume version 0;
  667. rpt->capabilities = convert_stream_version_to_capabilities(0);
  668. // find the program name and version
  669. if(w->user_agent && w->user_agent[0]) {
  670. char *t = strchr(w->user_agent, '/');
  671. if(t && *t) {
  672. *t = '\0';
  673. t++;
  674. }
  675. rpt->program_name = strdupz(w->user_agent);
  676. if(t && *t) rpt->program_version = strdupz(t);
  677. }
  678. // check if we should accept this connection
  679. if(!rpt->key || !*rpt->key) {
  680. rrdpush_receive_log_status(
  681. rpt,
  682. "request without an API key",
  683. "NO API KEY PERMISSION DENIED");
  684. receiver_state_free(rpt);
  685. return rrdpush_receiver_permission_denied(w);
  686. }
  687. if(!rpt->hostname || !*rpt->hostname) {
  688. rrdpush_receive_log_status(
  689. rpt,
  690. "request without a hostname",
  691. "NO HOSTNAME PERMISSION DENIED");
  692. receiver_state_free(rpt);
  693. return rrdpush_receiver_permission_denied(w);
  694. }
  695. if(!rpt->registry_hostname)
  696. rpt->registry_hostname = strdupz(rpt->hostname);
  697. if(!rpt->machine_guid || !*rpt->machine_guid) {
  698. rrdpush_receive_log_status(
  699. rpt,
  700. "request without a machine GUID",
  701. "NO MACHINE GUID PERMISSION DENIED");
  702. receiver_state_free(rpt);
  703. return rrdpush_receiver_permission_denied(w);
  704. }
  705. {
  706. char buf[GUID_LEN + 1];
  707. if (regenerate_guid(rpt->key, buf) == -1) {
  708. rrdpush_receive_log_status(
  709. rpt,
  710. "API key is not a valid UUID (use the command uuidgen to generate one)",
  711. "INVALID API KEY PERMISSION DENIED");
  712. receiver_state_free(rpt);
  713. return rrdpush_receiver_permission_denied(w);
  714. }
  715. if (regenerate_guid(rpt->machine_guid, buf) == -1) {
  716. rrdpush_receive_log_status(
  717. rpt,
  718. "machine GUID is not a valid UUID",
  719. "INVALID MACHINE GUID PERMISSION DENIED");
  720. receiver_state_free(rpt);
  721. return rrdpush_receiver_permission_denied(w);
  722. }
  723. }
  724. const char *api_key_type = appconfig_get(&stream_config, rpt->key, "type", "api");
  725. if(!api_key_type || !*api_key_type) api_key_type = "unknown";
  726. if(strcmp(api_key_type, "api") != 0) {
  727. rrdpush_receive_log_status(
  728. rpt,
  729. "API key is a machine GUID",
  730. "INVALID API KEY PERMISSION DENIED");
  731. receiver_state_free(rpt);
  732. return rrdpush_receiver_permission_denied(w);
  733. }
  734. if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) {
  735. rrdpush_receive_log_status(
  736. rpt,
  737. "API key is not enabled",
  738. "API KEY DISABLED PERMISSION DENIED");
  739. receiver_state_free(rpt);
  740. return rrdpush_receiver_permission_denied(w);
  741. }
  742. {
  743. SIMPLE_PATTERN *key_allow_from = simple_pattern_create(
  744. appconfig_get(&stream_config, rpt->key, "allow from", "*"),
  745. NULL, SIMPLE_PATTERN_EXACT, true);
  746. if(key_allow_from) {
  747. if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
  748. simple_pattern_free(key_allow_from);
  749. rrdpush_receive_log_status(
  750. rpt,
  751. "API key is not allowed from this IP",
  752. "NOT ALLOWED IP PERMISSION DENIED");
  753. receiver_state_free(rpt);
  754. return rrdpush_receiver_permission_denied(w);
  755. }
  756. simple_pattern_free(key_allow_from);
  757. }
  758. }
  759. {
  760. const char *machine_guid_type = appconfig_get(&stream_config, rpt->machine_guid, "type", "machine");
  761. if (!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown";
  762. if (strcmp(machine_guid_type, "machine") != 0) {
  763. rrdpush_receive_log_status(
  764. rpt,
  765. "machine GUID is an API key",
  766. "INVALID MACHINE GUID PERMISSION DENIED");
  767. receiver_state_free(rpt);
  768. return rrdpush_receiver_permission_denied(w);
  769. }
  770. }
  771. if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) {
  772. rrdpush_receive_log_status(
  773. rpt,
  774. "machine GUID is not enabled",
  775. "MACHINE GUID DISABLED PERMISSION DENIED");
  776. receiver_state_free(rpt);
  777. return rrdpush_receiver_permission_denied(w);
  778. }
  779. {
  780. SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(
  781. appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"),
  782. NULL, SIMPLE_PATTERN_EXACT, true);
  783. if(machine_allow_from) {
  784. if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
  785. simple_pattern_free(machine_allow_from);
  786. rrdpush_receive_log_status(
  787. rpt,
  788. "machine GUID is not allowed from this IP",
  789. "NOT ALLOWED IP PERMISSION DENIED");
  790. receiver_state_free(rpt);
  791. return rrdpush_receiver_permission_denied(w);
  792. }
  793. simple_pattern_free(machine_allow_from);
  794. }
  795. }
  796. if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
  797. rrdpush_receive_log_status(
  798. rpt,
  799. "machine GUID is my own",
  800. "LOCALHOST PERMISSION DENIED");
  801. char initial_response[HTTP_HEADER_SIZE + 1];
  802. snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
  803. if(send_timeout(
  804. #ifdef ENABLE_HTTPS
  805. &rpt->ssl,
  806. #endif
  807. rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
  808. error("STREAM '%s' [receive from [%s]:%s]: "
  809. "failed to reply."
  810. , rpt->hostname
  811. , rpt->client_ip, rpt->client_port
  812. );
  813. }
  814. close(rpt->fd);
  815. receiver_state_free(rpt);
  816. return web_client_socket_is_now_used_for_streaming(w);
  817. }
  818. if(unlikely(web_client_streaming_rate_t > 0)) {
  819. static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
  820. static time_t last_stream_accepted_t = 0;
  821. time_t now = now_realtime_sec();
  822. netdata_spinlock_lock(&spinlock);
  823. if(unlikely(last_stream_accepted_t == 0))
  824. last_stream_accepted_t = now;
  825. if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
  826. netdata_spinlock_unlock(&spinlock);
  827. char msg[100 + 1];
  828. snprintfz(msg, 100,
  829. "rate limit, will accept new connection in %ld secs",
  830. (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
  831. rrdpush_receive_log_status(
  832. rpt,
  833. msg,
  834. "RATE LIMIT TRY LATER");
  835. receiver_state_free(rpt);
  836. return rrdpush_receiver_too_busy_now(w);
  837. }
  838. last_stream_accepted_t = now;
  839. netdata_spinlock_unlock(&spinlock);
  840. }
  841. /*
  842. * Quick path for rejecting multiple connections. The lock taken is fine-grained - it only protects the receiver
  843. * pointer within the host (if a host exists). This protects against multiple concurrent web requests hitting
  844. * separate threads within the web-server and landing here. The lock guards the thread-shutdown sequence that
  845. * detaches the receiver from the host. If the host is being created (first time-access) then we also use the
  846. * lock to prevent race-hazard (two threads try to create the host concurrently, one wins and the other does a
  847. * lookup to the now-attached structure).
  848. */
  849. {
  850. time_t age = 0;
  851. bool receiver_stale = false;
  852. bool receiver_working = false;
  853. rrd_rdlock();
  854. RRDHOST *host = rrdhost_find_by_guid(rpt->machine_guid);
  855. if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
  856. host = NULL;
  857. if (host) {
  858. netdata_mutex_lock(&host->receiver_lock);
  859. if (host->receiver) {
  860. age = now_realtime_sec() - host->receiver->last_msg_t;
  861. if (age < 30)
  862. receiver_working = true;
  863. else
  864. receiver_stale = true;
  865. }
  866. netdata_mutex_unlock(&host->receiver_lock);
  867. }
  868. rrd_unlock();
  869. if (receiver_stale && stop_streaming_receiver(host, "STALE RECEIVER")) {
  870. // we stopped the receiver
  871. // we can proceed with this connection
  872. receiver_stale = false;
  873. info("STREAM '%s' [receive from [%s]:%s]: "
  874. "stopped previous stale receiver to accept this one."
  875. , rpt->hostname
  876. , rpt->client_ip, rpt->client_port
  877. );
  878. }
  879. if (receiver_working || receiver_stale) {
  880. // another receiver is already connected
  881. // try again later
  882. char msg[200 + 1];
  883. snprintfz(msg, 200,
  884. "multiple connections for same host, "
  885. "old connection was used %ld secs ago%s",
  886. age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)");
  887. rrdpush_receive_log_status(
  888. rpt,
  889. msg,
  890. "ALREADY CONNECTED");
  891. // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
  892. buffer_flush(w->response.data);
  893. buffer_strcat(w->response.data, "This GUID is already streaming to this server");
  894. receiver_state_free(rpt);
  895. return HTTP_RESP_CONFLICT;
  896. }
  897. }
  898. debug(D_SYSTEM, "starting STREAM receive thread.");
  899. char tag[FILENAME_MAX + 1];
  900. snprintfz(tag, FILENAME_MAX, THREAD_TAG_STREAM_RECEIVER "[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
  901. if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) {
  902. rrdpush_receive_log_status(
  903. rpt,
  904. "can't create receiver thread",
  905. "INTERNAL SERVER ERROR");
  906. buffer_flush(w->response.data);
  907. buffer_strcat(w->response.data, "Can't handle this request");
  908. receiver_state_free(rpt);
  909. return HTTP_RESP_INTERNAL_SERVER_ERROR;
  910. }
  911. // prevent the caller from closing the streaming socket
  912. return web_client_socket_is_now_used_for_streaming(w);
  913. }
  914. static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
  915. if(caps & STREAM_CAP_V1) buffer_strcat(wb, "V1 ");
  916. if(caps & STREAM_CAP_V2) buffer_strcat(wb, "V2 ");
  917. if(caps & STREAM_CAP_VN) buffer_strcat(wb, "VN ");
  918. if(caps & STREAM_CAP_VCAPS) buffer_strcat(wb, "VCAPS ");
  919. if(caps & STREAM_CAP_HLABELS) buffer_strcat(wb, "HLABELS ");
  920. if(caps & STREAM_CAP_CLAIM) buffer_strcat(wb, "CLAIM ");
  921. if(caps & STREAM_CAP_CLABELS) buffer_strcat(wb, "CLABELS ");
  922. if(caps & STREAM_CAP_COMPRESSION) buffer_strcat(wb, "COMPRESSION ");
  923. if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS ");
  924. if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION ");
  925. if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY ");
  926. if(caps & STREAM_CAP_INTERPOLATED) buffer_strcat(wb, "INTERPOLATED ");
  927. if(caps & STREAM_CAP_IEEE754) buffer_strcat(wb, "IEEE754 ");
  928. }
  929. void log_receiver_capabilities(struct receiver_state *rpt) {
  930. BUFFER *wb = buffer_create(100, NULL);
  931. stream_capabilities_to_string(wb, rpt->capabilities);
  932. info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
  933. rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
  934. buffer_free(wb);
  935. }
  936. void log_sender_capabilities(struct sender_state *s) {
  937. BUFFER *wb = buffer_create(100, NULL);
  938. stream_capabilities_to_string(wb, s->capabilities);
  939. info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
  940. rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
  941. buffer_free(wb);
  942. }
  943. STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
  944. STREAM_CAPABILITIES caps = 0;
  945. if(version <= 1) caps = STREAM_CAP_V1;
  946. else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS;
  947. else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM;
  948. else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS;
  949. else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION;
  950. else caps = version;
  951. if(caps & STREAM_CAP_VCAPS)
  952. caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2|STREAM_CAP_VN);
  953. if(caps & STREAM_CAP_VN)
  954. caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2);
  955. if(caps & STREAM_CAP_V2)
  956. caps &= ~(STREAM_CAP_V1);
  957. return caps & stream_our_capabilities();
  958. }
  959. int32_t stream_capabilities_to_vn(uint32_t caps) {
  960. if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION;
  961. if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS;
  962. return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM)
  963. }