h1_connection.c 83 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/common/clock.h>
  6. #include <aws/common/math.h>
  7. #include <aws/common/mutex.h>
  8. #include <aws/common/string.h>
  9. #include <aws/http/private/h1_connection.h>
  10. #include <aws/http/private/h1_decoder.h>
  11. #include <aws/http/private/h1_stream.h>
  12. #include <aws/http/private/request_response_impl.h>
  13. #include <aws/http/status_code.h>
  14. #include <aws/io/logging.h>
  15. #include <inttypes.h>
  16. #ifdef _MSC_VER
  17. # pragma warning(disable : 4204) /* non-constant aggregate initializer */
  18. #endif
  19. enum {
  20. DECODER_INITIAL_SCRATCH_SIZE = 256,
  21. };
  22. static int s_handler_process_read_message(
  23. struct aws_channel_handler *handler,
  24. struct aws_channel_slot *slot,
  25. struct aws_io_message *message);
  26. static int s_handler_process_write_message(
  27. struct aws_channel_handler *handler,
  28. struct aws_channel_slot *slot,
  29. struct aws_io_message *message);
  30. static int s_handler_increment_read_window(
  31. struct aws_channel_handler *handler,
  32. struct aws_channel_slot *slot,
  33. size_t size);
  34. static int s_handler_shutdown(
  35. struct aws_channel_handler *handler,
  36. struct aws_channel_slot *slot,
  37. enum aws_channel_direction dir,
  38. int error_code,
  39. bool free_scarce_resources_immediately);
  40. static size_t s_handler_initial_window_size(struct aws_channel_handler *handler);
  41. static size_t s_handler_message_overhead(struct aws_channel_handler *handler);
  42. static void s_handler_destroy(struct aws_channel_handler *handler);
  43. static void s_handler_installed(struct aws_channel_handler *handler, struct aws_channel_slot *slot);
  44. static struct aws_http_stream *s_make_request(
  45. struct aws_http_connection *client_connection,
  46. const struct aws_http_make_request_options *options);
  47. static struct aws_http_stream *s_new_server_request_handler_stream(
  48. const struct aws_http_request_handler_options *options);
  49. static int s_stream_send_response(struct aws_http_stream *stream, struct aws_http_message *response);
  50. static void s_connection_close(struct aws_http_connection *connection_base);
  51. static void s_connection_stop_new_request(struct aws_http_connection *connection_base);
  52. static bool s_connection_is_open(const struct aws_http_connection *connection_base);
  53. static bool s_connection_new_requests_allowed(const struct aws_http_connection *connection_base);
  54. static int s_decoder_on_request(
  55. enum aws_http_method method_enum,
  56. const struct aws_byte_cursor *method_str,
  57. const struct aws_byte_cursor *uri,
  58. void *user_data);
  59. static int s_decoder_on_response(int status_code, void *user_data);
  60. static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void *user_data);
  61. static int s_decoder_on_body(const struct aws_byte_cursor *data, bool finished, void *user_data);
  62. static int s_decoder_on_done(void *user_data);
  63. static void s_reset_statistics(struct aws_channel_handler *handler);
  64. static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats);
  65. static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool first_try);
  66. static int s_try_process_next_stream_read_message(struct aws_h1_connection *connection, bool *out_stop_processing);
  67. static struct aws_http_connection_vtable s_h1_connection_vtable = {
  68. .channel_handler_vtable =
  69. {
  70. .process_read_message = s_handler_process_read_message,
  71. .process_write_message = s_handler_process_write_message,
  72. .increment_read_window = s_handler_increment_read_window,
  73. .shutdown = s_handler_shutdown,
  74. .initial_window_size = s_handler_initial_window_size,
  75. .message_overhead = s_handler_message_overhead,
  76. .destroy = s_handler_destroy,
  77. .reset_statistics = s_reset_statistics,
  78. .gather_statistics = s_gather_statistics,
  79. },
  80. .on_channel_handler_installed = s_handler_installed,
  81. .make_request = s_make_request,
  82. .new_server_request_handler_stream = s_new_server_request_handler_stream,
  83. .stream_send_response = s_stream_send_response,
  84. .close = s_connection_close,
  85. .stop_new_requests = s_connection_stop_new_request,
  86. .is_open = s_connection_is_open,
  87. .new_requests_allowed = s_connection_new_requests_allowed,
  88. .change_settings = NULL,
  89. .send_ping = NULL,
  90. .send_goaway = NULL,
  91. .get_sent_goaway = NULL,
  92. .get_received_goaway = NULL,
  93. .get_local_settings = NULL,
  94. .get_remote_settings = NULL,
  95. };
  96. static const struct aws_h1_decoder_vtable s_h1_decoder_vtable = {
  97. .on_request = s_decoder_on_request,
  98. .on_response = s_decoder_on_response,
  99. .on_header = s_decoder_on_header,
  100. .on_body = s_decoder_on_body,
  101. .on_done = s_decoder_on_done,
  102. };
  103. void aws_h1_connection_lock_synced_data(struct aws_h1_connection *connection) {
  104. int err = aws_mutex_lock(&connection->synced_data.lock);
  105. AWS_ASSERT(!err);
  106. (void)err;
  107. }
  108. void aws_h1_connection_unlock_synced_data(struct aws_h1_connection *connection) {
  109. int err = aws_mutex_unlock(&connection->synced_data.lock);
  110. AWS_ASSERT(!err);
  111. (void)err;
  112. }
  113. /**
  114. * Internal function for bringing connection to a stop.
  115. * Invoked multiple times, including when:
  116. * - Channel is shutting down in the read direction.
  117. * - Channel is shutting down in the write direction.
  118. * - An error occurs.
  119. * - User wishes to close the connection (this is the only case where the function may run off-thread).
  120. */
  121. static void s_stop(
  122. struct aws_h1_connection *connection,
  123. bool stop_reading,
  124. bool stop_writing,
  125. bool schedule_shutdown,
  126. int error_code) {
  127. AWS_ASSERT(stop_reading || stop_writing || schedule_shutdown); /* You are required to stop at least 1 thing */
  128. if (stop_reading) {
  129. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  130. connection->thread_data.is_reading_stopped = true;
  131. }
  132. if (stop_writing) {
  133. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  134. connection->thread_data.is_writing_stopped = true;
  135. }
  136. { /* BEGIN CRITICAL SECTION */
  137. aws_h1_connection_lock_synced_data(connection);
  138. /* Even if we're not scheduling shutdown just yet (ex: sent final request but waiting to read final response)
  139. * we don't consider the connection "open" anymore so user can't create more streams */
  140. connection->synced_data.is_open = false;
  141. connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
  142. aws_h1_connection_unlock_synced_data(connection);
  143. } /* END CRITICAL SECTION */
  144. if (schedule_shutdown) {
  145. AWS_LOGF_INFO(
  146. AWS_LS_HTTP_CONNECTION,
  147. "id=%p: Shutting down connection with error code %d (%s).",
  148. (void *)&connection->base,
  149. error_code,
  150. aws_error_name(error_code));
  151. aws_channel_shutdown(connection->base.channel_slot->channel, error_code);
  152. }
  153. }
  154. static void s_shutdown_due_to_error(struct aws_h1_connection *connection, int error_code) {
  155. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  156. if (!error_code) {
  157. error_code = AWS_ERROR_UNKNOWN;
  158. }
  159. /* Stop reading AND writing if an error occurs.
  160. *
  161. * It doesn't currently seem worth the complexity to distinguish between read errors and write errors.
  162. * The only scenarios that would benefit from this are pipelining scenarios (ex: A server
  163. * could continue sending a response to request A if there was an error reading request B).
  164. * But pipelining in HTTP/1.1 is known to be fragile with regards to errors, so let's just keep it simple.
  165. */
  166. s_stop(connection, true /*stop_reading*/, true /*stop_writing*/, true /*schedule_shutdown*/, error_code);
  167. }
  168. /**
  169. * Public function for closing connection.
  170. */
  171. static void s_connection_close(struct aws_http_connection *connection_base) {
  172. struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);
  173. /* Don't stop reading/writing immediately, let that happen naturally during the channel shutdown process. */
  174. s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, AWS_ERROR_SUCCESS);
  175. }
  176. static void s_connection_stop_new_request(struct aws_http_connection *connection_base) {
  177. struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);
  178. { /* BEGIN CRITICAL SECTION */
  179. aws_h1_connection_lock_synced_data(connection);
  180. if (!connection->synced_data.new_stream_error_code) {
  181. connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
  182. }
  183. aws_h1_connection_unlock_synced_data(connection);
  184. } /* END CRITICAL SECTION */
  185. }
  186. static bool s_connection_is_open(const struct aws_http_connection *connection_base) {
  187. struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);
  188. bool is_open;
  189. { /* BEGIN CRITICAL SECTION */
  190. aws_h1_connection_lock_synced_data(connection);
  191. is_open = connection->synced_data.is_open;
  192. aws_h1_connection_unlock_synced_data(connection);
  193. } /* END CRITICAL SECTION */
  194. return is_open;
  195. }
  196. static bool s_connection_new_requests_allowed(const struct aws_http_connection *connection_base) {
  197. struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);
  198. int new_stream_error_code;
  199. { /* BEGIN CRITICAL SECTION */
  200. aws_h1_connection_lock_synced_data(connection);
  201. new_stream_error_code = connection->synced_data.new_stream_error_code;
  202. aws_h1_connection_unlock_synced_data(connection);
  203. } /* END CRITICAL SECTION */
  204. return new_stream_error_code == 0;
  205. }
  206. static int s_stream_send_response(struct aws_http_stream *stream, struct aws_http_message *response) {
  207. AWS_PRECONDITION(stream);
  208. AWS_PRECONDITION(response);
  209. struct aws_h1_stream *h1_stream = AWS_CONTAINER_OF(stream, struct aws_h1_stream, base);
  210. return aws_h1_stream_send_response(h1_stream, response);
  211. }
  212. /* Calculate the desired window size for connection that has switched protocols and become a midchannel handler. */
  213. static size_t s_calculate_midchannel_desired_connection_window(struct aws_h1_connection *connection) {
  214. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  215. AWS_ASSERT(connection->thread_data.has_switched_protocols);
  216. if (!connection->base.channel_slot->adj_right) {
  217. /* No downstream handler installed. */
  218. return 0;
  219. }
  220. /* Connection is just dumbly forwarding aws_io_messages, so try to match downstream handler. */
  221. return aws_channel_slot_downstream_read_window(connection->base.channel_slot);
  222. }
  223. /* Calculate the desired window size for a connection that is processing data for aws_http_streams. */
  224. static size_t s_calculate_stream_mode_desired_connection_window(struct aws_h1_connection *connection) {
  225. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  226. AWS_ASSERT(!connection->thread_data.has_switched_protocols);
  227. if (!connection->base.stream_manual_window_management) {
  228. return SIZE_MAX;
  229. }
  230. /* Connection window should match the available space in the read-buffer */
  231. AWS_ASSERT(
  232. connection->thread_data.read_buffer.pending_bytes <= connection->thread_data.read_buffer.capacity &&
  233. "This isn't fatal, but our math is off");
  234. const size_t desired_connection_window = aws_sub_size_saturating(
  235. connection->thread_data.read_buffer.capacity, connection->thread_data.read_buffer.pending_bytes);
  236. AWS_LOGF_TRACE(
  237. AWS_LS_HTTP_CONNECTION,
  238. "id=%p: Window stats: connection=%zu+%zu stream=%" PRIu64 " buffer=%zu/%zu",
  239. (void *)&connection->base,
  240. connection->thread_data.connection_window,
  241. desired_connection_window - connection->thread_data.connection_window /*increment_size*/,
  242. connection->thread_data.incoming_stream ? connection->thread_data.incoming_stream->thread_data.stream_window
  243. : 0,
  244. connection->thread_data.read_buffer.pending_bytes,
  245. connection->thread_data.read_buffer.capacity);
  246. return desired_connection_window;
  247. }
  248. /* Increment connection window, if necessary */
  249. static int s_update_connection_window(struct aws_h1_connection *connection) {
  250. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  251. if (connection->thread_data.is_reading_stopped) {
  252. return AWS_OP_SUCCESS;
  253. }
  254. const size_t desired_size = connection->thread_data.has_switched_protocols
  255. ? s_calculate_midchannel_desired_connection_window(connection)
  256. : s_calculate_stream_mode_desired_connection_window(connection);
  257. const size_t increment_size = aws_sub_size_saturating(desired_size, connection->thread_data.connection_window);
  258. if (increment_size > 0) {
  259. /* Update local `connection_window`. See comments at variable's declaration site
  260. * on why we use this instead of the official `aws_channel_slot.window_size` */
  261. connection->thread_data.connection_window += increment_size;
  262. connection->thread_data.recent_window_increments =
  263. aws_add_size_saturating(connection->thread_data.recent_window_increments, increment_size);
  264. if (aws_channel_slot_increment_read_window(connection->base.channel_slot, increment_size)) {
  265. AWS_LOGF_ERROR(
  266. AWS_LS_HTTP_CONNECTION,
  267. "id=%p: Failed to increment read window, error %d (%s). Closing connection.",
  268. (void *)&connection->base,
  269. aws_last_error(),
  270. aws_error_name(aws_last_error()));
  271. return AWS_OP_ERR;
  272. }
  273. }
  274. return AWS_OP_SUCCESS;
  275. }
  276. int aws_h1_stream_activate(struct aws_http_stream *stream) {
  277. struct aws_h1_stream *h1_stream = AWS_CONTAINER_OF(stream, struct aws_h1_stream, base);
  278. struct aws_http_connection *base_connection = stream->owning_connection;
  279. struct aws_h1_connection *connection = AWS_CONTAINER_OF(base_connection, struct aws_h1_connection, base);
  280. bool should_schedule_task = false;
  281. { /* BEGIN CRITICAL SECTION */
  282. /* Note: We're touching both the connection's and stream's synced_data in this section,
  283. * which is OK because an h1_connection and all its h1_streams share a single lock. */
  284. aws_h1_connection_lock_synced_data(connection);
  285. if (stream->id) {
  286. /* stream has already been activated. */
  287. aws_h1_connection_unlock_synced_data(connection);
  288. return AWS_OP_SUCCESS;
  289. }
  290. if (connection->synced_data.new_stream_error_code) {
  291. aws_h1_connection_unlock_synced_data(connection);
  292. AWS_LOGF_ERROR(
  293. AWS_LS_HTTP_CONNECTION,
  294. "id=%p: Failed to activate the stream id=%p, new streams are not allowed now. error %d (%s)",
  295. (void *)&connection->base,
  296. (void *)stream,
  297. connection->synced_data.new_stream_error_code,
  298. aws_error_name(connection->synced_data.new_stream_error_code));
  299. return aws_raise_error(connection->synced_data.new_stream_error_code);
  300. }
  301. stream->id = aws_http_connection_get_next_stream_id(base_connection);
  302. if (!stream->id) {
  303. aws_h1_connection_unlock_synced_data(connection);
  304. /* aws_http_connection_get_next_stream_id() raises its own error. */
  305. return AWS_OP_ERR;
  306. }
  307. /* ID successfully assigned */
  308. h1_stream->synced_data.api_state = AWS_H1_STREAM_API_STATE_ACTIVE;
  309. aws_linked_list_push_back(&connection->synced_data.new_client_stream_list, &h1_stream->node);
  310. if (!connection->synced_data.is_cross_thread_work_task_scheduled) {
  311. connection->synced_data.is_cross_thread_work_task_scheduled = true;
  312. should_schedule_task = true;
  313. }
  314. aws_h1_connection_unlock_synced_data(connection);
  315. } /* END CRITICAL SECTION */
  316. /* connection keeps activated stream alive until stream completes */
  317. aws_atomic_fetch_add(&stream->refcount, 1);
  318. if (should_schedule_task) {
  319. AWS_LOGF_TRACE(
  320. AWS_LS_HTTP_CONNECTION, "id=%p: Scheduling connection cross-thread work task.", (void *)base_connection);
  321. aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
  322. } else {
  323. AWS_LOGF_TRACE(
  324. AWS_LS_HTTP_CONNECTION,
  325. "id=%p: Connection cross-thread work task was already scheduled",
  326. (void *)base_connection);
  327. }
  328. return AWS_OP_SUCCESS;
  329. }
  330. struct aws_http_stream *s_make_request(
  331. struct aws_http_connection *client_connection,
  332. const struct aws_http_make_request_options *options) {
  333. struct aws_h1_stream *stream = aws_h1_stream_new_request(client_connection, options);
  334. if (!stream) {
  335. AWS_LOGF_ERROR(
  336. AWS_LS_HTTP_CONNECTION,
  337. "id=%p: Cannot create request stream, error %d (%s)",
  338. (void *)client_connection,
  339. aws_last_error(),
  340. aws_error_name(aws_last_error()));
  341. return NULL;
  342. }
  343. struct aws_h1_connection *connection = AWS_CONTAINER_OF(client_connection, struct aws_h1_connection, base);
  344. /* Insert new stream into pending list, and schedule outgoing_stream_task if it's not already running. */
  345. int new_stream_error_code;
  346. { /* BEGIN CRITICAL SECTION */
  347. aws_h1_connection_lock_synced_data(connection);
  348. new_stream_error_code = connection->synced_data.new_stream_error_code;
  349. aws_h1_connection_unlock_synced_data(connection);
  350. } /* END CRITICAL SECTION */
  351. if (new_stream_error_code) {
  352. AWS_LOGF_ERROR(
  353. AWS_LS_HTTP_CONNECTION,
  354. "id=%p: Cannot create request stream, error %d (%s)",
  355. (void *)client_connection,
  356. new_stream_error_code,
  357. aws_error_name(new_stream_error_code));
  358. aws_raise_error(new_stream_error_code);
  359. goto error;
  360. }
  361. /* Success! */
  362. struct aws_byte_cursor method;
  363. aws_http_message_get_request_method(options->request, &method);
  364. stream->base.request_method = aws_http_str_to_method(method);
  365. struct aws_byte_cursor path;
  366. aws_http_message_get_request_path(options->request, &path);
  367. AWS_LOGF_DEBUG(
  368. AWS_LS_HTTP_STREAM,
  369. "id=%p: Created client request on connection=%p: " PRInSTR " " PRInSTR " " PRInSTR,
  370. (void *)&stream->base,
  371. (void *)client_connection,
  372. AWS_BYTE_CURSOR_PRI(method),
  373. AWS_BYTE_CURSOR_PRI(path),
  374. AWS_BYTE_CURSOR_PRI(aws_http_version_to_str(connection->base.http_version)));
  375. return &stream->base;
  376. error:
  377. /* Force destruction of the stream, avoiding ref counting */
  378. stream->base.vtable->destroy(&stream->base);
  379. return NULL;
  380. }
  381. /* Extract work items from synced_data, and perform the work on-thread. */
  382. static void s_cross_thread_work_task(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
  383. (void)channel_task;
  384. struct aws_h1_connection *connection = arg;
  385. if (status != AWS_TASK_STATUS_RUN_READY) {
  386. return;
  387. }
  388. AWS_LOGF_TRACE(
  389. AWS_LS_HTTP_CONNECTION, "id=%p: Running connection cross-thread work task.", (void *)&connection->base);
  390. /* BEGIN CRITICAL SECTION */
  391. aws_h1_connection_lock_synced_data(connection);
  392. connection->synced_data.is_cross_thread_work_task_scheduled = false;
  393. bool has_new_client_streams = !aws_linked_list_empty(&connection->synced_data.new_client_stream_list);
  394. aws_linked_list_move_all_back(
  395. &connection->thread_data.stream_list, &connection->synced_data.new_client_stream_list);
  396. aws_h1_connection_unlock_synced_data(connection);
  397. /* END CRITICAL SECTION */
  398. /* Kick off outgoing-stream task if necessary */
  399. if (has_new_client_streams) {
  400. aws_h1_connection_try_write_outgoing_stream(connection);
  401. }
  402. }
  403. static bool s_aws_http_stream_was_successful_connect(struct aws_h1_stream *stream) {
  404. struct aws_http_stream *base = &stream->base;
  405. if (base->request_method != AWS_HTTP_METHOD_CONNECT) {
  406. return false;
  407. }
  408. if (base->client_data == NULL) {
  409. return false;
  410. }
  411. if (base->client_data->response_status != AWS_HTTP_STATUS_CODE_200_OK) {
  412. return false;
  413. }
  414. return true;
  415. }
  416. /**
  417. * Validate and perform a protocol switch on a connection. Protocol switching essentially turns the connection's
  418. * handler into a dummy pass-through. It is valid to switch protocols to the same protocol resulting in a channel
  419. * that has a "dead" http handler in the middle of the channel (which negotiated the CONNECT through the proxy) and
  420. * a "live" handler on the end which takes the actual http requests. By doing this, we get the exact same
  421. * behavior whether we're transitioning to http or any other protocol: once the CONNECT succeeds
  422. * the first http handler is put in pass-through mode and a new protocol (which could be http) is tacked onto the end.
  423. */
  424. static int s_aws_http1_switch_protocols(struct aws_h1_connection *connection) {
  425. AWS_FATAL_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  426. /* Switching protocols while there are multiple streams is too complex to deal with.
  427. * Ensure stream_list has exactly this 1 stream in it. */
  428. if (aws_linked_list_begin(&connection->thread_data.stream_list) !=
  429. aws_linked_list_rbegin(&connection->thread_data.stream_list)) {
  430. AWS_LOGF_ERROR(
  431. AWS_LS_HTTP_CONNECTION,
  432. "id=%p: Cannot switch protocols while further streams are pending, closing connection.",
  433. (void *)&connection->base);
  434. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  435. }
  436. AWS_LOGF_TRACE(
  437. AWS_LS_HTTP_CONNECTION,
  438. "id=%p: Connection has switched protocols, another channel handler must be installed to"
  439. " deal with further data.",
  440. (void *)&connection->base);
  441. connection->thread_data.has_switched_protocols = true;
  442. { /* BEGIN CRITICAL SECTION */
  443. aws_h1_connection_lock_synced_data(connection);
  444. connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_SWITCHED_PROTOCOLS;
  445. aws_h1_connection_unlock_synced_data(connection);
  446. } /* END CRITICAL SECTION */
  447. return AWS_OP_SUCCESS;
  448. }
  449. static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
  450. struct aws_h1_connection *connection =
  451. AWS_CONTAINER_OF(stream->base.owning_connection, struct aws_h1_connection, base);
  452. /*
  453. * If this is the end of a successful CONNECT request, mark ourselves as pass-through since the proxy layer
  454. * will be tacking on a new http handler (and possibly a tls handler in-between).
  455. */
  456. if (error_code == AWS_ERROR_SUCCESS && s_aws_http_stream_was_successful_connect(stream)) {
  457. if (s_aws_http1_switch_protocols(connection)) {
  458. error_code = AWS_ERROR_HTTP_PROTOCOL_SWITCH_FAILURE;
  459. s_shutdown_due_to_error(connection, error_code);
  460. }
  461. }
  462. if (error_code != AWS_ERROR_SUCCESS) {
  463. if (stream->base.client_data && stream->is_incoming_message_done) {
  464. /* As a request that finished receiving the response, we ignore error and
  465. * consider it finished successfully */
  466. AWS_LOGF_DEBUG(
  467. AWS_LS_HTTP_STREAM,
  468. "id=%p: Ignoring error code %d (%s). The response has been fully received,"
  469. "so the stream will complete successfully.",
  470. (void *)&stream->base,
  471. error_code,
  472. aws_error_name(error_code));
  473. error_code = AWS_ERROR_SUCCESS;
  474. }
  475. if (stream->base.server_data && stream->is_outgoing_message_done) {
  476. /* As a server finished sending the response, but still failed with the request was not finished receiving.
  477. * We ignore error and consider it finished successfully */
  478. AWS_LOGF_DEBUG(
  479. AWS_LS_HTTP_STREAM,
  480. "id=%p: Ignoring error code %d (%s). The response has been fully sent,"
  481. " so the stream will complete successfully",
  482. (void *)&stream->base,
  483. error_code,
  484. aws_error_name(error_code));
  485. error_code = AWS_ERROR_SUCCESS;
  486. }
  487. }
  488. /* Remove stream from list. */
  489. aws_linked_list_remove(&stream->node);
  490. /* Nice logging */
  491. if (error_code) {
  492. AWS_LOGF_DEBUG(
  493. AWS_LS_HTTP_STREAM,
  494. "id=%p: Stream completed with error code %d (%s).",
  495. (void *)&stream->base,
  496. error_code,
  497. aws_error_name(error_code));
  498. } else if (stream->base.client_data) {
  499. AWS_LOGF_DEBUG(
  500. AWS_LS_HTTP_STREAM,
  501. "id=%p: Client request complete, response status: %d (%s).",
  502. (void *)&stream->base,
  503. stream->base.client_data->response_status,
  504. aws_http_status_text(stream->base.client_data->response_status));
  505. } else {
  506. AWS_ASSERT(stream->base.server_data);
  507. AWS_LOGF_DEBUG(
  508. AWS_LS_HTTP_STREAM,
  509. "id=%p: Server response to " PRInSTR " request complete.",
  510. (void *)&stream->base,
  511. AWS_BYTE_CURSOR_PRI(stream->base.server_data->request_method_str));
  512. }
  513. /* If connection must shut down, do it BEFORE invoking stream-complete callback.
  514. * That way, if aws_http_connection_is_open() is called from stream-complete callback, it returns false. */
  515. if (stream->is_final_stream) {
  516. AWS_LOGF_TRACE(
  517. AWS_LS_HTTP_CONNECTION,
  518. "id=%p: Closing connection due to completion of final stream.",
  519. (void *)&connection->base);
  520. s_connection_close(&connection->base);
  521. }
  522. { /* BEGIN CRITICAL SECTION */
  523. /* Note: We're touching the stream's synced_data here, which is OK
  524. * because an h1_connection and all its h1_streams share a single lock. */
  525. aws_h1_connection_lock_synced_data(connection);
  526. /* Mark stream complete */
  527. stream->synced_data.api_state = AWS_H1_STREAM_API_STATE_COMPLETE;
  528. /* Move chunks out of synced data */
  529. aws_linked_list_move_all_back(&stream->thread_data.pending_chunk_list, &stream->synced_data.pending_chunk_list);
  530. aws_h1_connection_unlock_synced_data(connection);
  531. } /* END CRITICAL SECTION */
  532. /* Complete any leftover chunks */
  533. while (!aws_linked_list_empty(&stream->thread_data.pending_chunk_list)) {
  534. struct aws_linked_list_node *node = aws_linked_list_pop_front(&stream->thread_data.pending_chunk_list);
  535. struct aws_h1_chunk *chunk = AWS_CONTAINER_OF(node, struct aws_h1_chunk, node);
  536. aws_h1_chunk_complete_and_destroy(chunk, &stream->base, AWS_ERROR_HTTP_STREAM_HAS_COMPLETED);
  537. }
  538. /* Invoke callback and clean up stream. */
  539. if (stream->base.on_complete) {
  540. stream->base.on_complete(&stream->base, error_code, stream->base.user_data);
  541. }
  542. aws_http_stream_release(&stream->base);
  543. }
  544. static void s_add_time_measurement_to_stats(uint64_t start_ns, uint64_t end_ns, uint64_t *output_ms) {
  545. if (end_ns > start_ns) {
  546. *output_ms += aws_timestamp_convert(end_ns - start_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
  547. }
  548. }
  549. static void s_set_outgoing_stream_ptr(
  550. struct aws_h1_connection *connection,
  551. struct aws_h1_stream *next_outgoing_stream) {
  552. struct aws_h1_stream *prev = connection->thread_data.outgoing_stream;
  553. uint64_t now_ns = 0;
  554. aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
  555. if (prev == NULL && next_outgoing_stream != NULL) {
  556. /* transition from nothing to write -> something to write */
  557. connection->thread_data.outgoing_stream_timestamp_ns = now_ns;
  558. } else if (prev != NULL && next_outgoing_stream == NULL) {
  559. /* transition from something to write -> nothing to write */
  560. s_add_time_measurement_to_stats(
  561. connection->thread_data.outgoing_stream_timestamp_ns,
  562. now_ns,
  563. &connection->thread_data.stats.pending_outgoing_stream_ms);
  564. }
  565. connection->thread_data.outgoing_stream = next_outgoing_stream;
  566. }
  567. static void s_set_incoming_stream_ptr(
  568. struct aws_h1_connection *connection,
  569. struct aws_h1_stream *next_incoming_stream) {
  570. struct aws_h1_stream *prev = connection->thread_data.incoming_stream;
  571. uint64_t now_ns = 0;
  572. aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
  573. if (prev == NULL && next_incoming_stream != NULL) {
  574. /* transition from nothing to read -> something to read */
  575. connection->thread_data.incoming_stream_timestamp_ns = now_ns;
  576. } else if (prev != NULL && next_incoming_stream == NULL) {
  577. /* transition from something to read -> nothing to read */
  578. s_add_time_measurement_to_stats(
  579. connection->thread_data.incoming_stream_timestamp_ns,
  580. now_ns,
  581. &connection->thread_data.stats.pending_incoming_stream_ms);
  582. }
  583. connection->thread_data.incoming_stream = next_incoming_stream;
  584. }
  585. /**
  586. * Ensure `incoming_stream` is pointing at the correct stream, and update state if it changes.
  587. */
  588. static void s_client_update_incoming_stream_ptr(struct aws_h1_connection *connection) {
  589. struct aws_linked_list *list = &connection->thread_data.stream_list;
  590. struct aws_h1_stream *desired;
  591. if (connection->thread_data.is_reading_stopped) {
  592. desired = NULL;
  593. } else if (aws_linked_list_empty(list)) {
  594. desired = NULL;
  595. } else {
  596. desired = AWS_CONTAINER_OF(aws_linked_list_begin(list), struct aws_h1_stream, node);
  597. }
  598. if (connection->thread_data.incoming_stream == desired) {
  599. return;
  600. }
  601. AWS_LOGF_TRACE(
  602. AWS_LS_HTTP_CONNECTION,
  603. "id=%p: Current incoming stream is now %p.",
  604. (void *)&connection->base,
  605. desired ? (void *)&desired->base : NULL);
  606. s_set_incoming_stream_ptr(connection, desired);
  607. }
  608. /**
  609. * If necessary, update `outgoing_stream` so it is pointing at a stream
  610. * with data to send, or NULL if all streams are done sending data.
  611. *
  612. * Called from event-loop thread.
  613. * This function has lots of side effects.
  614. */
  615. static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connection *connection) {
  616. struct aws_h1_stream *current = connection->thread_data.outgoing_stream;
  617. bool current_changed = false;
  618. int err;
  619. /* If current stream is done sending data... */
  620. if (current && !aws_h1_encoder_is_message_in_progress(&connection->thread_data.encoder)) {
  621. current->is_outgoing_message_done = true;
  622. /* RFC-7230 section 6.6: Tear-down.
  623. * If this was the final stream, don't allows any further streams to be sent */
  624. if (current->is_final_stream) {
  625. AWS_LOGF_TRACE(
  626. AWS_LS_HTTP_CONNECTION,
  627. "id=%p: Done sending final stream, no further streams will be sent.",
  628. (void *)&connection->base);
  629. s_stop(
  630. connection,
  631. false /*stop_reading*/,
  632. true /*stop_writing*/,
  633. false /*schedule_shutdown*/,
  634. AWS_ERROR_SUCCESS);
  635. }
  636. /* If it's also done receiving data, then it's complete! */
  637. if (current->is_incoming_message_done) {
  638. /* Only 1st stream in list could finish receiving before it finished sending */
  639. AWS_ASSERT(&current->node == aws_linked_list_begin(&connection->thread_data.stream_list));
  640. /* This removes stream from list */
  641. s_stream_complete(current, AWS_ERROR_SUCCESS);
  642. }
  643. current = NULL;
  644. current_changed = true;
  645. }
  646. /* If current stream is NULL, look for more work. */
  647. if (!current && !connection->thread_data.is_writing_stopped) {
  648. /* Look for next stream we can work on. */
  649. for (struct aws_linked_list_node *node = aws_linked_list_begin(&connection->thread_data.stream_list);
  650. node != aws_linked_list_end(&connection->thread_data.stream_list);
  651. node = aws_linked_list_next(node)) {
  652. struct aws_h1_stream *stream = AWS_CONTAINER_OF(node, struct aws_h1_stream, node);
  653. /* If we already sent this stream's data, keep looking... */
  654. if (stream->is_outgoing_message_done) {
  655. continue;
  656. }
  657. /* STOP if we're a server, and this stream's response isn't ready to send.
  658. * It's not like we can skip this and start on the next stream because responses must be sent in order.
  659. * Don't need a check like this for clients because their streams always start with data to send. */
  660. if (connection->base.server_data && !stream->thread_data.has_outgoing_response) {
  661. break;
  662. }
  663. /* We found a stream to work on! */
  664. current = stream;
  665. current_changed = true;
  666. break;
  667. }
  668. }
  669. /* Update current incoming and outgoing streams. */
  670. if (current_changed) {
  671. AWS_LOGF_TRACE(
  672. AWS_LS_HTTP_CONNECTION,
  673. "id=%p: Current outgoing stream is now %p.",
  674. (void *)&connection->base,
  675. current ? (void *)&current->base : NULL);
  676. s_set_outgoing_stream_ptr(connection, current);
  677. if (current) {
  678. err = aws_h1_encoder_start_message(
  679. &connection->thread_data.encoder, &current->encoder_message, &current->base);
  680. (void)err;
  681. AWS_ASSERT(!err);
  682. }
  683. /* incoming_stream update is only for client */
  684. if (connection->base.client_data) {
  685. s_client_update_incoming_stream_ptr(connection);
  686. }
  687. }
  688. return current;
  689. }
  690. /* Runs after an aws_io_message containing HTTP has completed (written to the network, or failed).
  691. * This does NOT run after switching protocols, when we're dumbly forwarding aws_io_messages
  692. * as a midchannel handler. */
  693. static void s_on_channel_write_complete(
  694. struct aws_channel *channel,
  695. struct aws_io_message *message,
  696. int err_code,
  697. void *user_data) {
  698. (void)message;
  699. struct aws_h1_connection *connection = user_data;
  700. AWS_ASSERT(connection->thread_data.is_outgoing_stream_task_active);
  701. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  702. if (err_code) {
  703. AWS_LOGF_TRACE(
  704. AWS_LS_HTTP_CONNECTION,
  705. "id=%p: Message did not write to network, error %d (%s)",
  706. (void *)&connection->base,
  707. err_code,
  708. aws_error_name(err_code));
  709. s_shutdown_due_to_error(connection, err_code);
  710. return;
  711. }
  712. AWS_LOGF_TRACE(
  713. AWS_LS_HTTP_CONNECTION,
  714. "id=%p: Message finished writing to network. Rescheduling outgoing stream task.",
  715. (void *)&connection->base);
  716. /* To avoid wasting memory, we only want ONE of our written aws_io_messages in the channel at a time.
  717. * Therefore, we wait until it's written to the network before trying to send another
  718. * by running the outgoing-stream-task again.
  719. *
  720. * We also want to share the network with other channels.
  721. * Therefore, when the write completes, we SCHEDULE the outgoing-stream-task
  722. * to run again instead of calling the function directly.
  723. * This way, if the message completes synchronously,
  724. * we're not hogging the network by writing message after message in a tight loop */
  725. aws_channel_schedule_task_now(channel, &connection->outgoing_stream_task);
  726. }
  727. static void s_outgoing_stream_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  728. (void)task;
  729. if (status != AWS_TASK_STATUS_RUN_READY) {
  730. return;
  731. }
  732. struct aws_h1_connection *connection = arg;
  733. AWS_ASSERT(connection->thread_data.is_outgoing_stream_task_active);
  734. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  735. s_write_outgoing_stream(connection, false /*first_try*/);
  736. }
  737. void aws_h1_connection_try_write_outgoing_stream(struct aws_h1_connection *connection) {
  738. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  739. if (connection->thread_data.is_outgoing_stream_task_active) {
  740. /* Task is already active */
  741. return;
  742. }
  743. connection->thread_data.is_outgoing_stream_task_active = true;
  744. s_write_outgoing_stream(connection, true /*first_try*/);
  745. }
  746. /* Do the actual work of the outgoing-stream-task */
  747. static void s_write_outgoing_stream(struct aws_h1_connection *connection, bool first_try) {
  748. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  749. AWS_PRECONDITION(connection->thread_data.is_outgoing_stream_task_active);
  750. /* Just stop if we're no longer writing stream data */
  751. if (connection->thread_data.is_writing_stopped || connection->thread_data.has_switched_protocols) {
  752. return;
  753. }
  754. /* Determine whether we have data available to send, and end task immediately if there's not.
  755. * The outgoing stream task will be kicked off again when user adds more data (new stream, new chunk, etc) */
  756. struct aws_h1_stream *outgoing_stream = s_update_outgoing_stream_ptr(connection);
  757. bool waiting_for_chunks = aws_h1_encoder_is_waiting_for_chunks(&connection->thread_data.encoder);
  758. if (!outgoing_stream || waiting_for_chunks) {
  759. if (!first_try) {
  760. AWS_LOGF_TRACE(
  761. AWS_LS_HTTP_CONNECTION,
  762. "id=%p: Outgoing stream task stopped. outgoing_stream=%p waiting_for_chunks:%d",
  763. (void *)&connection->base,
  764. outgoing_stream ? (void *)&outgoing_stream->base : NULL,
  765. waiting_for_chunks);
  766. }
  767. connection->thread_data.is_outgoing_stream_task_active = false;
  768. return;
  769. }
  770. if (first_try) {
  771. AWS_LOGF_TRACE(AWS_LS_HTTP_CONNECTION, "id=%p: Outgoing stream task has begun.", (void *)&connection->base);
  772. }
  773. struct aws_io_message *msg = aws_channel_slot_acquire_max_message_for_write(connection->base.channel_slot);
  774. if (!msg) {
  775. AWS_LOGF_ERROR(
  776. AWS_LS_HTTP_CONNECTION,
  777. "id=%p: Failed to acquire message from pool, error %d (%s). Closing connection.",
  778. (void *)&connection->base,
  779. aws_last_error(),
  780. aws_error_name(aws_last_error()));
  781. goto error;
  782. }
  783. /* Set up callback so we can send another message when this one completes */
  784. msg->on_completion = s_on_channel_write_complete;
  785. msg->user_data = connection;
  786. /*
  787. * Fill message data from the outgoing stream.
  788. * Note that we might be resuming work on a stream from a previous run of this task.
  789. */
  790. if (AWS_OP_SUCCESS != aws_h1_encoder_process(&connection->thread_data.encoder, &msg->message_data)) {
  791. /* Error sending data, abandon ship */
  792. goto error;
  793. }
  794. if (msg->message_data.len > 0) {
  795. AWS_LOGF_TRACE(
  796. AWS_LS_HTTP_CONNECTION,
  797. "id=%p: Outgoing stream task is sending message of size %zu.",
  798. (void *)&connection->base,
  799. msg->message_data.len);
  800. if (aws_channel_slot_send_message(connection->base.channel_slot, msg, AWS_CHANNEL_DIR_WRITE)) {
  801. AWS_LOGF_ERROR(
  802. AWS_LS_HTTP_CONNECTION,
  803. "id=%p: Failed to send message in write direction, error %d (%s). Closing connection.",
  804. (void *)&connection->base,
  805. aws_last_error(),
  806. aws_error_name(aws_last_error()));
  807. goto error;
  808. }
  809. } else {
  810. /* If message is empty, warn that no work is being done
  811. * and reschedule the task to try again next tick.
  812. * It's likely that body isn't ready, so body streaming function has no data to write yet.
  813. * If this scenario turns out to be common we should implement a "pause" feature. */
  814. AWS_LOGF_WARN(
  815. AWS_LS_HTTP_CONNECTION,
  816. "id=%p: Current outgoing stream %p sent no data, will try again next tick.",
  817. (void *)&connection->base,
  818. outgoing_stream ? (void *)&outgoing_stream->base : NULL);
  819. aws_mem_release(msg->allocator, msg);
  820. aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->outgoing_stream_task);
  821. }
  822. return;
  823. error:
  824. if (msg) {
  825. aws_mem_release(msg->allocator, msg);
  826. }
  827. s_shutdown_due_to_error(connection, aws_last_error());
  828. }
  829. static int s_decoder_on_request(
  830. enum aws_http_method method_enum,
  831. const struct aws_byte_cursor *method_str,
  832. const struct aws_byte_cursor *uri,
  833. void *user_data) {
  834. struct aws_h1_connection *connection = user_data;
  835. struct aws_h1_stream *incoming_stream = connection->thread_data.incoming_stream;
  836. AWS_FATAL_ASSERT(connection->thread_data.incoming_stream->base.server_data); /* Request but I'm a client?!?!? */
  837. AWS_ASSERT(incoming_stream->base.server_data->request_method_str.len == 0);
  838. AWS_ASSERT(incoming_stream->base.server_data->request_path.len == 0);
  839. AWS_LOGF_TRACE(
  840. AWS_LS_HTTP_STREAM,
  841. "id=%p: Incoming request: method=" PRInSTR " uri=" PRInSTR,
  842. (void *)&incoming_stream->base,
  843. AWS_BYTE_CURSOR_PRI(*method_str),
  844. AWS_BYTE_CURSOR_PRI(*uri));
  845. /* Copy strings to internal buffer */
  846. struct aws_byte_buf *storage_buf = &incoming_stream->incoming_storage_buf;
  847. AWS_ASSERT(storage_buf->capacity == 0);
  848. size_t storage_size = 0;
  849. int err = aws_add_size_checked(uri->len, method_str->len, &storage_size);
  850. if (err) {
  851. goto error;
  852. }
  853. err = aws_byte_buf_init(storage_buf, incoming_stream->base.alloc, storage_size);
  854. if (err) {
  855. goto error;
  856. }
  857. aws_byte_buf_write_from_whole_cursor(storage_buf, *method_str);
  858. incoming_stream->base.server_data->request_method_str = aws_byte_cursor_from_buf(storage_buf);
  859. aws_byte_buf_write_from_whole_cursor(storage_buf, *uri);
  860. incoming_stream->base.server_data->request_path = aws_byte_cursor_from_buf(storage_buf);
  861. aws_byte_cursor_advance(&incoming_stream->base.server_data->request_path, storage_buf->len - uri->len);
  862. incoming_stream->base.request_method = method_enum;
  863. /* No user callbacks, so we're not checking for shutdown */
  864. return AWS_OP_SUCCESS;
  865. error:
  866. AWS_LOGF_ERROR(
  867. AWS_LS_HTTP_CONNECTION,
  868. "id=%p: Failed to process new incoming request, error %d (%s).",
  869. (void *)&connection->base,
  870. aws_last_error(),
  871. aws_error_name(aws_last_error()));
  872. return AWS_OP_ERR;
  873. }
  874. static int s_decoder_on_response(int status_code, void *user_data) {
  875. struct aws_h1_connection *connection = user_data;
  876. AWS_FATAL_ASSERT(connection->thread_data.incoming_stream->base.client_data); /* Response but I'm a server?!?!? */
  877. AWS_LOGF_TRACE(
  878. AWS_LS_HTTP_STREAM,
  879. "id=%p: Incoming response status: %d (%s).",
  880. (void *)&connection->thread_data.incoming_stream->base,
  881. status_code,
  882. aws_http_status_text(status_code));
  883. connection->thread_data.incoming_stream->base.client_data->response_status = status_code;
  884. /* No user callbacks, so we're not checking for shutdown */
  885. return AWS_OP_SUCCESS;
  886. }
  887. static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void *user_data) {
  888. struct aws_h1_connection *connection = user_data;
  889. struct aws_h1_stream *incoming_stream = connection->thread_data.incoming_stream;
  890. AWS_LOGF_TRACE(
  891. AWS_LS_HTTP_STREAM,
  892. "id=%p: Incoming header: " PRInSTR ": " PRInSTR,
  893. (void *)&incoming_stream->base,
  894. AWS_BYTE_CURSOR_PRI(header->name_data),
  895. AWS_BYTE_CURSOR_PRI(header->value_data));
  896. enum aws_http_header_block header_block =
  897. aws_h1_decoder_get_header_block(connection->thread_data.incoming_stream_decoder);
  898. /* RFC-7230 section 6.1.
  899. * "Connection: close" header signals that a connection will not persist after the current request/response */
  900. if (header->name == AWS_HTTP_HEADER_CONNECTION) {
  901. /* Certain L7 proxies send a connection close header on a 200/OK response to a CONNECT request. This is nutty
  902. * behavior, but the obviously desired behavior on a 200 CONNECT response is to leave the connection open
  903. * for the tunneling. */
  904. bool ignore_connection_close =
  905. incoming_stream->base.request_method == AWS_HTTP_METHOD_CONNECT && incoming_stream->base.client_data &&
  906. incoming_stream->base.client_data->response_status == AWS_HTTP_STATUS_CODE_200_OK;
  907. if (!ignore_connection_close && aws_byte_cursor_eq_c_str_ignore_case(&header->value_data, "close")) {
  908. AWS_LOGF_TRACE(
  909. AWS_LS_HTTP_STREAM,
  910. "id=%p: Received 'Connection: close' header. This will be the final stream on this connection.",
  911. (void *)&incoming_stream->base);
  912. incoming_stream->is_final_stream = true;
  913. { /* BEGIN CRITICAL SECTION */
  914. aws_h1_connection_lock_synced_data(connection);
  915. connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
  916. aws_h1_connection_unlock_synced_data(connection);
  917. } /* END CRITICAL SECTION */
  918. if (connection->base.client_data) {
  919. /**
  920. * RFC-9112 section 9.6.
  921. * A client that receives a "close" connection option MUST cease sending
  922. * requests on that connection and close the connection after reading the
  923. * response message containing the "close" connection option.
  924. *
  925. * Mark the stream's outgoing message as complete,
  926. * so that we stop sending, and stop waiting for it to finish sending.
  927. **/
  928. if (!incoming_stream->is_outgoing_message_done) {
  929. AWS_LOGF_DEBUG(
  930. AWS_LS_HTTP_STREAM,
  931. "id=%p: Received 'Connection: close' header, no more request data will be sent.",
  932. (void *)&incoming_stream->base);
  933. incoming_stream->is_outgoing_message_done = true;
  934. }
  935. /* Stop writing right now.
  936. * Shutdown will be scheduled after we finishing parsing the response */
  937. s_stop(
  938. connection,
  939. false /*stop_reading*/,
  940. true /*stop_writing*/,
  941. false /*schedule_shutdown*/,
  942. AWS_ERROR_SUCCESS);
  943. }
  944. }
  945. }
  946. if (incoming_stream->base.on_incoming_headers) {
  947. struct aws_http_header deliver = {
  948. .name = header->name_data,
  949. .value = header->value_data,
  950. };
  951. int err = incoming_stream->base.on_incoming_headers(
  952. &incoming_stream->base, header_block, &deliver, 1, incoming_stream->base.user_data);
  953. if (err) {
  954. AWS_LOGF_ERROR(
  955. AWS_LS_HTTP_STREAM,
  956. "id=%p: Incoming header callback raised error %d (%s).",
  957. (void *)&incoming_stream->base,
  958. aws_last_error(),
  959. aws_error_name(aws_last_error()));
  960. return AWS_OP_ERR;
  961. }
  962. }
  963. return AWS_OP_SUCCESS;
  964. }
  965. static int s_mark_head_done(struct aws_h1_stream *incoming_stream) {
  966. /* Bail out if we've already done this */
  967. if (incoming_stream->is_incoming_head_done) {
  968. return AWS_OP_SUCCESS;
  969. }
  970. struct aws_h1_connection *connection =
  971. AWS_CONTAINER_OF(incoming_stream->base.owning_connection, struct aws_h1_connection, base);
  972. enum aws_http_header_block header_block =
  973. aws_h1_decoder_get_header_block(connection->thread_data.incoming_stream_decoder);
  974. if (header_block == AWS_HTTP_HEADER_BLOCK_MAIN) {
  975. AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Main header block done.", (void *)&incoming_stream->base);
  976. incoming_stream->is_incoming_head_done = true;
  977. } else if (header_block == AWS_HTTP_HEADER_BLOCK_INFORMATIONAL) {
  978. AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Informational header block done.", (void *)&incoming_stream->base);
  979. /* Only clients can receive informational headers.
  980. * Check whether we're switching protocols */
  981. if (incoming_stream->base.client_data->response_status == AWS_HTTP_STATUS_CODE_101_SWITCHING_PROTOCOLS) {
  982. if (s_aws_http1_switch_protocols(connection)) {
  983. return AWS_OP_ERR;
  984. }
  985. }
  986. }
  987. /* Invoke user cb */
  988. if (incoming_stream->base.on_incoming_header_block_done) {
  989. int err = incoming_stream->base.on_incoming_header_block_done(
  990. &incoming_stream->base, header_block, incoming_stream->base.user_data);
  991. if (err) {
  992. AWS_LOGF_ERROR(
  993. AWS_LS_HTTP_STREAM,
  994. "id=%p: Incoming-header-block-done callback raised error %d (%s).",
  995. (void *)&incoming_stream->base,
  996. aws_last_error(),
  997. aws_error_name(aws_last_error()));
  998. return AWS_OP_ERR;
  999. }
  1000. }
  1001. return AWS_OP_SUCCESS;
  1002. }
  1003. static int s_decoder_on_body(const struct aws_byte_cursor *data, bool finished, void *user_data) {
  1004. (void)finished;
  1005. struct aws_h1_connection *connection = user_data;
  1006. struct aws_h1_stream *incoming_stream = connection->thread_data.incoming_stream;
  1007. AWS_ASSERT(incoming_stream);
  1008. int err = s_mark_head_done(incoming_stream);
  1009. if (err) {
  1010. return AWS_OP_ERR;
  1011. }
  1012. /* No need to invoke callback for 0-length data */
  1013. if (data->len == 0) {
  1014. return AWS_OP_SUCCESS;
  1015. }
  1016. AWS_LOGF_TRACE(
  1017. AWS_LS_HTTP_STREAM, "id=%p: Incoming body: %zu bytes received.", (void *)&incoming_stream->base, data->len);
  1018. if (connection->base.stream_manual_window_management) {
  1019. /* Let stream window shrink by amount of body data received */
  1020. if (data->len > incoming_stream->thread_data.stream_window) {
  1021. /* This error shouldn't be possible, but it's all complicated, so do runtime check to be safe. */
  1022. AWS_LOGF_ERROR(
  1023. AWS_LS_HTTP_STREAM,
  1024. "id=%p: Internal error. Data exceeds HTTP-stream's window.",
  1025. (void *)&incoming_stream->base);
  1026. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  1027. }
  1028. incoming_stream->thread_data.stream_window -= data->len;
  1029. if (incoming_stream->thread_data.stream_window == 0) {
  1030. AWS_LOGF_DEBUG(
  1031. AWS_LS_HTTP_STREAM,
  1032. "id=%p: Flow-control window has reached 0. No more data can be received until window is updated.",
  1033. (void *)&incoming_stream->base);
  1034. }
  1035. }
  1036. if (incoming_stream->base.on_incoming_body) {
  1037. err = incoming_stream->base.on_incoming_body(&incoming_stream->base, data, incoming_stream->base.user_data);
  1038. if (err) {
  1039. AWS_LOGF_ERROR(
  1040. AWS_LS_HTTP_STREAM,
  1041. "id=%p: Incoming body callback raised error %d (%s).",
  1042. (void *)&incoming_stream->base,
  1043. aws_last_error(),
  1044. aws_error_name(aws_last_error()));
  1045. return AWS_OP_ERR;
  1046. }
  1047. }
  1048. return AWS_OP_SUCCESS;
  1049. }
  1050. static int s_decoder_on_done(void *user_data) {
  1051. struct aws_h1_connection *connection = user_data;
  1052. struct aws_h1_stream *incoming_stream = connection->thread_data.incoming_stream;
  1053. AWS_ASSERT(incoming_stream);
  1054. /* Ensure head was marked done */
  1055. int err = s_mark_head_done(incoming_stream);
  1056. if (err) {
  1057. return AWS_OP_ERR;
  1058. }
  1059. /* If it is a informational response, we stop here, keep waiting for new response */
  1060. enum aws_http_header_block header_block =
  1061. aws_h1_decoder_get_header_block(connection->thread_data.incoming_stream_decoder);
  1062. if (header_block == AWS_HTTP_HEADER_BLOCK_INFORMATIONAL) {
  1063. return AWS_OP_SUCCESS;
  1064. }
  1065. /* Otherwise the incoming stream is finished decoding and we will update it if needed */
  1066. incoming_stream->is_incoming_message_done = true;
  1067. /* RFC-7230 section 6.6
  1068. * After reading the final message, the connection must not read any more */
  1069. if (incoming_stream->is_final_stream) {
  1070. AWS_LOGF_TRACE(
  1071. AWS_LS_HTTP_CONNECTION,
  1072. "id=%p: Done reading final stream, no further streams will be read.",
  1073. (void *)&connection->base);
  1074. s_stop(
  1075. connection, true /*stop_reading*/, false /*stop_writing*/, false /*schedule_shutdown*/, AWS_ERROR_SUCCESS);
  1076. }
  1077. if (connection->base.server_data) {
  1078. /* Server side */
  1079. aws_http_on_incoming_request_done_fn *on_request_done = incoming_stream->base.server_data->on_request_done;
  1080. if (on_request_done) {
  1081. err = on_request_done(&incoming_stream->base, incoming_stream->base.user_data);
  1082. if (err) {
  1083. AWS_LOGF_ERROR(
  1084. AWS_LS_HTTP_STREAM,
  1085. "id=%p: Incoming request done callback raised error %d (%s).",
  1086. (void *)&incoming_stream->base,
  1087. aws_last_error(),
  1088. aws_error_name(aws_last_error()));
  1089. return AWS_OP_ERR;
  1090. }
  1091. }
  1092. if (incoming_stream->is_outgoing_message_done) {
  1093. AWS_ASSERT(&incoming_stream->node == aws_linked_list_begin(&connection->thread_data.stream_list));
  1094. s_stream_complete(incoming_stream, AWS_ERROR_SUCCESS);
  1095. }
  1096. s_set_incoming_stream_ptr(connection, NULL);
  1097. } else if (incoming_stream->is_outgoing_message_done) {
  1098. /* Client side */
  1099. AWS_ASSERT(&incoming_stream->node == aws_linked_list_begin(&connection->thread_data.stream_list));
  1100. s_stream_complete(incoming_stream, AWS_ERROR_SUCCESS);
  1101. s_client_update_incoming_stream_ptr(connection);
  1102. }
  1103. /* Report success even if user's on_complete() callback shuts down on the connection.
  1104. * We don't want it to look like something went wrong while decoding.
  1105. * The decode() function returns after each message completes,
  1106. * and we won't call decode() again if the connection has been shut down */
  1107. return AWS_OP_SUCCESS;
  1108. }
  1109. /* Common new() logic for server & client */
  1110. static struct aws_h1_connection *s_connection_new(
  1111. struct aws_allocator *alloc,
  1112. bool manual_window_management,
  1113. size_t initial_window_size,
  1114. const struct aws_http1_connection_options *http1_options,
  1115. bool server) {
  1116. struct aws_h1_connection *connection = aws_mem_calloc(alloc, 1, sizeof(struct aws_h1_connection));
  1117. if (!connection) {
  1118. goto error_connection_alloc;
  1119. }
  1120. connection->base.vtable = &s_h1_connection_vtable;
  1121. connection->base.alloc = alloc;
  1122. connection->base.channel_handler.vtable = &s_h1_connection_vtable.channel_handler_vtable;
  1123. connection->base.channel_handler.alloc = alloc;
  1124. connection->base.channel_handler.impl = connection;
  1125. connection->base.http_version = AWS_HTTP_VERSION_1_1;
  1126. connection->base.stream_manual_window_management = manual_window_management;
  1127. /* Init the next stream id (server must use even ids, client odd [RFC 7540 5.1.1])*/
  1128. connection->base.next_stream_id = server ? 2 : 1;
  1129. /* 1 refcount for user */
  1130. aws_atomic_init_int(&connection->base.refcount, 1);
  1131. if (manual_window_management) {
  1132. connection->initial_stream_window_size = initial_window_size;
  1133. if (http1_options->read_buffer_capacity > 0) {
  1134. connection->thread_data.read_buffer.capacity = http1_options->read_buffer_capacity;
  1135. } else {
  1136. /* User did not set capacity, choose something reasonable based on initial_window_size */
  1137. /* NOTE: These values are currently guesses, we should test to find good values */
  1138. const size_t clamp_min = aws_min_size(g_aws_channel_max_fragment_size * 4, /*256KB*/ 256 * 1024);
  1139. const size_t clamp_max = /*1MB*/ 1 * 1024 * 1024;
  1140. connection->thread_data.read_buffer.capacity =
  1141. aws_max_size(clamp_min, aws_min_size(clamp_max, initial_window_size));
  1142. }
  1143. connection->thread_data.connection_window = connection->thread_data.read_buffer.capacity;
  1144. } else {
  1145. /* No backpressure, keep connection window at SIZE_MAX */
  1146. connection->initial_stream_window_size = SIZE_MAX;
  1147. connection->thread_data.read_buffer.capacity = SIZE_MAX;
  1148. connection->thread_data.connection_window = SIZE_MAX;
  1149. }
  1150. aws_h1_encoder_init(&connection->thread_data.encoder, alloc);
  1151. aws_channel_task_init(
  1152. &connection->outgoing_stream_task, s_outgoing_stream_task, connection, "http1_connection_outgoing_stream");
  1153. aws_channel_task_init(
  1154. &connection->cross_thread_work_task,
  1155. s_cross_thread_work_task,
  1156. connection,
  1157. "http1_connection_cross_thread_work");
  1158. aws_linked_list_init(&connection->thread_data.stream_list);
  1159. aws_linked_list_init(&connection->thread_data.read_buffer.messages);
  1160. aws_crt_statistics_http1_channel_init(&connection->thread_data.stats);
  1161. int err = aws_mutex_init(&connection->synced_data.lock);
  1162. if (err) {
  1163. AWS_LOGF_ERROR(
  1164. AWS_LS_HTTP_CONNECTION,
  1165. "static: Failed to initialize mutex, error %d (%s).",
  1166. aws_last_error(),
  1167. aws_error_name(aws_last_error()));
  1168. goto error_mutex;
  1169. }
  1170. aws_linked_list_init(&connection->synced_data.new_client_stream_list);
  1171. connection->synced_data.is_open = true;
  1172. struct aws_h1_decoder_params options = {
  1173. .alloc = alloc,
  1174. .is_decoding_requests = server,
  1175. .user_data = connection,
  1176. .vtable = s_h1_decoder_vtable,
  1177. .scratch_space_initial_size = DECODER_INITIAL_SCRATCH_SIZE,
  1178. };
  1179. connection->thread_data.incoming_stream_decoder = aws_h1_decoder_new(&options);
  1180. if (!connection->thread_data.incoming_stream_decoder) {
  1181. AWS_LOGF_ERROR(
  1182. AWS_LS_HTTP_CONNECTION,
  1183. "static: Failed to create decoder, error %d (%s).",
  1184. aws_last_error(),
  1185. aws_error_name(aws_last_error()));
  1186. goto error_decoder;
  1187. }
  1188. return connection;
  1189. error_decoder:
  1190. aws_mutex_clean_up(&connection->synced_data.lock);
  1191. error_mutex:
  1192. aws_mem_release(alloc, connection);
  1193. error_connection_alloc:
  1194. return NULL;
  1195. }
  1196. struct aws_http_connection *aws_http_connection_new_http1_1_server(
  1197. struct aws_allocator *allocator,
  1198. bool manual_window_management,
  1199. size_t initial_window_size,
  1200. const struct aws_http1_connection_options *http1_options) {
  1201. struct aws_h1_connection *connection =
  1202. s_connection_new(allocator, manual_window_management, initial_window_size, http1_options, true /*is_server*/);
  1203. if (!connection) {
  1204. return NULL;
  1205. }
  1206. connection->base.server_data = &connection->base.client_or_server_data.server;
  1207. return &connection->base;
  1208. }
  1209. struct aws_http_connection *aws_http_connection_new_http1_1_client(
  1210. struct aws_allocator *allocator,
  1211. bool manual_window_management,
  1212. size_t initial_window_size,
  1213. const struct aws_http1_connection_options *http1_options) {
  1214. struct aws_h1_connection *connection =
  1215. s_connection_new(allocator, manual_window_management, initial_window_size, http1_options, false /*is_server*/);
  1216. if (!connection) {
  1217. return NULL;
  1218. }
  1219. connection->base.client_data = &connection->base.client_or_server_data.client;
  1220. return &connection->base;
  1221. }
  1222. static void s_handler_destroy(struct aws_channel_handler *handler) {
  1223. struct aws_h1_connection *connection = handler->impl;
  1224. AWS_LOGF_TRACE(AWS_LS_HTTP_CONNECTION, "id=%p: Destroying connection.", (void *)&connection->base);
  1225. AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.stream_list));
  1226. AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.new_client_stream_list));
  1227. /* Clean up any buffered read messages. */
  1228. while (!aws_linked_list_empty(&connection->thread_data.read_buffer.messages)) {
  1229. struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->thread_data.read_buffer.messages);
  1230. struct aws_io_message *msg = AWS_CONTAINER_OF(node, struct aws_io_message, queueing_handle);
  1231. aws_mem_release(msg->allocator, msg);
  1232. }
  1233. aws_h1_decoder_destroy(connection->thread_data.incoming_stream_decoder);
  1234. aws_h1_encoder_clean_up(&connection->thread_data.encoder);
  1235. aws_mutex_clean_up(&connection->synced_data.lock);
  1236. aws_mem_release(connection->base.alloc, connection);
  1237. }
  1238. static void s_handler_installed(struct aws_channel_handler *handler, struct aws_channel_slot *slot) {
  1239. struct aws_h1_connection *connection = handler->impl;
  1240. connection->base.channel_slot = slot;
  1241. /* Acquire a hold on the channel to prevent its destruction until the user has
  1242. * given the go-ahead via aws_http_connection_release() */
  1243. aws_channel_acquire_hold(slot->channel);
  1244. }
  1245. /* Try to send the next queued aws_io_message to the downstream handler.
  1246. * This can only be called after the connection has switched protocols and becoming a midchannel handler. */
  1247. static int s_try_process_next_midchannel_read_message(struct aws_h1_connection *connection, bool *out_stop_processing) {
  1248. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  1249. AWS_ASSERT(connection->thread_data.has_switched_protocols);
  1250. AWS_ASSERT(!connection->thread_data.is_reading_stopped);
  1251. AWS_ASSERT(!aws_linked_list_empty(&connection->thread_data.read_buffer.messages));
  1252. *out_stop_processing = false;
  1253. struct aws_io_message *sending_msg = NULL;
  1254. if (!connection->base.channel_slot->adj_right) {
  1255. AWS_LOGF_ERROR(
  1256. AWS_LS_HTTP_CONNECTION,
  1257. "id=%p: Connection has switched protocols, but no handler is installed to deal with this data.",
  1258. (void *)connection);
  1259. return aws_raise_error(AWS_ERROR_HTTP_SWITCHED_PROTOCOLS);
  1260. }
  1261. size_t downstream_window = aws_channel_slot_downstream_read_window(connection->base.channel_slot);
  1262. if (downstream_window == 0) {
  1263. AWS_LOGF_TRACE(
  1264. AWS_LS_HTTP_CONNECTION,
  1265. "id=%p: Downstream window is 0, cannot send switched-protocol message now.",
  1266. (void *)&connection->base);
  1267. *out_stop_processing = true;
  1268. return AWS_OP_SUCCESS;
  1269. }
  1270. struct aws_linked_list_node *queued_msg_node = aws_linked_list_front(&connection->thread_data.read_buffer.messages);
  1271. struct aws_io_message *queued_msg = AWS_CONTAINER_OF(queued_msg_node, struct aws_io_message, queueing_handle);
  1272. /* Note that copy_mark is used to mark the progress of partially sent messages. */
  1273. AWS_ASSERT(queued_msg->message_data.len > queued_msg->copy_mark);
  1274. size_t sending_bytes = aws_min_size(queued_msg->message_data.len - queued_msg->copy_mark, downstream_window);
  1275. AWS_ASSERT(connection->thread_data.read_buffer.pending_bytes >= sending_bytes);
  1276. connection->thread_data.read_buffer.pending_bytes -= sending_bytes;
  1277. /* If we can't send the whole entire queued_msg, copy its data into a new aws_io_message and send that. */
  1278. if (sending_bytes != queued_msg->message_data.len) {
  1279. sending_msg = aws_channel_acquire_message_from_pool(
  1280. connection->base.channel_slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, sending_bytes);
  1281. if (!sending_msg) {
  1282. goto error;
  1283. }
  1284. aws_byte_buf_write(
  1285. &sending_msg->message_data, queued_msg->message_data.buffer + queued_msg->copy_mark, sending_bytes);
  1286. queued_msg->copy_mark += sending_bytes;
  1287. AWS_LOGF_TRACE(
  1288. AWS_LS_HTTP_CONNECTION,
  1289. "id=%p: Sending %zu bytes switched-protocol message to downstream handler, %zu bytes remain.",
  1290. (void *)&connection->base,
  1291. sending_bytes,
  1292. queued_msg->message_data.len - queued_msg->copy_mark);
  1293. /* If the last of queued_msg has been copied, it can be deleted now. */
  1294. if (queued_msg->copy_mark == queued_msg->message_data.len) {
  1295. aws_linked_list_remove(queued_msg_node);
  1296. aws_mem_release(queued_msg->allocator, queued_msg);
  1297. }
  1298. } else {
  1299. /* Sending all of queued_msg along. */
  1300. AWS_LOGF_TRACE(
  1301. AWS_LS_HTTP_CONNECTION,
  1302. "id=%p: Sending full switched-protocol message of size %zu to downstream handler.",
  1303. (void *)&connection->base,
  1304. queued_msg->message_data.len);
  1305. aws_linked_list_remove(queued_msg_node);
  1306. sending_msg = queued_msg;
  1307. }
  1308. int err = aws_channel_slot_send_message(connection->base.channel_slot, sending_msg, AWS_CHANNEL_DIR_READ);
  1309. if (err) {
  1310. AWS_LOGF_ERROR(
  1311. AWS_LS_HTTP_CONNECTION,
  1312. "id=%p: Failed to send message in read direction, error %d (%s).",
  1313. (void *)&connection->base,
  1314. aws_last_error(),
  1315. aws_error_name(aws_last_error()));
  1316. goto error;
  1317. }
  1318. return AWS_OP_SUCCESS;
  1319. error:
  1320. if (sending_msg) {
  1321. aws_mem_release(sending_msg->allocator, sending_msg);
  1322. }
  1323. return AWS_OP_ERR;
  1324. }
  1325. static struct aws_http_stream *s_new_server_request_handler_stream(
  1326. const struct aws_http_request_handler_options *options) {
  1327. struct aws_h1_connection *connection = AWS_CONTAINER_OF(options->server_connection, struct aws_h1_connection, base);
  1328. if (!aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel) ||
  1329. !connection->thread_data.can_create_request_handler_stream) {
  1330. AWS_LOGF_ERROR(
  1331. AWS_LS_HTTP_CONNECTION,
  1332. "id=%p: aws_http_stream_new_server_request_handler() can only be called during incoming request callback.",
  1333. (void *)&connection->base);
  1334. aws_raise_error(AWS_ERROR_INVALID_STATE);
  1335. return NULL;
  1336. }
  1337. struct aws_h1_stream *stream = aws_h1_stream_new_request_handler(options);
  1338. if (!stream) {
  1339. AWS_LOGF_ERROR(
  1340. AWS_LS_HTTP_CONNECTION,
  1341. "id=%p: Failed to create request handler stream, error %d (%s).",
  1342. (void *)&connection->base,
  1343. aws_last_error(),
  1344. aws_error_name(aws_last_error()));
  1345. return NULL;
  1346. }
  1347. /*
  1348. * Success!
  1349. * Everything beyond this point cannot fail
  1350. */
  1351. /* Prevent further streams from being created until it's ok to do so. */
  1352. connection->thread_data.can_create_request_handler_stream = false;
  1353. /* Stream is waiting for response. */
  1354. aws_linked_list_push_back(&connection->thread_data.stream_list, &stream->node);
  1355. /* Connection owns stream, and must outlive stream */
  1356. aws_http_connection_acquire(&connection->base);
  1357. AWS_LOGF_TRACE(
  1358. AWS_LS_HTTP_STREAM,
  1359. "id=%p: Created request handler stream on server connection=%p",
  1360. (void *)&stream->base,
  1361. (void *)&connection->base);
  1362. return &stream->base;
  1363. }
  1364. /* Invokes the on_incoming_request callback and returns new stream. */
  1365. static struct aws_h1_stream *s_server_invoke_on_incoming_request(struct aws_h1_connection *connection) {
  1366. AWS_PRECONDITION(connection->base.server_data);
  1367. AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  1368. AWS_PRECONDITION(!connection->thread_data.can_create_request_handler_stream);
  1369. AWS_PRECONDITION(!connection->thread_data.incoming_stream);
  1370. /**
  1371. * The user MUST create the new request-handler stream during the on-incoming-request callback.
  1372. */
  1373. connection->thread_data.can_create_request_handler_stream = true;
  1374. struct aws_http_stream *new_stream =
  1375. connection->base.server_data->on_incoming_request(&connection->base, connection->base.user_data);
  1376. connection->thread_data.can_create_request_handler_stream = false;
  1377. return new_stream ? AWS_CONTAINER_OF(new_stream, struct aws_h1_stream, base) : NULL;
  1378. }
  1379. static int s_handler_process_read_message(
  1380. struct aws_channel_handler *handler,
  1381. struct aws_channel_slot *slot,
  1382. struct aws_io_message *message) {
  1383. (void)slot;
  1384. struct aws_h1_connection *connection = handler->impl;
  1385. const size_t message_size = message->message_data.len;
  1386. AWS_LOGF_TRACE(
  1387. AWS_LS_HTTP_CONNECTION, "id=%p: Incoming message of size %zu.", (void *)&connection->base, message_size);
  1388. /* Shrink connection window by amount of data received. See comments at variable's
  1389. * declaration site on why we use this instead of the official `aws_channel_slot.window_size`. */
  1390. if (message_size > connection->thread_data.connection_window) {
  1391. /* This error shouldn't be possible, but this is all complicated so check at runtime to be safe. */
  1392. AWS_LOGF_ERROR(
  1393. AWS_LS_HTTP_CONNECTION,
  1394. "id=%p: Internal error. Message exceeds connection's window.",
  1395. (void *)&connection->base);
  1396. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  1397. }
  1398. connection->thread_data.connection_window -= message_size;
  1399. /* Push message into queue of buffered messages */
  1400. aws_linked_list_push_back(&connection->thread_data.read_buffer.messages, &message->queueing_handle);
  1401. connection->thread_data.read_buffer.pending_bytes += message_size;
  1402. /* Try to process messages in queue */
  1403. aws_h1_connection_try_process_read_messages(connection);
  1404. return AWS_OP_SUCCESS;
  1405. }
  1406. void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *connection) {
  1407. /* Protect against this function being called recursively. */
  1408. if (connection->thread_data.is_processing_read_messages) {
  1409. return;
  1410. }
  1411. connection->thread_data.is_processing_read_messages = true;
  1412. /* Process queued messages */
  1413. while (!aws_linked_list_empty(&connection->thread_data.read_buffer.messages)) {
  1414. if (connection->thread_data.is_reading_stopped) {
  1415. AWS_LOGF_ERROR(
  1416. AWS_LS_HTTP_CONNECTION,
  1417. "id=%p: Cannot process message because connection is shutting down.",
  1418. (void *)&connection->base);
  1419. aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
  1420. goto shutdown;
  1421. }
  1422. bool stop_processing = false;
  1423. /* When connection has switched protocols, messages are processed very differently.
  1424. * We need to do this check in the middle of the normal processing loop,
  1425. * in case the switch happens in the middle of processing a message. */
  1426. if (connection->thread_data.has_switched_protocols) {
  1427. if (s_try_process_next_midchannel_read_message(connection, &stop_processing)) {
  1428. goto shutdown;
  1429. }
  1430. } else {
  1431. if (s_try_process_next_stream_read_message(connection, &stop_processing)) {
  1432. goto shutdown;
  1433. }
  1434. }
  1435. /* Break out of loop if we can't process any more data */
  1436. if (stop_processing) {
  1437. break;
  1438. }
  1439. }
  1440. /* Increment connection window, if necessary */
  1441. if (s_update_connection_window(connection)) {
  1442. goto shutdown;
  1443. }
  1444. connection->thread_data.is_processing_read_messages = false;
  1445. return;
  1446. shutdown:
  1447. s_shutdown_due_to_error(connection, aws_last_error());
  1448. }
  1449. /* Try to process the next queued aws_io_message as normal HTTP data for an aws_http_stream.
  1450. * This MUST NOT be called if the connection has switched protocols and become a midchannel handler. */
  1451. static int s_try_process_next_stream_read_message(struct aws_h1_connection *connection, bool *out_stop_processing) {
  1452. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
  1453. AWS_ASSERT(!connection->thread_data.has_switched_protocols);
  1454. AWS_ASSERT(!connection->thread_data.is_reading_stopped);
  1455. AWS_ASSERT(!aws_linked_list_empty(&connection->thread_data.read_buffer.messages));
  1456. *out_stop_processing = false;
  1457. /* Ensure that an incoming stream exists to receive the data */
  1458. if (!connection->thread_data.incoming_stream) {
  1459. if (aws_http_connection_is_client(&connection->base)) {
  1460. /* Client side */
  1461. AWS_LOGF_ERROR(
  1462. AWS_LS_HTTP_CONNECTION,
  1463. "id=%p: Cannot process message because no requests are currently awaiting response, closing "
  1464. "connection.",
  1465. (void *)&connection->base);
  1466. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  1467. } else {
  1468. /* Server side.
  1469. * Invoke on-incoming-request callback. The user MUST create a new stream from this callback.
  1470. * The new stream becomes the current incoming stream */
  1471. s_set_incoming_stream_ptr(connection, s_server_invoke_on_incoming_request(connection));
  1472. if (!connection->thread_data.incoming_stream) {
  1473. AWS_LOGF_ERROR(
  1474. AWS_LS_HTTP_CONNECTION,
  1475. "id=%p: Incoming request callback failed to provide a new stream, last error %d (%s). "
  1476. "Closing connection.",
  1477. (void *)&connection->base,
  1478. aws_last_error(),
  1479. aws_error_name(aws_last_error()));
  1480. return AWS_OP_ERR;
  1481. }
  1482. }
  1483. }
  1484. struct aws_h1_stream *incoming_stream = connection->thread_data.incoming_stream;
  1485. /* Stop processing if stream's window reaches 0. */
  1486. const uint64_t stream_window = incoming_stream->thread_data.stream_window;
  1487. if (stream_window == 0) {
  1488. AWS_LOGF_TRACE(
  1489. AWS_LS_HTTP_CONNECTION,
  1490. "id=%p: HTTP-stream's window is 0, cannot process message now.",
  1491. (void *)&connection->base);
  1492. *out_stop_processing = true;
  1493. return AWS_OP_SUCCESS;
  1494. }
  1495. struct aws_linked_list_node *queued_msg_node = aws_linked_list_front(&connection->thread_data.read_buffer.messages);
  1496. struct aws_io_message *queued_msg = AWS_CONTAINER_OF(queued_msg_node, struct aws_io_message, queueing_handle);
  1497. /* Note that copy_mark is used to mark the progress of partially decoded messages */
  1498. struct aws_byte_cursor message_cursor = aws_byte_cursor_from_buf(&queued_msg->message_data);
  1499. aws_byte_cursor_advance(&message_cursor, queued_msg->copy_mark);
  1500. /* Don't process more data than the stream's window can accept.
  1501. *
  1502. * TODO: Let the decoder know about stream-window size so it can stop itself,
  1503. * instead of limiting the amount of data we feed into the decoder at a time.
  1504. * This would be more optimal, AND avoid an edge-case where the stream-window goes
  1505. * to 0 as the body ends, and the connection can't proceed to the trailing headers.
  1506. */
  1507. message_cursor.len = (size_t)aws_min_u64(message_cursor.len, stream_window);
  1508. const size_t prev_cursor_len = message_cursor.len;
  1509. /* Set some decoder state, based on current stream */
  1510. aws_h1_decoder_set_logging_id(connection->thread_data.incoming_stream_decoder, incoming_stream);
  1511. bool body_headers_ignored = incoming_stream->base.request_method == AWS_HTTP_METHOD_HEAD;
  1512. aws_h1_decoder_set_body_headers_ignored(connection->thread_data.incoming_stream_decoder, body_headers_ignored);
  1513. /* As decoder runs, it invokes the internal s_decoder_X callbacks, which in turn invoke user callbacks.
  1514. * The decoder will stop once it hits the end of the request/response OR the end of the message data. */
  1515. if (aws_h1_decode(connection->thread_data.incoming_stream_decoder, &message_cursor)) {
  1516. AWS_LOGF_ERROR(
  1517. AWS_LS_HTTP_CONNECTION,
  1518. "id=%p: Message processing failed, error %d (%s). Closing connection.",
  1519. (void *)&connection->base,
  1520. aws_last_error(),
  1521. aws_error_name(aws_last_error()));
  1522. return AWS_OP_ERR;
  1523. }
  1524. size_t bytes_processed = prev_cursor_len - message_cursor.len;
  1525. queued_msg->copy_mark += bytes_processed;
  1526. AWS_ASSERT(connection->thread_data.read_buffer.pending_bytes >= bytes_processed);
  1527. connection->thread_data.read_buffer.pending_bytes -= bytes_processed;
  1528. AWS_LOGF_TRACE(
  1529. AWS_LS_HTTP_CONNECTION,
  1530. "id=%p: Decoded %zu bytes of message, %zu bytes remain.",
  1531. (void *)&connection->base,
  1532. bytes_processed,
  1533. queued_msg->message_data.len - queued_msg->copy_mark);
  1534. /* If the last of queued_msg has been processed, it can be deleted now.
  1535. * Otherwise, it remains in the queue for further processing later. */
  1536. if (queued_msg->copy_mark == queued_msg->message_data.len) {
  1537. aws_linked_list_remove(&queued_msg->queueing_handle);
  1538. aws_mem_release(queued_msg->allocator, queued_msg);
  1539. }
  1540. return AWS_OP_SUCCESS;
  1541. }
  1542. static int s_handler_process_write_message(
  1543. struct aws_channel_handler *handler,
  1544. struct aws_channel_slot *slot,
  1545. struct aws_io_message *message) {
  1546. struct aws_h1_connection *connection = handler->impl;
  1547. if (connection->thread_data.is_writing_stopped) {
  1548. aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
  1549. goto error;
  1550. }
  1551. if (!connection->thread_data.has_switched_protocols) {
  1552. aws_raise_error(AWS_ERROR_INVALID_STATE);
  1553. goto error;
  1554. }
  1555. /* Pass the message right along. */
  1556. int err = aws_channel_slot_send_message(slot, message, AWS_CHANNEL_DIR_WRITE);
  1557. if (err) {
  1558. goto error;
  1559. }
  1560. return AWS_OP_SUCCESS;
  1561. error:
  1562. AWS_LOGF_ERROR(
  1563. AWS_LS_HTTP_CONNECTION,
  1564. "id=%p: Destroying write message without passing it along, error %d (%s)",
  1565. (void *)&connection->base,
  1566. aws_last_error(),
  1567. aws_error_name(aws_last_error()));
  1568. if (message->on_completion) {
  1569. message->on_completion(connection->base.channel_slot->channel, message, aws_last_error(), message->user_data);
  1570. }
  1571. aws_mem_release(message->allocator, message);
  1572. s_shutdown_due_to_error(connection, aws_last_error());
  1573. return AWS_OP_SUCCESS;
  1574. }
  1575. static int s_handler_increment_read_window(
  1576. struct aws_channel_handler *handler,
  1577. struct aws_channel_slot *slot,
  1578. size_t size) {
  1579. (void)slot;
  1580. struct aws_h1_connection *connection = handler->impl;
  1581. if (!connection->thread_data.has_switched_protocols) {
  1582. AWS_LOGF_ERROR(
  1583. AWS_LS_HTTP_CONNECTION,
  1584. "id=%p: HTTP connection cannot have a downstream handler without first switching protocols",
  1585. (void *)&connection->base);
  1586. aws_raise_error(AWS_ERROR_INVALID_STATE);
  1587. goto error;
  1588. }
  1589. AWS_LOGF_TRACE(
  1590. AWS_LS_HTTP_CONNECTION,
  1591. "id=%p: Handler in read direction incremented read window by %zu. Sending queued messages, if any.",
  1592. (void *)&connection->base,
  1593. size);
  1594. /* Send along any queued messages, and increment connection's window if necessary */
  1595. aws_h1_connection_try_process_read_messages(connection);
  1596. return AWS_OP_SUCCESS;
  1597. error:
  1598. s_shutdown_due_to_error(connection, aws_last_error());
  1599. return AWS_OP_SUCCESS;
  1600. }
  1601. static int s_handler_shutdown(
  1602. struct aws_channel_handler *handler,
  1603. struct aws_channel_slot *slot,
  1604. enum aws_channel_direction dir,
  1605. int error_code,
  1606. bool free_scarce_resources_immediately) {
  1607. (void)free_scarce_resources_immediately;
  1608. struct aws_h1_connection *connection = handler->impl;
  1609. AWS_LOGF_TRACE(
  1610. AWS_LS_HTTP_CONNECTION,
  1611. "id=%p: Channel shutting down in %s direction with error code %d (%s).",
  1612. (void *)&connection->base,
  1613. (dir == AWS_CHANNEL_DIR_READ) ? "read" : "write",
  1614. error_code,
  1615. aws_error_name(error_code));
  1616. if (dir == AWS_CHANNEL_DIR_READ) {
  1617. /* This call ensures that no further streams will be created or worked on. */
  1618. s_stop(connection, true /*stop_reading*/, false /*stop_writing*/, false /*schedule_shutdown*/, error_code);
  1619. } else /* dir == AWS_CHANNEL_DIR_WRITE */ {
  1620. s_stop(connection, false /*stop_reading*/, true /*stop_writing*/, false /*schedule_shutdown*/, error_code);
  1621. /* Mark all pending streams as complete. */
  1622. int stream_error_code = error_code == AWS_ERROR_SUCCESS ? AWS_ERROR_HTTP_CONNECTION_CLOSED : error_code;
  1623. while (!aws_linked_list_empty(&connection->thread_data.stream_list)) {
  1624. struct aws_linked_list_node *node = aws_linked_list_front(&connection->thread_data.stream_list);
  1625. s_stream_complete(AWS_CONTAINER_OF(node, struct aws_h1_stream, node), stream_error_code);
  1626. }
  1627. /* It's OK to access synced_data.new_client_stream_list without holding the lock because
  1628. * no more streams can be added after s_stop() has been invoked. */
  1629. while (!aws_linked_list_empty(&connection->synced_data.new_client_stream_list)) {
  1630. struct aws_linked_list_node *node = aws_linked_list_front(&connection->synced_data.new_client_stream_list);
  1631. s_stream_complete(AWS_CONTAINER_OF(node, struct aws_h1_stream, node), stream_error_code);
  1632. }
  1633. }
  1634. aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, free_scarce_resources_immediately);
  1635. return AWS_OP_SUCCESS;
  1636. }
  1637. static size_t s_handler_initial_window_size(struct aws_channel_handler *handler) {
  1638. struct aws_h1_connection *connection = handler->impl;
  1639. return connection->thread_data.connection_window;
  1640. }
  1641. static size_t s_handler_message_overhead(struct aws_channel_handler *handler) {
  1642. (void)handler;
  1643. return 0;
  1644. }
  1645. static void s_reset_statistics(struct aws_channel_handler *handler) {
  1646. struct aws_h1_connection *connection = handler->impl;
  1647. aws_crt_statistics_http1_channel_reset(&connection->thread_data.stats);
  1648. }
  1649. static void s_pull_up_stats_timestamps(struct aws_h1_connection *connection) {
  1650. uint64_t now_ns = 0;
  1651. if (aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns)) {
  1652. return;
  1653. }
  1654. if (connection->thread_data.outgoing_stream) {
  1655. s_add_time_measurement_to_stats(
  1656. connection->thread_data.outgoing_stream_timestamp_ns,
  1657. now_ns,
  1658. &connection->thread_data.stats.pending_outgoing_stream_ms);
  1659. connection->thread_data.outgoing_stream_timestamp_ns = now_ns;
  1660. connection->thread_data.stats.current_outgoing_stream_id =
  1661. aws_http_stream_get_id(&connection->thread_data.outgoing_stream->base);
  1662. }
  1663. if (connection->thread_data.incoming_stream) {
  1664. s_add_time_measurement_to_stats(
  1665. connection->thread_data.incoming_stream_timestamp_ns,
  1666. now_ns,
  1667. &connection->thread_data.stats.pending_incoming_stream_ms);
  1668. connection->thread_data.incoming_stream_timestamp_ns = now_ns;
  1669. connection->thread_data.stats.current_incoming_stream_id =
  1670. aws_http_stream_get_id(&connection->thread_data.incoming_stream->base);
  1671. }
  1672. }
  1673. static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats) {
  1674. struct aws_h1_connection *connection = handler->impl;
  1675. /* TODO: Need update the way we calculate statistics, to account for user-controlled pauses.
  1676. * If user is adding chunks 1 by 1, there can naturally be a gap in the upload.
  1677. * If the user lets the stream-window go to zero, there can naturally be a gap in the download. */
  1678. s_pull_up_stats_timestamps(connection);
  1679. void *stats_base = &connection->thread_data.stats;
  1680. aws_array_list_push_back(stats, &stats_base);
  1681. }
  1682. struct aws_crt_statistics_http1_channel *aws_h1_connection_get_statistics(struct aws_http_connection *connection) {
  1683. AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->channel_slot->channel));
  1684. struct aws_h1_connection *h1_conn = (void *)connection;
  1685. return &h1_conn->thread_data.stats;
  1686. }
  1687. struct aws_h1_window_stats aws_h1_connection_window_stats(struct aws_http_connection *connection_base) {
  1688. struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);
  1689. struct aws_h1_window_stats stats = {
  1690. .connection_window = connection->thread_data.connection_window,
  1691. .buffer_capacity = connection->thread_data.read_buffer.capacity,
  1692. .buffer_pending_bytes = connection->thread_data.read_buffer.pending_bytes,
  1693. .recent_window_increments = connection->thread_data.recent_window_increments,
  1694. .has_incoming_stream = connection->thread_data.incoming_stream != NULL,
  1695. .stream_window = connection->thread_data.incoming_stream
  1696. ? connection->thread_data.incoming_stream->thread_data.stream_window
  1697. : 0,
  1698. };
  1699. /* Resets each time it's queried */
  1700. connection->thread_data.recent_window_increments = 0;
  1701. return stats;
  1702. }