rrdpush.c 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdpush.h"
  3. /*
  4. * rrdpush
  5. *
  6. * 3 threads are involved for all stream operations
  7. *
  8. * 1. a random data collection thread, calling rrdset_done_push()
  9. * this is called for each chart.
  10. *
  11. * the output of this work is kept in a BUFFER in RRDHOST
  12. * the sender thread is signalled via a pipe (also in RRDHOST)
  13. *
  14. * 2. a sender thread running at the sending netdata
  15. * this is spawned automatically on the first chart to be pushed
  16. *
  17. * It tries to push the metrics to the remote netdata, as fast
  18. * as possible (i.e. immediately after they are collected).
  19. *
  20. * 3. a receiver thread, running at the receiving netdata
  21. * this is spawned automatically when the sender connects to
  22. * the receiver.
  23. *
  24. */
  25. #define START_STREAMING_PROMPT "Hit me baby, push them over..."
  26. typedef enum {
  27. RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
  28. RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
  29. } RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
  30. static struct config stream_config = {
  31. .sections = NULL,
  32. .mutex = NETDATA_MUTEX_INITIALIZER,
  33. .index = {
  34. .avl_tree = {
  35. .root = NULL,
  36. .compar = appconfig_section_compare
  37. },
  38. .rwlock = AVL_LOCK_INITIALIZER
  39. }
  40. };
  41. unsigned int default_rrdpush_enabled = 0;
  42. char *default_rrdpush_destination = NULL;
  43. char *default_rrdpush_api_key = NULL;
  44. char *default_rrdpush_send_charts_matching = NULL;
  45. static void load_stream_conf() {
  46. errno = 0;
  47. char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
  48. if(!appconfig_load(&stream_config, filename, 0)) {
  49. info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
  50. freez(filename);
  51. filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
  52. if(!appconfig_load(&stream_config, filename, 0))
  53. info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
  54. }
  55. freez(filename);
  56. }
  57. int rrdpush_init() {
  58. // --------------------------------------------------------------------
  59. // load stream.conf
  60. load_stream_conf();
  61. default_rrdpush_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
  62. default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
  63. default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
  64. default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
  65. rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
  66. if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
  67. error("STREAM [send]: cannot enable sending thread - information is missing.");
  68. default_rrdpush_enabled = 0;
  69. }
  70. return default_rrdpush_enabled;
  71. }
  72. #define CONNECTED_TO_SIZE 100
  73. // data collection happens from multiple threads
  74. // each of these threads calls rrdset_done()
  75. // which in turn calls rrdset_done_push()
  76. // which uses this pipe to notify the streaming thread
  77. // that there are more data ready to be sent
  78. #define PIPE_READ 0
  79. #define PIPE_WRITE 1
  80. // to have the remote netdata re-sync the charts
  81. // to its current clock, we send for this many
  82. // iterations a BEGIN line without microseconds
  83. // this is for the first iterations of each chart
  84. unsigned int remote_clock_resync_iterations = 60;
  85. #define rrdpush_buffer_lock(host) netdata_mutex_lock(&((host)->rrdpush_sender_buffer_mutex))
  86. #define rrdpush_buffer_unlock(host) netdata_mutex_unlock(&((host)->rrdpush_sender_buffer_mutex))
  87. static inline int should_send_chart_matching(RRDSET *st) {
  88. if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) {
  89. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
  90. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
  91. }
  92. else if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
  93. RRDHOST *host = st->rrdhost;
  94. if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) ||
  95. simple_pattern_matches(host->rrdpush_send_charts_matching, st->name)) {
  96. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
  97. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
  98. }
  99. else {
  100. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
  101. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
  102. }
  103. }
  104. return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND));
  105. }
  106. // checks if the current chart definition has been sent
  107. static inline int need_to_send_chart_definition(RRDSET *st) {
  108. rrdset_check_rdlock(st);
  109. if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))))
  110. return 1;
  111. RRDDIM *rd;
  112. rrddim_foreach_read(rd, st) {
  113. if(unlikely(!rd->exposed)) {
  114. #ifdef NETDATA_INTERNAL_CHECKS
  115. info("host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", st->rrdhost->hostname, st->id, rd->id);
  116. #endif
  117. return 1;
  118. }
  119. }
  120. return 0;
  121. }
  122. // sends the current chart definition
  123. static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
  124. RRDHOST *host = st->rrdhost;
  125. rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
  126. // properly set the name for the remote end to parse it
  127. char *name = "";
  128. if(likely(st->name)) {
  129. if(unlikely(strcmp(st->id, st->name))) {
  130. // they differ
  131. name = strchr(st->name, '.');
  132. if(name)
  133. name++;
  134. else
  135. name = "";
  136. }
  137. }
  138. // info("CHART '%s' '%s'", st->id, name);
  139. // send the chart
  140. buffer_sprintf(
  141. host->rrdpush_sender_buffer
  142. , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
  143. , st->id
  144. , name
  145. , st->title
  146. , st->units
  147. , st->family
  148. , st->context
  149. , rrdset_type_name(st->chart_type)
  150. , st->priority
  151. , st->update_every
  152. , rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)?"obsolete":""
  153. , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
  154. , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
  155. , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
  156. , (st->plugin_name)?st->plugin_name:""
  157. , (st->module_name)?st->module_name:""
  158. );
  159. // send the dimensions
  160. RRDDIM *rd;
  161. rrddim_foreach_read(rd, st) {
  162. buffer_sprintf(
  163. host->rrdpush_sender_buffer
  164. , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
  165. , rd->id
  166. , rd->name
  167. , rrd_algorithm_name(rd->algorithm)
  168. , rd->multiplier
  169. , rd->divisor
  170. , rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)?"obsolete":""
  171. , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
  172. , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
  173. );
  174. rd->exposed = 1;
  175. }
  176. // send the chart local custom variables
  177. RRDSETVAR *rs;
  178. for(rs = st->variables; rs ;rs = rs->next) {
  179. if(unlikely(rs->type == RRDVAR_TYPE_CALCULATED && rs->options & RRDVAR_OPTION_CUSTOM_CHART_VAR)) {
  180. calculated_number *value = (calculated_number *) rs->value;
  181. buffer_sprintf(
  182. host->rrdpush_sender_buffer
  183. , "VARIABLE CHART %s = " CALCULATED_NUMBER_FORMAT "\n"
  184. , rs->variable
  185. , *value
  186. );
  187. }
  188. }
  189. st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
  190. }
  191. // sends the current chart dimensions
  192. static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st) {
  193. RRDHOST *host = st->rrdhost;
  194. buffer_sprintf(host->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
  195. RRDDIM *rd;
  196. rrddim_foreach_read(rd, st) {
  197. if(rd->updated && rd->exposed)
  198. buffer_sprintf(host->rrdpush_sender_buffer
  199. , "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n"
  200. , rd->id
  201. , rd->collected_value
  202. );
  203. }
  204. buffer_strcat(host->rrdpush_sender_buffer, "END\n");
  205. }
  206. static void rrdpush_sender_thread_spawn(RRDHOST *host);
  207. void rrdset_push_chart_definition_now(RRDSET *st) {
  208. RRDHOST *host = st->rrdhost;
  209. if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st)))
  210. return;
  211. rrdset_rdlock(st);
  212. rrdpush_buffer_lock(host);
  213. rrdpush_send_chart_definition_nolock(st);
  214. rrdpush_buffer_unlock(host);
  215. rrdset_unlock(st);
  216. }
  217. void rrdset_done_push(RRDSET *st) {
  218. if(unlikely(!should_send_chart_matching(st)))
  219. return;
  220. RRDHOST *host = st->rrdhost;
  221. rrdpush_buffer_lock(host);
  222. if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
  223. rrdpush_sender_thread_spawn(host);
  224. if(unlikely(!host->rrdpush_sender_buffer || !host->rrdpush_sender_connected)) {
  225. if(unlikely(!host->rrdpush_sender_error_shown))
  226. error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname);
  227. host->rrdpush_sender_error_shown = 1;
  228. rrdpush_buffer_unlock(host);
  229. return;
  230. }
  231. else if(unlikely(host->rrdpush_sender_error_shown)) {
  232. info("STREAM %s [send]: sending metrics...", host->hostname);
  233. host->rrdpush_sender_error_shown = 0;
  234. }
  235. if(need_to_send_chart_definition(st))
  236. rrdpush_send_chart_definition_nolock(st);
  237. rrdpush_send_chart_metrics_nolock(st);
  238. // signal the sender there are more data
  239. if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
  240. error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
  241. rrdpush_buffer_unlock(host);
  242. }
  243. // ----------------------------------------------------------------------------
  244. // rrdpush sender thread
  245. static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) {
  246. calculated_number *value = (calculated_number *)rv->value;
  247. buffer_sprintf(
  248. host->rrdpush_sender_buffer
  249. , "VARIABLE HOST %s = " CALCULATED_NUMBER_FORMAT "\n"
  250. , rv->name
  251. , *value
  252. );
  253. debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " CALCULATED_NUMBER_FORMAT, rv->name, *value);
  254. }
  255. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) {
  256. if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && host->rrdpush_sender_connected) {
  257. rrdpush_buffer_lock(host);
  258. rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
  259. rrdpush_buffer_unlock(host);
  260. }
  261. }
  262. static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr, void *host_ptr) {
  263. RRDVAR *rv = (RRDVAR *)rrdvar_ptr;
  264. RRDHOST *host = (RRDHOST *)host_ptr;
  265. if(unlikely(rv->options & RRDVAR_OPTION_CUSTOM_HOST_VAR && rv->type == RRDVAR_TYPE_CALCULATED)) {
  266. rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv);
  267. // return 1, so that the traversal will return the number of variables sent
  268. return 1;
  269. }
  270. // returning a negative number will break the traversal
  271. return 0;
  272. }
  273. static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
  274. int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host);
  275. (void)ret;
  276. debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
  277. }
  278. // resets all the chart, so that their definitions
  279. // will be resent to the central netdata
  280. static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
  281. rrdhost_rdlock(host);
  282. RRDSET *st;
  283. rrdset_foreach_read(st, host) {
  284. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
  285. st->upstream_resync_time = 0;
  286. rrdset_rdlock(st);
  287. RRDDIM *rd;
  288. rrddim_foreach_read(rd, st)
  289. rd->exposed = 0;
  290. rrdset_unlock(st);
  291. }
  292. rrdhost_unlock(host);
  293. }
  294. static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
  295. rrdpush_buffer_lock(host);
  296. if(buffer_strlen(host->rrdpush_sender_buffer))
  297. error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, buffer_strlen(host->rrdpush_sender_buffer));
  298. buffer_flush(host->rrdpush_sender_buffer);
  299. rrdpush_sender_thread_reset_all_charts(host);
  300. rrdpush_sender_thread_send_custom_host_variables(host);
  301. rrdpush_buffer_unlock(host);
  302. }
  303. void rrdpush_sender_thread_stop(RRDHOST *host) {
  304. rrdpush_buffer_lock(host);
  305. rrdhost_wrlock(host);
  306. netdata_thread_t thr = 0;
  307. if(host->rrdpush_sender_spawn) {
  308. info("STREAM %s [send]: signaling sending thread to stop...", host->hostname);
  309. // signal the thread that we want to join it
  310. host->rrdpush_sender_join = 1;
  311. // copy the thread id, so that we will be waiting for the right one
  312. // even if a new one has been spawn
  313. thr = host->rrdpush_sender_thread;
  314. // signal it to cancel
  315. netdata_thread_cancel(host->rrdpush_sender_thread);
  316. }
  317. rrdhost_unlock(host);
  318. rrdpush_buffer_unlock(host);
  319. if(thr != 0) {
  320. info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname);
  321. void *result;
  322. netdata_thread_join(thr, &result);
  323. info("STREAM %s [send]: sending thread has exited.", host->hostname);
  324. }
  325. }
  326. static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
  327. host->rrdpush_sender_connected = 0;
  328. if(host->rrdpush_sender_socket != -1) {
  329. close(host->rrdpush_sender_socket);
  330. host->rrdpush_sender_socket = -1;
  331. }
  332. }
  333. static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_port, int timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size) {
  334. struct timeval tv = {
  335. .tv_sec = timeout,
  336. .tv_usec = 0
  337. };
  338. // make sure the socket is closed
  339. rrdpush_sender_thread_close_socket(host);
  340. debug(D_STREAM, "STREAM: Attempting to connect...");
  341. info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination);
  342. host->rrdpush_sender_socket = connect_to_one_of(
  343. host->rrdpush_send_destination
  344. , default_port
  345. , &tv
  346. , reconnects_counter
  347. , connected_to
  348. , connected_to_size
  349. );
  350. if(unlikely(host->rrdpush_sender_socket == -1)) {
  351. error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_send_destination);
  352. return 0;
  353. }
  354. info("STREAM %s [send to %s]: initializing communication...", host->hostname, connected_to);
  355. #define HTTP_HEADER_SIZE 8192
  356. char http[HTTP_HEADER_SIZE + 1];
  357. snprintfz(http, HTTP_HEADER_SIZE,
  358. "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s"
  359. "&NETDATA_SYSTEM_OS_NAME=%s"
  360. "&NETDATA_SYSTEM_OS_ID=%s"
  361. "&NETDATA_SYSTEM_OS_ID_LIKE=%s"
  362. "&NETDATA_SYSTEM_OS_VERSION=%s"
  363. "&NETDATA_SYSTEM_OS_VERSION_ID=%s"
  364. "&NETDATA_SYSTEM_OS_DETECTION=%s"
  365. "&NETDATA_SYSTEM_KERNEL_NAME=%s"
  366. "&NETDATA_SYSTEM_KERNEL_VERSION=%s"
  367. "&NETDATA_SYSTEM_ARCHITECTURE=%s"
  368. "&NETDATA_SYSTEM_VIRTUALIZATION=%s"
  369. "&NETDATA_SYSTEM_VIRT_DETECTION=%s"
  370. "&NETDATA_SYSTEM_CONTAINER=%s"
  371. "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
  372. " HTTP/1.1\r\n"
  373. "User-Agent: %s/%s\r\n"
  374. "Accept: */*\r\n\r\n"
  375. , host->rrdpush_send_api_key
  376. , host->hostname
  377. , host->registry_hostname
  378. , host->machine_guid
  379. , default_rrd_update_every
  380. , host->os
  381. , host->timezone
  382. , (host->tags) ? host->tags : ""
  383. , (host->system_info->os_name) ? host->system_info->os_name : ""
  384. , (host->system_info->os_id) ? host->system_info->os_id : ""
  385. , (host->system_info->os_id_like) ? host->system_info->os_id_like : ""
  386. , (host->system_info->os_version) ? host->system_info->os_version : ""
  387. , (host->system_info->os_version_id) ? host->system_info->os_version_id : ""
  388. , (host->system_info->os_detection) ? host->system_info->os_detection : ""
  389. , (host->system_info->kernel_name) ? host->system_info->kernel_name : ""
  390. , (host->system_info->kernel_version) ? host->system_info->kernel_version : ""
  391. , (host->system_info->architecture) ? host->system_info->architecture : ""
  392. , (host->system_info->virtualization) ? host->system_info->virtualization : ""
  393. , (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
  394. , (host->system_info->container) ? host->system_info->container : ""
  395. , (host->system_info->container_detection) ? host->system_info->container_detection : ""
  396. , host->program_name
  397. , host->program_version
  398. );
  399. if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
  400. error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", host->hostname, connected_to);
  401. rrdpush_sender_thread_close_socket(host);
  402. return 0;
  403. }
  404. info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, connected_to);
  405. if(recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) {
  406. error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, connected_to);
  407. rrdpush_sender_thread_close_socket(host);
  408. return 0;
  409. }
  410. if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)) != 0) {
  411. error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, connected_to);
  412. rrdpush_sender_thread_close_socket(host);
  413. return 0;
  414. }
  415. info("STREAM %s [send to %s]: established communication - ready to send metrics...", host->hostname, connected_to);
  416. if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
  417. error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, connected_to);
  418. if(sock_enlarge_out(host->rrdpush_sender_socket) < 0)
  419. error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, connected_to);
  420. debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_sender_socket);
  421. return 1;
  422. }
  423. static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
  424. RRDHOST *host = (RRDHOST *)ptr;
  425. rrdpush_buffer_lock(host);
  426. rrdhost_wrlock(host);
  427. info("STREAM %s [send]: sending thread cleans up...", host->hostname);
  428. rrdpush_sender_thread_close_socket(host);
  429. // close the pipe
  430. if(host->rrdpush_sender_pipe[PIPE_READ] != -1) {
  431. close(host->rrdpush_sender_pipe[PIPE_READ]);
  432. host->rrdpush_sender_pipe[PIPE_READ] = -1;
  433. }
  434. if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1) {
  435. close(host->rrdpush_sender_pipe[PIPE_WRITE]);
  436. host->rrdpush_sender_pipe[PIPE_WRITE] = -1;
  437. }
  438. buffer_free(host->rrdpush_sender_buffer);
  439. host->rrdpush_sender_buffer = NULL;
  440. if(!host->rrdpush_sender_join) {
  441. info("STREAM %s [send]: sending thread detaches itself.", host->hostname);
  442. netdata_thread_detach(netdata_thread_self());
  443. }
  444. host->rrdpush_sender_spawn = 0;
  445. info("STREAM %s [send]: sending thread now exits.", host->hostname);
  446. rrdhost_unlock(host);
  447. rrdpush_buffer_unlock(host);
  448. }
  449. void *rrdpush_sender_thread(void *ptr) {
  450. RRDHOST *host = (RRDHOST *)ptr;
  451. if(!host->rrdpush_send_enabled || !host->rrdpush_send_destination || !*host->rrdpush_send_destination || !host->rrdpush_send_api_key || !*host->rrdpush_send_api_key) {
  452. error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", host->hostname, gettid());
  453. return NULL;
  454. }
  455. info("STREAM %s [send]: thread created (task id %d)", host->hostname, gettid());
  456. int timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
  457. int default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
  458. size_t max_size = (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024);
  459. unsigned int reconnect_delay = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
  460. remote_clock_resync_iterations = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations);
  461. char connected_to[CONNECTED_TO_SIZE + 1] = "";
  462. // initialize rrdpush globals
  463. host->rrdpush_sender_buffer = buffer_create(1);
  464. host->rrdpush_sender_connected = 0;
  465. if(pipe(host->rrdpush_sender_pipe) == -1) fatal("STREAM %s [send]: cannot create required pipe.", host->hostname);
  466. // initialize local variables
  467. size_t begin = 0;
  468. size_t reconnects_counter = 0;
  469. size_t sent_bytes = 0;
  470. size_t sent_bytes_on_this_connection = 0;
  471. size_t send_attempts = 0;
  472. time_t last_sent_t = 0;
  473. struct pollfd fds[2], *ifd, *ofd;
  474. nfds_t fdmax;
  475. ifd = &fds[0];
  476. ofd = &fds[1];
  477. size_t not_connected_loops = 0;
  478. netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, host);
  479. for(; host->rrdpush_send_enabled && !netdata_exit ;) {
  480. // check for outstanding cancellation requests
  481. netdata_thread_testcancel();
  482. // if we don't have socket open, lets wait a bit
  483. if(unlikely(host->rrdpush_sender_socket == -1)) {
  484. send_attempts = 0;
  485. if(not_connected_loops == 0 && sent_bytes_on_this_connection > 0) {
  486. // fast re-connection on first disconnect
  487. sleep_usec(USEC_PER_MS * 500); // milliseconds
  488. }
  489. else {
  490. // slow re-connection on repeating errors
  491. sleep_usec(USEC_PER_SEC * reconnect_delay); // seconds
  492. }
  493. if(rrdpush_sender_thread_connect_to_master(host, default_port, timeout, &reconnects_counter, connected_to, CONNECTED_TO_SIZE)) {
  494. last_sent_t = now_monotonic_sec();
  495. // reset the buffer, to properly send charts and metrics
  496. rrdpush_sender_thread_data_flush(host);
  497. // send from the beginning
  498. begin = 0;
  499. // make sure the next reconnection will be immediate
  500. not_connected_loops = 0;
  501. // reset the bytes we have sent for this session
  502. sent_bytes_on_this_connection = 0;
  503. // let the data collection threads know we are ready
  504. host->rrdpush_sender_connected = 1;
  505. }
  506. else {
  507. // increase the failed connections counter
  508. not_connected_loops++;
  509. // reset the number of bytes sent
  510. sent_bytes_on_this_connection = 0;
  511. }
  512. // loop through
  513. continue;
  514. }
  515. else if(unlikely(now_monotonic_sec() - last_sent_t > timeout)) {
  516. error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", host->hostname, connected_to, timeout, sent_bytes_on_this_connection, send_attempts);
  517. rrdpush_sender_thread_close_socket(host);
  518. }
  519. ifd->fd = host->rrdpush_sender_pipe[PIPE_READ];
  520. ifd->events = POLLIN;
  521. ifd->revents = 0;
  522. ofd->fd = host->rrdpush_sender_socket;
  523. ofd->revents = 0;
  524. if(ofd->fd != -1 && begin < buffer_strlen(host->rrdpush_sender_buffer)) {
  525. debug(D_STREAM, "STREAM: Requesting data output on streaming socket %d...", ofd->fd);
  526. ofd->events = POLLOUT;
  527. fdmax = 2;
  528. send_attempts++;
  529. }
  530. else {
  531. debug(D_STREAM, "STREAM: Not requesting data output on streaming socket %d (nothing to send now)...", ofd->fd);
  532. ofd->events = 0;
  533. fdmax = 1;
  534. }
  535. debug(D_STREAM, "STREAM: Waiting for poll() events (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer));
  536. if(unlikely(netdata_exit)) break;
  537. int retval = poll(fds, fdmax, 1000);
  538. if(unlikely(netdata_exit)) break;
  539. if(unlikely(retval == -1)) {
  540. debug(D_STREAM, "STREAM: poll() failed (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer));
  541. if(errno == EAGAIN || errno == EINTR) {
  542. debug(D_STREAM, "STREAM: poll() failed with EAGAIN or EINTR...");
  543. }
  544. else {
  545. error("STREAM %s [send to %s]: failed to poll(). Closing socket.", host->hostname, connected_to);
  546. rrdpush_sender_thread_close_socket(host);
  547. }
  548. continue;
  549. }
  550. else if(likely(retval)) {
  551. if (ifd->revents & POLLIN || ifd->revents & POLLPRI) {
  552. debug(D_STREAM, "STREAM: Data added to send buffer (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer));
  553. char buffer[1000 + 1];
  554. if (read(host->rrdpush_sender_pipe[PIPE_READ], buffer, 1000) == -1)
  555. error("STREAM %s [send to %s]: cannot read from internal pipe.", host->hostname, connected_to);
  556. }
  557. if (ofd->revents & POLLOUT) {
  558. if (begin < buffer_strlen(host->rrdpush_sender_buffer)) {
  559. debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes, begin = %zu)...", buffer_strlen(host->rrdpush_sender_buffer), begin);
  560. // BEGIN RRDPUSH LOCKED SESSION
  561. // during this session, data collectors
  562. // will not be able to append data to our buffer
  563. // but the socket is in non-blocking mode
  564. // so, we will not block at send()
  565. netdata_thread_disable_cancelability();
  566. debug(D_STREAM, "STREAM: Getting exclusive lock on host...");
  567. rrdpush_buffer_lock(host);
  568. debug(D_STREAM, "STREAM: Sending data, starting from %zu, size %zu...", begin, buffer_strlen(host->rrdpush_sender_buffer));
  569. ssize_t ret = send(host->rrdpush_sender_socket, &host->rrdpush_sender_buffer->buffer[begin], buffer_strlen(host->rrdpush_sender_buffer) - begin, MSG_DONTWAIT);
  570. if (unlikely(ret == -1)) {
  571. if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) {
  572. debug(D_STREAM, "STREAM: Send failed - closing socket...");
  573. error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_bytes_on_this_connection);
  574. rrdpush_sender_thread_close_socket(host);
  575. }
  576. else {
  577. debug(D_STREAM, "STREAM: Send failed - will retry...");
  578. }
  579. }
  580. else if (likely(ret > 0)) {
  581. // DEBUG - dump the string to see it
  582. //char c = host->rrdpush_sender_buffer->buffer[begin + ret];
  583. //host->rrdpush_sender_buffer->buffer[begin + ret] = '\0';
  584. //debug(D_STREAM, "STREAM: sent from %zu to %zd:\n%s\n", begin, ret, &host->rrdpush_sender_buffer->buffer[begin]);
  585. //host->rrdpush_sender_buffer->buffer[begin + ret] = c;
  586. sent_bytes_on_this_connection += ret;
  587. sent_bytes += ret;
  588. begin += ret;
  589. if (begin == buffer_strlen(host->rrdpush_sender_buffer)) {
  590. // we send it all
  591. debug(D_STREAM, "STREAM: Sent %zd bytes (the whole buffer)...", ret);
  592. buffer_flush(host->rrdpush_sender_buffer);
  593. begin = 0;
  594. }
  595. else {
  596. debug(D_STREAM, "STREAM: Sent %zd bytes (part of the data buffer)...", ret);
  597. }
  598. last_sent_t = now_monotonic_sec();
  599. }
  600. else {
  601. debug(D_STREAM, "STREAM: send() returned %zd - closing the socket...", ret);
  602. error("STREAM %s [send to %s]: failed to send metrics (send() returned %zd) - closing connection - we have sent %zu bytes on this connection.",
  603. host->hostname, connected_to, ret, sent_bytes_on_this_connection);
  604. rrdpush_sender_thread_close_socket(host);
  605. }
  606. debug(D_STREAM, "STREAM: Releasing exclusive lock on host...");
  607. rrdpush_buffer_unlock(host);
  608. netdata_thread_enable_cancelability();
  609. // END RRDPUSH LOCKED SESSION
  610. }
  611. else {
  612. debug(D_STREAM, "STREAM: we have sent the entire buffer, but we received POLLOUT...");
  613. }
  614. }
  615. if(host->rrdpush_sender_socket != -1) {
  616. char *error = NULL;
  617. if (unlikely(ofd->revents & POLLERR))
  618. error = "socket reports errors (POLLERR)";
  619. else if (unlikely(ofd->revents & POLLHUP))
  620. error = "connection closed by remote end (POLLHUP)";
  621. else if (unlikely(ofd->revents & POLLNVAL))
  622. error = "connection is invalid (POLLNVAL)";
  623. if(unlikely(error)) {
  624. debug(D_STREAM, "STREAM: %s - closing socket...", error);
  625. error("STREAM %s [send to %s]: %s - reopening socket - we have sent %zu bytes on this connection.", host->hostname, connected_to, error, sent_bytes_on_this_connection);
  626. rrdpush_sender_thread_close_socket(host);
  627. }
  628. }
  629. }
  630. else {
  631. debug(D_STREAM, "STREAM: poll() timed out.");
  632. }
  633. // protection from overflow
  634. if(buffer_strlen(host->rrdpush_sender_buffer) > max_size) {
  635. debug(D_STREAM, "STREAM: Buffer is too big (%zu bytes), bigger than the max (%zu) - flushing it...", buffer_strlen(host->rrdpush_sender_buffer), max_size);
  636. errno = 0;
  637. error("STREAM %s [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", host->hostname, connected_to, host->rrdpush_sender_buffer->len, host->rrdpush_sender_buffer->len - begin, sent_bytes, sent_bytes_on_this_connection);
  638. rrdpush_sender_thread_close_socket(host);
  639. }
  640. }
  641. netdata_thread_cleanup_pop(1);
  642. return NULL;
  643. }
  644. // ----------------------------------------------------------------------------
  645. // rrdpush receiver thread
  646. static void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
  647. log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
  648. }
  649. static RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY get_multiple_connections_strategy(struct config *c, const char *section, const char *name, RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY def) {
  650. char *value;
  651. switch(def) {
  652. default:
  653. case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
  654. value = "allow";
  655. break;
  656. case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
  657. value = "deny";
  658. break;
  659. }
  660. value = appconfig_get(c, section, name, value);
  661. RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY ret = def;
  662. if(strcasecmp(value, "allow") == 0 || strcasecmp(value, "permit") == 0 || strcasecmp(value, "accept") == 0)
  663. ret = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
  664. else if(strcasecmp(value, "deny") == 0 || strcasecmp(value, "reject") == 0 || strcasecmp(value, "block") == 0)
  665. ret = RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW;
  666. else
  667. error("Invalid stream config value at section [%s], setting '%s', value '%s'", section, name, value);
  668. return ret;
  669. }
  670. static int rrdpush_receive(int fd
  671. , const char *key
  672. , const char *hostname
  673. , const char *registry_hostname
  674. , const char *machine_guid
  675. , const char *os
  676. , const char *timezone
  677. , const char *tags
  678. , const char *program_name
  679. , const char *program_version
  680. , struct rrdhost_system_info *system_info
  681. , int update_every
  682. , char *client_ip
  683. , char *client_port
  684. ) {
  685. RRDHOST *host;
  686. int history = default_rrd_history_entries;
  687. RRD_MEMORY_MODE mode = default_rrd_memory_mode;
  688. int health_enabled = default_health_enabled;
  689. int rrdpush_enabled = default_rrdpush_enabled;
  690. char *rrdpush_destination = default_rrdpush_destination;
  691. char *rrdpush_api_key = default_rrdpush_api_key;
  692. char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
  693. time_t alarms_delay = 60;
  694. RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY rrdpush_multiple_connections_strategy = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
  695. update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
  696. if(update_every < 0) update_every = 1;
  697. history = (int)appconfig_get_number(&stream_config, key, "default history", history);
  698. history = (int)appconfig_get_number(&stream_config, machine_guid, "history", history);
  699. if(history < 5) history = 5;
  700. mode = rrd_memory_mode_id(appconfig_get(&stream_config, key, "default memory mode", rrd_memory_mode_name(mode)));
  701. mode = rrd_memory_mode_id(appconfig_get(&stream_config, machine_guid, "memory mode", rrd_memory_mode_name(mode)));
  702. health_enabled = appconfig_get_boolean_ondemand(&stream_config, key, "health enabled by default", health_enabled);
  703. health_enabled = appconfig_get_boolean_ondemand(&stream_config, machine_guid, "health enabled", health_enabled);
  704. alarms_delay = appconfig_get_number(&stream_config, key, "default postpone alarms on connect seconds", alarms_delay);
  705. alarms_delay = appconfig_get_number(&stream_config, machine_guid, "postpone alarms on connect seconds", alarms_delay);
  706. rrdpush_enabled = appconfig_get_boolean(&stream_config, key, "default proxy enabled", rrdpush_enabled);
  707. rrdpush_enabled = appconfig_get_boolean(&stream_config, machine_guid, "proxy enabled", rrdpush_enabled);
  708. rrdpush_destination = appconfig_get(&stream_config, key, "default proxy destination", rrdpush_destination);
  709. rrdpush_destination = appconfig_get(&stream_config, machine_guid, "proxy destination", rrdpush_destination);
  710. rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key);
  711. rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key);
  712. rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, key, "multiple connections", rrdpush_multiple_connections_strategy);
  713. rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, machine_guid, "multiple connections", rrdpush_multiple_connections_strategy);
  714. rrdpush_send_charts_matching = appconfig_get(&stream_config, key, "default proxy send charts matching", rrdpush_send_charts_matching);
  715. rrdpush_send_charts_matching = appconfig_get(&stream_config, machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
  716. tags = appconfig_set_default(&stream_config, machine_guid, "host tags", (tags)?tags:"");
  717. if(tags && !*tags) tags = NULL;
  718. if (strcmp(machine_guid, localhost->machine_guid) == 0) {
  719. log_stream_connection(client_ip, client_port, key, machine_guid, hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO MASTER");
  720. error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the master/proxy machine guid to a slave?", hostname, client_ip, client_port, machine_guid);
  721. close(fd);
  722. return 1;
  723. }
  724. else
  725. host = rrdhost_find_or_create(
  726. hostname
  727. , registry_hostname
  728. , machine_guid
  729. , os
  730. , timezone
  731. , tags
  732. , program_name
  733. , program_version
  734. , update_every
  735. , history
  736. , mode
  737. , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
  738. , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
  739. , rrdpush_destination
  740. , rrdpush_api_key
  741. , rrdpush_send_charts_matching
  742. , system_info
  743. );
  744. if(!host) {
  745. close(fd);
  746. log_stream_connection(client_ip, client_port, key, machine_guid, hostname, "FAILED - CANNOT ACQUIRE HOST");
  747. error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", hostname, client_ip, client_port);
  748. return 1;
  749. }
  750. #ifdef NETDATA_INTERNAL_CHECKS
  751. info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s, tags '%s'"
  752. , hostname
  753. , client_ip
  754. , client_port
  755. , host->hostname
  756. , host->machine_guid
  757. , host->rrd_update_every
  758. , host->rrd_history_entries
  759. , rrd_memory_mode_name(host->rrd_memory_mode)
  760. , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
  761. , host->tags?host->tags:""
  762. );
  763. #endif // NETDATA_INTERNAL_CHECKS
  764. struct plugind cd = {
  765. .enabled = 1,
  766. .update_every = default_rrd_update_every,
  767. .pid = 0,
  768. .serial_failures = 0,
  769. .successful_collections = 0,
  770. .obsolete = 0,
  771. .started_t = now_realtime_sec(),
  772. .next = NULL,
  773. };
  774. // put the client IP and port into the buffers used by plugins.d
  775. snprintfz(cd.id, CONFIG_MAX_NAME, "%s:%s", client_ip, client_port);
  776. snprintfz(cd.filename, FILENAME_MAX, "%s:%s", client_ip, client_port);
  777. snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", client_ip, client_port);
  778. snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port);
  779. info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port);
  780. if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
  781. log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "FAILED - CANNOT REPLY");
  782. error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port);
  783. close(fd);
  784. return 0;
  785. }
  786. // remove the non-blocking flag from the socket
  787. if(sock_delnonblock(fd) < 0)
  788. error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", host->hostname, client_ip, client_port, fd);
  789. // convert the socket to a FILE *
  790. FILE *fp = fdopen(fd, "r");
  791. if(!fp) {
  792. log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "FAILED - SOCKET ERROR");
  793. error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", host->hostname, client_ip, client_port, fd);
  794. close(fd);
  795. return 0;
  796. }
  797. rrdhost_wrlock(host);
  798. if(host->connected_senders > 0) {
  799. switch(rrdpush_multiple_connections_strategy) {
  800. case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
  801. info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port);
  802. break;
  803. case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
  804. rrdhost_unlock(host);
  805. log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED");
  806. info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port);
  807. fclose(fp);
  808. return 0;
  809. }
  810. }
  811. rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
  812. host->connected_senders++;
  813. host->senders_disconnected_time = 0;
  814. if(health_enabled != CONFIG_BOOLEAN_NO) {
  815. if(alarms_delay > 0) {
  816. host->health_delay_up_to = now_realtime_sec() + alarms_delay;
  817. info("Postponing health checks for %ld seconds, on host '%s', because it was just connected."
  818. , alarms_delay
  819. , host->hostname
  820. );
  821. }
  822. }
  823. rrdhost_unlock(host);
  824. // call the plugins.d processor to receive the metrics
  825. info("STREAM %s [receive from [%s]:%s]: receiving metrics...", host->hostname, client_ip, client_port);
  826. log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "CONNECTED");
  827. size_t count = pluginsd_process(host, &cd, fp, 1);
  828. log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "DISCONNECTED");
  829. error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", host->hostname, client_ip, client_port, count);
  830. rrdhost_wrlock(host);
  831. host->senders_disconnected_time = now_realtime_sec();
  832. host->connected_senders--;
  833. if(!host->connected_senders) {
  834. rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
  835. if(health_enabled == CONFIG_BOOLEAN_AUTO)
  836. host->health_enabled = 0;
  837. }
  838. rrdhost_unlock(host);
  839. if(host->connected_senders == 0)
  840. rrdpush_sender_thread_stop(host);
  841. // cleanup
  842. fclose(fp);
  843. return (int)count;
  844. }
/*
 * Arguments handed from rrdpush_receiver_thread_spawn() to a streaming
 * receiver thread.
 *
 * All strings are heap copies owned by this struct. They are released,
 * together with the struct itself, by rrdpush_receiver_thread_cleanup().
 * system_info is passed on to rrdpush_receive() and is not freed by that
 * cleanup.
 */
struct rrdpush_thread {
    int fd;                 // client socket (the web client's input fd)
    char *key;              // API key sent in the request
    char *hostname;
    char *registry_hostname; // falls back to hostname when not sent
    char *machine_guid;
    char *os;
    char *timezone;
    char *tags;             // NULL when the request had no tags
    char *client_ip;
    char *client_port;
    char *program_name;     // User-Agent text before the first '/', NULL when no User-Agent
    char *program_version;  // User-Agent text after the first '/', NULL when absent
    struct rrdhost_system_info *system_info;
    int update_every;
};
  861. static void rrdpush_receiver_thread_cleanup(void *ptr) {
  862. static __thread int executed = 0;
  863. if(!executed) {
  864. executed = 1;
  865. struct rrdpush_thread *rpt = (struct rrdpush_thread *) ptr;
  866. info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
  867. freez(rpt->key);
  868. freez(rpt->hostname);
  869. freez(rpt->registry_hostname);
  870. freez(rpt->machine_guid);
  871. freez(rpt->os);
  872. freez(rpt->timezone);
  873. freez(rpt->tags);
  874. freez(rpt->client_ip);
  875. freez(rpt->client_port);
  876. freez(rpt->program_name);
  877. freez(rpt->program_version);
  878. freez(rpt);
  879. }
  880. }
  881. static void *rrdpush_receiver_thread(void *ptr) {
  882. netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
  883. struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr;
  884. info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
  885. rrdpush_receive(
  886. rpt->fd
  887. , rpt->key
  888. , rpt->hostname
  889. , rpt->registry_hostname
  890. , rpt->machine_guid
  891. , rpt->os
  892. , rpt->timezone
  893. , rpt->tags
  894. , rpt->program_name
  895. , rpt->program_version
  896. , rpt->system_info
  897. , rpt->update_every
  898. , rpt->client_ip
  899. , rpt->client_port
  900. );
  901. netdata_thread_cleanup_pop(1);
  902. return NULL;
  903. }
  904. static void rrdpush_sender_thread_spawn(RRDHOST *host) {
  905. rrdhost_wrlock(host);
  906. if(!host->rrdpush_sender_spawn) {
  907. char tag[NETDATA_THREAD_TAG_MAX + 1];
  908. snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", host->hostname);
  909. if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host))
  910. error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
  911. else
  912. host->rrdpush_sender_spawn = 1;
  913. }
  914. rrdhost_unlock(host);
  915. }
  916. int rrdpush_receiver_permission_denied(struct web_client *w) {
  917. // we always respond with the same message and error code
  918. // to prevent an attacker from gaining info about the error
  919. buffer_flush(w->response.data);
  920. buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info.");
  921. return 401;
  922. }
  923. int rrdpush_receiver_too_busy_now(struct web_client *w) {
  924. // we always respond with the same message and error code
  925. // to prevent an attacker from gaining info about the error
  926. buffer_flush(w->response.data);
  927. buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later.");
  928. return 503;
  929. }
  930. int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
  931. (void)host;
  932. info("clients wants to STREAM metrics.");
  933. char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL;
  934. int update_every = default_rrd_update_every;
  935. char buf[GUID_LEN + 1];
  936. struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
  937. while(url) {
  938. char *value = mystrsep(&url, "&");
  939. if(!value || !*value) continue;
  940. char *name = mystrsep(&value, "=");
  941. if(!name || !*name) continue;
  942. if(!value || !*value) continue;
  943. if(!strcmp(name, "key"))
  944. key = value;
  945. else if(!strcmp(name, "hostname"))
  946. hostname = value;
  947. else if(!strcmp(name, "registry_hostname"))
  948. registry_hostname = value;
  949. else if(!strcmp(name, "machine_guid"))
  950. machine_guid = value;
  951. else if(!strcmp(name, "update_every"))
  952. update_every = (int)strtoul(value, NULL, 0);
  953. else if(!strcmp(name, "os"))
  954. os = value;
  955. else if(!strcmp(name, "timezone"))
  956. timezone = value;
  957. else if(!strcmp(name, "tags"))
  958. tags = value;
  959. else
  960. if(unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
  961. info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", w->client_ip, w->client_port, key, value);
  962. }
  963. }
  964. if(!key || !*key) {
  965. rrdhost_system_info_free(system_info);
  966. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY");
  967. error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port);
  968. return rrdpush_receiver_permission_denied(w);
  969. }
  970. if(!hostname || !*hostname) {
  971. rrdhost_system_info_free(system_info);
  972. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO HOSTNAME");
  973. error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port);
  974. return rrdpush_receiver_permission_denied(w);
  975. }
  976. if(!machine_guid || !*machine_guid) {
  977. rrdhost_system_info_free(system_info);
  978. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO MACHINE GUID");
  979. error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port);
  980. return rrdpush_receiver_permission_denied(w);
  981. }
  982. if(regenerate_guid(key, buf) == -1) {
  983. rrdhost_system_info_free(system_info);
  984. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - INVALID KEY");
  985. error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key);
  986. return rrdpush_receiver_permission_denied(w);
  987. }
  988. if(regenerate_guid(machine_guid, buf) == -1) {
  989. rrdhost_system_info_free(system_info);
  990. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - INVALID MACHINE GUID");
  991. error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid);
  992. return rrdpush_receiver_permission_denied(w);
  993. }
  994. if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) {
  995. rrdhost_system_info_free(system_info);
  996. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - KEY NOT ENABLED");
  997. error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
  998. return rrdpush_receiver_permission_denied(w);
  999. }
  1000. {
  1001. SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
  1002. if(key_allow_from) {
  1003. if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
  1004. simple_pattern_free(key_allow_from);
  1005. rrdhost_system_info_free(system_info);
  1006. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname) ? hostname : "-", "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP");
  1007. error("STREAM [receive from [%s]:%s]: API key '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, key);
  1008. return rrdpush_receiver_permission_denied(w);
  1009. }
  1010. simple_pattern_free(key_allow_from);
  1011. }
  1012. }
  1013. if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) {
  1014. rrdhost_system_info_free(system_info);
  1015. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - MACHINE GUID NOT ENABLED");
  1016. error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
  1017. return rrdpush_receiver_permission_denied(w);
  1018. }
  1019. {
  1020. SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
  1021. if(machine_allow_from) {
  1022. if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
  1023. simple_pattern_free(machine_allow_from);
  1024. rrdhost_system_info_free(system_info);
  1025. log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname) ? hostname : "-", "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP");
  1026. error("STREAM [receive from [%s]:%s]: Machine GUID '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, machine_guid);
  1027. return rrdpush_receiver_permission_denied(w);
  1028. }
  1029. simple_pattern_free(machine_allow_from);
  1030. }
  1031. }
  1032. if(unlikely(web_client_streaming_rate_t > 0)) {
  1033. static netdata_mutex_t stream_rate_mutex = NETDATA_MUTEX_INITIALIZER;
  1034. static volatile time_t last_stream_accepted_t = 0;
  1035. netdata_mutex_lock(&stream_rate_mutex);
  1036. time_t now = now_realtime_sec();
  1037. if(unlikely(last_stream_accepted_t == 0))
  1038. last_stream_accepted_t = now;
  1039. if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
  1040. netdata_mutex_unlock(&stream_rate_mutex);
  1041. rrdhost_system_info_free(system_info);
  1042. error("STREAM [receive from [%s]:%s]: too busy to accept new streaming request. Will be allowed in %ld secs.", w->client_ip, w->client_port, (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
  1043. return rrdpush_receiver_too_busy_now(w);
  1044. }
  1045. last_stream_accepted_t = now;
  1046. netdata_mutex_unlock(&stream_rate_mutex);
  1047. }
  1048. struct rrdpush_thread *rpt = callocz(1, sizeof(struct rrdpush_thread));
  1049. rpt->fd = w->ifd;
  1050. rpt->key = strdupz(key);
  1051. rpt->hostname = strdupz(hostname);
  1052. rpt->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname);
  1053. rpt->machine_guid = strdupz(machine_guid);
  1054. rpt->os = strdupz(os);
  1055. rpt->timezone = strdupz(timezone);
  1056. rpt->tags = (tags)?strdupz(tags):NULL;
  1057. rpt->client_ip = strdupz(w->client_ip);
  1058. rpt->client_port = strdupz(w->client_port);
  1059. rpt->update_every = update_every;
  1060. rpt->system_info = system_info;
  1061. if(w->user_agent && w->user_agent[0]) {
  1062. char *t = strchr(w->user_agent, '/');
  1063. if(t && *t) {
  1064. *t = '\0';
  1065. t++;
  1066. }
  1067. rpt->program_name = strdupz(w->user_agent);
  1068. if(t && *t) rpt->program_version = strdupz(t);
  1069. }
  1070. netdata_thread_t thread;
  1071. debug(D_SYSTEM, "starting STREAM receive thread.");
  1072. char tag[FILENAME_MAX + 1];
  1073. snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
  1074. if(netdata_thread_create(&thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
  1075. error("Failed to create new STREAM receive thread for client.");
  1076. // prevent the caller from closing the streaming socket
  1077. if(web_server_mode == WEB_SERVER_MODE_STATIC_THREADED) {
  1078. web_client_flag_set(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET);
  1079. }
  1080. else {
  1081. if(w->ifd == w->ofd)
  1082. w->ifd = w->ofd = -1;
  1083. else
  1084. w->ifd = -1;
  1085. }
  1086. buffer_flush(w->response.data);
  1087. return 200;
  1088. }