123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850 |
- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include <aws/http/private/h2_connection.h>
- #include <aws/http/private/h2_stream.h>
- #include <aws/http/private/h2_decoder.h>
- #include <aws/http/private/h2_stream.h>
- #include <aws/http/private/strutil.h>
- #include <aws/common/clock.h>
- #include <aws/common/logging.h>
- #ifdef _MSC_VER
- # pragma warning(disable : 4204) /* non-constant aggregate initializer */
- #endif
- /* Apple toolchains such as xcode and swiftpm define the DEBUG symbol. undef it here so we can actually use the token */
- #undef DEBUG
- #define CONNECTION_LOGF(level, connection, text, ...) \
- AWS_LOGF_##level(AWS_LS_HTTP_CONNECTION, "id=%p: " text, (void *)(connection), __VA_ARGS__)
- #define CONNECTION_LOG(level, connection, text) CONNECTION_LOGF(level, connection, "%s", text)
- static int s_handler_process_read_message(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- struct aws_io_message *message);
- static int s_handler_process_write_message(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- struct aws_io_message *message);
- static int s_handler_increment_read_window(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- size_t size);
- static int s_handler_shutdown(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- enum aws_channel_direction dir,
- int error_code,
- bool free_scarce_resources_immediately);
- static size_t s_handler_initial_window_size(struct aws_channel_handler *handler);
- static size_t s_handler_message_overhead(struct aws_channel_handler *handler);
- static void s_handler_destroy(struct aws_channel_handler *handler);
- static void s_handler_installed(struct aws_channel_handler *handler, struct aws_channel_slot *slot);
- static struct aws_http_stream *s_connection_make_request(
- struct aws_http_connection *client_connection,
- const struct aws_http_make_request_options *options);
- static void s_connection_close(struct aws_http_connection *connection_base);
- static void s_connection_stop_new_request(struct aws_http_connection *connection_base);
- static bool s_connection_is_open(const struct aws_http_connection *connection_base);
- static bool s_connection_new_requests_allowed(const struct aws_http_connection *connection_base);
- static void s_connection_update_window(struct aws_http_connection *connection_base, uint32_t increment_size);
- static int s_connection_change_settings(
- struct aws_http_connection *connection_base,
- const struct aws_http2_setting *settings_array,
- size_t num_settings,
- aws_http2_on_change_settings_complete_fn *on_completed,
- void *user_data);
- static int s_connection_send_ping(
- struct aws_http_connection *connection_base,
- const struct aws_byte_cursor *optional_opaque_data,
- aws_http2_on_ping_complete_fn *on_completed,
- void *user_data);
- static void s_connection_send_goaway(
- struct aws_http_connection *connection_base,
- uint32_t http2_error,
- bool allow_more_streams,
- const struct aws_byte_cursor *optional_debug_data);
- static int s_connection_get_sent_goaway(
- struct aws_http_connection *connection_base,
- uint32_t *out_http2_error,
- uint32_t *out_last_stream_id);
- static int s_connection_get_received_goaway(
- struct aws_http_connection *connection_base,
- uint32_t *out_http2_error,
- uint32_t *out_last_stream_id);
- static void s_connection_get_local_settings(
- const struct aws_http_connection *connection_base,
- struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT]);
- static void s_connection_get_remote_settings(
- const struct aws_http_connection *connection_base,
- struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT]);
- static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
- static void s_outgoing_frames_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
- static int s_encode_outgoing_frames_queue(struct aws_h2_connection *connection, struct aws_byte_buf *output);
- static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connection, struct aws_byte_buf *output);
- static int s_record_closed_stream(
- struct aws_h2_connection *connection,
- uint32_t stream_id,
- enum aws_h2_stream_closed_when closed_when);
- static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h2_stream *stream, int error_code);
- static void s_write_outgoing_frames(struct aws_h2_connection *connection, bool first_try);
- static void s_finish_shutdown(struct aws_h2_connection *connection);
- static void s_send_goaway(
- struct aws_h2_connection *connection,
- uint32_t h2_error_code,
- bool allow_more_streams,
- const struct aws_byte_cursor *optional_debug_data);
- static struct aws_h2_pending_settings *s_new_pending_settings(
- struct aws_allocator *allocator,
- const struct aws_http2_setting *settings_array,
- size_t num_settings,
- aws_http2_on_change_settings_complete_fn *on_completed,
- void *user_data);
- static struct aws_h2err s_decoder_on_headers_begin(uint32_t stream_id, void *userdata);
- static struct aws_h2err s_decoder_on_headers_i(
- uint32_t stream_id,
- const struct aws_http_header *header,
- enum aws_http_header_name name_enum,
- enum aws_http_header_block block_type,
- void *userdata);
- static struct aws_h2err s_decoder_on_headers_end(
- uint32_t stream_id,
- bool malformed,
- enum aws_http_header_block block_type,
- void *userdata);
- static struct aws_h2err s_decoder_on_push_promise(uint32_t stream_id, uint32_t promised_stream_id, void *userdata);
- static struct aws_h2err s_decoder_on_data_begin(
- uint32_t stream_id,
- uint32_t payload_len,
- uint32_t total_padding_bytes,
- bool end_stream,
- void *userdata);
- static struct aws_h2err s_decoder_on_data_i(uint32_t stream_id, struct aws_byte_cursor data, void *userdata);
- static struct aws_h2err s_decoder_on_end_stream(uint32_t stream_id, void *userdata);
- static struct aws_h2err s_decoder_on_rst_stream(uint32_t stream_id, uint32_t h2_error_code, void *userdata);
- static struct aws_h2err s_decoder_on_ping_ack(uint8_t opaque_data[AWS_HTTP2_PING_DATA_SIZE], void *userdata);
- static struct aws_h2err s_decoder_on_ping(uint8_t opaque_data[AWS_HTTP2_PING_DATA_SIZE], void *userdata);
- static struct aws_h2err s_decoder_on_settings(
- const struct aws_http2_setting *settings_array,
- size_t num_settings,
- void *userdata);
- static struct aws_h2err s_decoder_on_settings_ack(void *userdata);
- static struct aws_h2err s_decoder_on_window_update(uint32_t stream_id, uint32_t window_size_increment, void *userdata);
- struct aws_h2err s_decoder_on_goaway(
- uint32_t last_stream,
- uint32_t error_code,
- struct aws_byte_cursor debug_data,
- void *userdata);
- static void s_reset_statistics(struct aws_channel_handler *handler);
- static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats);
- static struct aws_http_connection_vtable s_h2_connection_vtable = {
- .channel_handler_vtable =
- {
- .process_read_message = s_handler_process_read_message,
- .process_write_message = s_handler_process_write_message,
- .increment_read_window = s_handler_increment_read_window,
- .shutdown = s_handler_shutdown,
- .initial_window_size = s_handler_initial_window_size,
- .message_overhead = s_handler_message_overhead,
- .destroy = s_handler_destroy,
- .reset_statistics = s_reset_statistics,
- .gather_statistics = s_gather_statistics,
- },
- .on_channel_handler_installed = s_handler_installed,
- .make_request = s_connection_make_request,
- .new_server_request_handler_stream = NULL,
- .stream_send_response = NULL,
- .close = s_connection_close,
- .stop_new_requests = s_connection_stop_new_request,
- .is_open = s_connection_is_open,
- .new_requests_allowed = s_connection_new_requests_allowed,
- .update_window = s_connection_update_window,
- .change_settings = s_connection_change_settings,
- .send_ping = s_connection_send_ping,
- .send_goaway = s_connection_send_goaway,
- .get_sent_goaway = s_connection_get_sent_goaway,
- .get_received_goaway = s_connection_get_received_goaway,
- .get_local_settings = s_connection_get_local_settings,
- .get_remote_settings = s_connection_get_remote_settings,
- };
- static const struct aws_h2_decoder_vtable s_h2_decoder_vtable = {
- .on_headers_begin = s_decoder_on_headers_begin,
- .on_headers_i = s_decoder_on_headers_i,
- .on_headers_end = s_decoder_on_headers_end,
- .on_push_promise_begin = s_decoder_on_push_promise,
- .on_data_begin = s_decoder_on_data_begin,
- .on_data_i = s_decoder_on_data_i,
- .on_end_stream = s_decoder_on_end_stream,
- .on_rst_stream = s_decoder_on_rst_stream,
- .on_ping_ack = s_decoder_on_ping_ack,
- .on_ping = s_decoder_on_ping,
- .on_settings = s_decoder_on_settings,
- .on_settings_ack = s_decoder_on_settings_ack,
- .on_window_update = s_decoder_on_window_update,
- .on_goaway = s_decoder_on_goaway,
- };
- static void s_lock_synced_data(struct aws_h2_connection *connection) {
- int err = aws_mutex_lock(&connection->synced_data.lock);
- AWS_ASSERT(!err && "lock failed");
- (void)err;
- }
- static void s_unlock_synced_data(struct aws_h2_connection *connection) {
- int err = aws_mutex_unlock(&connection->synced_data.lock);
- AWS_ASSERT(!err && "unlock failed");
- (void)err;
- }
- static void s_acquire_stream_and_connection_lock(struct aws_h2_stream *stream, struct aws_h2_connection *connection) {
- int err = aws_mutex_lock(&stream->synced_data.lock);
- err |= aws_mutex_lock(&connection->synced_data.lock);
- AWS_ASSERT(!err && "lock connection and stream failed");
- (void)err;
- }
- static void s_release_stream_and_connection_lock(struct aws_h2_stream *stream, struct aws_h2_connection *connection) {
- int err = aws_mutex_unlock(&connection->synced_data.lock);
- err |= aws_mutex_unlock(&stream->synced_data.lock);
- AWS_ASSERT(!err && "unlock connection and stream failed");
- (void)err;
- }
- static void s_add_time_measurement_to_stats(uint64_t start_ns, uint64_t end_ns, uint64_t *output_ms) {
- if (end_ns > start_ns) {
- *output_ms += aws_timestamp_convert(end_ns - start_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
- } else {
- *output_ms = 0;
- }
- }
- /**
- * Internal function for bringing connection to a stop.
- * Invoked multiple times, including when:
- * - Channel is shutting down in the read direction.
- * - Channel is shutting down in the write direction.
- * - An error occurs that will shutdown the channel.
- * - User wishes to close the connection (this is the only case where the function may run off-thread).
- */
- static void s_stop(
- struct aws_h2_connection *connection,
- bool stop_reading,
- bool stop_writing,
- bool schedule_shutdown,
- int error_code) {
- AWS_ASSERT(stop_reading || stop_writing || schedule_shutdown); /* You are required to stop at least 1 thing */
- if (stop_reading) {
- AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- connection->thread_data.is_reading_stopped = true;
- }
- if (stop_writing) {
- AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- connection->thread_data.is_writing_stopped = true;
- }
- /* Even if we're not scheduling shutdown just yet (ex: sent final request but waiting to read final response)
- * we don't consider the connection "open" anymore so user can't create more streams */
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
- connection->synced_data.is_open = false;
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- if (schedule_shutdown) {
- AWS_LOGF_INFO(
- AWS_LS_HTTP_CONNECTION,
- "id=%p: Shutting down connection with error code %d (%s).",
- (void *)&connection->base,
- error_code,
- aws_error_name(error_code));
- aws_channel_shutdown(connection->base.channel_slot->channel, error_code);
- }
- }
- void aws_h2_connection_shutdown_due_to_write_err(struct aws_h2_connection *connection, int error_code) {
- AWS_PRECONDITION(error_code);
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- if (connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written) {
- /* If shutdown is waiting for writes to complete, but writes are now broken,
- * then we must finish shutdown now */
- s_finish_shutdown(connection);
- } else {
- s_stop(connection, false /*stop_reading*/, true /*stop_writing*/, true /*schedule_shutdown*/, error_code);
- }
- }
- /* Common new() logic for server & client */
- static struct aws_h2_connection *s_connection_new(
- struct aws_allocator *alloc,
- bool manual_window_management,
- const struct aws_http2_connection_options *http2_options,
- bool server) {
- AWS_PRECONDITION(http2_options);
- struct aws_h2_connection *connection = aws_mem_calloc(alloc, 1, sizeof(struct aws_h2_connection));
- if (!connection) {
- return NULL;
- }
- connection->base.vtable = &s_h2_connection_vtable;
- connection->base.alloc = alloc;
- connection->base.channel_handler.vtable = &s_h2_connection_vtable.channel_handler_vtable;
- connection->base.channel_handler.alloc = alloc;
- connection->base.channel_handler.impl = connection;
- connection->base.http_version = AWS_HTTP_VERSION_2;
- /* Init the next stream id (server must use even ids, client odd [RFC 7540 5.1.1])*/
- connection->base.next_stream_id = (server ? 2 : 1);
- /* Stream window management */
- connection->base.stream_manual_window_management = manual_window_management;
- /* Connection window management */
- connection->conn_manual_window_management = http2_options->conn_manual_window_management;
- connection->on_goaway_received = http2_options->on_goaway_received;
- connection->on_remote_settings_change = http2_options->on_remote_settings_change;
- aws_channel_task_init(
- &connection->cross_thread_work_task, s_cross_thread_work_task, connection, "HTTP/2 cross-thread work");
- aws_channel_task_init(
- &connection->outgoing_frames_task, s_outgoing_frames_task, connection, "HTTP/2 outgoing frames");
- /* 1 refcount for user */
- aws_atomic_init_int(&connection->base.refcount, 1);
- uint32_t max_stream_id = AWS_H2_STREAM_ID_MAX;
- connection->synced_data.goaway_sent_last_stream_id = max_stream_id + 1;
- connection->synced_data.goaway_received_last_stream_id = max_stream_id + 1;
- aws_linked_list_init(&connection->synced_data.pending_stream_list);
- aws_linked_list_init(&connection->synced_data.pending_frame_list);
- aws_linked_list_init(&connection->synced_data.pending_settings_list);
- aws_linked_list_init(&connection->synced_data.pending_ping_list);
- aws_linked_list_init(&connection->synced_data.pending_goaway_list);
- aws_linked_list_init(&connection->thread_data.outgoing_streams_list);
- aws_linked_list_init(&connection->thread_data.pending_settings_queue);
- aws_linked_list_init(&connection->thread_data.pending_ping_queue);
- aws_linked_list_init(&connection->thread_data.stalled_window_streams_list);
- aws_linked_list_init(&connection->thread_data.waiting_streams_list);
- aws_linked_list_init(&connection->thread_data.outgoing_frames_queue);
- if (aws_mutex_init(&connection->synced_data.lock)) {
- CONNECTION_LOGF(
- ERROR, connection, "Mutex init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error()));
- goto error;
- }
- if (aws_hash_table_init(
- &connection->thread_data.active_streams_map, alloc, 8, aws_hash_ptr, aws_ptr_eq, NULL, NULL)) {
- CONNECTION_LOGF(
- ERROR, connection, "Hashtable init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error()));
- goto error;
- }
- size_t max_closed_streams = AWS_HTTP2_DEFAULT_MAX_CLOSED_STREAMS;
- if (http2_options->max_closed_streams) {
- max_closed_streams = http2_options->max_closed_streams;
- }
- connection->thread_data.closed_streams =
- aws_cache_new_fifo(alloc, aws_hash_ptr, aws_ptr_eq, NULL, NULL, max_closed_streams);
- if (!connection->thread_data.closed_streams) {
- CONNECTION_LOGF(
- ERROR, connection, "FIFO cache init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error()));
- goto error;
- }
- /* Initialize the value of settings */
- memcpy(connection->thread_data.settings_peer, aws_h2_settings_initial, sizeof(aws_h2_settings_initial));
- memcpy(connection->thread_data.settings_self, aws_h2_settings_initial, sizeof(aws_h2_settings_initial));
- memcpy(connection->synced_data.settings_peer, aws_h2_settings_initial, sizeof(aws_h2_settings_initial));
- memcpy(connection->synced_data.settings_self, aws_h2_settings_initial, sizeof(aws_h2_settings_initial));
- connection->thread_data.window_size_peer = AWS_H2_INIT_WINDOW_SIZE;
- connection->thread_data.window_size_self = AWS_H2_INIT_WINDOW_SIZE;
- connection->thread_data.goaway_received_last_stream_id = AWS_H2_STREAM_ID_MAX;
- connection->thread_data.goaway_sent_last_stream_id = AWS_H2_STREAM_ID_MAX;
- aws_crt_statistics_http2_channel_init(&connection->thread_data.stats);
- connection->thread_data.stats.was_inactive = true; /* Start with non active streams */
- connection->synced_data.is_open = true;
- connection->synced_data.new_stream_error_code = AWS_ERROR_SUCCESS;
- /* Create a new decoder */
- struct aws_h2_decoder_params params = {
- .alloc = alloc,
- .vtable = &s_h2_decoder_vtable,
- .userdata = connection,
- .logging_id = connection,
- .is_server = server,
- };
- connection->thread_data.decoder = aws_h2_decoder_new(¶ms);
- if (!connection->thread_data.decoder) {
- CONNECTION_LOGF(
- ERROR, connection, "Decoder init error %d (%s)", aws_last_error(), aws_error_name(aws_last_error()));
- goto error;
- }
- if (aws_h2_frame_encoder_init(&connection->thread_data.encoder, alloc, &connection->base)) {
- CONNECTION_LOGF(
- ERROR, connection, "Encoder init error %d (%s)", aws_last_error(), aws_error_name(aws_last_error()));
- goto error;
- }
- /* User data from connection base is not ready until the handler installed */
- connection->thread_data.init_pending_settings = s_new_pending_settings(
- connection->base.alloc,
- http2_options->initial_settings_array,
- http2_options->num_initial_settings,
- http2_options->on_initial_settings_completed,
- NULL /* user_data is set later... */);
- if (!connection->thread_data.init_pending_settings) {
- goto error;
- }
- /* We enqueue the inital settings when handler get installed */
- return connection;
- error:
- s_handler_destroy(&connection->base.channel_handler);
- return NULL;
- }
- struct aws_http_connection *aws_http_connection_new_http2_server(
- struct aws_allocator *allocator,
- bool manual_window_management,
- const struct aws_http2_connection_options *http2_options) {
- struct aws_h2_connection *connection = s_connection_new(allocator, manual_window_management, http2_options, true);
- if (!connection) {
- return NULL;
- }
- connection->base.server_data = &connection->base.client_or_server_data.server;
- return &connection->base;
- }
- struct aws_http_connection *aws_http_connection_new_http2_client(
- struct aws_allocator *allocator,
- bool manual_window_management,
- const struct aws_http2_connection_options *http2_options) {
- struct aws_h2_connection *connection = s_connection_new(allocator, manual_window_management, http2_options, false);
- if (!connection) {
- return NULL;
- }
- connection->base.client_data = &connection->base.client_or_server_data.client;
- return &connection->base;
- }
- static void s_handler_destroy(struct aws_channel_handler *handler) {
- struct aws_h2_connection *connection = handler->impl;
- CONNECTION_LOG(TRACE, connection, "Destroying connection");
- /* No streams should be left in internal datastructures */
- AWS_ASSERT(
- !aws_hash_table_is_valid(&connection->thread_data.active_streams_map) ||
- aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0);
- AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.waiting_streams_list));
- AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.stalled_window_streams_list));
- AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.outgoing_streams_list));
- AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_stream_list));
- AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_frame_list));
- AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_settings_list));
- AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_ping_list));
- AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_goaway_list));
- AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.pending_ping_queue));
- AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.pending_settings_queue));
- /* Clean up any unsent frames and structures */
- struct aws_linked_list *outgoing_frames_queue = &connection->thread_data.outgoing_frames_queue;
- while (!aws_linked_list_empty(outgoing_frames_queue)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(outgoing_frames_queue);
- struct aws_h2_frame *frame = AWS_CONTAINER_OF(node, struct aws_h2_frame, node);
- aws_h2_frame_destroy(frame);
- }
- if (connection->thread_data.init_pending_settings) {
- /* if initial settings were never sent, we need to clear the memory here */
- aws_mem_release(connection->base.alloc, connection->thread_data.init_pending_settings);
- }
- aws_h2_decoder_destroy(connection->thread_data.decoder);
- aws_h2_frame_encoder_clean_up(&connection->thread_data.encoder);
- aws_hash_table_clean_up(&connection->thread_data.active_streams_map);
- aws_cache_destroy(connection->thread_data.closed_streams);
- aws_mutex_clean_up(&connection->synced_data.lock);
- aws_mem_release(connection->base.alloc, connection);
- }
- static struct aws_h2_pending_settings *s_new_pending_settings(
- struct aws_allocator *allocator,
- const struct aws_http2_setting *settings_array,
- size_t num_settings,
- aws_http2_on_change_settings_complete_fn *on_completed,
- void *user_data) {
- size_t settings_storage_size = sizeof(struct aws_http2_setting) * num_settings;
- struct aws_h2_pending_settings *pending_settings;
- void *settings_storage;
- if (!aws_mem_acquire_many(
- allocator,
- 2,
- &pending_settings,
- sizeof(struct aws_h2_pending_settings),
- &settings_storage,
- settings_storage_size)) {
- return NULL;
- }
- AWS_ZERO_STRUCT(*pending_settings);
- /* We buffer the settings up, incase the caller has freed them when the ACK arrives */
- pending_settings->settings_array = settings_storage;
- if (settings_array) {
- memcpy(pending_settings->settings_array, settings_array, num_settings * sizeof(struct aws_http2_setting));
- }
- pending_settings->num_settings = num_settings;
- pending_settings->on_completed = on_completed;
- pending_settings->user_data = user_data;
- return pending_settings;
- }
- static struct aws_h2_pending_ping *s_new_pending_ping(
- struct aws_allocator *allocator,
- const struct aws_byte_cursor *optional_opaque_data,
- const uint64_t started_time,
- void *user_data,
- aws_http2_on_ping_complete_fn *on_completed) {
- struct aws_h2_pending_ping *pending_ping = aws_mem_calloc(allocator, 1, sizeof(struct aws_h2_pending_ping));
- if (!pending_ping) {
- return NULL;
- }
- if (optional_opaque_data) {
- memcpy(pending_ping->opaque_data, optional_opaque_data->ptr, AWS_HTTP2_PING_DATA_SIZE);
- }
- pending_ping->started_time = started_time;
- pending_ping->on_completed = on_completed;
- pending_ping->user_data = user_data;
- return pending_ping;
- }
- static struct aws_h2_pending_goaway *s_new_pending_goaway(
- struct aws_allocator *allocator,
- uint32_t http2_error,
- bool allow_more_streams,
- const struct aws_byte_cursor *optional_debug_data) {
- struct aws_byte_cursor debug_data;
- AWS_ZERO_STRUCT(debug_data);
- if (optional_debug_data) {
- debug_data = *optional_debug_data;
- }
- struct aws_h2_pending_goaway *pending_goaway;
- void *debug_data_storage;
- /* mem acquire cannot fail anymore */
- aws_mem_acquire_many(
- allocator, 2, &pending_goaway, sizeof(struct aws_h2_pending_goaway), &debug_data_storage, debug_data.len);
- if (debug_data.len) {
- memcpy(debug_data_storage, debug_data.ptr, debug_data.len);
- debug_data.ptr = debug_data_storage;
- }
- pending_goaway->debug_data = debug_data;
- pending_goaway->http2_error = http2_error;
- pending_goaway->allow_more_streams = allow_more_streams;
- return pending_goaway;
- }
- void aws_h2_connection_enqueue_outgoing_frame(struct aws_h2_connection *connection, struct aws_h2_frame *frame) {
- AWS_PRECONDITION(frame->type != AWS_H2_FRAME_T_DATA);
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- if (frame->high_priority) {
- /* Check from the head of the queue, and find a node with normal priority, and insert before it */
- struct aws_linked_list_node *iter = aws_linked_list_begin(&connection->thread_data.outgoing_frames_queue);
- /* one past the last element */
- const struct aws_linked_list_node *end = aws_linked_list_end(&connection->thread_data.outgoing_frames_queue);
- while (iter != end) {
- struct aws_h2_frame *frame_i = AWS_CONTAINER_OF(iter, struct aws_h2_frame, node);
- if (connection->thread_data.current_outgoing_frame == frame_i) {
- iter = iter->next;
- continue;
- }
- if (!frame_i->high_priority) {
- break;
- }
- iter = iter->next;
- }
- aws_linked_list_insert_before(iter, &frame->node);
- } else {
- aws_linked_list_push_back(&connection->thread_data.outgoing_frames_queue, &frame->node);
- }
- }
- static void s_on_channel_write_complete(
- struct aws_channel *channel,
- struct aws_io_message *message,
- int err_code,
- void *user_data) {
- (void)message;
- struct aws_h2_connection *connection = user_data;
- if (err_code) {
- CONNECTION_LOGF(ERROR, connection, "Message did not write to network, error %s", aws_error_name(err_code));
- aws_h2_connection_shutdown_due_to_write_err(connection, err_code);
- return;
- }
- CONNECTION_LOG(TRACE, connection, "Message finished writing to network. Rescheduling outgoing frame task");
- /* To avoid wasting memory, we only want ONE of our written aws_io_messages in the channel at a time.
- * Therefore, we wait until it's written to the network before trying to send another
- * by running the outgoing-frame-task again.
- *
- * We also want to share the network with other channels.
- * Therefore, when the write completes, we SCHEDULE the outgoing-frame-task
- * to run again instead of calling the function directly.
- * This way, if the message completes synchronously,
- * we're not hogging the network by writing message after message in a tight loop */
- aws_channel_schedule_task_now(channel, &connection->outgoing_frames_task);
- }
- static void s_outgoing_frames_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- if (status != AWS_TASK_STATUS_RUN_READY) {
- return;
- }
- struct aws_h2_connection *connection = arg;
- s_write_outgoing_frames(connection, false /*first_try*/);
- }
- static void s_write_outgoing_frames(struct aws_h2_connection *connection, bool first_try) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- AWS_PRECONDITION(connection->thread_data.is_outgoing_frames_task_active);
- struct aws_channel_slot *channel_slot = connection->base.channel_slot;
- struct aws_linked_list *outgoing_frames_queue = &connection->thread_data.outgoing_frames_queue;
- struct aws_linked_list *outgoing_streams_list = &connection->thread_data.outgoing_streams_list;
- if (connection->thread_data.is_writing_stopped) {
- return;
- }
- /* Determine whether there's work to do, and end task immediately if there's not.
- * Note that we stop writing DATA frames if the channel is trying to shut down */
- bool has_control_frames = !aws_linked_list_empty(outgoing_frames_queue);
- bool has_data_frames = !aws_linked_list_empty(outgoing_streams_list);
- bool may_write_data_frames = (connection->thread_data.window_size_peer > AWS_H2_MIN_WINDOW_SIZE) &&
- !connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written;
- bool will_write = has_control_frames || (has_data_frames && may_write_data_frames);
- if (!will_write) {
- if (!first_try) {
- CONNECTION_LOGF(
- TRACE,
- connection,
- "Outgoing frames task stopped. has_control_frames:%d has_data_frames:%d may_write_data_frames:%d",
- has_control_frames,
- has_data_frames,
- may_write_data_frames);
- }
- connection->thread_data.is_outgoing_frames_task_active = false;
- if (connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written) {
- s_finish_shutdown(connection);
- }
- return;
- }
- if (first_try) {
- CONNECTION_LOG(TRACE, connection, "Starting outgoing frames task");
- }
- /* Acquire aws_io_message, that we will attempt to fill up */
- struct aws_io_message *msg = aws_channel_slot_acquire_max_message_for_write(channel_slot);
- if (AWS_UNLIKELY(!msg)) {
- CONNECTION_LOG(ERROR, connection, "Failed to acquire message from pool, closing connection.");
- goto error;
- }
- /* Set up callback so we can send another message when this one completes */
- msg->on_completion = s_on_channel_write_complete;
- msg->user_data = connection;
- CONNECTION_LOGF(
- TRACE,
- connection,
- "Outgoing frames task acquired message with %zu bytes available",
- msg->message_data.capacity - msg->message_data.len);
- /* Write as many frames from outgoing_frames_queue as possible. */
- if (s_encode_outgoing_frames_queue(connection, &msg->message_data)) {
- goto error;
- }
- /* If outgoing_frames_queue emptied, and connection is running normally,
- * then write as many DATA frames from outgoing_streams_list as possible. */
- if (aws_linked_list_empty(outgoing_frames_queue) && may_write_data_frames) {
- if (s_encode_data_from_outgoing_streams(connection, &msg->message_data)) {
- goto error;
- }
- }
- if (msg->message_data.len) {
- /* Write message to channel.
- * outgoing_frames_task will resume when message completes. */
- CONNECTION_LOGF(TRACE, connection, "Outgoing frames task sending message of size %zu", msg->message_data.len);
- if (aws_channel_slot_send_message(channel_slot, msg, AWS_CHANNEL_DIR_WRITE)) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Failed to send channel message: %s. Closing connection.",
- aws_error_name(aws_last_error()));
- goto error;
- }
- } else {
- /* Message is empty, warn that no work is being done and reschedule the task to try again next tick.
- * It's likely that body isn't ready, so body streaming function has no data to write yet.
- * If this scenario turns out to be common we should implement a "pause" feature. */
- CONNECTION_LOG(WARN, connection, "Outgoing frames task sent no data, will try again next tick.");
- aws_mem_release(msg->allocator, msg);
- aws_channel_schedule_task_now(channel_slot->channel, &connection->outgoing_frames_task);
- }
- return;
- error:;
- int error_code = aws_last_error();
- if (msg) {
- aws_mem_release(msg->allocator, msg);
- }
- aws_h2_connection_shutdown_due_to_write_err(connection, error_code);
- }
- /* Write as many frames from outgoing_frames_queue as possible (contains all non-DATA frames) */
- static int s_encode_outgoing_frames_queue(struct aws_h2_connection *connection, struct aws_byte_buf *output) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- struct aws_linked_list *outgoing_frames_queue = &connection->thread_data.outgoing_frames_queue;
- /* Write as many frames from outgoing_frames_queue as possible. */
- while (!aws_linked_list_empty(outgoing_frames_queue)) {
- struct aws_linked_list_node *frame_node = aws_linked_list_front(outgoing_frames_queue);
- struct aws_h2_frame *frame = AWS_CONTAINER_OF(frame_node, struct aws_h2_frame, node);
- connection->thread_data.current_outgoing_frame = frame;
- bool frame_complete;
- if (aws_h2_encode_frame(&connection->thread_data.encoder, frame, output, &frame_complete)) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Error encoding frame: type=%s stream=%" PRIu32 " error=%s",
- aws_h2_frame_type_to_str(frame->type),
- frame->stream_id,
- aws_error_name(aws_last_error()));
- return AWS_OP_ERR;
- }
- if (!frame_complete) {
- if (output->len == 0) {
- /* We're in trouble if an empty message isn't big enough for this frame to do any work with */
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Message is too small for encoder. frame-type=%s stream=%" PRIu32 " available-space=%zu",
- aws_h2_frame_type_to_str(frame->type),
- frame->stream_id,
- output->capacity);
- aws_raise_error(AWS_ERROR_INVALID_STATE);
- return AWS_OP_ERR;
- }
- CONNECTION_LOG(TRACE, connection, "Outgoing frames task filled message, and has more frames to send later");
- break;
- }
- /* Done encoding frame, pop from queue and cleanup*/
- aws_linked_list_remove(frame_node);
- aws_h2_frame_destroy(frame);
- connection->thread_data.current_outgoing_frame = NULL;
- }
- return AWS_OP_SUCCESS;
- }
- /* Write as many DATA frames from outgoing_streams_list as possible. */
- static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connection, struct aws_byte_buf *output) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- struct aws_linked_list *outgoing_streams_list = &connection->thread_data.outgoing_streams_list;
- if (aws_linked_list_empty(outgoing_streams_list)) {
- return AWS_OP_SUCCESS;
- }
- struct aws_linked_list *stalled_window_streams_list = &connection->thread_data.stalled_window_streams_list;
- struct aws_linked_list *waiting_streams_list = &connection->thread_data.waiting_streams_list;
- /* If a stream stalls, put it in this list until the function ends so we don't keep trying to read from it.
- * We put it back at the end of function. */
- struct aws_linked_list stalled_streams_list;
- aws_linked_list_init(&stalled_streams_list);
- int aws_error_code = 0;
- /* We simply round-robin through streams, instead of using stream priority.
- * Respecting priority is not required (RFC-7540 5.3), so we're ignoring it for now. This also keeps use safe
- * from priority DOS attacks: https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-9513 */
- while (!aws_linked_list_empty(outgoing_streams_list)) {
- if (connection->thread_data.window_size_peer <= AWS_H2_MIN_WINDOW_SIZE) {
- CONNECTION_LOGF(
- DEBUG,
- connection,
- "Peer connection's flow-control window is too small now %zu. Connection will stop sending DATA until "
- "WINDOW_UPDATE is received.",
- connection->thread_data.window_size_peer);
- goto done;
- }
- /* Stop looping if message is so full it's not worth the bother */
- size_t space_available = output->capacity - output->len;
- size_t worth_trying_threshold = AWS_H2_FRAME_PREFIX_SIZE * 2;
- if (space_available < worth_trying_threshold) {
- CONNECTION_LOG(TRACE, connection, "Outgoing frames task filled message, and has more frames to send later");
- goto done;
- }
- struct aws_linked_list_node *node = aws_linked_list_pop_front(outgoing_streams_list);
- struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node);
- /* Ask stream to encode a data frame.
- * Stream may complete itself as a result of encoding its data,
- * in which case it will vanish from the connection's datastructures as a side-effect of this call.
- * But if stream has more data to send, push it back into the appropriate list. */
- int data_encode_status;
- if (aws_h2_stream_encode_data_frame(stream, &connection->thread_data.encoder, output, &data_encode_status)) {
- aws_error_code = aws_last_error();
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Connection error while encoding DATA on stream %" PRIu32 ", %s",
- stream->base.id,
- aws_error_name(aws_error_code));
- goto done;
- }
- /* If stream has more data, push it into the appropriate list. */
- switch (data_encode_status) {
- case AWS_H2_DATA_ENCODE_COMPLETE:
- break;
- case AWS_H2_DATA_ENCODE_ONGOING:
- aws_linked_list_push_back(outgoing_streams_list, node);
- break;
- case AWS_H2_DATA_ENCODE_ONGOING_BODY_STREAM_STALLED:
- aws_linked_list_push_back(&stalled_streams_list, node);
- break;
- case AWS_H2_DATA_ENCODE_ONGOING_WAITING_FOR_WRITES:
- stream->thread_data.waiting_for_writes = true;
- aws_linked_list_push_back(waiting_streams_list, node);
- break;
- case AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED:
- aws_linked_list_push_back(stalled_window_streams_list, node);
- AWS_H2_STREAM_LOG(
- DEBUG,
- stream,
- "Peer stream's flow-control window is too small. Data frames on this stream will not be sent until "
- "WINDOW_UPDATE. ");
- break;
- default:
- CONNECTION_LOG(ERROR, connection, "Data encode status is invalid.");
- aws_error_code = AWS_ERROR_INVALID_STATE;
- }
- }
- done:
- /* Return any stalled streams to outgoing_streams_list */
- while (!aws_linked_list_empty(&stalled_streams_list)) {
- aws_linked_list_push_back(outgoing_streams_list, aws_linked_list_pop_front(&stalled_streams_list));
- }
- if (aws_error_code) {
- return aws_raise_error(aws_error_code);
- }
- if (aws_linked_list_empty(outgoing_streams_list)) {
- /* transition from something to write -> nothing to write */
- uint64_t now_ns = 0;
- aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
- s_add_time_measurement_to_stats(
- connection->thread_data.outgoing_timestamp_ns,
- now_ns,
- &connection->thread_data.stats.pending_outgoing_stream_ms);
- }
- return AWS_OP_SUCCESS;
- }
- /* If the outgoing-frames-task isn't scheduled, run it immediately. */
- void aws_h2_try_write_outgoing_frames(struct aws_h2_connection *connection) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- if (connection->thread_data.is_outgoing_frames_task_active) {
- return;
- }
- connection->thread_data.is_outgoing_frames_task_active = true;
- s_write_outgoing_frames(connection, true /*first_try*/);
- }
- /**
- * Returns successfully and sets `out_stream` if stream is currently active.
- * Returns successfully and sets `out_stream` to NULL if the frame should be ignored.
- * Returns failed aws_h2err if it is a connection error to receive this frame.
- */
- struct aws_h2err s_get_active_stream_for_incoming_frame(
- struct aws_h2_connection *connection,
- uint32_t stream_id,
- enum aws_h2_frame_type frame_type,
- struct aws_h2_stream **out_stream) {
- *out_stream = NULL;
- /* Check active streams */
- struct aws_hash_element *found = NULL;
- const void *stream_id_key = (void *)(size_t)stream_id;
- aws_hash_table_find(&connection->thread_data.active_streams_map, stream_id_key, &found);
- if (found) {
- /* Found it! return */
- *out_stream = found->value;
- return AWS_H2ERR_SUCCESS;
- }
- bool client_initiated = (stream_id % 2) == 1;
- bool self_initiated_stream = client_initiated && (connection->base.client_data != NULL);
- bool peer_initiated_stream = !self_initiated_stream;
- if ((self_initiated_stream && stream_id >= connection->base.next_stream_id) ||
- (peer_initiated_stream && stream_id > connection->thread_data.latest_peer_initiated_stream_id)) {
- /* Illegal to receive frames for a stream in the idle state (stream doesn't exist yet)
- * (except server receiving HEADERS to start a stream, but that's handled elsewhere) */
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Illegal to receive %s frame on stream id=%" PRIu32 " state=IDLE",
- aws_h2_frame_type_to_str(frame_type),
- stream_id);
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
- }
- if (peer_initiated_stream && stream_id > connection->thread_data.goaway_sent_last_stream_id) {
- /* Once GOAWAY sent, ignore frames for peer-initiated streams whose id > last-stream-id */
- CONNECTION_LOGF(
- TRACE,
- connection,
- "Ignoring %s frame on stream id=%" PRIu32 " because GOAWAY sent with last-stream-id=%" PRIu32,
- aws_h2_frame_type_to_str(frame_type),
- stream_id,
- connection->thread_data.goaway_sent_last_stream_id);
- return AWS_H2ERR_SUCCESS;
- }
- void *cached_value = NULL;
- /* Stream is closed, check whether it's legal for a few more frames to trickle in */
- if (aws_cache_find(connection->thread_data.closed_streams, stream_id_key, &cached_value)) {
- return aws_h2err_from_last_error();
- }
- if (cached_value) {
- if (frame_type == AWS_H2_FRAME_T_PRIORITY) {
- /* If we support PRIORITY, do something here. Right now just ignore it */
- return AWS_H2ERR_SUCCESS;
- }
- enum aws_h2_stream_closed_when closed_when = (enum aws_h2_stream_closed_when)(size_t)cached_value;
- switch (closed_when) {
- case AWS_H2_STREAM_CLOSED_WHEN_BOTH_SIDES_END_STREAM:
- /* WINDOW_UPDATE or RST_STREAM frames can be received ... for a short period after
- * a DATA or HEADERS frame containing an END_STREAM flag is sent.
- * Endpoints MUST ignore WINDOW_UPDATE or RST_STREAM frames received in this state */
- if (frame_type == AWS_H2_FRAME_T_WINDOW_UPDATE || frame_type == AWS_H2_FRAME_T_RST_STREAM) {
- CONNECTION_LOGF(
- TRACE,
- connection,
- "Ignoring %s frame on stream id=%" PRIu32 " because END_STREAM flag was recently sent.",
- aws_h2_frame_type_to_str(frame_type),
- stream_id);
- return AWS_H2ERR_SUCCESS;
- } else {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Illegal to receive %s frame on stream id=%" PRIu32 " after END_STREAM has been received.",
- aws_h2_frame_type_to_str(frame_type),
- stream_id);
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_STREAM_CLOSED);
- }
- break;
- case AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_RECEIVED:
- /* An endpoint that receives any frame other than PRIORITY after receiving a RST_STREAM
- * MUST treat that as a stream error (Section 5.4.2) of type STREAM_CLOSED */
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Illegal to receive %s frame on stream id=%" PRIu32 " after RST_STREAM has been received",
- aws_h2_frame_type_to_str(frame_type),
- stream_id);
- struct aws_h2_frame *rst_stream =
- aws_h2_frame_new_rst_stream(connection->base.alloc, stream_id, AWS_HTTP2_ERR_STREAM_CLOSED);
- if (!rst_stream) {
- CONNECTION_LOGF(
- ERROR, connection, "Error creating RST_STREAM frame, %s", aws_error_name(aws_last_error()));
- return aws_h2err_from_last_error();
- }
- aws_h2_connection_enqueue_outgoing_frame(connection, rst_stream);
- return AWS_H2ERR_SUCCESS;
- case AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_SENT:
- /* An endpoint MUST ignore frames that it receives on closed streams after it has sent a RST_STREAM
- * frame */
- CONNECTION_LOGF(
- TRACE,
- connection,
- "Ignoring %s frame on stream id=%" PRIu32 " because RST_STREAM was recently sent.",
- aws_h2_frame_type_to_str(frame_type),
- stream_id);
- return AWS_H2ERR_SUCCESS;
- break;
- default:
- CONNECTION_LOGF(
- ERROR, connection, "Invalid state fo cached closed stream, stream id=%" PRIu32, stream_id);
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_INTERNAL_ERROR);
- break;
- }
- }
- if (frame_type == AWS_H2_FRAME_T_PRIORITY) {
- /* ignored if the stream has been removed from the dependency tree */
- return AWS_H2ERR_SUCCESS;
- }
- /* Stream closed (purged from closed_streams, or implicitly closed when its ID was skipped) */
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Illegal to receive %s frame on stream id=%" PRIu32
- ", no memory of closed stream (ID skipped, or removed from cache)",
- aws_h2_frame_type_to_str(frame_type),
- stream_id);
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
- }
- /* Decoder callbacks */
- struct aws_h2err s_decoder_on_headers_begin(uint32_t stream_id, void *userdata) {
- struct aws_h2_connection *connection = userdata;
- if (connection->base.server_data) {
- /* Server would create new request-handler stream... */
- return aws_h2err_from_aws_code(AWS_ERROR_UNIMPLEMENTED);
- }
- struct aws_h2_stream *stream;
- struct aws_h2err err =
- s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_HEADERS, &stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- if (stream) {
- err = aws_h2_stream_on_decoder_headers_begin(stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- }
- return AWS_H2ERR_SUCCESS;
- }
- struct aws_h2err s_decoder_on_headers_i(
- uint32_t stream_id,
- const struct aws_http_header *header,
- enum aws_http_header_name name_enum,
- enum aws_http_header_block block_type,
- void *userdata) {
- struct aws_h2_connection *connection = userdata;
- struct aws_h2_stream *stream;
- struct aws_h2err err =
- s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_HEADERS, &stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- if (stream) {
- err = aws_h2_stream_on_decoder_headers_i(stream, header, name_enum, block_type);
- if (aws_h2err_failed(err)) {
- return err;
- }
- }
- return AWS_H2ERR_SUCCESS;
- }
- struct aws_h2err s_decoder_on_headers_end(
- uint32_t stream_id,
- bool malformed,
- enum aws_http_header_block block_type,
- void *userdata) {
- struct aws_h2_connection *connection = userdata;
- struct aws_h2_stream *stream;
- struct aws_h2err err =
- s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_HEADERS, &stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- if (stream) {
- err = aws_h2_stream_on_decoder_headers_end(stream, malformed, block_type);
- if (aws_h2err_failed(err)) {
- return err;
- }
- }
- return AWS_H2ERR_SUCCESS;
- }
- struct aws_h2err s_decoder_on_push_promise(uint32_t stream_id, uint32_t promised_stream_id, void *userdata) {
- struct aws_h2_connection *connection = userdata;
- AWS_ASSERT(connection->base.client_data); /* decoder has already enforced this */
- AWS_ASSERT(promised_stream_id % 2 == 0); /* decoder has already enforced this */
- /* The identifier of a newly established stream MUST be numerically greater
- * than all streams that the initiating endpoint has opened or reserved (RFC-7540 5.1.1) */
- if (promised_stream_id <= connection->thread_data.latest_peer_initiated_stream_id) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Newly promised stream ID %" PRIu32 " must be higher than previously established ID %" PRIu32,
- promised_stream_id,
- connection->thread_data.latest_peer_initiated_stream_id);
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
- }
- connection->thread_data.latest_peer_initiated_stream_id = promised_stream_id;
- /* If we ever fully support PUSH_PROMISE, this is where we'd add the
- * promised_stream_id to some reserved_streams datastructure */
- struct aws_h2_stream *stream;
- struct aws_h2err err =
- s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_PUSH_PROMISE, &stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- if (stream) {
- err = aws_h2_stream_on_decoder_push_promise(stream, promised_stream_id);
- if (aws_h2err_failed(err)) {
- return err;
- }
- }
- return AWS_H2ERR_SUCCESS;
- }
- static int s_connection_send_update_window(struct aws_h2_connection *connection, uint32_t window_size) {
- struct aws_h2_frame *connection_window_update_frame =
- aws_h2_frame_new_window_update(connection->base.alloc, 0, window_size);
- if (!connection_window_update_frame) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "WINDOW_UPDATE frame on connection failed to be sent, error %s",
- aws_error_name(aws_last_error()));
- return AWS_OP_ERR;
- }
- aws_h2_connection_enqueue_outgoing_frame(connection, connection_window_update_frame);
- connection->thread_data.window_size_self += window_size;
- return AWS_OP_SUCCESS;
- }
- struct aws_h2err s_decoder_on_data_begin(
- uint32_t stream_id,
- uint32_t payload_len,
- uint32_t total_padding_bytes,
- bool end_stream,
- void *userdata) {
- struct aws_h2_connection *connection = userdata;
- /* A receiver that receives a flow-controlled frame MUST always account for its contribution against the connection
- * flow-control window, unless the receiver treats this as a connection error */
- if (aws_sub_size_checked(
- connection->thread_data.window_size_self, payload_len, &connection->thread_data.window_size_self)) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "DATA length %" PRIu32 " exceeds flow-control window %zu",
- payload_len,
- connection->thread_data.window_size_self);
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_FLOW_CONTROL_ERROR);
- }
- struct aws_h2_stream *stream;
- struct aws_h2err err = s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_DATA, &stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- if (stream) {
- err = aws_h2_stream_on_decoder_data_begin(stream, payload_len, total_padding_bytes, end_stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- }
- /* Handle automatic updates of the connection flow window */
- uint32_t auto_window_update;
- if (connection->conn_manual_window_management) {
- /* Automatically update the flow-window to account for padding, even though "manual window management"
- * is enabled. We do this because the current API doesn't have any way to inform the user about padding,
- * so we can't expect them to manage it themselves. */
- auto_window_update = total_padding_bytes;
- } else {
- /* Automatically update the full amount we just received */
- auto_window_update = payload_len;
- }
- if (auto_window_update != 0) {
- if (s_connection_send_update_window(connection, auto_window_update)) {
- return aws_h2err_from_last_error();
- }
- CONNECTION_LOGF(
- TRACE,
- connection,
- "Automatically updating connection window by %" PRIu32 "(%" PRIu32 " due to padding).",
- auto_window_update,
- total_padding_bytes);
- }
- return AWS_H2ERR_SUCCESS;
- }
- struct aws_h2err s_decoder_on_data_i(uint32_t stream_id, struct aws_byte_cursor data, void *userdata) {
- struct aws_h2_connection *connection = userdata;
- /* Pass data to stream */
- struct aws_h2_stream *stream;
- struct aws_h2err err = s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_DATA, &stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- if (stream) {
- err = aws_h2_stream_on_decoder_data_i(stream, data);
- if (aws_h2err_failed(err)) {
- return err;
- }
- }
- return AWS_H2ERR_SUCCESS;
- }
- struct aws_h2err s_decoder_on_end_stream(uint32_t stream_id, void *userdata) {
- struct aws_h2_connection *connection = userdata;
- /* Not calling s_get_active_stream_for_incoming_frame() here because END_STREAM
- * isn't an actual frame type. It's a flag on DATA or HEADERS frames, and we
- * already checked the legality of those frames in their respective callbacks. */
- struct aws_hash_element *found = NULL;
- aws_hash_table_find(&connection->thread_data.active_streams_map, (void *)(size_t)stream_id, &found);
- if (found) {
- struct aws_h2_stream *stream = found->value;
- struct aws_h2err err = aws_h2_stream_on_decoder_end_stream(stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- }
- return AWS_H2ERR_SUCCESS;
- }
- static struct aws_h2err s_decoder_on_rst_stream(uint32_t stream_id, uint32_t h2_error_code, void *userdata) {
- struct aws_h2_connection *connection = userdata;
- /* Pass RST_STREAM to stream */
- struct aws_h2_stream *stream;
- struct aws_h2err err =
- s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_RST_STREAM, &stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- if (stream) {
- err = aws_h2_stream_on_decoder_rst_stream(stream, h2_error_code);
- if (aws_h2err_failed(err)) {
- return err;
- }
- }
- return AWS_H2ERR_SUCCESS;
- }
- static struct aws_h2err s_decoder_on_ping_ack(uint8_t opaque_data[AWS_HTTP2_PING_DATA_SIZE], void *userdata) {
- struct aws_h2_connection *connection = userdata;
- if (aws_linked_list_empty(&connection->thread_data.pending_ping_queue)) {
- CONNECTION_LOG(ERROR, connection, "Received extraneous PING ACK.");
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
- }
- struct aws_h2err err;
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->thread_data.pending_ping_queue);
- struct aws_h2_pending_ping *pending_ping = AWS_CONTAINER_OF(node, struct aws_h2_pending_ping, node);
- /* Check the payload */
- if (!aws_array_eq(opaque_data, AWS_HTTP2_PING_DATA_SIZE, pending_ping->opaque_data, AWS_HTTP2_PING_DATA_SIZE)) {
- CONNECTION_LOG(ERROR, connection, "Received PING ACK with mismatched opaque-data.");
- err = aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
- goto error;
- }
- uint64_t time_stamp;
- if (aws_high_res_clock_get_ticks(&time_stamp)) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Failed getting the time stamp when PING ACK received, error %s",
- aws_error_name(aws_last_error()));
- err = aws_h2err_from_last_error();
- goto error;
- }
- uint64_t rtt;
- if (aws_sub_u64_checked(time_stamp, pending_ping->started_time, &rtt)) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Overflow from time stamp when PING ACK received, error %s",
- aws_error_name(aws_last_error()));
- err = aws_h2err_from_last_error();
- goto error;
- }
- CONNECTION_LOGF(TRACE, connection, "Round trip time is %lf ms, approximately", (double)rtt / 1000000);
- /* fire the callback */
- if (pending_ping->on_completed) {
- pending_ping->on_completed(&connection->base, rtt, AWS_ERROR_SUCCESS, pending_ping->user_data);
- }
- aws_mem_release(connection->base.alloc, pending_ping);
- return AWS_H2ERR_SUCCESS;
- error:
- if (pending_ping->on_completed) {
- pending_ping->on_completed(&connection->base, 0 /* fake rtt */, err.aws_code, pending_ping->user_data);
- }
- aws_mem_release(connection->base.alloc, pending_ping);
- return err;
- }
- static struct aws_h2err s_decoder_on_ping(uint8_t opaque_data[AWS_HTTP2_PING_DATA_SIZE], void *userdata) {
- struct aws_h2_connection *connection = userdata;
- /* send a PING frame with the ACK flag set in response, with an identical payload. */
- struct aws_h2_frame *ping_ack_frame = aws_h2_frame_new_ping(connection->base.alloc, true, opaque_data);
- if (!ping_ack_frame) {
- CONNECTION_LOGF(
- ERROR, connection, "Ping ACK frame failed to be sent, error %s", aws_error_name(aws_last_error()));
- return aws_h2err_from_last_error();
- }
- aws_h2_connection_enqueue_outgoing_frame(connection, ping_ack_frame);
- return AWS_H2ERR_SUCCESS;
- }
- static struct aws_h2err s_decoder_on_settings(
- const struct aws_http2_setting *settings_array,
- size_t num_settings,
- void *userdata) {
- struct aws_h2_connection *connection = userdata;
- struct aws_h2err err;
- /* Once all values have been processed, the recipient MUST immediately emit a SETTINGS frame with the ACK flag
- * set.(RFC-7540 6.5.3) */
- CONNECTION_LOG(TRACE, connection, "Setting frame processing ends");
- struct aws_h2_frame *settings_ack_frame = aws_h2_frame_new_settings(connection->base.alloc, NULL, 0, true);
- if (!settings_ack_frame) {
- CONNECTION_LOGF(
- ERROR, connection, "Settings ACK frame failed to be sent, error %s", aws_error_name(aws_last_error()));
- return aws_h2err_from_last_error();
- }
- aws_h2_connection_enqueue_outgoing_frame(connection, settings_ack_frame);
- /* Allocate a block of memory for settings_array in callback, which will only includes the settings we changed,
- * freed once the callback finished */
- struct aws_http2_setting *callback_array = NULL;
- if (num_settings) {
- callback_array = aws_mem_acquire(connection->base.alloc, num_settings * sizeof(struct aws_http2_setting));
- if (!callback_array) {
- return aws_h2err_from_last_error();
- }
- }
- size_t callback_array_num = 0;
- /* Apply the change to encoder and connection */
- struct aws_h2_frame_encoder *encoder = &connection->thread_data.encoder;
- for (size_t i = 0; i < num_settings; i++) {
- if (connection->thread_data.settings_peer[settings_array[i].id] == settings_array[i].value) {
- /* No change, don't do any work */
- continue;
- }
- switch (settings_array[i].id) {
- case AWS_HTTP2_SETTINGS_HEADER_TABLE_SIZE: {
- aws_h2_frame_encoder_set_setting_header_table_size(encoder, settings_array[i].value);
- } break;
- case AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE: {
- /* When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream
- * flow-control windows that it maintains by the difference between the new value and the old value. */
- int32_t size_changed =
- settings_array[i].value - connection->thread_data.settings_peer[settings_array[i].id];
- struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map);
- while (!aws_hash_iter_done(&stream_iter)) {
- struct aws_h2_stream *stream = stream_iter.element.value;
- aws_hash_iter_next(&stream_iter);
- err = aws_h2_stream_window_size_change(stream, size_changed, false /*self*/);
- if (aws_h2err_failed(err)) {
- CONNECTION_LOG(
- ERROR,
- connection,
- "Connection error, change to SETTINGS_INITIAL_WINDOW_SIZE caused a stream's flow-control "
- "window to exceed the maximum size");
- goto error;
- }
- }
- } break;
- case AWS_HTTP2_SETTINGS_MAX_FRAME_SIZE: {
- aws_h2_frame_encoder_set_setting_max_frame_size(encoder, settings_array[i].value);
- } break;
- default:
- break;
- }
- connection->thread_data.settings_peer[settings_array[i].id] = settings_array[i].value;
- callback_array[callback_array_num++] = settings_array[i];
- }
- if (connection->on_remote_settings_change) {
- connection->on_remote_settings_change(
- &connection->base, callback_array, callback_array_num, connection->base.user_data);
- }
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- memcpy(
- connection->synced_data.settings_peer,
- connection->thread_data.settings_peer,
- sizeof(connection->thread_data.settings_peer));
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- aws_mem_release(connection->base.alloc, callback_array);
- return AWS_H2ERR_SUCCESS;
- error:
- aws_mem_release(connection->base.alloc, callback_array);
- return err;
- }
- static struct aws_h2err s_decoder_on_settings_ack(void *userdata) {
- struct aws_h2_connection *connection = userdata;
- if (aws_linked_list_empty(&connection->thread_data.pending_settings_queue)) {
- CONNECTION_LOG(ERROR, connection, "Received a malicious extra SETTINGS acknowledgment");
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
- }
- struct aws_h2err err;
- struct aws_h2_pending_settings *pending_settings = NULL;
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->thread_data.pending_settings_queue);
- pending_settings = AWS_CONTAINER_OF(node, struct aws_h2_pending_settings, node);
- struct aws_http2_setting *settings_array = pending_settings->settings_array;
- /* Apply the settings */
- struct aws_h2_decoder *decoder = connection->thread_data.decoder;
- for (size_t i = 0; i < pending_settings->num_settings; i++) {
- if (connection->thread_data.settings_self[settings_array[i].id] == settings_array[i].value) {
- /* No change, don't do any work */
- continue;
- }
- switch (settings_array[i].id) {
- case AWS_HTTP2_SETTINGS_HEADER_TABLE_SIZE: {
- aws_h2_decoder_set_setting_header_table_size(decoder, settings_array[i].value);
- } break;
- case AWS_HTTP2_SETTINGS_ENABLE_PUSH: {
- aws_h2_decoder_set_setting_enable_push(decoder, settings_array[i].value);
- } break;
- case AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE: {
- /* When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream
- * flow-control windows that it maintains by the difference between the new value and the old value. */
- int32_t size_changed =
- settings_array[i].value - connection->thread_data.settings_self[settings_array[i].id];
- struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map);
- while (!aws_hash_iter_done(&stream_iter)) {
- struct aws_h2_stream *stream = stream_iter.element.value;
- aws_hash_iter_next(&stream_iter);
- err = aws_h2_stream_window_size_change(stream, size_changed, true /*self*/);
- if (aws_h2err_failed(err)) {
- CONNECTION_LOG(
- ERROR,
- connection,
- "Connection error, change to SETTINGS_INITIAL_WINDOW_SIZE from internal caused a stream's "
- "flow-control window to exceed the maximum size");
- goto error;
- }
- }
- } break;
- case AWS_HTTP2_SETTINGS_MAX_FRAME_SIZE: {
- aws_h2_decoder_set_setting_max_frame_size(decoder, settings_array[i].value);
- } break;
- default:
- break;
- }
- connection->thread_data.settings_self[settings_array[i].id] = settings_array[i].value;
- }
- /* invoke the change settings compeleted user callback */
- if (pending_settings->on_completed) {
- pending_settings->on_completed(&connection->base, AWS_ERROR_SUCCESS, pending_settings->user_data);
- }
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- memcpy(
- connection->synced_data.settings_self,
- connection->thread_data.settings_self,
- sizeof(connection->thread_data.settings_self));
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- /* clean up the pending_settings */
- aws_mem_release(connection->base.alloc, pending_settings);
- return AWS_H2ERR_SUCCESS;
- error:
- /* invoke the user callback with error code */
- if (pending_settings->on_completed) {
- pending_settings->on_completed(&connection->base, err.aws_code, pending_settings->user_data);
- }
- /* clean up the pending settings here */
- aws_mem_release(connection->base.alloc, pending_settings);
- return err;
- }
- static struct aws_h2err s_decoder_on_window_update(uint32_t stream_id, uint32_t window_size_increment, void *userdata) {
- struct aws_h2_connection *connection = userdata;
- if (stream_id == 0) {
- /* Let's update the connection flow-control window size */
- if (window_size_increment == 0) {
- /* flow-control window increment of 0 MUST be treated as error (RFC7540 6.9.1) */
- CONNECTION_LOG(ERROR, connection, "Window update frame with 0 increment size");
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
- }
- if (connection->thread_data.window_size_peer + window_size_increment > AWS_H2_WINDOW_UPDATE_MAX) {
- /* We MUST NOT allow a flow-control window to exceed the max */
- CONNECTION_LOG(
- ERROR,
- connection,
- "Window update frame causes the connection flow-control window exceeding the maximum size");
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_FLOW_CONTROL_ERROR);
- }
- if (connection->thread_data.window_size_peer <= AWS_H2_MIN_WINDOW_SIZE) {
- CONNECTION_LOGF(
- DEBUG,
- connection,
- "Peer connection's flow-control window is resumed from too small to %" PRIu32
- ". Connection will resume sending DATA.",
- window_size_increment);
- }
- connection->thread_data.window_size_peer += window_size_increment;
- return AWS_H2ERR_SUCCESS;
- } else {
- /* Update the flow-control window size for stream */
- struct aws_h2_stream *stream;
- bool window_resume;
- struct aws_h2err err =
- s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_WINDOW_UPDATE, &stream);
- if (aws_h2err_failed(err)) {
- return err;
- }
- if (stream) {
- err = aws_h2_stream_on_decoder_window_update(stream, window_size_increment, &window_resume);
- if (aws_h2err_failed(err)) {
- return err;
- }
- if (window_resume) {
- /* Set the stream free from stalled list */
- AWS_H2_STREAM_LOGF(
- DEBUG,
- stream,
- "Peer stream's flow-control window is resumed from 0 or negative to %" PRIu32
- " Stream will resume sending data.",
- stream->thread_data.window_size_peer);
- aws_linked_list_remove(&stream->node);
- aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node);
- }
- }
- }
- return AWS_H2ERR_SUCCESS;
- }
- struct aws_h2err s_decoder_on_goaway(
- uint32_t last_stream,
- uint32_t error_code,
- struct aws_byte_cursor debug_data,
- void *userdata) {
- struct aws_h2_connection *connection = userdata;
- if (last_stream > connection->thread_data.goaway_received_last_stream_id) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Received GOAWAY with invalid last-stream-id=%" PRIu32 ", must not exceed previous last-stream-id=%" PRIu32,
- last_stream,
- connection->thread_data.goaway_received_last_stream_id);
- return aws_h2err_from_h2_code(AWS_HTTP2_ERR_PROTOCOL_ERROR);
- }
- /* stop sending any new stream and making new request */
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_GOAWAY_RECEIVED;
- connection->synced_data.goaway_received_last_stream_id = last_stream;
- connection->synced_data.goaway_received_http2_error_code = error_code;
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- connection->thread_data.goaway_received_last_stream_id = last_stream;
- CONNECTION_LOGF(
- DEBUG,
- connection,
- "Received GOAWAY error-code=%s(0x%x) last-stream-id=%" PRIu32,
- aws_http2_error_code_to_str(error_code),
- error_code,
- last_stream);
- /* Complete activated streams whose id is higher than last_stream, since they will not process by peer. We should
- * treat them as they had never been created at all.
- * This would be more efficient if we could iterate streams in reverse-id order */
- struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map);
- while (!aws_hash_iter_done(&stream_iter)) {
- struct aws_h2_stream *stream = stream_iter.element.value;
- aws_hash_iter_next(&stream_iter);
- if (stream->base.id > last_stream) {
- AWS_H2_STREAM_LOG(
- DEBUG,
- stream,
- "stream ID is higher than GOAWAY last stream ID, please retry this stream on a new connection.");
- s_stream_complete(connection, stream, AWS_ERROR_HTTP_GOAWAY_RECEIVED);
- }
- }
- if (connection->on_goaway_received) {
- /* Inform user about goaway received and the error code. */
- connection->on_goaway_received(
- &connection->base, last_stream, error_code, debug_data, connection->base.user_data);
- }
- return AWS_H2ERR_SUCCESS;
- }
- /* End decoder callbacks */
- static int s_send_connection_preface_client_string(struct aws_h2_connection *connection) {
- /* Just send the magic string on its own aws_io_message. */
- struct aws_io_message *msg = aws_channel_acquire_message_from_pool(
- connection->base.channel_slot->channel,
- AWS_IO_MESSAGE_APPLICATION_DATA,
- aws_h2_connection_preface_client_string.len);
- if (!msg) {
- goto error;
- }
- if (!aws_byte_buf_write_from_whole_cursor(&msg->message_data, aws_h2_connection_preface_client_string)) {
- aws_raise_error(AWS_ERROR_INVALID_STATE);
- goto error;
- }
- if (aws_channel_slot_send_message(connection->base.channel_slot, msg, AWS_CHANNEL_DIR_WRITE)) {
- goto error;
- }
- return AWS_OP_SUCCESS;
- error:
- if (msg) {
- aws_mem_release(msg->allocator, msg);
- }
- return AWS_OP_ERR;
- }
- static void s_handler_installed(struct aws_channel_handler *handler, struct aws_channel_slot *slot) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(slot->channel));
- struct aws_h2_connection *connection = handler->impl;
- connection->base.channel_slot = slot;
- /* Acquire a hold on the channel to prevent its destruction until the user has
- * given the go-ahead via aws_http_connection_release() */
- aws_channel_acquire_hold(slot->channel);
- /* Send HTTP/2 connection preface (RFC-7540 3.5)
- * - clients must send magic string
- * - both client and server must send SETTINGS frame */
- if (connection->base.client_data) {
- if (s_send_connection_preface_client_string(connection)) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Failed to send client connection preface string, %s",
- aws_error_name(aws_last_error()));
- goto error;
- }
- }
- struct aws_h2_pending_settings *init_pending_settings = connection->thread_data.init_pending_settings;
- aws_linked_list_push_back(&connection->thread_data.pending_settings_queue, &init_pending_settings->node);
- connection->thread_data.init_pending_settings = NULL;
- /* Set user_data here, the user_data is valid now */
- init_pending_settings->user_data = connection->base.user_data;
- struct aws_h2_frame *init_settings_frame = aws_h2_frame_new_settings(
- connection->base.alloc,
- init_pending_settings->settings_array,
- init_pending_settings->num_settings,
- false /*ACK*/);
- if (!init_settings_frame) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Failed to create the initial settings frame, error %s",
- aws_error_name(aws_last_error()));
- aws_mem_release(connection->base.alloc, init_pending_settings);
- goto error;
- }
- /* enqueue the initial settings frame here */
- aws_linked_list_push_back(&connection->thread_data.outgoing_frames_queue, &init_settings_frame->node);
- /* If not manual connection window management, update the connection window to max. */
- if (!connection->conn_manual_window_management) {
- uint32_t initial_window_update_size = AWS_H2_WINDOW_UPDATE_MAX - AWS_H2_INIT_WINDOW_SIZE;
- struct aws_h2_frame *connection_window_update_frame =
- aws_h2_frame_new_window_update(connection->base.alloc, 0 /* stream_id */, initial_window_update_size);
- AWS_ASSERT(connection_window_update_frame);
- /* enqueue the windows update frame here */
- aws_linked_list_push_back(
- &connection->thread_data.outgoing_frames_queue, &connection_window_update_frame->node);
- connection->thread_data.window_size_self += initial_window_update_size;
- }
- aws_h2_try_write_outgoing_frames(connection);
- return;
- error:
- aws_h2_connection_shutdown_due_to_write_err(connection, aws_last_error());
- }
- static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h2_stream *stream, int error_code) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- /* Nice logging */
- if (error_code) {
- AWS_H2_STREAM_LOGF(
- ERROR, stream, "Stream completed with error %d (%s).", error_code, aws_error_name(error_code));
- } else if (stream->base.client_data) {
- int status = stream->base.client_data->response_status;
- AWS_H2_STREAM_LOGF(
- DEBUG, stream, "Client stream complete, response status %d (%s)", status, aws_http_status_text(status));
- } else {
- AWS_H2_STREAM_LOG(DEBUG, stream, "Server stream complete");
- }
- /* Remove stream from active_streams_map and outgoing_stream_list (if it was in them at all) */
- aws_hash_table_remove(&connection->thread_data.active_streams_map, (void *)(size_t)stream->base.id, NULL, NULL);
- if (stream->node.next) {
- aws_linked_list_remove(&stream->node);
- }
- if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0 &&
- connection->thread_data.incoming_timestamp_ns != 0) {
- uint64_t now_ns = 0;
- aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
- /* transition from something to read -> nothing to read and nothing to write */
- s_add_time_measurement_to_stats(
- connection->thread_data.incoming_timestamp_ns,
- now_ns,
- &connection->thread_data.stats.pending_incoming_stream_ms);
- connection->thread_data.stats.was_inactive = true;
- connection->thread_data.incoming_timestamp_ns = 0;
- }
- aws_h2_stream_complete(stream, error_code);
- /* release connection's hold on stream */
- aws_http_stream_release(&stream->base);
- }
- int aws_h2_connection_on_stream_closed(
- struct aws_h2_connection *connection,
- struct aws_h2_stream *stream,
- enum aws_h2_stream_closed_when closed_when,
- int aws_error_code) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- AWS_PRECONDITION(stream->thread_data.state == AWS_H2_STREAM_STATE_CLOSED);
- AWS_PRECONDITION(stream->base.id != 0);
- uint32_t stream_id = stream->base.id;
- /* Mark stream complete. This removes the stream from any "active" datastructures,
- * invokes its completion callback, and releases its refcount. */
- s_stream_complete(connection, stream, aws_error_code);
- stream = NULL; /* Reference released, do not touch again */
- if (s_record_closed_stream(connection, stream_id, closed_when)) {
- return AWS_OP_ERR;
- }
- return AWS_OP_SUCCESS;
- }
- static int s_record_closed_stream(
- struct aws_h2_connection *connection,
- uint32_t stream_id,
- enum aws_h2_stream_closed_when closed_when) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- if (aws_cache_put(connection->thread_data.closed_streams, (void *)(size_t)stream_id, (void *)(size_t)closed_when)) {
- CONNECTION_LOG(ERROR, connection, "Failed inserting ID into cache of recently closed streams");
- return AWS_OP_ERR;
- }
- return AWS_OP_SUCCESS;
- }
- int aws_h2_connection_send_rst_and_close_reserved_stream(
- struct aws_h2_connection *connection,
- uint32_t stream_id,
- uint32_t h2_error_code) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- struct aws_h2_frame *rst_stream = aws_h2_frame_new_rst_stream(connection->base.alloc, stream_id, h2_error_code);
- if (!rst_stream) {
- CONNECTION_LOGF(ERROR, connection, "Error creating RST_STREAM frame, %s", aws_error_name(aws_last_error()));
- return AWS_OP_ERR;
- }
- aws_h2_connection_enqueue_outgoing_frame(connection, rst_stream);
- /* If we ever fully support PUSH_PROMISE, this is where we'd remove the
- * promised_stream_id from some reserved_streams datastructure */
- return s_record_closed_stream(connection, stream_id, AWS_H2_STREAM_CLOSED_WHEN_RST_STREAM_SENT);
- }
- /* Move stream into "active" datastructures and notify stream that it can send frames now */
- static void s_move_stream_to_thread(
- struct aws_h2_connection *connection,
- struct aws_h2_stream *stream,
- int new_stream_error_code) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- if (new_stream_error_code) {
- aws_raise_error(new_stream_error_code);
- AWS_H2_STREAM_LOGF(
- ERROR,
- stream,
- "Failed activating stream, error %d (%s)",
- aws_last_error(),
- aws_error_name(aws_last_error()));
- goto error;
- }
- uint32_t max_concurrent_streams = connection->thread_data.settings_peer[AWS_HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS];
- if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) >= max_concurrent_streams) {
- AWS_H2_STREAM_LOG(ERROR, stream, "Failed activating stream, max concurrent streams are reached");
- aws_raise_error(AWS_ERROR_HTTP_MAX_CONCURRENT_STREAMS_EXCEEDED);
- goto error;
- }
- if (aws_hash_table_put(
- &connection->thread_data.active_streams_map, (void *)(size_t)stream->base.id, stream, NULL)) {
- AWS_H2_STREAM_LOG(ERROR, stream, "Failed inserting stream into map");
- goto error;
- }
- enum aws_h2_stream_body_state body_state = AWS_H2_STREAM_BODY_STATE_NONE;
- if (aws_h2_stream_on_activated(stream, &body_state)) {
- goto error;
- }
- if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 1) {
- /* transition from nothing to read -> something to read */
- uint64_t now_ns = 0;
- aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
- connection->thread_data.incoming_timestamp_ns = now_ns;
- }
- switch (body_state) {
- case AWS_H2_STREAM_BODY_STATE_WAITING_WRITES:
- aws_linked_list_push_back(&connection->thread_data.waiting_streams_list, &stream->node);
- break;
- case AWS_H2_STREAM_BODY_STATE_ONGOING:
- aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node);
- break;
- default:
- break;
- }
- return;
- error:
- /* If the stream got into any datastructures, s_stream_complete() will remove it */
- s_stream_complete(connection, stream, aws_last_error());
- }
- /* Perform on-thread work that is triggered by calls to the connection/stream API */
- static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- if (status != AWS_TASK_STATUS_RUN_READY) {
- return;
- }
- struct aws_h2_connection *connection = arg;
- struct aws_linked_list pending_frames;
- aws_linked_list_init(&pending_frames);
- struct aws_linked_list pending_streams;
- aws_linked_list_init(&pending_streams);
- struct aws_linked_list pending_settings;
- aws_linked_list_init(&pending_settings);
- struct aws_linked_list pending_ping;
- aws_linked_list_init(&pending_ping);
- struct aws_linked_list pending_goaway;
- aws_linked_list_init(&pending_goaway);
- size_t window_update_size;
- int new_stream_error_code;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- connection->synced_data.is_cross_thread_work_task_scheduled = false;
- aws_linked_list_swap_contents(&connection->synced_data.pending_frame_list, &pending_frames);
- aws_linked_list_swap_contents(&connection->synced_data.pending_stream_list, &pending_streams);
- aws_linked_list_swap_contents(&connection->synced_data.pending_settings_list, &pending_settings);
- aws_linked_list_swap_contents(&connection->synced_data.pending_ping_list, &pending_ping);
- aws_linked_list_swap_contents(&connection->synced_data.pending_goaway_list, &pending_goaway);
- window_update_size = connection->synced_data.window_update_size;
- connection->synced_data.window_update_size = 0;
- new_stream_error_code = connection->synced_data.new_stream_error_code;
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- /* Enqueue new pending control frames */
- while (!aws_linked_list_empty(&pending_frames)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_frames);
- struct aws_h2_frame *frame = AWS_CONTAINER_OF(node, struct aws_h2_frame, node);
- aws_h2_connection_enqueue_outgoing_frame(connection, frame);
- }
- /* We already enqueued the window_update frame, just apply the change and let our peer check this value, no matter
- * overflow happens or not. Peer will detect it for us. */
- connection->thread_data.window_size_self =
- aws_add_size_saturating(connection->thread_data.window_size_self, window_update_size);
- /* Process new pending_streams */
- while (!aws_linked_list_empty(&pending_streams)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_streams);
- struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node);
- s_move_stream_to_thread(connection, stream, new_stream_error_code);
- }
- /* Move pending settings to thread data */
- while (!aws_linked_list_empty(&pending_settings)) {
- aws_linked_list_push_back(
- &connection->thread_data.pending_settings_queue, aws_linked_list_pop_front(&pending_settings));
- }
- /* Move pending PING to thread data */
- while (!aws_linked_list_empty(&pending_ping)) {
- aws_linked_list_push_back(
- &connection->thread_data.pending_ping_queue, aws_linked_list_pop_front(&pending_ping));
- }
- /* Send user requested goaways */
- while (!aws_linked_list_empty(&pending_goaway)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&pending_goaway);
- struct aws_h2_pending_goaway *goaway = AWS_CONTAINER_OF(node, struct aws_h2_pending_goaway, node);
- s_send_goaway(connection, goaway->http2_error, goaway->allow_more_streams, &goaway->debug_data);
- aws_mem_release(connection->base.alloc, goaway);
- }
- /* It's likely that frames were queued while processing cross-thread work.
- * If so, try writing them now */
- aws_h2_try_write_outgoing_frames(connection);
- }
- int aws_h2_stream_activate(struct aws_http_stream *stream) {
- struct aws_h2_stream *h2_stream = AWS_CONTAINER_OF(stream, struct aws_h2_stream, base);
- struct aws_http_connection *base_connection = stream->owning_connection;
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(base_connection, struct aws_h2_connection, base);
- int err;
- bool was_cross_thread_work_scheduled = false;
- { /* BEGIN CRITICAL SECTION */
- s_acquire_stream_and_connection_lock(h2_stream, connection);
- if (stream->id) {
- /* stream has already been activated. */
- s_release_stream_and_connection_lock(h2_stream, connection);
- return AWS_OP_SUCCESS;
- }
- err = connection->synced_data.new_stream_error_code;
- if (err) {
- s_release_stream_and_connection_lock(h2_stream, connection);
- goto error;
- }
- stream->id = aws_http_connection_get_next_stream_id(base_connection);
- if (stream->id) {
- /* success */
- was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled;
- connection->synced_data.is_cross_thread_work_task_scheduled = true;
- aws_linked_list_push_back(&connection->synced_data.pending_stream_list, &h2_stream->node);
- h2_stream->synced_data.api_state = AWS_H2_STREAM_API_STATE_ACTIVE;
- }
- s_release_stream_and_connection_lock(h2_stream, connection);
- } /* END CRITICAL SECTION */
- if (!stream->id) {
- /* aws_http_connection_get_next_stream_id() raises its own error. */
- return AWS_OP_ERR;
- }
- /* connection keeps activated stream alive until stream completes */
- aws_atomic_fetch_add(&stream->refcount, 1);
- if (!was_cross_thread_work_scheduled) {
- CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
- aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
- }
- return AWS_OP_SUCCESS;
- error:
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Failed to activate the stream id=%p, new streams are not allowed now. error %d (%s)",
- (void *)stream,
- err,
- aws_error_name(err));
- return aws_raise_error(err);
- }
- static struct aws_http_stream *s_connection_make_request(
- struct aws_http_connection *client_connection,
- const struct aws_http_make_request_options *options) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(client_connection, struct aws_h2_connection, base);
- /* #TODO: http/2-ify the request (ex: add ":method" header). Should we mutate a copy or the original? Validate?
- * Or just pass pointer to headers struct and let encoder transform it while encoding? */
- struct aws_h2_stream *stream = aws_h2_stream_new_request(client_connection, options);
- if (!stream) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Failed to create stream, error %d (%s)",
- aws_last_error(),
- aws_error_name(aws_last_error()));
- return NULL;
- }
- int new_stream_error_code;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- new_stream_error_code = connection->synced_data.new_stream_error_code;
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- if (new_stream_error_code) {
- aws_raise_error(new_stream_error_code);
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Cannot create request stream, error %d (%s)",
- aws_last_error(),
- aws_error_name(aws_last_error()));
- goto error;
- }
- AWS_H2_STREAM_LOG(DEBUG, stream, "Created HTTP/2 request stream"); /* #TODO: print method & path */
- return &stream->base;
- error:
- /* Force destruction of the stream, avoiding ref counting */
- stream->base.vtable->destroy(&stream->base);
- return NULL;
- }
- static void s_connection_close(struct aws_http_connection *connection_base) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- /* Don't stop reading/writing immediately, let that happen naturally during the channel shutdown process. */
- s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, AWS_ERROR_SUCCESS);
- }
- static void s_connection_stop_new_request(struct aws_http_connection *connection_base) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- if (!connection->synced_data.new_stream_error_code) {
- connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
- }
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- }
- static bool s_connection_is_open(const struct aws_http_connection *connection_base) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- bool is_open;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- is_open = connection->synced_data.is_open;
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- return is_open;
- }
- static bool s_connection_new_requests_allowed(const struct aws_http_connection *connection_base) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- int new_stream_error_code;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- new_stream_error_code = connection->synced_data.new_stream_error_code;
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- return new_stream_error_code == 0;
- }
- static void s_connection_update_window(struct aws_http_connection *connection_base, uint32_t increment_size) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- if (!increment_size) {
- /* Silently do nothing. */
- return;
- }
- if (!connection->conn_manual_window_management) {
- /* auto-mode, manual update window is not supported, silently do nothing with warning log. */
- CONNECTION_LOG(
- DEBUG,
- connection,
- "Connection manual window management is off, update window operations are not supported.");
- return;
- }
- struct aws_h2_frame *connection_window_update_frame =
- aws_h2_frame_new_window_update(connection->base.alloc, 0, increment_size);
- if (!connection_window_update_frame) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Failed to create WINDOW_UPDATE frame on connection, error %s",
- aws_error_name(aws_last_error()));
- /* OOM should result in a crash. And the increment size is too huge is the only other failure case, which will
- * result in overflow. */
- goto overflow;
- }
- int err = 0;
- bool cross_thread_work_should_schedule = false;
- bool connection_open = false;
- size_t sum_size = 0;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- err |= aws_add_size_checked(connection->synced_data.window_update_size, increment_size, &sum_size);
- err |= sum_size > AWS_H2_WINDOW_UPDATE_MAX;
- connection_open = connection->synced_data.is_open;
- if (!err && connection_open) {
- cross_thread_work_should_schedule = !connection->synced_data.is_cross_thread_work_task_scheduled;
- connection->synced_data.is_cross_thread_work_task_scheduled = true;
- aws_linked_list_push_back(
- &connection->synced_data.pending_frame_list, &connection_window_update_frame->node);
- connection->synced_data.window_update_size = sum_size;
- }
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- if (err) {
- CONNECTION_LOG(
- ERROR,
- connection,
- "The connection's flow-control windows has been incremented beyond 2**31 -1, the max for HTTP/2. The ");
- aws_h2_frame_destroy(connection_window_update_frame);
- goto overflow;
- }
- if (cross_thread_work_should_schedule) {
- CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
- aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
- }
- if (!connection_open) {
- /* connection already closed, just do nothing */
- aws_h2_frame_destroy(connection_window_update_frame);
- return;
- }
- CONNECTION_LOGF(
- TRACE,
- connection,
- "User requested to update the HTTP/2 connection's flow-control windows by %" PRIu32 ".",
- increment_size);
- return;
- overflow:
- /* Shutdown the connection as overflow detected */
- s_stop(
- connection,
- false /*stop_reading*/,
- false /*stop_writing*/,
- true /*schedule_shutdown*/,
- AWS_ERROR_OVERFLOW_DETECTED);
- }
- static int s_connection_change_settings(
- struct aws_http_connection *connection_base,
- const struct aws_http2_setting *settings_array,
- size_t num_settings,
- aws_http2_on_change_settings_complete_fn *on_completed,
- void *user_data) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- if (!settings_array && num_settings) {
- CONNECTION_LOG(ERROR, connection, "Settings_array is NULL and num_settings is not zero.");
- return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- }
- struct aws_h2_pending_settings *pending_settings =
- s_new_pending_settings(connection->base.alloc, settings_array, num_settings, on_completed, user_data);
- if (!pending_settings) {
- return AWS_OP_ERR;
- }
- struct aws_h2_frame *settings_frame =
- aws_h2_frame_new_settings(connection->base.alloc, settings_array, num_settings, false /*ACK*/);
- if (!settings_frame) {
- CONNECTION_LOGF(
- ERROR, connection, "Failed to create settings frame, error %s", aws_error_name(aws_last_error()));
- aws_mem_release(connection->base.alloc, pending_settings);
- return AWS_OP_ERR;
- }
- bool was_cross_thread_work_scheduled = false;
- bool connection_open;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- connection_open = connection->synced_data.is_open;
- if (!connection_open) {
- s_unlock_synced_data(connection);
- goto closed;
- }
- was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled;
- connection->synced_data.is_cross_thread_work_task_scheduled = true;
- aws_linked_list_push_back(&connection->synced_data.pending_frame_list, &settings_frame->node);
- aws_linked_list_push_back(&connection->synced_data.pending_settings_list, &pending_settings->node);
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- if (!was_cross_thread_work_scheduled) {
- CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
- aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
- }
- return AWS_OP_SUCCESS;
- closed:
- CONNECTION_LOG(ERROR, connection, "Failed to change settings, connection is closed or closing.");
- aws_h2_frame_destroy(settings_frame);
- aws_mem_release(connection->base.alloc, pending_settings);
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- static int s_connection_send_ping(
- struct aws_http_connection *connection_base,
- const struct aws_byte_cursor *optional_opaque_data,
- aws_http2_on_ping_complete_fn *on_completed,
- void *user_data) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- if (optional_opaque_data && optional_opaque_data->len != 8) {
- CONNECTION_LOG(ERROR, connection, "Only 8 bytes opaque data supported for PING in HTTP/2");
- return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- }
- uint64_t time_stamp;
- if (aws_high_res_clock_get_ticks(&time_stamp)) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Failed getting the time stamp to start PING, error %s",
- aws_error_name(aws_last_error()));
- return AWS_OP_ERR;
- }
- struct aws_h2_pending_ping *pending_ping =
- s_new_pending_ping(connection->base.alloc, optional_opaque_data, time_stamp, user_data, on_completed);
- if (!pending_ping) {
- return AWS_OP_ERR;
- }
- struct aws_h2_frame *ping_frame =
- aws_h2_frame_new_ping(connection->base.alloc, false /*ACK*/, pending_ping->opaque_data);
- if (!ping_frame) {
- CONNECTION_LOGF(ERROR, connection, "Failed to create PING frame, error %s", aws_error_name(aws_last_error()));
- aws_mem_release(connection->base.alloc, pending_ping);
- return AWS_OP_ERR;
- }
- bool was_cross_thread_work_scheduled = false;
- bool connection_open;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- connection_open = connection->synced_data.is_open;
- if (!connection_open) {
- s_unlock_synced_data(connection);
- goto closed;
- }
- was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled;
- connection->synced_data.is_cross_thread_work_task_scheduled = true;
- aws_linked_list_push_back(&connection->synced_data.pending_frame_list, &ping_frame->node);
- aws_linked_list_push_back(&connection->synced_data.pending_ping_list, &pending_ping->node);
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- if (!was_cross_thread_work_scheduled) {
- CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
- aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
- }
- return AWS_OP_SUCCESS;
- closed:
- CONNECTION_LOG(ERROR, connection, "Failed to send ping, connection is closed or closing.");
- aws_h2_frame_destroy(ping_frame);
- aws_mem_release(connection->base.alloc, pending_ping);
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- static void s_connection_send_goaway(
- struct aws_http_connection *connection_base,
- uint32_t http2_error,
- bool allow_more_streams,
- const struct aws_byte_cursor *optional_debug_data) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- struct aws_h2_pending_goaway *pending_goaway =
- s_new_pending_goaway(connection->base.alloc, http2_error, allow_more_streams, optional_debug_data);
- bool was_cross_thread_work_scheduled = false;
- bool connection_open;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- connection_open = connection->synced_data.is_open;
- if (!connection_open) {
- s_unlock_synced_data(connection);
- CONNECTION_LOG(DEBUG, connection, "Goaway not sent, connection is closed or closing.");
- aws_mem_release(connection->base.alloc, pending_goaway);
- return;
- }
- was_cross_thread_work_scheduled = connection->synced_data.is_cross_thread_work_task_scheduled;
- connection->synced_data.is_cross_thread_work_task_scheduled = true;
- aws_linked_list_push_back(&connection->synced_data.pending_goaway_list, &pending_goaway->node);
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- if (allow_more_streams && (http2_error != AWS_HTTP2_ERR_NO_ERROR)) {
- CONNECTION_LOGF(
- DEBUG,
- connection,
- "Send goaway with allow more streams on and non-zero error code %s(0x%x)",
- aws_http2_error_code_to_str(http2_error),
- http2_error);
- }
- if (!was_cross_thread_work_scheduled) {
- CONNECTION_LOG(TRACE, connection, "Scheduling cross-thread work task");
- aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
- }
- }
- static void s_get_settings_general(
- const struct aws_http_connection *connection_base,
- struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT],
- bool local) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- uint32_t synced_settings[AWS_HTTP2_SETTINGS_END_RANGE];
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- if (local) {
- memcpy(
- synced_settings, connection->synced_data.settings_self, sizeof(connection->synced_data.settings_self));
- } else {
- memcpy(
- synced_settings, connection->synced_data.settings_peer, sizeof(connection->synced_data.settings_peer));
- }
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- for (int i = AWS_HTTP2_SETTINGS_BEGIN_RANGE; i < AWS_HTTP2_SETTINGS_END_RANGE; i++) {
- /* settings range begin with 1, store them into 0-based array of aws_http2_setting */
- out_settings[i - 1].id = i;
- out_settings[i - 1].value = synced_settings[i];
- }
- return;
- }
- static void s_connection_get_local_settings(
- const struct aws_http_connection *connection_base,
- struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT]) {
- s_get_settings_general(connection_base, out_settings, true /*local*/);
- }
- static void s_connection_get_remote_settings(
- const struct aws_http_connection *connection_base,
- struct aws_http2_setting out_settings[AWS_HTTP2_SETTINGS_COUNT]) {
- s_get_settings_general(connection_base, out_settings, false /*local*/);
- }
- /* Send a GOAWAY with the lowest possible last-stream-id or graceful shutdown warning */
- static void s_send_goaway(
- struct aws_h2_connection *connection,
- uint32_t h2_error_code,
- bool allow_more_streams,
- const struct aws_byte_cursor *optional_debug_data) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- uint32_t last_stream_id = allow_more_streams ? AWS_H2_STREAM_ID_MAX
- : aws_min_u32(
- connection->thread_data.latest_peer_initiated_stream_id,
- connection->thread_data.goaway_sent_last_stream_id);
- if (last_stream_id > connection->thread_data.goaway_sent_last_stream_id) {
- CONNECTION_LOG(
- DEBUG,
- connection,
- "GOAWAY frame with lower last stream id has been sent, ignoring sending graceful shutdown warning.");
- return;
- }
- struct aws_byte_cursor debug_data;
- AWS_ZERO_STRUCT(debug_data);
- if (optional_debug_data) {
- debug_data = *optional_debug_data;
- }
- struct aws_h2_frame *goaway =
- aws_h2_frame_new_goaway(connection->base.alloc, last_stream_id, h2_error_code, debug_data);
- if (!goaway) {
- CONNECTION_LOGF(ERROR, connection, "Error creating GOAWAY frame, %s", aws_error_name(aws_last_error()));
- goto error;
- }
- connection->thread_data.goaway_sent_last_stream_id = last_stream_id;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- connection->synced_data.goaway_sent_last_stream_id = last_stream_id;
- connection->synced_data.goaway_sent_http2_error_code = h2_error_code;
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- aws_h2_connection_enqueue_outgoing_frame(connection, goaway);
- return;
- error:
- aws_h2_connection_shutdown_due_to_write_err(connection, aws_last_error());
- }
- static int s_connection_get_sent_goaway(
- struct aws_http_connection *connection_base,
- uint32_t *out_http2_error,
- uint32_t *out_last_stream_id) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- uint32_t sent_last_stream_id;
- uint32_t sent_http2_error;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- sent_last_stream_id = connection->synced_data.goaway_sent_last_stream_id;
- sent_http2_error = connection->synced_data.goaway_sent_http2_error_code;
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- uint32_t max_stream_id = AWS_H2_STREAM_ID_MAX;
- if (sent_last_stream_id == max_stream_id + 1) {
- CONNECTION_LOG(ERROR, connection, "No GOAWAY has been sent so far.");
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- *out_http2_error = sent_http2_error;
- *out_last_stream_id = sent_last_stream_id;
- return AWS_OP_SUCCESS;
- }
- static int s_connection_get_received_goaway(
- struct aws_http_connection *connection_base,
- uint32_t *out_http2_error,
- uint32_t *out_last_stream_id) {
- struct aws_h2_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h2_connection, base);
- uint32_t received_last_stream_id = 0;
- uint32_t received_http2_error = 0;
- bool goaway_not_ready = false;
- uint32_t max_stream_id = AWS_H2_STREAM_ID_MAX;
- { /* BEGIN CRITICAL SECTION */
- s_lock_synced_data(connection);
- if (connection->synced_data.goaway_received_last_stream_id == max_stream_id + 1) {
- goaway_not_ready = true;
- } else {
- received_last_stream_id = connection->synced_data.goaway_received_last_stream_id;
- received_http2_error = connection->synced_data.goaway_received_http2_error_code;
- }
- s_unlock_synced_data(connection);
- } /* END CRITICAL SECTION */
- if (goaway_not_ready) {
- CONNECTION_LOG(ERROR, connection, "No GOAWAY has been received so far.");
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- *out_http2_error = received_http2_error;
- *out_last_stream_id = received_last_stream_id;
- return AWS_OP_SUCCESS;
- }
- static int s_handler_process_read_message(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- struct aws_io_message *message) {
- (void)slot;
- struct aws_h2_connection *connection = handler->impl;
- CONNECTION_LOGF(TRACE, connection, "Begin processing message of size %zu.", message->message_data.len);
- if (connection->thread_data.is_reading_stopped) {
- CONNECTION_LOG(ERROR, connection, "Cannot process message because connection is shutting down.");
- goto clean_up;
- }
- /* Any error that bubbles up from the decoder or its callbacks is treated as
- * a Connection Error (a GOAWAY frames is sent, and the connection is closed) */
- struct aws_byte_cursor message_cursor = aws_byte_cursor_from_buf(&message->message_data);
- struct aws_h2err err = aws_h2_decode(connection->thread_data.decoder, &message_cursor);
- if (aws_h2err_failed(err)) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Failure while receiving frames, %s. Sending GOAWAY %s(0x%x) and closing connection",
- aws_error_name(err.aws_code),
- aws_http2_error_code_to_str(err.h2_code),
- err.h2_code);
- goto shutdown;
- }
- /* HTTP/2 protocol uses WINDOW_UPDATE frames to coordinate data rates with peer,
- * so we can just keep the aws_channel's read-window wide open */
- if (aws_channel_slot_increment_read_window(slot, message->message_data.len)) {
- CONNECTION_LOGF(
- ERROR,
- connection,
- "Incrementing read window failed, error %d (%s). Closing connection",
- aws_last_error(),
- aws_error_name(aws_last_error()));
- err = aws_h2err_from_last_error();
- goto shutdown;
- }
- goto clean_up;
- shutdown:
- s_send_goaway(connection, err.h2_code, false /*allow_more_streams*/, NULL /*optional_debug_data*/);
- aws_h2_try_write_outgoing_frames(connection);
- s_stop(connection, true /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, err.aws_code);
- clean_up:
- aws_mem_release(message->allocator, message);
- /* Flush any outgoing frames that might have been queued as a result of decoder callbacks. */
- aws_h2_try_write_outgoing_frames(connection);
- return AWS_OP_SUCCESS;
- }
- static int s_handler_process_write_message(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- struct aws_io_message *message) {
- (void)handler;
- (void)slot;
- (void)message;
- return aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
- }
- static int s_handler_increment_read_window(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- size_t size) {
- (void)handler;
- (void)slot;
- (void)size;
- return aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
- }
- static int s_handler_shutdown(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- enum aws_channel_direction dir,
- int error_code,
- bool free_scarce_resources_immediately) {
- struct aws_h2_connection *connection = handler->impl;
- CONNECTION_LOGF(
- TRACE,
- connection,
- "Channel shutting down in %s direction with error code %d (%s).",
- (dir == AWS_CHANNEL_DIR_READ) ? "read" : "write",
- error_code,
- aws_error_name(error_code));
- if (dir == AWS_CHANNEL_DIR_READ) {
- /* This call ensures that no further streams will be created. */
- s_stop(connection, true /*stop_reading*/, false /*stop_writing*/, false /*schedule_shutdown*/, error_code);
- /* Send user requested GOAWAY, if they haven't been sent before. It's OK to access
- * synced_data.pending_goaway_list without holding the lock because no more user_requested GOAWAY can be added
- * after s_stop() has been invoked. */
- if (!aws_linked_list_empty(&connection->synced_data.pending_goaway_list)) {
- while (!aws_linked_list_empty(&connection->synced_data.pending_goaway_list)) {
- struct aws_linked_list_node *node =
- aws_linked_list_pop_front(&connection->synced_data.pending_goaway_list);
- struct aws_h2_pending_goaway *goaway = AWS_CONTAINER_OF(node, struct aws_h2_pending_goaway, node);
- s_send_goaway(connection, goaway->http2_error, goaway->allow_more_streams, &goaway->debug_data);
- aws_mem_release(connection->base.alloc, goaway);
- }
- aws_h2_try_write_outgoing_frames(connection);
- }
- /* Send GOAWAY if none have been sent so far,
- * or if we've only sent a "graceful shutdown warning" that didn't name a last-stream-id */
- if (connection->thread_data.goaway_sent_last_stream_id == AWS_H2_STREAM_ID_MAX) {
- s_send_goaway(
- connection,
- error_code ? AWS_HTTP2_ERR_INTERNAL_ERROR : AWS_HTTP2_ERR_NO_ERROR,
- false /*allow_more_streams*/,
- NULL /*optional_debug_data*/);
- aws_h2_try_write_outgoing_frames(connection);
- }
- aws_channel_slot_on_handler_shutdown_complete(
- slot, AWS_CHANNEL_DIR_READ, error_code, free_scarce_resources_immediately);
- } else /* AWS_CHANNEL_DIR_WRITE */ {
- connection->thread_data.channel_shutdown_error_code = error_code;
- connection->thread_data.channel_shutdown_immediately = free_scarce_resources_immediately;
- connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written = true;
- /* We'd prefer to wait until we know GOAWAY has been written, but don't wait if... */
- if (free_scarce_resources_immediately /* we must finish ASAP */ ||
- connection->thread_data.is_writing_stopped /* write will never complete */ ||
- !connection->thread_data.is_outgoing_frames_task_active /* write is already complete */) {
- s_finish_shutdown(connection);
- } else {
- CONNECTION_LOG(TRACE, connection, "HTTP/2 handler will finish shutdown once GOAWAY frame is written");
- }
- }
- return AWS_OP_SUCCESS;
- }
- static void s_finish_shutdown(struct aws_h2_connection *connection) {
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- AWS_PRECONDITION(connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written);
- CONNECTION_LOG(TRACE, connection, "Finishing HTTP/2 handler shutdown");
- connection->thread_data.channel_shutdown_waiting_for_goaway_to_be_written = false;
- s_stop(
- connection,
- false /*stop_reading*/,
- true /*stop_writing*/,
- false /*schedule_shutdown*/,
- connection->thread_data.channel_shutdown_error_code);
- /* Remove remaining streams from internal datastructures and mark them as complete. */
- struct aws_hash_iter stream_iter = aws_hash_iter_begin(&connection->thread_data.active_streams_map);
- while (!aws_hash_iter_done(&stream_iter)) {
- struct aws_h2_stream *stream = stream_iter.element.value;
- aws_hash_iter_delete(&stream_iter, true);
- aws_hash_iter_next(&stream_iter);
- s_stream_complete(connection, stream, AWS_ERROR_HTTP_CONNECTION_CLOSED);
- }
- /* It's OK to access synced_data without holding the lock because
- * no more streams or user-requested control frames can be added after s_stop() has been invoked. */
- while (!aws_linked_list_empty(&connection->synced_data.pending_stream_list)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->synced_data.pending_stream_list);
- struct aws_h2_stream *stream = AWS_CONTAINER_OF(node, struct aws_h2_stream, node);
- s_stream_complete(connection, stream, AWS_ERROR_HTTP_CONNECTION_CLOSED);
- }
- while (!aws_linked_list_empty(&connection->synced_data.pending_frame_list)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->synced_data.pending_frame_list);
- struct aws_h2_frame *frame = AWS_CONTAINER_OF(node, struct aws_h2_frame, node);
- aws_h2_frame_destroy(frame);
- }
- /* invoke pending callbacks haven't moved into thread, and clean up the data */
- while (!aws_linked_list_empty(&connection->synced_data.pending_settings_list)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->synced_data.pending_settings_list);
- struct aws_h2_pending_settings *settings = AWS_CONTAINER_OF(node, struct aws_h2_pending_settings, node);
- if (settings->on_completed) {
- settings->on_completed(&connection->base, AWS_ERROR_HTTP_CONNECTION_CLOSED, settings->user_data);
- }
- aws_mem_release(connection->base.alloc, settings);
- }
- while (!aws_linked_list_empty(&connection->synced_data.pending_ping_list)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->synced_data.pending_ping_list);
- struct aws_h2_pending_ping *ping = AWS_CONTAINER_OF(node, struct aws_h2_pending_ping, node);
- if (ping->on_completed) {
- ping->on_completed(&connection->base, 0 /*fake rtt*/, AWS_ERROR_HTTP_CONNECTION_CLOSED, ping->user_data);
- }
- aws_mem_release(connection->base.alloc, ping);
- }
- /* invoke pending callbacks moved into thread, and clean up the data */
- while (!aws_linked_list_empty(&connection->thread_data.pending_settings_queue)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->thread_data.pending_settings_queue);
- struct aws_h2_pending_settings *pending_settings = AWS_CONTAINER_OF(node, struct aws_h2_pending_settings, node);
- /* fire the user callback with error */
- if (pending_settings->on_completed) {
- pending_settings->on_completed(
- &connection->base, AWS_ERROR_HTTP_CONNECTION_CLOSED, pending_settings->user_data);
- }
- aws_mem_release(connection->base.alloc, pending_settings);
- }
- while (!aws_linked_list_empty(&connection->thread_data.pending_ping_queue)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->thread_data.pending_ping_queue);
- struct aws_h2_pending_ping *pending_ping = AWS_CONTAINER_OF(node, struct aws_h2_pending_ping, node);
- /* fire the user callback with error */
- if (pending_ping->on_completed) {
- pending_ping->on_completed(
- &connection->base, 0 /*fake rtt*/, AWS_ERROR_HTTP_CONNECTION_CLOSED, pending_ping->user_data);
- }
- aws_mem_release(connection->base.alloc, pending_ping);
- }
- aws_channel_slot_on_handler_shutdown_complete(
- connection->base.channel_slot,
- AWS_CHANNEL_DIR_WRITE,
- connection->thread_data.channel_shutdown_error_code,
- connection->thread_data.channel_shutdown_immediately);
- }
- static size_t s_handler_initial_window_size(struct aws_channel_handler *handler) {
- (void)handler;
- /* HTTP/2 protocol uses WINDOW_UPDATE frames to coordinate data rates with peer,
- * so we can just keep the aws_channel's read-window wide open */
- return SIZE_MAX;
- }
- static size_t s_handler_message_overhead(struct aws_channel_handler *handler) {
- (void)handler;
- /* "All frames begin with a fixed 9-octet header followed by a variable-length payload" (RFC-7540 4.1) */
- return 9;
- }
- static void s_reset_statistics(struct aws_channel_handler *handler) {
- struct aws_h2_connection *connection = handler->impl;
- aws_crt_statistics_http2_channel_reset(&connection->thread_data.stats);
- if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0) {
- /* Check the current state */
- connection->thread_data.stats.was_inactive = true;
- }
- return;
- }
- static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats) {
- struct aws_h2_connection *connection = handler->impl;
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
- /* TODO: Need update the way we calculate statistics, to account for user-controlled pauses.
- * If user is adding chunks 1 by 1, there can naturally be a gap in the upload.
- * If the user lets the stream-window go to zero, there can naturally be a gap in the download. */
- uint64_t now_ns = 0;
- if (aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns)) {
- return;
- }
- if (!aws_linked_list_empty(&connection->thread_data.outgoing_streams_list)) {
- s_add_time_measurement_to_stats(
- connection->thread_data.outgoing_timestamp_ns,
- now_ns,
- &connection->thread_data.stats.pending_outgoing_stream_ms);
- connection->thread_data.outgoing_timestamp_ns = now_ns;
- }
- if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) != 0) {
- s_add_time_measurement_to_stats(
- connection->thread_data.incoming_timestamp_ns,
- now_ns,
- &connection->thread_data.stats.pending_incoming_stream_ms);
- connection->thread_data.incoming_timestamp_ns = now_ns;
- } else {
- connection->thread_data.stats.was_inactive = true;
- }
- void *stats_base = &connection->thread_data.stats;
- aws_array_list_push_back(stats, &stats_base);
- }
|