static-threaded.c 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #define WEB_SERVER_INTERNALS 1
  3. #include "static-threaded.h"
  4. int web_client_timeout = DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS;
  5. int web_client_first_request_timeout = DEFAULT_TIMEOUT_TO_RECEIVE_FIRST_WEB_REQUEST;
  6. long web_client_streaming_rate_t = 0L;
  7. #define WORKER_JOB_ADD_CONNECTION 0
  8. #define WORKER_JOB_DEL_COLLECTION 1
  9. #define WORKER_JOB_ADD_FILE 2
  10. #define WORKER_JOB_DEL_FILE 3
  11. #define WORKER_JOB_READ_FILE 4
  12. #define WORKER_JOB_WRITE_FILE 5
  13. #define WORKER_JOB_RCV_DATA 6
  14. #define WORKER_JOB_SND_DATA 7
  15. #define WORKER_JOB_PROCESS 8
  16. #if (WORKER_UTILIZATION_MAX_JOB_TYPES < 9)
  17. #error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least 8
  18. #endif
  19. /*
  20. * --------------------------------------------------------------------------------------------------------------------
  21. * Build web_client state from the pollinfo that describes an accepted connection.
  22. */
  23. static struct web_client *web_client_create_on_fd(POLLINFO *pi) {
  24. struct web_client *w;
  25. w = web_client_get_from_cache();
  26. w->ifd = w->ofd = pi->fd;
  27. strncpyz(w->client_ip, pi->client_ip, sizeof(w->client_ip) - 1);
  28. strncpyz(w->client_port, pi->client_port, sizeof(w->client_port) - 1);
  29. strncpyz(w->client_host, pi->client_host, sizeof(w->client_host) - 1);
  30. if(unlikely(!*w->client_ip)) strcpy(w->client_ip, "-");
  31. if(unlikely(!*w->client_port)) strcpy(w->client_port, "-");
  32. w->port_acl = pi->port_acl;
  33. int flag = 1;
  34. if(unlikely(web_client_check_tcp(w) && setsockopt(w->ifd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0))
  35. debug(D_WEB_CLIENT, "%llu: failed to enable TCP_NODELAY on socket fd %d.", w->id, w->ifd);
  36. flag = 1;
  37. if(unlikely(setsockopt(w->ifd, SOL_SOCKET, SO_KEEPALIVE, (char *) &flag, sizeof(int)) != 0))
  38. debug(D_WEB_CLIENT, "%llu: failed to enable SO_KEEPALIVE on socket fd %d.", w->id, w->ifd);
  39. web_client_update_acl_matches(w);
  40. web_client_enable_wait_receive(w);
  41. web_server_log_connection(w, "CONNECTED");
  42. w->pollinfo_slot = pi->slot;
  43. return(w);
  44. }
  45. // --------------------------------------------------------------------------------------
  46. // the main socket listener - STATIC-THREADED
  47. struct web_server_static_threaded_worker {
  48. netdata_thread_t thread;
  49. int id;
  50. int running;
  51. size_t max_sockets;
  52. volatile size_t connected;
  53. volatile size_t disconnected;
  54. volatile size_t receptions;
  55. volatile size_t sends;
  56. volatile size_t max_concurrent;
  57. volatile size_t files_read;
  58. volatile size_t file_reads;
  59. };
  60. static long long static_threaded_workers_count = 1;
  61. static struct web_server_static_threaded_worker *static_workers_private_data = NULL;
  62. static __thread struct web_server_static_threaded_worker *worker_private = NULL;
  63. // ----------------------------------------------------------------------------
  // Decide whether a client should stay registered after a callback ran.
  // Returns -1 when the client is flagged dead, or when it waits neither to
  // receive nor to send; returns 0 otherwise.
  static inline int web_server_check_client_status(struct web_client *w) {
      if(unlikely(web_client_check_dead(w)))
          return -1;

      if(unlikely(!web_client_has_wait_receive(w) && !web_client_has_wait_send(w)))
          return -1;

      return 0;
  }
  69. // ----------------------------------------------------------------------------
  70. // web server files
  71. static void *web_server_file_add_callback(POLLINFO *pi, short int *events, void *data) {
  72. struct web_client *w = (struct web_client *)data;
  73. worker_is_busy(WORKER_JOB_ADD_FILE);
  74. worker_private->files_read++;
  75. debug(D_WEB_CLIENT, "%llu: ADDED FILE READ ON FD %d", w->id, pi->fd);
  76. *events = POLLIN;
  77. pi->data = w;
  78. worker_is_idle();
  79. return w;
  80. }
  81. static void web_server_file_del_callback(POLLINFO *pi) {
  82. struct web_client *w = (struct web_client *)pi->data;
  83. debug(D_WEB_CLIENT, "%llu: RELEASE FILE READ ON FD %d", w->id, pi->fd);
  84. worker_is_busy(WORKER_JOB_DEL_FILE);
  85. w->pollinfo_filecopy_slot = 0;
  86. if(unlikely(!w->pollinfo_slot)) {
  87. debug(D_WEB_CLIENT, "%llu: CROSS WEB CLIENT CLEANUP (iFD %d, oFD %d)", w->id, pi->fd, w->ofd);
  88. web_server_log_connection(w, "DISCONNECTED");
  89. web_client_request_done(w);
  90. web_client_release_to_cache(w);
  91. global_statistics_web_client_disconnected();
  92. }
  93. worker_is_idle();
  94. }
  95. static int web_server_file_read_callback(POLLINFO *pi, short int *events) {
  96. int retval = -1;
  97. struct web_client *w = (struct web_client *)pi->data;
  98. worker_is_busy(WORKER_JOB_READ_FILE);
  99. // if there is no POLLINFO linked to this, it means the client disconnected
  100. // stop the file reading too
  101. if(unlikely(!w->pollinfo_slot)) {
  102. debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON CLOSED WEB CLIENT", w->id, pi->fd);
  103. retval = -1;
  104. goto cleanup;
  105. }
  106. if(unlikely(w->mode != WEB_CLIENT_MODE_FILECOPY || w->ifd == w->ofd)) {
  107. debug(D_WEB_CLIENT, "%llu: PREVENTED ATTEMPT TO READ FILE ON FD %d, ON NON-FILECOPY WEB CLIENT", w->id, pi->fd);
  108. retval = -1;
  109. goto cleanup;
  110. }
  111. debug(D_WEB_CLIENT, "%llu: READING FILE ON FD %d", w->id, pi->fd);
  112. worker_private->file_reads++;
  113. ssize_t ret = unlikely(web_client_read_file(w));
  114. if(likely(web_client_has_wait_send(w))) {
  115. POLLJOB *p = pi->p; // our POLLJOB
  116. POLLINFO *wpi = pollinfo_from_slot(p, w->pollinfo_slot); // POLLINFO of the client socket
  117. debug(D_WEB_CLIENT, "%llu: SIGNALING W TO SEND (iFD %d, oFD %d)", w->id, pi->fd, wpi->fd);
  118. p->fds[wpi->slot].events |= POLLOUT;
  119. }
  120. if(unlikely(ret <= 0 || w->ifd == w->ofd)) {
  121. debug(D_WEB_CLIENT, "%llu: DONE READING FILE ON FD %d", w->id, pi->fd);
  122. retval = -1;
  123. goto cleanup;
  124. }
  125. *events = POLLIN;
  126. retval = 0;
  127. cleanup:
  128. worker_is_idle();
  129. return retval;
  130. }
  131. static int web_server_file_write_callback(POLLINFO *pi, short int *events) {
  132. (void)pi;
  133. (void)events;
  134. worker_is_busy(WORKER_JOB_WRITE_FILE);
  135. error("Writing to web files is not supported!");
  136. worker_is_idle();
  137. return -1;
  138. }
  139. // ----------------------------------------------------------------------------
  140. // web server clients
  141. static void *web_server_add_callback(POLLINFO *pi, short int *events, void *data) {
  142. (void)data; // Suppress warning on unused argument
  143. worker_is_busy(WORKER_JOB_ADD_CONNECTION);
  144. worker_private->connected++;
  145. size_t concurrent = worker_private->connected - worker_private->disconnected;
  146. if(unlikely(concurrent > worker_private->max_concurrent))
  147. worker_private->max_concurrent = concurrent;
  148. *events = POLLIN;
  149. debug(D_WEB_CLIENT_ACCESS, "LISTENER on %d: new connection.", pi->fd);
  150. struct web_client *w = web_client_create_on_fd(pi);
  151. if (!strncmp(pi->client_port, "UNIX", 4)) {
  152. web_client_set_unix(w);
  153. } else {
  154. web_client_set_tcp(w);
  155. }
  156. #ifdef ENABLE_HTTPS
  157. if ((!web_client_check_unix(w)) && (netdata_ssl_srv_ctx)) {
  158. if( sock_delnonblock(w->ifd) < 0 ){
  159. error("Web server cannot remove the non-blocking flag from socket %d",w->ifd);
  160. }
  161. //Read the first 7 bytes from the message, but the message
  162. //is not removed from the queue, because we are using MSG_PEEK
  163. char test[8];
  164. if ( recv(w->ifd,test, 7,MSG_PEEK) == 7 ) {
  165. test[7] = 0x00;
  166. }
  167. else {
  168. //Case I do not have success to read 7 bytes,
  169. //this means that the mensage was not completely read, so
  170. //I cannot identify it yet.
  171. sock_setnonblock(w->ifd);
  172. goto cleanup;
  173. }
  174. //The next two ifs are not together because I am reusing SSL structure
  175. if (!w->ssl.conn)
  176. {
  177. w->ssl.conn = SSL_new(netdata_ssl_srv_ctx);
  178. if ( w->ssl.conn ) {
  179. SSL_set_accept_state(w->ssl.conn);
  180. } else {
  181. error("Failed to create SSL context on socket fd %d.", w->ifd);
  182. if (test[0] < 0x18){
  183. WEB_CLIENT_IS_DEAD(w);
  184. sock_setnonblock(w->ifd);
  185. goto cleanup;
  186. }
  187. }
  188. }
  189. if (w->ssl.conn) {
  190. if (SSL_set_fd(w->ssl.conn, w->ifd) != 1) {
  191. error("Failed to set the socket to the SSL on socket fd %d.", w->ifd);
  192. //The client is not set dead, because I received a normal HTTP request
  193. //instead a Client Hello(HTTPS).
  194. if ( test[0] < 0x18 ){
  195. WEB_CLIENT_IS_DEAD(w);
  196. }
  197. }
  198. else{
  199. w->ssl.flags = security_process_accept(w->ssl.conn, (int)test[0]);
  200. }
  201. }
  202. sock_setnonblock(w->ifd);
  203. } else{
  204. w->ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  205. }
  206. #endif
  207. debug(D_WEB_CLIENT, "%llu: ADDED CLIENT FD %d", w->id, pi->fd);
  208. cleanup:
  209. worker_is_idle();
  210. return w;
  211. }
  212. // TCP client disconnected
  213. static void web_server_del_callback(POLLINFO *pi) {
  214. worker_is_busy(WORKER_JOB_DEL_COLLECTION);
  215. worker_private->disconnected++;
  216. struct web_client *w = (struct web_client *)pi->data;
  217. w->pollinfo_slot = 0;
  218. if(unlikely(w->pollinfo_filecopy_slot)) {
  219. POLLINFO *fpi = pollinfo_from_slot(pi->p, w->pollinfo_filecopy_slot); // POLLINFO of the client socket
  220. (void)fpi;
  221. debug(D_WEB_CLIENT, "%llu: THE CLIENT WILL BE FRED BY READING FILE JOB ON FD %d", w->id, fpi->fd);
  222. }
  223. else {
  224. if(web_client_flag_check(w, WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET))
  225. pi->flags |= POLLINFO_FLAG_DONT_CLOSE;
  226. debug(D_WEB_CLIENT, "%llu: CLOSING CLIENT FD %d", w->id, pi->fd);
  227. web_server_log_connection(w, "DISCONNECTED");
  228. web_client_request_done(w);
  229. web_client_release_to_cache(w);
  230. global_statistics_web_client_disconnected();
  231. }
  232. worker_is_idle();
  233. }
  234. static int web_server_rcv_callback(POLLINFO *pi, short int *events) {
  235. int ret = -1;
  236. worker_is_busy(WORKER_JOB_RCV_DATA);
  237. worker_private->receptions++;
  238. struct web_client *w = (struct web_client *)pi->data;
  239. int fd = pi->fd;
  240. ssize_t bytes;
  241. bytes = web_client_receive(w);
  242. if (likely(bytes > 0)) {
  243. debug(D_WEB_CLIENT, "%llu: processing received data on fd %d.", w->id, fd);
  244. worker_is_idle();
  245. worker_is_busy(WORKER_JOB_PROCESS);
  246. web_client_process_request(w);
  247. if (unlikely(w->mode == WEB_CLIENT_MODE_STREAM)) {
  248. web_client_send(w);
  249. }
  250. else if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY)) {
  251. if(w->pollinfo_filecopy_slot == 0) {
  252. debug(D_WEB_CLIENT, "%llu: FILECOPY DETECTED ON FD %d", w->id, pi->fd);
  253. if (unlikely(w->ifd != -1 && w->ifd != w->ofd && w->ifd != fd)) {
  254. // add a new socket to poll_events, with the same
  255. debug(D_WEB_CLIENT, "%llu: CREATING FILECOPY SLOT ON FD %d", w->id, pi->fd);
  256. POLLINFO *fpi = poll_add_fd(
  257. pi->p
  258. , w->ifd
  259. , pi->port_acl
  260. , 0
  261. , POLLINFO_FLAG_CLIENT_SOCKET
  262. , "FILENAME"
  263. , ""
  264. , ""
  265. , web_server_file_add_callback
  266. , web_server_file_del_callback
  267. , web_server_file_read_callback
  268. , web_server_file_write_callback
  269. , (void *) w
  270. );
  271. if(fpi)
  272. w->pollinfo_filecopy_slot = fpi->slot;
  273. else {
  274. error("Failed to add filecopy fd. Closing client.");
  275. ret = -1;
  276. goto cleanup;
  277. }
  278. }
  279. }
  280. }
  281. else {
  282. if(unlikely(w->ifd == fd && web_client_has_wait_receive(w)))
  283. *events |= POLLIN;
  284. }
  285. if(unlikely(w->ofd == fd && web_client_has_wait_send(w)))
  286. *events |= POLLOUT;
  287. } else if(unlikely(bytes < 0)) {
  288. ret = -1;
  289. goto cleanup;
  290. } else if (unlikely(bytes == 0)) {
  291. if(unlikely(w->ifd == fd && web_client_has_ssl_wait_receive(w)))
  292. *events |= POLLIN;
  293. if(unlikely(w->ofd == fd && web_client_has_ssl_wait_send(w)))
  294. *events |= POLLOUT;
  295. }
  296. ret = web_server_check_client_status(w);
  297. cleanup:
  298. worker_is_idle();
  299. return ret;
  300. }
  301. static int web_server_snd_callback(POLLINFO *pi, short int *events) {
  302. int retval = -1;
  303. worker_is_busy(WORKER_JOB_SND_DATA);
  304. worker_private->sends++;
  305. struct web_client *w = (struct web_client *)pi->data;
  306. int fd = pi->fd;
  307. debug(D_WEB_CLIENT, "%llu: sending data on fd %d.", w->id, fd);
  308. int ret = web_client_send(w);
  309. if(unlikely(ret < 0)) {
  310. retval = -1;
  311. goto cleanup;
  312. }
  313. if(unlikely(w->ifd == fd && web_client_has_wait_receive(w)))
  314. *events |= POLLIN;
  315. if(unlikely(w->ofd == fd && web_client_has_wait_send(w)))
  316. *events |= POLLOUT;
  317. retval = web_server_check_client_status(w);
  318. cleanup:
  319. worker_is_idle();
  320. return retval;
  321. }
  322. // ----------------------------------------------------------------------------
  323. // web server worker thread
  324. static void socket_listen_main_static_threaded_worker_cleanup(void *ptr) {
  325. worker_private = (struct web_server_static_threaded_worker *)ptr;
  326. info("stopped after %zu connects, %zu disconnects (max concurrent %zu), %zu receptions and %zu sends",
  327. worker_private->connected,
  328. worker_private->disconnected,
  329. worker_private->max_concurrent,
  330. worker_private->receptions,
  331. worker_private->sends
  332. );
  333. worker_private->running = 0;
  334. worker_unregister();
  335. }
  336. static bool web_server_should_stop(void) {
  337. return !service_running(SERVICE_WEB_SERVER);
  338. }
  339. void *socket_listen_main_static_threaded_worker(void *ptr) {
  340. worker_private = (struct web_server_static_threaded_worker *)ptr;
  341. worker_private->running = 1;
  342. worker_register("WEB");
  343. worker_register_job_name(WORKER_JOB_ADD_CONNECTION, "connect");
  344. worker_register_job_name(WORKER_JOB_DEL_COLLECTION, "disconnect");
  345. worker_register_job_name(WORKER_JOB_ADD_FILE, "file start");
  346. worker_register_job_name(WORKER_JOB_DEL_FILE, "file end");
  347. worker_register_job_name(WORKER_JOB_READ_FILE, "file read");
  348. worker_register_job_name(WORKER_JOB_WRITE_FILE, "file write");
  349. worker_register_job_name(WORKER_JOB_RCV_DATA, "receive");
  350. worker_register_job_name(WORKER_JOB_SND_DATA, "send");
  351. worker_register_job_name(WORKER_JOB_PROCESS, "process");
  352. netdata_thread_cleanup_push(socket_listen_main_static_threaded_worker_cleanup, ptr);
  353. poll_events(&api_sockets
  354. , web_server_add_callback
  355. , web_server_del_callback
  356. , web_server_rcv_callback
  357. , web_server_snd_callback
  358. , NULL
  359. , web_server_should_stop
  360. , web_allow_connections_from
  361. , web_allow_connections_dns
  362. , NULL
  363. , web_client_first_request_timeout
  364. , web_client_timeout
  365. , default_rrd_update_every * 1000 // timer_milliseconds
  366. , ptr // timer_data
  367. , worker_private->max_sockets
  368. );
  369. netdata_thread_cleanup_pop(1);
  370. return NULL;
  371. }
  372. // ----------------------------------------------------------------------------
  373. // web server main thread - also becomes a worker
  374. static void socket_listen_main_static_threaded_cleanup(void *ptr) {
  375. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  376. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  377. // int i, found = 0;
  378. // usec_t max = 2 * USEC_PER_SEC, step = 50000;
  379. //
  380. // // we start from 1, - 0 is self
  381. // for(i = 1; i < static_threaded_workers_count; i++) {
  382. // if(static_workers_private_data[i].running) {
  383. // found++;
  384. // info("stopping worker %d", i + 1);
  385. // netdata_thread_cancel(static_workers_private_data[i].thread);
  386. // }
  387. // else
  388. // info("found stopped worker %d", i + 1);
  389. // }
  390. //
  391. // while(found && max > 0) {
  392. // max -= step;
  393. // info("Waiting %d static web threads to finish...", found);
  394. // sleep_usec(step);
  395. // found = 0;
  396. //
  397. // // we start from 1, - 0 is self
  398. // for(i = 1; i < static_threaded_workers_count; i++) {
  399. // if (static_workers_private_data[i].running)
  400. // found++;
  401. // }
  402. // }
  403. //
  404. // if(found)
  405. // error("%d static web threads are taking too long to finish. Giving up.", found);
  406. info("closing all web server sockets...");
  407. listen_sockets_close(&api_sockets);
  408. info("all static web threads stopped.");
  409. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  410. }
  411. void *socket_listen_main_static_threaded(void *ptr) {
  412. netdata_thread_cleanup_push(socket_listen_main_static_threaded_cleanup, ptr);
  413. web_server_mode = WEB_SERVER_MODE_STATIC_THREADED;
  414. if(!api_sockets.opened)
  415. fatal("LISTENER: no listen sockets available.");
  416. #ifdef ENABLE_HTTPS
  417. security_start_ssl(NETDATA_SSL_CONTEXT_SERVER);
  418. #endif
  419. // 6 threads is the optimal value
  420. // since 6 are the parallel connections browsers will do
  421. // so, if the machine has more CPUs, avoid using resources unnecessarily
  422. int def_thread_count = MIN(get_netdata_cpus(), 6);
  423. if (!strcmp(config_get(CONFIG_SECTION_WEB, "mode", ""),"single-threaded")) {
  424. info("Running web server with one thread, because mode is single-threaded");
  425. config_set(CONFIG_SECTION_WEB, "mode", "static-threaded");
  426. def_thread_count = 1;
  427. }
  428. static_threaded_workers_count = config_get_number(CONFIG_SECTION_WEB, "web server threads", def_thread_count);
  429. if (static_threaded_workers_count < 1) static_threaded_workers_count = 1;
  430. #ifdef ENABLE_HTTPS
  431. // See https://github.com/netdata/netdata/issues/11081#issuecomment-831998240 for more details
  432. if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) {
  433. static_threaded_workers_count = 1;
  434. info("You are running an OpenSSL older than 1.1.0, web server will not enable multithreading.");
  435. }
  436. #endif
  437. size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_WEB, "web server max sockets",
  438. (long long int)(rlimit_nofile.rlim_cur / 4));
  439. static_workers_private_data = callocz((size_t)static_threaded_workers_count,
  440. sizeof(struct web_server_static_threaded_worker));
  441. web_server_is_multithreaded = (static_threaded_workers_count > 1);
  442. int i;
  443. for (i = 1; i < static_threaded_workers_count; i++) {
  444. static_workers_private_data[i].id = i;
  445. static_workers_private_data[i].max_sockets = max_sockets / static_threaded_workers_count;
  446. char tag[50 + 1];
  447. snprintfz(tag, 50, "WEB[%d]", i+1);
  448. info("starting worker %d", i+1);
  449. netdata_thread_create(&static_workers_private_data[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT,
  450. socket_listen_main_static_threaded_worker, (void *)&static_workers_private_data[i]);
  451. }
  452. // and the main one
  453. static_workers_private_data[0].max_sockets = max_sockets / static_threaded_workers_count;
  454. socket_listen_main_static_threaded_worker((void *)&static_workers_private_data[0]);
  455. netdata_thread_cleanup_pop(1);
  456. return NULL;
  457. }