rrdpush.c 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdpush.h"
/*
 * rrdpush
 *
 * Three threads are involved in all streaming operations:
 *
 * 1. Any data collection thread, calling rrdset_done_push()
 *    for each chart it has collected.
 *
 *    The output of this work is kept in a BUFFER in RRDHOST;
 *    the sender thread is signalled via a pipe (also in RRDHOST).
 *
 * 2. A sender thread, running on the sending netdata.
 *    It is spawned automatically when the first chart is pushed.
 *
 *    It tries to push the metrics to the remote netdata as fast
 *    as possible (i.e. immediately after they are collected).
 *
 * 3. A receiver thread, running on the receiving netdata.
 *    It is spawned automatically when the sender connects to
 *    the receiver.
 */
  25. #define START_STREAMING_PROMPT "Hit me baby, push them over..."
  26. typedef enum {
  27. RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
  28. RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
  29. } RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
  30. static struct config stream_config = {
  31. .sections = NULL,
  32. .mutex = NETDATA_MUTEX_INITIALIZER,
  33. .index = {
  34. .avl_tree = {
  35. .root = NULL,
  36. .compar = appconfig_section_compare
  37. },
  38. .rwlock = AVL_LOCK_INITIALIZER
  39. }
  40. };
  41. unsigned int default_rrdpush_enabled = 0;
  42. char *default_rrdpush_destination = NULL;
  43. char *default_rrdpush_api_key = NULL;
  44. char *default_rrdpush_send_charts_matching = NULL;
  45. #ifdef ENABLE_HTTPS
  46. int netdata_use_ssl_on_stream = NETDATA_SSL_OPTIONAL;
  47. char *netdata_ssl_ca_path = NULL;
  48. char *netdata_ssl_ca_file = NULL;
  49. #endif
  50. static void load_stream_conf() {
  51. errno = 0;
  52. char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
  53. if(!appconfig_load(&stream_config, filename, 0)) {
  54. info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
  55. freez(filename);
  56. filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
  57. if(!appconfig_load(&stream_config, filename, 0))
  58. info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
  59. }
  60. freez(filename);
  61. }
  62. int rrdpush_init() {
  63. // --------------------------------------------------------------------
  64. // load stream.conf
  65. load_stream_conf();
  66. default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
  67. default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
  68. default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
  69. default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
  70. rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
  71. if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
  72. error("STREAM [send]: cannot enable sending thread - information is missing.");
  73. default_rrdpush_enabled = 0;
  74. }
  75. #ifdef ENABLE_HTTPS
  76. if (netdata_use_ssl_on_stream == NETDATA_SSL_OPTIONAL) {
  77. if (default_rrdpush_destination){
  78. char *test = strstr(default_rrdpush_destination,":SSL");
  79. if(test){
  80. *test = 0X00;
  81. netdata_use_ssl_on_stream = NETDATA_SSL_FORCE;
  82. }
  83. }
  84. }
  85. char *invalid_certificate = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", "no");
  86. if ( !strcmp(invalid_certificate,"yes")){
  87. if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
  88. info("Netdata is configured to accept invalid SSL certificate.");
  89. netdata_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
  90. }
  91. }
  92. netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", "/etc/ssl/certs/");
  93. netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", "/etc/ssl/certs/certs.pem");
  94. #endif
  95. return default_rrdpush_enabled;
  96. }
  97. #define CONNECTED_TO_SIZE 100
  98. // data collection happens from multiple threads
  99. // each of these threads calls rrdset_done()
  100. // which in turn calls rrdset_done_push()
  101. // which uses this pipe to notify the streaming thread
  102. // that there are more data ready to be sent
  103. #define PIPE_READ 0
  104. #define PIPE_WRITE 1
  105. // to have the remote netdata re-sync the charts
  106. // to its current clock, we send for this many
  107. // iterations a BEGIN line without microseconds
  108. // this is for the first iterations of each chart
  109. unsigned int remote_clock_resync_iterations = 60;
  110. #define rrdpush_buffer_lock(host) netdata_mutex_lock(&((host)->rrdpush_sender_buffer_mutex))
  111. #define rrdpush_buffer_unlock(host) netdata_mutex_unlock(&((host)->rrdpush_sender_buffer_mutex))
  112. static inline int should_send_chart_matching(RRDSET *st) {
  113. if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) {
  114. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
  115. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
  116. }
  117. else if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
  118. RRDHOST *host = st->rrdhost;
  119. if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) ||
  120. simple_pattern_matches(host->rrdpush_send_charts_matching, st->name)) {
  121. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
  122. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
  123. }
  124. else {
  125. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
  126. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
  127. }
  128. }
  129. return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND));
  130. }
  131. // checks if the current chart definition has been sent
  132. static inline int need_to_send_chart_definition(RRDSET *st) {
  133. rrdset_check_rdlock(st);
  134. if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))))
  135. return 1;
  136. RRDDIM *rd;
  137. rrddim_foreach_read(rd, st) {
  138. if(unlikely(!rd->exposed)) {
  139. #ifdef NETDATA_INTERNAL_CHECKS
  140. info("host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", st->rrdhost->hostname, st->id, rd->id);
  141. #endif
  142. return 1;
  143. }
  144. }
  145. return 0;
  146. }
  147. // sends the current chart definition
  148. static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
  149. RRDHOST *host = st->rrdhost;
  150. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
  151. // properly set the name for the remote end to parse it
  152. char *name = "";
  153. if(likely(st->name)) {
  154. if(unlikely(strcmp(st->id, st->name))) {
  155. // they differ
  156. name = strchr(st->name, '.');
  157. if(name)
  158. name++;
  159. else
  160. name = "";
  161. }
  162. }
  163. // info("CHART '%s' '%s'", st->id, name);
  164. // send the chart
  165. buffer_sprintf(
  166. host->rrdpush_sender_buffer
  167. , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
  168. , st->id
  169. , name
  170. , st->title
  171. , st->units
  172. , st->family
  173. , st->context
  174. , rrdset_type_name(st->chart_type)
  175. , st->priority
  176. , st->update_every
  177. , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":""
  178. , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
  179. , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
  180. , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
  181. , (st->plugin_name)?st->plugin_name:""
  182. , (st->module_name)?st->module_name:""
  183. );
  184. // send the dimensions
  185. RRDDIM *rd;
  186. rrddim_foreach_read(rd, st) {
  187. buffer_sprintf(
  188. host->rrdpush_sender_buffer
  189. , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
  190. , rd->id
  191. , rd->name
  192. , rrd_algorithm_name(rd->algorithm)
  193. , rd->multiplier
  194. , rd->divisor
  195. , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
  196. , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
  197. , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
  198. );
  199. rd->exposed = 1;
  200. }
  201. // send the chart local custom variables
  202. RRDSETVAR *rs;
  203. for(rs = st->variables; rs ;rs = rs->next) {
  204. if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) {
  205. calculated_number *value = (calculated_number *) rs->value;
  206. buffer_sprintf(
  207. host->rrdpush_sender_buffer
  208. , "VARIABLE CHART %s = " CALCULATED_NUMBER_FORMAT "\n"
  209. , rs->variable
  210. , *value
  211. );
  212. }
  213. }
  214. st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
  215. }
  216. // sends the current chart dimensions
  217. static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st) {
  218. RRDHOST *host = st->rrdhost;
  219. buffer_sprintf(host->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
  220. RRDDIM *rd;
  221. rrddim_foreach_read(rd, st) {
  222. if(rd->updated && rd->exposed)
  223. buffer_sprintf(host->rrdpush_sender_buffer
  224. , "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n"
  225. , rd->id
  226. , rd->collected_value
  227. );
  228. }
  229. buffer_strcat(host->rrdpush_sender_buffer, "END\n");
  230. }
  231. static void rrdpush_sender_thread_spawn(RRDHOST *host);
  232. void rrdset_push_chart_definition_now(RRDSET *st) {
  233. RRDHOST *host = st->rrdhost;
  234. if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st)))
  235. return;
  236. rrdset_rdlock(st);
  237. rrdpush_buffer_lock(host);
  238. rrdpush_send_chart_definition_nolock(st);
  239. rrdpush_buffer_unlock(host);
  240. rrdset_unlock(st);
  241. }
  242. void rrdset_done_push(RRDSET *st) {
  243. if(unlikely(!should_send_chart_matching(st)))
  244. return;
  245. RRDHOST *host = st->rrdhost;
  246. rrdpush_buffer_lock(host);
  247. if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
  248. rrdpush_sender_thread_spawn(host);
  249. if(unlikely(!host->rrdpush_sender_buffer || !host->rrdpush_sender_connected)) {
  250. if(unlikely(!host->rrdpush_sender_error_shown))
  251. error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname);
  252. host->rrdpush_sender_error_shown = 1;
  253. rrdpush_buffer_unlock(host);
  254. return;
  255. }
  256. else if(unlikely(host->rrdpush_sender_error_shown)) {
  257. info("STREAM %s [send]: sending metrics...", host->hostname);
  258. host->rrdpush_sender_error_shown = 0;
  259. }
  260. if(need_to_send_chart_definition(st))
  261. rrdpush_send_chart_definition_nolock(st);
  262. rrdpush_send_chart_metrics_nolock(st);
  263. // signal the sender there are more data
  264. if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
  265. error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
  266. rrdpush_buffer_unlock(host);
  267. }
  268. // ----------------------------------------------------------------------------
  269. // rrdpush sender thread
  270. static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) {
  271. calculated_number *value = (calculated_number *)rv->value;
  272. buffer_sprintf(
  273. host->rrdpush_sender_buffer
  274. , "VARIABLE HOST %s = " CALCULATED_NUMBER_FORMAT "\n"
  275. , rv->name
  276. , *value
  277. );
  278. debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " CALCULATED_NUMBER_FORMAT, rv->name, *value);
  279. }
  280. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) {
  281. if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && host->rrdpush_sender_connected) {
  282. rrdpush_buffer_lock(host);
  283. rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
  284. rrdpush_buffer_unlock(host);
  285. }
  286. }
  287. static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr, void *host_ptr) {
  288. RRDVAR *rv = (RRDVAR *)rrdvar_ptr;
  289. RRDHOST *host = (RRDHOST *)host_ptr;
  290. if(unlikely(rv->options & RRDVAR_OPTION_CUSTOM_HOST_VAR && rv->type == RRDVAR_TYPE_CALCULATED)) {
  291. rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
  292. // return 1, so that the traversal will return the number of variables sent
  293. return 1;
  294. }
  295. // returning a negative number will break the traversal
  296. return 0;
  297. }
  298. static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
  299. int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host);
  300. (void)ret;
  301. debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
  302. }
  303. // resets all the chart, so that their definitions
  304. // will be resent to the central netdata
  305. static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
  306. rrdhost_rdlock(host);
  307. RRDSET *st;
  308. rrdset_foreach_read(st, host) {
  309. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
  310. st->upstream_resync_time = 0;
  311. rrdset_rdlock(st);
  312. RRDDIM *rd;
  313. rrddim_foreach_read(rd, st)
  314. rd->exposed = 0;
  315. rrdset_unlock(st);
  316. }
  317. rrdhost_unlock(host);
  318. }
  319. static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
  320. rrdpush_buffer_lock(host);
  321. if(buffer_strlen(host->rrdpush_sender_buffer))
  322. error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, buffer_strlen(host->rrdpush_sender_buffer));
  323. buffer_flush(host->rrdpush_sender_buffer);
  324. rrdpush_sender_thread_reset_all_charts(host);
  325. rrdpush_sender_thread_send_custom_host_variables(host);
  326. rrdpush_buffer_unlock(host);
  327. }
  328. void rrdpush_sender_thread_stop(RRDHOST *host) {
  329. rrdpush_buffer_lock(host);
  330. rrdhost_wrlock(host);
  331. netdata_thread_t thr = 0;
  332. if(host->rrdpush_sender_spawn) {
  333. info("STREAM %s [send]: signaling sending thread to stop...", host->hostname);
  334. // signal the thread that we want to join it
  335. host->rrdpush_sender_join = 1;
  336. // copy the thread id, so that we will be waiting for the right one
  337. // even if a new one has been spawn
  338. thr = host->rrdpush_sender_thread;
  339. // signal it to cancel
  340. netdata_thread_cancel(host->rrdpush_sender_thread);
  341. }
  342. rrdhost_unlock(host);
  343. rrdpush_buffer_unlock(host);
  344. if(thr != 0) {
  345. info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname);
  346. void *result;
  347. netdata_thread_join(thr, &result);
  348. info("STREAM %s [send]: sending thread has exited.", host->hostname);
  349. }
  350. }
  351. static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
  352. host->rrdpush_sender_connected = 0;
  353. if(host->rrdpush_sender_socket != -1) {
  354. close(host->rrdpush_sender_socket);
  355. host->rrdpush_sender_socket = -1;
  356. }
  357. }
  358. //called from client side
  359. static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_port, int timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size) {
  360. struct timeval tv = {
  361. .tv_sec = timeout,
  362. .tv_usec = 0
  363. };
  364. // make sure the socket is closed
  365. rrdpush_sender_thread_close_socket(host);
  366. debug(D_STREAM, "STREAM: Attempting to connect...");
  367. info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination);
  368. host->rrdpush_sender_socket = connect_to_one_of(
  369. host->rrdpush_send_destination
  370. , default_port
  371. , &tv
  372. , reconnects_counter
  373. , connected_to
  374. , connected_to_size
  375. );
  376. if(unlikely(host->rrdpush_sender_socket == -1)) {
  377. error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_send_destination);
  378. return 0;
  379. }
  380. info("STREAM %s [send to %s]: initializing communication...", host->hostname, connected_to);
  381. #ifdef ENABLE_HTTPS
  382. if( netdata_client_ctx ){
  383. host->ssl.flags = NETDATA_SSL_START;
  384. if (!host->ssl.conn){
  385. host->ssl.conn = SSL_new(netdata_client_ctx);
  386. if(!host->ssl.conn){
  387. error("Failed to allocate SSL structure.");
  388. host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  389. }
  390. }
  391. else{
  392. SSL_clear(host->ssl.conn);
  393. }
  394. if (host->ssl.conn)
  395. {
  396. if (SSL_set_fd(host->ssl.conn, host->rrdpush_sender_socket) != 1) {
  397. error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket);
  398. host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  399. } else{
  400. host->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
  401. }
  402. }
  403. }
  404. else {
  405. host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  406. }
  407. #endif
  408. #define HTTP_HEADER_SIZE 8192
  409. char http[HTTP_HEADER_SIZE + 1];
  410. int eol = snprintfz(http, HTTP_HEADER_SIZE,
  411. "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s"
  412. "&NETDATA_SYSTEM_OS_NAME=%s"
  413. "&NETDATA_SYSTEM_OS_ID=%s"
  414. "&NETDATA_SYSTEM_OS_ID_LIKE=%s"
  415. "&NETDATA_SYSTEM_OS_VERSION=%s"
  416. "&NETDATA_SYSTEM_OS_VERSION_ID=%s"
  417. "&NETDATA_SYSTEM_OS_DETECTION=%s"
  418. "&NETDATA_SYSTEM_KERNEL_NAME=%s"
  419. "&NETDATA_SYSTEM_KERNEL_VERSION=%s"
  420. "&NETDATA_SYSTEM_ARCHITECTURE=%s"
  421. "&NETDATA_SYSTEM_VIRTUALIZATION=%s"
  422. "&NETDATA_SYSTEM_VIRT_DETECTION=%s"
  423. "&NETDATA_SYSTEM_CONTAINER=%s"
  424. "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
  425. " HTTP/1.1\r\n"
  426. "User-Agent: %s/%s\r\n"
  427. "Accept: */*\r\n\r\n"
  428. , host->rrdpush_send_api_key
  429. , host->hostname
  430. , host->registry_hostname
  431. , host->machine_guid
  432. , default_rrd_update_every
  433. , host->os
  434. , host->timezone
  435. , (host->tags) ? host->tags : ""
  436. , (host->system_info->os_name) ? host->system_info->os_name : ""
  437. , (host->system_info->os_id) ? host->system_info->os_id : ""
  438. , (host->system_info->os_id_like) ? host->system_info->os_id_like : ""
  439. , (host->system_info->os_version) ? host->system_info->os_version : ""
  440. , (host->system_info->os_version_id) ? host->system_info->os_version_id : ""
  441. , (host->system_info->os_detection) ? host->system_info->os_detection : ""
  442. , (host->system_info->kernel_name) ? host->system_info->kernel_name : ""
  443. , (host->system_info->kernel_version) ? host->system_info->kernel_version : ""
  444. , (host->system_info->architecture) ? host->system_info->architecture : ""
  445. , (host->system_info->virtualization) ? host->system_info->virtualization : ""
  446. , (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
  447. , (host->system_info->container) ? host->system_info->container : ""
  448. , (host->system_info->container_detection) ? host->system_info->container_detection : ""
  449. , host->program_name
  450. , host->program_version
  451. );
  452. http[eol] = 0x00;
  453. #ifdef ENABLE_HTTPS
  454. if (!host->ssl.flags) {
  455. ERR_clear_error();
  456. SSL_set_connect_state(host->ssl.conn);
  457. int err = SSL_connect(host->ssl.conn);
  458. if (err != 1){
  459. err = SSL_get_error(host->ssl.conn, err);
  460. error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->ssl.conn,err),NULL));
  461. if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
  462. rrdpush_sender_thread_close_socket(host);
  463. return 0;
  464. }else {
  465. host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  466. }
  467. }
  468. else {
  469. if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) {
  470. if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE) {
  471. if ( security_test_certificate(host->ssl.conn)) {
  472. error("Closing the stream connection, because the server SSL certificate is not valid.");
  473. rrdpush_sender_thread_close_socket(host);
  474. return 0;
  475. }
  476. }
  477. }
  478. }
  479. }
  480. if(send_timeout(&host->ssl,host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
  481. #else
  482. if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
  483. #endif
  484. error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", host->hostname, connected_to);
  485. rrdpush_sender_thread_close_socket(host);
  486. return 0;
  487. }
  488. info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, connected_to);
  489. #ifdef ENABLE_HTTPS
  490. if(recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) {
  491. #else
  492. if(recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) {
  493. #endif
  494. error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, connected_to);
  495. rrdpush_sender_thread_close_socket(host);
  496. return 0;
  497. }
  498. if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)) != 0) {
  499. error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, connected_to);
  500. rrdpush_sender_thread_close_socket(host);
  501. return 0;
  502. }
  503. info("STREAM %s [send to %s]: established communication - ready to send metrics...", host->hostname, connected_to);
  504. if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
  505. error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, connected_to);
  506. if(sock_enlarge_out(host->rrdpush_sender_socket) < 0)
  507. error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, connected_to);
  508. debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_sender_socket);
  509. return 1;
  510. }
  511. static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
  512. RRDHOST *host = (RRDHOST *)ptr;
  513. rrdpush_buffer_lock(host);
  514. rrdhost_wrlock(host);
  515. info("STREAM %s [send]: sending thread cleans up...", host->hostname);
  516. rrdpush_sender_thread_close_socket(host);
  517. // close the pipe
  518. if(host->rrdpush_sender_pipe[PIPE_READ] != -1) {
  519. close(host->rrdpush_sender_pipe[PIPE_READ]);
  520. host->rrdpush_sender_pipe[PIPE_READ] = -1;
  521. }
  522. if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1) {
  523. close(host->rrdpush_sender_pipe[PIPE_WRITE]);
  524. host->rrdpush_sender_pipe[PIPE_WRITE] = -1;
  525. }
  526. buffer_free(host->rrdpush_sender_buffer);
  527. host->rrdpush_sender_buffer = NULL;
  528. if(!host->rrdpush_sender_join) {
  529. info("STREAM %s [send]: sending thread detaches itself.", host->hostname);
  530. netdata_thread_detach(netdata_thread_self());
  531. }
  532. host->rrdpush_sender_spawn = 0;
  533. info("STREAM %s [send]: sending thread now exits.", host->hostname);
  534. rrdhost_unlock(host);
  535. rrdpush_buffer_unlock(host);
  536. }
  537. void *rrdpush_sender_thread(void *ptr) {
  538. RRDHOST *host = (RRDHOST *)ptr;
  539. if(!host->rrdpush_send_enabled || !host->rrdpush_send_destination || !*host->rrdpush_send_destination || !host->rrdpush_send_api_key || !*host->rrdpush_send_api_key) {
  540. error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", host->hostname, gettid());
  541. return NULL;
  542. }
  543. #ifdef ENABLE_HTTPS
  544. if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ){
  545. security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING);
  546. security_location_for_context(netdata_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
  547. }
  548. #endif
  549. info("STREAM %s [send]: thread created (task id %d)", host->hostname, gettid());
  550. int timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
  551. int default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
  552. size_t max_size = (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024);
  553. unsigned int reconnect_delay = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
  554. remote_clock_resync_iterations = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations);
  555. char connected_to[CONNECTED_TO_SIZE + 1] = "";
  556. // initialize rrdpush globals
  557. host->rrdpush_sender_buffer = buffer_create(1);
  558. host->rrdpush_sender_connected = 0;
  559. if(pipe(host->rrdpush_sender_pipe) == -1) fatal("STREAM %s [send]: cannot create required pipe.", host->hostname);
  560. // initialize local variables
  561. size_t begin = 0;
  562. size_t reconnects_counter = 0;
  563. size_t sent_bytes = 0;
  564. size_t sent_bytes_on_this_connection = 0;
  565. size_t send_attempts = 0;
  566. time_t last_sent_t = 0;
  567. struct pollfd fds[2], *ifd, *ofd;
  568. nfds_t fdmax;
  569. ifd = &fds[0];
  570. ofd = &fds[1];
  571. size_t not_connected_loops = 0;
  572. netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, host);
  573. for(; host->rrdpush_send_enabled && !netdata_exit ;) {
  574. // check for outstanding cancellation requests
  575. netdata_thread_testcancel();
  576. // if we don't have socket open, lets wait a bit
  577. if(unlikely(host->rrdpush_sender_socket == -1)) {
  578. send_attempts = 0;
  579. if(not_connected_loops == 0 && sent_bytes_on_this_connection > 0) {
  580. // fast re-connection on first disconnect
  581. sleep_usec(USEC_PER_MS * 500); // milliseconds
  582. }
  583. else {
  584. // slow re-connection on repeating errors
  585. sleep_usec(USEC_PER_SEC * reconnect_delay); // seconds
  586. }
  587. if(rrdpush_sender_thread_connect_to_master(host, default_port, timeout, &reconnects_counter, connected_to, CONNECTED_TO_SIZE)) {
  588. last_sent_t = now_monotonic_sec();
  589. // reset the buffer, to properly send charts and metrics
  590. rrdpush_sender_thread_data_flush(host);
  591. // send from the beginning
  592. begin = 0;
  593. // make sure the next reconnection will be immediate
  594. not_connected_loops = 0;
  595. // reset the bytes we have sent for this session
  596. sent_bytes_on_this_connection = 0;
  597. // let the data collection threads know we are ready
  598. host->rrdpush_sender_connected = 1;
  599. }
  600. else {
  601. // increase the failed connections counter
  602. not_connected_loops++;
  603. // reset the number of bytes sent
  604. sent_bytes_on_this_connection = 0;
  605. }
  606. // loop through
  607. continue;
  608. }
  609. else if(unlikely(now_monotonic_sec() - last_sent_t > timeout)) {
  610. error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", host->hostname, connected_to, timeout, sent_bytes_on_this_connection, send_attempts);
  611. rrdpush_sender_thread_close_socket(host);
  612. }
  613. ifd->fd = host->rrdpush_sender_pipe[PIPE_READ];
  614. ifd->events = POLLIN;
  615. ifd->revents = 0;
  616. ofd->fd = host->rrdpush_sender_socket;
  617. ofd->revents = 0;
  618. if(ofd->fd != -1 && begin < buffer_strlen(host->rrdpush_sender_buffer)) {
  619. debug(D_STREAM, "STREAM: Requesting data output on streaming socket %d...", ofd->fd);
  620. ofd->events = POLLOUT;
  621. fdmax = 2;
  622. send_attempts++;
  623. }
  624. else {
  625. debug(D_STREAM, "STREAM: Not requesting data output on streaming socket %d (nothing to send now)...", ofd->fd);
  626. ofd->events = 0;
  627. fdmax = 1;
  628. }
  629. debug(D_STREAM, "STREAM: Waiting for poll() events (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer));
  630. if(unlikely(netdata_exit)) break;
  631. int retval = poll(fds, fdmax, 1000);
  632. if(unlikely(netdata_exit)) break;
  633. if(unlikely(retval == -1)) {
  634. debug(D_STREAM, "STREAM: poll() failed (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer));
  635. if(errno == EAGAIN || errno == EINTR) {
  636. debug(D_STREAM, "STREAM: poll() failed with EAGAIN or EINTR...");
  637. }
  638. else {
  639. error("STREAM %s [send to %s]: failed to poll(). Closing socket.", host->hostname, connected_to);
  640. rrdpush_sender_thread_close_socket(host);
  641. }
  642. continue;
  643. }
  644. else if(likely(retval)) {
  645. if (ifd->revents & POLLIN || ifd->revents & POLLPRI) {
  646. debug(D_STREAM, "STREAM: Data added to send buffer (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer));
  647. char buffer[1000 + 1];
  648. if (read(host->rrdpush_sender_pipe[PIPE_READ], buffer, 1000) == -1)
  649. error("STREAM %s [send to %s]: cannot read from internal pipe.", host->hostname, connected_to);
  650. }
  651. if (ofd->revents & POLLOUT) {
  652. if (begin < buffer_strlen(host->rrdpush_sender_buffer)) {
  653. debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes, begin = %zu)...", buffer_strlen(host->rrdpush_sender_buffer), begin);
  654. // BEGIN RRDPUSH LOCKED SESSION
  655. // during this session, data collectors
  656. // will not be able to append data to our buffer
  657. // but the socket is in non-blocking mode
  658. // so, we will not block at send()
  659. netdata_thread_disable_cancelability();
  660. debug(D_STREAM, "STREAM: Getting exclusive lock on host...");
  661. rrdpush_buffer_lock(host);
  662. debug(D_STREAM, "STREAM: Sending data, starting from %zu, size %zu...", begin, buffer_strlen(host->rrdpush_sender_buffer));
  663. ssize_t ret;
  664. #ifdef ENABLE_HTTPS
  665. SSL *conn = host->ssl.conn ;
  666. if(conn && !host->ssl.flags) {
  667. ret = SSL_write(conn,&host->rrdpush_sender_buffer->buffer[begin], buffer_strlen(host->rrdpush_sender_buffer) - begin);
  668. } else {
  669. ret = send(host->rrdpush_sender_socket, &host->rrdpush_sender_buffer->buffer[begin], buffer_strlen(host->rrdpush_sender_buffer) - begin, MSG_DONTWAIT);
  670. }
  671. #else
  672. ret = send(host->rrdpush_sender_socket, &host->rrdpush_sender_buffer->buffer[begin], buffer_strlen(host->rrdpush_sender_buffer) - begin, MSG_DONTWAIT);
  673. #endif
  674. if (unlikely(ret == -1)) {
  675. if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) {
  676. debug(D_STREAM, "STREAM: Send failed - closing socket...");
  677. error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_bytes_on_this_connection);
  678. rrdpush_sender_thread_close_socket(host);
  679. }
  680. else {
  681. debug(D_STREAM, "STREAM: Send failed - will retry...");
  682. }
  683. }
  684. else if (likely(ret > 0)) {
  685. // DEBUG - dump the string to see it
  686. //char c = host->rrdpush_sender_buffer->buffer[begin + ret];
  687. //host->rrdpush_sender_buffer->buffer[begin + ret] = '\0';
  688. //debug(D_STREAM, "STREAM: sent from %zu to %zd:\n%s\n", begin, ret, &host->rrdpush_sender_buffer->buffer[begin]);
  689. //host->rrdpush_sender_buffer->buffer[begin + ret] = c;
  690. sent_bytes_on_this_connection += ret;
  691. sent_bytes += ret;
  692. begin += ret;
  693. if (begin == buffer_strlen(host->rrdpush_sender_buffer)) {
  694. // we send it all
  695. debug(D_STREAM, "STREAM: Sent %zd bytes (the whole buffer)...", ret);
  696. buffer_flush(host->rrdpush_sender_buffer);
  697. begin = 0;
  698. }
  699. else {
  700. debug(D_STREAM, "STREAM: Sent %zd bytes (part of the data buffer)...", ret);
  701. }
  702. last_sent_t = now_monotonic_sec();
  703. }
  704. else {
  705. debug(D_STREAM, "STREAM: send() returned %zd - closing the socket...", ret);
  706. error("STREAM %s [send to %s]: failed to send metrics (send() returned %zd) - closing connection - we have sent %zu bytes on this connection.",
  707. host->hostname, connected_to, ret, sent_bytes_on_this_connection);
  708. rrdpush_sender_thread_close_socket(host);
  709. }
  710. debug(D_STREAM, "STREAM: Releasing exclusive lock on host...");
  711. rrdpush_buffer_unlock(host);
  712. netdata_thread_enable_cancelability();
  713. // END RRDPUSH LOCKED SESSION
  714. }
  715. else {
  716. debug(D_STREAM, "STREAM: we have sent the entire buffer, but we received POLLOUT...");
  717. }
  718. }
  719. if(host->rrdpush_sender_socket != -1) {
  720. char *error = NULL;
  721. if (unlikely(ofd->revents & POLLERR))
  722. error = "socket reports errors (POLLERR)";
  723. else if (unlikely(ofd->revents & POLLHUP))
  724. error = "connection closed by remote end (POLLHUP)";
  725. else if (unlikely(ofd->revents & POLLNVAL))
  726. error = "connection is invalid (POLLNVAL)";
  727. if(unlikely(error)) {
  728. debug(D_STREAM, "STREAM: %s - closing socket...", error);
  729. error("STREAM %s [send to %s]: %s - reopening socket - we have sent %zu bytes on this connection.", host->hostname, connected_to, error, sent_bytes_on_this_connection);
  730. rrdpush_sender_thread_close_socket(host);
  731. }
  732. }
  733. }
  734. else {
  735. debug(D_STREAM, "STREAM: poll() timed out.");
  736. }
  737. // protection from overflow
  738. if(buffer_strlen(host->rrdpush_sender_buffer) > max_size) {
  739. debug(D_STREAM, "STREAM: Buffer is too big (%zu bytes), bigger than the max (%zu) - flushing it...", buffer_strlen(host->rrdpush_sender_buffer), max_size);
  740. errno = 0;
  741. error("STREAM %s [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", host->hostname, connected_to, host->rrdpush_sender_buffer->len, host->rrdpush_sender_buffer->len - begin, sent_bytes, sent_bytes_on_this_connection);
  742. rrdpush_sender_thread_close_socket(host);
  743. }
  744. }
  745. netdata_thread_cleanup_pop(1);
  746. return NULL;
  747. }
  748. // ----------------------------------------------------------------------------
  749. // rrdpush receiver thread
  /*
   * Writes one access-log line describing a streaming connection event.
   * The line carries the current thread id, the remote endpoint, the outcome
   * message, and the hostname / API key / machine GUID the remote side presented.
   */
  static void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
      log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'",
                 gettid(),
                 client_ip, client_port,
                 msg,
                 host, api_key, machine_guid);
  }
  753. static RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY get_multiple_connections_strategy(struct config *c, const char *section, const char *name, RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY def) {
  754. char *value;
  755. switch(def) {
  756. default:
  757. case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
  758. value = "allow";
  759. break;
  760. case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
  761. value = "deny";
  762. break;
  763. }
  764. value = appconfig_get(c, section, name, value);
  765. RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY ret = def;
  766. if(strcasecmp(value, "allow") == 0 || strcasecmp(value, "permit") == 0 || strcasecmp(value, "accept") == 0)
  767. ret = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
  768. else if(strcasecmp(value, "deny") == 0 || strcasecmp(value, "reject") == 0 || strcasecmp(value, "block") == 0)
  769. ret = RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW;
  770. else
  771. error("Invalid stream config value at section [%s], setting '%s', value '%s'", section, name, value);
  772. return ret;
  773. }
  774. static int rrdpush_receive(int fd
  775. , const char *key
  776. , const char *hostname
  777. , const char *registry_hostname
  778. , const char *machine_guid
  779. , const char *os
  780. , const char *timezone
  781. , const char *tags
  782. , const char *program_name
  783. , const char *program_version
  784. , struct rrdhost_system_info *system_info
  785. , int update_every
  786. , char *client_ip
  787. , char *client_port
  788. #ifdef ENABLE_HTTPS
  789. , struct netdata_ssl *ssl
  790. #endif
  791. ) {
  792. RRDHOST *host;
  793. int history = default_rrd_history_entries;
  794. RRD_MEMORY_MODE mode = default_rrd_memory_mode;
  795. int health_enabled = default_health_enabled;
  796. int rrdpush_enabled = default_rrdpush_enabled;
  797. char *rrdpush_destination = default_rrdpush_destination;
  798. char *rrdpush_api_key = default_rrdpush_api_key;
  799. char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
  800. time_t alarms_delay = 60;
  801. RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY rrdpush_multiple_connections_strategy = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
  802. update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
  803. if(update_every < 0) update_every = 1;
  804. history = (int)appconfig_get_number(&stream_config, key, "default history", history);
  805. history = (int)appconfig_get_number(&stream_config, machine_guid, "history", history);
  806. if(history < 5) history = 5;
  807. mode = rrd_memory_mode_id(appconfig_get(&stream_config, key, "default memory mode", rrd_memory_mode_name(mode)));
  808. mode = rrd_memory_mode_id(appconfig_get(&stream_config, machine_guid, "memory mode", rrd_memory_mode_name(mode)));
  809. health_enabled = appconfig_get_boolean_ondemand(&stream_config, key, "health enabled by default", health_enabled);
  810. health_enabled = appconfig_get_boolean_ondemand(&stream_config, machine_guid, "health enabled", health_enabled);
  811. alarms_delay = appconfig_get_number(&stream_config, key, "default postpone alarms on connect seconds", alarms_delay);
  812. alarms_delay = appconfig_get_number(&stream_config, machine_guid, "postpone alarms on connect seconds", alarms_delay);
  813. rrdpush_enabled = appconfig_get_boolean(&stream_config, key, "default proxy enabled", rrdpush_enabled);
  814. rrdpush_enabled = appconfig_get_boolean(&stream_config, machine_guid, "proxy enabled", rrdpush_enabled);
  815. rrdpush_destination = appconfig_get(&stream_config, key, "default proxy destination", rrdpush_destination);
  816. rrdpush_destination = appconfig_get(&stream_config, machine_guid, "proxy destination", rrdpush_destination);
  817. rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key);
  818. rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key);
  819. rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, key, "multiple connections", rrdpush_multiple_connections_strategy);
  820. rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, machine_guid, "multiple connections", rrdpush_multiple_connections_strategy);
  821. rrdpush_send_charts_matching = appconfig_get(&stream_config, key, "default proxy send charts matching", rrdpush_send_charts_matching);
  822. rrdpush_send_charts_matching = appconfig_get(&stream_config, machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
  823. tags = appconfig_set_default(&stream_config, machine_guid, "host tags", (tags)?tags:"");
  824. if(tags && !*tags) tags = NULL;
  825. if (strcmp(machine_guid, localhost->machine_guid) == 0) {
  826. log_stream_connection(client_ip, client_port, key, machine_guid, hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO MASTER");
  827. error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the master/proxy machine guid to a slave?", hostname, client_ip, client_port, machine_guid);
  828. close(fd);
  829. return 1;
  830. }
  831. else
  832. host = rrdhost_find_or_create(
  833. hostname
  834. , registry_hostname
  835. , machine_guid
  836. , os
  837. , timezone
  838. , tags
  839. , program_name
  840. , program_version
  841. , update_every
  842. , history
  843. , mode
  844. , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
  845. , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
  846. , rrdpush_destination
  847. , rrdpush_api_key
  848. , rrdpush_send_charts_matching
  849. , system_info
  850. );
  851. if(!host) {
  852. close(fd);
  853. log_stream_connection(client_ip, client_port, key, machine_guid, hostname, "FAILED - CANNOT ACQUIRE HOST");
  854. error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", hostname, client_ip, client_port);
  855. return 1;
  856. }
  857. #ifdef NETDATA_INTERNAL_CHECKS
  858. info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s, tags '%s'"
  859. , hostname
  860. , client_ip
  861. , client_port
  862. , host->hostname
  863. , host->machine_guid
  864. , host->rrd_update_every
  865. , host->rrd_history_entries
  866. , rrd_memory_mode_name(host->rrd_memory_mode)
  867. , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
  868. , host->tags?host->tags:""
  869. );
  870. #endif // NETDATA_INTERNAL_CHECKS
  871. struct plugind cd = {
  872. .enabled = 1,
  873. .update_every = default_rrd_update_every,
  874. .pid = 0,
  875. .serial_failures = 0,
  876. .successful_collections = 0,
  877. .obsolete = 0,
  878. .started_t = now_realtime_sec(),
  879. .next = NULL,
  880. };
  881. // put the client IP and port into the buffers used by plugins.d
  882. snprintfz(cd.id, CONFIG_MAX_NAME, "%s:%s", client_ip, client_port);
  883. snprintfz(cd.filename, FILENAME_MAX, "%s:%s", client_ip, client_port);
  884. snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", client_ip, client_port);
  885. snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port);
  886. info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port);
  887. #ifdef ENABLE_HTTPS
  888. host->stream_ssl.conn = ssl->conn;
  889. host->stream_ssl.flags = ssl->flags;
  890. if(send_timeout(ssl,fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
  891. #else
  892. if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
  893. #endif
  894. log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "FAILED - CANNOT REPLY");
  895. error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port);
  896. close(fd);
  897. return 0;
  898. }
  899. // remove the non-blocking flag from the socket
  900. if(sock_delnonblock(fd) < 0)
  901. error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", host->hostname, client_ip, client_port, fd);
  902. // convert the socket to a FILE *
  903. FILE *fp = fdopen(fd, "r");
  904. if(!fp) {
  905. log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "FAILED - SOCKET ERROR");
  906. error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", host->hostname, client_ip, client_port, fd);
  907. close(fd);
  908. return 0;
  909. }
  910. rrdhost_wrlock(host);
  911. if(host->connected_senders > 0) {
  912. switch(rrdpush_multiple_connections_strategy) {
  913. case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
  914. info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port);
  915. break;
  916. case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
  917. rrdhost_unlock(host);
  918. log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED");
  919. info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port);
  920. fclose(fp);
  921. return 0;
  922. }
  923. }
  924. rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
  925. host->connected_senders++;
  926. host->senders_disconnected_time = 0;
  927. if(health_enabled != CONFIG_BOOLEAN_NO) {
  928. if(alarms_delay > 0) {
  929. host->health_delay_up_to = now_realtime_sec() + alarms_delay;
  930. info("Postponing health checks for %ld seconds, on host '%s', because it was just connected."
  931. , alarms_delay
  932. , host->hostname
  933. );
  934. }
  935. }
  936. rrdhost_unlock(host);
  937. // call the plugins.d processor to receive the metrics
  938. info("STREAM %s [receive from [%s]:%s]: receiving metrics...", host->hostname, client_ip, client_port);
  939. log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "CONNECTED");
  940. size_t count = pluginsd_process(host, &cd, fp, 1);
  941. log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "DISCONNECTED");
  942. error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", host->hostname, client_ip, client_port, count);
  943. rrdhost_wrlock(host);
  944. host->senders_disconnected_time = now_realtime_sec();
  945. host->connected_senders--;
  946. if(!host->connected_senders) {
  947. rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
  948. if(health_enabled == CONFIG_BOOLEAN_AUTO)
  949. host->health_enabled = 0;
  950. }
  951. rrdhost_unlock(host);
  952. if(host->connected_senders == 0)
  953. rrdpush_sender_thread_stop(host);
  954. // cleanup
  955. fclose(fp);
  956. return (int)count;
  957. }
  /*
   * Everything a receiver thread needs, filled in by rrdpush_receiver_thread_spawn().
   *
   * The strings are heap copies owned by this struct and released by
   * rrdpush_receiver_thread_cleanup(), which does not free system_info.
   */
  struct rrdpush_thread {
      /* connection */
      int fd;                                     /* client socket taken over from the web client */
      char *client_ip;
      char *client_port;
  #ifdef ENABLE_HTTPS
      struct netdata_ssl ssl;                     /* TLS session moved out of the web client */
  #endif

      /* identity presented by the sender */
      char *key;                                  /* API key */
      char *hostname;
      char *registry_hostname;
      char *machine_guid;

      /* host metadata */
      char *os;
      char *timezone;
      char *tags;                                 /* NULL when no tags were sent */
      char *program_name;                         /* from the User-Agent; may be NULL */
      char *program_version;                      /* from the User-Agent; may be NULL */
      struct rrdhost_system_info *system_info;
      int update_every;
  };
  977. static void rrdpush_receiver_thread_cleanup(void *ptr) {
  978. static __thread int executed = 0;
  979. if(!executed) {
  980. executed = 1;
  981. struct rrdpush_thread *rpt = (struct rrdpush_thread *) ptr;
  982. info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
  983. freez(rpt->key);
  984. freez(rpt->hostname);
  985. freez(rpt->registry_hostname);
  986. freez(rpt->machine_guid);
  987. freez(rpt->os);
  988. freez(rpt->timezone);
  989. freez(rpt->tags);
  990. freez(rpt->client_ip);
  991. freez(rpt->client_port);
  992. freez(rpt->program_name);
  993. freez(rpt->program_version);
  994. #ifdef ENABLE_HTTPS
  995. if(rpt->ssl.conn){
  996. SSL_free(rpt->ssl.conn);
  997. }
  998. #endif
  999. freez(rpt);
  1000. }
  1001. }
  1002. static void *rrdpush_receiver_thread(void *ptr) {
  1003. netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
  1004. struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr;
  1005. info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
  1006. rrdpush_receive(
  1007. rpt->fd
  1008. , rpt->key
  1009. , rpt->hostname
  1010. , rpt->registry_hostname
  1011. , rpt->machine_guid
  1012. , rpt->os
  1013. , rpt->timezone
  1014. , rpt->tags
  1015. , rpt->program_name
  1016. , rpt->program_version
  1017. , rpt->system_info
  1018. , rpt->update_every
  1019. , rpt->client_ip
  1020. , rpt->client_port
  1021. #ifdef ENABLE_HTTPS
  1022. , &rpt->ssl
  1023. #endif
  1024. );
  1025. netdata_thread_cleanup_pop(1);
  1026. return NULL;
  1027. }
  1028. static void rrdpush_sender_thread_spawn(RRDHOST *host) {
  1029. rrdhost_wrlock(host);
  1030. if(!host->rrdpush_sender_spawn) {
  1031. char tag[NETDATA_THREAD_TAG_MAX + 1];
  1032. snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", host->hostname);
  1033. if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host))
  1034. error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
  1035. else
  1036. host->rrdpush_sender_spawn = 1;
  1037. }
  1038. rrdhost_unlock(host);
  1039. }
  1040. int rrdpush_receiver_permission_denied(struct web_client *w) {
  1041. // we always respond with the same message and error code
  1042. // to prevent an attacker from gaining info about the error
  1043. buffer_flush(w->response.data);
  1044. buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info.");
  1045. return 401;
  1046. }
  1047. int rrdpush_receiver_too_busy_now(struct web_client *w) {
  1048. // we always respond with the same message and error code
  1049. // to prevent an attacker from gaining info about the error
  1050. buffer_flush(w->response.data);
  1051. buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later.");
  1052. return 503;
  1053. }
  1054. int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
  1055. (void)host;
  1056. info("clients wants to STREAM metrics.");
  1057. char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL;
  1058. int update_every = default_rrd_update_every;
  1059. char buf[GUID_LEN + 1];
  1060. struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
  1061. while(url) {
  1062. char *value = mystrsep(&url, "&");
  1063. if(!value || !*value) continue;
  1064. char *name = mystrsep(&value, "=");
  1065. if(!name || !*name) continue;
  1066. if(!value || !*value) continue;
  1067. if(!strcmp(name, "key"))
  1068. key = value;
  1069. else if(!strcmp(name, "hostname"))
  1070. hostname = value;
  1071. else if(!strcmp(name, "registry_hostname"))
  1072. registry_hostname = value;
  1073. else if(!strcmp(name, "machine_guid"))
  1074. machine_guid = value;
  1075. else if(!strcmp(name, "update_every"))
  1076. update_every = (int)strtoul(value, NULL, 0);
  1077. else if(!strcmp(name, "os"))
  1078. os = value;
  1079. else if(!strcmp(name, "timezone"))
  1080. timezone = value;
  1081. else if(!strcmp(name, "tags"))
  1082. tags = value;
  1083. else
  1084. if(unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
  1085. info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", w->client_ip, w->client_port, key, value);
  1086. }
  1087. }
  1088. if(!key || !*key) {
  1089. rrdhost_system_info_free(system_info);
  1090. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY");
  1091. error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port);
  1092. return rrdpush_receiver_permission_denied(w);
  1093. }
  1094. if(!hostname || !*hostname) {
  1095. rrdhost_system_info_free(system_info);
  1096. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO HOSTNAME");
  1097. error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port);
  1098. return rrdpush_receiver_permission_denied(w);
  1099. }
  1100. if(!machine_guid || !*machine_guid) {
  1101. rrdhost_system_info_free(system_info);
  1102. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO MACHINE GUID");
  1103. error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port);
  1104. return rrdpush_receiver_permission_denied(w);
  1105. }
  1106. if(regenerate_guid(key, buf) == -1) {
  1107. rrdhost_system_info_free(system_info);
  1108. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - INVALID KEY");
  1109. error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key);
  1110. return rrdpush_receiver_permission_denied(w);
  1111. }
  1112. if(regenerate_guid(machine_guid, buf) == -1) {
  1113. rrdhost_system_info_free(system_info);
  1114. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - INVALID MACHINE GUID");
  1115. error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid);
  1116. return rrdpush_receiver_permission_denied(w);
  1117. }
  1118. if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) {
  1119. rrdhost_system_info_free(system_info);
  1120. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - KEY NOT ENABLED");
  1121. error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
  1122. return rrdpush_receiver_permission_denied(w);
  1123. }
  1124. {
  1125. SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
  1126. if(key_allow_from) {
  1127. if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
  1128. simple_pattern_free(key_allow_from);
  1129. rrdhost_system_info_free(system_info);
  1130. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname) ? hostname : "-", "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP");
  1131. error("STREAM [receive from [%s]:%s]: API key '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, key);
  1132. return rrdpush_receiver_permission_denied(w);
  1133. }
  1134. simple_pattern_free(key_allow_from);
  1135. }
  1136. }
  1137. if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) {
  1138. rrdhost_system_info_free(system_info);
  1139. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - MACHINE GUID NOT ENABLED");
  1140. error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
  1141. return rrdpush_receiver_permission_denied(w);
  1142. }
  1143. {
  1144. SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
  1145. if(machine_allow_from) {
  1146. if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
  1147. simple_pattern_free(machine_allow_from);
  1148. rrdhost_system_info_free(system_info);
  1149. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname) ? hostname : "-", "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP");
  1150. error("STREAM [receive from [%s]:%s]: Machine GUID '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, machine_guid);
  1151. return rrdpush_receiver_permission_denied(w);
  1152. }
  1153. simple_pattern_free(machine_allow_from);
  1154. }
  1155. }
  1156. if(unlikely(web_client_streaming_rate_t > 0)) {
  1157. static netdata_mutex_t stream_rate_mutex = NETDATA_MUTEX_INITIALIZER;
  1158. static volatile time_t last_stream_accepted_t = 0;
  1159. netdata_mutex_lock(&stream_rate_mutex);
  1160. time_t now = now_realtime_sec();
  1161. if(unlikely(last_stream_accepted_t == 0))
  1162. last_stream_accepted_t = now;
  1163. if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
  1164. netdata_mutex_unlock(&stream_rate_mutex);
  1165. rrdhost_system_info_free(system_info);
  1166. error("STREAM [receive from [%s]:%s]: too busy to accept new streaming request. Will be allowed in %ld secs.", w->client_ip, w->client_port, (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
  1167. return rrdpush_receiver_too_busy_now(w);
  1168. }
  1169. last_stream_accepted_t = now;
  1170. netdata_mutex_unlock(&stream_rate_mutex);
  1171. }
  1172. struct rrdpush_thread *rpt = callocz(1, sizeof(struct rrdpush_thread));
  1173. rpt->fd = w->ifd;
  1174. rpt->key = strdupz(key);
  1175. rpt->hostname = strdupz(hostname);
  1176. rpt->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname);
  1177. rpt->machine_guid = strdupz(machine_guid);
  1178. rpt->os = strdupz(os);
  1179. rpt->timezone = strdupz(timezone);
  1180. rpt->tags = (tags)?strdupz(tags):NULL;
  1181. rpt->client_ip = strdupz(w->client_ip);
  1182. rpt->client_port = strdupz(w->client_port);
  1183. rpt->update_every = update_every;
  1184. rpt->system_info = system_info;
  1185. #ifdef ENABLE_HTTPS
  1186. rpt->ssl.conn = w->ssl.conn;
  1187. rpt->ssl.flags = w->ssl.flags;
  1188. w->ssl.conn = NULL;
  1189. w->ssl.flags = NETDATA_SSL_START;
  1190. #endif
  1191. if(w->user_agent && w->user_agent[0]) {
  1192. char *t = strchr(w->user_agent, '/');
  1193. if(t && *t) {
  1194. *t = '\0';
  1195. t++;
  1196. }
  1197. rpt->program_name = strdupz(w->user_agent);
  1198. if(t && *t) rpt->program_version = strdupz(t);
  1199. }
  1200. netdata_thread_t thread;
  1201. debug(D_SYSTEM, "starting STREAM receive thread.");
  1202. char tag[FILENAME_MAX + 1];
  1203. snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
  1204. if(netdata_thread_create(&thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
  1205. error("Failed to create new STREAM receive thread for client.");
  1206. // prevent the caller from closing the streaming socket
  1207. if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) {
  1208. web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET);
  1209. }
  1210. else {
  1211. if(w->ifd == w->ofd)
  1212. w->ifd = w->ofd = -1;
  1213. else
  1214. w->ifd = -1;
  1215. }
  1216. buffer_flush(w->response.data);
  1217. return 200;
  1218. }