static-threaded.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #define WEB_SERVER_INTERNALS 1
  3. #include "static-threaded.h"
  4. int web_client_timeout = DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS;
  5. int web_client_first_request_timeout = DEFAULT_TIMEOUT_TO_RECEIVE_FIRST_WEB_REQUEST;
  6. long web_client_streaming_rate_t = 0L;
  7. /*
  8. * --------------------------------------------------------------------------------------------------------------------
  9. * Build web_client state from the pollinfo that describes an accepted connection.
  10. */
  11. static struct web_client *web_client_create_on_fd(POLLINFO *pi) {
  12. struct web_client *w;
  13. w = web_client_get_from_cache_or_allocate();
  14. w->ifd = w->ofd = pi->fd;
  15. strncpyz(w->client_ip, pi->client_ip, sizeof(w->client_ip) - 1);
  16. strncpyz(w->client_port, pi->client_port, sizeof(w->client_port) - 1);
  17. strncpyz(w->client_host, pi->client_host, sizeof(w->client_host) - 1);
  18. if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-");
  19. if(unlikely(!*w->client_port)) strcpy(w->client_port, "-");
  20. w->port_acl = pi->port_acl;
  21. web_client_initialize_connection(w);
  22. w->pollinfo_slot = pi->slot;
  23. return(w);
  24. }
  25. // --------------------------------------------------------------------------------------
  26. // the main socket listener - STATIC-THREADED
  27. struct web_server_static_threaded_worker {
  28. netdata_thread_t thread;
  29. int id;
  30. int running;
  31. size_t max_sockets;
  32. volatile size_t connected;
  33. volatile size_t disconnected;
  34. volatile size_t receptions;
  35. volatile size_t sends;
  36. volatile size_t max_concurrent;
  37. volatile size_t files_read;
  38. volatile size_t file_reads;
  39. };
  // Number of workers, the calling (main) thread included. Set from the configuration
  // when the server starts and never allowed to drop below 1.
  static long long static_threaded_workers_count = 1;

  // Array with one entry per worker; entry 0 belongs to the main thread.
  static struct web_server_static_threaded_worker *static_workers_private_data = NULL;

  // Per-thread pointer to the entry of the worker running on the current thread.
  static __thread struct web_server_static_threaded_worker *worker_private = NULL;
  // ----------------------------------------------------------------------------

  /*
   * Decide what a callback should return for a client after it has been serviced.
   *
   * Returns -1 when the client is flagged dead, or when it waits neither to receive
   * nor to send; returns 0 otherwise.
   */
  static inline int web_server_check_client_status(struct web_client *w) {
      if (unlikely(web_client_check_dead(w)))
          return -1;

      // nothing left to receive and nothing left to send
      if (unlikely(!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))
          return -1;

      return 0;
  }
  49. // ----------------------------------------------------------------------------
  50. // web server files
  51. static void *web_server_file_add_callback(POLLINFO *pi, short int *events, void *data) {
  52. struct web_client *w = (struct web_client *)data;
  53. worker_private->files_read++;
  54. debug(D_WEB_CLIENT, "%llu: ADDED FILE READ ON FD %d", w->id, pi->fd);
  55. *events = POLLIN;
  56. pi->data = w;
  57. return w;
  58. }
  59. static void web_server_file_del_callback(POLLINFO *pi) {
  60. struct web_client *w = (struct web_client *)pi->data;
  61. debug(D_WEB_CLIENT, "%llu: RELEASE FILE READ ON FD %d", w->id, pi->fd);
  62. w->pollinfo_filecopy_slot = 0;
  63. if(unlikely(!w->pollinfo_slot)) {
  64. debug(D_WEB_CLIENT, "%llu: CROSS WEB CLIENT CLEANUP (iFD %d, oFD %d)", w->id, pi->fd, w->ofd);
  65. web_client_release(w);
  66. }
  67. }
  68. static int web_server_file_read_callback(POLLINFO *pi, short int *events) {
  69. struct web_client *w = (struct web_client *)pi->data;
  70. // if there is no POLLINFO linked to this, it means the client disconnected
  71. // stop the file reading too
  72. if(unlikely(!w->pollinfo_slot)) {
  73. debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON CLOSED WEB CLIENT", w->id, pi->fd);
  74. return -1;
  75. }
  76. if(unlikely(w->mode != WEB_CLIENT_MODE_FILECOPY || w->ifd == w->ofd)) {
  77. debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON NON-FILECOPY WEB CLIENT", w->id, pi->fd);
  78. return -1;
  79. }
  80. debug(D_WEB_CLIENT, "%llu: READING FILE ON FD %d", w->id, pi->fd);
  81. worker_private->file_reads++;
  82. ssize_t ret = unlikely(web_client_read_file(w));
  83. if(likely(web_client_has_wait_send(w))) {
  84. POLLJOB *p = pi->p; // our POLLJOB
  85. POLLINFO *wpi = pollinfo_from_slot(p, w->pollinfo_slot); // POLLINFO of the client socket
  86. debug(D_WEB_CLIENT, "%llu: SIGNALING W TO SEND (iFD %d, oFD %d)", w->id, pi->fd, wpi->fd);
  87. p->fds[wpi->slot].events |= POLLOUT;
  88. }
  89. if(unlikely(ret <= 0 || w->ifd == w->ofd)) {
  90. debug(D_WEB_CLIENT, "%llu: DONE READING FILE ON FD %d", w->id, pi->fd);
  91. return -1;
  92. }
  93. *events = POLLIN;
  94. return 0;
  95. }
  96. static int web_server_file_write_callback(POLLINFO *pi, short int *events) {
  97. (void)pi;
  98. (void)events;
  99. error("Writing to web files is not supported!");
  100. return -1;
  101. }
  102. // ----------------------------------------------------------------------------
  103. // web server clients
  104. static void *web_server_add_callback(POLLINFO *pi, short int *events, void *data) {
  105. (void)data; // Suppress warning on unused argument
  106. worker_private->connected++;
  107. size_t concurrent = worker_private->connected - worker_private->disconnected;
  108. if(unlikely(concurrent > worker_private->max_concurrent))
  109. worker_private->max_concurrent = concurrent;
  110. *events = POLLIN;
  111. debug(D_WEB_CLIENT_ACCESS, "LISTENER on %d: new connection.", pi->fd);
  112. struct web_client *w = web_client_create_on_fd(pi);
  113. if (!strncmp(pi->client_port, "UNIX", 4)) {
  114. web_client_set_unix(w);
  115. } else {
  116. web_client_set_tcp(w);
  117. }
  118. #ifdef ENABLE_HTTPS
  119. if ((!web_client_check_unix(w)) && ( netdata_srv_ctx )) {
  120. if( sock_delnonblock(w->ifd) < 0 ){
  121. error("Web server cannot remove the non-blocking flag from socket %d",w->ifd);
  122. }
  123. //Read the first 7 bytes from the message, but the message
  124. //is not removed from the queue, because we are using MSG_PEEK
  125. char test[8];
  126. if ( recv(w->ifd,test, 7,MSG_PEEK) == 7 ) {
  127. test[7] = 0x00;
  128. }
  129. else {
  130. //Case I do not have success to read 7 bytes,
  131. //this means that the mensage was not completely read, so
  132. //I cannot identify it yet.
  133. sock_setnonblock(w->ifd);
  134. return w;
  135. }
  136. //The next two ifs are not together because I am reusing SSL structure
  137. if (!w->ssl.conn)
  138. {
  139. w->ssl.conn = SSL_new(netdata_srv_ctx);
  140. if ( w->ssl.conn ) {
  141. SSL_set_accept_state(w->ssl.conn);
  142. } else {
  143. error("Failed to create SSL context on socket fd %d.", w->ifd);
  144. if (test[0] < 0x18){
  145. WEB_CLIENT_IS_DEAD(w);
  146. sock_setnonblock(w->ifd);
  147. return w;
  148. }
  149. }
  150. }
  151. if (w->ssl.conn) {
  152. if (SSL_set_fd(w->ssl.conn, w->ifd) != 1) {
  153. error("Failed to set the socket to the SSL on socket fd %d.", w->ifd);
  154. //The client is not set dead, because I received a normal HTTP request
  155. //instead a Client Hello(HTTPS).
  156. if ( test[0] < 0x18 ){
  157. WEB_CLIENT_IS_DEAD(w);
  158. }
  159. }
  160. else{
  161. w->ssl.flags = security_process_accept(w->ssl.conn, (int)test[0]);
  162. }
  163. }
  164. sock_setnonblock(w->ifd);
  165. } else{
  166. w->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  167. }
  168. #endif
  169. debug(D_WEB_CLIENT, "%llu: ADDED CLIENT FD %d", w->id, pi->fd);
  170. return w;
  171. }
  172. // TCP client disconnected
  173. static void web_server_del_callback(POLLINFO *pi) {
  174. worker_private->disconnected++;
  175. struct web_client *w = (struct web_client *)pi->data;
  176. w->pollinfo_slot = 0;
  177. if(unlikely(w->pollinfo_filecopy_slot)) {
  178. POLLINFO *fpi = pollinfo_from_slot(pi->p, w->pollinfo_filecopy_slot); // POLLINFO of the client socket
  179. (void)fpi;
  180. debug(D_WEB_CLIENT, "%llu: THE CLIENT WILL BE FRED BY READING FILE JOB ON FD %d", w->id, fpi->fd);
  181. }
  182. else {
  183. if(web_client_flag_check(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET))
  184. pi->flags |= POLLINFO_FLAG_DONT_CLOSE;
  185. debug(D_WEB_CLIENT, "%llu: CLOSING CLIENT FD %d", w->id, pi->fd);
  186. web_client_release(w);
  187. }
  188. }
  189. static int web_server_rcv_callback(POLLINFO *pi, short int *events) {
  190. worker_private->receptions++;
  191. struct web_client *w = (struct web_client *)pi->data;
  192. int fd = pi->fd;
  193. if(unlikely(web_client_receive(w) < 0))
  194. return -1;
  195. debug(D_WEB_CLIENT, "%llu: processing received data on fd %d.", w->id, fd);
  196. web_client_process_request(w);
  197. if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY)) {
  198. if(w->pollinfo_filecopy_slot == 0) {
  199. debug(D_WEB_CLIENT, "%llu: FILECOPY DETECTED ON FD %d", w->id, pi->fd);
  200. if (unlikely(w->ifd != -1 && w->ifd != w->ofd && w->ifd != fd)) {
  201. // add a new socket to poll_events, with the same
  202. debug(D_WEB_CLIENT, "%llu: CREATING FILECOPY SLOT ON FD %d", w->id, pi->fd);
  203. POLLINFO *fpi = poll_add_fd(
  204. pi->p
  205. , w->ifd
  206. , pi->port_acl
  207. , 0
  208. , POLLINFO_FLAG_CLIENT_SOCKET
  209. , "FILENAME"
  210. , ""
  211. , ""
  212. , web_server_file_add_callback
  213. , web_server_file_del_callback
  214. , web_server_file_read_callback
  215. , web_server_file_write_callback
  216. , (void *) w
  217. );
  218. if(fpi)
  219. w->pollinfo_filecopy_slot = fpi->slot;
  220. else {
  221. error("Failed to add filecopy fd. Closing client.");
  222. return -1;
  223. }
  224. }
  225. }
  226. }
  227. else {
  228. if(unlikely(w->ifd == fd && web_client_has_wait_receive(w)))
  229. *events |= POLLIN;
  230. }
  231. if(unlikely(w->ofd == fd && web_client_has_wait_send(w)))
  232. *events |= POLLOUT;
  233. return web_server_check_client_status(w);
  234. }
  235. static int web_server_snd_callback(POLLINFO *pi, short int *events) {
  236. worker_private->sends++;
  237. struct web_client *w = (struct web_client *)pi->data;
  238. int fd = pi->fd;
  239. debug(D_WEB_CLIENT, "%llu: sending data on fd %d.", w->id, fd);
  240. if(unlikely(web_client_send(w) < 0))
  241. return -1;
  242. if(unlikely(w->ifd == fd && web_client_has_wait_receive(w)))
  243. *events |= POLLIN;
  244. if(unlikely(w->ofd == fd && web_client_has_wait_send(w)))
  245. *events |= POLLOUT;
  246. return web_server_check_client_status(w);
  247. }
  248. static void web_server_tmr_callback(void *timer_data) {
  249. worker_private = (struct web_server_static_threaded_worker *)timer_data;
  250. static __thread RRDSET *st = NULL;
  251. static __thread RRDDIM *rd_user = NULL, *rd_system = NULL;
  252. if(unlikely(netdata_exit)) return;
  253. if(unlikely(!st)) {
  254. char id[100 + 1];
  255. char title[100 + 1];
  256. snprintfz(id, 100, "web_thread%d_cpu", worker_private->id + 1);
  257. snprintfz(title, 100, "Netdata web server thread CPU usage");
  258. st = rrdset_create_localhost(
  259. "netdata"
  260. , id
  261. , NULL
  262. , "web"
  263. , "netdata.web_cpu"
  264. , title
  265. , "milliseconds/s"
  266. , "web"
  267. , "stats"
  268. , 132000 + worker_private->id
  269. , default_rrd_update_every
  270. , RRDSET_TYPE_STACKED
  271. );
  272. rd_user = rrddim_add(st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
  273. rd_system = rrddim_add(st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
  274. }
  275. else
  276. rrdset_next(st);
  277. struct rusage rusage;
  278. getrusage(RUSAGE_THREAD, &rusage);
  279. rrddim_set_by_pointer(st, rd_user, rusage.ru_utime.tv_sec * 1000000ULL + rusage.ru_utime.tv_usec);
  280. rrddim_set_by_pointer(st, rd_system, rusage.ru_stime.tv_sec * 1000000ULL + rusage.ru_stime.tv_usec);
  281. rrdset_done(st);
  282. }
  283. // ----------------------------------------------------------------------------
  284. // web server worker thread
  285. static void socket_listen_main_static_threaded_worker_cleanup(void *ptr) {
  286. worker_private = (struct web_server_static_threaded_worker *)ptr;
  287. info("freeing local web clients cache...");
  288. web_client_cache_destroy();
  289. info("stopped after %zu connects, %zu disconnects (max concurrent %zu), %zu receptions and %zu sends",
  290. worker_private->connected,
  291. worker_private->disconnected,
  292. worker_private->max_concurrent,
  293. worker_private->receptions,
  294. worker_private->sends
  295. );
  296. worker_private->running = 0;
  297. }
  298. void *socket_listen_main_static_threaded_worker(void *ptr) {
  299. worker_private = (struct web_server_static_threaded_worker *)ptr;
  300. worker_private->running = 1;
  301. netdata_thread_cleanup_push(socket_listen_main_static_threaded_worker_cleanup, ptr);
  302. poll_events(&api_sockets
  303. , web_server_add_callback
  304. , web_server_del_callback
  305. , web_server_rcv_callback
  306. , web_server_snd_callback
  307. , web_server_tmr_callback
  308. , web_allow_connections_from
  309. , web_allow_connections_dns
  310. , NULL
  311. , web_client_first_request_timeout
  312. , web_client_timeout
  313. , default_rrd_update_every * 1000 // timer_milliseconds
  314. , ptr // timer_data
  315. , worker_private->max_sockets
  316. );
  317. netdata_thread_cleanup_pop(1);
  318. return NULL;
  319. }
  320. // ----------------------------------------------------------------------------
  321. // web server main thread - also becomes a worker
  322. static void socket_listen_main_static_threaded_cleanup(void *ptr) {
  323. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  324. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  325. int i, found = 0;
  326. usec_t max = 2 * USEC_PER_SEC, step = 50000;
  327. // we start from 1, - 0 is self
  328. for(i = 1; i < static_threaded_workers_count; i++) {
  329. if(static_workers_private_data[i].running) {
  330. found++;
  331. info("stopping worker %d", i + 1);
  332. netdata_thread_cancel(static_workers_private_data[i].thread);
  333. }
  334. else
  335. info("found stopped worker %d", i + 1);
  336. }
  337. while(found && max > 0) {
  338. max -= step;
  339. info("Waiting %d static web threads to finish...", found);
  340. sleep_usec(step);
  341. found = 0;
  342. // we start from 1, - 0 is self
  343. for(i = 1; i < static_threaded_workers_count; i++) {
  344. if (static_workers_private_data[i].running)
  345. found++;
  346. }
  347. }
  348. if(found)
  349. error("%d static web threads are taking too long to finish. Giving up.", found);
  350. info("closing all web server sockets...");
  351. listen_sockets_close(&api_sockets);
  352. info("all static web threads stopped.");
  353. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  354. }
  355. void *socket_listen_main_static_threaded(void *ptr) {
  356. netdata_thread_cleanup_push(socket_listen_main_static_threaded_cleanup, ptr);
  357. web_server_mode = WEB_SERVER_MODE_STATIC_THREADED;
  358. if(!api_sockets.opened)
  359. fatal("LISTENER: no listen sockets available.");
  360. #ifdef ENABLE_HTTPS
  361. security_start_ssl(NETDATA_SSL_CONTEXT_SERVER);
  362. #endif
  363. // 6 threads is the optimal value
  364. // since 6 are the parallel connections browsers will do
  365. // so, if the machine has more CPUs, avoid using resources unnecessarily
  366. int def_thread_count = (processors > 6) ? 6 : processors;
  367. if (!strcmp(config_get(CONFIG_SECTION_WEB, "mode", ""),"single-threaded")) {
  368. info("Running web server with one thread, because mode is single-threaded");
  369. config_set(CONFIG_SECTION_WEB, "mode", "static-threaded");
  370. def_thread_count = 1;
  371. }
  372. static_threaded_workers_count = config_get_number(CONFIG_SECTION_WEB, "web server threads", def_thread_count);
  373. if (static_threaded_workers_count < 1) static_threaded_workers_count = 1;
  374. #ifdef ENABLE_HTTPS
  375. // See https://github.com/netdata/netdata/issues/11081#issuecomment-831998240 for more details
  376. if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) {
  377. static_threaded_workers_count = 1;
  378. info("You are running an OpenSSL older than 1.1.0, web server will not enable multithreading.");
  379. }
  380. #endif
  381. size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets",
  382. (long long int)(rlimit_nofile.rlim_cur / 4));
  383. static_workers_private_data = callocz((size_t)static_threaded_workers_count,
  384. sizeof(struct web_server_static_threaded_worker));
  385. web_server_is_multithreaded = (static_threaded_workers_count > 1);
  386. int i;
  387. for (i = 1; i < static_threaded_workers_count; i++) {
  388. static_workers_private_data[i].id = i;
  389. static_workers_private_data[i].max_sockets = max_sockets / static_threaded_workers_count;
  390. char tag[50 + 1];
  391. snprintfz(tag, 50, "WEB_SERVER[static%d]", i+1);
  392. info("starting worker %d", i+1);
  393. netdata_thread_create(&static_workers_private_data[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT,
  394. socket_listen_main_static_threaded_worker, (void *)&static_workers_private_data[i]);
  395. }
  396. // and the main one
  397. static_workers_private_data[0].max_sockets = max_sockets / static_threaded_workers_count;
  398. socket_listen_main_static_threaded_worker((void *)&static_workers_private_data[0]);
  399. netdata_thread_cleanup_pop(1);
  400. return NULL;
  401. }