h2_stream.c 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/http/private/h2_stream.h>
  6. #include <aws/http/private/h2_connection.h>
  7. #include <aws/http/private/strutil.h>
  8. #include <aws/http/status_code.h>
  9. #include <aws/io/channel.h>
  10. #include <aws/io/logging.h>
  11. #include <aws/io/stream.h>
  12. /* Apple toolchains such as xcode and swiftpm define the DEBUG symbol. undef it here so we can actually use the token */
  13. #undef DEBUG
  /* Forward declarations: handlers installed in the stream vtable. */
  static void s_stream_destroy(struct aws_http_stream *stream_base);
  static void s_stream_update_window(struct aws_http_stream *stream_base, size_t increment_size);
  static int s_stream_reset_stream(struct aws_http_stream *stream_base, uint32_t http2_error);
  static int s_stream_get_received_error_code(struct aws_http_stream *stream_base, uint32_t *out_http2_error);
  static int s_stream_get_sent_error_code(struct aws_http_stream *stream_base, uint32_t *out_http2_error);
  static int s_stream_write_data(
      struct aws_http_stream *stream_base,
      const struct aws_http2_stream_write_data_options *options);

  /* Forward declarations: internal helpers that are used before they are defined. */
  static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
  static struct aws_h2err s_send_rst_and_close_stream(struct aws_h2_stream *stream, struct aws_h2err stream_error);
  static int s_stream_reset_stream_internal(struct aws_http_stream *stream_base, struct aws_h2err stream_error);
  25. struct aws_http_stream_vtable s_h2_stream_vtable = {
  26. .destroy = s_stream_destroy,
  27. .update_window = s_stream_update_window,
  28. .activate = aws_h2_stream_activate,
  29. .http1_write_chunk = NULL,
  30. .http2_reset_stream = s_stream_reset_stream,
  31. .http2_get_received_error_code = s_stream_get_received_error_code,
  32. .http2_get_sent_error_code = s_stream_get_sent_error_code,
  33. .http2_write_data = s_stream_write_data,
  34. };
  35. const char *aws_h2_stream_state_to_str(enum aws_h2_stream_state state) {
  36. switch (state) {
  37. case AWS_H2_STREAM_STATE_IDLE:
  38. return "IDLE";
  39. case AWS_H2_STREAM_STATE_RESERVED_LOCAL:
  40. return "RESERVED_LOCAL";
  41. case AWS_H2_STREAM_STATE_RESERVED_REMOTE:
  42. return "RESERVED_REMOTE";
  43. case AWS_H2_STREAM_STATE_OPEN:
  44. return "OPEN";
  45. case AWS_H2_STREAM_STATE_HALF_CLOSED_LOCAL:
  46. return "HALF_CLOSED_LOCAL";
  47. case AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE:
  48. return "HALF_CLOSED_REMOTE";
  49. case AWS_H2_STREAM_STATE_CLOSED:
  50. return "CLOSED";
  51. default:
  52. /* unreachable */
  53. AWS_ASSERT(0);
  54. return "*** UNKNOWN ***";
  55. }
  56. }
  57. static struct aws_h2_connection *s_get_h2_connection(const struct aws_h2_stream *stream) {
  58. return AWS_CONTAINER_OF(stream->base.owning_connection, struct aws_h2_connection, base);
  59. }
  60. static void s_lock_synced_data(struct aws_h2_stream *stream) {
  61. int err = aws_mutex_lock(&stream->synced_data.lock);
  62. AWS_ASSERT(!err && "lock failed");
  63. (void)err;
  64. }
  65. static void s_unlock_synced_data(struct aws_h2_stream *stream) {
  66. int err = aws_mutex_unlock(&stream->synced_data.lock);
  67. AWS_ASSERT(!err && "unlock failed");
  68. (void)err;
  69. }
  /* Precondition: the caller is on the thread of the channel that the stream's connection is attached to. */
  #define AWS_PRECONDITION_ON_CHANNEL_THREAD(STREAM)                                                                   \
      AWS_PRECONDITION(aws_channel_thread_is_callers_thread(s_get_h2_connection(STREAM)->base.channel_slot->channel))
  72. static bool s_client_state_allows_frame_type[AWS_H2_STREAM_STATE_COUNT][AWS_H2_FRAME_TYPE_COUNT] = {
  73. /* State before anything is sent or received */
  74. [AWS_H2_STREAM_STATE_IDLE] = {0},
  75. /* Client streams are never in reserved (local) state */
  76. [AWS_H2_STREAM_STATE_RESERVED_LOCAL] = {0},
  77. /* Client received push-request via PUSH_PROMISE on another stream.
  78. * Waiting for push-response to start arriving on this server-initiated stream. */
  79. [AWS_H2_STREAM_STATE_RESERVED_REMOTE] =
  80. {
  81. [AWS_H2_FRAME_T_HEADERS] = true,
  82. [AWS_H2_FRAME_T_RST_STREAM] = true,
  83. },
  84. /* Client is sending request and has not received full response yet. */
  85. [AWS_H2_STREAM_STATE_OPEN] =
  86. {
  87. [AWS_H2_FRAME_T_DATA] = true,
  88. [AWS_H2_FRAME_T_HEADERS] = true,
  89. [AWS_H2_FRAME_T_RST_STREAM] = true,
  90. [AWS_H2_FRAME_T_PUSH_PROMISE] = true,
  91. [AWS_H2_FRAME_T_WINDOW_UPDATE] = true,
  92. },
  93. /* Client has sent full request (END_STREAM), but has not received full response yet. */
  94. [AWS_H2_STREAM_STATE_HALF_CLOSED_LOCAL] =
  95. {
  96. [AWS_H2_FRAME_T_DATA] = true,
  97. [AWS_H2_FRAME_T_HEADERS] = true,
  98. [AWS_H2_FRAME_T_RST_STREAM] = true,
  99. [AWS_H2_FRAME_T_PUSH_PROMISE] = true,
  100. [AWS_H2_FRAME_T_WINDOW_UPDATE] = true,
  101. },
  102. /* Client has received full response (END_STREAM), but is still sending request (uncommon). */
  103. [AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE] =
  104. {
  105. [AWS_H2_FRAME_T_RST_STREAM] = true,
  106. [AWS_H2_FRAME_T_WINDOW_UPDATE] = true,
  107. },
  108. /* Full request sent (END_STREAM) and full response received (END_STREAM).
  109. * OR sent RST_STREAM. OR received RST_STREAM. */
  110. [AWS_H2_STREAM_STATE_CLOSED] = {0},
  111. };
  112. static bool s_server_state_allows_frame_type[AWS_H2_STREAM_STATE_COUNT][AWS_H2_FRAME_TYPE_COUNT] = {
  113. /* State before anything is sent or received, waiting for request headers to arrives and start things off */
  114. [AWS_H2_STREAM_STATE_IDLE] =
  115. {
  116. [AWS_H2_FRAME_T_HEADERS] = true,
  117. },
  118. /* Server sent push-request via PUSH_PROMISE on a client-initiated stream,
  119. * but hasn't started sending the push-response on this server-initiated stream yet. */
  120. [AWS_H2_STREAM_STATE_RESERVED_LOCAL] =
  121. {
  122. [AWS_H2_FRAME_T_RST_STREAM] = true,
  123. [AWS_H2_FRAME_T_WINDOW_UPDATE] = true,
  124. },
  125. /* Server streams are never in reserved (remote) state */
  126. [AWS_H2_STREAM_STATE_RESERVED_REMOTE] = {0},
  127. /* Server is receiving request, and has sent full response yet. */
  128. [AWS_H2_STREAM_STATE_OPEN] =
  129. {
  130. [AWS_H2_FRAME_T_HEADERS] = true,
  131. [AWS_H2_FRAME_T_DATA] = true,
  132. [AWS_H2_FRAME_T_RST_STREAM] = true,
  133. [AWS_H2_FRAME_T_WINDOW_UPDATE] = true,
  134. },
  135. /* Server has sent full response (END_STREAM), but has not received full response yet (uncommon). */
  136. [AWS_H2_STREAM_STATE_HALF_CLOSED_LOCAL] =
  137. {
  138. [AWS_H2_FRAME_T_HEADERS] = true,
  139. [AWS_H2_FRAME_T_DATA] = true,
  140. [AWS_H2_FRAME_T_RST_STREAM] = true,
  141. [AWS_H2_FRAME_T_WINDOW_UPDATE] = true,
  142. },
  143. /* Server has received full request (END_STREAM), and is still sending response. */
  144. [AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE] =
  145. {
  146. [AWS_H2_FRAME_T_RST_STREAM] = true,
  147. [AWS_H2_FRAME_T_WINDOW_UPDATE] = true,
  148. },
  149. /* Full request received (END_STREAM) and full response sent (END_STREAM).
  150. * OR sent RST_STREAM. OR received RST_STREAM. */
  151. [AWS_H2_STREAM_STATE_CLOSED] = {0},
  152. };
  153. /* Returns the appropriate Stream Error if given frame not allowed in current state */
  154. static struct aws_h2err s_check_state_allows_frame_type(
  155. const struct aws_h2_stream *stream,
  156. enum aws_h2_frame_type frame_type) {
  157. AWS_PRECONDITION(frame_type < AWS_H2_FRAME_T_UNKNOWN); /* Decoder won't invoke callbacks for unknown frame types */
  158. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  159. const enum aws_h2_stream_state state = stream->thread_data.state;
  160. bool allowed;
  161. if (stream->base.server_data) {
  162. allowed = s_server_state_allows_frame_type[state][frame_type];
  163. } else {
  164. allowed = s_client_state_allows_frame_type[state][frame_type];
  165. }
  166. if (allowed) {
  167. return AWS_H2ERR_SUCCESS;
  168. }
  169. /* Determine specific error code */
  170. enum aws_http2_error_code h2_error_code = AWS_HTTP2_ERR_PROTOCOL_ERROR;
  171. /* If peer knows the state is closed, then it's a STREAM_CLOSED error */
  172. if (state == AWS_H2_STREAM_STATE_CLOSED || state == AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE) {
  173. h2_error_code = AWS_HTTP2_ERR_STREAM_CLOSED;
  174. }
  175. AWS_H2_STREAM_LOGF(
  176. ERROR,
  177. stream,
  178. "Malformed message, cannot receive %s frame in %s state",
  179. aws_h2_frame_type_to_str(frame_type),
  180. aws_h2_stream_state_to_str(state));
  181. return aws_h2err_from_h2_code(h2_error_code);
  182. }
  183. static int s_stream_send_update_window_frame(struct aws_h2_stream *stream, size_t increment_size) {
  184. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  185. AWS_PRECONDITION(increment_size <= AWS_H2_WINDOW_UPDATE_MAX);
  186. struct aws_h2_connection *connection = s_get_h2_connection(stream);
  187. struct aws_h2_frame *stream_window_update_frame =
  188. aws_h2_frame_new_window_update(stream->base.alloc, stream->base.id, (uint32_t)increment_size);
  189. if (!stream_window_update_frame) {
  190. AWS_H2_STREAM_LOGF(
  191. ERROR,
  192. stream,
  193. "Failed to create WINDOW_UPDATE frame on connection, error %s",
  194. aws_error_name(aws_last_error()));
  195. return AWS_OP_ERR;
  196. }
  197. aws_h2_connection_enqueue_outgoing_frame(connection, stream_window_update_frame);
  198. return AWS_OP_SUCCESS;
  199. }
  200. struct aws_h2_stream *aws_h2_stream_new_request(
  201. struct aws_http_connection *client_connection,
  202. const struct aws_http_make_request_options *options) {
  203. AWS_PRECONDITION(client_connection);
  204. AWS_PRECONDITION(options);
  205. struct aws_h2_stream *stream = aws_mem_calloc(client_connection->alloc, 1, sizeof(struct aws_h2_stream));
  206. /* Initialize base stream */
  207. stream->base.vtable = &s_h2_stream_vtable;
  208. stream->base.alloc = client_connection->alloc;
  209. stream->base.owning_connection = client_connection;
  210. stream->base.user_data = options->user_data;
  211. stream->base.on_incoming_headers = options->on_response_headers;
  212. stream->base.on_incoming_header_block_done = options->on_response_header_block_done;
  213. stream->base.on_incoming_body = options->on_response_body;
  214. stream->base.on_complete = options->on_complete;
  215. stream->base.on_destroy = options->on_destroy;
  216. stream->base.client_data = &stream->base.client_or_server_data.client;
  217. stream->base.client_data->response_status = AWS_HTTP_STATUS_CODE_UNKNOWN;
  218. aws_linked_list_init(&stream->thread_data.outgoing_writes);
  219. aws_linked_list_init(&stream->synced_data.pending_write_list);
  220. /* Stream refcount starts at 1, and gets incremented again for the connection upon a call to activate() */
  221. aws_atomic_init_int(&stream->base.refcount, 1);
  222. enum aws_http_version message_version = aws_http_message_get_protocol_version(options->request);
  223. switch (message_version) {
  224. case AWS_HTTP_VERSION_1_1:
  225. /* TODO: don't automatic transform HTTP/1 message. Let user explicitly pass in HTTP/2 request */
  226. stream->thread_data.outgoing_message =
  227. aws_http2_message_new_from_http1(stream->base.alloc, options->request);
  228. if (!stream->thread_data.outgoing_message) {
  229. AWS_H2_STREAM_LOG(ERROR, stream, "Stream failed to create the HTTP/2 message from HTTP/1.1 message");
  230. goto error;
  231. }
  232. break;
  233. case AWS_HTTP_VERSION_2:
  234. stream->thread_data.outgoing_message = options->request;
  235. aws_http_message_acquire(stream->thread_data.outgoing_message);
  236. break;
  237. default:
  238. /* Not supported */
  239. aws_raise_error(AWS_ERROR_HTTP_UNSUPPORTED_PROTOCOL);
  240. goto error;
  241. }
  242. struct aws_byte_cursor method;
  243. AWS_ZERO_STRUCT(method);
  244. if (aws_http_message_get_request_method(options->request, &method)) {
  245. goto error;
  246. }
  247. stream->base.request_method = aws_http_str_to_method(method);
  248. /* Init H2 specific stuff */
  249. stream->thread_data.state = AWS_H2_STREAM_STATE_IDLE;
  250. /* stream end is implicit if the request isn't using manual data writes */
  251. stream->synced_data.manual_write_ended = !options->http2_use_manual_data_writes;
  252. stream->manual_write = options->http2_use_manual_data_writes;
  253. /* if there's a request body to write, add it as the first outgoing write */
  254. struct aws_input_stream *body_stream = aws_http_message_get_body_stream(options->request);
  255. if (body_stream) {
  256. struct aws_h2_stream_data_write *body_write =
  257. aws_mem_calloc(stream->base.alloc, 1, sizeof(struct aws_h2_stream_data_write));
  258. body_write->data_stream = aws_input_stream_acquire(body_stream);
  259. body_write->end_stream = !stream->manual_write;
  260. aws_linked_list_push_back(&stream->thread_data.outgoing_writes, &body_write->node);
  261. }
  262. stream->sent_reset_error_code = -1;
  263. stream->received_reset_error_code = -1;
  264. stream->synced_data.reset_error.h2_code = AWS_HTTP2_ERR_COUNT;
  265. stream->synced_data.api_state = AWS_H2_STREAM_API_STATE_INIT;
  266. if (aws_mutex_init(&stream->synced_data.lock)) {
  267. AWS_H2_STREAM_LOGF(
  268. ERROR, stream, "Mutex init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error()));
  269. goto error;
  270. }
  271. aws_channel_task_init(
  272. &stream->cross_thread_work_task, s_stream_cross_thread_work_task, stream, "HTTP/2 stream cross-thread work");
  273. return stream;
  274. error:
  275. s_stream_destroy(&stream->base);
  276. return NULL;
  277. }
  278. static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  279. (void)task;
  280. struct aws_h2_stream *stream = arg;
  281. if (status != AWS_TASK_STATUS_RUN_READY) {
  282. goto end;
  283. }
  284. struct aws_h2_connection *connection = s_get_h2_connection(stream);
  285. if (aws_h2_stream_get_state(stream) == AWS_H2_STREAM_STATE_CLOSED) {
  286. /* stream is closed, silently ignoring the requests from user */
  287. AWS_H2_STREAM_LOG(
  288. TRACE, stream, "Stream closed before cross thread work task runs, ignoring everything was sent by user.");
  289. goto end;
  290. }
  291. /* Not sending window update at half closed remote state */
  292. bool ignore_window_update = (aws_h2_stream_get_state(stream) == AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE);
  293. bool reset_called;
  294. size_t window_update_size;
  295. struct aws_h2err reset_error;
  296. struct aws_linked_list pending_writes;
  297. aws_linked_list_init(&pending_writes);
  298. { /* BEGIN CRITICAL SECTION */
  299. s_lock_synced_data(stream);
  300. stream->synced_data.is_cross_thread_work_task_scheduled = false;
  301. /* window_update_size is ensured to be not greater than AWS_H2_WINDOW_UPDATE_MAX */
  302. window_update_size = stream->synced_data.window_update_size;
  303. stream->synced_data.window_update_size = 0;
  304. reset_called = stream->synced_data.reset_called;
  305. reset_error = stream->synced_data.reset_error;
  306. /* copy out pending writes */
  307. aws_linked_list_swap_contents(&pending_writes, &stream->synced_data.pending_write_list);
  308. s_unlock_synced_data(stream);
  309. } /* END CRITICAL SECTION */
  310. if (window_update_size > 0 && !ignore_window_update) {
  311. if (s_stream_send_update_window_frame(stream, window_update_size)) {
  312. /* Treat this as a connection error */
  313. aws_h2_connection_shutdown_due_to_write_err(connection, aws_last_error());
  314. }
  315. }
  316. /* The largest legal value will be 2 * max window size, which is way less than INT64_MAX, so if the window_size_self
  317. * overflows, remote peer will find it out. So just apply the change and ignore the possible overflow.*/
  318. stream->thread_data.window_size_self += window_update_size;
  319. if (reset_called) {
  320. struct aws_h2err returned_h2err = s_send_rst_and_close_stream(stream, reset_error);
  321. if (aws_h2err_failed(returned_h2err)) {
  322. aws_h2_connection_shutdown_due_to_write_err(connection, returned_h2err.aws_code);
  323. }
  324. }
  325. if (stream->thread_data.waiting_for_writes && !aws_linked_list_empty(&pending_writes)) {
  326. /* Got more to write, move the stream back to outgoing list */
  327. aws_linked_list_remove(&stream->node);
  328. aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node);
  329. stream->thread_data.waiting_for_writes = false;
  330. }
  331. /* move any pending writes to the outgoing write queue */
  332. aws_linked_list_move_all_back(&stream->thread_data.outgoing_writes, &pending_writes);
  333. /* It's likely that frames were queued while processing cross-thread work.
  334. * If so, try writing them now */
  335. aws_h2_try_write_outgoing_frames(connection);
  336. end:
  337. aws_http_stream_release(&stream->base);
  338. }
  339. static void s_stream_data_write_destroy(
  340. struct aws_h2_stream *stream,
  341. struct aws_h2_stream_data_write *write,
  342. int error_code) {
  343. AWS_PRECONDITION(stream);
  344. AWS_PRECONDITION(write);
  345. if (write->on_complete) {
  346. write->on_complete(&stream->base, error_code, write->user_data);
  347. }
  348. if (write->data_stream) {
  349. aws_input_stream_release(write->data_stream);
  350. }
  351. aws_mem_release(stream->base.alloc, write);
  352. }
  353. static void s_h2_stream_destroy_pending_writes(struct aws_h2_stream *stream) {
  354. /**
  355. * Only called when stream is not active and will never be active afterward (destroying).
  356. * Under this assumption, we can safely touch `stream->synced_data.pending_write_list` without
  357. * lock, as the user can only add write to the list when the stream is ACTIVE
  358. */
  359. AWS_ASSERT(stream->synced_data.api_state != AWS_H2_STREAM_API_STATE_ACTIVE);
  360. aws_linked_list_move_all_back(
  361. &stream->thread_data.outgoing_writes,
  362. &stream->synced_data.pending_write_list); /* clean up any outgoing writes */
  363. while (!aws_linked_list_empty(&stream->thread_data.outgoing_writes)) {
  364. struct aws_linked_list_node *node = aws_linked_list_pop_front(&stream->thread_data.outgoing_writes);
  365. struct aws_h2_stream_data_write *write = AWS_CONTAINER_OF(node, struct aws_h2_stream_data_write, node);
  366. AWS_LOGF_DEBUG(AWS_LS_HTTP_STREAM, "Stream closing, cancelling write of stream %p", (void *)write->data_stream);
  367. s_stream_data_write_destroy(stream, write, AWS_ERROR_HTTP_STREAM_HAS_COMPLETED);
  368. }
  369. }
  370. static void s_stream_destroy(struct aws_http_stream *stream_base) {
  371. AWS_PRECONDITION(stream_base);
  372. struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base);
  373. s_h2_stream_destroy_pending_writes(stream);
  374. AWS_H2_STREAM_LOG(DEBUG, stream, "Destroying stream");
  375. aws_mutex_clean_up(&stream->synced_data.lock);
  376. aws_http_message_release(stream->thread_data.outgoing_message);
  377. aws_mem_release(stream->base.alloc, stream);
  378. }
  379. void aws_h2_stream_complete(struct aws_h2_stream *stream, int error_code) {
  380. { /* BEGIN CRITICAL SECTION */
  381. /* clean up any pending writes */
  382. s_lock_synced_data(stream);
  383. /* The stream is complete now, this will prevent further writes from being queued */
  384. stream->synced_data.api_state = AWS_H2_STREAM_API_STATE_COMPLETE;
  385. s_unlock_synced_data(stream);
  386. } /* END CRITICAL SECTION */
  387. s_h2_stream_destroy_pending_writes(stream);
  388. /* Invoke callback */
  389. if (stream->base.on_complete) {
  390. stream->base.on_complete(&stream->base, error_code, stream->base.user_data);
  391. }
  392. }
  393. static void s_stream_update_window(struct aws_http_stream *stream_base, size_t increment_size) {
  394. AWS_PRECONDITION(stream_base);
  395. struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base);
  396. struct aws_h2_connection *connection = s_get_h2_connection(stream);
  397. if (!increment_size) {
  398. return;
  399. }
  400. if (!connection->base.stream_manual_window_management) {
  401. /* auto-mode, manual update window is not supported */
  402. AWS_H2_STREAM_LOG(
  403. DEBUG, stream, "Manual window management is off, update window operations are not supported.");
  404. return;
  405. }
  406. int err = 0;
  407. bool stream_is_init;
  408. bool cross_thread_work_should_schedule = false;
  409. size_t sum_size;
  410. { /* BEGIN CRITICAL SECTION */
  411. s_lock_synced_data(stream);
  412. err |= aws_add_size_checked(stream->synced_data.window_update_size, increment_size, &sum_size);
  413. err |= sum_size > AWS_H2_WINDOW_UPDATE_MAX;
  414. stream_is_init = stream->synced_data.api_state == AWS_H2_STREAM_API_STATE_INIT;
  415. if (!err && !stream_is_init) {
  416. cross_thread_work_should_schedule = !stream->synced_data.is_cross_thread_work_task_scheduled;
  417. stream->synced_data.is_cross_thread_work_task_scheduled = true;
  418. stream->synced_data.window_update_size = sum_size;
  419. }
  420. s_unlock_synced_data(stream);
  421. } /* END CRITICAL SECTION */
  422. if (cross_thread_work_should_schedule) {
  423. AWS_H2_STREAM_LOG(TRACE, stream, "Scheduling stream cross-thread work task");
  424. /* increment the refcount of stream to keep it alive until the task runs */
  425. aws_atomic_fetch_add(&stream->base.refcount, 1);
  426. aws_channel_schedule_task_now(connection->base.channel_slot->channel, &stream->cross_thread_work_task);
  427. return;
  428. }
  429. if (stream_is_init) {
  430. AWS_H2_STREAM_LOG(
  431. ERROR,
  432. stream,
  433. "Stream update window failed. Stream is in initialized state, please activate the stream first.");
  434. aws_raise_error(AWS_ERROR_INVALID_STATE);
  435. return;
  436. }
  437. if (err) {
  438. /* The increment_size is still not 100% safe, since we cannot control the incoming data frame. So just
  439. * ruled out the value that is obviously wrong values */
  440. AWS_H2_STREAM_LOG(
  441. ERROR,
  442. stream,
  443. "The stream's flow-control window has been incremented beyond 2**31 -1, the max for HTTP/2. The stream "
  444. "will close.");
  445. aws_raise_error(AWS_ERROR_OVERFLOW_DETECTED);
  446. struct aws_h2err stream_error = {
  447. .aws_code = AWS_ERROR_OVERFLOW_DETECTED,
  448. .h2_code = AWS_HTTP2_ERR_INTERNAL_ERROR,
  449. };
  450. /* Only when stream is not initialized reset will fail. So, we can assert it to be succeed. */
  451. AWS_FATAL_ASSERT(s_stream_reset_stream_internal(stream_base, stream_error) == AWS_OP_SUCCESS);
  452. }
  453. return;
  454. }
  455. static int s_stream_reset_stream_internal(struct aws_http_stream *stream_base, struct aws_h2err stream_error) {
  456. struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base);
  457. struct aws_h2_connection *connection = s_get_h2_connection(stream);
  458. bool reset_called;
  459. bool stream_is_init;
  460. bool cross_thread_work_should_schedule = false;
  461. { /* BEGIN CRITICAL SECTION */
  462. s_lock_synced_data(stream);
  463. reset_called = stream->synced_data.reset_called;
  464. stream_is_init = stream->synced_data.api_state == AWS_H2_STREAM_API_STATE_INIT;
  465. if (!reset_called && !stream_is_init) {
  466. cross_thread_work_should_schedule = !stream->synced_data.is_cross_thread_work_task_scheduled;
  467. stream->synced_data.reset_called = true;
  468. stream->synced_data.reset_error = stream_error;
  469. }
  470. s_unlock_synced_data(stream);
  471. } /* END CRITICAL SECTION */
  472. if (stream_is_init) {
  473. AWS_H2_STREAM_LOG(
  474. ERROR, stream, "Reset stream failed. Stream is in initialized state, please activate the stream first.");
  475. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  476. }
  477. if (cross_thread_work_should_schedule) {
  478. AWS_H2_STREAM_LOG(TRACE, stream, "Scheduling stream cross-thread work task");
  479. /* increment the refcount of stream to keep it alive until the task runs */
  480. aws_atomic_fetch_add(&stream->base.refcount, 1);
  481. aws_channel_schedule_task_now(connection->base.channel_slot->channel, &stream->cross_thread_work_task);
  482. return AWS_OP_SUCCESS;
  483. }
  484. if (reset_called) {
  485. AWS_H2_STREAM_LOG(DEBUG, stream, "Reset stream ignored. Reset stream has been called already.");
  486. }
  487. return AWS_OP_SUCCESS;
  488. }
  489. static int s_stream_reset_stream(struct aws_http_stream *stream_base, uint32_t http2_error) {
  490. struct aws_h2err stream_error = {
  491. .aws_code = AWS_ERROR_HTTP_RST_STREAM_SENT,
  492. .h2_code = http2_error,
  493. };
  494. AWS_LOGF_TRACE(
  495. AWS_LS_HTTP_STREAM,
  496. "id=%p: User requested RST_STREAM with error code %s (0x%x)",
  497. (void *)stream_base,
  498. aws_http2_error_code_to_str(http2_error),
  499. http2_error);
  500. return s_stream_reset_stream_internal(stream_base, stream_error);
  501. }
  502. static int s_stream_get_received_error_code(struct aws_http_stream *stream_base, uint32_t *out_http2_error) {
  503. struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base);
  504. if (stream->received_reset_error_code == -1) {
  505. return aws_raise_error(AWS_ERROR_HTTP_DATA_NOT_AVAILABLE);
  506. }
  507. *out_http2_error = (uint32_t)stream->received_reset_error_code;
  508. return AWS_OP_SUCCESS;
  509. }
  510. static int s_stream_get_sent_error_code(struct aws_http_stream *stream_base, uint32_t *out_http2_error) {
  511. struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base);
  512. if (stream->sent_reset_error_code == -1) {
  513. return aws_raise_error(AWS_ERROR_HTTP_DATA_NOT_AVAILABLE);
  514. }
  515. *out_http2_error = (uint32_t)stream->sent_reset_error_code;
  516. return AWS_OP_SUCCESS;
  517. }
  518. enum aws_h2_stream_state aws_h2_stream_get_state(const struct aws_h2_stream *stream) {
  519. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  520. return stream->thread_data.state;
  521. }
  522. /* Given a Stream Error, send RST_STREAM frame and close stream.
  523. * A Connection Error is returned if something goes catastrophically wrong */
  524. static struct aws_h2err s_send_rst_and_close_stream(struct aws_h2_stream *stream, struct aws_h2err stream_error) {
  525. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  526. AWS_PRECONDITION(stream->thread_data.state != AWS_H2_STREAM_STATE_CLOSED);
  527. struct aws_h2_connection *connection = s_get_h2_connection(stream);
  528. stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED;
  529. AWS_H2_STREAM_LOGF(
  530. DEBUG,
  531. stream,
  532. "Sending RST_STREAM with error code %s (0x%x). State -> CLOSED",
  533. aws_http2_error_code_to_str(stream_error.h2_code),
  534. stream_error.h2_code);
  535. /* Send RST_STREAM */
  536. struct aws_h2_frame *rst_stream_frame =
  537. aws_h2_frame_new_rst_stream(stream->base.alloc, stream->base.id, stream_error.h2_code);
  538. AWS_FATAL_ASSERT(rst_stream_frame != NULL);
  539. aws_h2_connection_enqueue_outgoing_frame(connection, rst_stream_frame); /* connection takes ownership of frame */
  540. stream->sent_reset_error_code = stream_error.h2_code;
  541. /* Tell connection that stream is now closed */
  542. if (aws_h2_connection_on_stream_closed(
  543. connection, stream, AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_SENT, stream_error.aws_code)) {
  544. return aws_h2err_from_last_error();
  545. }
  546. return AWS_H2ERR_SUCCESS;
  547. }
  548. struct aws_h2err aws_h2_stream_window_size_change(struct aws_h2_stream *stream, int32_t size_changed, bool self) {
  549. if (self) {
  550. if (stream->thread_data.window_size_self + size_changed > AWS_H2_WINDOW_UPDATE_MAX) {
  551. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_FLOW_CONTROL_ERROR);
  552. }
  553. stream->thread_data.window_size_self += size_changed;
  554. } else {
  555. if ((int64_t)stream->thread_data.window_size_peer + size_changed > AWS_H2_WINDOW_UPDATE_MAX) {
  556. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_FLOW_CONTROL_ERROR);
  557. }
  558. stream->thread_data.window_size_peer += size_changed;
  559. }
  560. return AWS_H2ERR_SUCCESS;
  561. }
  562. static inline bool s_h2_stream_has_outgoing_writes(struct aws_h2_stream *stream) {
  563. return !aws_linked_list_empty(&stream->thread_data.outgoing_writes);
  564. }
  565. static void s_h2_stream_write_data_complete(struct aws_h2_stream *stream, bool *waiting_writes) {
  566. AWS_PRECONDITION(waiting_writes);
  567. AWS_PRECONDITION(s_h2_stream_has_outgoing_writes(stream));
  568. /* finish/clean up the current write operation */
  569. struct aws_linked_list_node *node = aws_linked_list_pop_front(&stream->thread_data.outgoing_writes);
  570. struct aws_h2_stream_data_write *write_op = AWS_CONTAINER_OF(node, struct aws_h2_stream_data_write, node);
  571. const bool ending_stream = write_op->end_stream;
  572. s_stream_data_write_destroy(stream, write_op, AWS_OP_SUCCESS);
  573. /* check to see if there are more queued writes or stream_end was called */
  574. *waiting_writes = !ending_stream && !s_h2_stream_has_outgoing_writes(stream);
  575. }
  576. static struct aws_h2_stream_data_write *s_h2_stream_get_current_write(struct aws_h2_stream *stream) {
  577. AWS_PRECONDITION(s_h2_stream_has_outgoing_writes(stream));
  578. struct aws_linked_list_node *node = aws_linked_list_front(&stream->thread_data.outgoing_writes);
  579. struct aws_h2_stream_data_write *write = AWS_CONTAINER_OF(node, struct aws_h2_stream_data_write, node);
  580. return write;
  581. }
  582. static struct aws_input_stream *s_h2_stream_get_data_stream(struct aws_h2_stream *stream) {
  583. struct aws_h2_stream_data_write *write = s_h2_stream_get_current_write(stream);
  584. return write->data_stream;
  585. }
  586. static bool s_h2_stream_does_current_write_end_stream(struct aws_h2_stream *stream) {
  587. struct aws_h2_stream_data_write *write = s_h2_stream_get_current_write(stream);
  588. return write->end_stream;
  589. }
  590. int aws_h2_stream_on_activated(struct aws_h2_stream *stream, enum aws_h2_stream_body_state *body_state) {
  591. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  592. struct aws_h2_connection *connection = s_get_h2_connection(stream);
  593. /* Create HEADERS frame */
  594. struct aws_http_message *msg = stream->thread_data.outgoing_message;
  595. /* Should be ensured when the stream is created */
  596. AWS_ASSERT(aws_http_message_get_protocol_version(msg) == AWS_HTTP_VERSION_2);
  597. /* If manual write, always has data to be sent. */
  598. bool with_data = aws_http_message_get_body_stream(msg) != NULL || stream->manual_write;
  599. struct aws_http_headers *h2_headers = aws_http_message_get_headers(msg);
  600. struct aws_h2_frame *headers_frame = aws_h2_frame_new_headers(
  601. stream->base.alloc,
  602. stream->base.id,
  603. h2_headers,
  604. !with_data /* end_stream */,
  605. 0 /* padding - not currently configurable via public API */,
  606. NULL /* priority - not currently configurable via public API */);
  607. if (!headers_frame) {
  608. AWS_H2_STREAM_LOGF(ERROR, stream, "Failed to create HEADERS frame: %s", aws_error_name(aws_last_error()));
  609. goto error;
  610. }
  611. /* Initialize the flow-control window size */
  612. stream->thread_data.window_size_peer =
  613. connection->thread_data.settings_peer[AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
  614. stream->thread_data.window_size_self =
  615. connection->thread_data.settings_self[AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
  616. if (with_data) {
  617. /* If stream has DATA to send, put it in the outgoing_streams_list, and we'll send data later */
  618. stream->thread_data.state = AWS_H2_STREAM_STATE_OPEN;
  619. AWS_H2_STREAM_LOG(TRACE, stream, "Sending HEADERS. State -> OPEN");
  620. } else {
  621. /* If stream has no body, then HEADERS frame marks the end of outgoing data */
  622. stream->thread_data.state = AWS_H2_STREAM_STATE_HALF_CLOSED_LOCAL;
  623. AWS_H2_STREAM_LOG(TRACE, stream, "Sending HEADERS with END_STREAM. State -> HALF_CLOSED_LOCAL");
  624. }
  625. if (s_h2_stream_has_outgoing_writes(stream)) {
  626. *body_state = AWS_H2_STREAM_BODY_STATE_ONGOING;
  627. } else {
  628. if (stream->manual_write) {
  629. stream->thread_data.waiting_for_writes = true;
  630. *body_state = AWS_H2_STREAM_BODY_STATE_WAITING_WRITES;
  631. } else {
  632. *body_state = AWS_H2_STREAM_BODY_STATE_NONE;
  633. }
  634. }
  635. aws_h2_connection_enqueue_outgoing_frame(connection, headers_frame);
  636. return AWS_OP_SUCCESS;
  637. error:
  638. return AWS_OP_ERR;
  639. }
  640. int aws_h2_stream_encode_data_frame(
  641. struct aws_h2_stream *stream,
  642. struct aws_h2_frame_encoder *encoder,
  643. struct aws_byte_buf *output,
  644. int *data_encode_status) {
  645. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  646. AWS_PRECONDITION(
  647. stream->thread_data.state == AWS_H2_STREAM_STATE_OPEN ||
  648. stream->thread_data.state == AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE);
  649. struct aws_h2_connection *connection = s_get_h2_connection(stream);
  650. AWS_PRECONDITION(connection->thread_data.window_size_peer > AWS_H2_MIN_WINDOW_SIZE);
  651. if (stream->thread_data.window_size_peer <= AWS_H2_MIN_WINDOW_SIZE) {
  652. /* The stream is stalled now */
  653. *data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED;
  654. return AWS_OP_SUCCESS;
  655. }
  656. *data_encode_status = AWS_H2_DATA_ENCODE_COMPLETE;
  657. struct aws_input_stream *input_stream = s_h2_stream_get_data_stream(stream);
  658. AWS_ASSERT(input_stream);
  659. bool input_stream_complete = false;
  660. bool input_stream_stalled = false;
  661. bool ends_stream = s_h2_stream_does_current_write_end_stream(stream);
  662. if (aws_h2_encode_data_frame(
  663. encoder,
  664. stream->base.id,
  665. input_stream,
  666. ends_stream,
  667. 0 /*pad_length*/,
  668. &stream->thread_data.window_size_peer,
  669. &connection->thread_data.window_size_peer,
  670. output,
  671. &input_stream_complete,
  672. &input_stream_stalled)) {
  673. /* Failed to write DATA, treat it as a Stream Error */
  674. AWS_H2_STREAM_LOGF(ERROR, stream, "Error encoding stream DATA, %s", aws_error_name(aws_last_error()));
  675. struct aws_h2err returned_h2err = s_send_rst_and_close_stream(stream, aws_h2err_from_last_error());
  676. if (aws_h2err_failed(returned_h2err)) {
  677. aws_h2_connection_shutdown_due_to_write_err(connection, returned_h2err.aws_code);
  678. }
  679. return AWS_OP_SUCCESS;
  680. }
  681. bool waiting_writes = false;
  682. if (input_stream_complete) {
  683. s_h2_stream_write_data_complete(stream, &waiting_writes);
  684. }
  685. /*
  686. * input_stream_complete for manual writes just means the current outgoing_write is complete. The body is not
  687. * complete for real until the stream is told to close
  688. */
  689. if (input_stream_complete && ends_stream) {
  690. /* Done sending data. No more data will be sent. */
  691. if (stream->thread_data.state == AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE) {
  692. /* Both sides have sent END_STREAM */
  693. stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED;
  694. AWS_H2_STREAM_LOG(TRACE, stream, "Sent END_STREAM. State -> CLOSED");
  695. /* Tell connection that stream is now closed */
  696. if (aws_h2_connection_on_stream_closed(
  697. connection, stream, AWS_H2_STREAM_CLOSED_WHEN_BOTH_SIDES_END_STREAM, AWS_ERROR_SUCCESS)) {
  698. return AWS_OP_ERR;
  699. }
  700. } else {
  701. /* Else can't close until we receive END_STREAM */
  702. stream->thread_data.state = AWS_H2_STREAM_STATE_HALF_CLOSED_LOCAL;
  703. AWS_H2_STREAM_LOG(TRACE, stream, "Sent END_STREAM. State -> HALF_CLOSED_LOCAL");
  704. }
  705. } else {
  706. *data_encode_status = AWS_H2_DATA_ENCODE_ONGOING;
  707. if (input_stream_stalled) {
  708. AWS_ASSERT(!input_stream_complete);
  709. *data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_BODY_STREAM_STALLED;
  710. }
  711. if (stream->thread_data.window_size_peer <= AWS_H2_MIN_WINDOW_SIZE) {
  712. /* if body and window both stalled, we take the window stalled status, which will take the stream out
  713. * from outgoing list */
  714. *data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED;
  715. }
  716. if (waiting_writes) {
  717. /* if window stalled and we waiting for manual writes, we take waiting writes status, which will be handled
  718. * properly if more writes coming, but windows is still stalled. But not the other way around. */
  719. AWS_ASSERT(input_stream_complete);
  720. *data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_WAITING_FOR_WRITES;
  721. }
  722. }
  723. return AWS_OP_SUCCESS;
  724. }
  725. struct aws_h2err aws_h2_stream_on_decoder_headers_begin(struct aws_h2_stream *stream) {
  726. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  727. struct aws_h2err stream_err = s_check_state_allows_frame_type(stream, AWS_H2_FRAME_T_HEADERS);
  728. if (aws_h2err_failed(stream_err)) {
  729. return s_send_rst_and_close_stream(stream, stream_err);
  730. }
  731. return AWS_H2ERR_SUCCESS;
  732. }
  733. struct aws_h2err aws_h2_stream_on_decoder_headers_i(
  734. struct aws_h2_stream *stream,
  735. const struct aws_http_header *header,
  736. enum aws_http_header_name name_enum,
  737. enum aws_http_header_block block_type) {
  738. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  739. /* Not calling s_check_state_allows_frame_type() here because we already checked
  740. * at start of HEADERS frame in aws_h2_stream_on_decoder_headers_begin() */
  741. bool is_server = stream->base.server_data;
  742. /* RFC-7540 8.1 - Message consists of:
  743. * - 0+ Informational 1xx headers (response-only, decoder validates that this only occurs in responses)
  744. * - 1 main headers with normal request or response.
  745. * - 0 or 1 trailing headers with no pseudo-headers */
  746. switch (block_type) {
  747. case AWS_HTTP_HEADER_BLOCK_INFORMATIONAL:
  748. if (stream->thread_data.received_main_headers) {
  749. AWS_H2_STREAM_LOG(
  750. ERROR, stream, "Malformed message, received informational (1xx) response after main response");
  751. goto malformed;
  752. }
  753. break;
  754. case AWS_HTTP_HEADER_BLOCK_MAIN:
  755. if (stream->thread_data.received_main_headers) {
  756. AWS_H2_STREAM_LOG(ERROR, stream, "Malformed message, received second set of headers");
  757. goto malformed;
  758. }
  759. break;
  760. case AWS_HTTP_HEADER_BLOCK_TRAILING:
  761. if (!stream->thread_data.received_main_headers) {
  762. /* A HEADERS frame without any pseudo-headers looks like trailing headers to the decoder */
  763. AWS_H2_STREAM_LOG(ERROR, stream, "Malformed headers lack required pseudo-header fields.");
  764. goto malformed;
  765. }
  766. break;
  767. default:
  768. AWS_ASSERT(0);
  769. }
  770. if (is_server) {
  771. return aws_h2err_from_aws_code(AWS_ERROR_UNIMPLEMENTED);
  772. } else {
  773. /* Client */
  774. switch (name_enum) {
  775. case AWS_HTTP_HEADER_STATUS: {
  776. uint64_t status_code = 0;
  777. int err = aws_byte_cursor_utf8_parse_u64(header->value, &status_code);
  778. AWS_ASSERT(!err && "Invalid :status value. Decoder should have already validated this");
  779. (void)err;
  780. stream->base.client_data->response_status = (int)status_code;
  781. } break;
  782. case AWS_HTTP_HEADER_CONTENT_LENGTH: {
  783. if (stream->thread_data.content_length_received) {
  784. AWS_H2_STREAM_LOG(ERROR, stream, "Duplicate content-length value");
  785. goto malformed;
  786. }
  787. if (aws_byte_cursor_utf8_parse_u64(header->value, &stream->thread_data.incoming_content_length)) {
  788. AWS_H2_STREAM_LOG(ERROR, stream, "Invalid content-length value");
  789. goto malformed;
  790. }
  791. stream->thread_data.content_length_received = true;
  792. } break;
  793. default:
  794. break;
  795. }
  796. }
  797. if (stream->base.on_incoming_headers) {
  798. if (stream->base.on_incoming_headers(&stream->base, block_type, header, 1, stream->base.user_data)) {
  799. AWS_H2_STREAM_LOGF(
  800. ERROR, stream, "Incoming header callback raised error, %s", aws_error_name(aws_last_error()));
  801. return s_send_rst_and_close_stream(stream, aws_h2err_from_last_error());
  802. }
  803. }
  804. return AWS_H2ERR_SUCCESS;
  805. malformed:
  806. /* RFC-9113 8.1.1 Malformed requests or responses that are detected MUST be treated as a stream error
  807. * (Section 5.4.2) of type PROTOCOL_ERROR.*/
  808. return s_send_rst_and_close_stream(stream, aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR));
  809. }
  810. struct aws_h2err aws_h2_stream_on_decoder_headers_end(
  811. struct aws_h2_stream *stream,
  812. bool malformed,
  813. enum aws_http_header_block block_type) {
  814. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  815. /* Not calling s_check_state_allows_frame_type() here because we already checked
  816. * at start of HEADERS frame in aws_h2_stream_on_decoder_headers_begin() */
  817. if (malformed) {
  818. AWS_H2_STREAM_LOG(ERROR, stream, "Headers are malformed");
  819. return s_send_rst_and_close_stream(stream, aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR));
  820. }
  821. switch (block_type) {
  822. case AWS_HTTP_HEADER_BLOCK_INFORMATIONAL:
  823. AWS_H2_STREAM_LOG(TRACE, stream, "Informational 1xx header-block done.");
  824. break;
  825. case AWS_HTTP_HEADER_BLOCK_MAIN:
  826. AWS_H2_STREAM_LOG(TRACE, stream, "Main header-block done.");
  827. stream->thread_data.received_main_headers = true;
  828. break;
  829. case AWS_HTTP_HEADER_BLOCK_TRAILING:
  830. AWS_H2_STREAM_LOG(TRACE, stream, "Trailing 1xx header-block done.");
  831. break;
  832. default:
  833. AWS_ASSERT(0);
  834. }
  835. if (stream->base.on_incoming_header_block_done) {
  836. if (stream->base.on_incoming_header_block_done(&stream->base, block_type, stream->base.user_data)) {
  837. AWS_H2_STREAM_LOGF(
  838. ERROR,
  839. stream,
  840. "Incoming-header-block-done callback raised error, %s",
  841. aws_error_name(aws_last_error()));
  842. return s_send_rst_and_close_stream(stream, aws_h2err_from_last_error());
  843. }
  844. }
  845. return AWS_H2ERR_SUCCESS;
  846. }
  847. struct aws_h2err aws_h2_stream_on_decoder_push_promise(struct aws_h2_stream *stream, uint32_t promised_stream_id) {
  848. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  849. struct aws_h2err stream_err = s_check_state_allows_frame_type(stream, AWS_H2_FRAME_T_PUSH_PROMISE);
  850. if (aws_h2err_failed(stream_err)) {
  851. return s_send_rst_and_close_stream(stream, stream_err);
  852. }
  853. /* Note: Until we have a need for it, PUSH_PROMISE is not a fully supported feature.
  854. * Promised streams are automatically rejected in a manner compliant with RFC-7540. */
  855. AWS_H2_STREAM_LOG(DEBUG, stream, "Automatically rejecting promised stream, PUSH_PROMISE is not fully supported");
  856. if (aws_h2_connection_send_rst_and_close_reserved_stream(
  857. s_get_h2_connection(stream), promised_stream_id, AWS_HTTP2_ERR_REFUSED_STREAM)) {
  858. return aws_h2err_from_last_error();
  859. }
  860. return AWS_H2ERR_SUCCESS;
  861. }
  862. static int s_stream_send_update_window(struct aws_h2_stream *stream, uint32_t window_size) {
  863. struct aws_h2_frame *stream_window_update_frame =
  864. aws_h2_frame_new_window_update(stream->base.alloc, stream->base.id, window_size);
  865. if (!stream_window_update_frame) {
  866. AWS_H2_STREAM_LOGF(
  867. ERROR,
  868. stream,
  869. "WINDOW_UPDATE frame on stream failed to be sent, error %s",
  870. aws_error_name(aws_last_error()));
  871. return AWS_OP_ERR;
  872. }
  873. aws_h2_connection_enqueue_outgoing_frame(s_get_h2_connection(stream), stream_window_update_frame);
  874. stream->thread_data.window_size_self += window_size;
  875. return AWS_OP_SUCCESS;
  876. }
  877. struct aws_h2err aws_h2_stream_on_decoder_data_begin(
  878. struct aws_h2_stream *stream,
  879. uint32_t payload_len,
  880. uint32_t total_padding_bytes,
  881. bool end_stream) {
  882. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  883. struct aws_h2err stream_err = s_check_state_allows_frame_type(stream, AWS_H2_FRAME_T_DATA);
  884. if (aws_h2err_failed(stream_err)) {
  885. return s_send_rst_and_close_stream(stream, stream_err);
  886. }
  887. if (!stream->thread_data.received_main_headers) {
  888. AWS_H2_STREAM_LOG(ERROR, stream, "Malformed message, received DATA before main HEADERS");
  889. return s_send_rst_and_close_stream(stream, aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR));
  890. }
  891. if (stream->thread_data.content_length_received) {
  892. uint64_t data_len = payload_len - total_padding_bytes;
  893. if (aws_add_u64_checked(
  894. stream->thread_data.incoming_data_length, data_len, &stream->thread_data.incoming_data_length)) {
  895. return s_send_rst_and_close_stream(stream, aws_h2err_from_aws_code(AWS_ERROR_OVERFLOW_DETECTED));
  896. }
  897. if (stream->thread_data.incoming_data_length > stream->thread_data.incoming_content_length) {
  898. AWS_H2_STREAM_LOGF(
  899. ERROR,
  900. stream,
  901. "Total received data payload=%" PRIu64 " has exceed the received content-length header, which=%" PRIi64
  902. ". Closing malformed stream",
  903. stream->thread_data.incoming_data_length,
  904. stream->thread_data.incoming_content_length);
  905. return s_send_rst_and_close_stream(stream, aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR));
  906. }
  907. }
  908. /* RFC-7540 6.9.1:
  909. * The sender MUST NOT send a flow-controlled frame with a length that exceeds
  910. * the space available in either of the flow-control windows advertised by the receiver.
  911. * Frames with zero length with the END_STREAM flag set (that is, an empty DATA frame)
  912. * MAY be sent if there is no available space in either flow-control window. */
  913. if ((int32_t)payload_len > stream->thread_data.window_size_self && payload_len != 0) {
  914. AWS_H2_STREAM_LOGF(
  915. ERROR,
  916. stream,
  917. "DATA length=%" PRIu32 " exceeds flow-control window=%" PRIi64,
  918. payload_len,
  919. stream->thread_data.window_size_self);
  920. return s_send_rst_and_close_stream(stream, aws_h2err_from_h2_code(AWS_HTTP2_ERR_FLOW_CONTROL_ERROR));
  921. }
  922. stream->thread_data.window_size_self -= payload_len;
  923. /* If stream isn't over, we may need to send automatic window updates to keep data flowing */
  924. if (!end_stream) {
  925. uint32_t auto_window_update;
  926. if (stream->base.owning_connection->stream_manual_window_management) {
  927. /* Automatically update the flow-window to account for padding, even though "manual window management"
  928. * is enabled, because the current API doesn't have any way to inform the user about padding,
  929. * so we can't expect them to manage it themselves. */
  930. auto_window_update = total_padding_bytes;
  931. } else {
  932. /* Automatically update the full amount we just received */
  933. auto_window_update = payload_len;
  934. }
  935. if (auto_window_update != 0) {
  936. if (s_stream_send_update_window(stream, auto_window_update)) {
  937. return aws_h2err_from_last_error();
  938. }
  939. AWS_H2_STREAM_LOGF(
  940. TRACE,
  941. stream,
  942. "Automatically updating stream window by %" PRIu32 "(%" PRIu32 " due to padding).",
  943. auto_window_update,
  944. total_padding_bytes);
  945. }
  946. }
  947. return AWS_H2ERR_SUCCESS;
  948. }
  949. struct aws_h2err aws_h2_stream_on_decoder_data_i(struct aws_h2_stream *stream, struct aws_byte_cursor data) {
  950. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  951. /* Not calling s_check_state_allows_frame_type() here because we already checked at start of DATA frame in
  952. * aws_h2_stream_on_decoder_data_begin() */
  953. if (stream->base.on_incoming_body) {
  954. if (stream->base.on_incoming_body(&stream->base, &data, stream->base.user_data)) {
  955. AWS_H2_STREAM_LOGF(
  956. ERROR, stream, "Incoming body callback raised error, %s", aws_error_name(aws_last_error()));
  957. return s_send_rst_and_close_stream(stream, aws_h2err_from_last_error());
  958. }
  959. }
  960. return AWS_H2ERR_SUCCESS;
  961. }
  962. struct aws_h2err aws_h2_stream_on_decoder_window_update(
  963. struct aws_h2_stream *stream,
  964. uint32_t window_size_increment,
  965. bool *window_resume) {
  966. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  967. *window_resume = false;
  968. struct aws_h2err stream_err = s_check_state_allows_frame_type(stream, AWS_H2_FRAME_T_WINDOW_UPDATE);
  969. if (aws_h2err_failed(stream_err)) {
  970. return s_send_rst_and_close_stream(stream, stream_err);
  971. }
  972. if (window_size_increment == 0) {
  973. /* flow-control window increment of 0 MUST be treated as error (RFC7540 6.9.1) */
  974. AWS_H2_STREAM_LOG(ERROR, stream, "Window update frame with 0 increment size");
  975. return s_send_rst_and_close_stream(stream, aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR));
  976. }
  977. int32_t old_window_size = stream->thread_data.window_size_peer;
  978. stream_err = (aws_h2_stream_window_size_change(stream, window_size_increment, false /*self*/));
  979. if (aws_h2err_failed(stream_err)) {
  980. /* We MUST NOT allow a flow-control window to exceed the max */
  981. AWS_H2_STREAM_LOG(
  982. ERROR, stream, "Window update frame causes the stream flow-control window to exceed the maximum size");
  983. return s_send_rst_and_close_stream(stream, stream_err);
  984. }
  985. if (stream->thread_data.window_size_peer > AWS_H2_MIN_WINDOW_SIZE && old_window_size <= AWS_H2_MIN_WINDOW_SIZE) {
  986. *window_resume = true;
  987. }
  988. return AWS_H2ERR_SUCCESS;
  989. }
  990. struct aws_h2err aws_h2_stream_on_decoder_end_stream(struct aws_h2_stream *stream) {
  991. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  992. /* Not calling s_check_state_allows_frame_type() here because END_STREAM isn't
  993. * an actual frame type. It's a flag on DATA or HEADERS frames, and we
  994. * already checked the legality of those frames in their respective callbacks. */
  995. if (stream->thread_data.content_length_received) {
  996. if (stream->base.request_method != AWS_HTTP_METHOD_HEAD &&
  997. stream->base.client_data->response_status != AWS_HTTP_STATUS_CODE_304_NOT_MODIFIED) {
  998. /**
  999. * RFC-9110 8.6.
  1000. * A server MAY send a Content-Length header field in a response to a HEAD request.
  1001. * A server MAY send a Content-Length header field in a 304 (Not Modified) response.
  1002. * But both of these condition will have no body receive.
  1003. */
  1004. if (stream->thread_data.incoming_data_length != stream->thread_data.incoming_content_length) {
  1005. /**
  1006. * RFC-9113 8.1.1:
  1007. * A request or response is also malformed if the value of a content-length header field does not equal
  1008. * the sum of the DATA frame payload lengths that form the content, unless the message is defined as
  1009. * having no content.
  1010. *
  1011. * Clients MUST NOT accept a malformed response.
  1012. */
  1013. AWS_H2_STREAM_LOGF(
  1014. ERROR,
  1015. stream,
  1016. "Total received data payload=%" PRIu64
  1017. " does not match the received content-length header, which=%" PRIi64 ". Closing malformed stream",
  1018. stream->thread_data.incoming_data_length,
  1019. stream->thread_data.incoming_content_length);
  1020. return s_send_rst_and_close_stream(stream, aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR));
  1021. }
  1022. }
  1023. }
  1024. if (stream->thread_data.state == AWS_H2_STREAM_STATE_HALF_CLOSED_LOCAL) {
  1025. /* Both sides have sent END_STREAM */
  1026. stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED;
  1027. AWS_H2_STREAM_LOG(TRACE, stream, "Received END_STREAM. State -> CLOSED");
  1028. /* Tell connection that stream is now closed */
  1029. if (aws_h2_connection_on_stream_closed(
  1030. s_get_h2_connection(stream),
  1031. stream,
  1032. AWS_H2_STREAM_CLOSED_WHEN_BOTH_SIDES_END_STREAM,
  1033. AWS_ERROR_SUCCESS)) {
  1034. return aws_h2err_from_last_error();
  1035. }
  1036. } else {
  1037. /* Else can't close until our side sends END_STREAM */
  1038. stream->thread_data.state = AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE;
  1039. AWS_H2_STREAM_LOG(TRACE, stream, "Received END_STREAM. State -> HALF_CLOSED_REMOTE");
  1040. }
  1041. return AWS_H2ERR_SUCCESS;
  1042. }
  1043. struct aws_h2err aws_h2_stream_on_decoder_rst_stream(struct aws_h2_stream *stream, uint32_t h2_error_code) {
  1044. AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
  1045. /* Check that this state allows RST_STREAM. */
  1046. struct aws_h2err err = s_check_state_allows_frame_type(stream, AWS_H2_FRAME_T_RST_STREAM);
  1047. if (aws_h2err_failed(err)) {
  1048. /* Usually we send a RST_STREAM when the state doesn't allow a frame type, but RFC-7540 5.4.2 says:
  1049. * "To avoid looping, an endpoint MUST NOT send a RST_STREAM in response to a RST_STREAM frame." */
  1050. return err;
  1051. }
  1052. /* RFC-7540 8.1 - a server MAY request that the client abort transmission of a request without error by sending a
  1053. * RST_STREAM with an error code of NO_ERROR after sending a complete response (i.e., a frame with the END_STREAM
  1054. * flag). Clients MUST NOT discard responses as a result of receiving such a RST_STREAM */
  1055. int aws_error_code;
  1056. if (stream->base.client_data && (h2_error_code == AWS_HTTP2_ERR_NO_ERROR) &&
  1057. (stream->thread_data.state == AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE)) {
  1058. aws_error_code = AWS_ERROR_SUCCESS;
  1059. } else {
  1060. aws_error_code = AWS_ERROR_HTTP_RST_STREAM_RECEIVED;
  1061. AWS_H2_STREAM_LOGF(
  1062. ERROR,
  1063. stream,
  1064. "Peer terminated stream with HTTP/2 RST_STREAM frame, error-code=0x%x(%s)",
  1065. h2_error_code,
  1066. aws_http2_error_code_to_str(h2_error_code));
  1067. }
  1068. stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED;
  1069. stream->received_reset_error_code = h2_error_code;
  1070. AWS_H2_STREAM_LOGF(
  1071. TRACE,
  1072. stream,
  1073. "Received RST_STREAM code=0x%x(%s). State -> CLOSED",
  1074. h2_error_code,
  1075. aws_http2_error_code_to_str(h2_error_code));
  1076. if (aws_h2_connection_on_stream_closed(
  1077. s_get_h2_connection(stream), stream, AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_RECEIVED, aws_error_code)) {
  1078. return aws_h2err_from_last_error();
  1079. }
  1080. return AWS_H2ERR_SUCCESS;
  1081. }
  1082. static int s_stream_write_data(
  1083. struct aws_http_stream *stream_base,
  1084. const struct aws_http2_stream_write_data_options *options) {
  1085. struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base);
  1086. if (!stream->manual_write) {
  1087. AWS_H2_STREAM_LOG(
  1088. ERROR,
  1089. stream,
  1090. "Manual writes are not enabled. You need to enable manual writes using by setting "
  1091. "'http2_use_manual_data_writes' to true in 'aws_http_make_request_options'");
  1092. return aws_raise_error(AWS_ERROR_HTTP_MANUAL_WRITE_NOT_ENABLED);
  1093. }
  1094. struct aws_h2_connection *connection = s_get_h2_connection(stream);
  1095. /* queue this new write into the pending write list for the stream */
  1096. struct aws_h2_stream_data_write *pending_write =
  1097. aws_mem_calloc(stream->base.alloc, 1, sizeof(struct aws_h2_stream_data_write));
  1098. if (options->data) {
  1099. pending_write->data_stream = aws_input_stream_acquire(options->data);
  1100. } else {
  1101. struct aws_byte_cursor empty_cursor;
  1102. AWS_ZERO_STRUCT(empty_cursor);
  1103. pending_write->data_stream = aws_input_stream_new_from_cursor(stream->base.alloc, &empty_cursor);
  1104. }
  1105. bool schedule_cross_thread_work = false;
  1106. { /* BEGIN CRITICAL SECTION */
  1107. s_lock_synced_data(stream);
  1108. {
  1109. if (stream->synced_data.api_state != AWS_H2_STREAM_API_STATE_ACTIVE) {
  1110. s_unlock_synced_data(stream);
  1111. int error_code = stream->synced_data.api_state == AWS_H2_STREAM_API_STATE_INIT
  1112. ? AWS_ERROR_HTTP_STREAM_NOT_ACTIVATED
  1113. : AWS_ERROR_HTTP_STREAM_HAS_COMPLETED;
  1114. s_stream_data_write_destroy(stream, pending_write, error_code);
  1115. AWS_H2_STREAM_LOG(ERROR, stream, "Cannot write DATA frames to an inactive or closed stream");
  1116. return aws_raise_error(error_code);
  1117. }
  1118. if (stream->synced_data.manual_write_ended) {
  1119. s_unlock_synced_data(stream);
  1120. s_stream_data_write_destroy(stream, pending_write, AWS_ERROR_HTTP_MANUAL_WRITE_HAS_COMPLETED);
  1121. AWS_H2_STREAM_LOG(ERROR, stream, "Cannot write DATA frames to a stream after manual write ended");
  1122. /* Fail with error, otherwise, people can wait for on_complete callback that will never be invoked. */
  1123. return aws_raise_error(AWS_ERROR_HTTP_MANUAL_WRITE_HAS_COMPLETED);
  1124. }
  1125. /* Not setting this until we're sure we succeeded, so that callback doesn't fire on cleanup if we fail */
  1126. if (options->end_stream) {
  1127. stream->synced_data.manual_write_ended = true;
  1128. }
  1129. pending_write->end_stream = options->end_stream;
  1130. pending_write->on_complete = options->on_complete;
  1131. pending_write->user_data = options->user_data;
  1132. aws_linked_list_push_back(&stream->synced_data.pending_write_list, &pending_write->node);
  1133. schedule_cross_thread_work = !stream->synced_data.is_cross_thread_work_task_scheduled;
  1134. stream->synced_data.is_cross_thread_work_task_scheduled = true;
  1135. }
  1136. s_unlock_synced_data(stream);
  1137. } /* END CRITICAL SECTION */
  1138. if (schedule_cross_thread_work) {
  1139. AWS_H2_STREAM_LOG(TRACE, stream, "Scheduling stream cross-thread work task");
  1140. /* increment the refcount of stream to keep it alive until the task runs */
  1141. aws_atomic_fetch_add(&stream->base.refcount, 1);
  1142. aws_channel_schedule_task_now(connection->base.channel_slot->channel, &stream->cross_thread_work_task);
  1143. }
  1144. return AWS_OP_SUCCESS;
  1145. }