rrdpush.c 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdpush.h"
  3. /*
  4. * rrdpush
  5. *
  6. * 3 threads are involved for all stream operations
  7. *
  8. * 1. a random data collection thread, calling rrdset_done_push()
  9. * this is called for each chart.
  10. *
  11. * the output of this work is kept in a thread BUFFER
  12. * the sender thread is signalled via a pipe (in RRDHOST)
  13. *
  14. * 2. a sender thread running at the sending netdata
  15. * this is spawned automatically on the first chart to be pushed
  16. *
  17. * It tries to push the metrics to the remote netdata, as fast
  18. * as possible (i.e. immediately after they are collected).
  19. *
  20. * 3. a receiver thread, running at the receiving netdata
  21. * this is spawned automatically when the sender connects to
  22. * the receiver.
  23. *
  24. */
  25. struct config stream_config = {
  26. .first_section = NULL,
  27. .last_section = NULL,
  28. .mutex = NETDATA_MUTEX_INITIALIZER,
  29. .index = {
  30. .avl_tree = {
  31. .root = NULL,
  32. .compar = appconfig_section_compare
  33. },
  34. .rwlock = AVL_LOCK_INITIALIZER
  35. }
  36. };
  37. unsigned int default_rrdpush_enabled = 0;
  38. #ifdef ENABLE_RRDPUSH_COMPRESSION
  39. unsigned int default_rrdpush_compression_enabled = 1;
  40. #endif
  41. char *default_rrdpush_destination = NULL;
  42. char *default_rrdpush_api_key = NULL;
  43. char *default_rrdpush_send_charts_matching = NULL;
  44. bool default_rrdpush_enable_replication = true;
  45. time_t default_rrdpush_seconds_to_replicate = 86400;
  46. time_t default_rrdpush_replication_step = 600;
  47. #ifdef ENABLE_HTTPS
  48. char *netdata_ssl_ca_path = NULL;
  49. char *netdata_ssl_ca_file = NULL;
  50. #endif
  51. static void load_stream_conf() {
  52. errno = 0;
  53. char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
  54. if(!appconfig_load(&stream_config, filename, 0, NULL)) {
  55. netdata_log_info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
  56. freez(filename);
  57. filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
  58. if(!appconfig_load(&stream_config, filename, 0, NULL))
  59. netdata_log_info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
  60. }
  61. freez(filename);
  62. }
  63. STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
  64. // we can have DATA_WITH_ML when INTERPOLATED is available
  65. bool ml_capability = true;
  66. if(host && sender) {
  67. // we have DATA_WITH_ML capability
  68. // we should remove the DATA_WITH_ML capability if our database does not have anomaly info
  69. // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
  70. netdata_mutex_lock(&host->receiver_lock);
  71. if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
  72. ml_capability = false;
  73. netdata_mutex_unlock(&host->receiver_lock);
  74. }
  75. return STREAM_CAP_V1 |
  76. STREAM_CAP_V2 |
  77. STREAM_CAP_VN |
  78. STREAM_CAP_VCAPS |
  79. STREAM_CAP_HLABELS |
  80. STREAM_CAP_CLAIM |
  81. STREAM_CAP_CLABELS |
  82. STREAM_CAP_FUNCTIONS |
  83. STREAM_CAP_REPLICATION |
  84. STREAM_CAP_BINARY |
  85. STREAM_CAP_INTERPOLATED |
  86. STREAM_HAS_COMPRESSION |
  87. #ifdef NETDATA_TEST_DYNCFG
  88. STREAM_CAP_DYNCFG |
  89. #endif
  90. (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
  91. (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) |
  92. 0;
  93. }
  94. bool rrdpush_receiver_needs_dbengine() {
  95. struct section *co;
  96. for(co = stream_config.first_section; co; co = co->next) {
  97. if(strcmp(co->name, "stream") == 0)
  98. continue; // the first section is not relevant
  99. char *s;
  100. s = appconfig_get_by_section(co, "enabled", NULL);
  101. if(!s || !appconfig_test_boolean_value(s))
  102. continue;
  103. s = appconfig_get_by_section(co, "default memory mode", NULL);
  104. if(s && strcmp(s, "dbengine") == 0)
  105. return true;
  106. s = appconfig_get_by_section(co, "memory mode", NULL);
  107. if(s && strcmp(s, "dbengine") == 0)
  108. return true;
  109. }
  110. return false;
  111. }
  112. int rrdpush_init() {
  113. // --------------------------------------------------------------------
  114. // load stream.conf
  115. load_stream_conf();
  116. default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
  117. default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
  118. default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
  119. default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
  120. default_rrdpush_enable_replication = config_get_boolean(CONFIG_SECTION_DB, "enable replication", default_rrdpush_enable_replication);
  121. default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate);
  122. default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step);
  123. rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s);
  124. #ifdef ENABLE_RRDPUSH_COMPRESSION
  125. default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
  126. "enable compression", default_rrdpush_compression_enabled);
  127. #endif
  128. if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
  129. netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing.");
  130. default_rrdpush_enabled = 0;
  131. }
  132. #ifdef ENABLE_HTTPS
  133. netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate);
  134. if(!netdata_ssl_validate_certificate_sender)
  135. netdata_log_info("SSL: streaming senders will skip SSL certificates verification.");
  136. netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
  137. netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
  138. #endif
  139. return default_rrdpush_enabled;
  140. }
  141. // data collection happens from multiple threads
  142. // each of these threads calls rrdset_done()
  143. // which in turn calls rrdset_done_push()
  144. // which uses this pipe to notify the streaming thread
  145. // that there are more data ready to be sent
  146. #define PIPE_READ 0
  147. #define PIPE_WRITE 1
  148. // to have the remote netdata re-sync the charts
  149. // to its current clock, we send for this many
  150. // iterations a BEGIN line without microseconds
  151. // this is for the first iterations of each chart
  152. unsigned int remote_clock_resync_iterations = 60;
  153. static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
  154. if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
  155. return false;
  156. if(unlikely(!(flags & (RRDSET_FLAG_UPSTREAM_SEND | RRDSET_FLAG_UPSTREAM_IGNORE)))) {
  157. RRDHOST *host = st->rrdhost;
  158. if (flags & RRDSET_FLAG_ANOMALY_DETECTION) {
  159. if(ml_streaming_enabled())
  160. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
  161. else
  162. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
  163. }
  164. else if(simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->id) ||
  165. simple_pattern_matches_string(host->rrdpush_send_charts_matching, st->name))
  166. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
  167. else
  168. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
  169. // get the flags again, to know how to respond
  170. flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE);
  171. }
  172. return flags & RRDSET_FLAG_UPSTREAM_SEND;
  173. }
  174. int configured_as_parent() {
  175. struct section *section = NULL;
  176. int is_parent = 0;
  177. appconfig_wrlock(&stream_config);
  178. for (section = stream_config.first_section; section; section = section->next) {
  179. uuid_t uuid;
  180. if (uuid_parse(section->name, uuid) != -1 &&
  181. appconfig_get_boolean_by_section(section, "enabled", 0)) {
  182. is_parent = 1;
  183. break;
  184. }
  185. }
  186. appconfig_unlock(&stream_config);
  187. return is_parent;
  188. }
  189. // chart labels
  190. static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  191. BUFFER *wb = (BUFFER *)data;
  192. buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls & ~(RRDLABEL_FLAG_INTERNAL));
  193. return 1;
  194. }
  195. static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) {
  196. if (st->rrdlabels) {
  197. if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0)
  198. buffer_sprintf(wb, "CLABEL_COMMIT\n");
  199. }
  200. }
  201. // Send the current chart definition.
  202. // Assumes that collector thread has already called sender_start for mutex / buffer state.
  203. static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
  204. bool replication_progress = false;
  205. RRDHOST *host = st->rrdhost;
  206. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
  207. // properly set the name for the remote end to parse it
  208. char *name = "";
  209. if(likely(st->name)) {
  210. if(unlikely(st->id != st->name)) {
  211. // they differ
  212. name = strchr(rrdset_name(st), '.');
  213. if(name)
  214. name++;
  215. else
  216. name = "";
  217. }
  218. }
  219. // send the chart
  220. buffer_sprintf(
  221. wb
  222. , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
  223. , rrdset_id(st)
  224. , name
  225. , rrdset_title(st)
  226. , rrdset_units(st)
  227. , rrdset_family(st)
  228. , rrdset_context(st)
  229. , rrdset_type_name(st->chart_type)
  230. , st->priority
  231. , st->update_every
  232. , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":""
  233. , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
  234. , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
  235. , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
  236. , rrdset_plugin_name(st)
  237. , rrdset_module_name(st)
  238. );
  239. // send the chart labels
  240. if (stream_has_capability(host->sender, STREAM_CAP_CLABELS))
  241. rrdpush_send_clabels(wb, st);
  242. // send the dimensions
  243. RRDDIM *rd;
  244. rrddim_foreach_read(rd, st) {
  245. buffer_sprintf(
  246. wb
  247. , "DIMENSION \"%s\" \"%s\" \"%s\" %d %d \"%s %s %s\"\n"
  248. , rrddim_id(rd)
  249. , rrddim_name(rd)
  250. , rrd_algorithm_name(rd->algorithm)
  251. , rd->multiplier
  252. , rd->divisor
  253. , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
  254. , rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)?"hidden":""
  255. , rrddim_option_check(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
  256. );
  257. rrddim_set_exposed(rd);
  258. }
  259. rrddim_foreach_done(rd);
  260. // send the chart functions
  261. if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
  262. rrd_functions_expose_rrdpush(st, wb);
  263. // send the chart local custom variables
  264. rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
  265. if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
  266. time_t db_first_time_t, db_last_time_t;
  267. time_t now = now_realtime_sec();
  268. rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_time_t, &db_last_time_t, now, 0);
  269. buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n",
  270. (unsigned long long)db_first_time_t,
  271. (unsigned long long)db_last_time_t,
  272. (unsigned long long)now);
  273. if(!rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
  274. rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
  275. rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
  276. rrdhost_sender_replicating_charts_plus_one(st->rrdhost);
  277. }
  278. replication_progress = true;
  279. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  280. internal_error(true, "REPLAY: 'host:%s/chart:%s' replication starts",
  281. rrdhost_hostname(st->rrdhost), rrdset_id(st));
  282. #endif
  283. }
  284. st->upstream_resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
  285. return replication_progress;
  286. }
  287. // sends the current chart dimensions
  288. static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s __maybe_unused, RRDSET_FLAGS flags) {
  289. buffer_fast_strcat(wb, "BEGIN \"", 7);
  290. buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
  291. buffer_fast_strcat(wb, "\" ", 2);
  292. if(st->last_collected_time.tv_sec > st->upstream_resync_time_s)
  293. buffer_print_uint64(wb, st->usec_since_last_update);
  294. else
  295. buffer_fast_strcat(wb, "0", 1);
  296. buffer_fast_strcat(wb, "\n", 1);
  297. RRDDIM *rd;
  298. rrddim_foreach_read(rd, st) {
  299. if(unlikely(!rrddim_check_updated(rd)))
  300. continue;
  301. if(likely(rrddim_check_exposed(rd))) {
  302. buffer_fast_strcat(wb, "SET \"", 5);
  303. buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
  304. buffer_fast_strcat(wb, "\" = ", 4);
  305. buffer_print_int64(wb, rd->collector.collected_value);
  306. buffer_fast_strcat(wb, "\n", 1);
  307. }
  308. else {
  309. internal_error(true, "STREAM: 'host:%s/chart:%s/dim:%s' flag 'exposed' is updated but not exposed",
  310. rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
  311. // we will include it in the next iteration
  312. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
  313. }
  314. }
  315. rrddim_foreach_done(rd);
  316. if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
  317. rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
  318. buffer_fast_strcat(wb, "END\n", 4);
  319. }
  320. static void rrdpush_sender_thread_spawn(RRDHOST *host);
  321. // Called from the internal collectors to mark a chart obsolete.
  322. bool rrdset_push_chart_definition_now(RRDSET *st) {
  323. RRDHOST *host = st->rrdhost;
  324. if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
  325. || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST))))
  326. return false;
  327. BUFFER *wb = sender_start(host->sender);
  328. rrdpush_send_chart_definition(wb, st);
  329. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  330. sender_thread_buffer_free();
  331. return true;
  332. }
  333. void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
  334. RRDHOST *host = st->rrdhost;
  335. rrdpush_send_chart_metrics(rsb->wb, st, host->sender, rsb->rrdset_flags);
  336. }
  337. void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
  338. if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags))
  339. return;
  340. NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  341. NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
  342. BUFFER *wb = rsb->wb;
  343. time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC);
  344. if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) {
  345. if(unlikely(rsb->begin_v2_added))
  346. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  347. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
  348. buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id));
  349. buffer_fast_strcat(wb, "' ", 2);
  350. buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every);
  351. buffer_fast_strcat(wb, " ", 1);
  352. buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s);
  353. buffer_fast_strcat(wb, " ", 1);
  354. if(point_end_time_s == rsb->wall_clock_time)
  355. buffer_fast_strcat(wb, "#", 1);
  356. else
  357. buffer_print_uint64_encoded(wb, integer_encoding, rsb->wall_clock_time);
  358. buffer_fast_strcat(wb, "\n", 1);
  359. rsb->last_point_end_time_s = point_end_time_s;
  360. rsb->begin_v2_added = true;
  361. }
  362. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
  363. buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
  364. buffer_fast_strcat(wb, "' ", 2);
  365. buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
  366. buffer_fast_strcat(wb, " ", 1);
  367. if((NETDATA_DOUBLE)rd->collector.last_collected_value == n)
  368. buffer_fast_strcat(wb, "#", 1);
  369. else
  370. buffer_print_netdata_double_encoded(wb, doubles_encoding, n);
  371. buffer_fast_strcat(wb, " ", 1);
  372. buffer_print_sn_flags(wb, flags, true);
  373. buffer_fast_strcat(wb, "\n", 1);
  374. }
  375. void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
  376. if(!rsb->wb)
  377. return;
  378. if(rsb->v2 && rsb->begin_v2_added) {
  379. if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
  380. rrdsetvar_print_to_streaming_custom_chart_variables(st, rsb->wb);
  381. buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  382. }
  383. sender_commit(st->rrdhost->sender, rsb->wb, STREAM_TRAFFIC_TYPE_DATA);
  384. *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
  385. }
  386. // TODO enable this macro before release
  387. #define bail_if_no_cap(cap) \
  388. if(unlikely(!stream_has_capability(host->sender, cap))) { \
  389. return; \
  390. }
  391. #define dyncfg_check_can_push(host) \
  392. if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) \
  393. return; \
  394. bail_if_no_cap(STREAM_CAP_DYNCFG)
  395. // assumes job is locked and acquired!!!
  396. void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) {
  397. dyncfg_check_can_push(host);
  398. BUFFER *wb = sender_start(host->sender);
  399. buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d\n", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
  400. if (job->reason)
  401. buffer_sprintf(wb, " \"%s\"", job->reason);
  402. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  403. sender_thread_buffer_free();
  404. job->dirty = 0;
  405. }
  406. void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) {
  407. dyncfg_check_can_push(host);
  408. BUFFER *wb = sender_start(host->sender);
  409. buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name);
  410. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  411. sender_thread_buffer_free();
  412. }
  413. RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
  414. RRDHOST *host = st->rrdhost;
  415. // fetch the flags we need to check with one atomic operation
  416. RRDHOST_FLAGS host_flags = __atomic_load_n(&host->flags, __ATOMIC_SEQ_CST);
  417. // check if we are not connected
  418. if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
  419. if(unlikely(!(host_flags & (RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED))))
  420. rrdpush_sender_thread_spawn(host);
  421. if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
  422. rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
  423. netdata_log_error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
  424. }
  425. return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
  426. }
  427. else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
  428. netdata_log_info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
  429. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
  430. }
  431. if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
  432. BUFFER *wb = sender_start(host->sender);
  433. rrd_functions_expose_global_rrdpush(host, wb);
  434. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  435. }
  436. RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
  437. bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
  438. bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
  439. if(unlikely((exposed_upstream && replication_in_progress) ||
  440. !should_send_chart_matching(st, rrdset_flags)))
  441. return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
  442. if(unlikely(!exposed_upstream)) {
  443. BUFFER *wb = sender_start(host->sender);
  444. replication_in_progress = rrdpush_send_chart_definition(wb, st);
  445. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  446. }
  447. if(replication_in_progress)
  448. return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
  449. return (RRDSET_STREAM_BUFFER) {
  450. .capabilities = host->sender->capabilities,
  451. .v2 = stream_has_capability(host->sender, STREAM_CAP_INTERPOLATED),
  452. .rrdset_flags = rrdset_flags,
  453. .wb = sender_start(host->sender),
  454. .wall_clock_time = wall_clock_time,
  455. };
  456. }
  457. // labels
  458. static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  459. BUFFER *wb = (BUFFER *)data;
  460. buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
  461. return 1;
  462. }
  463. void rrdpush_send_host_labels(RRDHOST *host) {
  464. if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
  465. || !stream_has_capability(host->sender, STREAM_CAP_HLABELS)))
  466. return;
  467. BUFFER *wb = sender_start(host->sender);
  468. rrdlabels_walkthrough_read(host->rrdlabels, send_labels_callback, wb);
  469. buffer_sprintf(wb, "OVERWRITE %s\n", "labels");
  470. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  471. sender_thread_buffer_free();
  472. }
  473. void rrdpush_send_global_functions(RRDHOST *host) {
  474. if(!stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
  475. return;
  476. if(unlikely(!rrdhost_can_send_definitions_to_parent(host)))
  477. return;
  478. BUFFER *wb = sender_start(host->sender);
  479. rrd_functions_expose_global_rrdpush(host, wb);
  480. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  481. sender_thread_buffer_free();
  482. }
  483. void rrdpush_send_dyncfg(RRDHOST *host) {
  484. dyncfg_check_can_push(host);
  485. BUFFER *wb = sender_start(host->sender);
  486. DICTIONARY *plugins_dict = host->configurable_plugins;
  487. struct configurable_plugin *plug;
  488. dfe_start_read(plugins_dict, plug) {
  489. buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name);
  490. struct module *mod;
  491. dfe_start_read(plug->modules, mod) {
  492. buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type));
  493. struct job *job;
  494. dfe_start_read(mod->jobs, job) {
  495. pthread_mutex_lock(&job->lock);
  496. buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags);
  497. buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state);
  498. if (job->reason)
  499. buffer_sprintf(wb, " \"%s\"", job->reason);
  500. buffer_sprintf(wb, "\n");
  501. job->dirty = 0;
  502. pthread_mutex_unlock(&job->lock);
  503. } dfe_done(job);
  504. } dfe_done(mod);
  505. }
  506. dfe_done(plug);
  507. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  508. sender_thread_buffer_free();
  509. }
  510. void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name)
  511. {
  512. dyncfg_check_can_push(host);
  513. BUFFER *wb = sender_start(host->sender);
  514. buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name);
  515. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  516. sender_thread_buffer_free();
  517. }
  518. void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type)
  519. {
  520. dyncfg_check_can_push(host);
  521. BUFFER *wb = sender_start(host->sender);
  522. buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type));
  523. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  524. sender_thread_buffer_free();
  525. }
  526. void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags)
  527. {
  528. dyncfg_check_can_push(host);
  529. BUFFER *wb = sender_start(host->sender);
  530. buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags);
  531. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  532. sender_thread_buffer_free();
  533. }
  534. void rrdpush_send_claimed_id(RRDHOST *host) {
  535. if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
  536. return;
  537. if(unlikely(!rrdhost_can_send_definitions_to_parent(host)))
  538. return;
  539. BUFFER *wb = sender_start(host->sender);
  540. rrdhost_aclk_state_lock(host);
  541. buffer_sprintf(wb, "CLAIMED_ID %s %s\n", host->machine_guid, (host->aclk_state.claimed_id ? host->aclk_state.claimed_id : "NULL") );
  542. rrdhost_aclk_state_unlock(host);
  543. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  544. sender_thread_buffer_free();
  545. }
  546. int connect_to_one_of_destinations(
  547. RRDHOST *host,
  548. int default_port,
  549. struct timeval *timeout,
  550. size_t *reconnects_counter,
  551. char *connected_to,
  552. size_t connected_to_size,
  553. struct rrdpush_destinations **destination)
  554. {
  555. int sock = -1;
  556. for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) {
  557. time_t now = now_realtime_sec();
  558. if(d->postpone_reconnection_until > now)
  559. continue;
  560. internal_error(true,
  561. "STREAM %s: connecting to '%s' (default port: %d)...",
  562. rrdhost_hostname(host),
  563. string2str(d->destination),
  564. default_port);
  565. if (reconnects_counter)
  566. *reconnects_counter += 1;
  567. d->since = now;
  568. d->attempts++;
  569. sock = connect_to_this(string2str(d->destination), default_port, timeout);
  570. if (sock != -1) {
  571. if (connected_to && connected_to_size)
  572. strncpyz(connected_to, string2str(d->destination), connected_to_size);
  573. *destination = d;
  574. // move the current item to the end of the list
  575. // without this, this destination will break the loop again and again
  576. // not advancing the destinations to find one that may work
  577. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, d, prev, next);
  578. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(host->destinations, d, prev, next);
  579. break;
  580. }
  581. }
  582. return sock;
  583. }
  584. struct destinations_init_tmp {
  585. RRDHOST *host;
  586. struct rrdpush_destinations *list;
  587. int count;
  588. };
  589. bool destinations_init_add_one(char *entry, void *data) {
  590. struct destinations_init_tmp *t = data;
  591. struct rrdpush_destinations *d = callocz(1, sizeof(struct rrdpush_destinations));
  592. char *colon_ssl = strstr(entry, ":SSL");
  593. if(colon_ssl) {
  594. *colon_ssl = '\0';
  595. d->ssl = true;
  596. }
  597. else
  598. d->ssl = false;
  599. d->destination = string_strdupz(entry);
  600. __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
  601. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next);
  602. t->count++;
  603. netdata_log_info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
  604. return false; // we return false, so that we will get all defined destinations
  605. }
  606. void rrdpush_destinations_init(RRDHOST *host) {
  607. if(!host->rrdpush_send_destination) return;
  608. rrdpush_destinations_free(host);
  609. struct destinations_init_tmp t = {
  610. .host = host,
  611. .list = NULL,
  612. .count = 0,
  613. };
  614. foreach_entry_in_connection_string(host->rrdpush_send_destination, destinations_init_add_one, &t);
  615. host->destinations = t.list;
  616. }
  617. void rrdpush_destinations_free(RRDHOST *host) {
  618. while (host->destinations) {
  619. struct rrdpush_destinations *tmp = host->destinations;
  620. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(host->destinations, tmp, prev, next);
  621. string_freez(tmp->destination);
  622. freez(tmp);
  623. __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_senders, sizeof(struct rrdpush_destinations), __ATOMIC_RELAXED);
  624. }
  625. host->destinations = NULL;
  626. }
  627. // ----------------------------------------------------------------------------
  628. // rrdpush sender thread
  629. // Either the receiver lost the connection or the host is being destroyed.
  630. // The sender mutex guards thread creation, any spurious data is wiped on reconnection.
  631. void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait) {
  632. if (!host->sender)
  633. return;
  634. sender_lock(host->sender);
  635. if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
  636. host->sender->exit.shutdown = true;
  637. host->sender->exit.reason = reason;
  638. // signal it to cancel
  639. netdata_thread_cancel(host->rrdpush_sender_thread);
  640. }
  641. sender_unlock(host->sender);
  642. if(wait) {
  643. sender_lock(host->sender);
  644. while(host->sender->tid) {
  645. sender_unlock(host->sender);
  646. sleep_usec(10 * USEC_PER_MS);
  647. sender_lock(host->sender);
  648. }
  649. sender_unlock(host->sender);
  650. }
  651. }
  // ----------------------------------------------------------------------------
  // rrdpush receiver thread

  /*
   * Record a streaming connection event in the access log: the calling thread
   * id, the peer address and port, the message, and the identity the peer
   * presented (hostname, API key, machine GUID).
   */
  void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
      netdata_log_access(
          "STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'",
          gettid(),
          client_ip,
          client_port,
          msg,
          host,
          api_key,
          machine_guid);
  }
  657. static void rrdpush_sender_thread_spawn(RRDHOST *host) {
  658. sender_lock(host->sender);
  659. if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
  660. char tag[NETDATA_THREAD_TAG_MAX + 1];
  661. snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host));
  662. if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
  663. netdata_log_error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
  664. else
  665. rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
  666. }
  667. sender_unlock(host->sender);
  668. }
  669. int rrdpush_receiver_permission_denied(struct web_client *w) {
  670. // we always respond with the same message and error code
  671. // to prevent an attacker from gaining info about the error
  672. buffer_flush(w->response.data);
  673. buffer_strcat(w->response.data, START_STREAMING_ERROR_NOT_PERMITTED);
  674. return HTTP_RESP_UNAUTHORIZED;
  675. }
  676. int rrdpush_receiver_too_busy_now(struct web_client *w) {
  677. // we always respond with the same message and error code
  678. // to prevent an attacker from gaining info about the error
  679. buffer_flush(w->response.data);
  680. buffer_strcat(w->response.data, START_STREAMING_ERROR_BUSY_TRY_LATER);
  681. return HTTP_RESP_SERVICE_UNAVAILABLE;
  682. }
  683. static void rrdpush_receiver_takeover_web_connection(struct web_client *w, struct receiver_state *rpt) {
  684. rpt->fd = w->ifd;
  685. #ifdef ENABLE_HTTPS
  686. rpt->ssl.conn = w->ssl.conn;
  687. rpt->ssl.state = w->ssl.state;
  688. w->ssl = NETDATA_SSL_UNSET_CONNECTION;
  689. #endif
  690. WEB_CLIENT_IS_DEAD(w);
  691. if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) {
  692. web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET);
  693. }
  694. else {
  695. if(w->ifd == w->ofd)
  696. w->ifd = w->ofd = -1;
  697. else
  698. w->ifd = -1;
  699. }
  700. buffer_flush(w->response.data);
  701. }
  702. void *rrdpush_receiver_thread(void *ptr);
  703. int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string) {
  704. if(!service_running(ABILITY_STREAMING_CONNECTIONS))
  705. return rrdpush_receiver_too_busy_now(w);
  706. struct receiver_state *rpt = callocz(1, sizeof(*rpt));
  707. rpt->last_msg_t = now_monotonic_sec();
  708. rpt->capabilities = STREAM_CAP_INVALID;
  709. rpt->hops = 1;
  710. __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
  711. __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
  712. rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info));
  713. rpt->system_info->hops = rpt->hops;
  714. rpt->fd = -1;
  715. rpt->client_ip = strdupz(w->client_ip);
  716. rpt->client_port = strdupz(w->client_port);
  717. #ifdef ENABLE_HTTPS
  718. rpt->ssl = NETDATA_SSL_UNSET_CONNECTION;
  719. #endif
  720. rpt->config.update_every = default_rrd_update_every;
  721. // parse the parameters and fill rpt and rpt->system_info
  722. while(decoded_query_string) {
  723. char *value = strsep_skip_consecutive_separators(&decoded_query_string, "&");
  724. if(!value || !*value) continue;
  725. char *name = strsep_skip_consecutive_separators(&value, "=");
  726. if(!name || !*name) continue;
  727. if(!value || !*value) continue;
  728. if(!strcmp(name, "key") && !rpt->key)
  729. rpt->key = strdupz(value);
  730. else if(!strcmp(name, "hostname") && !rpt->hostname)
  731. rpt->hostname = strdupz(value);
  732. else if(!strcmp(name, "registry_hostname") && !rpt->registry_hostname)
  733. rpt->registry_hostname = strdupz(value);
  734. else if(!strcmp(name, "machine_guid") && !rpt->machine_guid)
  735. rpt->machine_guid = strdupz(value);
  736. else if(!strcmp(name, "update_every"))
  737. rpt->config.update_every = (int)strtoul(value, NULL, 0);
  738. else if(!strcmp(name, "os") && !rpt->os)
  739. rpt->os = strdupz(value);
  740. else if(!strcmp(name, "timezone") && !rpt->timezone)
  741. rpt->timezone = strdupz(value);
  742. else if(!strcmp(name, "abbrev_timezone") && !rpt->abbrev_timezone)
  743. rpt->abbrev_timezone = strdupz(value);
  744. else if(!strcmp(name, "utc_offset"))
  745. rpt->utc_offset = (int32_t)strtol(value, NULL, 0);
  746. else if(!strcmp(name, "hops"))
  747. rpt->hops = rpt->system_info->hops = (uint16_t) strtoul(value, NULL, 0);
  748. else if(!strcmp(name, "ml_capable"))
  749. rpt->system_info->ml_capable = strtoul(value, NULL, 0);
  750. else if(!strcmp(name, "ml_enabled"))
  751. rpt->system_info->ml_enabled = strtoul(value, NULL, 0);
  752. else if(!strcmp(name, "mc_version"))
  753. rpt->system_info->mc_version = strtoul(value, NULL, 0);
  754. else if(!strcmp(name, "tags") && !rpt->tags)
  755. rpt->tags = strdupz(value);
  756. else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID))
  757. rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0), NULL, false);
  758. else {
  759. // An old Netdata child does not have a compatible streaming protocol, map to something sane.
  760. if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
  761. name = "NETDATA_HOST_OS_NAME";
  762. else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID"))
  763. name = "NETDATA_HOST_OS_ID";
  764. else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE"))
  765. name = "NETDATA_HOST_OS_ID_LIKE";
  766. else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION"))
  767. name = "NETDATA_HOST_OS_VERSION";
  768. else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID"))
  769. name = "NETDATA_HOST_OS_VERSION_ID";
  770. else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
  771. name = "NETDATA_HOST_OS_DETECTION";
  772. else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID))
  773. rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false);
  774. if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
  775. netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
  776. "request has parameter '%s' = '%s', which is not used."
  777. , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-"
  778. , rpt->client_ip, rpt->client_port
  779. , name, value);
  780. }
  781. }
  782. }
  783. if (rpt->capabilities & STREAM_CAP_INVALID)
  784. // no version is supplied, assume version 0;
  785. rpt->capabilities = convert_stream_version_to_capabilities(0, NULL, false);
  786. // find the program name and version
  787. if(w->user_agent && w->user_agent[0]) {
  788. char *t = strchr(w->user_agent, '/');
  789. if(t && *t) {
  790. *t = '\0';
  791. t++;
  792. }
  793. rpt->program_name = strdupz(w->user_agent);
  794. if(t && *t) rpt->program_version = strdupz(t);
  795. }
  796. // check if we should accept this connection
  797. if(!rpt->key || !*rpt->key) {
  798. rrdpush_receive_log_status(
  799. rpt,
  800. "request without an API key",
  801. "NO API KEY PERMISSION DENIED");
  802. receiver_state_free(rpt);
  803. return rrdpush_receiver_permission_denied(w);
  804. }
  805. if(!rpt->hostname || !*rpt->hostname) {
  806. rrdpush_receive_log_status(
  807. rpt,
  808. "request without a hostname",
  809. "NO HOSTNAME PERMISSION DENIED");
  810. receiver_state_free(rpt);
  811. return rrdpush_receiver_permission_denied(w);
  812. }
  813. if(!rpt->registry_hostname)
  814. rpt->registry_hostname = strdupz(rpt->hostname);
  815. if(!rpt->machine_guid || !*rpt->machine_guid) {
  816. rrdpush_receive_log_status(
  817. rpt,
  818. "request without a machine GUID",
  819. "NO MACHINE GUID PERMISSION DENIED");
  820. receiver_state_free(rpt);
  821. return rrdpush_receiver_permission_denied(w);
  822. }
  823. {
  824. char buf[GUID_LEN + 1];
  825. if (regenerate_guid(rpt->key, buf) == -1) {
  826. rrdpush_receive_log_status(
  827. rpt,
  828. "API key is not a valid UUID (use the command uuidgen to generate one)",
  829. "INVALID API KEY PERMISSION DENIED");
  830. receiver_state_free(rpt);
  831. return rrdpush_receiver_permission_denied(w);
  832. }
  833. if (regenerate_guid(rpt->machine_guid, buf) == -1) {
  834. rrdpush_receive_log_status(
  835. rpt,
  836. "machine GUID is not a valid UUID",
  837. "INVALID MACHINE GUID PERMISSION DENIED");
  838. receiver_state_free(rpt);
  839. return rrdpush_receiver_permission_denied(w);
  840. }
  841. }
  842. const char *api_key_type = appconfig_get(&stream_config, rpt->key, "type", "api");
  843. if(!api_key_type || !*api_key_type) api_key_type = "unknown";
  844. if(strcmp(api_key_type, "api") != 0) {
  845. rrdpush_receive_log_status(
  846. rpt,
  847. "API key is a machine GUID",
  848. "INVALID API KEY PERMISSION DENIED");
  849. receiver_state_free(rpt);
  850. return rrdpush_receiver_permission_denied(w);
  851. }
  852. if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) {
  853. rrdpush_receive_log_status(
  854. rpt,
  855. "API key is not enabled",
  856. "API KEY DISABLED PERMISSION DENIED");
  857. receiver_state_free(rpt);
  858. return rrdpush_receiver_permission_denied(w);
  859. }
  860. {
  861. SIMPLE_PATTERN *key_allow_from = simple_pattern_create(
  862. appconfig_get(&stream_config, rpt->key, "allow from", "*"),
  863. NULL, SIMPLE_PATTERN_EXACT, true);
  864. if(key_allow_from) {
  865. if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
  866. simple_pattern_free(key_allow_from);
  867. rrdpush_receive_log_status(
  868. rpt,
  869. "API key is not allowed from this IP",
  870. "NOT ALLOWED IP PERMISSION DENIED");
  871. receiver_state_free(rpt);
  872. return rrdpush_receiver_permission_denied(w);
  873. }
  874. simple_pattern_free(key_allow_from);
  875. }
  876. }
  877. {
  878. const char *machine_guid_type = appconfig_get(&stream_config, rpt->machine_guid, "type", "machine");
  879. if (!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown";
  880. if (strcmp(machine_guid_type, "machine") != 0) {
  881. rrdpush_receive_log_status(
  882. rpt,
  883. "machine GUID is an API key",
  884. "INVALID MACHINE GUID PERMISSION DENIED");
  885. receiver_state_free(rpt);
  886. return rrdpush_receiver_permission_denied(w);
  887. }
  888. }
  889. if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) {
  890. rrdpush_receive_log_status(
  891. rpt,
  892. "machine GUID is not enabled",
  893. "MACHINE GUID DISABLED PERMISSION DENIED");
  894. receiver_state_free(rpt);
  895. return rrdpush_receiver_permission_denied(w);
  896. }
  897. {
  898. SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(
  899. appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"),
  900. NULL, SIMPLE_PATTERN_EXACT, true);
  901. if(machine_allow_from) {
  902. if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
  903. simple_pattern_free(machine_allow_from);
  904. rrdpush_receive_log_status(
  905. rpt,
  906. "machine GUID is not allowed from this IP",
  907. "NOT ALLOWED IP PERMISSION DENIED");
  908. receiver_state_free(rpt);
  909. return rrdpush_receiver_permission_denied(w);
  910. }
  911. simple_pattern_free(machine_allow_from);
  912. }
  913. }
  914. if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
  915. rrdpush_receiver_takeover_web_connection(w, rpt);
  916. rrdpush_receive_log_status(
  917. rpt,
  918. "machine GUID is my own",
  919. "LOCALHOST PERMISSION DENIED");
  920. char initial_response[HTTP_HEADER_SIZE + 1];
  921. snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
  922. if(send_timeout(
  923. #ifdef ENABLE_HTTPS
  924. &rpt->ssl,
  925. #endif
  926. rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
  927. netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
  928. "failed to reply."
  929. , rpt->hostname
  930. , rpt->client_ip, rpt->client_port
  931. );
  932. }
  933. receiver_state_free(rpt);
  934. return HTTP_RESP_OK;
  935. }
  936. if(unlikely(web_client_streaming_rate_t > 0)) {
  937. static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
  938. static time_t last_stream_accepted_t = 0;
  939. time_t now = now_realtime_sec();
  940. spinlock_lock(&spinlock);
  941. if(unlikely(last_stream_accepted_t == 0))
  942. last_stream_accepted_t = now;
  943. if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
  944. spinlock_unlock(&spinlock);
  945. char msg[100 + 1];
  946. snprintfz(msg, 100,
  947. "rate limit, will accept new connection in %ld secs",
  948. (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
  949. rrdpush_receive_log_status(
  950. rpt,
  951. msg,
  952. "RATE LIMIT TRY LATER");
  953. receiver_state_free(rpt);
  954. return rrdpush_receiver_too_busy_now(w);
  955. }
  956. last_stream_accepted_t = now;
  957. spinlock_unlock(&spinlock);
  958. }
  959. /*
  960. * Quick path for rejecting multiple connections. The lock taken is fine-grained - it only protects the receiver
  961. * pointer within the host (if a host exists). This protects against multiple concurrent web requests hitting
  962. * separate threads within the web-server and landing here. The lock guards the thread-shutdown sequence that
  963. * detaches the receiver from the host. If the host is being created (first time-access) then we also use the
  964. * lock to prevent race-hazard (two threads try to create the host concurrently, one wins and the other does a
  965. * lookup to the now-attached structure).
  966. */
  967. {
  968. time_t age = 0;
  969. bool receiver_stale = false;
  970. bool receiver_working = false;
  971. rrd_rdlock();
  972. RRDHOST *host = rrdhost_find_by_guid(rpt->machine_guid);
  973. if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
  974. host = NULL;
  975. if (host) {
  976. netdata_mutex_lock(&host->receiver_lock);
  977. if (host->receiver) {
  978. age = now_monotonic_sec() - host->receiver->last_msg_t;
  979. if (age < 30)
  980. receiver_working = true;
  981. else
  982. receiver_stale = true;
  983. }
  984. netdata_mutex_unlock(&host->receiver_lock);
  985. }
  986. rrd_unlock();
  987. if (receiver_stale && stop_streaming_receiver(host, STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER)) {
  988. // we stopped the receiver
  989. // we can proceed with this connection
  990. receiver_stale = false;
  991. netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
  992. "stopped previous stale receiver to accept this one."
  993. , rpt->hostname
  994. , rpt->client_ip, rpt->client_port
  995. );
  996. }
  997. if (receiver_working || receiver_stale) {
  998. // another receiver is already connected
  999. // try again later
  1000. #ifdef NETDATA_INTERNAL_CHECKS
  1001. char msg[200 + 1];
  1002. snprintfz(msg, 200,
  1003. "multiple connections for same host, "
  1004. "old connection was used %ld secs ago%s",
  1005. age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)");
  1006. rrdpush_receive_log_status(
  1007. rpt,
  1008. msg,
  1009. "ALREADY CONNECTED");
  1010. #endif
  1011. // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
  1012. buffer_flush(w->response.data);
  1013. buffer_strcat(w->response.data, START_STREAMING_ERROR_ALREADY_STREAMING);
  1014. receiver_state_free(rpt);
  1015. return HTTP_RESP_CONFLICT;
  1016. }
  1017. }
  1018. netdata_log_debug(D_SYSTEM, "starting STREAM receive thread.");
  1019. rrdpush_receiver_takeover_web_connection(w, rpt);
  1020. char tag[NETDATA_THREAD_TAG_MAX + 1];
  1021. snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_RECEIVER "[%s]", rpt->hostname);
  1022. tag[NETDATA_THREAD_TAG_MAX] = '\0';
  1023. if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) {
  1024. rrdpush_receive_log_status(
  1025. rpt,
  1026. "can't create receiver thread",
  1027. "INTERNAL SERVER ERROR");
  1028. buffer_flush(w->response.data);
  1029. buffer_strcat(w->response.data, "Can't handle this request");
  1030. receiver_state_free(rpt);
  1031. return HTTP_RESP_INTERNAL_SERVER_ERROR;
  1032. }
  1033. // prevent the caller from closing the streaming socket
  1034. return HTTP_RESP_OK;
  1035. }
  1036. void rrdpush_reset_destinations_postpone_time(RRDHOST *host) {
  1037. uint32_t wait = (host->sender) ? host->sender->reconnect_delay : 5;
  1038. time_t now = now_realtime_sec();
  1039. for (struct rrdpush_destinations *d = host->destinations; d; d = d->next)
  1040. d->postpone_reconnection_until = now + wait;
  1041. }
  1042. static struct {
  1043. STREAM_HANDSHAKE err;
  1044. const char *str;
  1045. } handshake_errors[] = {
  1046. { STREAM_HANDSHAKE_OK_V3, "CONNECTED" },
  1047. { STREAM_HANDSHAKE_OK_V2, "CONNECTED" },
  1048. { STREAM_HANDSHAKE_OK_V1, "CONNECTED" },
  1049. { STREAM_HANDSHAKE_NEVER, "" },
  1050. { STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, "BAD HANDSHAKE" },
  1051. { STREAM_HANDSHAKE_ERROR_LOCALHOST, "LOCALHOST" },
  1052. { STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, "ALREADY CONNECTED" },
  1053. { STREAM_HANDSHAKE_ERROR_DENIED, "DENIED" },
  1054. { STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT, "SEND TIMEOUT" },
  1055. { STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT, "RECEIVE TIMEOUT" },
  1056. { STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE, "INVALID CERTIFICATE" },
  1057. { STREAM_HANDSHAKE_ERROR_SSL_ERROR, "SSL ERROR" },
  1058. { STREAM_HANDSHAKE_ERROR_CANT_CONNECT, "CANT CONNECT" },
  1059. { STREAM_HANDSHAKE_BUSY_TRY_LATER, "BUSY TRY LATER" },
  1060. { STREAM_HANDSHAKE_INTERNAL_ERROR, "INTERNAL ERROR" },
  1061. { STREAM_HANDSHAKE_INITIALIZATION, "REMOTE IS INITIALIZING" },
  1062. { STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, "DISCONNECTED HOST CLEANUP" },
  1063. { STREAM_HANDSHAKE_DISCONNECT_STALE_RECEIVER, "DISCONNECTED STALE RECEIVER" },
  1064. { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, "DISCONNECTED SHUTDOWN REQUESTED" },
  1065. { STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" },
  1066. { STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" },
  1067. { STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" },
  1068. { STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" },
  1069. { STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" },
  1070. { STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" },
  1071. { STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" },
  1072. { 0, NULL },
  1073. };
  1074. const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error) {
  1075. if(handshake_error >= STREAM_HANDSHAKE_OK_V1)
  1076. // handshake_error is the whole version / capabilities number
  1077. return "CONNECTED";
  1078. for(size_t i = 0; handshake_errors[i].str ; i++) {
  1079. if(handshake_error == handshake_errors[i].err)
  1080. return handshake_errors[i].str;
  1081. }
  1082. return "UNKNOWN";
  1083. }
  1084. static struct {
  1085. STREAM_CAPABILITIES cap;
  1086. const char *str;
  1087. } capability_names[] = {
  1088. { STREAM_CAP_V1, "V1" },
  1089. { STREAM_CAP_V2, "V2" },
  1090. { STREAM_CAP_VN, "VN" },
  1091. { STREAM_CAP_VCAPS, "VCAPS" },
  1092. { STREAM_CAP_HLABELS, "HLABELS" },
  1093. { STREAM_CAP_CLAIM, "CLAIM" },
  1094. { STREAM_CAP_CLABELS, "CLABELS" },
  1095. { STREAM_CAP_COMPRESSION, "COMPRESSION" },
  1096. { STREAM_CAP_FUNCTIONS, "FUNCTIONS" },
  1097. { STREAM_CAP_REPLICATION, "REPLICATION" },
  1098. { STREAM_CAP_BINARY, "BINARY" },
  1099. { STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
  1100. { STREAM_CAP_IEEE754, "IEEE754" },
  1101. { STREAM_CAP_DATA_WITH_ML, "ML" },
  1102. { STREAM_CAP_DYNCFG, "DYN_CFG" },
  1103. { 0 , NULL },
  1104. };
  1105. static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
  1106. for(size_t i = 0; capability_names[i].str ; i++) {
  1107. if(caps & capability_names[i].cap) {
  1108. buffer_strcat(wb, capability_names[i].str);
  1109. buffer_strcat(wb, " ");
  1110. }
  1111. }
  1112. }
  1113. void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) {
  1114. if(key)
  1115. buffer_json_member_add_array(wb, key);
  1116. else
  1117. buffer_json_add_array_item_array(wb);
  1118. for(size_t i = 0; capability_names[i].str ; i++) {
  1119. if(caps & capability_names[i].cap)
  1120. buffer_json_add_array_item_string(wb, capability_names[i].str);
  1121. }
  1122. buffer_json_array_close(wb);
  1123. }
  1124. void log_receiver_capabilities(struct receiver_state *rpt) {
  1125. BUFFER *wb = buffer_create(100, NULL);
  1126. stream_capabilities_to_string(wb, rpt->capabilities);
  1127. netdata_log_info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
  1128. rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
  1129. buffer_free(wb);
  1130. }
  1131. void log_sender_capabilities(struct sender_state *s) {
  1132. BUFFER *wb = buffer_create(100, NULL);
  1133. stream_capabilities_to_string(wb, s->capabilities);
  1134. netdata_log_info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
  1135. rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
  1136. buffer_free(wb);
  1137. }
  1138. STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) {
  1139. STREAM_CAPABILITIES caps = 0;
  1140. if(version <= 1) caps = STREAM_CAP_V1;
  1141. else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS;
  1142. else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM;
  1143. else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS;
  1144. else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION;
  1145. else caps = version;
  1146. if(caps & STREAM_CAP_VCAPS)
  1147. caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2|STREAM_CAP_VN);
  1148. if(caps & STREAM_CAP_VN)
  1149. caps &= ~(STREAM_CAP_V1|STREAM_CAP_V2);
  1150. if(caps & STREAM_CAP_V2)
  1151. caps &= ~(STREAM_CAP_V1);
  1152. STREAM_CAPABILITIES common_caps = caps & stream_our_capabilities(host, sender);
  1153. if(!(common_caps & STREAM_CAP_INTERPOLATED))
  1154. // DATA WITH ML requires INTERPOLATED
  1155. common_caps &= ~STREAM_CAP_DATA_WITH_ML;
  1156. return common_caps;
  1157. }
  1158. int32_t stream_capabilities_to_vn(uint32_t caps) {
  1159. if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION;
  1160. if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS;
  1161. return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM)
  1162. }