12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038 |
- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include "aws/s3/private/s3_auto_ranged_get.h"
- #include "aws/s3/private/s3_auto_ranged_put.h"
- #include "aws/s3/private/s3_client_impl.h"
- #include "aws/s3/private/s3_default_meta_request.h"
- #include "aws/s3/private/s3_meta_request_impl.h"
- #include "aws/s3/private/s3_request_messages.h"
- #include "aws/s3/private/s3_util.h"
- #include <aws/auth/credentials.h>
- #include <aws/common/assert.h>
- #include <aws/common/atomics.h>
- #include <aws/common/clock.h>
- #include <aws/common/device_random.h>
- #include <aws/common/environment.h>
- #include <aws/common/json.h>
- #include <aws/common/string.h>
- #include <aws/common/system_info.h>
- #include <aws/http/connection.h>
- #include <aws/http/connection_manager.h>
- #include <aws/http/proxy.h>
- #include <aws/http/request_response.h>
- #include <aws/io/channel_bootstrap.h>
- #include <aws/io/event_loop.h>
- #include <aws/io/host_resolver.h>
- #include <aws/io/retry_strategy.h>
- #include <aws/io/socket.h>
- #include <aws/io/stream.h>
- #include <aws/io/tls_channel_handler.h>
- #include <aws/io/uri.h>
- #include <aws/s3/private/s3_copy_object.h>
- #include <inttypes.h>
- #include <math.h>
- #ifdef _MSC_VER
- # pragma warning(disable : 4232) /* function pointer to dll symbol */
- #endif /* _MSC_VER */
- struct aws_s3_meta_request_work {
- struct aws_linked_list_node node;
- struct aws_s3_meta_request *meta_request;
- };
- static const enum aws_log_level s_log_level_client_stats = AWS_LL_INFO;
- static const uint32_t s_max_requests_multiplier = 4;
- /* TODO Provide analysis on origins of this value. */
- static const double s_throughput_per_vip_gbps = 4.0;
- /* Preferred amount of active connections per meta request type. */
- const uint32_t g_num_conns_per_vip_meta_request_look_up[AWS_S3_META_REQUEST_TYPE_MAX] = {
- 10, /* AWS_S3_META_REQUEST_TYPE_DEFAULT */
- 10, /* AWS_S3_META_REQUEST_TYPE_GET_OBJECT */
- 10, /* AWS_S3_META_REQUEST_TYPE_PUT_OBJECT */
- 10 /* AWS_S3_META_REQUEST_TYPE_COPY_OBJECT */
- };
- /* Should be max of s_num_conns_per_vip_meta_request_look_up */
- const uint32_t g_max_num_connections_per_vip = 10;
/**
 * Defaults applied when the corresponding client configuration value is left at zero / unset.
 *
 * - Part size defaults to 8 MiB, which gave the best performance in the experiments we had.
 * - Max part size defaults to SIZE_MAX on 32bit systems (around 4GiB) and to 5GiB on 64bit systems.
 *   The server limit is 5GiB, but the object size limit is 5TiB for now, so this is good enough for all the cases.
 * - The max number of upload parts is 10000, which limits the object size to 39TiB on 32bit and 49TiB on 64bit.
 *
 * TODO Provide more information on other values.
 */
static const size_t s_default_part_size = 8 * 1024 * 1024;

static const uint64_t s_default_max_part_size = SIZE_MAX < 5368709120ULL ? SIZE_MAX : 5368709120ULL;

static const double s_default_throughput_target_gbps = 10.0;

static const uint32_t s_default_max_retries = 5;

/* Not const: the value can be replaced at runtime through aws_s3_set_dns_ttl(). */
static size_t s_dns_host_address_ttl_seconds = 5 * 60;

/* Default time until a connection is declared dead, while handling a request but seeing no activity.
 * 30 seconds mirrors the value currently used by the Java SDK. */
static const uint32_t s_default_throughput_failure_interval_seconds = 30;
/* Invoked once the client's ref count drops to 0 (registered through aws_ref_count_init in aws_s3_client_new). */
static void s_s3_client_start_destroy(void *user_data);

/* Called by s_s3_client_process_work_default when all shutdown criteria has been met. */
static void s_s3_client_finish_destroy_default(struct aws_s3_client *client);

/* Shutdown callback of the body streaming event loop group. */
static void s_s3_client_body_streaming_elg_shutdown(void *user_data);

static void s_s3_client_create_connection_for_request(struct aws_s3_client *client, struct aws_s3_request *request);

/* Callback which handles the HTTP connection retrieved by acquire_http_connection. */
static void s_s3_client_on_acquire_http_connection(
    struct aws_http_connection *http_connection,
    int error_code,
    void *user_data);

static void s_s3_client_push_meta_request_synced(
    struct aws_s3_client *client,
    struct aws_s3_meta_request *meta_request);

/* Schedule task for processing work. (Calls the corresponding vtable function.) */
static void s_s3_client_schedule_process_work_synced(struct aws_s3_client *client);

/* Default implementation for scheduling processing of work. */
static void s_s3_client_schedule_process_work_synced_default(struct aws_s3_client *client);

/* Actual task function that processes work. */
static void s_s3_client_process_work_task(struct aws_task *task, void *arg, enum aws_task_status task_status);

/* Default vtable implementation of process_work. */
static void s_s3_client_process_work_default(struct aws_s3_client *client);

/* Default vtable implementation of endpoint_shutdown_callback. */
static void s_s3_client_endpoint_shutdown_callback(struct aws_s3_client *client);

/* Default factory function for creating a meta request. */
static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
    struct aws_s3_client *client,
    const struct aws_s3_meta_request_options *options);
- static struct aws_s3_client_vtable s_s3_client_default_vtable = {
- .meta_request_factory = s_s3_client_meta_request_factory_default,
- .acquire_http_connection = aws_http_connection_manager_acquire_connection,
- .get_host_address_count = aws_host_resolver_get_host_address_count,
- .schedule_process_work_synced = s_s3_client_schedule_process_work_synced_default,
- .process_work = s_s3_client_process_work_default,
- .endpoint_shutdown_callback = s_s3_client_endpoint_shutdown_callback,
- .finish_destroy = s_s3_client_finish_destroy_default,
- };
- void aws_s3_set_dns_ttl(size_t ttl) {
- s_dns_host_address_ttl_seconds = ttl;
- }
- /* Returns the max number of connections allowed.
- *
- * When meta request is NULL, this will return the overall allowed number of connections.
- *
- * If meta_request is not NULL, this will give the max number of connections allowed for that meta request type on
- * that endpoint.
- */
- uint32_t aws_s3_client_get_max_active_connections(
- struct aws_s3_client *client,
- struct aws_s3_meta_request *meta_request) {
- AWS_PRECONDITION(client);
- uint32_t num_connections_per_vip = g_max_num_connections_per_vip;
- uint32_t num_vips = client->ideal_vip_count;
- if (meta_request != NULL) {
- num_connections_per_vip = g_num_conns_per_vip_meta_request_look_up[meta_request->type];
- struct aws_s3_endpoint *endpoint = meta_request->endpoint;
- AWS_ASSERT(endpoint != NULL);
- AWS_ASSERT(client->vtable->get_host_address_count);
- size_t num_known_vips = client->vtable->get_host_address_count(
- client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
- /* If the number of known vips is less than our ideal VIP count, clamp it. */
- if (num_known_vips < (size_t)num_vips) {
- num_vips = (uint32_t)num_known_vips;
- }
- }
- /* We always want to allow for at least one VIP worth of connections. */
- if (num_vips == 0) {
- num_vips = 1;
- }
- uint32_t max_active_connections = num_vips * num_connections_per_vip;
- if (client->max_active_connections_override > 0 &&
- client->max_active_connections_override < max_active_connections) {
- max_active_connections = client->max_active_connections_override;
- }
- return max_active_connections;
- }
- /* Returns the max number of requests allowed to be in memory */
- uint32_t aws_s3_client_get_max_requests_in_flight(struct aws_s3_client *client) {
- AWS_PRECONDITION(client);
- return aws_s3_client_get_max_active_connections(client, NULL) * s_max_requests_multiplier;
- }
/* Max number of requests that should be in the preparation stage (ie: reading from a stream, being signed, etc.).
 * This equals the overall connection limit of the client. */
uint32_t aws_s3_client_get_max_requests_prepare(struct aws_s3_client *client) {
    const uint32_t overall_max_connections = aws_s3_client_get_max_active_connections(client, NULL);
    return overall_max_connections;
}
- static uint32_t s_s3_client_get_num_requests_network_io(
- struct aws_s3_client *client,
- enum aws_s3_meta_request_type meta_request_type) {
- AWS_PRECONDITION(client);
- uint32_t num_requests_network_io = 0;
- if (meta_request_type == AWS_S3_META_REQUEST_TYPE_MAX) {
- for (uint32_t i = 0; i < AWS_S3_META_REQUEST_TYPE_MAX; ++i) {
- num_requests_network_io += (uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_io[i]);
- }
- } else {
- num_requests_network_io =
- (uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_io[meta_request_type]);
- }
- return num_requests_network_io;
- }
- void aws_s3_client_lock_synced_data(struct aws_s3_client *client) {
- aws_mutex_lock(&client->synced_data.lock);
- }
- void aws_s3_client_unlock_synced_data(struct aws_s3_client *client) {
- aws_mutex_unlock(&client->synced_data.lock);
- }
- struct aws_s3_client *aws_s3_client_new(
- struct aws_allocator *allocator,
- const struct aws_s3_client_config *client_config) {
- AWS_PRECONDITION(allocator);
- AWS_PRECONDITION(client_config);
- if (client_config->client_bootstrap == NULL) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "Cannot create client from client_config; client_bootstrap provided in options is invalid.");
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- /* Cannot be less than zero. If zero, use default. */
- if (client_config->throughput_target_gbps < 0.0) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "Cannot create client from client_config; throughput_target_gbps cannot less than or equal to 0.");
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- #ifdef BYO_CRYPTO
- if (client_config->tls_mode == AWS_MR_TLS_ENABLED && client_config->tls_connection_options == NULL) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "Cannot create client from client_config; when using BYO_CRYPTO, tls_connection_options can not be "
- "NULL when TLS is enabled.");
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- #endif
- struct aws_s3_client *client = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_client));
- client->allocator = allocator;
- client->vtable = &s_s3_client_default_vtable;
- aws_ref_count_init(&client->ref_count, client, (aws_simple_completion_callback *)s_s3_client_start_destroy);
- if (aws_mutex_init(&client->synced_data.lock) != AWS_OP_SUCCESS) {
- goto lock_init_fail;
- }
- aws_linked_list_init(&client->synced_data.pending_meta_request_work);
- aws_linked_list_init(&client->synced_data.prepared_requests);
- aws_linked_list_init(&client->threaded_data.meta_requests);
- aws_linked_list_init(&client->threaded_data.request_queue);
- aws_atomic_init_int(&client->stats.num_requests_in_flight, 0);
- for (uint32_t i = 0; i < (uint32_t)AWS_S3_META_REQUEST_TYPE_MAX; ++i) {
- aws_atomic_init_int(&client->stats.num_requests_network_io[i], 0);
- }
- aws_atomic_init_int(&client->stats.num_requests_stream_queued_waiting, 0);
- aws_atomic_init_int(&client->stats.num_requests_streaming, 0);
- *((uint32_t *)&client->max_active_connections_override) = client_config->max_active_connections_override;
- /* Store our client bootstrap. */
- client->client_bootstrap = aws_client_bootstrap_acquire(client_config->client_bootstrap);
- struct aws_event_loop_group *event_loop_group = client_config->client_bootstrap->event_loop_group;
- aws_event_loop_group_acquire(event_loop_group);
- client->process_work_event_loop = aws_event_loop_group_get_next_loop(event_loop_group);
- /* Make a copy of the region string. */
- client->region = aws_string_new_from_array(allocator, client_config->region.ptr, client_config->region.len);
- if (client_config->part_size != 0) {
- *((size_t *)&client->part_size) = client_config->part_size;
- } else {
- *((size_t *)&client->part_size) = s_default_part_size;
- }
- if (client_config->max_part_size != 0) {
- *((size_t *)&client->max_part_size) = client_config->max_part_size;
- } else {
- *((size_t *)&client->max_part_size) = (size_t)s_default_max_part_size;
- }
- if (client_config->max_part_size < client_config->part_size) {
- *((size_t *)&client_config->max_part_size) = client_config->part_size;
- }
- client->connect_timeout_ms = client_config->connect_timeout_ms;
- if (client_config->proxy_ev_settings) {
- client->proxy_ev_settings = aws_mem_calloc(allocator, 1, sizeof(struct proxy_env_var_settings));
- *client->proxy_ev_settings = *client_config->proxy_ev_settings;
- if (client_config->proxy_ev_settings->tls_options) {
- client->proxy_ev_tls_options = aws_mem_calloc(allocator, 1, sizeof(struct aws_tls_connection_options));
- if (aws_tls_connection_options_copy(client->proxy_ev_tls_options, client->proxy_ev_settings->tls_options)) {
- goto on_error;
- }
- client->proxy_ev_settings->tls_options = client->proxy_ev_tls_options;
- }
- }
- if (client_config->tcp_keep_alive_options) {
- client->tcp_keep_alive_options = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_tcp_keep_alive_options));
- *client->tcp_keep_alive_options = *client_config->tcp_keep_alive_options;
- }
- if (client_config->monitoring_options) {
- client->monitoring_options = *client_config->monitoring_options;
- } else {
- client->monitoring_options.minimum_throughput_bytes_per_second = 1;
- client->monitoring_options.allowable_throughput_failure_interval_seconds =
- s_default_throughput_failure_interval_seconds;
- }
- if (client_config->tls_mode == AWS_MR_TLS_ENABLED) {
- client->tls_connection_options =
- aws_mem_calloc(client->allocator, 1, sizeof(struct aws_tls_connection_options));
- if (client_config->tls_connection_options != NULL) {
- aws_tls_connection_options_copy(client->tls_connection_options, client_config->tls_connection_options);
- } else {
- #ifdef BYO_CRYPTO
- AWS_FATAL_ASSERT(false);
- goto on_error;
- #else
- struct aws_tls_ctx_options default_tls_ctx_options;
- AWS_ZERO_STRUCT(default_tls_ctx_options);
- aws_tls_ctx_options_init_default_client(&default_tls_ctx_options, allocator);
- struct aws_tls_ctx *default_tls_ctx = aws_tls_client_ctx_new(allocator, &default_tls_ctx_options);
- if (default_tls_ctx == NULL) {
- goto on_error;
- }
- aws_tls_connection_options_init_from_ctx(client->tls_connection_options, default_tls_ctx);
- aws_tls_ctx_release(default_tls_ctx);
- aws_tls_ctx_options_clean_up(&default_tls_ctx_options);
- #endif
- }
- }
- if (client_config->proxy_options) {
- client->proxy_config = aws_http_proxy_config_new_from_proxy_options_with_tls_info(
- allocator, client_config->proxy_options, client_config->tls_mode == AWS_MR_TLS_ENABLED);
- if (client->proxy_config == NULL) {
- goto on_error;
- }
- }
- /* Set up body streaming ELG */
- {
- uint16_t num_event_loops =
- (uint16_t)aws_array_list_length(&client->client_bootstrap->event_loop_group->event_loops);
- uint16_t num_streaming_threads = num_event_loops;
- if (num_streaming_threads < 1) {
- num_streaming_threads = 1;
- }
- struct aws_shutdown_callback_options body_streaming_elg_shutdown_options = {
- .shutdown_callback_fn = s_s3_client_body_streaming_elg_shutdown,
- .shutdown_callback_user_data = client,
- };
- if (aws_get_cpu_group_count() > 1) {
- client->body_streaming_elg = aws_event_loop_group_new_default_pinned_to_cpu_group(
- client->allocator, num_streaming_threads, 1, &body_streaming_elg_shutdown_options);
- } else {
- client->body_streaming_elg = aws_event_loop_group_new_default(
- client->allocator, num_streaming_threads, &body_streaming_elg_shutdown_options);
- }
- if (!client->body_streaming_elg) {
- /* Fail to create elg, we should fail the call */
- goto on_error;
- }
- client->synced_data.body_streaming_elg_allocated = true;
- }
- /* Setup cannot fail after this point. */
- if (client_config->throughput_target_gbps != 0.0) {
- *((double *)&client->throughput_target_gbps) = client_config->throughput_target_gbps;
- } else {
- *((double *)&client->throughput_target_gbps) = s_default_throughput_target_gbps;
- }
- *((enum aws_s3_meta_request_compute_content_md5 *)&client->compute_content_md5) =
- client_config->compute_content_md5;
- /* Determine how many vips are ideal by dividing target-throughput by throughput-per-vip. */
- {
- double ideal_vip_count_double = client->throughput_target_gbps / s_throughput_per_vip_gbps;
- *((uint32_t *)&client->ideal_vip_count) = (uint32_t)ceil(ideal_vip_count_double);
- }
- if (client_config->signing_config) {
- client->cached_signing_config = aws_cached_signing_config_new(client->allocator, client_config->signing_config);
- }
- client->synced_data.active = true;
- if (client_config->retry_strategy != NULL) {
- aws_retry_strategy_acquire(client_config->retry_strategy);
- client->retry_strategy = client_config->retry_strategy;
- } else {
- struct aws_exponential_backoff_retry_options backoff_retry_options = {
- .el_group = client_config->client_bootstrap->event_loop_group,
- .max_retries = s_default_max_retries,
- };
- struct aws_standard_retry_options retry_options = {
- .backoff_retry_options = backoff_retry_options,
- };
- client->retry_strategy = aws_retry_strategy_new_standard(allocator, &retry_options);
- }
- aws_hash_table_init(
- &client->synced_data.endpoints,
- client->allocator,
- 10,
- aws_hash_string,
- aws_hash_callback_string_eq,
- aws_hash_callback_string_destroy,
- NULL);
- /* Initialize shutdown options and tracking. */
- client->shutdown_callback = client_config->shutdown_callback;
- client->shutdown_callback_user_data = client_config->shutdown_callback_user_data;
- *((bool *)&client->enable_read_backpressure) = client_config->enable_read_backpressure;
- *((size_t *)&client->initial_read_window) = client_config->initial_read_window;
- return client;
- on_error:
- aws_string_destroy(client->region);
- if (client->tls_connection_options) {
- aws_tls_connection_options_clean_up(client->tls_connection_options);
- aws_mem_release(client->allocator, client->tls_connection_options);
- client->tls_connection_options = NULL;
- }
- if (client->proxy_config) {
- aws_http_proxy_config_destroy(client->proxy_config);
- }
- if (client->proxy_ev_tls_options) {
- aws_tls_connection_options_clean_up(client->proxy_ev_tls_options);
- aws_mem_release(client->allocator, client->proxy_ev_tls_options);
- client->proxy_ev_settings->tls_options = NULL;
- }
- aws_mem_release(client->allocator, client->proxy_ev_settings);
- aws_mem_release(client->allocator, client->tcp_keep_alive_options);
- aws_event_loop_group_release(client->client_bootstrap->event_loop_group);
- aws_client_bootstrap_release(client->client_bootstrap);
- aws_mutex_clean_up(&client->synced_data.lock);
- lock_init_fail:
- aws_mem_release(client->allocator, client);
- return NULL;
- }
- struct aws_s3_client *aws_s3_client_acquire(struct aws_s3_client *client) {
- AWS_PRECONDITION(client);
- aws_ref_count_acquire(&client->ref_count);
- return client;
- }
- struct aws_s3_client *aws_s3_client_release(struct aws_s3_client *client) {
- if (client != NULL) {
- aws_ref_count_release(&client->ref_count);
- }
- return NULL;
- }
- static void s_s3_client_start_destroy(void *user_data) {
- struct aws_s3_client *client = user_data;
- AWS_PRECONDITION(client);
- AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client starting destruction.", (void *)client);
- struct aws_linked_list local_vip_list;
- aws_linked_list_init(&local_vip_list);
- /* BEGIN CRITICAL SECTION */
- {
- aws_s3_client_lock_synced_data(client);
- client->synced_data.active = false;
- /* Prevent the client from cleaning up in between the mutex unlock/re-lock below.*/
- client->synced_data.start_destroy_executing = true;
- aws_s3_client_unlock_synced_data(client);
- }
- /* END CRITICAL SECTION */
- aws_event_loop_group_release(client->body_streaming_elg);
- client->body_streaming_elg = NULL;
- /* BEGIN CRITICAL SECTION */
- {
- aws_s3_client_lock_synced_data(client);
- client->synced_data.start_destroy_executing = false;
- /* Schedule the work task to clean up outstanding connections and to call s_s3_client_finish_destroy function if
- * everything cleaning up asynchronously has finished. */
- s_s3_client_schedule_process_work_synced(client);
- aws_s3_client_unlock_synced_data(client);
- }
- /* END CRITICAL SECTION */
- }
- static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) {
- AWS_PRECONDITION(client);
- AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client finishing destruction.", (void *)client);
- aws_string_destroy(client->region);
- client->region = NULL;
- if (client->tls_connection_options) {
- aws_tls_connection_options_clean_up(client->tls_connection_options);
- aws_mem_release(client->allocator, client->tls_connection_options);
- client->tls_connection_options = NULL;
- }
- if (client->proxy_config) {
- aws_http_proxy_config_destroy(client->proxy_config);
- }
- if (client->proxy_ev_tls_options) {
- aws_tls_connection_options_clean_up(client->proxy_ev_tls_options);
- aws_mem_release(client->allocator, client->proxy_ev_tls_options);
- client->proxy_ev_settings->tls_options = NULL;
- }
- aws_mem_release(client->allocator, client->proxy_ev_settings);
- aws_mem_release(client->allocator, client->tcp_keep_alive_options);
- aws_mutex_clean_up(&client->synced_data.lock);
- AWS_ASSERT(aws_linked_list_empty(&client->synced_data.pending_meta_request_work));
- AWS_ASSERT(aws_linked_list_empty(&client->threaded_data.meta_requests));
- aws_hash_table_clean_up(&client->synced_data.endpoints);
- aws_retry_strategy_release(client->retry_strategy);
- aws_event_loop_group_release(client->client_bootstrap->event_loop_group);
- aws_client_bootstrap_release(client->client_bootstrap);
- aws_cached_signing_config_destroy(client->cached_signing_config);
- aws_s3_client_shutdown_complete_callback_fn *shutdown_callback = client->shutdown_callback;
- void *shutdown_user_data = client->shutdown_callback_user_data;
- aws_mem_release(client->allocator, client);
- client = NULL;
- if (shutdown_callback != NULL) {
- shutdown_callback(shutdown_user_data);
- }
- }
- static void s_s3_client_body_streaming_elg_shutdown(void *user_data) {
- struct aws_s3_client *client = user_data;
- AWS_PRECONDITION(client);
- AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client body streaming ELG shutdown.", (void *)client);
- /* BEGIN CRITICAL SECTION */
- {
- aws_s3_client_lock_synced_data(client);
- client->synced_data.body_streaming_elg_allocated = false;
- s_s3_client_schedule_process_work_synced(client);
- aws_s3_client_unlock_synced_data(client);
- }
- /* END CRITICAL SECTION */
- }
- uint32_t aws_s3_client_queue_requests_threaded(
- struct aws_s3_client *client,
- struct aws_linked_list *request_list,
- bool queue_front) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(request_list);
- uint32_t request_list_size = 0;
- for (struct aws_linked_list_node *node = aws_linked_list_begin(request_list);
- node != aws_linked_list_end(request_list);
- node = aws_linked_list_next(node)) {
- ++request_list_size;
- }
- if (queue_front) {
- aws_linked_list_move_all_front(&client->threaded_data.request_queue, request_list);
- } else {
- aws_linked_list_move_all_back(&client->threaded_data.request_queue, request_list);
- }
- client->threaded_data.request_queue_size += request_list_size;
- return request_list_size;
- }
- struct aws_s3_request *aws_s3_client_dequeue_request_threaded(struct aws_s3_client *client) {
- AWS_PRECONDITION(client);
- if (aws_linked_list_empty(&client->threaded_data.request_queue)) {
- return NULL;
- }
- struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&client->threaded_data.request_queue);
- struct aws_s3_request *request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node);
- --client->threaded_data.request_queue_size;
- return request;
- }
- /*
- * There is currently some overlap between user provided Host header and endpoint
- * override. This function handles the corner cases for when either or both are provided.
- */
- int s_apply_endpoint_override(
- const struct aws_s3_client *client,
- struct aws_http_headers *message_headers,
- const struct aws_uri *endpoint) {
- AWS_PRECONDITION(message_headers);
- const struct aws_byte_cursor *endpoint_authority = endpoint == NULL ? NULL : aws_uri_authority(endpoint);
- if (!aws_http_headers_has(message_headers, g_host_header_name)) {
- if (endpoint_authority == NULL) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Cannot create meta s3 request; message provided in options does not have either 'Host' header "
- "set or endpoint override.",
- (void *)client);
- return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- }
- if (aws_http_headers_set(message_headers, g_host_header_name, *endpoint_authority)) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Cannot create meta s3 request; failed to set 'Host' header based on endpoint override.",
- (void *)client);
- return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- }
- }
- struct aws_byte_cursor host_value;
- AWS_FATAL_ASSERT(aws_http_headers_get(message_headers, g_host_header_name, &host_value) == AWS_OP_SUCCESS);
- if (endpoint_authority != NULL && !aws_byte_cursor_eq(&host_value, endpoint_authority)) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Cannot create meta s3 request; host header value " PRInSTR
- " does not match endpoint override " PRInSTR,
- (void *)client,
- AWS_BYTE_CURSOR_PRI(host_value),
- AWS_BYTE_CURSOR_PRI(*endpoint_authority));
- return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- }
- return AWS_OP_SUCCESS;
- }
- /* Public facing make-meta-request function. */
- struct aws_s3_meta_request *aws_s3_client_make_meta_request(
- struct aws_s3_client *client,
- const struct aws_s3_meta_request_options *options) {
- AWS_LOGF_INFO(AWS_LS_S3_CLIENT, "id=%p Initiating making of meta request", (void *)client);
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(client->vtable);
- AWS_PRECONDITION(client->vtable->meta_request_factory);
- AWS_PRECONDITION(options);
- if (options->type >= AWS_S3_META_REQUEST_TYPE_MAX) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Cannot create meta s3 request; invalid meta request type specified.",
- (void *)client);
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- if (options->message == NULL) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Cannot create meta s3 request; message provided in options is invalid.",
- (void *)client);
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- struct aws_http_headers *message_headers = aws_http_message_get_headers(options->message);
- if (message_headers == NULL) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Cannot create meta s3 request; message provided in options does not contain headers.",
- (void *)client);
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- if (options->checksum_config) {
- if (options->checksum_config->location == AWS_SCL_TRAILER) {
- struct aws_http_headers *headers = aws_http_message_get_headers(options->message);
- struct aws_byte_cursor existing_encoding;
- AWS_ZERO_STRUCT(existing_encoding);
- if (aws_http_headers_get(headers, g_content_encoding_header_name, &existing_encoding) == AWS_OP_SUCCESS) {
- if (aws_byte_cursor_find_exact(&existing_encoding, &g_content_encoding_header_aws_chunked, NULL) ==
- AWS_OP_SUCCESS) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Cannot create meta s3 request; for trailer checksum, the original request cannot be "
- "aws-chunked encoding. The client will encode the request instead.",
- (void *)client);
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- }
- }
- if (options->checksum_config->location == AWS_SCL_HEADER) {
- /* TODO: support calculate checksum to add to header */
- aws_raise_error(AWS_ERROR_UNSUPPORTED_OPERATION);
- return NULL;
- }
- if (options->checksum_config->location != AWS_SCL_NONE &&
- options->checksum_config->checksum_algorithm == AWS_SCA_NONE) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Cannot create meta s3 request; checksum algorithm must be set to calculate checksum.",
- (void *)client);
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- if (options->checksum_config->checksum_algorithm != AWS_SCA_NONE &&
- options->checksum_config->location == AWS_SCL_NONE) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Cannot create meta s3 request; checksum algorithm cannot be set if not calculate checksum from "
- "client.",
- (void *)client);
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- }
- if (s_apply_endpoint_override(client, message_headers, options->endpoint)) {
- return NULL;
- }
- struct aws_byte_cursor host_header_value;
- /* The Host header must be set from s_apply_endpoint_override, if not errored out */
- AWS_FATAL_ASSERT(aws_http_headers_get(message_headers, g_host_header_name, &host_header_value) == AWS_OP_SUCCESS);
- bool is_https = true;
- uint16_t port = 0;
- if (options->endpoint != NULL) {
- struct aws_byte_cursor https_scheme = aws_byte_cursor_from_c_str("https");
- struct aws_byte_cursor http_scheme = aws_byte_cursor_from_c_str("http");
- const struct aws_byte_cursor *scheme = aws_uri_scheme(options->endpoint);
- is_https = aws_byte_cursor_eq_ignore_case(scheme, &https_scheme);
- if (!is_https && !aws_byte_cursor_eq_ignore_case(scheme, &http_scheme)) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Cannot create meta s3 request; unexpected scheme '" PRInSTR "' in endpoint override.",
- (void *)client,
- AWS_BYTE_CURSOR_PRI(*scheme));
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- port = aws_uri_port(options->endpoint);
- }
- struct aws_s3_meta_request *meta_request = client->vtable->meta_request_factory(client, options);
- if (meta_request == NULL) {
- AWS_LOGF_ERROR(AWS_LS_S3_CLIENT, "id=%p: Could not create new meta request.", (void *)client);
- return NULL;
- }
- bool error_occurred = false;
- /* BEGIN CRITICAL SECTION */
- {
- aws_s3_client_lock_synced_data(client);
- struct aws_string *endpoint_host_name = NULL;
- if (options->endpoint != NULL) {
- endpoint_host_name = aws_string_new_from_cursor(client->allocator, aws_uri_host_name(options->endpoint));
- } else {
- struct aws_uri host_uri;
- if (aws_uri_init_parse(&host_uri, client->allocator, &host_header_value)) {
- error_occurred = true;
- goto unlock;
- }
- endpoint_host_name = aws_string_new_from_cursor(client->allocator, aws_uri_host_name(&host_uri));
- aws_uri_clean_up(&host_uri);
- }
- struct aws_s3_endpoint *endpoint = NULL;
- struct aws_hash_element *endpoint_hash_element = NULL;
- int was_created = 0;
- if (aws_hash_table_create(
- &client->synced_data.endpoints, endpoint_host_name, &endpoint_hash_element, &was_created)) {
- aws_string_destroy(endpoint_host_name);
- error_occurred = true;
- goto unlock;
- }
- if (was_created) {
- struct aws_s3_endpoint_options endpoint_options = {
- .host_name = endpoint_host_name,
- .client_bootstrap = client->client_bootstrap,
- .tls_connection_options = is_https ? client->tls_connection_options : NULL,
- .dns_host_address_ttl_seconds = s_dns_host_address_ttl_seconds,
- .client = client,
- .max_connections = aws_s3_client_get_max_active_connections(client, NULL),
- .port = port,
- .proxy_config = client->proxy_config,
- .proxy_ev_settings = client->proxy_ev_settings,
- .connect_timeout_ms = client->connect_timeout_ms,
- .tcp_keep_alive_options = client->tcp_keep_alive_options,
- .monitoring_options = &client->monitoring_options,
- };
- endpoint = aws_s3_endpoint_new(client->allocator, &endpoint_options);
- if (endpoint == NULL) {
- aws_hash_table_remove(&client->synced_data.endpoints, endpoint_host_name, NULL, NULL);
- aws_string_destroy(endpoint_host_name);
- error_occurred = true;
- goto unlock;
- }
- endpoint_hash_element->value = endpoint;
- ++client->synced_data.num_endpoints_allocated;
- } else {
- endpoint = endpoint_hash_element->value;
- aws_s3_endpoint_acquire(endpoint, true /*already_holding_lock*/);
- aws_string_destroy(endpoint_host_name);
- endpoint_host_name = NULL;
- }
- meta_request->endpoint = endpoint;
- s_s3_client_push_meta_request_synced(client, meta_request);
- s_s3_client_schedule_process_work_synced(client);
- unlock:
- aws_s3_client_unlock_synced_data(client);
- }
- /* END CRITICAL SECTION */
- if (error_occurred) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Could not create meta request due to error %d (%s)",
- (void *)client,
- aws_last_error(),
- aws_error_str(aws_last_error()));
- meta_request = aws_s3_meta_request_release(meta_request);
- } else {
- AWS_LOGF_INFO(AWS_LS_S3_CLIENT, "id=%p: Created meta request %p", (void *)client, (void *)meta_request);
- }
- return meta_request;
- }
- static void s_s3_client_endpoint_shutdown_callback(struct aws_s3_client *client) {
- AWS_PRECONDITION(client);
- /* BEGIN CRITICAL SECTION */
- {
- aws_s3_client_lock_synced_data(client);
- --client->synced_data.num_endpoints_allocated;
- s_s3_client_schedule_process_work_synced(client);
- aws_s3_client_unlock_synced_data(client);
- }
- /* END CRITICAL SECTION */
- }
- static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
- struct aws_s3_client *client,
- const struct aws_s3_meta_request_options *options) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(options);
- struct aws_http_headers *initial_message_headers = aws_http_message_get_headers(options->message);
- AWS_ASSERT(initial_message_headers);
- uint64_t content_length = 0;
- struct aws_byte_cursor content_length_cursor;
- bool content_length_header_found = false;
- if (!aws_http_headers_get(initial_message_headers, g_content_length_header_name, &content_length_cursor)) {
- if (aws_byte_cursor_utf8_parse_u64(content_length_cursor, &content_length)) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_META_REQUEST,
- "Could not parse Content-Length header. header value is:" PRInSTR "",
- AWS_BYTE_CURSOR_PRI(content_length_cursor));
- aws_raise_error(AWS_ERROR_S3_INVALID_CONTENT_LENGTH_HEADER);
- return NULL;
- }
- content_length_header_found = true;
- }
- /* Call the appropriate meta-request new function. */
- switch (options->type) {
- case AWS_S3_META_REQUEST_TYPE_GET_OBJECT: {
- /* If the initial request already has partNumber, the request is not
- * splittable(?). Treat it as a Default request.
- * TODO: Still need tests to verify that the request of a part is
- * splittable or not */
- if (aws_http_headers_has(initial_message_headers, aws_byte_cursor_from_c_str("partNumber"))) {
- return aws_s3_meta_request_default_new(client->allocator, client, content_length, false, options);
- }
- return aws_s3_meta_request_auto_ranged_get_new(client->allocator, client, client->part_size, options);
- }
- case AWS_S3_META_REQUEST_TYPE_PUT_OBJECT: {
- if (!content_length_header_found) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_META_REQUEST,
- "Could not create auto-ranged-put meta request; there is no Content-Length header present.");
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- struct aws_input_stream *input_stream = aws_http_message_get_body_stream(options->message);
- if ((input_stream == NULL) && (options->send_filepath.len == 0)) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_META_REQUEST,
- "Could not create auto-ranged-put meta request; filepath or body stream must be set.");
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- if (options->resume_token == NULL) {
- size_t client_part_size = client->part_size;
- size_t client_max_part_size = client->max_part_size;
- if (client_part_size < g_s3_min_upload_part_size) {
- AWS_LOGF_WARN(
- AWS_LS_S3_META_REQUEST,
- "Client config part size of %" PRIu64 " is less than the minimum upload part size of %" PRIu64
- ". Using to the minimum part-size for upload.",
- (uint64_t)client_part_size,
- (uint64_t)g_s3_min_upload_part_size);
- client_part_size = g_s3_min_upload_part_size;
- }
- if (client_max_part_size < g_s3_min_upload_part_size) {
- AWS_LOGF_WARN(
- AWS_LS_S3_META_REQUEST,
- "Client config max part size of %" PRIu64
- " is less than the minimum upload part size of %" PRIu64
- ". Clamping to the minimum part-size for upload.",
- (uint64_t)client_max_part_size,
- (uint64_t)g_s3_min_upload_part_size);
- client_max_part_size = g_s3_min_upload_part_size;
- }
- if (content_length <= client_part_size) {
- return aws_s3_meta_request_default_new(
- client->allocator,
- client,
- content_length,
- client->compute_content_md5 == AWS_MR_CONTENT_MD5_ENABLED &&
- !aws_http_headers_has(initial_message_headers, g_content_md5_header_name),
- options);
- } else {
- if (aws_s3_message_util_check_checksum_header(options->message)) {
- /* The checksum header has been set and the request will be splitted. We fail the request */
- AWS_LOGF_ERROR(
- AWS_LS_S3_META_REQUEST,
- "Could not create auto-ranged-put meta request; checksum headers has been set for "
- "auto-ranged-put that will be split. Pre-calculated checksums are only supported for "
- "single "
- "part upload.");
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- }
- uint64_t part_size_uint64 = content_length / (uint64_t)g_s3_max_num_upload_parts;
- if (part_size_uint64 > SIZE_MAX) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_META_REQUEST,
- "Could not create auto-ranged-put meta request; required part size of %" PRIu64
- " bytes is too large for platform.",
- part_size_uint64);
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- size_t part_size = (size_t)part_size_uint64;
- if (part_size > client_max_part_size) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_META_REQUEST,
- "Could not create auto-ranged-put meta request; required part size for put request is %" PRIu64
- ", but current maximum part size is %" PRIu64,
- (uint64_t)part_size,
- (uint64_t)client_max_part_size);
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- if (part_size < client_part_size) {
- part_size = client_part_size;
- }
- uint32_t num_parts = (uint32_t)(content_length / part_size);
- if ((content_length % part_size) > 0) {
- ++num_parts;
- }
- return aws_s3_meta_request_auto_ranged_put_new(
- client->allocator, client, part_size, content_length, num_parts, options);
- } else {
- /* dont pass part size and total num parts. constructor will pick it up from token */
- return aws_s3_meta_request_auto_ranged_put_new(
- client->allocator, client, 0, content_length, 0, options);
- }
- }
- case AWS_S3_META_REQUEST_TYPE_COPY_OBJECT: {
- /* TODO: support copy object correctly. */
- AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "CopyObject is not currently supported");
- aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
- return NULL;
- }
- case AWS_S3_META_REQUEST_TYPE_DEFAULT:
- return aws_s3_meta_request_default_new(client->allocator, client, content_length, false, options);
- default:
- AWS_FATAL_ASSERT(false);
- }
- return NULL;
- }
- static void s_s3_client_push_meta_request_synced(
- struct aws_s3_client *client,
- struct aws_s3_meta_request *meta_request) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(meta_request);
- ASSERT_SYNCED_DATA_LOCK_HELD(client);
- struct aws_s3_meta_request_work *meta_request_work =
- aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_meta_request_work));
- aws_s3_meta_request_acquire(meta_request);
- meta_request_work->meta_request = meta_request;
- aws_linked_list_push_back(&client->synced_data.pending_meta_request_work, &meta_request_work->node);
- }
- static void s_s3_client_schedule_process_work_synced(struct aws_s3_client *client) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(client->vtable);
- AWS_PRECONDITION(client->vtable->schedule_process_work_synced);
- ASSERT_SYNCED_DATA_LOCK_HELD(client);
- client->vtable->schedule_process_work_synced(client);
- }
- static void s_s3_client_schedule_process_work_synced_default(struct aws_s3_client *client) {
- ASSERT_SYNCED_DATA_LOCK_HELD(client);
- if (client->synced_data.process_work_task_scheduled) {
- return;
- }
- aws_task_init(
- &client->synced_data.process_work_task, s_s3_client_process_work_task, client, "s3_client_process_work_task");
- aws_event_loop_schedule_task_now(client->process_work_event_loop, &client->synced_data.process_work_task);
- client->synced_data.process_work_task_scheduled = true;
- }
/*
 * Schedules the client's work task; takes the synced-data lock itself, so the caller must not
 * already hold it.
 */
void aws_s3_client_schedule_process_work(struct aws_s3_client *client) {
    AWS_PRECONDITION(client);

    /* BEGIN CRITICAL SECTION */
    aws_s3_client_lock_synced_data(client);

    s_s3_client_schedule_process_work_synced(client);

    aws_s3_client_unlock_synced_data(client);
    /* END CRITICAL SECTION */
}
- static void s_s3_client_remove_meta_request_threaded(
- struct aws_s3_client *client,
- struct aws_s3_meta_request *meta_request) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(meta_request);
- (void)client;
- aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
- meta_request->client_process_work_threaded_data.scheduled = false;
- aws_s3_meta_request_release(meta_request);
- }
- /* Task function for trying to find a request that can be processed. */
- static void s_s3_client_process_work_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
- AWS_PRECONDITION(task);
- (void)task;
- (void)task_status;
- /* Client keeps a reference to the event loop group; a 'canceled' status should not happen.*/
- AWS_ASSERT(task_status == AWS_TASK_STATUS_RUN_READY);
- struct aws_s3_client *client = arg;
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(client->vtable);
- AWS_PRECONDITION(client->vtable->process_work);
- client->vtable->process_work(client);
- }
- static void s_s3_client_process_work_default(struct aws_s3_client *client) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(client->vtable);
- AWS_PRECONDITION(client->vtable->finish_destroy);
- struct aws_linked_list meta_request_work_list;
- aws_linked_list_init(&meta_request_work_list);
- /*******************/
- /* Step 1: Move relevant data into thread local memory. */
- /*******************/
- AWS_LOGF_DEBUG(
- AWS_LS_S3_CLIENT,
- "id=%p s_s3_client_process_work_default - Moving relevant synced_data into threaded_data.",
- (void *)client);
- /* BEGIN CRITICAL SECTION */
- aws_s3_client_lock_synced_data(client);
- /* Once we exit this mutex, someone can reschedule this task. */
- client->synced_data.process_work_task_scheduled = false;
- client->synced_data.process_work_task_in_progress = true;
- aws_linked_list_swap_contents(&meta_request_work_list, &client->synced_data.pending_meta_request_work);
- uint32_t num_requests_queued =
- aws_s3_client_queue_requests_threaded(client, &client->synced_data.prepared_requests, false);
- {
- int sub_result = aws_sub_u32_checked(
- client->threaded_data.num_requests_being_prepared,
- num_requests_queued,
- &client->threaded_data.num_requests_being_prepared);
- AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
- (void)sub_result;
- }
- {
- int sub_result = aws_sub_u32_checked(
- client->threaded_data.num_requests_being_prepared,
- client->synced_data.num_failed_prepare_requests,
- &client->threaded_data.num_requests_being_prepared);
- client->synced_data.num_failed_prepare_requests = 0;
- AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
- (void)sub_result;
- }
- uint32_t num_endpoints_in_table = (uint32_t)aws_hash_table_get_entry_count(&client->synced_data.endpoints);
- uint32_t num_endpoints_allocated = client->synced_data.num_endpoints_allocated;
- aws_s3_client_unlock_synced_data(client);
- /* END CRITICAL SECTION */
- /*******************/
- /* Step 2: Push meta requests into the thread local list if they haven't already been scheduled. */
- /*******************/
- AWS_LOGF_DEBUG(
- AWS_LS_S3_CLIENT, "id=%p s_s3_client_process_work_default - Processing any new meta requests.", (void *)client);
- while (!aws_linked_list_empty(&meta_request_work_list)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_back(&meta_request_work_list);
- struct aws_s3_meta_request_work *meta_request_work =
- AWS_CONTAINER_OF(node, struct aws_s3_meta_request_work, node);
- AWS_FATAL_ASSERT(meta_request_work != NULL);
- AWS_FATAL_ASSERT(meta_request_work->meta_request != NULL);
- struct aws_s3_meta_request *meta_request = meta_request_work->meta_request;
- if (!meta_request->client_process_work_threaded_data.scheduled) {
- aws_linked_list_push_back(
- &client->threaded_data.meta_requests, &meta_request->client_process_work_threaded_data.node);
- meta_request->client_process_work_threaded_data.scheduled = true;
- } else {
- meta_request = aws_s3_meta_request_release(meta_request);
- }
- aws_mem_release(client->allocator, meta_request_work);
- }
- /*******************/
- /* Step 3: Update relevant meta requests and connections. */
- /*******************/
- {
- AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Updating meta requests.", (void *)client);
- aws_s3_client_update_meta_requests_threaded(client);
- AWS_LOGF_DEBUG(
- AWS_LS_S3_CLIENT, "id=%p Updating connections, assigning requests where possible.", (void *)client);
- aws_s3_client_update_connections_threaded(client);
- }
- /*******************/
- /* Step 4: Log client stats. */
- /*******************/
- {
- uint32_t num_requests_tracked_requests = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);
- uint32_t num_auto_ranged_get_network_io =
- s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_GET_OBJECT);
- uint32_t num_auto_ranged_put_network_io =
- s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_PUT_OBJECT);
- uint32_t num_auto_default_network_io =
- s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_DEFAULT);
- uint32_t num_requests_network_io =
- s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_MAX);
- uint32_t num_requests_stream_queued_waiting =
- (uint32_t)aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting);
- uint32_t num_requests_streaming = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming);
- uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting +
- num_requests_streaming + client->threaded_data.num_requests_being_prepared +
- client->threaded_data.request_queue_size;
- AWS_LOGF(
- s_log_level_client_stats,
- AWS_LS_S3_CLIENT_STATS,
- "id=%p Requests-in-flight(approx/exact):%d/%d Requests-preparing:%d Requests-queued:%d "
- "Requests-network(get/put/default/total):%d/%d/%d/%d Requests-streaming-waiting:%d Requests-streaming:%d "
- " Endpoints(in-table/allocated):%d/%d",
- (void *)client,
- total_approx_requests,
- num_requests_tracked_requests,
- client->threaded_data.num_requests_being_prepared,
- client->threaded_data.request_queue_size,
- num_auto_ranged_get_network_io,
- num_auto_ranged_put_network_io,
- num_auto_default_network_io,
- num_requests_network_io,
- num_requests_stream_queued_waiting,
- num_requests_streaming,
- num_endpoints_in_table,
- num_endpoints_allocated);
- }
- /*******************/
- /* Step 5: Check for client shutdown. */
- /*******************/
- {
- /* BEGIN CRITICAL SECTION */
- aws_s3_client_lock_synced_data(client);
- client->synced_data.process_work_task_in_progress = false;
- /* This flag should never be set twice. If it was, that means a double-free could occur.*/
- AWS_ASSERT(!client->synced_data.finish_destroy);
- bool finish_destroy = client->synced_data.active == false &&
- client->synced_data.start_destroy_executing == false &&
- client->synced_data.body_streaming_elg_allocated == false &&
- client->synced_data.process_work_task_scheduled == false &&
- client->synced_data.process_work_task_in_progress == false &&
- client->synced_data.num_endpoints_allocated == 0;
- client->synced_data.finish_destroy = finish_destroy;
- if (!client->synced_data.active) {
- AWS_LOGF_DEBUG(
- AWS_LS_S3_CLIENT,
- "id=%p Client shutdown progress: starting_destroy_executing=%d body_streaming_elg_allocated=%d "
- "process_work_task_scheduled=%d process_work_task_in_progress=%d num_endpoints_allocated=%d "
- "finish_destroy=%d",
- (void *)client,
- (int)client->synced_data.start_destroy_executing,
- (int)client->synced_data.body_streaming_elg_allocated,
- (int)client->synced_data.process_work_task_scheduled,
- (int)client->synced_data.process_work_task_in_progress,
- (int)client->synced_data.num_endpoints_allocated,
- (int)client->synced_data.finish_destroy);
- }
- aws_s3_client_unlock_synced_data(client);
- /* END CRITICAL SECTION */
- if (finish_destroy) {
- client->vtable->finish_destroy(client);
- }
- }
- }
- static void s_s3_client_prepare_callback_queue_request(
- struct aws_s3_meta_request *meta_request,
- struct aws_s3_request *request,
- int error_code,
- void *user_data);
- void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
- AWS_PRECONDITION(client);
- const uint32_t max_requests_in_flight = aws_s3_client_get_max_requests_in_flight(client);
- const uint32_t max_requests_prepare = aws_s3_client_get_max_requests_prepare(client);
- struct aws_linked_list meta_requests_work_remaining;
- aws_linked_list_init(&meta_requests_work_remaining);
- uint32_t num_requests_in_flight = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);
- const uint32_t pass_flags[] = {
- AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE,
- 0,
- };
- const uint32_t num_passes = AWS_ARRAY_SIZE(pass_flags);
- for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) {
- /* While:
- * * Number of being-prepared + already-prepared-and-queued requests is less than the max that can be in the
- * preparation stage.
- * * Total number of requests tracked by the client is less than the max tracked ("in flight") requests.
- * * There are meta requests to get requests from.
- *
- * Then update meta requests to get new requests that can then be prepared (reading from any streams, signing,
- * etc.) for sending.
- */
- while ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) <
- max_requests_prepare &&
- num_requests_in_flight < max_requests_in_flight &&
- !aws_linked_list_empty(&client->threaded_data.meta_requests)) {
- struct aws_linked_list_node *meta_request_node =
- aws_linked_list_begin(&client->threaded_data.meta_requests);
- struct aws_s3_meta_request *meta_request =
- AWS_CONTAINER_OF(meta_request_node, struct aws_s3_meta_request, client_process_work_threaded_data);
- struct aws_s3_endpoint *endpoint = meta_request->endpoint;
- AWS_ASSERT(endpoint != NULL);
- AWS_ASSERT(client->vtable->get_host_address_count);
- size_t num_known_vips = client->vtable->get_host_address_count(
- client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
- /* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in
- * ramping up requests just yet. If there is already enough in the queue for one address (even if those
- * aren't for this particular endpoint) we skip over this meta request for now. */
- if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared +
- client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) {
- aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
- aws_linked_list_push_back(
- &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
- continue;
- }
- struct aws_s3_request *request = NULL;
- /* Try to grab the next request from the meta request. */
- bool work_remaining = aws_s3_meta_request_update(meta_request, pass_flags[pass_index], &request);
- if (work_remaining) {
- /* If there is work remaining, but we didn't get a request back, take the meta request out of the
- * list so that we don't use it again during this function, with the intention of putting it back in
- * the list before this function ends. */
- if (request == NULL) {
- aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
- aws_linked_list_push_back(
- &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
- } else {
- request->tracked_by_client = true;
- ++client->threaded_data.num_requests_being_prepared;
- num_requests_in_flight =
- (uint32_t)aws_atomic_fetch_add(&client->stats.num_requests_in_flight, 1) + 1;
- aws_s3_meta_request_prepare_request(
- meta_request, request, s_s3_client_prepare_callback_queue_request, client);
- }
- } else {
- s_s3_client_remove_meta_request_threaded(client, meta_request);
- }
- }
- aws_linked_list_move_all_front(&client->threaded_data.meta_requests, &meta_requests_work_remaining);
- }
- }
- static void s_s3_client_meta_request_finished_request(
- struct aws_s3_client *client,
- struct aws_s3_meta_request *meta_request,
- struct aws_s3_request *request,
- int error_code) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(request);
- if (request->tracked_by_client) {
- /* BEGIN CRITICAL SECTION */
- aws_s3_client_lock_synced_data(client);
- aws_atomic_fetch_sub(&client->stats.num_requests_in_flight, 1);
- s_s3_client_schedule_process_work_synced(client);
- aws_s3_client_unlock_synced_data(client);
- /* END CRITICAL SECTION */
- }
- aws_s3_meta_request_finished_request(meta_request, request, error_code);
- }
- static void s_s3_client_prepare_callback_queue_request(
- struct aws_s3_meta_request *meta_request,
- struct aws_s3_request *request,
- int error_code,
- void *user_data) {
- AWS_PRECONDITION(meta_request);
- AWS_PRECONDITION(request);
- struct aws_s3_client *client = user_data;
- AWS_PRECONDITION(client);
- if (error_code != AWS_ERROR_SUCCESS) {
- s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);
- aws_s3_request_release(request);
- request = NULL;
- }
- /* BEGIN CRITICAL SECTION */
- {
- aws_s3_client_lock_synced_data(client);
- if (error_code == AWS_ERROR_SUCCESS) {
- aws_linked_list_push_back(&client->synced_data.prepared_requests, &request->node);
- } else {
- ++client->synced_data.num_failed_prepare_requests;
- }
- s_s3_client_schedule_process_work_synced(client);
- aws_s3_client_unlock_synced_data(client);
- }
- /* END CRITICAL SECTION */
- }
- void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(client->vtable);
- struct aws_linked_list left_over_requests;
- aws_linked_list_init(&left_over_requests);
- while (s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_MAX) <
- aws_s3_client_get_max_active_connections(client, NULL) &&
- !aws_linked_list_empty(&client->threaded_data.request_queue)) {
- struct aws_s3_request *request = aws_s3_client_dequeue_request_threaded(client);
- const uint32_t max_active_connections = aws_s3_client_get_max_active_connections(client, request->meta_request);
- /* Unless the request is marked "always send", if this meta request has a finish result, then finish the request
- * now and release it. */
- if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
- s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_S3_CANCELED);
- aws_s3_request_release(request);
- request = NULL;
- } else if (
- s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) {
- s_s3_client_create_connection_for_request(client, request);
- } else {
- /* Push the request into the left-over list to be used in a future call of this function. */
- aws_linked_list_push_back(&left_over_requests, &request->node);
- }
- }
- aws_s3_client_queue_requests_threaded(client, &left_over_requests, true);
- }
- static void s_s3_client_acquired_retry_token(
- struct aws_retry_strategy *retry_strategy,
- int error_code,
- struct aws_retry_token *token,
- void *user_data);
- static void s_s3_client_retry_ready(struct aws_retry_token *token, int error_code, void *user_data);
- static void s_s3_client_create_connection_for_request_default(
- struct aws_s3_client *client,
- struct aws_s3_request *request);
- static void s_s3_client_create_connection_for_request(struct aws_s3_client *client, struct aws_s3_request *request) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(client->vtable);
- if (client->vtable->create_connection_for_request) {
- client->vtable->create_connection_for_request(client, request);
- return;
- }
- s_s3_client_create_connection_for_request_default(client, request);
- }
- static void s_s3_client_create_connection_for_request_default(
- struct aws_s3_client *client,
- struct aws_s3_request *request) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(request);
- struct aws_s3_meta_request *meta_request = request->meta_request;
- AWS_PRECONDITION(meta_request);
- aws_atomic_fetch_add(&client->stats.num_requests_network_io[meta_request->type], 1);
- struct aws_s3_connection *connection = aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_connection));
- connection->endpoint = aws_s3_endpoint_acquire(meta_request->endpoint, false /*already_holding_lock*/);
- connection->request = request;
- struct aws_byte_cursor host_header_value;
- AWS_ZERO_STRUCT(host_header_value);
- struct aws_http_headers *message_headers = aws_http_message_get_headers(meta_request->initial_request_message);
- AWS_ASSERT(message_headers);
- int get_header_result = aws_http_headers_get(message_headers, g_host_header_name, &host_header_value);
- AWS_ASSERT(get_header_result == AWS_OP_SUCCESS);
- (void)get_header_result;
- if (aws_retry_strategy_acquire_retry_token(
- client->retry_strategy, &host_header_value, s_s3_client_acquired_retry_token, connection, 0)) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Client could not acquire retry token for request %p due to error %d (%s)",
- (void *)client,
- (void *)request,
- aws_last_error_or_unknown(),
- aws_error_str(aws_last_error_or_unknown()));
- goto reset_connection;
- }
- return;
- reset_connection:
- aws_s3_client_notify_connection_finished(
- client, connection, aws_last_error_or_unknown(), AWS_S3_CONNECTION_FINISH_CODE_FAILED);
- }
- static void s_s3_client_acquired_retry_token(
- struct aws_retry_strategy *retry_strategy,
- int error_code,
- struct aws_retry_token *token,
- void *user_data) {
- AWS_PRECONDITION(retry_strategy);
- (void)retry_strategy;
- struct aws_s3_connection *connection = user_data;
- AWS_PRECONDITION(connection);
- struct aws_s3_request *request = connection->request;
- AWS_PRECONDITION(request);
- struct aws_s3_meta_request *meta_request = request->meta_request;
- AWS_PRECONDITION(meta_request);
- struct aws_s3_endpoint *endpoint = meta_request->endpoint;
- AWS_ASSERT(endpoint != NULL);
- struct aws_s3_client *client = endpoint->client;
- AWS_ASSERT(client != NULL);
- if (error_code != AWS_ERROR_SUCCESS) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Client could not get retry token for connection %p processing request %p due to error %d (%s)",
- (void *)client,
- (void *)connection,
- (void *)request,
- error_code,
- aws_error_str(error_code));
- goto error_clean_up;
- }
- AWS_ASSERT(token);
- connection->retry_token = token;
- AWS_ASSERT(client->vtable->acquire_http_connection);
- /* client needs to be kept alive until s_s3_client_on_acquire_http_connection completes */
- /* TODO: not a blocker, consider managing the life time of aws_s3_client from aws_s3_endpoint to simplify usage */
- aws_s3_client_acquire(client);
- client->vtable->acquire_http_connection(
- endpoint->http_connection_manager, s_s3_client_on_acquire_http_connection, connection);
- return;
- error_clean_up:
- aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
- }
- static void s_s3_client_on_acquire_http_connection(
- struct aws_http_connection *incoming_http_connection,
- int error_code,
- void *user_data) {
- struct aws_s3_connection *connection = user_data;
- AWS_PRECONDITION(connection);
- struct aws_s3_request *request = connection->request;
- AWS_PRECONDITION(request);
- struct aws_s3_meta_request *meta_request = request->meta_request;
- AWS_PRECONDITION(meta_request);
- struct aws_s3_endpoint *endpoint = meta_request->endpoint;
- AWS_ASSERT(endpoint != NULL);
- struct aws_s3_client *client = endpoint->client;
- AWS_ASSERT(client != NULL);
- if (error_code != AWS_ERROR_SUCCESS) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_ENDPOINT,
- "id=%p: Could not acquire connection due to error code %d (%s)",
- (void *)endpoint,
- error_code,
- aws_error_str(error_code));
- if (error_code == AWS_IO_DNS_INVALID_NAME) {
- goto error_fail;
- }
- goto error_retry;
- }
- connection->http_connection = incoming_http_connection;
- aws_s3_meta_request_send_request(meta_request, connection);
- aws_s3_client_release(client); /* kept since this callback was registered */
- return;
- error_retry:
- aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_RETRY);
- aws_s3_client_release(client); /* kept since this callback was registered */
- return;
- error_fail:
- aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
- aws_s3_client_release(client); /* kept since this callback was registered */
- }
- /* Called by aws_s3_meta_request when it has finished using this connection for a single request. */
- void aws_s3_client_notify_connection_finished(
- struct aws_s3_client *client,
- struct aws_s3_connection *connection,
- int error_code,
- enum aws_s3_connection_finish_code finish_code) {
- AWS_PRECONDITION(client);
- AWS_PRECONDITION(connection);
- struct aws_s3_request *request = connection->request;
- AWS_PRECONDITION(request);
- struct aws_s3_meta_request *meta_request = request->meta_request;
- AWS_PRECONDITION(meta_request);
- AWS_PRECONDITION(meta_request->initial_request_message);
- struct aws_s3_endpoint *endpoint = meta_request->endpoint;
- AWS_PRECONDITION(endpoint);
- /* If we're trying to setup a retry... */
- if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_RETRY) {
- if (connection->retry_token == NULL) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Client could not schedule retry of request %p for meta request %p, as retry token is NULL.",
- (void *)client,
- (void *)request,
- (void *)meta_request);
- goto reset_connection;
- }
- if (aws_s3_meta_request_is_finished(meta_request)) {
- AWS_LOGF_DEBUG(
- AWS_LS_S3_CLIENT,
- "id=%p Client not scheduling retry of request %p for meta request %p with token %p because meta "
- "request has been flagged as finished.",
- (void *)client,
- (void *)request,
- (void *)meta_request,
- (void *)connection->retry_token);
- goto reset_connection;
- }
- AWS_LOGF_DEBUG(
- AWS_LS_S3_CLIENT,
- "id=%p Client scheduling retry of request %p for meta request %p with token %p.",
- (void *)client,
- (void *)request,
- (void *)meta_request,
- (void *)connection->retry_token);
- enum aws_retry_error_type error_type = AWS_RETRY_ERROR_TYPE_TRANSIENT;
- switch (error_code) {
- case AWS_ERROR_S3_INTERNAL_ERROR:
- error_type = AWS_RETRY_ERROR_TYPE_SERVER_ERROR;
- break;
- case AWS_ERROR_S3_SLOW_DOWN:
- error_type = AWS_RETRY_ERROR_TYPE_THROTTLING;
- break;
- }
- if (connection->http_connection != NULL) {
- AWS_ASSERT(endpoint->http_connection_manager);
- aws_http_connection_manager_release_connection(
- endpoint->http_connection_manager, connection->http_connection);
- connection->http_connection = NULL;
- }
- /* Ask the retry strategy to schedule a retry of the request. */
- if (aws_retry_strategy_schedule_retry(
- connection->retry_token, error_type, s_s3_client_retry_ready, connection)) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Client could not retry request %p for meta request %p with token %p due to error %d (%s)",
- (void *)client,
- (void *)request,
- (void *)meta_request,
- (void *)connection->retry_token,
- aws_last_error_or_unknown(),
- aws_error_str(aws_last_error_or_unknown()));
- goto reset_connection;
- }
- return;
- }
- reset_connection:
- if (connection->retry_token != NULL) {
- /* If we have a retry token and successfully finished, record that success. */
- if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
- aws_retry_token_record_success(connection->retry_token);
- }
- aws_retry_token_release(connection->retry_token);
- connection->retry_token = NULL;
- }
- /* If we weren't successful, and we're here, that means this failure is not eligible for a retry. So finish the
- * request, and close our HTTP connection. */
- if (finish_code != AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
- if (connection->http_connection != NULL) {
- aws_http_connection_close(connection->http_connection);
- }
- }
- aws_atomic_fetch_sub(&client->stats.num_requests_network_io[meta_request->type], 1);
- s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);
- if (connection->http_connection != NULL) {
- AWS_ASSERT(endpoint->http_connection_manager);
- aws_http_connection_manager_release_connection(endpoint->http_connection_manager, connection->http_connection);
- connection->http_connection = NULL;
- }
- if (connection->request != NULL) {
- aws_s3_request_release(connection->request);
- connection->request = NULL;
- }
- aws_retry_token_release(connection->retry_token);
- connection->retry_token = NULL;
- aws_s3_endpoint_release(connection->endpoint);
- connection->endpoint = NULL;
- aws_mem_release(client->allocator, connection);
- connection = NULL;
- /* BEGIN CRITICAL SECTION */
- {
- aws_s3_client_lock_synced_data(client);
- s_s3_client_schedule_process_work_synced(client);
- aws_s3_client_unlock_synced_data(client);
- }
- /* END CRITICAL SECTION */
- }
- static void s_s3_client_prepare_request_callback_retry_request(
- struct aws_s3_meta_request *meta_request,
- struct aws_s3_request *request,
- int error_code,
- void *user_data);
- static void s_s3_client_retry_ready(struct aws_retry_token *token, int error_code, void *user_data) {
- AWS_PRECONDITION(token);
- (void)token;
- struct aws_s3_connection *connection = user_data;
- AWS_PRECONDITION(connection);
- struct aws_s3_request *request = connection->request;
- AWS_PRECONDITION(request);
- struct aws_s3_meta_request *meta_request = request->meta_request;
- AWS_PRECONDITION(meta_request);
- struct aws_s3_endpoint *endpoint = meta_request->endpoint;
- AWS_PRECONDITION(endpoint);
- struct aws_s3_client *client = endpoint->client;
- AWS_PRECONDITION(client);
- /* If we couldn't retry this request, then bail on the entire meta request. */
- if (error_code != AWS_ERROR_SUCCESS) {
- AWS_LOGF_ERROR(
- AWS_LS_S3_CLIENT,
- "id=%p Client could not retry request %p for meta request %p due to error %d (%s)",
- (void *)client,
- (void *)meta_request,
- (void *)request,
- error_code,
- aws_error_str(error_code));
- goto error_clean_up;
- }
- AWS_LOGF_DEBUG(
- AWS_LS_S3_META_REQUEST,
- "id=%p Client retrying request %p for meta request %p on connection %p with retry token %p",
- (void *)client,
- (void *)request,
- (void *)meta_request,
- (void *)connection,
- (void *)connection->retry_token);
- aws_s3_meta_request_prepare_request(
- meta_request, request, s_s3_client_prepare_request_callback_retry_request, connection);
- return;
- error_clean_up:
- aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
- }
- static void s_s3_client_prepare_request_callback_retry_request(
- struct aws_s3_meta_request *meta_request,
- struct aws_s3_request *request,
- int error_code,
- void *user_data) {
- AWS_PRECONDITION(meta_request);
- (void)meta_request;
- AWS_PRECONDITION(request);
- (void)request;
- struct aws_s3_connection *connection = user_data;
- AWS_PRECONDITION(connection);
- struct aws_s3_endpoint *endpoint = meta_request->endpoint;
- AWS_ASSERT(endpoint != NULL);
- struct aws_s3_client *client = endpoint->client;
- AWS_ASSERT(client != NULL);
- if (error_code == AWS_ERROR_SUCCESS) {
- AWS_ASSERT(connection->retry_token);
- s_s3_client_acquired_retry_token(
- client->retry_strategy, AWS_ERROR_SUCCESS, connection->retry_token, connection);
- } else {
- aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
- }
- }
- static void s_resume_token_ref_count_zero_callback(void *arg) {
- struct aws_s3_meta_request_resume_token *token = arg;
- aws_string_destroy(token->multipart_upload_id);
- aws_mem_release(token->allocator, token);
- }
- struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new(struct aws_allocator *allocator) {
- struct aws_s3_meta_request_resume_token *token =
- aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_meta_request_resume_token));
- token->allocator = allocator;
- aws_ref_count_init(&token->ref_count, token, s_resume_token_ref_count_zero_callback);
- return token;
- }
- struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new_upload(
- struct aws_allocator *allocator,
- const struct aws_s3_upload_resume_token_options *options) {
- AWS_PRECONDITION(allocator);
- AWS_PRECONDITION(options);
- struct aws_s3_meta_request_resume_token *token = aws_s3_meta_request_resume_token_new(allocator);
- token->multipart_upload_id = aws_string_new_from_cursor(allocator, &options->upload_id);
- token->part_size = options->part_size;
- token->total_num_parts = options->total_num_parts;
- token->num_parts_completed = options->num_parts_completed;
- token->type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT;
- return token;
- }
- struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_acquire(
- struct aws_s3_meta_request_resume_token *resume_token) {
- if (resume_token) {
- aws_ref_count_acquire(&resume_token->ref_count);
- }
- return resume_token;
- }
- struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_release(
- struct aws_s3_meta_request_resume_token *resume_token) {
- if (resume_token) {
- aws_ref_count_release(&resume_token->ref_count);
- }
- return NULL;
- }
- enum aws_s3_meta_request_type aws_s3_meta_request_resume_token_type(
- struct aws_s3_meta_request_resume_token *resume_token) {
- AWS_FATAL_PRECONDITION(resume_token);
- return resume_token->type;
- }
- size_t aws_s3_meta_request_resume_token_part_size(struct aws_s3_meta_request_resume_token *resume_token) {
- AWS_FATAL_PRECONDITION(resume_token);
- return resume_token->part_size;
- }
- size_t aws_s3_meta_request_resume_token_total_num_parts(struct aws_s3_meta_request_resume_token *resume_token) {
- AWS_FATAL_PRECONDITION(resume_token);
- return resume_token->total_num_parts;
- }
- size_t aws_s3_meta_request_resume_token_num_parts_completed(struct aws_s3_meta_request_resume_token *resume_token) {
- AWS_FATAL_PRECONDITION(resume_token);
- return resume_token->num_parts_completed;
- }
- struct aws_byte_cursor aws_s3_meta_request_resume_token_upload_id(
- struct aws_s3_meta_request_resume_token *resume_token) {
- AWS_FATAL_PRECONDITION(resume_token);
- if (resume_token->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT && resume_token->multipart_upload_id != NULL) {
- return aws_byte_cursor_from_string(resume_token->multipart_upload_id);
- }
- return aws_byte_cursor_from_c_str("");
- }
|