streaming.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "streaming.h"
  3. #include "connlist.h"
  4. #include "h2o_utils.h"
  5. #include "streaming/common.h"
  // Size handed to rbuf_create() for each connection's rx and tx ring buffer.
  #define H2O2STREAM_BUF_SIZE (1024 * 1024)

  // Bumped (atomically) by every h2o_stream_write() call and swapped back to
  // zero by h2o_stream_check_pending_write_reqs(), which uses a non-zero value
  // as the signal to walk the connection list.
  static int pending_write_reqs = 0;
  8. // h2o_stream_conn_t related functions
  9. void h2o_stream_conn_t_init(h2o_stream_conn_t *conn)
  10. {
  11. memset(conn, 0, sizeof(*conn));
  12. conn->rx = rbuf_create(H2O2STREAM_BUF_SIZE);
  13. conn->tx = rbuf_create(H2O2STREAM_BUF_SIZE);
  14. pthread_mutex_init(&conn->rx_buf_lock, NULL);
  15. pthread_mutex_init(&conn->tx_buf_lock, NULL);
  16. pthread_cond_init(&conn->rx_buf_cond, NULL);
  17. // no need to check for NULL as rbuf_create uses mallocz internally
  18. }
  19. void h2o_stream_conn_t_destroy(h2o_stream_conn_t *conn)
  20. {
  21. rbuf_free(conn->rx);
  22. rbuf_free(conn->tx);
  23. freez(conn->url);
  24. freez(conn->user_agent);
  25. pthread_mutex_destroy(&conn->rx_buf_lock);
  26. pthread_mutex_destroy(&conn->tx_buf_lock);
  27. pthread_cond_destroy(&conn->rx_buf_cond);
  28. }
  29. // streaming upgrade related functions
  30. int is_streaming_handshake(h2o_req_t *req)
  31. {
  32. /* method */
  33. if (!h2o_memis(req->input.method.base, req->input.method.len, H2O_STRLIT("GET")))
  34. return 1;
  35. if (!h2o_memis(req->path_normalized.base, req->path_normalized.len, H2O_STRLIT(NETDATA_STREAM_URL))) {
  36. return 1;
  37. }
  38. /* upgrade header */
  39. if (req->upgrade.base == NULL || !h2o_lcstris(req->upgrade.base, req->upgrade.len, H2O_STRLIT(NETDATA_STREAM_PROTO_NAME)))
  40. return 1;
  41. // TODO consider adding some key in form of random number
  42. // to prevent caching on route especially if TLS is not used
  43. // e.g. client sends random number
  44. // server replies with it xored
  45. return 0;
  46. }
  47. static void stream_on_close(h2o_stream_conn_t *conn);
  48. void stream_process(h2o_stream_conn_t *conn, int initial);
  49. void stream_on_complete(void *user_data, h2o_socket_t *sock, size_t reqsize)
  50. {
  51. h2o_stream_conn_t *conn = user_data;
  52. /* close the connection on error */
  53. if (sock == NULL) {
  54. stream_on_close(conn);
  55. return;
  56. }
  57. conn->sock = sock;
  58. sock->data = conn;
  59. conn_list_insert(&conn_list, conn);
  60. h2o_buffer_consume(&sock->input, reqsize);
  61. stream_process(conn, 1);
  62. }
  63. // handling of active streams
  64. static void stream_on_close(h2o_stream_conn_t *conn)
  65. {
  66. if (conn->sock != NULL)
  67. h2o_socket_close(conn->sock);
  68. conn_list_remove_conn(&conn_list, conn);
  69. pthread_mutex_lock(&conn->rx_buf_lock);
  70. conn->shutdown = 1;
  71. pthread_cond_broadcast(&conn->rx_buf_cond);
  72. pthread_mutex_unlock(&conn->rx_buf_lock);
  73. h2o_stream_conn_t_destroy(conn);
  74. freez(conn);
  75. }
  76. static void on_write_complete(h2o_socket_t *sock, const char *err)
  77. {
  78. h2o_stream_conn_t *conn = sock->data;
  79. if (err != NULL) {
  80. stream_on_close(conn);
  81. error_report("Streaming connection error \"%s\"", err);
  82. return;
  83. }
  84. pthread_mutex_lock(&conn->tx_buf_lock);
  85. rbuf_bump_tail(conn->tx, conn->tx_buf.len);
  86. conn->tx_buf.base = NULL;
  87. conn->tx_buf.len = 0;
  88. pthread_mutex_unlock(&conn->tx_buf_lock);
  89. stream_process(conn, 0);
  90. }
  91. static void stream_on_recv(h2o_socket_t *sock, const char *err)
  92. {
  93. h2o_stream_conn_t *conn = sock->data;
  94. if (err != NULL) {
  95. stream_on_close(conn);
  96. error_report("Streaming connection error \"%s\"", err);
  97. return;
  98. }
  99. stream_process(conn, 0);
  100. }
  101. #define PARSE_DONE 1
  102. #define PARSE_ERROR -1
  103. #define GIMME_MORE_OF_DEM_SWEET_BYTEZ 0
  104. #define STREAM_METHOD "STREAM "
  105. #define USER_AGENT "User-Agent: "
  106. #define NEED_MIN_BYTES(buf, bytes) \
  107. if (rbuf_bytes_available(buf) < bytes) \
  108. return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
  109. // TODO check in streaming code this is probably defined somewhere already
  110. #define MAX_LEN_STREAM_HELLO (1024*2)
  111. static int process_STREAM_X_HTTP_1_1(http_stream_parse_state_t *parser_state, rbuf_t buf, char **url, char **user_agent)
  112. {
  113. int idx;
  114. switch(*parser_state) {
  115. case HTTP_STREAM:
  116. NEED_MIN_BYTES(buf, strlen(STREAM_METHOD));
  117. if (rbuf_memcmp_n(buf, H2O_STRLIT(STREAM_METHOD))) {
  118. error_report("Expected \"%s\"", STREAM_METHOD);
  119. return PARSE_ERROR;
  120. }
  121. rbuf_bump_tail(buf, strlen(STREAM_METHOD));
  122. *parser_state = HTTP_URL;
  123. /* FALLTHROUGH */
  124. case HTTP_URL:
  125. if (!rbuf_find_bytes(buf, " ", 1, &idx)) {
  126. if (rbuf_bytes_available(buf) >= MAX_LEN_STREAM_HELLO) {
  127. error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
  128. return PARSE_ERROR;
  129. }
  130. }
  131. *url = mallocz(idx + 1);
  132. rbuf_pop(buf, *url, idx);
  133. (*url)[idx] = 0;
  134. *parser_state = HTTP_PROTO;
  135. /* FALLTHROUGH */
  136. case HTTP_PROTO:
  137. NEED_MIN_BYTES(buf, strlen(HTTP_1_1));
  138. if (rbuf_memcmp_n(buf, H2O_STRLIT(HTTP_1_1))) {
  139. error_report("Expected \"%s\"", HTTP_1_1);
  140. return PARSE_ERROR;
  141. }
  142. rbuf_bump_tail(buf, strlen(HTTP_1_1));
  143. *parser_state = HTTP_USER_AGENT_KEY;
  144. /* FALLTHROUGH */
  145. case HTTP_USER_AGENT_KEY:
  146. // and OF COURSE EVERYTHING is passed in URL except
  147. // for user agent which we need and is passed as HTTP header
  148. // not worth writing a parser for this so we manually extract
  149. // just the single header we need and skip everything else
  150. if (!rbuf_find_bytes(buf, USER_AGENT, strlen(USER_AGENT), &idx)) {
  151. if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
  152. error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
  153. return PARSE_ERROR;
  154. }
  155. return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
  156. }
  157. rbuf_bump_tail(buf, idx + strlen(USER_AGENT));
  158. *parser_state = HTTP_USER_AGENT_VALUE;
  159. /* FALLTHROUGH */
  160. case HTTP_USER_AGENT_VALUE:
  161. if (!rbuf_find_bytes(buf, "\r\n", 2, &idx)) {
  162. if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
  163. error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
  164. return PARSE_ERROR;
  165. }
  166. return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
  167. }
  168. *user_agent = mallocz(idx + 1);
  169. rbuf_pop(buf, *user_agent, idx);
  170. (*user_agent)[idx] = 0;
  171. *parser_state = HTTP_HDR;
  172. /* FALLTHROUGH */
  173. case HTTP_HDR:
  174. if (!rbuf_find_bytes(buf, HTTP_HDR_END, strlen(HTTP_HDR_END), &idx)) {
  175. if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
  176. error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
  177. return PARSE_ERROR;
  178. }
  179. return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
  180. }
  181. rbuf_bump_tail(buf, idx + strlen(HTTP_HDR_END));
  182. *parser_state = HTTP_DONE;
  183. return PARSE_DONE;
  184. case HTTP_DONE:
  185. error_report("Parsing is done. No need to call again.");
  186. return PARSE_DONE;
  187. default:
  188. error_report("Unknown parser state %d", (int)*parser_state);
  189. return PARSE_ERROR;
  190. }
  191. }
  192. #define SINGLE_WRITE_MAX (1024)
  193. void stream_process(h2o_stream_conn_t *conn, int initial)
  194. {
  195. int rc;
  196. struct web_client w;
  197. pthread_mutex_lock(&conn->tx_buf_lock);
  198. if (h2o_socket_is_writing(conn->sock) || rbuf_bytes_available(conn->tx)) {
  199. if (rbuf_bytes_available(conn->tx) && !conn->tx_buf.base) {
  200. conn->tx_buf.base = rbuf_get_linear_read_range(conn->tx, &conn->tx_buf.len);
  201. if (conn->tx_buf.base) {
  202. conn->tx_buf.len = MIN(conn->tx_buf.len, SINGLE_WRITE_MAX);
  203. h2o_socket_write(conn->sock, &conn->tx_buf, 1, on_write_complete);
  204. }
  205. }
  206. }
  207. pthread_mutex_unlock(&conn->tx_buf_lock);
  208. if (initial)
  209. h2o_socket_read_start(conn->sock, stream_on_recv);
  210. if (conn->sock->input->size) {
  211. size_t insert_max;
  212. pthread_mutex_lock(&conn->rx_buf_lock);
  213. char *insert_loc = rbuf_get_linear_insert_range(conn->rx, &insert_max);
  214. if (insert_loc == NULL) {
  215. pthread_cond_broadcast(&conn->rx_buf_cond);
  216. pthread_mutex_unlock(&conn->rx_buf_lock);
  217. return;
  218. }
  219. insert_max = MIN(insert_max, conn->sock->input->size);
  220. memcpy(insert_loc, conn->sock->input->bytes, insert_max);
  221. rbuf_bump_head(conn->rx, insert_max);
  222. h2o_buffer_consume(&conn->sock->input, insert_max);
  223. pthread_cond_broadcast(&conn->rx_buf_cond);
  224. pthread_mutex_unlock(&conn->rx_buf_lock);
  225. }
  226. switch (conn->state) {
  227. case STREAM_X_HTTP_1_1:
  228. // no conn->rx lock here as at this point we are still single threaded
  229. // until we call rrdpush_receiver_thread_spawn() later down
  230. rc = process_STREAM_X_HTTP_1_1(&conn->parse_state, conn->rx, &conn->url, &conn->user_agent);
  231. if (rc == PARSE_ERROR) {
  232. error_report("error parsing the STREAM hello");
  233. break;
  234. }
  235. if (rc != PARSE_DONE)
  236. break;
  237. conn->state = STREAM_X_HTTP_1_1_DONE;
  238. /* FALLTHROUGH */
  239. case STREAM_X_HTTP_1_1_DONE:
  240. memset(&w, 0, sizeof(w));
  241. w.response.data = buffer_create(1024, NULL);
  242. // get client ip from the conn->sock
  243. struct sockaddr client;
  244. socklen_t len = h2o_socket_getpeername(conn->sock, &client);
  245. char peername[NI_MAXHOST];
  246. size_t peername_len = h2o_socket_getnumerichost(&client, len, peername);
  247. size_t cpy_len = sizeof(w.client_ip) < peername_len ? sizeof(w.client_ip) : peername_len;
  248. memcpy(w.client_ip, peername, cpy_len);
  249. w.client_ip[cpy_len - 1] = 0;
  250. w.user_agent = conn->user_agent;
  251. rc = rrdpush_receiver_thread_spawn(&w, conn->url, conn);
  252. if (rc != HTTP_RESP_OK) {
  253. error_report("HTTPD Failed to spawn the receiver thread %d", rc);
  254. conn->state = STREAM_CLOSE;
  255. stream_on_close(conn);
  256. } else {
  257. conn->state = STREAM_ACTIVE;
  258. }
  259. buffer_free(w.response.data);
  260. /* FALLTHROUGH */
  261. case STREAM_ACTIVE:
  262. break;
  263. default:
  264. error_report("Unknown conn->state");
  265. }
  266. }
  267. // read and write functions to be used by streaming parser
  268. int h2o_stream_write(void *ctx, const char *data, size_t data_len)
  269. {
  270. h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx;
  271. pthread_mutex_lock(&conn->tx_buf_lock);
  272. size_t avail = rbuf_bytes_free(conn->tx);
  273. avail = MIN(avail, data_len);
  274. rbuf_push(conn->tx, data, avail);
  275. pthread_mutex_unlock(&conn->tx_buf_lock);
  276. __atomic_add_fetch(&pending_write_reqs, 1, __ATOMIC_SEQ_CST);
  277. return avail;
  278. }
  279. size_t h2o_stream_read(void *ctx, char *buf, size_t read_bytes)
  280. {
  281. int ret;
  282. h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx;
  283. pthread_mutex_lock(&conn->rx_buf_lock);
  284. size_t avail = rbuf_bytes_available(conn->rx);
  285. if (!avail) {
  286. if (conn->shutdown) {
  287. pthread_mutex_unlock(&conn->rx_buf_lock);
  288. return -1;
  289. }
  290. pthread_cond_wait(&conn->rx_buf_cond, &conn->rx_buf_lock);
  291. if (conn->shutdown) {
  292. pthread_mutex_unlock(&conn->rx_buf_lock);
  293. return -1;
  294. }
  295. avail = rbuf_bytes_available(conn->rx);
  296. if (!avail) {
  297. pthread_mutex_unlock(&conn->rx_buf_lock);
  298. return 0;
  299. }
  300. }
  301. avail = MIN(avail, read_bytes);
  302. ret = rbuf_pop(conn->rx, buf, avail);
  303. pthread_mutex_unlock(&conn->rx_buf_lock);
  304. return ret;
  305. }
  306. // periodic check for pending write requests
  307. void check_tx_buf(h2o_stream_conn_t *conn)
  308. {
  309. pthread_mutex_lock(&conn->tx_buf_lock);
  310. if (rbuf_bytes_available(conn->tx)) {
  311. pthread_mutex_unlock(&conn->tx_buf_lock);
  312. stream_process(conn, 0);
  313. } else
  314. pthread_mutex_unlock(&conn->tx_buf_lock);
  315. }
  316. void h2o_stream_check_pending_write_reqs(void)
  317. {
  318. int _write_reqs = __atomic_exchange_n(&pending_write_reqs, 0, __ATOMIC_SEQ_CST);
  319. if (_write_reqs > 0)
  320. conn_list_iter_all(&conn_list, check_tx_buf);
  321. }