h2_connection.c 120 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/http/private/h2_connection.h>
  6. #include <aws/http/private/h2_stream.h>
  7. #include <aws/http/private/h2_decoder.h>
  8. #include <aws/http/private/h2_stream.h>
  9. #include <aws/http/private/strutil.h>
  10. #include <aws/common/clock.h>
  11. #include <aws/common/logging.h>
  #ifdef _MSC_VER
  /* MSVC only: silence warning C4204 (non-constant aggregate initializer) */
  #    pragma warning(disable : 4204)
  #endif

  /* Apple toolchains (e.g. xcode, swiftpm) define DEBUG themselves.
   * Remove that definition so the DEBUG token can be used in this file. */
  #undef DEBUG

  /* Log a printf-style message for a connection at the given level.
   * The message is prefixed with "id=%p: " using the connection's address.
   * At least one variadic argument must be supplied. */
  #define CONNECTION_LOGF(lvl, conn, fmt, ...)                                                                         \
      AWS_LOGF_##lvl(AWS_LS_HTTP_CONNECTION, "id=%p: " fmt, (void *)(conn), __VA_ARGS__)

  /* Log a fixed string for a connection at the given level (no format arguments). */
  #define CONNECTION_LOG(lvl, conn, msg) CONNECTION_LOGF(lvl, conn, "%s", msg)
  /*
   * Forward declarations: channel-handler callbacks.
   * These are referenced by s_h2_connection_vtable further down in this file.
   */
  static int s_handler_process_read_message(
      struct aws_channel_handler *handler,
      struct aws_channel_slot *slot,
      struct aws_io_message *message);

  static int s_handler_process_write_message(
      struct aws_channel_handler *handler,
      struct aws_channel_slot *slot,
      struct aws_io_message *message);

  static int s_handler_increment_read_window(
      struct aws_channel_handler *handler,
      struct aws_channel_slot *slot,
      size_t size);

  static int s_handler_shutdown(
      struct aws_channel_handler *handler,
      struct aws_channel_slot *slot,
      enum aws_channel_direction dir,
      int error_code,
      bool free_scarce_resources_immediately);

  static size_t s_handler_initial_window_size(struct aws_channel_handler *handler);

  static size_t s_handler_message_overhead(struct aws_channel_handler *handler);

  static void s_handler_destroy(struct aws_channel_handler *handler);

  static void s_handler_installed(struct aws_channel_handler *handler, struct aws_channel_slot *slot);
  42. static struct aws_http_stream *s_connection_make_request(
  43. struct aws_http_connection *client_connection,
  44. const struct aws_http_make_request_options *options);
  45. static void s_connection_close(struct aws_http_connection *connection_base);
  46. static void s_connection_stop_new_request(struct aws_http_connection *connection_base);
  47. static bool s_connection_is_open(const struct aws_http_connection *connection_base);
  48. static bool s_connection_new_requests_allowed(const struct aws_http_connection *connection_base);
  49. static void s_connection_update_window(struct aws_http_connection *connection_base, uint32_t increment_size);
  50. static int s_connection_change_settings(
  51. struct aws_http_connection *connection_base,
  52. const struct aws_http2_setting *settings_array,
  53. size_t num_settings,
  54. aws_http2_on_change_settings_complete_fn *on_completed,
  55. void *user_data);
  56. static int s_connection_send_ping(
  57. struct aws_http_connection *connection_base,
  58. const struct aws_byte_cursor *optional_opaque_data,
  59. aws_http2_on_ping_complete_fn *on_completed,
  60. void *user_data);
  61. static void s_connection_send_goaway(
  62. struct aws_http_connection *connection_base,
  63. uint32_t http2_error,
  64. bool allow_more_streams,
  65. const struct aws_byte_cursor *optional_debug_data);
  66. static int s_connection_get_sent_goaway(
  67. struct aws_http_connection *connection_base,
  68. uint32_t *out_http2_error,
  69. uint32_t *out_last_stream_id);
  70. static int s_connection_get_received_goaway(
  71. struct aws_http_connection *connection_base,
  72. uint32_t *out_http2_error,
  73. uint32_t *out_last_stream_id);
  74. static void s_connection_get_local_settings(
  75. const struct aws_http_connection *connection_base,
  76. struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT]);
  77. static void s_connection_get_remote_settings(
  78. const struct aws_http_connection *connection_base,
  79. struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT]);
  80. static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
  81. static void s_outgoing_frames_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
  82. static int s_encode_outgoing_frames_queue(struct aws_h2_connection *connection, struct aws_byte_buf *output);
  83. static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connection, struct aws_byte_buf *output);
  84. static int s_record_closed_stream(
  85. struct aws_h2_connection *connection,
  86. uint32_t stream_id,
  87. enum aws_h2_stream_closed_when closed_when);
  88. static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h2_stream *stream, int error_code);
  89. static void s_write_outgoing_frames(struct aws_h2_connection *connection, bool first_try);
  90. static void s_finish_shutdown(struct aws_h2_connection *connection);
  91. static void s_send_goaway(
  92. struct aws_h2_connection *connection,
  93. uint32_t h2_error_code,
  94. bool allow_more_streams,
  95. const struct aws_byte_cursor *optional_debug_data);
  96. static struct aws_h2_pending_settings *s_new_pending_settings(
  97. struct aws_allocator *allocator,
  98. const struct aws_http2_setting *settings_array,
  99. size_t num_settings,
  100. aws_http2_on_change_settings_complete_fn *on_completed,
  101. void *user_data);
  102. static struct aws_h2err s_decoder_on_headers_begin(uint32_t stream_id, void *userdata);
  103. static struct aws_h2err s_decoder_on_headers_i(
  104. uint32_t stream_id,
  105. const struct aws_http_header *header,
  106. enum aws_http_header_name name_enum,
  107. enum aws_http_header_block block_type,
  108. void *userdata);
  109. static struct aws_h2err s_decoder_on_headers_end(
  110. uint32_t stream_id,
  111. bool malformed,
  112. enum aws_http_header_block block_type,
  113. void *userdata);
  114. static struct aws_h2err s_decoder_on_push_promise(uint32_t stream_id, uint32_t promised_stream_id, void *userdata);
  115. static struct aws_h2err s_decoder_on_data_begin(
  116. uint32_t stream_id,
  117. uint32_t payload_len,
  118. uint32_t total_padding_bytes,
  119. bool end_stream,
  120. void *userdata);
  121. static struct aws_h2err s_decoder_on_data_i(uint32_t stream_id, struct aws_byte_cursor data, void *userdata);
  122. static struct aws_h2err s_decoder_on_end_stream(uint32_t stream_id, void *userdata);
  123. static struct aws_h2err s_decoder_on_rst_stream(uint32_t stream_id, uint32_t h2_error_code, void *userdata);
  124. static struct aws_h2err s_decoder_on_ping_ack(uint8_t opaque_data[AWS_HTTP2_PING_DATA_SIZE], void *userdata);
  125. static struct aws_h2err s_decoder_on_ping(uint8_t opaque_data[AWS_HTTP2_PING_DATA_SIZE], void *userdata);
  126. static struct aws_h2err s_decoder_on_settings(
  127. const struct aws_http2_setting *settings_array,
  128. size_t num_settings,
  129. void *userdata);
  130. static struct aws_h2err s_decoder_on_settings_ack(void *userdata);
  131. static struct aws_h2err s_decoder_on_window_update(uint32_t stream_id, uint32_t window_size_increment, void *userdata);
  /* Decoder callback for a GOAWAY frame; installed as s_h2_decoder_vtable.on_goaway.
   * Declared `static` like every other file-local callback so the symbol does not get external linkage.
   * A later definition written without a storage-class specifier inherits this internal linkage. */
  static struct aws_h2err s_decoder_on_goaway(
      uint32_t last_stream,
      uint32_t error_code,
      struct aws_byte_cursor debug_data,
      void *userdata);
  /* Forward declarations: statistics callbacks.
   * Referenced by s_h2_connection_vtable.channel_handler_vtable further down in this file. */
  static void s_reset_statistics(struct aws_channel_handler *handler);

  static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats);
  139. static struct aws_http_connection_vtable s_h2_connection_vtable = {
  140. .channel_handler_vtable =
  141. {
  142. .process_read_message = s_handler_process_read_message,
  143. .process_write_message = s_handler_process_write_message,
  144. .increment_read_window = s_handler_increment_read_window,
  145. .shutdown = s_handler_shutdown,
  146. .initial_window_size = s_handler_initial_window_size,
  147. .message_overhead = s_handler_message_overhead,
  148. .destroy = s_handler_destroy,
  149. .reset_statistics = s_reset_statistics,
  150. .gather_statistics = s_gather_statistics,
  151. },
  152. .on_channel_handler_installed = s_handler_installed,
  153. .make_request = s_connection_make_request,
  154. .new_server_request_handler_stream = NULL,
  155. .stream_send_response = NULL,
  156. .close = s_connection_close,
  157. .stop_new_requests = s_connection_stop_new_request,
  158. .is_open = s_connection_is_open,
  159. .new_requests_allowed = s_connection_new_requests_allowed,
  160. .update_window = s_connection_update_window,
  161. .change_settings = s_connection_change_settings,
  162. .send_ping = s_connection_send_ping,
  163. .send_goaway = s_connection_send_goaway,
  164. .get_sent_goaway = s_connection_get_sent_goaway,
  165. .get_received_goaway = s_connection_get_received_goaway,
  166. .get_local_settings = s_connection_get_local_settings,
  167. .get_remote_settings = s_connection_get_remote_settings,
  168. };
  169. static const struct aws_h2_decoder_vtable s_h2_decoder_vtable = {
  170. .on_headers_begin = s_decoder_on_headers_begin,
  171. .on_headers_i = s_decoder_on_headers_i,
  172. .on_headers_end = s_decoder_on_headers_end,
  173. .on_push_promise_begin = s_decoder_on_push_promise,
  174. .on_data_begin = s_decoder_on_data_begin,
  175. .on_data_i = s_decoder_on_data_i,
  176. .on_end_stream = s_decoder_on_end_stream,
  177. .on_rst_stream = s_decoder_on_rst_stream,
  178. .on_ping_ack = s_decoder_on_ping_ack,
  179. .on_ping = s_decoder_on_ping,
  180. .on_settings = s_decoder_on_settings,
  181. .on_settings_ack = s_decoder_on_settings_ack,
  182. .on_window_update = s_decoder_on_window_update,
  183. .on_goaway = s_decoder_on_goaway,
  184. };
  185. static void s_lock_synced_data(struct aws_h2_connection *connection) {
  186. int err = aws_mutex_lock(&connection->synced_data.lock);
  187. AWS_ASSERT(!err && "lock failed");
  188. (void)err;
  189. }
  190. static void s_unlock_synced_data(struct aws_h2_connection *connection) {
  191. int err = aws_mutex_unlock(&connection->synced_data.lock);
  192. AWS_ASSERT(!err && "unlock failed");
  193. (void)err;
  194. }
  195. static void s_acquire_stream_and_connection_lock(struct aws_h2_stream *stream, struct aws_h2_connection *connection) {
  196. int err = aws_mutex_lock(&stream->synced_data.lock);
  197. err |= aws_mutex_lock(&connection->synced_data.lock);
  198. AWS_ASSERT(!err && "lock connection and stream failed");
  199. (void)err;
  200. }
  201. static void s_release_stream_and_connection_lock(struct aws_h2_stream *stream, struct aws_h2_connection *connection) {
  202. int err = aws_mutex_unlock(&connection->synced_data.lock);
  203. err |= aws_mutex_unlock(&stream->synced_data.lock);
  204. AWS_ASSERT(!err && "unlock connection and stream failed");
  205. (void)err;
  206. }
  207. static void s_add_time_measurement_to_stats(uint64_t start_ns, uint64_t end_ns, uint64_t *output_ms) {
  208. if (end_ns > start_ns) {
  209. *output_ms += aws_timestamp_convert(end_ns - start_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
  210. } else {
  211. *output_ms = 0;
  212. }
  213. }
  214. /**
  215. * Internal function for bringing connection to a stop.
  216. * Invoked multiple times, including when:
  217. * - Channel is shutting down in the read direction.
  218. * - Channel is shutting down in the write direction.
  219. * - An error occurs that will shutdown the channel.
  220. * - User wishes to close the connection (this is the only case where the function may run off-thread).
  221. */
  222. static void s_stop(
  223. struct aws_h2_connection *connection,
  224. bool stop_reading,
  225. bool stop_writing,
  226. bool schedule_shutdown,
  227. int error_code) {
  228. AWS_ASSERT(stop_reading || stop_writing || schedule_shutdown); /* You are required to stop at least 1 thing */
  229. if (stop_reading) {
  230. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  231. connection->thread_data.is_reading_stopped = true;
  232. }
  233. if (stop_writing) {
  234. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  235. connection->thread_data.is_writing_stopped = true;
  236. }
  237. /* Even if we're not scheduling shutdown just yet (ex: sent final request but waiting to read final response)
  238. * we don't consider the connection "open" anymore so user can't create more streams */
  239. { /* BEGIN CRITICAL SECTION */
  240. s_lock_synced_data(connection);
  241. connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
  242. connection->synced_data.is_open = false;
  243. s_unlock_synced_data(connection);
  244. } /* END CRITICAL SECTION */
  245. if (schedule_shutdown) {
  246. AWS_LOGF_INFO(
  247. AWS_LS_HTTP_CONNECTION,
  248. "id=%p: Shutting down connection with error code %d (%s).",
  249. (void *)&connection->base,
  250. error_code,
  251. aws_error_name(error_code));
  252. aws_channel_shutdown(connection->base.channel_slot->channel, error_code);
  253. }
  254. }
  255. void aws_h2_connection_shutdown_due_to_write_err(struct aws_h2_connection *connection, int error_code) {
  256. AWS_PRECONDITION(error_code);
  257. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  258. if (connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written) {
  259. /* If shutdown is waiting for writes to complete, but writes are now broken,
  260. * then we must finish shutdown now */
  261. s_finish_shutdown(connection);
  262. } else {
  263. s_stop(connection, false /*stop_reading*/, true /*stop_writing*/, true /*schedule_shutdown*/, error_code);
  264. }
  265. }
  266. /* Common new() logic for server & client */
  267. static struct aws_h2_connection *s_connection_new(
  268. struct aws_allocator *alloc,
  269. bool manual_window_management,
  270. const struct aws_http2_connection_options *http2_options,
  271. bool server) {
  272. AWS_PRECONDITION(http2_options);
  273. struct aws_h2_connection *connection = aws_mem_calloc(alloc, 1, sizeof(struct aws_h2_connection));
  274. if (!connection) {
  275. return NULL;
  276. }
  277. connection->base.vtable = &s_h2_connection_vtable;
  278. connection->base.alloc = alloc;
  279. connection->base.channel_handler.vtable = &s_h2_connection_vtable.channel_handler_vtable;
  280. connection->base.channel_handler.alloc = alloc;
  281. connection->base.channel_handler.impl = connection;
  282. connection->base.http_version = AWS_HTTP_VERSION_2;
  283. /* Init the next stream id (server must use even ids, client odd [RFC 7540 5.1.1])*/
  284. connection->base.next_stream_id = (server ? 2 : 1);
  285. /* Stream window management */
  286. connection->base.stream_manual_window_management = manual_window_management;
  287. /* Connection window management */
  288. connection->conn_manual_window_management = http2_options->conn_manual_window_management;
  289. connection->on_goaway_received = http2_options->on_goaway_received;
  290. connection->on_remote_settings_change = http2_options->on_remote_settings_change;
  291. aws_channel_task_init(
  292. &connection->cross_thread_work_task, s_cross_thread_work_task, connection, "HTTP/2 cross-thread work");
  293. aws_channel_task_init(
  294. &connection->outgoing_frames_task, s_outgoing_frames_task, connection, "HTTP/2 outgoing frames");
  295. /* 1 refcount for user */
  296. aws_atomic_init_int(&connection->base.refcount, 1);
  297. uint32_t max_stream_id = AWS_H2_STREAM_ID_MAX;
  298. connection->synced_data.goaway_sent_last_stream_id = max_stream_id + 1;
  299. connection->synced_data.goaway_received_last_stream_id = max_stream_id + 1;
  300. aws_linked_list_init(&connection->synced_data.pending_stream_list);
  301. aws_linked_list_init(&connection->synced_data.pending_frame_list);
  302. aws_linked_list_init(&connection->synced_data.pending_settings_list);
  303. aws_linked_list_init(&connection->synced_data.pending_ping_list);
  304. aws_linked_list_init(&connection->synced_data.pending_goaway_list);
  305. aws_linked_list_init(&connection->thread_data.outgoing_streams_list);
  306. aws_linked_list_init(&connection->thread_data.pending_settings_queue);
  307. aws_linked_list_init(&connection->thread_data.pending_ping_queue);
  308. aws_linked_list_init(&connection->thread_data.stalled_window_streams_list);
  309. aws_linked_list_init(&connection->thread_data.waiting_streams_list);
  310. aws_linked_list_init(&connection->thread_data.outgoing_frames_queue);
  311. if (aws_mutex_init(&connection->synced_data.lock)) {
  312. CONNECTION_LOGF(
  313. ERROR, connection, "Mutex init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error()));
  314. goto error;
  315. }
  316. if (aws_hash_table_init(
  317. &connection->thread_data.active_streams_map, alloc, 8, aws_hash_ptr, aws_ptr_eq, NULL, NULL)) {
  318. CONNECTION_LOGF(
  319. ERROR, connection, "Hashtable init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error()));
  320. goto error;
  321. }
  322. size_t max_closed_streams = AWS_HTTP2_DEFAULT_MAX_CLOSED_STREAMS;
  323. if (http2_options->max_closed_streams) {
  324. max_closed_streams = http2_options->max_closed_streams;
  325. }
  326. connection->thread_data.closed_streams =
  327. aws_cache_new_fifo(alloc, aws_hash_ptr, aws_ptr_eq, NULL, NULL, max_closed_streams);
  328. if (!connection->thread_data.closed_streams) {
  329. CONNECTION_LOGF(
  330. ERROR, connection, "FIFO cache init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error()));
  331. goto error;
  332. }
  333. /* Initialize the value of settings */
  334. memcpy(connection->thread_data.settings_peer, aws_h2_settings_initial, sizeof(aws_h2_settings_initial));
  335. memcpy(connection->thread_data.settings_self, aws_h2_settings_initial, sizeof(aws_h2_settings_initial));
  336. memcpy(connection->synced_data.settings_peer, aws_h2_settings_initial, sizeof(aws_h2_settings_initial));
  337. memcpy(connection->synced_data.settings_self, aws_h2_settings_initial, sizeof(aws_h2_settings_initial));
  338. connection->thread_data.window_size_peer = AWS_H2_INIT_WINDOW_SIZE;
  339. connection->thread_data.window_size_self = AWS_H2_INIT_WINDOW_SIZE;
  340. connection->thread_data.goaway_received_last_stream_id = AWS_H2_STREAM_ID_MAX;
  341. connection->thread_data.goaway_sent_last_stream_id = AWS_H2_STREAM_ID_MAX;
  342. aws_crt_statistics_http2_channel_init(&connection->thread_data.stats);
  343. connection->thread_data.stats.was_inactive = true; /* Start with non active streams */
  344. connection->synced_data.is_open = true;
  345. connection->synced_data.new_stream_error_code = AWS_ERROR_SUCCESS;
  346. /* Create a new decoder */
  347. struct aws_h2_decoder_params params = {
  348. .alloc = alloc,
  349. .vtable = &s_h2_decoder_vtable,
  350. .userdata = connection,
  351. .logging_id = connection,
  352. .is_server = server,
  353. };
  354. connection->thread_data.decoder = aws_h2_decoder_new(&params);
  355. if (!connection->thread_data.decoder) {
  356. CONNECTION_LOGF(
  357. ERROR, connection, "Decoder init error %d (%s)", aws_last_error(), aws_error_name(aws_last_error()));
  358. goto error;
  359. }
  360. if (aws_h2_frame_encoder_init(&connection->thread_data.encoder, alloc, &connection->base)) {
  361. CONNECTION_LOGF(
  362. ERROR, connection, "Encoder init error %d (%s)", aws_last_error(), aws_error_name(aws_last_error()));
  363. goto error;
  364. }
  365. /* User data from connection base is not ready until the handler installed */
  366. connection->thread_data.init_pending_settings = s_new_pending_settings(
  367. connection->base.alloc,
  368. http2_options->initial_settings_array,
  369. http2_options->num_initial_settings,
  370. http2_options->on_initial_settings_completed,
  371. NULL /* user_data is set later... */);
  372. if (!connection->thread_data.init_pending_settings) {
  373. goto error;
  374. }
  375. /* We enqueue the inital settings when handler get installed */
  376. return connection;
  377. error:
  378. s_handler_destroy(&connection->base.channel_handler);
  379. return NULL;
  380. }
  381. struct aws_http_connection *aws_http_connection_new_http2_server(
  382. struct aws_allocator *allocator,
  383. bool manual_window_management,
  384. const struct aws_http2_connection_options *http2_options) {
  385. struct aws_h2_connection *connection = s_connection_new(allocator, manual_window_management, http2_options, true);
  386. if (!connection) {
  387. return NULL;
  388. }
  389. connection->base.server_data = &connection->base.client_or_server_data.server;
  390. return &connection->base;
  391. }
  392. struct aws_http_connection *aws_http_connection_new_http2_client(
  393. struct aws_allocator *allocator,
  394. bool manual_window_management,
  395. const struct aws_http2_connection_options *http2_options) {
  396. struct aws_h2_connection *connection = s_connection_new(allocator, manual_window_management, http2_options, false);
  397. if (!connection) {
  398. return NULL;
  399. }
  400. connection->base.client_data = &connection->base.client_or_server_data.client;
  401. return &connection->base;
  402. }
  403. static void s_handler_destroy(struct aws_channel_handler *handler) {
  404. struct aws_h2_connection *connection = handler->impl;
  405. CONNECTION_LOG(TRACE, connection, "Destroying connection");
  406. /* No streams should be left in internal datastructures */
  407. AWS_ASSERT(
  408. !aws_hash_table_is_valid(&connection->thread_data.active_streams_map) ||
  409. aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0);
  410. AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.waiting_streams_list));
  411. AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.stalled_window_streams_list));
  412. AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.outgoing_streams_list));
  413. AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_stream_list));
  414. AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_frame_list));
  415. AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_settings_list));
  416. AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_ping_list));
  417. AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_goaway_list));
  418. AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.pending_ping_queue));
  419. AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.pending_settings_queue));
  420. /* Clean up any unsent frames and structures */
  421. struct aws_linked_list *outgoing_frames_queue = &connection->thread_data.outgoing_frames_queue;
  422. while (!aws_linked_list_empty(outgoing_frames_queue)) {
  423. struct aws_linked_list_node *node = aws_linked_list_pop_front(outgoing_frames_queue);
  424. struct aws_h2_frame *frame = AWS_CONTAINER_OF(node, struct aws_h2_frame, node);
  425. aws_h2_frame_destroy(frame);
  426. }
  427. if (connection->thread_data.init_pending_settings) {
  428. /* if initial settings were never sent, we need to clear the memory here */
  429. aws_mem_release(connection->base.alloc, connection->thread_data.init_pending_settings);
  430. }
  431. aws_h2_decoder_destroy(connection->thread_data.decoder);
  432. aws_h2_frame_encoder_clean_up(&connection->thread_data.encoder);
  433. aws_hash_table_clean_up(&connection->thread_data.active_streams_map);
  434. aws_cache_destroy(connection->thread_data.closed_streams);
  435. aws_mutex_clean_up(&connection->synced_data.lock);
  436. aws_mem_release(connection->base.alloc, connection);
  437. }
  438. static struct aws_h2_pending_settings *s_new_pending_settings(
  439. struct aws_allocator *allocator,
  440. const struct aws_http2_setting *settings_array,
  441. size_t num_settings,
  442. aws_http2_on_change_settings_complete_fn *on_completed,
  443. void *user_data) {
  444. size_t settings_storage_size = sizeof(struct aws_http2_setting) * num_settings;
  445. struct aws_h2_pending_settings *pending_settings;
  446. void *settings_storage;
  447. if (!aws_mem_acquire_many(
  448. allocator,
  449. 2,
  450. &pending_settings,
  451. sizeof(struct aws_h2_pending_settings),
  452. &settings_storage,
  453. settings_storage_size)) {
  454. return NULL;
  455. }
  456. AWS_ZERO_STRUCT(*pending_settings);
  457. /* We buffer the settings up, incase the caller has freed them when the ACK arrives */
  458. pending_settings->settings_array = settings_storage;
  459. if (settings_array) {
  460. memcpy(pending_settings->settings_array, settings_array, num_settings * sizeof(struct aws_http2_setting));
  461. }
  462. pending_settings->num_settings = num_settings;
  463. pending_settings->on_completed = on_completed;
  464. pending_settings->user_data = user_data;
  465. return pending_settings;
  466. }
  467. static struct aws_h2_pending_ping *s_new_pending_ping(
  468. struct aws_allocator *allocator,
  469. const struct aws_byte_cursor *optional_opaque_data,
  470. const uint64_t started_time,
  471. void *user_data,
  472. aws_http2_on_ping_complete_fn *on_completed) {
  473. struct aws_h2_pending_ping *pending_ping = aws_mem_calloc(allocator, 1, sizeof(struct aws_h2_pending_ping));
  474. if (!pending_ping) {
  475. return NULL;
  476. }
  477. if (optional_opaque_data) {
  478. memcpy(pending_ping->opaque_data, optional_opaque_data->ptr, AWS_HTTP2_PING_DATA_SIZE);
  479. }
  480. pending_ping->started_time = started_time;
  481. pending_ping->on_completed = on_completed;
  482. pending_ping->user_data = user_data;
  483. return pending_ping;
  484. }
  485. static struct aws_h2_pending_goaway *s_new_pending_goaway(
  486. struct aws_allocator *allocator,
  487. uint32_t http2_error,
  488. bool allow_more_streams,
  489. const struct aws_byte_cursor *optional_debug_data) {
  490. struct aws_byte_cursor debug_data;
  491. AWS_ZERO_STRUCT(debug_data);
  492. if (optional_debug_data) {
  493. debug_data = *optional_debug_data;
  494. }
  495. struct aws_h2_pending_goaway *pending_goaway;
  496. void *debug_data_storage;
  497. /* mem acquire cannot fail anymore */
  498. aws_mem_acquire_many(
  499. allocator, 2, &pending_goaway, sizeof(struct aws_h2_pending_goaway), &debug_data_storage, debug_data.len);
  500. if (debug_data.len) {
  501. memcpy(debug_data_storage, debug_data.ptr, debug_data.len);
  502. debug_data.ptr = debug_data_storage;
  503. }
  504. pending_goaway->debug_data = debug_data;
  505. pending_goaway->http2_error = http2_error;
  506. pending_goaway->allow_more_streams = allow_more_streams;
  507. return pending_goaway;
  508. }
  509. void aws_h2_connection_enqueue_outgoing_frame(struct aws_h2_connection *connection, struct aws_h2_frame *frame) {
  510. AWS_PRECONDITION(frame->type != AWS_H2_FRAME_T_DATA);
  511. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  512. if (frame->high_priority) {
  513. /* Check from the head of the queue, and find a node with normal priority, and insert before it */
  514. struct aws_linked_list_node *iter = aws_linked_list_begin(&connection->thread_data.outgoing_frames_queue);
  515. /* one past the last element */
  516. const struct aws_linked_list_node *end = aws_linked_list_end(&connection->thread_data.outgoing_frames_queue);
  517. while (iter != end) {
  518. struct aws_h2_frame *frame_i = AWS_CONTAINER_OF(iter, struct aws_h2_frame, node);
  519. if (connection->thread_data.current_outgoing_frame == frame_i) {
  520. iter = iter->next;
  521. continue;
  522. }
  523. if (!frame_i->high_priority) {
  524. break;
  525. }
  526. iter = iter->next;
  527. }
  528. aws_linked_list_insert_before(iter, &frame->node);
  529. } else {
  530. aws_linked_list_push_back(&connection->thread_data.outgoing_frames_queue, &frame->node);
  531. }
  532. }
  533. static void s_on_channel_write_complete(
  534. struct aws_channel *channel,
  535. struct aws_io_message *message,
  536. int err_code,
  537. void *user_data) {
  538. (void)message;
  539. struct aws_h2_connection *connection = user_data;
  540. if (err_code) {
  541. CONNECTION_LOGF(ERROR, connection, "Message did not write to network, error %s", aws_error_name(err_code));
  542. aws_h2_connection_shutdown_due_to_write_err(connection, err_code);
  543. return;
  544. }
  545. CONNECTION_LOG(TRACE, connection, "Message finished writing to network. Rescheduling outgoing frame task");
  546. /* To avoid wasting memory, we only want ONE of our written aws_io_messages in the channel at a time.
  547. * Therefore, we wait until it's written to the network before trying to send another
  548. * by running the outgoing-frame-task again.
  549. *
  550. * We also want to share the network with other channels.
  551. * Therefore, when the write completes, we SCHEDULE the outgoing-frame-task
  552. * to run again instead of calling the function directly.
  553. * This way, if the message completes synchronously,
  554. * we're not hogging the network by writing message after message in a tight loop */
  555. aws_channel_schedule_task_now(channel, &connection->outgoing_frames_task);
  556. }
  557. static void s_outgoing_frames_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  558. (void)task;
  559. if (status != AWS_TASK_STATUS_RUN_READY) {
  560. return;
  561. }
  562. struct aws_h2_connection *connection = arg;
  563. s_write_outgoing_frames(connection, false /*first_try*/);
  564. }
  565. static void s_write_outgoing_frames(struct aws_h2_connection *connection, bool first_try) {
  566. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  567. AWS_PRECONDITION(connection->thread_data.is_outgoing_frames_task_active);
  568. struct aws_channel_slot *channel_slot = connection->base.channel_slot;
  569. struct aws_linked_list *outgoing_frames_queue = &connection->thread_data.outgoing_frames_queue;
  570. struct aws_linked_list *outgoing_streams_list = &connection->thread_data.outgoing_streams_list;
  571. if (connection->thread_data.is_writing_stopped) {
  572. return;
  573. }
  574. /* Determine whether there's work to do, and end task immediately if there's not.
  575. * Note that we stop writing DATA frames if the channel is trying to shut down */
  576. bool has_control_frames = !aws_linked_list_empty(outgoing_frames_queue);
  577. bool has_data_frames = !aws_linked_list_empty(outgoing_streams_list);
  578. bool may_write_data_frames = (connection->thread_data.window_size_peer > AWS_H2_MIN_WINDOW_SIZE) &&
  579. !connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written;
  580. bool will_write = has_control_frames || (has_data_frames && may_write_data_frames);
  581. if (!will_write) {
  582. if (!first_try) {
  583. CONNECTION_LOGF(
  584. TRACE,
  585. connection,
  586. "Outgoing frames task stopped. has_control_frames:%d has_data_frames:%d may_write_data_frames:%d",
  587. has_control_frames,
  588. has_data_frames,
  589. may_write_data_frames);
  590. }
  591. connection->thread_data.is_outgoing_frames_task_active = false;
  592. if (connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written) {
  593. s_finish_shutdown(connection);
  594. }
  595. return;
  596. }
  597. if (first_try) {
  598. CONNECTION_LOG(TRACE, connection, "Starting outgoing frames task");
  599. }
  600. /* Acquire aws_io_message, that we will attempt to fill up */
  601. struct aws_io_message *msg = aws_channel_slot_acquire_max_message_for_write(channel_slot);
  602. if (AWS_UNLIKELY(!msg)) {
  603. CONNECTION_LOG(ERROR, connection, "Failed to acquire message from pool, closing connection.");
  604. goto error;
  605. }
  606. /* Set up callback so we can send another message when this one completes */
  607. msg->on_completion = s_on_channel_write_complete;
  608. msg->user_data = connection;
  609. CONNECTION_LOGF(
  610. TRACE,
  611. connection,
  612. "Outgoing frames task acquired message with %zu bytes available",
  613. msg->message_data.capacity - msg->message_data.len);
  614. /* Write as many frames from outgoing_frames_queue as possible. */
  615. if (s_encode_outgoing_frames_queue(connection, &msg->message_data)) {
  616. goto error;
  617. }
  618. /* If outgoing_frames_queue emptied, and connection is running normally,
  619. * then write as many DATA frames from outgoing_streams_list as possible. */
  620. if (aws_linked_list_empty(outgoing_frames_queue) && may_write_data_frames) {
  621. if (s_encode_data_from_outgoing_streams(connection, &msg->message_data)) {
  622. goto error;
  623. }
  624. }
  625. if (msg->message_data.len) {
  626. /* Write message to channel.
  627. * outgoing_frames_task will resume when message completes. */
  628. CONNECTION_LOGF(TRACE, connection, "Outgoing frames task sending message of size %zu", msg->message_data.len);
  629. if (aws_channel_slot_send_message(channel_slot, msg, AWS_CHANNEL_DIR_WRITE)) {
  630. CONNECTION_LOGF(
  631. ERROR,
  632. connection,
  633. "Failed to send channel message: %s. Closing connection.",
  634. aws_error_name(aws_last_error()));
  635. goto error;
  636. }
  637. } else {
  638. /* Message is empty, warn that no work is being done and reschedule the task to try again next tick.
  639. * It's likely that body isn't ready, so body streaming function has no data to write yet.
  640. * If this scenario turns out to be common we should implement a "pause" feature. */
  641. CONNECTION_LOG(WARN, connection, "Outgoing frames task sent no data, will try again next tick.");
  642. aws_mem_release(msg->allocator, msg);
  643. aws_channel_schedule_task_now(channel_slot->channel, &connection->outgoing_frames_task);
  644. }
  645. return;
  646. error:;
  647. int error_code = aws_last_error();
  648. if (msg) {
  649. aws_mem_release(msg->allocator, msg);
  650. }
  651. aws_h2_connection_shutdown_due_to_write_err(connection, error_code);
  652. }
  653. /* Write as many frames from outgoing_frames_queue as possible (contains all non-DATA frames) */
  654. static int s_encode_outgoing_frames_queue(struct aws_h2_connection *connection, struct aws_byte_buf *output) {
  655. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  656. struct aws_linked_list *outgoing_frames_queue = &connection->thread_data.outgoing_frames_queue;
  657. /* Write as many frames from outgoing_frames_queue as possible. */
  658. while (!aws_linked_list_empty(outgoing_frames_queue)) {
  659. struct aws_linked_list_node *frame_node = aws_linked_list_front(outgoing_frames_queue);
  660. struct aws_h2_frame *frame = AWS_CONTAINER_OF(frame_node, struct aws_h2_frame, node);
  661. connection->thread_data.current_outgoing_frame = frame;
  662. bool frame_complete;
  663. if (aws_h2_encode_frame(&connection->thread_data.encoder, frame, output, &frame_complete)) {
  664. CONNECTION_LOGF(
  665. ERROR,
  666. connection,
  667. "Error encoding frame: type=%s stream=%" PRIu32 " error=%s",
  668. aws_h2_frame_type_to_str(frame->type),
  669. frame->stream_id,
  670. aws_error_name(aws_last_error()));
  671. return AWS_OP_ERR;
  672. }
  673. if (!frame_complete) {
  674. if (output->len == 0) {
  675. /* We're in trouble if an empty message isn't big enough for this frame to do any work with */
  676. CONNECTION_LOGF(
  677. ERROR,
  678. connection,
  679. "Message is too small for encoder. frame-type=%s stream=%" PRIu32 " available-space=%zu",
  680. aws_h2_frame_type_to_str(frame->type),
  681. frame->stream_id,
  682. output->capacity);
  683. aws_raise_error(AWS_ERROR_INVALID_STATE);
  684. return AWS_OP_ERR;
  685. }
  686. CONNECTION_LOG(TRACE, connection, "Outgoing frames task filled message, and has more frames to send later");
  687. break;
  688. }
  689. /* Done encoding frame, pop from queue and cleanup*/
  690. aws_linked_list_remove(frame_node);
  691. aws_h2_frame_destroy(frame);
  692. connection->thread_data.current_outgoing_frame = NULL;
  693. }
  694. return AWS_OP_SUCCESS;
  695. }
  696. /* Write as many DATA frames from outgoing_streams_list as possible. */
  697. static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connection, struct aws_byte_buf *output) {
  698. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  699. struct aws_linked_list *outgoing_streams_list = &connection->thread_data.outgoing_streams_list;
  700. if (aws_linked_list_empty(outgoing_streams_list)) {
  701. return AWS_OP_SUCCESS;
  702. }
  703. struct aws_linked_list *stalled_window_streams_list = &connection->thread_data.stalled_window_streams_list;
  704. struct aws_linked_list *waiting_streams_list = &connection->thread_data.waiting_streams_list;
  705. /* If a stream stalls, put it in this list until the function ends so we don't keep trying to read from it.
  706. * We put it back at the end of function. */
  707. struct aws_linked_list stalled_streams_list;
  708. aws_linked_list_init(&stalled_streams_list);
  709. int aws_error_code = 0;
  710. /* We simply round-robin through streams, instead of using stream priority.
  711. * Respecting priority is not required (RFC-7540 5.3), so we're ignoring it for now. This also keeps use safe
  712. * from priority DOS attacks: https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-9513 */
  713. while (!aws_linked_list_empty(outgoing_streams_list)) {
  714. if (connection->thread_data.window_size_peer <= AWS_H2_MIN_WINDOW_SIZE) {
  715. CONNECTION_LOGF(
  716. DEBUG,
  717. connection,
  718. "Peer connection's flow-control window is too small now %zu. Connection will stop sending DATA until "
  719. "WINDOW_UPDATE is received.",
  720. connection->thread_data.window_size_peer);
  721. goto done;
  722. }
  723. /* Stop looping if message is so full it's not worth the bother */
  724. size_t space_available = output->capacity - output->len;
  725. size_t worth_trying_threshold = AWS_H2_FRAME_PREFIX_SIZE * 2;
  726. if (space_available < worth_trying_threshold) {
  727. CONNECTION_LOG(TRACE, connection, "Outgoing frames task filled message, and has more frames to send later");
  728. goto done;
  729. }
  730. struct aws_linked_list_node *node = aws_linked_list_pop_front(outgoing_streams_list);
  731. struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node);
  732. /* Ask stream to encode a data frame.
  733. * Stream may complete itself as a result of encoding its data,
  734. * in which case it will vanish from the connection's datastructures as a side-effect of this call.
  735. * But if stream has more data to send, push it back into the appropriate list. */
  736. int data_encode_status;
  737. if (aws_h2_stream_encode_data_frame(stream, &connection->thread_data.encoder, output, &data_encode_status)) {
  738. aws_error_code = aws_last_error();
  739. CONNECTION_LOGF(
  740. ERROR,
  741. connection,
  742. "Connection error while encoding DATA on stream %" PRIu32 ", %s",
  743. stream->base.id,
  744. aws_error_name(aws_error_code));
  745. goto done;
  746. }
  747. /* If stream has more data, push it into the appropriate list. */
  748. switch (data_encode_status) {
  749. case AWS_H2_DATA_ENCODE_COMPLETE:
  750. break;
  751. case AWS_H2_DATA_ENCODE_ONGOING:
  752. aws_linked_list_push_back(outgoing_streams_list, node);
  753. break;
  754. case AWS_H2_DATA_ENCODE_ONGOING_BODY_STREAM_STALLED:
  755. aws_linked_list_push_back(&stalled_streams_list, node);
  756. break;
  757. case AWS_H2_DATA_ENCODE_ONGOING_WAITING_FOR_WRITES:
  758. stream->thread_data.waiting_for_writes = true;
  759. aws_linked_list_push_back(waiting_streams_list, node);
  760. break;
  761. case AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED:
  762. aws_linked_list_push_back(stalled_window_streams_list, node);
  763. AWS_H2_STREAM_LOG(
  764. DEBUG,
  765. stream,
  766. "Peer stream's flow-control window is too small. Data frames on this stream will not be sent until "
  767. "WINDOW_UPDATE. ");
  768. break;
  769. default:
  770. CONNECTION_LOG(ERROR, connection, "Data encode status is invalid.");
  771. aws_error_code = AWS_ERROR_INVALID_STATE;
  772. }
  773. }
  774. done:
  775. /* Return any stalled streams to outgoing_streams_list */
  776. while (!aws_linked_list_empty(&stalled_streams_list)) {
  777. aws_linked_list_push_back(outgoing_streams_list, aws_linked_list_pop_front(&stalled_streams_list));
  778. }
  779. if (aws_error_code) {
  780. return aws_raise_error(aws_error_code);
  781. }
  782. if (aws_linked_list_empty(outgoing_streams_list)) {
  783. /* transition from something to write -> nothing to write */
  784. uint64_t now_ns = 0;
  785. aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
  786. s_add_time_measurement_to_stats(
  787. connection->thread_data.outgoing_timestamp_ns,
  788. now_ns,
  789. &connection->thread_data.stats.pending_outgoing_stream_ms);
  790. }
  791. return AWS_OP_SUCCESS;
  792. }
  793. /* If the outgoing-frames-task isn't scheduled, run it immediately. */
  794. void aws_h2_try_write_outgoing_frames(struct aws_h2_connection *connection) {
  795. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  796. if (connection->thread_data.is_outgoing_frames_task_active) {
  797. return;
  798. }
  799. connection->thread_data.is_outgoing_frames_task_active = true;
  800. s_write_outgoing_frames(connection, true /*first_try*/);
  801. }
  802. /**
  803. * Returns successfully and sets `out_stream` if stream is currently active.
  804. * Returns successfully and sets `out_stream` to NULL if the frame should be ignored.
  805. * Returns failed aws_h2err if it is a connection error to receive this frame.
  806. */
  807. struct aws_h2err s_get_active_stream_for_incoming_frame(
  808. struct aws_h2_connection *connection,
  809. uint32_t stream_id,
  810. enum aws_h2_frame_type frame_type,
  811. struct aws_h2_stream **out_stream) {
  812. *out_stream = NULL;
  813. /* Check active streams */
  814. struct aws_hash_element *found = NULL;
  815. const void *stream_id_key = (void *)(size_t)stream_id;
  816. aws_hash_table_find(&connection->thread_data.active_streams_map, stream_id_key, &found);
  817. if (found) {
  818. /* Found it! return */
  819. *out_stream = found->value;
  820. return AWS_H2ERR_SUCCESS;
  821. }
  822. bool client_initiated = (stream_id % 2) == 1;
  823. bool self_initiated_stream = client_initiated && (connection->base.client_data != NULL);
  824. bool peer_initiated_stream = !self_initiated_stream;
  825. if ((self_initiated_stream && stream_id >= connection->base.next_stream_id) ||
  826. (peer_initiated_stream && stream_id > connection->thread_data.latest_peer_initiated_stream_id)) {
  827. /* Illegal to receive frames for a stream in the idle state (stream doesn't exist yet)
  828. * (except server receiving HEADERS to start a stream, but that's handled elsewhere) */
  829. CONNECTION_LOGF(
  830. ERROR,
  831. connection,
  832. "Illegal to receive %s frame on stream id=%" PRIu32 " state=IDLE",
  833. aws_h2_frame_type_to_str(frame_type),
  834. stream_id);
  835. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
  836. }
  837. if (peer_initiated_stream && stream_id > connection->thread_data.goaway_sent_last_stream_id) {
  838. /* Once GOAWAY sent, ignore frames for peer-initiated streams whose id > last-stream-id */
  839. CONNECTION_LOGF(
  840. TRACE,
  841. connection,
  842. "Ignoring %s frame on stream id=%" PRIu32 " because GOAWAY sent with last-stream-id=%" PRIu32,
  843. aws_h2_frame_type_to_str(frame_type),
  844. stream_id,
  845. connection->thread_data.goaway_sent_last_stream_id);
  846. return AWS_H2ERR_SUCCESS;
  847. }
  848. void *cached_value = NULL;
  849. /* Stream is closed, check whether it's legal for a few more frames to trickle in */
  850. if (aws_cache_find(connection->thread_data.closed_streams, stream_id_key, &cached_value)) {
  851. return aws_h2err_from_last_error();
  852. }
  853. if (cached_value) {
  854. if (frame_type == AWS_H2_FRAME_T_PRIORITY) {
  855. /* If we support PRIORITY, do something here. Right now just ignore it */
  856. return AWS_H2ERR_SUCCESS;
  857. }
  858. enum aws_h2_stream_closed_when closed_when = (enum aws_h2_stream_closed_when)(size_t)cached_value;
  859. switch (closed_when) {
  860. case AWS_H2_STREAM_CLOSED_WHEN_BOTH_SIDES_END_STREAM:
  861. /* WINDOW_UPDATE or RST_STREAM frames can be received ... for a short period after
  862. * a DATA or HEADERS frame containing an END_STREAM flag is sent.
  863. * Endpoints MUST ignore WINDOW_UPDATE or RST_STREAM frames received in this state */
  864. if (frame_type == AWS_H2_FRAME_T_WINDOW_UPDATE || frame_type == AWS_H2_FRAME_T_RST_STREAM) {
  865. CONNECTION_LOGF(
  866. TRACE,
  867. connection,
  868. "Ignoring %s frame on stream id=%" PRIu32 " because END_STREAM flag was recently sent.",
  869. aws_h2_frame_type_to_str(frame_type),
  870. stream_id);
  871. return AWS_H2ERR_SUCCESS;
  872. } else {
  873. CONNECTION_LOGF(
  874. ERROR,
  875. connection,
  876. "Illegal to receive %s frame on stream id=%" PRIu32 " after END_STREAM has been received.",
  877. aws_h2_frame_type_to_str(frame_type),
  878. stream_id);
  879. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_STREAM_CLOSED);
  880. }
  881. break;
  882. case AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_RECEIVED:
  883. /* An endpoint that receives any frame other than PRIORITY after receiving a RST_STREAM
  884. * MUST treat that as a stream error (Section 5.4.2) of type STREAM_CLOSED */
  885. CONNECTION_LOGF(
  886. ERROR,
  887. connection,
  888. "Illegal to receive %s frame on stream id=%" PRIu32 " after RST_STREAM has been received",
  889. aws_h2_frame_type_to_str(frame_type),
  890. stream_id);
  891. struct aws_h2_frame *rst_stream =
  892. aws_h2_frame_new_rst_stream(connection->base.alloc, stream_id, AWS_HTTP2_ERR_STREAM_CLOSED);
  893. if (!rst_stream) {
  894. CONNECTION_LOGF(
  895. ERROR, connection, "Error creating RST_STREAM frame, %s", aws_error_name(aws_last_error()));
  896. return aws_h2err_from_last_error();
  897. }
  898. aws_h2_connection_enqueue_outgoing_frame(connection, rst_stream);
  899. return AWS_H2ERR_SUCCESS;
  900. case AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_SENT:
  901. /* An endpoint MUST ignore frames that it receives on closed streams after it has sent a RST_STREAM
  902. * frame */
  903. CONNECTION_LOGF(
  904. TRACE,
  905. connection,
  906. "Ignoring %s frame on stream id=%" PRIu32 " because RST_STREAM was recently sent.",
  907. aws_h2_frame_type_to_str(frame_type),
  908. stream_id);
  909. return AWS_H2ERR_SUCCESS;
  910. break;
  911. default:
  912. CONNECTION_LOGF(
  913. ERROR, connection, "Invalid state fo cached closed stream, stream id=%" PRIu32, stream_id);
  914. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_INTERNAL_ERROR);
  915. break;
  916. }
  917. }
  918. if (frame_type == AWS_H2_FRAME_T_PRIORITY) {
  919. /* ignored if the stream has been removed from the dependency tree */
  920. return AWS_H2ERR_SUCCESS;
  921. }
  922. /* Stream closed (purged from closed_streams, or implicitly closed when its ID was skipped) */
  923. CONNECTION_LOGF(
  924. ERROR,
  925. connection,
  926. "Illegal to receive %s frame on stream id=%" PRIu32
  927. ", no memory of closed stream (ID skipped, or removed from cache)",
  928. aws_h2_frame_type_to_str(frame_type),
  929. stream_id);
  930. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
  931. }
  932. /* Decoder callbacks */
  933. struct aws_h2err s_decoder_on_headers_begin(uint32_t stream_id, void *userdata) {
  934. struct aws_h2_connection *connection = userdata;
  935. if (connection->base.server_data) {
  936. /* Server would create new request-handler stream... */
  937. return aws_h2err_from_aws_code(AWS_ERROR_UNIMPLEMENTED);
  938. }
  939. struct aws_h2_stream *stream;
  940. struct aws_h2err err =
  941. s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_HEADERS, &stream);
  942. if (aws_h2err_failed(err)) {
  943. return err;
  944. }
  945. if (stream) {
  946. err = aws_h2_stream_on_decoder_headers_begin(stream);
  947. if (aws_h2err_failed(err)) {
  948. return err;
  949. }
  950. }
  951. return AWS_H2ERR_SUCCESS;
  952. }
  953. struct aws_h2err s_decoder_on_headers_i(
  954. uint32_t stream_id,
  955. const struct aws_http_header *header,
  956. enum aws_http_header_name name_enum,
  957. enum aws_http_header_block block_type,
  958. void *userdata) {
  959. struct aws_h2_connection *connection = userdata;
  960. struct aws_h2_stream *stream;
  961. struct aws_h2err err =
  962. s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_HEADERS, &stream);
  963. if (aws_h2err_failed(err)) {
  964. return err;
  965. }
  966. if (stream) {
  967. err = aws_h2_stream_on_decoder_headers_i(stream, header, name_enum, block_type);
  968. if (aws_h2err_failed(err)) {
  969. return err;
  970. }
  971. }
  972. return AWS_H2ERR_SUCCESS;
  973. }
  974. struct aws_h2err s_decoder_on_headers_end(
  975. uint32_t stream_id,
  976. bool malformed,
  977. enum aws_http_header_block block_type,
  978. void *userdata) {
  979. struct aws_h2_connection *connection = userdata;
  980. struct aws_h2_stream *stream;
  981. struct aws_h2err err =
  982. s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_HEADERS, &stream);
  983. if (aws_h2err_failed(err)) {
  984. return err;
  985. }
  986. if (stream) {
  987. err = aws_h2_stream_on_decoder_headers_end(stream, malformed, block_type);
  988. if (aws_h2err_failed(err)) {
  989. return err;
  990. }
  991. }
  992. return AWS_H2ERR_SUCCESS;
  993. }
  994. struct aws_h2err s_decoder_on_push_promise(uint32_t stream_id, uint32_t promised_stream_id, void *userdata) {
  995. struct aws_h2_connection *connection = userdata;
  996. AWS_ASSERT(connection->base.client_data); /* decoder has already enforced this */
  997. AWS_ASSERT(promised_stream_id % 2 == 0); /* decoder has already enforced this */
  998. /* The identifier of a newly established stream MUST be numerically greater
  999. * than all streams that the initiating endpoint has opened or reserved (RFC-7540 5.1.1) */
  1000. if (promised_stream_id <= connection->thread_data.latest_peer_initiated_stream_id) {
  1001. CONNECTION_LOGF(
  1002. ERROR,
  1003. connection,
  1004. "Newly promised stream ID %" PRIu32 " must be higher than previously established ID %" PRIu32,
  1005. promised_stream_id,
  1006. connection->thread_data.latest_peer_initiated_stream_id);
  1007. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
  1008. }
  1009. connection->thread_data.latest_peer_initiated_stream_id = promised_stream_id;
  1010. /* If we ever fully support PUSH_PROMISE, this is where we'd add the
  1011. * promised_stream_id to some reserved_streams datastructure */
  1012. struct aws_h2_stream *stream;
  1013. struct aws_h2err err =
  1014. s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_PUSH_PROMISE, &stream);
  1015. if (aws_h2err_failed(err)) {
  1016. return err;
  1017. }
  1018. if (stream) {
  1019. err = aws_h2_stream_on_decoder_push_promise(stream, promised_stream_id);
  1020. if (aws_h2err_failed(err)) {
  1021. return err;
  1022. }
  1023. }
  1024. return AWS_H2ERR_SUCCESS;
  1025. }
  1026. static int s_connection_send_update_window(struct aws_h2_connection *connection, uint32_t window_size) {
  1027. struct aws_h2_frame *connection_window_update_frame =
  1028. aws_h2_frame_new_window_update(connection->base.alloc, 0, window_size);
  1029. if (!connection_window_update_frame) {
  1030. CONNECTION_LOGF(
  1031. ERROR,
  1032. connection,
  1033. "WINDOW_UPDATE frame on connection failed to be sent, error %s",
  1034. aws_error_name(aws_last_error()));
  1035. return AWS_OP_ERR;
  1036. }
  1037. aws_h2_connection_enqueue_outgoing_frame(connection, connection_window_update_frame);
  1038. connection->thread_data.window_size_self += window_size;
  1039. return AWS_OP_SUCCESS;
  1040. }
  1041. struct aws_h2err s_decoder_on_data_begin(
  1042. uint32_t stream_id,
  1043. uint32_t payload_len,
  1044. uint32_t total_padding_bytes,
  1045. bool end_stream,
  1046. void *userdata) {
  1047. struct aws_h2_connection *connection = userdata;
  1048. /* A receiver that receives a flow-controlled frame MUST always account for its contribution against the connection
  1049. * flow-control window, unless the receiver treats this as a connection error */
  1050. if (aws_sub_size_checked(
  1051. connection->thread_data.window_size_self, payload_len, &connection->thread_data.window_size_self)) {
  1052. CONNECTION_LOGF(
  1053. ERROR,
  1054. connection,
  1055. "DATA length %" PRIu32 " exceeds flow-control window %zu",
  1056. payload_len,
  1057. connection->thread_data.window_size_self);
  1058. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_FLOW_CONTROL_ERROR);
  1059. }
  1060. struct aws_h2_stream *stream;
  1061. struct aws_h2err err = s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_DATA, &stream);
  1062. if (aws_h2err_failed(err)) {
  1063. return err;
  1064. }
  1065. if (stream) {
  1066. err = aws_h2_stream_on_decoder_data_begin(stream, payload_len, total_padding_bytes, end_stream);
  1067. if (aws_h2err_failed(err)) {
  1068. return err;
  1069. }
  1070. }
  1071. /* Handle automatic updates of the connection flow window */
  1072. uint32_t auto_window_update;
  1073. if (connection->conn_manual_window_management) {
  1074. /* Automatically update the flow-window to account for padding, even though "manual window management"
  1075. * is enabled. We do this because the current API doesn't have any way to inform the user about padding,
  1076. * so we can't expect them to manage it themselves. */
  1077. auto_window_update = total_padding_bytes;
  1078. } else {
  1079. /* Automatically update the full amount we just received */
  1080. auto_window_update = payload_len;
  1081. }
  1082. if (auto_window_update != 0) {
  1083. if (s_connection_send_update_window(connection, auto_window_update)) {
  1084. return aws_h2err_from_last_error();
  1085. }
  1086. CONNECTION_LOGF(
  1087. TRACE,
  1088. connection,
  1089. "Automatically updating connection window by %" PRIu32 "(%" PRIu32 " due to padding).",
  1090. auto_window_update,
  1091. total_padding_bytes);
  1092. }
  1093. return AWS_H2ERR_SUCCESS;
  1094. }
  1095. struct aws_h2err s_decoder_on_data_i(uint32_t stream_id, struct aws_byte_cursor data, void *userdata) {
  1096. struct aws_h2_connection *connection = userdata;
  1097. /* Pass data to stream */
  1098. struct aws_h2_stream *stream;
  1099. struct aws_h2err err = s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_DATA, &stream);
  1100. if (aws_h2err_failed(err)) {
  1101. return err;
  1102. }
  1103. if (stream) {
  1104. err = aws_h2_stream_on_decoder_data_i(stream, data);
  1105. if (aws_h2err_failed(err)) {
  1106. return err;
  1107. }
  1108. }
  1109. return AWS_H2ERR_SUCCESS;
  1110. }
  1111. struct aws_h2err s_decoder_on_end_stream(uint32_t stream_id, void *userdata) {
  1112. struct aws_h2_connection *connection = userdata;
  1113. /* Not calling s_get_active_stream_for_incoming_frame() here because END_STREAM
  1114. * isn't an actual frame type. It's a flag on DATA or HEADERS frames, and we
  1115. * already checked the legality of those frames in their respective callbacks. */
  1116. struct aws_hash_element *found = NULL;
  1117. aws_hash_table_find(&connection->thread_data.active_streams_map, (void *)(size_t)stream_id, &found);
  1118. if (found) {
  1119. struct aws_h2_stream *stream = found->value;
  1120. struct aws_h2err err = aws_h2_stream_on_decoder_end_stream(stream);
  1121. if (aws_h2err_failed(err)) {
  1122. return err;
  1123. }
  1124. }
  1125. return AWS_H2ERR_SUCCESS;
  1126. }
  1127. static struct aws_h2err s_decoder_on_rst_stream(uint32_t stream_id, uint32_t h2_error_code, void *userdata) {
  1128. struct aws_h2_connection *connection = userdata;
  1129. /* Pass RST_STREAM to stream */
  1130. struct aws_h2_stream *stream;
  1131. struct aws_h2err err =
  1132. s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_RST_STREAM, &stream);
  1133. if (aws_h2err_failed(err)) {
  1134. return err;
  1135. }
  1136. if (stream) {
  1137. err = aws_h2_stream_on_decoder_rst_stream(stream, h2_error_code);
  1138. if (aws_h2err_failed(err)) {
  1139. return err;
  1140. }
  1141. }
  1142. return AWS_H2ERR_SUCCESS;
  1143. }
  1144. static struct aws_h2err s_decoder_on_ping_ack(uint8_t opaque_data[AWS_HTTP2_PING_DATA_SIZE], void *userdata) {
  1145. struct aws_h2_connection *connection = userdata;
  1146. if (aws_linked_list_empty(&connection->thread_data.pending_ping_queue)) {
  1147. CONNECTION_LOG(ERROR, connection, "Received extraneous PING ACK.");
  1148. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
  1149. }
  1150. struct aws_h2err err;
  1151. struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->thread_data.pending_ping_queue);
  1152. struct aws_h2_pending_ping *pending_ping = AWS_CONTAINER_OF(node, struct aws_h2_pending_ping, node);
  1153. /* Check the payload */
  1154. if (!aws_array_eq(opaque_data, AWS_HTTP2_PING_DATA_SIZE, pending_ping->opaque_data, AWS_HTTP2_PING_DATA_SIZE)) {
  1155. CONNECTION_LOG(ERROR, connection, "Received PING ACK with mismatched opaque-data.");
  1156. err = aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
  1157. goto error;
  1158. }
  1159. uint64_t time_stamp;
  1160. if (aws_high_res_clock_get_ticks(&time_stamp)) {
  1161. CONNECTION_LOGF(
  1162. ERROR,
  1163. connection,
  1164. "Failed getting the time stamp when PING ACK received, error %s",
  1165. aws_error_name(aws_last_error()));
  1166. err = aws_h2err_from_last_error();
  1167. goto error;
  1168. }
  1169. uint64_t rtt;
  1170. if (aws_sub_u64_checked(time_stamp, pending_ping->started_time, &rtt)) {
  1171. CONNECTION_LOGF(
  1172. ERROR,
  1173. connection,
  1174. "Overflow from time stamp when PING ACK received, error %s",
  1175. aws_error_name(aws_last_error()));
  1176. err = aws_h2err_from_last_error();
  1177. goto error;
  1178. }
  1179. CONNECTION_LOGF(TRACE, connection, "Round trip time is %lf ms, approximately", (double)rtt / 1000000);
  1180. /* fire the callback */
  1181. if (pending_ping->on_completed) {
  1182. pending_ping->on_completed(&connection->base, rtt, AWS_ERROR_SUCCESS, pending_ping->user_data);
  1183. }
  1184. aws_mem_release(connection->base.alloc, pending_ping);
  1185. return AWS_H2ERR_SUCCESS;
  1186. error:
  1187. if (pending_ping->on_completed) {
  1188. pending_ping->on_completed(&connection->base, 0 /* fake rtt */, err.aws_code, pending_ping->user_data);
  1189. }
  1190. aws_mem_release(connection->base.alloc, pending_ping);
  1191. return err;
  1192. }
  1193. static struct aws_h2err s_decoder_on_ping(uint8_t opaque_data[AWS_HTTP2_PING_DATA_SIZE], void *userdata) {
  1194. struct aws_h2_connection *connection = userdata;
  1195. /* send a PING frame with the ACK flag set in response, with an identical payload. */
  1196. struct aws_h2_frame *ping_ack_frame = aws_h2_frame_new_ping(connection->base.alloc, true, opaque_data);
  1197. if (!ping_ack_frame) {
  1198. CONNECTION_LOGF(
  1199. ERROR, connection, "Ping ACK frame failed to be sent, error %s", aws_error_name(aws_last_error()));
  1200. return aws_h2err_from_last_error();
  1201. }
  1202. aws_h2_connection_enqueue_outgoing_frame(connection, ping_ack_frame);
  1203. return AWS_H2ERR_SUCCESS;
  1204. }
  1205. static struct aws_h2err s_decoder_on_settings(
  1206. const struct aws_http2_setting *settings_array,
  1207. size_t num_settings,
  1208. void *userdata) {
  1209. struct aws_h2_connection *connection = userdata;
  1210. struct aws_h2err err;
  1211. /* Once all values have been processed, the recipient MUST immediately emit a SETTINGS frame with the ACK flag
  1212. * set.(RFC-7540 6.5.3) */
  1213. CONNECTION_LOG(TRACE, connection, "Setting frame processing ends");
  1214. struct aws_h2_frame *settings_ack_frame = aws_h2_frame_new_settings(connection->base.alloc, NULL, 0, true);
  1215. if (!settings_ack_frame) {
  1216. CONNECTION_LOGF(
  1217. ERROR, connection, "Settings ACK frame failed to be sent, error %s", aws_error_name(aws_last_error()));
  1218. return aws_h2err_from_last_error();
  1219. }
  1220. aws_h2_connection_enqueue_outgoing_frame(connection, settings_ack_frame);
  1221. /* Allocate a block of memory for settings_array in callback, which will only includes the settings we changed,
  1222. * freed once the callback finished */
  1223. struct aws_http2_setting *callback_array = NULL;
  1224. if (num_settings) {
  1225. callback_array = aws_mem_acquire(connection->base.alloc, num_settings * sizeof(struct aws_http2_setting));
  1226. if (!callback_array) {
  1227. return aws_h2err_from_last_error();
  1228. }
  1229. }
  1230. size_t callback_array_num = 0;
  1231. /* Apply the change to encoder and connection */
  1232. struct aws_h2_frame_encoder *encoder = &connection->thread_data.encoder;
  1233. for (size_t i = 0; i < num_settings; i++) {
  1234. if (connection->thread_data.settings_peer[settings_array[i].id] == settings_array[i].value) {
  1235. /* No change, don't do any work */
  1236. continue;
  1237. }
  1238. switch (settings_array[i].id) {
  1239. case AWS_HTTP2_SETTINGS_HEADER_TABLE_SIZE: {
  1240. aws_h2_frame_encoder_set_setting_header_table_size(encoder, settings_array[i].value);
  1241. } break;
  1242. case AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE: {
  1243. /* When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream
  1244. * flow-control windows that it maintains by the difference between the new value and the old value. */
  1245. int32_t size_changed =
  1246. settings_array[i].value - connection->thread_data.settings_peer[settings_array[i].id];
  1247. struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map);
  1248. while (!aws_hash_iter_done(&stream_iter)) {
  1249. struct aws_h2_stream *stream = stream_iter.element.value;
  1250. aws_hash_iter_next(&stream_iter);
  1251. err = aws_h2_stream_window_size_change(stream, size_changed, false /*self*/);
  1252. if (aws_h2err_failed(err)) {
  1253. CONNECTION_LOG(
  1254. ERROR,
  1255. connection,
  1256. "Connection error, change to SETTINGS_INITIAL_WINDOW_SIZE caused a stream's flow-control "
  1257. "window to exceed the maximum size");
  1258. goto error;
  1259. }
  1260. }
  1261. } break;
  1262. case AWS_HTTP2_SETTINGS_MAX_FRAME_SIZE: {
  1263. aws_h2_frame_encoder_set_setting_max_frame_size(encoder, settings_array[i].value);
  1264. } break;
  1265. default:
  1266. break;
  1267. }
  1268. connection->thread_data.settings_peer[settings_array[i].id] = settings_array[i].value;
  1269. callback_array[callback_array_num++] = settings_array[i];
  1270. }
  1271. if (connection->on_remote_settings_change) {
  1272. connection->on_remote_settings_change(
  1273. &connection->base, callback_array, callback_array_num, connection->base.user_data);
  1274. }
  1275. { /* BEGIN CRITICAL SECTION */
  1276. s_lock_synced_data(connection);
  1277. memcpy(
  1278. connection->synced_data.settings_peer,
  1279. connection->thread_data.settings_peer,
  1280. sizeof(connection->thread_data.settings_peer));
  1281. s_unlock_synced_data(connection);
  1282. } /* END CRITICAL SECTION */
  1283. aws_mem_release(connection->base.alloc, callback_array);
  1284. return AWS_H2ERR_SUCCESS;
  1285. error:
  1286. aws_mem_release(connection->base.alloc, callback_array);
  1287. return err;
  1288. }
  1289. static struct aws_h2err s_decoder_on_settings_ack(void *userdata) {
  1290. struct aws_h2_connection *connection = userdata;
  1291. if (aws_linked_list_empty(&connection->thread_data.pending_settings_queue)) {
  1292. CONNECTION_LOG(ERROR, connection, "Received a malicious extra SETTINGS acknowledgment");
  1293. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
  1294. }
  1295. struct aws_h2err err;
  1296. struct aws_h2_pending_settings *pending_settings = NULL;
  1297. struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->thread_data.pending_settings_queue);
  1298. pending_settings = AWS_CONTAINER_OF(node, struct aws_h2_pending_settings, node);
  1299. struct aws_http2_setting *settings_array = pending_settings->settings_array;
  1300. /* Apply the settings */
  1301. struct aws_h2_decoder *decoder = connection->thread_data.decoder;
  1302. for (size_t i = 0; i < pending_settings->num_settings; i++) {
  1303. if (connection->thread_data.settings_self[settings_array[i].id] == settings_array[i].value) {
  1304. /* No change, don't do any work */
  1305. continue;
  1306. }
  1307. switch (settings_array[i].id) {
  1308. case AWS_HTTP2_SETTINGS_HEADER_TABLE_SIZE: {
  1309. aws_h2_decoder_set_setting_header_table_size(decoder, settings_array[i].value);
  1310. } break;
  1311. case AWS_HTTP2_SETTINGS_ENABLE_PUSH: {
  1312. aws_h2_decoder_set_setting_enable_push(decoder, settings_array[i].value);
  1313. } break;
  1314. case AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE: {
  1315. /* When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream
  1316. * flow-control windows that it maintains by the difference between the new value and the old value. */
  1317. int32_t size_changed =
  1318. settings_array[i].value - connection->thread_data.settings_self[settings_array[i].id];
  1319. struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map);
  1320. while (!aws_hash_iter_done(&stream_iter)) {
  1321. struct aws_h2_stream *stream = stream_iter.element.value;
  1322. aws_hash_iter_next(&stream_iter);
  1323. err = aws_h2_stream_window_size_change(stream, size_changed, true /*self*/);
  1324. if (aws_h2err_failed(err)) {
  1325. CONNECTION_LOG(
  1326. ERROR,
  1327. connection,
  1328. "Connection error, change to SETTINGS_INITIAL_WINDOW_SIZE from internal caused a stream's "
  1329. "flow-control window to exceed the maximum size");
  1330. goto error;
  1331. }
  1332. }
  1333. } break;
  1334. case AWS_HTTP2_SETTINGS_MAX_FRAME_SIZE: {
  1335. aws_h2_decoder_set_setting_max_frame_size(decoder, settings_array[i].value);
  1336. } break;
  1337. default:
  1338. break;
  1339. }
  1340. connection->thread_data.settings_self[settings_array[i].id] = settings_array[i].value;
  1341. }
  1342. /* invoke the change settings compeleted user callback */
  1343. if (pending_settings->on_completed) {
  1344. pending_settings->on_completed(&connection->base, AWS_ERROR_SUCCESS, pending_settings->user_data);
  1345. }
  1346. { /* BEGIN CRITICAL SECTION */
  1347. s_lock_synced_data(connection);
  1348. memcpy(
  1349. connection->synced_data.settings_self,
  1350. connection->thread_data.settings_self,
  1351. sizeof(connection->thread_data.settings_self));
  1352. s_unlock_synced_data(connection);
  1353. } /* END CRITICAL SECTION */
  1354. /* clean up the pending_settings */
  1355. aws_mem_release(connection->base.alloc, pending_settings);
  1356. return AWS_H2ERR_SUCCESS;
  1357. error:
  1358. /* invoke the user callback with error code */
  1359. if (pending_settings->on_completed) {
  1360. pending_settings->on_completed(&connection->base, err.aws_code, pending_settings->user_data);
  1361. }
  1362. /* clean up the pending settings here */
  1363. aws_mem_release(connection->base.alloc, pending_settings);
  1364. return err;
  1365. }
  1366. static struct aws_h2err s_decoder_on_window_update(uint32_t stream_id, uint32_t window_size_increment, void *userdata) {
  1367. struct aws_h2_connection *connection = userdata;
  1368. if (stream_id == 0) {
  1369. /* Let's update the connection flow-control window size */
  1370. if (window_size_increment == 0) {
  1371. /* flow-control window increment of 0 MUST be treated as error (RFC7540 6.9.1) */
  1372. CONNECTION_LOG(ERROR, connection, "Window update frame with 0 increment size");
  1373. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
  1374. }
  1375. if (connection->thread_data.window_size_peer + window_size_increment > AWS_H2_WINDOW_UPDATE_MAX) {
  1376. /* We MUST NOT allow a flow-control window to exceed the max */
  1377. CONNECTION_LOG(
  1378. ERROR,
  1379. connection,
  1380. "Window update frame causes the connection flow-control window exceeding the maximum size");
  1381. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_FLOW_CONTROL_ERROR);
  1382. }
  1383. if (connection->thread_data.window_size_peer <= AWS_H2_MIN_WINDOW_SIZE) {
  1384. CONNECTION_LOGF(
  1385. DEBUG,
  1386. connection,
  1387. "Peer connection's flow-control window is resumed from too small to %" PRIu32
  1388. ". Connection will resume sending DATA.",
  1389. window_size_increment);
  1390. }
  1391. connection->thread_data.window_size_peer += window_size_increment;
  1392. return AWS_H2ERR_SUCCESS;
  1393. } else {
  1394. /* Update the flow-control window size for stream */
  1395. struct aws_h2_stream *stream;
  1396. bool window_resume;
  1397. struct aws_h2err err =
  1398. s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_WINDOW_UPDATE, &stream);
  1399. if (aws_h2err_failed(err)) {
  1400. return err;
  1401. }
  1402. if (stream) {
  1403. err = aws_h2_stream_on_decoder_window_update(stream, window_size_increment, &window_resume);
  1404. if (aws_h2err_failed(err)) {
  1405. return err;
  1406. }
  1407. if (window_resume) {
  1408. /* Set the stream free from stalled list */
  1409. AWS_H2_STREAM_LOGF(
  1410. DEBUG,
  1411. stream,
  1412. "Peer stream's flow-control window is resumed from 0 or negative to %" PRIu32
  1413. " Stream will resume sending data.",
  1414. stream->thread_data.window_size_peer);
  1415. aws_linked_list_remove(&stream->node);
  1416. aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node);
  1417. }
  1418. }
  1419. }
  1420. return AWS_H2ERR_SUCCESS;
  1421. }
  1422. struct aws_h2err s_decoder_on_goaway(
  1423. uint32_t last_stream,
  1424. uint32_t error_code,
  1425. struct aws_byte_cursor debug_data,
  1426. void *userdata) {
  1427. struct aws_h2_connection *connection = userdata;
  1428. if (last_stream > connection->thread_data.goaway_received_last_stream_id) {
  1429. CONNECTION_LOGF(
  1430. ERROR,
  1431. connection,
  1432. "Received GOAWAY with invalid last-stream-id=%" PRIu32 ", must not exceed previous last-stream-id=%" PRIu32,
  1433. last_stream,
  1434. connection->thread_data.goaway_received_last_stream_id);
  1435. return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
  1436. }
  1437. /* stop sending any new stream and making new request */
  1438. { /* BEGIN CRITICAL SECTION */
  1439. s_lock_synced_data(connection);
  1440. connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_GOAWAY_RECEIVED;
  1441. connection->synced_data.goaway_received_last_stream_id = last_stream;
  1442. connection->synced_data.goaway_received_http2_error_code = error_code;
  1443. s_unlock_synced_data(connection);
  1444. } /* END CRITICAL SECTION */
  1445. connection->thread_data.goaway_received_last_stream_id = last_stream;
  1446. CONNECTION_LOGF(
  1447. DEBUG,
  1448. connection,
  1449. "Received GOAWAY error-code=%s(0x%x) last-stream-id=%" PRIu32,
  1450. aws_http2_error_code_to_str(error_code),
  1451. error_code,
  1452. last_stream);
  1453. /* Complete activated streams whose id is higher than last_stream, since they will not process by peer. We should
  1454. * treat them as they had never been created at all.
  1455. * This would be more efficient if we could iterate streams in reverse-id order */
  1456. struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map);
  1457. while (!aws_hash_iter_done(&stream_iter)) {
  1458. struct aws_h2_stream *stream = stream_iter.element.value;
  1459. aws_hash_iter_next(&stream_iter);
  1460. if (stream->base.id > last_stream) {
  1461. AWS_H2_STREAM_LOG(
  1462. DEBUG,
  1463. stream,
  1464. "stream ID is higher than GOAWAY last stream ID, please retry this stream on a new connection.");
  1465. s_stream_complete(connection, stream, AWS_ERROR_HTTP_GOAWAY_RECEIVED);
  1466. }
  1467. }
  1468. if (connection->on_goaway_received) {
  1469. /* Inform user about goaway received and the error code. */
  1470. connection->on_goaway_received(
  1471. &connection->base, last_stream, error_code, debug_data, connection->base.user_data);
  1472. }
  1473. return AWS_H2ERR_SUCCESS;
  1474. }
  1475. /* End decoder callbacks */
  1476. static int s_send_connection_preface_client_string(struct aws_h2_connection *connection) {
  1477. /* Just send the magic string on its own aws_io_message. */
  1478. struct aws_io_message *msg = aws_channel_acquire_message_from_pool(
  1479. connection->base.channel_slot->channel,
  1480. AWS_IO_MESSAGE_APPLICATION_DATA,
  1481. aws_h2_connection_preface_client_string.len);
  1482. if (!msg) {
  1483. goto error;
  1484. }
  1485. if (!aws_byte_buf_write_from_whole_cursor(&msg->message_data, aws_h2_connection_preface_client_string)) {
  1486. aws_raise_error(AWS_ERROR_INVALID_STATE);
  1487. goto error;
  1488. }
  1489. if (aws_channel_slot_send_message(connection->base.channel_slot, msg, AWS_CHANNEL_DIR_WRITE)) {
  1490. goto error;
  1491. }
  1492. return AWS_OP_SUCCESS;
  1493. error:
  1494. if (msg) {
  1495. aws_mem_release(msg->allocator, msg);
  1496. }
  1497. return AWS_OP_ERR;
  1498. }
  1499. static void s_handler_installed(struct aws_channel_handler *handler, struct aws_channel_slot *slot) {
  1500. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(slot->channel));
  1501. struct aws_h2_connection *connection = handler->impl;
  1502. connection->base.channel_slot = slot;
  1503. /* Acquire a hold on the channel to prevent its destruction until the user has
  1504. * given the go-ahead via aws_http_connection_release() */
  1505. aws_channel_acquire_hold(slot->channel);
  1506. /* Send HTTP/2 connection preface (RFC-7540 3.5)
  1507. * - clients must send magic string
  1508. * - both client and server must send SETTINGS frame */
  1509. if (connection->base.client_data) {
  1510. if (s_send_connection_preface_client_string(connection)) {
  1511. CONNECTION_LOGF(
  1512. ERROR,
  1513. connection,
  1514. "Failed to send client connection preface string, %s",
  1515. aws_error_name(aws_last_error()));
  1516. goto error;
  1517. }
  1518. }
  1519. struct aws_h2_pending_settings *init_pending_settings = connection->thread_data.init_pending_settings;
  1520. aws_linked_list_push_back(&connection->thread_data.pending_settings_queue, &init_pending_settings->node);
  1521. connection->thread_data.init_pending_settings = NULL;
  1522. /* Set user_data here, the user_data is valid now */
  1523. init_pending_settings->user_data = connection->base.user_data;
  1524. struct aws_h2_frame *init_settings_frame = aws_h2_frame_new_settings(
  1525. connection->base.alloc,
  1526. init_pending_settings->settings_array,
  1527. init_pending_settings->num_settings,
  1528. false /*ACK*/);
  1529. if (!init_settings_frame) {
  1530. CONNECTION_LOGF(
  1531. ERROR,
  1532. connection,
  1533. "Failed to create the initial settings frame, error %s",
  1534. aws_error_name(aws_last_error()));
  1535. aws_mem_release(connection->base.alloc, init_pending_settings);
  1536. goto error;
  1537. }
  1538. /* enqueue the initial settings frame here */
  1539. aws_linked_list_push_back(&connection->thread_data.outgoing_frames_queue, &init_settings_frame->node);
  1540. /* If not manual connection window management, update the connection window to max. */
  1541. if (!connection->conn_manual_window_management) {
  1542. uint32_t initial_window_update_size = AWS_H2_WINDOW_UPDATE_MAX - AWS_H2_INIT_WINDOW_SIZE;
  1543. struct aws_h2_frame *connection_window_update_frame =
  1544. aws_h2_frame_new_window_update(connection->base.alloc, 0 /* stream_id */, initial_window_update_size);
  1545. AWS_ASSERT(connection_window_update_frame);
  1546. /* enqueue the windows update frame here */
  1547. aws_linked_list_push_back(
  1548. &connection->thread_data.outgoing_frames_queue, &connection_window_update_frame->node);
  1549. connection->thread_data.window_size_self += initial_window_update_size;
  1550. }
  1551. aws_h2_try_write_outgoing_frames(connection);
  1552. return;
  1553. error:
  1554. aws_h2_connection_shutdown_due_to_write_err(connection, aws_last_error());
  1555. }
  1556. static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h2_stream *stream, int error_code) {
  1557. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  1558. /* Nice logging */
  1559. if (error_code) {
  1560. AWS_H2_STREAM_LOGF(
  1561. ERROR, stream, "Stream completed with error %d (%s).", error_code, aws_error_name(error_code));
  1562. } else if (stream->base.client_data) {
  1563. int status = stream->base.client_data->response_status;
  1564. AWS_H2_STREAM_LOGF(
  1565. DEBUG, stream, "Client stream complete, response status %d (%s)", status, aws_http_status_text(status));
  1566. } else {
  1567. AWS_H2_STREAM_LOG(DEBUG, stream, "Server stream complete");
  1568. }
  1569. /* Remove stream from active_streams_map and outgoing_stream_list (if it was in them at all) */
  1570. aws_hash_table_remove(&connection->thread_data.active_streams_map, (void *)(size_t)stream->base.id, NULL, NULL);
  1571. if (stream->node.next) {
  1572. aws_linked_list_remove(&stream->node);
  1573. }
  1574. if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0 &&
  1575. connection->thread_data.incoming_timestamp_ns != 0) {
  1576. uint64_t now_ns = 0;
  1577. aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
  1578. /* transition from something to read -> nothing to read and nothing to write */
  1579. s_add_time_measurement_to_stats(
  1580. connection->thread_data.incoming_timestamp_ns,
  1581. now_ns,
  1582. &connection->thread_data.stats.pending_incoming_stream_ms);
  1583. connection->thread_data.stats.was_inactive = true;
  1584. connection->thread_data.incoming_timestamp_ns = 0;
  1585. }
  1586. aws_h2_stream_complete(stream, error_code);
  1587. /* release connection's hold on stream */
  1588. aws_http_stream_release(&stream->base);
  1589. }
  1590. int aws_h2_connection_on_stream_closed(
  1591. struct aws_h2_connection *connection,
  1592. struct aws_h2_stream *stream,
  1593. enum aws_h2_stream_closed_when closed_when,
  1594. int aws_error_code) {
  1595. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  1596. AWS_PRECONDITION(stream->thread_data.state == AWS_H2_STREAM_STATE_CLOSED);
  1597. AWS_PRECONDITION(stream->base.id != 0);
  1598. uint32_t stream_id = stream->base.id;
  1599. /* Mark stream complete. This removes the stream from any "active" datastructures,
  1600. * invokes its completion callback, and releases its refcount. */
  1601. s_stream_complete(connection, stream, aws_error_code);
  1602. stream = NULL; /* Reference released, do not touch again */
  1603. if (s_record_closed_stream(connection, stream_id, closed_when)) {
  1604. return AWS_OP_ERR;
  1605. }
  1606. return AWS_OP_SUCCESS;
  1607. }
  1608. static int s_record_closed_stream(
  1609. struct aws_h2_connection *connection,
  1610. uint32_t stream_id,
  1611. enum aws_h2_stream_closed_when closed_when) {
  1612. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  1613. if (aws_cache_put(connection->thread_data.closed_streams, (void *)(size_t)stream_id, (void *)(size_t)closed_when)) {
  1614. CONNECTION_LOG(ERROR, connection, "Failed inserting ID into cache of recently closed streams");
  1615. return AWS_OP_ERR;
  1616. }
  1617. return AWS_OP_SUCCESS;
  1618. }
  1619. int aws_h2_connection_send_rst_and_close_reserved_stream(
  1620. struct aws_h2_connection *connection,
  1621. uint32_t stream_id,
  1622. uint32_t h2_error_code) {
  1623. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  1624. struct aws_h2_frame *rst_stream = aws_h2_frame_new_rst_stream(connection->base.alloc, stream_id, h2_error_code);
  1625. if (!rst_stream) {
  1626. CONNECTION_LOGF(ERROR, connection, "Error creating RST_STREAM frame, %s", aws_error_name(aws_last_error()));
  1627. return AWS_OP_ERR;
  1628. }
  1629. aws_h2_connection_enqueue_outgoing_frame(connection, rst_stream);
  1630. /* If we ever fully support PUSH_PROMISE, this is where we'd remove the
  1631. * promised_stream_id from some reserved_streams datastructure */
  1632. return s_record_closed_stream(connection, stream_id, AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_SENT);
  1633. }
  1634. /* Move stream into "active" datastructures and notify stream that it can send frames now */
  1635. static void s_move_stream_to_thread(
  1636. struct aws_h2_connection *connection,
  1637. struct aws_h2_stream *stream,
  1638. int new_stream_error_code) {
  1639. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  1640. if (new_stream_error_code) {
  1641. aws_raise_error(new_stream_error_code);
  1642. AWS_H2_STREAM_LOGF(
  1643. ERROR,
  1644. stream,
  1645. "Failed activating stream, error %d (%s)",
  1646. aws_last_error(),
  1647. aws_error_name(aws_last_error()));
  1648. goto error;
  1649. }
  1650. uint32_t max_concurrent_streams = connection->thread_data.settings_peer[AWS_HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS];
  1651. if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) >= max_concurrent_streams) {
  1652. AWS_H2_STREAM_LOG(ERROR, stream, "Failed activating stream, max concurrent streams are reached");
  1653. aws_raise_error(AWS_ERROR_HTTP_MAX_CONCURRENT_STREAMS_EXCEEDED);
  1654. goto error;
  1655. }
  1656. if (aws_hash_table_put(
  1657. &connection->thread_data.active_streams_map, (void *)(size_t)stream->base.id, stream, NULL)) {
  1658. AWS_H2_STREAM_LOG(ERROR, stream, "Failed inserting stream into map");
  1659. goto error;
  1660. }
  1661. enum aws_h2_stream_body_state body_state = AWS_H2_STREAM_BODY_STATE_NONE;
  1662. if (aws_h2_stream_on_activated(stream, &body_state)) {
  1663. goto error;
  1664. }
  1665. if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 1) {
  1666. /* transition from nothing to read -> something to read */
  1667. uint64_t now_ns = 0;
  1668. aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
  1669. connection->thread_data.incoming_timestamp_ns = now_ns;
  1670. }
  1671. switch (body_state) {
  1672. case AWS_H2_STREAM_BODY_STATE_WAITING_WRITES:
  1673. aws_linked_list_push_back(&connection->thread_data.waiting_streams_list, &stream->node);
  1674. break;
  1675. case AWS_H2_STREAM_BODY_STATE_ONGOING:
  1676. aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node);
  1677. break;
  1678. default:
  1679. break;
  1680. }
  1681. return;
  1682. error:
  1683. /* If the stream got into any datastructures, s_stream_complete() will remove it */
  1684. s_stream_complete(connection, stream, aws_last_error());
  1685. }
  1686. /* Perform on-thread work that is triggered by calls to the connection/stream API */
  1687. static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  1688. (void)task;
  1689. if (status != AWS_TASK_STATUS_RUN_READY) {
  1690. return;
  1691. }
  1692. struct aws_h2_connection *connection = arg;
  1693. struct aws_linked_list pending_frames;
  1694. aws_linked_list_init(&pending_frames);
  1695. struct aws_linked_list pending_streams;
  1696. aws_linked_list_init(&pending_streams);
  1697. struct aws_linked_list pending_settings;
  1698. aws_linked_list_init(&pending_settings);
  1699. struct aws_linked_list pending_ping;
  1700. aws_linked_list_init(&pending_ping);
  1701. struct aws_linked_list pending_goaway;
  1702. aws_linked_list_init(&pending_goaway);
  1703. size_t window_update_size;
  1704. int new_stream_error_code;
  1705. { /* BEGIN CRITICAL SECTION */
  1706. s_lock_synced_data(connection);
  1707. connection->synced_data.is_cross_thread_work_task_scheduled = false;
  1708. aws_linked_list_swap_contents(&connection->synced_data.pending_frame_list, &pending_frames);
  1709. aws_linked_list_swap_contents(&connection->synced_data.pending_stream_list, &pending_streams);
  1710. aws_linked_list_swap_contents(&connection->synced_data.pending_settings_list, &pending_settings);
  1711. aws_linked_list_swap_contents(&connection->synced_data.pending_ping_list, &pending_ping);
  1712. aws_linked_list_swap_contents(&connection->synced_data.pending_goaway_list, &pending_goaway);
  1713. window_update_size = connection->synced_data.window_update_size;
  1714. connection->synced_data.window_update_size = 0;
  1715. new_stream_error_code = connection->synced_data.new_stream_error_code;
  1716. s_unlock_synced_data(connection);
  1717. } /* END CRITICAL SECTION */
  1718. /* Enqueue new pending control frames */
  1719. while (!aws_linked_list_empty(&pending_frames)) {
  1720. struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_frames);
  1721. struct aws_h2_frame *frame = AWS_CONTAINER_OF(node, struct aws_h2_frame, node);
  1722. aws_h2_connection_enqueue_outgoing_frame(connection, frame);
  1723. }
  1724. /* We already enqueued the window_update frame, just apply the change and let our peer check this value, no matter
  1725. * overflow happens or not. Peer will detect it for us. */
  1726. connection->thread_data.window_size_self =
  1727. aws_add_size_saturating(connection->thread_data.window_size_self, window_update_size);
  1728. /* Process new pending_streams */
  1729. while (!aws_linked_list_empty(&pending_streams)) {
  1730. struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_streams);
  1731. struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node);
  1732. s_move_stream_to_thread(connection, stream, new_stream_error_code);
  1733. }
  1734. /* Move pending settings to thread data */
  1735. while (!aws_linked_list_empty(&pending_settings)) {
  1736. aws_linked_list_push_back(
  1737. &connection->thread_data.pending_settings_queue, aws_linked_list_pop_front(&pending_settings));
  1738. }
  1739. /* Move pending PING to thread data */
  1740. while (!aws_linked_list_empty(&pending_ping)) {
  1741. aws_linked_list_push_back(
  1742. &connection->thread_data.pending_ping_queue, aws_linked_list_pop_front(&pending_ping));
  1743. }
  1744. /* Send user requested goaways */
  1745. while (!aws_linked_list_empty(&pending_goaway)) {
  1746. struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_goaway);
  1747. struct aws_h2_pending_goaway *goaway = AWS_CONTAINER_OF(node, struct aws_h2_pending_goaway, node);
  1748. s_send_goaway(connection, goaway->http2_error, goaway->allow_more_streams, &goaway->debug_data);
  1749. aws_mem_release(connection->base.alloc, goaway);
  1750. }
  1751. /* It's likely that frames were queued while processing cross-thread work.
  1752. * If so, try writing them now */
  1753. aws_h2_try_write_outgoing_frames(connection);
  1754. }
  1755. int aws_h2_stream_activate(struct aws_http_stream *stream) {
  1756. struct aws_h2_stream *h2_stream = AWS_CONTAINER_OF(stream, struct aws_h2_stream, base);
  1757. struct aws_http_connection *base_connection = stream->owning_connection;
  1758. struct aws_h2_connection *connection = AWS_CONTAINER_OF(base_connection, struct aws_h2_connection, base);
  1759. int err;
  1760. bool was_cross_thread_work_scheduled = false;
  1761. { /* BEGIN CRITICAL SECTION */
  1762. s_acquire_stream_and_connection_lock(h2_stream, connection);
  1763. if (stream->id) {
  1764. /* stream has already been activated. */
  1765. s_release_stream_and_connection_lock(h2_stream, connection);
  1766. return AWS_OP_SUCCESS;
  1767. }
  1768. err = connection->synced_data.new_stream_error_code;
  1769. if (err) {
  1770. s_release_stream_and_connection_lock(h2_stream, connection);
  1771. goto error;
  1772. }
  1773. stream->id = aws_http_connection_get_next_stream_id(base_connection);
  1774. if (stream->id) {
  1775. /* success */
  1776. was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled;
  1777. connection->synced_data.is_cross_thread_work_task_scheduled = true;
  1778. aws_linked_list_push_back(&connection->synced_data.pending_stream_list, &h2_stream->node);
  1779. h2_stream->synced_data.api_state = AWS_H2_STREAM_API_STATE_ACTIVE;
  1780. }
  1781. s_release_stream_and_connection_lock(h2_stream, connection);
  1782. } /* END CRITICAL SECTION */
  1783. if (!stream->id) {
  1784. /* aws_http_connection_get_next_stream_id() raises its own error. */
  1785. return AWS_OP_ERR;
  1786. }
  1787. /* connection keeps activated stream alive until stream completes */
  1788. aws_atomic_fetch_add(&stream->refcount, 1);
  1789. if (!was_cross_thread_work_scheduled) {
  1790. CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
  1791. aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
  1792. }
  1793. return AWS_OP_SUCCESS;
  1794. error:
  1795. CONNECTION_LOGF(
  1796. ERROR,
  1797. connection,
  1798. "Failed to activate the stream id=%p, new streams are not allowed now. error %d (%s)",
  1799. (void *)stream,
  1800. err,
  1801. aws_error_name(err));
  1802. return aws_raise_error(err);
  1803. }
  1804. static struct aws_http_stream *s_connection_make_request(
  1805. struct aws_http_connection *client_connection,
  1806. const struct aws_http_make_request_options *options) {
  1807. struct aws_h2_connection *connection = AWS_CONTAINER_OF(client_connection, struct aws_h2_connection, base);
  1808. /* #TODO: http/2-ify the request (ex: add ":method" header). Should we mutate a copy or the original? Validate?
  1809. * Or just pass pointer to headers struct and let encoder transform it while encoding? */
  1810. struct aws_h2_stream *stream = aws_h2_stream_new_request(client_connection, options);
  1811. if (!stream) {
  1812. CONNECTION_LOGF(
  1813. ERROR,
  1814. connection,
  1815. "Failed to create stream, error %d (%s)",
  1816. aws_last_error(),
  1817. aws_error_name(aws_last_error()));
  1818. return NULL;
  1819. }
  1820. int new_stream_error_code;
  1821. { /* BEGIN CRITICAL SECTION */
  1822. s_lock_synced_data(connection);
  1823. new_stream_error_code = connection->synced_data.new_stream_error_code;
  1824. s_unlock_synced_data(connection);
  1825. } /* END CRITICAL SECTION */
  1826. if (new_stream_error_code) {
  1827. aws_raise_error(new_stream_error_code);
  1828. CONNECTION_LOGF(
  1829. ERROR,
  1830. connection,
  1831. "Cannot create request stream, error %d (%s)",
  1832. aws_last_error(),
  1833. aws_error_name(aws_last_error()));
  1834. goto error;
  1835. }
  1836. AWS_H2_STREAM_LOG(DEBUG, stream, "Created HTTP/2 request stream"); /* #TODO: print method & path */
  1837. return &stream->base;
  1838. error:
  1839. /* Force destruction of the stream, avoiding ref counting */
  1840. stream->base.vtable->destroy(&stream->base);
  1841. return NULL;
  1842. }
  1843. static void s_connection_close(struct aws_http_connection *connection_base) {
  1844. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  1845. /* Don't stop reading/writing immediately, let that happen naturally during the channel shutdown process. */
  1846. s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, AWS_ERROR_SUCCESS);
  1847. }
  1848. static void s_connection_stop_new_request(struct aws_http_connection *connection_base) {
  1849. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  1850. { /* BEGIN CRITICAL SECTION */
  1851. s_lock_synced_data(connection);
  1852. if (!connection->synced_data.new_stream_error_code) {
  1853. connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
  1854. }
  1855. s_unlock_synced_data(connection);
  1856. } /* END CRITICAL SECTION */
  1857. }
  1858. static bool s_connection_is_open(const struct aws_http_connection *connection_base) {
  1859. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  1860. bool is_open;
  1861. { /* BEGIN CRITICAL SECTION */
  1862. s_lock_synced_data(connection);
  1863. is_open = connection->synced_data.is_open;
  1864. s_unlock_synced_data(connection);
  1865. } /* END CRITICAL SECTION */
  1866. return is_open;
  1867. }
  1868. static bool s_connection_new_requests_allowed(const struct aws_http_connection *connection_base) {
  1869. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  1870. int new_stream_error_code;
  1871. { /* BEGIN CRITICAL SECTION */
  1872. s_lock_synced_data(connection);
  1873. new_stream_error_code = connection->synced_data.new_stream_error_code;
  1874. s_unlock_synced_data(connection);
  1875. } /* END CRITICAL SECTION */
  1876. return new_stream_error_code == 0;
  1877. }
  1878. static void s_connection_update_window(struct aws_http_connection *connection_base, uint32_t increment_size) {
  1879. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  1880. if (!increment_size) {
  1881. /* Silently do nothing. */
  1882. return;
  1883. }
  1884. if (!connection->conn_manual_window_management) {
  1885. /* auto-mode, manual update window is not supported, silently do nothing with warning log. */
  1886. CONNECTION_LOG(
  1887. DEBUG,
  1888. connection,
  1889. "Connection manual window management is off, update window operations are not supported.");
  1890. return;
  1891. }
  1892. struct aws_h2_frame *connection_window_update_frame =
  1893. aws_h2_frame_new_window_update(connection->base.alloc, 0, increment_size);
  1894. if (!connection_window_update_frame) {
  1895. CONNECTION_LOGF(
  1896. ERROR,
  1897. connection,
  1898. "Failed to create WINDOW_UPDATE frame on connection, error %s",
  1899. aws_error_name(aws_last_error()));
  1900. /* OOM should result in a crash. And the increment size is too huge is the only other failure case, which will
  1901. * result in overflow. */
  1902. goto overflow;
  1903. }
  1904. int err = 0;
  1905. bool cross_thread_work_should_schedule = false;
  1906. bool connection_open = false;
  1907. size_t sum_size = 0;
  1908. { /* BEGIN CRITICAL SECTION */
  1909. s_lock_synced_data(connection);
  1910. err |= aws_add_size_checked(connection->synced_data.window_update_size, increment_size, &sum_size);
  1911. err |= sum_size > AWS_H2_WINDOW_UPDATE_MAX;
  1912. connection_open = connection->synced_data.is_open;
  1913. if (!err && connection_open) {
  1914. cross_thread_work_should_schedule = !connection->synced_data.is_cross_thread_work_task_scheduled;
  1915. connection->synced_data.is_cross_thread_work_task_scheduled = true;
  1916. aws_linked_list_push_back(
  1917. &connection->synced_data.pending_frame_list, &connection_window_update_frame->node);
  1918. connection->synced_data.window_update_size = sum_size;
  1919. }
  1920. s_unlock_synced_data(connection);
  1921. } /* END CRITICAL SECTION */
  1922. if (err) {
  1923. CONNECTION_LOG(
  1924. ERROR,
  1925. connection,
  1926. "The connection's flow-control windows has been incremented beyond 2**31 -1, the max for HTTP/2. The ");
  1927. aws_h2_frame_destroy(connection_window_update_frame);
  1928. goto overflow;
  1929. }
  1930. if (cross_thread_work_should_schedule) {
  1931. CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
  1932. aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
  1933. }
  1934. if (!connection_open) {
  1935. /* connection already closed, just do nothing */
  1936. aws_h2_frame_destroy(connection_window_update_frame);
  1937. return;
  1938. }
  1939. CONNECTION_LOGF(
  1940. TRACE,
  1941. connection,
  1942. "User requested to update the HTTP/2 connection's flow-control windows by %" PRIu32 ".",
  1943. increment_size);
  1944. return;
  1945. overflow:
  1946. /* Shutdown the connection as overflow detected */
  1947. s_stop(
  1948. connection,
  1949. false /*stop_reading*/,
  1950. false /*stop_writing*/,
  1951. true /*schedule_shutdown*/,
  1952. AWS_ERROR_OVERFLOW_DETECTED);
  1953. }
  1954. static int s_connection_change_settings(
  1955. struct aws_http_connection *connection_base,
  1956. const struct aws_http2_setting *settings_array,
  1957. size_t num_settings,
  1958. aws_http2_on_change_settings_complete_fn *on_completed,
  1959. void *user_data) {
  1960. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  1961. if (!settings_array && num_settings) {
  1962. CONNECTION_LOG(ERROR, connection, "Settings_array is NULL and num_settings is not zero.");
  1963. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  1964. }
  1965. struct aws_h2_pending_settings *pending_settings =
  1966. s_new_pending_settings(connection->base.alloc, settings_array, num_settings, on_completed, user_data);
  1967. if (!pending_settings) {
  1968. return AWS_OP_ERR;
  1969. }
  1970. struct aws_h2_frame *settings_frame =
  1971. aws_h2_frame_new_settings(connection->base.alloc, settings_array, num_settings, false /*ACK*/);
  1972. if (!settings_frame) {
  1973. CONNECTION_LOGF(
  1974. ERROR, connection, "Failed to create settings frame, error %s", aws_error_name(aws_last_error()));
  1975. aws_mem_release(connection->base.alloc, pending_settings);
  1976. return AWS_OP_ERR;
  1977. }
  1978. bool was_cross_thread_work_scheduled = false;
  1979. bool connection_open;
  1980. { /* BEGIN CRITICAL SECTION */
  1981. s_lock_synced_data(connection);
  1982. connection_open = connection->synced_data.is_open;
  1983. if (!connection_open) {
  1984. s_unlock_synced_data(connection);
  1985. goto closed;
  1986. }
  1987. was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled;
  1988. connection->synced_data.is_cross_thread_work_task_scheduled = true;
  1989. aws_linked_list_push_back(&connection->synced_data.pending_frame_list, &settings_frame->node);
  1990. aws_linked_list_push_back(&connection->synced_data.pending_settings_list, &pending_settings->node);
  1991. s_unlock_synced_data(connection);
  1992. } /* END CRITICAL SECTION */
  1993. if (!was_cross_thread_work_scheduled) {
  1994. CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
  1995. aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
  1996. }
  1997. return AWS_OP_SUCCESS;
  1998. closed:
  1999. CONNECTION_LOG(ERROR, connection, "Failed to change settings, connection is closed or closing.");
  2000. aws_h2_frame_destroy(settings_frame);
  2001. aws_mem_release(connection->base.alloc, pending_settings);
  2002. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  2003. }
  2004. static int s_connection_send_ping(
  2005. struct aws_http_connection *connection_base,
  2006. const struct aws_byte_cursor *optional_opaque_data,
  2007. aws_http2_on_ping_complete_fn *on_completed,
  2008. void *user_data) {
  2009. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  2010. if (optional_opaque_data && optional_opaque_data->len != 8) {
  2011. CONNECTION_LOG(ERROR, connection, "Only 8 bytes opaque data supported for PING in HTTP/2");
  2012. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  2013. }
  2014. uint64_t time_stamp;
  2015. if (aws_high_res_clock_get_ticks(&time_stamp)) {
  2016. CONNECTION_LOGF(
  2017. ERROR,
  2018. connection,
  2019. "Failed getting the time stamp to start PING, error %s",
  2020. aws_error_name(aws_last_error()));
  2021. return AWS_OP_ERR;
  2022. }
  2023. struct aws_h2_pending_ping *pending_ping =
  2024. s_new_pending_ping(connection->base.alloc, optional_opaque_data, time_stamp, user_data, on_completed);
  2025. if (!pending_ping) {
  2026. return AWS_OP_ERR;
  2027. }
  2028. struct aws_h2_frame *ping_frame =
  2029. aws_h2_frame_new_ping(connection->base.alloc, false /*ACK*/, pending_ping->opaque_data);
  2030. if (!ping_frame) {
  2031. CONNECTION_LOGF(ERROR, connection, "Failed to create PING frame, error %s", aws_error_name(aws_last_error()));
  2032. aws_mem_release(connection->base.alloc, pending_ping);
  2033. return AWS_OP_ERR;
  2034. }
  2035. bool was_cross_thread_work_scheduled = false;
  2036. bool connection_open;
  2037. { /* BEGIN CRITICAL SECTION */
  2038. s_lock_synced_data(connection);
  2039. connection_open = connection->synced_data.is_open;
  2040. if (!connection_open) {
  2041. s_unlock_synced_data(connection);
  2042. goto closed;
  2043. }
  2044. was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled;
  2045. connection->synced_data.is_cross_thread_work_task_scheduled = true;
  2046. aws_linked_list_push_back(&connection->synced_data.pending_frame_list, &ping_frame->node);
  2047. aws_linked_list_push_back(&connection->synced_data.pending_ping_list, &pending_ping->node);
  2048. s_unlock_synced_data(connection);
  2049. } /* END CRITICAL SECTION */
  2050. if (!was_cross_thread_work_scheduled) {
  2051. CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
  2052. aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
  2053. }
  2054. return AWS_OP_SUCCESS;
  2055. closed:
  2056. CONNECTION_LOG(ERROR, connection, "Failed to send ping, connection is closed or closing.");
  2057. aws_h2_frame_destroy(ping_frame);
  2058. aws_mem_release(connection->base.alloc, pending_ping);
  2059. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  2060. }
  2061. static void s_connection_send_goaway(
  2062. struct aws_http_connection *connection_base,
  2063. uint32_t http2_error,
  2064. bool allow_more_streams,
  2065. const struct aws_byte_cursor *optional_debug_data) {
  2066. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  2067. struct aws_h2_pending_goaway *pending_goaway =
  2068. s_new_pending_goaway(connection->base.alloc, http2_error, allow_more_streams, optional_debug_data);
  2069. bool was_cross_thread_work_scheduled = false;
  2070. bool connection_open;
  2071. { /* BEGIN CRITICAL SECTION */
  2072. s_lock_synced_data(connection);
  2073. connection_open = connection->synced_data.is_open;
  2074. if (!connection_open) {
  2075. s_unlock_synced_data(connection);
  2076. CONNECTION_LOG(DEBUG, connection, "Goaway not sent, connection is closed or closing.");
  2077. aws_mem_release(connection->base.alloc, pending_goaway);
  2078. return;
  2079. }
  2080. was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled;
  2081. connection->synced_data.is_cross_thread_work_task_scheduled = true;
  2082. aws_linked_list_push_back(&connection->synced_data.pending_goaway_list, &pending_goaway->node);
  2083. s_unlock_synced_data(connection);
  2084. } /* END CRITICAL SECTION */
  2085. if (allow_more_streams && (http2_error != AWS_HTTP2_ERR_NO_ERROR)) {
  2086. CONNECTION_LOGF(
  2087. DEBUG,
  2088. connection,
  2089. "Send goaway with allow more streams on and non-zero error code %s(0x%x)",
  2090. aws_http2_error_code_to_str(http2_error),
  2091. http2_error);
  2092. }
  2093. if (!was_cross_thread_work_scheduled) {
  2094. CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
  2095. aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
  2096. }
  2097. }
  2098. static void s_get_settings_general(
  2099. const struct aws_http_connection *connection_base,
  2100. struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT],
  2101. bool local) {
  2102. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  2103. uint32_t synced_settings[AWS_HTTP2_SETTINGS_END_RANGE];
  2104. { /* BEGIN CRITICAL SECTION */
  2105. s_lock_synced_data(connection);
  2106. if (local) {
  2107. memcpy(
  2108. synced_settings, connection->synced_data.settings_self, sizeof(connection->synced_data.settings_self));
  2109. } else {
  2110. memcpy(
  2111. synced_settings, connection->synced_data.settings_peer, sizeof(connection->synced_data.settings_peer));
  2112. }
  2113. s_unlock_synced_data(connection);
  2114. } /* END CRITICAL SECTION */
  2115. for (int i = AWS_HTTP2_SETTINGS_BEGIN_RANGE; i < AWS_HTTP2_SETTINGS_END_RANGE; i++) {
  2116. /* settings range begin with 1, store them into 0-based array of aws_http2_setting */
  2117. out_settings[i - 1].id = i;
  2118. out_settings[i - 1].value = synced_settings[i];
  2119. }
  2120. return;
  2121. }
  2122. static void s_connection_get_local_settings(
  2123. const struct aws_http_connection *connection_base,
  2124. struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT]) {
  2125. s_get_settings_general(connection_base, out_settings, true /*local*/);
  2126. }
  2127. static void s_connection_get_remote_settings(
  2128. const struct aws_http_connection *connection_base,
  2129. struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT]) {
  2130. s_get_settings_general(connection_base, out_settings, false /*local*/);
  2131. }
  2132. /* Send a GOAWAY with the lowest possible last-stream-id or graceful shutdown warning */
  2133. static void s_send_goaway(
  2134. struct aws_h2_connection *connection,
  2135. uint32_t h2_error_code,
  2136. bool allow_more_streams,
  2137. const struct aws_byte_cursor *optional_debug_data) {
  2138. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  2139. uint32_t last_stream_id = allow_more_streams ? AWS_H2_STREAM_ID_MAX
  2140. : aws_min_u32(
  2141. connection->thread_data.latest_peer_initiated_stream_id,
  2142. connection->thread_data.goaway_sent_last_stream_id);
  2143. if (last_stream_id > connection->thread_data.goaway_sent_last_stream_id) {
  2144. CONNECTION_LOG(
  2145. DEBUG,
  2146. connection,
  2147. "GOAWAY frame with lower last stream id has been sent, ignoring sending graceful shutdown warning.");
  2148. return;
  2149. }
  2150. struct aws_byte_cursor debug_data;
  2151. AWS_ZERO_STRUCT(debug_data);
  2152. if (optional_debug_data) {
  2153. debug_data = *optional_debug_data;
  2154. }
  2155. struct aws_h2_frame *goaway =
  2156. aws_h2_frame_new_goaway(connection->base.alloc, last_stream_id, h2_error_code, debug_data);
  2157. if (!goaway) {
  2158. CONNECTION_LOGF(ERROR, connection, "Error creating GOAWAY frame, %s", aws_error_name(aws_last_error()));
  2159. goto error;
  2160. }
  2161. connection->thread_data.goaway_sent_last_stream_id = last_stream_id;
  2162. { /* BEGIN CRITICAL SECTION */
  2163. s_lock_synced_data(connection);
  2164. connection->synced_data.goaway_sent_last_stream_id = last_stream_id;
  2165. connection->synced_data.goaway_sent_http2_error_code = h2_error_code;
  2166. s_unlock_synced_data(connection);
  2167. } /* END CRITICAL SECTION */
  2168. aws_h2_connection_enqueue_outgoing_frame(connection, goaway);
  2169. return;
  2170. error:
  2171. aws_h2_connection_shutdown_due_to_write_err(connection, aws_last_error());
  2172. }
  2173. static int s_connection_get_sent_goaway(
  2174. struct aws_http_connection *connection_base,
  2175. uint32_t *out_http2_error,
  2176. uint32_t *out_last_stream_id) {
  2177. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  2178. uint32_t sent_last_stream_id;
  2179. uint32_t sent_http2_error;
  2180. { /* BEGIN CRITICAL SECTION */
  2181. s_lock_synced_data(connection);
  2182. sent_last_stream_id = connection->synced_data.goaway_sent_last_stream_id;
  2183. sent_http2_error = connection->synced_data.goaway_sent_http2_error_code;
  2184. s_unlock_synced_data(connection);
  2185. } /* END CRITICAL SECTION */
  2186. uint32_t max_stream_id = AWS_H2_STREAM_ID_MAX;
  2187. if (sent_last_stream_id == max_stream_id + 1) {
  2188. CONNECTION_LOG(ERROR, connection, "No GOAWAY has been sent so far.");
  2189. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  2190. }
  2191. *out_http2_error = sent_http2_error;
  2192. *out_last_stream_id = sent_last_stream_id;
  2193. return AWS_OP_SUCCESS;
  2194. }
  2195. static int s_connection_get_received_goaway(
  2196. struct aws_http_connection *connection_base,
  2197. uint32_t *out_http2_error,
  2198. uint32_t *out_last_stream_id) {
  2199. struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
  2200. uint32_t received_last_stream_id = 0;
  2201. uint32_t received_http2_error = 0;
  2202. bool goaway_not_ready = false;
  2203. uint32_t max_stream_id = AWS_H2_STREAM_ID_MAX;
  2204. { /* BEGIN CRITICAL SECTION */
  2205. s_lock_synced_data(connection);
  2206. if (connection->synced_data.goaway_received_last_stream_id == max_stream_id + 1) {
  2207. goaway_not_ready = true;
  2208. } else {
  2209. received_last_stream_id = connection->synced_data.goaway_received_last_stream_id;
  2210. received_http2_error = connection->synced_data.goaway_received_http2_error_code;
  2211. }
  2212. s_unlock_synced_data(connection);
  2213. } /* END CRITICAL SECTION */
  2214. if (goaway_not_ready) {
  2215. CONNECTION_LOG(ERROR, connection, "No GOAWAY has been received so far.");
  2216. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  2217. }
  2218. *out_http2_error = received_http2_error;
  2219. *out_last_stream_id = received_last_stream_id;
  2220. return AWS_OP_SUCCESS;
  2221. }
  2222. static int s_handler_process_read_message(
  2223. struct aws_channel_handler *handler,
  2224. struct aws_channel_slot *slot,
  2225. struct aws_io_message *message) {
  2226. (void)slot;
  2227. struct aws_h2_connection *connection = handler->impl;
  2228. CONNECTION_LOGF(TRACE, connection, "Begin processing message of size %zu.", message->message_data.len);
  2229. if (connection->thread_data.is_reading_stopped) {
  2230. CONNECTION_LOG(ERROR, connection, "Cannot process message because connection is shutting down.");
  2231. goto clean_up;
  2232. }
  2233. /* Any error that bubbles up from the decoder or its callbacks is treated as
  2234. * a Connection Error (a GOAWAY frames is sent, and the connection is closed) */
  2235. struct aws_byte_cursor message_cursor = aws_byte_cursor_from_buf(&message->message_data);
  2236. struct aws_h2err err = aws_h2_decode(connection->thread_data.decoder, &message_cursor);
  2237. if (aws_h2err_failed(err)) {
  2238. CONNECTION_LOGF(
  2239. ERROR,
  2240. connection,
  2241. "Failure while receiving frames, %s. Sending GOAWAY %s(0x%x) and closing connection",
  2242. aws_error_name(err.aws_code),
  2243. aws_http2_error_code_to_str(err.h2_code),
  2244. err.h2_code);
  2245. goto shutdown;
  2246. }
  2247. /* HTTP/2 protocol uses WINDOW_UPDATE frames to coordinate data rates with peer,
  2248. * so we can just keep the aws_channel's read-window wide open */
  2249. if (aws_channel_slot_increment_read_window(slot, message->message_data.len)) {
  2250. CONNECTION_LOGF(
  2251. ERROR,
  2252. connection,
  2253. "Incrementing read window failed, error %d (%s). Closing connection",
  2254. aws_last_error(),
  2255. aws_error_name(aws_last_error()));
  2256. err = aws_h2err_from_last_error();
  2257. goto shutdown;
  2258. }
  2259. goto clean_up;
  2260. shutdown:
  2261. s_send_goaway(connection, err.h2_code, false /*allow_more_streams*/, NULL /*optional_debug_data*/);
  2262. aws_h2_try_write_outgoing_frames(connection);
  2263. s_stop(connection, true /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, err.aws_code);
  2264. clean_up:
  2265. aws_mem_release(message->allocator, message);
  2266. /* Flush any outgoing frames that might have been queued as a result of decoder callbacks. */
  2267. aws_h2_try_write_outgoing_frames(connection);
  2268. return AWS_OP_SUCCESS;
  2269. }
  2270. static int s_handler_process_write_message(
  2271. struct aws_channel_handler *handler,
  2272. struct aws_channel_slot *slot,
  2273. struct aws_io_message *message) {
  2274. (void)handler;
  2275. (void)slot;
  2276. (void)message;
  2277. return aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
  2278. }
  2279. static int s_handler_increment_read_window(
  2280. struct aws_channel_handler *handler,
  2281. struct aws_channel_slot *slot,
  2282. size_t size) {
  2283. (void)handler;
  2284. (void)slot;
  2285. (void)size;
  2286. return aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
  2287. }
  2288. static int s_handler_shutdown(
  2289. struct aws_channel_handler *handler,
  2290. struct aws_channel_slot *slot,
  2291. enum aws_channel_direction dir,
  2292. int error_code,
  2293. bool free_scarce_resources_immediately) {
  2294. struct aws_h2_connection *connection = handler->impl;
  2295. CONNECTION_LOGF(
  2296. TRACE,
  2297. connection,
  2298. "Channel shutting down in %s direction with error code %d (%s).",
  2299. (dir == AWS_CHANNEL_DIR_READ) ? "read" : "write",
  2300. error_code,
  2301. aws_error_name(error_code));
  2302. if (dir == AWS_CHANNEL_DIR_READ) {
  2303. /* This call ensures that no further streams will be created. */
  2304. s_stop(connection, true /*stop_reading*/, false /*stop_writing*/, false /*schedule_shutdown*/, error_code);
  2305. /* Send user requested GOAWAY, if they haven't been sent before. It's OK to access
  2306. * synced_data.pending_goaway_list without holding the lock because no more user_requested GOAWAY can be added
  2307. * after s_stop() has been invoked. */
  2308. if (!aws_linked_list_empty(&connection->synced_data.pending_goaway_list)) {
  2309. while (!aws_linked_list_empty(&connection->synced_data.pending_goaway_list)) {
  2310. struct aws_linked_list_node *node =
  2311. aws_linked_list_pop_front(&connection->synced_data.pending_goaway_list);
  2312. struct aws_h2_pending_goaway *goaway = AWS_CONTAINER_OF(node, struct aws_h2_pending_goaway, node);
  2313. s_send_goaway(connection, goaway->http2_error, goaway->allow_more_streams, &goaway->debug_data);
  2314. aws_mem_release(connection->base.alloc, goaway);
  2315. }
  2316. aws_h2_try_write_outgoing_frames(connection);
  2317. }
  2318. /* Send GOAWAY if none have been sent so far,
  2319. * or if we've only sent a "graceful shutdown warning" that didn't name a last-stream-id */
  2320. if (connection->thread_data.goaway_sent_last_stream_id == AWS_H2_STREAM_ID_MAX) {
  2321. s_send_goaway(
  2322. connection,
  2323. error_code ? AWS_HTTP2_ERR_INTERNAL_ERROR : AWS_HTTP2_ERR_NO_ERROR,
  2324. false /*allow_more_streams*/,
  2325. NULL /*optional_debug_data*/);
  2326. aws_h2_try_write_outgoing_frames(connection);
  2327. }
  2328. aws_channel_slot_on_handler_shutdown_complete(
  2329. slot, AWS_CHANNEL_DIR_READ, error_code, free_scarce_resources_immediately);
  2330. } else /* AWS_CHANNEL_DIR_WRITE */ {
  2331. connection->thread_data.channel_shutdown_error_code = error_code;
  2332. connection->thread_data.channel_shutdown_immediately = free_scarce_resources_immediately;
  2333. connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written = true;
  2334. /* We'd prefer to wait until we know GOAWAY has been written, but don't wait if... */
  2335. if (free_scarce_resources_immediately /* we must finish ASAP */ ||
  2336. connection->thread_data.is_writing_stopped /* write will never complete */ ||
  2337. !connection->thread_data.is_outgoing_frames_task_active /* write is already complete */) {
  2338. s_finish_shutdown(connection);
  2339. } else {
  2340. CONNECTION_LOG(TRACE, connection, "HTTP/2 handler will finish shutdown once GOAWAY frame is written");
  2341. }
  2342. }
  2343. return AWS_OP_SUCCESS;
  2344. }
  2345. static void s_finish_shutdown(struct aws_h2_connection *connection) {
  2346. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  2347. AWS_PRECONDITION(connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written);
  2348. CONNECTION_LOG(TRACE, connection, "Finishing HTTP/2 handler shutdown");
  2349. connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written = false;
  2350. s_stop(
  2351. connection,
  2352. false /*stop_reading*/,
  2353. true /*stop_writing*/,
  2354. false /*schedule_shutdown*/,
  2355. connection->thread_data.channel_shutdown_error_code);
  2356. /* Remove remaining streams from internal datastructures and mark them as complete. */
  2357. struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map);
  2358. while (!aws_hash_iter_done(&stream_iter)) {
  2359. struct aws_h2_stream *stream = stream_iter.element.value;
  2360. aws_hash_iter_delete(&stream_iter, true);
  2361. aws_hash_iter_next(&stream_iter);
  2362. s_stream_complete(connection, stream, AWS_ERROR_HTTP_CONNECTION_CLOSED);
  2363. }
  2364. /* It's OK to access synced_data without holding the lock because
  2365. * no more streams or user-requested control frames can be added after s_stop() has been invoked. */
  2366. while (!aws_linked_list_empty(&connection->synced_data.pending_stream_list)) {
  2367. struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->synced_data.pending_stream_list);
  2368. struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node);
  2369. s_stream_complete(connection, stream, AWS_ERROR_HTTP_CONNECTION_CLOSED);
  2370. }
  2371. while (!aws_linked_list_empty(&connection->synced_data.pending_frame_list)) {
  2372. struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->synced_data.pending_frame_list);
  2373. struct aws_h2_frame *frame = AWS_CONTAINER_OF(node, struct aws_h2_frame, node);
  2374. aws_h2_frame_destroy(frame);
  2375. }
  2376. /* invoke pending callbacks haven't moved into thread, and clean up the data */
  2377. while (!aws_linked_list_empty(&connection->synced_data.pending_settings_list)) {
  2378. struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->synced_data.pending_settings_list);
  2379. struct aws_h2_pending_settings *settings = AWS_CONTAINER_OF(node, struct aws_h2_pending_settings, node);
  2380. if (settings->on_completed) {
  2381. settings->on_completed(&connection->base, AWS_ERROR_HTTP_CONNECTION_CLOSED, settings->user_data);
  2382. }
  2383. aws_mem_release(connection->base.alloc, settings);
  2384. }
  2385. while (!aws_linked_list_empty(&connection->synced_data.pending_ping_list)) {
  2386. struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->synced_data.pending_ping_list);
  2387. struct aws_h2_pending_ping *ping = AWS_CONTAINER_OF(node, struct aws_h2_pending_ping, node);
  2388. if (ping->on_completed) {
  2389. ping->on_completed(&connection->base, 0 /*fake rtt*/, AWS_ERROR_HTTP_CONNECTION_CLOSED, ping->user_data);
  2390. }
  2391. aws_mem_release(connection->base.alloc, ping);
  2392. }
  2393. /* invoke pending callbacks moved into thread, and clean up the data */
  2394. while (!aws_linked_list_empty(&connection->thread_data.pending_settings_queue)) {
  2395. struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->thread_data.pending_settings_queue);
  2396. struct aws_h2_pending_settings *pending_settings = AWS_CONTAINER_OF(node, struct aws_h2_pending_settings, node);
  2397. /* fire the user callback with error */
  2398. if (pending_settings->on_completed) {
  2399. pending_settings->on_completed(
  2400. &connection->base, AWS_ERROR_HTTP_CONNECTION_CLOSED, pending_settings->user_data);
  2401. }
  2402. aws_mem_release(connection->base.alloc, pending_settings);
  2403. }
  2404. while (!aws_linked_list_empty(&connection->thread_data.pending_ping_queue)) {
  2405. struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->thread_data.pending_ping_queue);
  2406. struct aws_h2_pending_ping *pending_ping = AWS_CONTAINER_OF(node, struct aws_h2_pending_ping, node);
  2407. /* fire the user callback with error */
  2408. if (pending_ping->on_completed) {
  2409. pending_ping->on_completed(
  2410. &connection->base, 0 /*fake rtt*/, AWS_ERROR_HTTP_CONNECTION_CLOSED, pending_ping->user_data);
  2411. }
  2412. aws_mem_release(connection->base.alloc, pending_ping);
  2413. }
  2414. aws_channel_slot_on_handler_shutdown_complete(
  2415. connection->base.channel_slot,
  2416. AWS_CHANNEL_DIR_WRITE,
  2417. connection->thread_data.channel_shutdown_error_code,
  2418. connection->thread_data.channel_shutdown_immediately);
  2419. }
  /**
   * Report the initial read-window size for this handler.
   *
   * The handler argument is ignored. HTTP/2 coordinates data rates with the peer through
   * WINDOW_UPDATE frames, so the aws_channel read-window is simply kept wide open.
   *
   * @return SIZE_MAX, always.
   */
  static size_t s_handler_initial_window_size(struct aws_channel_handler *handler) {
      const size_t wide_open_window = SIZE_MAX;

      (void)handler;
      return wide_open_window;
  }
  /**
   * Report the per-message overhead added by this handler.
   *
   * The handler argument is ignored.
   *
   * @return 9, the size of the fixed frame header:
   *         "All frames begin with a fixed 9-octet header followed by a variable-length payload" (RFC-7540 4.1)
   */
  static size_t s_handler_message_overhead(struct aws_channel_handler *handler) {
      enum { h2_frame_header_octets = 9 };

      (void)handler;
      return h2_frame_header_octets;
  }
  2431. static void s_reset_statistics(struct aws_channel_handler *handler) {
  2432. struct aws_h2_connection *connection = handler->impl;
  2433. aws_crt_statistics_http2_channel_reset(&connection->thread_data.stats);
  2434. if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0) {
  2435. /* Check the current state */
  2436. connection->thread_data.stats.was_inactive = true;
  2437. }
  2438. return;
  2439. }
  2440. static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats) {
  2441. struct aws_h2_connection *connection = handler->impl;
  2442. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  2443. /* TODO: Need update the way we calculate statistics, to account for user-controlled pauses.
  2444. * If user is adding chunks 1 by 1, there can naturally be a gap in the upload.
  2445. * If the user lets the stream-window go to zero, there can naturally be a gap in the download. */
  2446. uint64_t now_ns = 0;
  2447. if (aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns)) {
  2448. return;
  2449. }
  2450. if (!aws_linked_list_empty(&connection->thread_data.outgoing_streams_list)) {
  2451. s_add_time_measurement_to_stats(
  2452. connection->thread_data.outgoing_timestamp_ns,
  2453. now_ns,
  2454. &connection->thread_data.stats.pending_outgoing_stream_ms);
  2455. connection->thread_data.outgoing_timestamp_ns = now_ns;
  2456. }
  2457. if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) != 0) {
  2458. s_add_time_measurement_to_stats(
  2459. connection->thread_data.incoming_timestamp_ns,
  2460. now_ns,
  2461. &connection->thread_data.stats.pending_incoming_stream_ms);
  2462. connection->thread_data.incoming_timestamp_ns = now_ns;
  2463. } else {
  2464. connection->thread_data.stats.was_inactive = true;
  2465. }
  2466. void *stats_base = &connection->thread_data.stats;
  2467. aws_array_list_push_back(stats, &stats_base);
  2468. }