streaming.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "daemon/common.h"
  3. #include "streaming.h"
  4. #include "connlist.h"
  5. #include "h2o_utils.h"
  6. #include "streaming/common.h"
  7. static int pending_write_reqs = 0;
  8. #define H2O2STREAM_BUF_SIZE (1024 * 1024)
  9. // h2o_stream_conn_t related functions
  10. void h2o_stream_conn_t_init(h2o_stream_conn_t *conn)
  11. {
  12. memset(conn, 0, sizeof(*conn));
  13. conn->rx = rbuf_create(H2O2STREAM_BUF_SIZE);
  14. conn->tx = rbuf_create(H2O2STREAM_BUF_SIZE);
  15. pthread_mutex_init(&conn->rx_buf_lock, NULL);
  16. pthread_mutex_init(&conn->tx_buf_lock, NULL);
  17. pthread_cond_init(&conn->rx_buf_cond, NULL);
  18. // no need to check for NULL as rbuf_create uses mallocz internally
  19. }
  20. void h2o_stream_conn_t_destroy(h2o_stream_conn_t *conn)
  21. {
  22. rbuf_free(conn->rx);
  23. rbuf_free(conn->tx);
  24. freez(conn->url);
  25. freez(conn->user_agent);
  26. pthread_mutex_destroy(&conn->rx_buf_lock);
  27. pthread_mutex_destroy(&conn->tx_buf_lock);
  28. pthread_cond_destroy(&conn->rx_buf_cond);
  29. }
  30. // streaming upgrade related functions
  31. int is_streaming_handshake(h2o_req_t *req)
  32. {
  33. /* method */
  34. if (!h2o_memis(req->input.method.base, req->input.method.len, H2O_STRLIT("GET")))
  35. return 1;
  36. if (!h2o_memis(req->path_normalized.base, req->path_normalized.len, H2O_STRLIT(NETDATA_STREAM_URL))) {
  37. return 1;
  38. }
  39. /* upgrade header */
  40. if (req->upgrade.base == NULL || !h2o_lcstris(req->upgrade.base, req->upgrade.len, H2O_STRLIT(NETDATA_STREAM_PROTO_NAME)))
  41. return 1;
  42. // TODO consider adding some key in form of random number
  43. // to prevent caching on route especially if TLS is not used
  44. // e.g. client sends random number
  45. // server replies with it xored
  46. return 0;
  47. }
  48. static void stream_on_close(h2o_stream_conn_t *conn);
  49. void stream_process(h2o_stream_conn_t *conn, int initial);
  50. void stream_on_complete(void *user_data, h2o_socket_t *sock, size_t reqsize)
  51. {
  52. h2o_stream_conn_t *conn = user_data;
  53. /* close the connection on error */
  54. if (sock == NULL) {
  55. stream_on_close(conn);
  56. return;
  57. }
  58. conn->sock = sock;
  59. sock->data = conn;
  60. conn_list_insert(&conn_list, conn);
  61. h2o_buffer_consume(&sock->input, reqsize);
  62. stream_process(conn, 1);
  63. }
  64. // handling of active streams
  65. static void stream_on_close(h2o_stream_conn_t *conn)
  66. {
  67. if (conn->sock != NULL)
  68. h2o_socket_close(conn->sock);
  69. conn_list_remove_conn(&conn_list, conn);
  70. pthread_mutex_lock(&conn->rx_buf_lock);
  71. conn->shutdown = 1;
  72. pthread_cond_broadcast(&conn->rx_buf_cond);
  73. pthread_mutex_unlock(&conn->rx_buf_lock);
  74. h2o_stream_conn_t_destroy(conn);
  75. freez(conn);
  76. }
  77. static void on_write_complete(h2o_socket_t *sock, const char *err)
  78. {
  79. h2o_stream_conn_t *conn = sock->data;
  80. if (err != NULL) {
  81. stream_on_close(conn);
  82. error_report("Streaming connection error \"%s\"", err);
  83. return;
  84. }
  85. pthread_mutex_lock(&conn->tx_buf_lock);
  86. rbuf_bump_tail(conn->tx, conn->tx_buf.len);
  87. conn->tx_buf.base = NULL;
  88. conn->tx_buf.len = 0;
  89. pthread_mutex_unlock(&conn->tx_buf_lock);
  90. stream_process(conn, 0);
  91. }
  92. static void stream_on_recv(h2o_socket_t *sock, const char *err)
  93. {
  94. h2o_stream_conn_t *conn = sock->data;
  95. if (err != NULL) {
  96. stream_on_close(conn);
  97. error_report("Streaming connection error \"%s\"", err);
  98. return;
  99. }
  100. stream_process(conn, 0);
  101. }
  102. #define PARSE_DONE 1
  103. #define PARSE_ERROR -1
  104. #define GIMME_MORE_OF_DEM_SWEET_BYTEZ 0
  105. #define STREAM_METHOD "STREAM "
  106. #define USER_AGENT "User-Agent: "
  107. #define NEED_MIN_BYTES(buf, bytes) do { \
  108. if(rbuf_bytes_available(buf) < bytes) \
  109. return GIMME_MORE_OF_DEM_SWEET_BYTEZ;\
  110. } while(0)
  111. // TODO check in streaming code this is probably defined somewhere already
  112. #define MAX_LEN_STREAM_HELLO (1024*2)
  113. static int process_STREAM_X_HTTP_1_1(http_stream_parse_state_t *parser_state, rbuf_t buf, char **url, char **user_agent)
  114. {
  115. int idx;
  116. switch(*parser_state) {
  117. case HTTP_STREAM:
  118. NEED_MIN_BYTES(buf, strlen(STREAM_METHOD));
  119. if (rbuf_memcmp_n(buf, H2O_STRLIT(STREAM_METHOD))) {
  120. error_report("Expected \"%s\"", STREAM_METHOD);
  121. return PARSE_ERROR;
  122. }
  123. rbuf_bump_tail(buf, strlen(STREAM_METHOD));
  124. *parser_state = HTTP_URL;
  125. /* FALLTHROUGH */
  126. case HTTP_URL:
  127. if (!rbuf_find_bytes(buf, " ", 1, &idx)) {
  128. if (rbuf_bytes_available(buf) >= MAX_LEN_STREAM_HELLO) {
  129. error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
  130. return PARSE_ERROR;
  131. }
  132. }
  133. *url = mallocz(idx + 1);
  134. rbuf_pop(buf, *url, idx);
  135. (*url)[idx] = 0;
  136. *parser_state = HTTP_PROTO;
  137. /* FALLTHROUGH */
  138. case HTTP_PROTO:
  139. NEED_MIN_BYTES(buf, strlen(HTTP_1_1));
  140. if (rbuf_memcmp_n(buf, H2O_STRLIT(HTTP_1_1))) {
  141. error_report("Expected \"%s\"", HTTP_1_1);
  142. return PARSE_ERROR;
  143. }
  144. rbuf_bump_tail(buf, strlen(HTTP_1_1));
  145. *parser_state = HTTP_USER_AGENT_KEY;
  146. /* FALLTHROUGH */
  147. case HTTP_USER_AGENT_KEY:
  148. // and OF COURSE EVERYTHING is passed in URL except
  149. // for user agent which we need and is passed as HTTP header
  150. // not worth writing a parser for this so we manually extract
  151. // just the single header we need and skip everything else
  152. if (!rbuf_find_bytes(buf, USER_AGENT, strlen(USER_AGENT), &idx)) {
  153. if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
  154. error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
  155. return PARSE_ERROR;
  156. }
  157. return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
  158. }
  159. rbuf_bump_tail(buf, idx + strlen(USER_AGENT));
  160. *parser_state = HTTP_USER_AGENT_VALUE;
  161. /* FALLTHROUGH */
  162. case HTTP_USER_AGENT_VALUE:
  163. if (!rbuf_find_bytes(buf, "\r\n", 2, &idx)) {
  164. if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
  165. error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
  166. return PARSE_ERROR;
  167. }
  168. return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
  169. }
  170. *user_agent = mallocz(idx + 1);
  171. rbuf_pop(buf, *user_agent, idx);
  172. (*user_agent)[idx] = 0;
  173. *parser_state = HTTP_HDR;
  174. /* FALLTHROUGH */
  175. case HTTP_HDR:
  176. if (!rbuf_find_bytes(buf, HTTP_HDR_END, strlen(HTTP_HDR_END), &idx)) {
  177. if (rbuf_bytes_available(buf) >= (size_t)(rbuf_get_capacity(buf) * 0.9)) {
  178. error_report("The initial \"STREAM [URL]" HTTP_1_1 "\" over max of %d", MAX_LEN_STREAM_HELLO);
  179. return PARSE_ERROR;
  180. }
  181. return GIMME_MORE_OF_DEM_SWEET_BYTEZ;
  182. }
  183. rbuf_bump_tail(buf, idx + strlen(HTTP_HDR_END));
  184. *parser_state = HTTP_DONE;
  185. return PARSE_DONE;
  186. case HTTP_DONE:
  187. error_report("Parsing is done. No need to call again.");
  188. return PARSE_DONE;
  189. default:
  190. error_report("Unknown parser state %d", (int)*parser_state);
  191. return PARSE_ERROR;
  192. }
  193. }
  194. #define SINGLE_WRITE_MAX (1024)
  195. void stream_process(h2o_stream_conn_t *conn, int initial)
  196. {
  197. int rc;
  198. struct web_client w;
  199. pthread_mutex_lock(&conn->tx_buf_lock);
  200. if (h2o_socket_is_writing(conn->sock) || rbuf_bytes_available(conn->tx)) {
  201. if (rbuf_bytes_available(conn->tx) && !conn->tx_buf.base) {
  202. conn->tx_buf.base = rbuf_get_linear_read_range(conn->tx, &conn->tx_buf.len);
  203. if (conn->tx_buf.base) {
  204. conn->tx_buf.len = MIN(conn->tx_buf.len, SINGLE_WRITE_MAX);
  205. h2o_socket_write(conn->sock, &conn->tx_buf, 1, on_write_complete);
  206. }
  207. }
  208. }
  209. pthread_mutex_unlock(&conn->tx_buf_lock);
  210. if (initial)
  211. h2o_socket_read_start(conn->sock, stream_on_recv);
  212. if (conn->sock->input->size) {
  213. size_t insert_max;
  214. pthread_mutex_lock(&conn->rx_buf_lock);
  215. char *insert_loc = rbuf_get_linear_insert_range(conn->rx, &insert_max);
  216. if (insert_loc == NULL) {
  217. pthread_cond_broadcast(&conn->rx_buf_cond);
  218. pthread_mutex_unlock(&conn->rx_buf_lock);
  219. return;
  220. }
  221. insert_max = MIN(insert_max, conn->sock->input->size);
  222. memcpy(insert_loc, conn->sock->input->bytes, insert_max);
  223. rbuf_bump_head(conn->rx, insert_max);
  224. h2o_buffer_consume(&conn->sock->input, insert_max);
  225. pthread_cond_broadcast(&conn->rx_buf_cond);
  226. pthread_mutex_unlock(&conn->rx_buf_lock);
  227. }
  228. switch (conn->state) {
  229. case STREAM_X_HTTP_1_1:
  230. // no conn->rx lock here as at this point we are still single threaded
  231. // until we call rrdpush_receiver_thread_spawn() later down
  232. rc = process_STREAM_X_HTTP_1_1(&conn->parse_state, conn->rx, &conn->url, &conn->user_agent);
  233. if (rc == PARSE_ERROR) {
  234. error_report("error parsing the STREAM hello");
  235. break;
  236. }
  237. if (rc != PARSE_DONE)
  238. break;
  239. conn->state = STREAM_X_HTTP_1_1_DONE;
  240. /* FALLTHROUGH */
  241. case STREAM_X_HTTP_1_1_DONE:
  242. memset(&w, 0, sizeof(w));
  243. w.response.data = buffer_create(1024, NULL);
  244. // get client ip from the conn->sock
  245. struct sockaddr client;
  246. socklen_t len = h2o_socket_getpeername(conn->sock, &client);
  247. char peername[NI_MAXHOST];
  248. size_t peername_len = h2o_socket_getnumerichost(&client, len, peername);
  249. size_t cpy_len = sizeof(w.client_ip) < peername_len ? sizeof(w.client_ip) : peername_len;
  250. memcpy(w.client_ip, peername, cpy_len);
  251. w.client_ip[cpy_len - 1] = 0;
  252. w.user_agent = conn->user_agent;
  253. rc = rrdpush_receiver_thread_spawn(&w, conn->url, conn);
  254. if (rc != HTTP_RESP_OK) {
  255. error_report("HTTPD Failed to spawn the receiver thread %d", rc);
  256. conn->state = STREAM_CLOSE;
  257. stream_on_close(conn);
  258. } else {
  259. conn->state = STREAM_ACTIVE;
  260. }
  261. buffer_free(w.response.data);
  262. /* FALLTHROUGH */
  263. case STREAM_ACTIVE:
  264. break;
  265. default:
  266. error_report("Unknown conn->state");
  267. }
  268. }
  269. // read and write functions to be used by streaming parser
  270. int h2o_stream_write(void *ctx, const char *data, size_t data_len)
  271. {
  272. h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx;
  273. pthread_mutex_lock(&conn->tx_buf_lock);
  274. size_t avail = rbuf_bytes_free(conn->tx);
  275. avail = MIN(avail, data_len);
  276. rbuf_push(conn->tx, data, avail);
  277. pthread_mutex_unlock(&conn->tx_buf_lock);
  278. __atomic_add_fetch(&pending_write_reqs, 1, __ATOMIC_SEQ_CST);
  279. return avail;
  280. }
  281. size_t h2o_stream_read(void *ctx, char *buf, size_t read_bytes)
  282. {
  283. int ret;
  284. h2o_stream_conn_t *conn = (h2o_stream_conn_t *)ctx;
  285. pthread_mutex_lock(&conn->rx_buf_lock);
  286. size_t avail = rbuf_bytes_available(conn->rx);
  287. if (!avail) {
  288. if (conn->shutdown) {
  289. pthread_mutex_unlock(&conn->rx_buf_lock);
  290. return -1;
  291. }
  292. pthread_cond_wait(&conn->rx_buf_cond, &conn->rx_buf_lock);
  293. if (conn->shutdown) {
  294. pthread_mutex_unlock(&conn->rx_buf_lock);
  295. return -1;
  296. }
  297. avail = rbuf_bytes_available(conn->rx);
  298. if (!avail) {
  299. pthread_mutex_unlock(&conn->rx_buf_lock);
  300. return 0;
  301. }
  302. }
  303. avail = MIN(avail, read_bytes);
  304. ret = rbuf_pop(conn->rx, buf, avail);
  305. pthread_mutex_unlock(&conn->rx_buf_lock);
  306. return ret;
  307. }
  308. // periodic check for pending write requests
  309. void check_tx_buf(h2o_stream_conn_t *conn)
  310. {
  311. pthread_mutex_lock(&conn->tx_buf_lock);
  312. if (rbuf_bytes_available(conn->tx)) {
  313. pthread_mutex_unlock(&conn->tx_buf_lock);
  314. stream_process(conn, 0);
  315. } else
  316. pthread_mutex_unlock(&conn->tx_buf_lock);
  317. }
  318. void h2o_stream_check_pending_write_reqs(void)
  319. {
  320. int _write_reqs = __atomic_exchange_n(&pending_write_reqs, 0, __ATOMIC_SEQ_CST);
  321. if (_write_reqs > 0)
  322. conn_list_iter_all(&conn_list, check_tx_buf);
  323. }