s3_client.c 78 KB


  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include "aws/s3/private/s3_auto_ranged_get.h"
  6. #include "aws/s3/private/s3_auto_ranged_put.h"
  7. #include "aws/s3/private/s3_client_impl.h"
  8. #include "aws/s3/private/s3_default_meta_request.h"
  9. #include "aws/s3/private/s3_meta_request_impl.h"
  10. #include "aws/s3/private/s3_request_messages.h"
  11. #include "aws/s3/private/s3_util.h"
  12. #include <aws/auth/credentials.h>
  13. #include <aws/common/assert.h>
  14. #include <aws/common/atomics.h>
  15. #include <aws/common/clock.h>
  16. #include <aws/common/device_random.h>
  17. #include <aws/common/environment.h>
  18. #include <aws/common/json.h>
  19. #include <aws/common/string.h>
  20. #include <aws/common/system_info.h>
  21. #include <aws/http/connection.h>
  22. #include <aws/http/connection_manager.h>
  23. #include <aws/http/proxy.h>
  24. #include <aws/http/request_response.h>
  25. #include <aws/io/channel_bootstrap.h>
  26. #include <aws/io/event_loop.h>
  27. #include <aws/io/host_resolver.h>
  28. #include <aws/io/retry_strategy.h>
  29. #include <aws/io/socket.h>
  30. #include <aws/io/stream.h>
  31. #include <aws/io/tls_channel_handler.h>
  32. #include <aws/io/uri.h>
  33. #include <aws/s3/private/s3_copy_object.h>
  34. #include <inttypes.h>
  35. #include <math.h>
  36. #ifdef _MSC_VER
  37. # pragma warning(disable : 4232) /* function pointer to dll symbol */
  38. #endif /* _MSC_VER */
  39. struct aws_s3_meta_request_work {
  40. struct aws_linked_list_node node;
  41. struct aws_s3_meta_request *meta_request;
  42. };
  43. static const enum aws_log_level s_log_level_client_stats = AWS_LL_INFO;
  44. static const uint32_t s_max_requests_multiplier = 4;
  45. /* TODO Provide analysis on origins of this value. */
  46. static const double s_throughput_per_vip_gbps = 4.0;
  47. /* Preferred amount of active connections per meta request type. */
  48. const uint32_t g_num_conns_per_vip_meta_request_look_up[AWS_S3_META_REQUEST_TYPE_MAX] = {
  49. 10, /* AWS_S3_META_REQUEST_TYPE_DEFAULT */
  50. 10, /* AWS_S3_META_REQUEST_TYPE_GET_OBJECT */
  51. 10, /* AWS_S3_META_REQUEST_TYPE_PUT_OBJECT */
  52. 10 /* AWS_S3_META_REQUEST_TYPE_COPY_OBJECT */
  53. };
  54. /* Should be max of s_num_conns_per_vip_meta_request_look_up */
  55. const uint32_t g_max_num_connections_per_vip = 10;
  56. /**
  57. * Default part size is 8 MiB to reach the best performance from the experiments we had.
  58. * Default max part size is SIZE_MAX on 32bit systems, which is around 4GiB; and 5GiB on a 64bit system.
  59. * The server limit is 5GiB, but object size limit is 5TiB for now. We should be good enough for all the cases.
  60. * The max number of upload parts is 10000, which limits the object size to 39TiB on 32bit and 49TiB on 64bit.
  61. * TODO Provide more information on other values.
  62. */
  63. static const size_t s_default_part_size = 8 * 1024 * 1024;
  64. static const uint64_t s_default_max_part_size = SIZE_MAX < 5368709120ULL ? SIZE_MAX : 5368709120ULL;
  65. static const double s_default_throughput_target_gbps = 10.0;
  66. static const uint32_t s_default_max_retries = 5;
  67. static size_t s_dns_host_address_ttl_seconds = 5 * 60;
  68. /* Default time until a connection is declared dead, while handling a request but seeing no activity.
  69. * 30 seconds mirrors the value currently used by the Java SDK. */
  70. static const uint32_t s_default_throughput_failure_interval_seconds = 30;
  71. /* Called when ref count is 0. */
  72. static void s_s3_client_start_destroy(void *user_data);
  73. /* Called by s_s3_client_process_work_default when all shutdown criteria has been met. */
  74. static void s_s3_client_finish_destroy_default(struct aws_s3_client *client);
  75. /* Called when the body streaming elg shutdown has completed. */
  76. static void s_s3_client_body_streaming_elg_shutdown(void *user_data);
  77. static void s_s3_client_create_connection_for_request(struct aws_s3_client *client, struct aws_s3_request *request);
  78. /* Callback which handles the HTTP connection retrieved by acquire_http_connection. */
  79. static void s_s3_client_on_acquire_http_connection(
  80. struct aws_http_connection *http_connection,
  81. int error_code,
  82. void *user_data);
  83. static void s_s3_client_push_meta_request_synced(
  84. struct aws_s3_client *client,
  85. struct aws_s3_meta_request *meta_request);
  86. /* Schedule task for processing work. (Calls the corresponding vtable function.) */
  87. static void s_s3_client_schedule_process_work_synced(struct aws_s3_client *client);
  88. /* Default implementation for scheduling processing of work. */
  89. static void s_s3_client_schedule_process_work_synced_default(struct aws_s3_client *client);
  90. /* Actual task function that processes work. */
  91. static void s_s3_client_process_work_task(struct aws_task *task, void *arg, enum aws_task_status task_status);
  92. static void s_s3_client_process_work_default(struct aws_s3_client *client);
  93. static void s_s3_client_endpoint_shutdown_callback(struct aws_s3_client *client);
  94. /* Default factory function for creating a meta request. */
  95. static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
  96. struct aws_s3_client *client,
  97. const struct aws_s3_meta_request_options *options);
  98. static struct aws_s3_client_vtable s_s3_client_default_vtable = {
  99. .meta_request_factory = s_s3_client_meta_request_factory_default,
  100. .acquire_http_connection = aws_http_connection_manager_acquire_connection,
  101. .get_host_address_count = aws_host_resolver_get_host_address_count,
  102. .schedule_process_work_synced = s_s3_client_schedule_process_work_synced_default,
  103. .process_work = s_s3_client_process_work_default,
  104. .endpoint_shutdown_callback = s_s3_client_endpoint_shutdown_callback,
  105. .finish_destroy = s_s3_client_finish_destroy_default,
  106. };
  107. void aws_s3_set_dns_ttl(size_t ttl) {
  108. s_dns_host_address_ttl_seconds = ttl;
  109. }
  110. /* Returns the max number of connections allowed.
  111. *
  112. * When meta request is NULL, this will return the overall allowed number of connections.
  113. *
  114. * If meta_request is not NULL, this will give the max number of connections allowed for that meta request type on
  115. * that endpoint.
  116. */
  117. uint32_t aws_s3_client_get_max_active_connections(
  118. struct aws_s3_client *client,
  119. struct aws_s3_meta_request *meta_request) {
  120. AWS_PRECONDITION(client);
  121. uint32_t num_connections_per_vip = g_max_num_connections_per_vip;
  122. uint32_t num_vips = client->ideal_vip_count;
  123. if (meta_request != NULL) {
  124. num_connections_per_vip = g_num_conns_per_vip_meta_request_look_up[meta_request->type];
  125. struct aws_s3_endpoint *endpoint = meta_request->endpoint;
  126. AWS_ASSERT(endpoint != NULL);
  127. AWS_ASSERT(client->vtable->get_host_address_count);
  128. size_t num_known_vips = client->vtable->get_host_address_count(
  129. client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
  130. /* If the number of known vips is less than our ideal VIP count, clamp it. */
  131. if (num_known_vips < (size_t)num_vips) {
  132. num_vips = (uint32_t)num_known_vips;
  133. }
  134. }
  135. /* We always want to allow for at least one VIP worth of connections. */
  136. if (num_vips == 0) {
  137. num_vips = 1;
  138. }
  139. uint32_t max_active_connections = num_vips * num_connections_per_vip;
  140. if (client->max_active_connections_override > 0 &&
  141. client->max_active_connections_override < max_active_connections) {
  142. max_active_connections = client->max_active_connections_override;
  143. }
  144. return max_active_connections;
  145. }
  146. /* Returns the max number of requests allowed to be in memory */
  147. uint32_t aws_s3_client_get_max_requests_in_flight(struct aws_s3_client *client) {
  148. AWS_PRECONDITION(client);
  149. return aws_s3_client_get_max_active_connections(client, NULL) * s_max_requests_multiplier;
  150. }
  151. /* Returns the max number of requests that should be in preparation stage (ie: reading from a stream, being signed,
  152. * etc.) */
  153. uint32_t aws_s3_client_get_max_requests_prepare(struct aws_s3_client *client) {
  154. return aws_s3_client_get_max_active_connections(client, NULL);
  155. }
  156. static uint32_t s_s3_client_get_num_requests_network_io(
  157. struct aws_s3_client *client,
  158. enum aws_s3_meta_request_type meta_request_type) {
  159. AWS_PRECONDITION(client);
  160. uint32_t num_requests_network_io = 0;
  161. if (meta_request_type == AWS_S3_META_REQUEST_TYPE_MAX) {
  162. for (uint32_t i = 0; i < AWS_S3_META_REQUEST_TYPE_MAX; ++i) {
  163. num_requests_network_io += (uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_io[i]);
  164. }
  165. } else {
  166. num_requests_network_io =
  167. (uint32_t)aws_atomic_load_int(&client->stats.num_requests_network_io[meta_request_type]);
  168. }
  169. return num_requests_network_io;
  170. }
  171. void aws_s3_client_lock_synced_data(struct aws_s3_client *client) {
  172. aws_mutex_lock(&client->synced_data.lock);
  173. }
  174. void aws_s3_client_unlock_synced_data(struct aws_s3_client *client) {
  175. aws_mutex_unlock(&client->synced_data.lock);
  176. }
  177. struct aws_s3_client *aws_s3_client_new(
  178. struct aws_allocator *allocator,
  179. const struct aws_s3_client_config *client_config) {
  180. AWS_PRECONDITION(allocator);
  181. AWS_PRECONDITION(client_config);
  182. if (client_config->client_bootstrap == NULL) {
  183. AWS_LOGF_ERROR(
  184. AWS_LS_S3_CLIENT,
  185. "Cannot create client from client_config; client_bootstrap provided in options is invalid.");
  186. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  187. return NULL;
  188. }
  189. /* Cannot be less than zero. If zero, use default. */
  190. if (client_config->throughput_target_gbps < 0.0) {
  191. AWS_LOGF_ERROR(
  192. AWS_LS_S3_CLIENT,
  193. "Cannot create client from client_config; throughput_target_gbps cannot less than or equal to 0.");
  194. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  195. return NULL;
  196. }
  197. #ifdef BYO_CRYPTO
  198. if (client_config->tls_mode == AWS_MR_TLS_ENABLED && client_config->tls_connection_options == NULL) {
  199. AWS_LOGF_ERROR(
  200. AWS_LS_S3_CLIENT,
  201. "Cannot create client from client_config; when using BYO_CRYPTO, tls_connection_options can not be "
  202. "NULL when TLS is enabled.");
  203. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  204. return NULL;
  205. }
  206. #endif
  207. struct aws_s3_client *client = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_client));
  208. client->allocator = allocator;
  209. client->vtable = &s_s3_client_default_vtable;
  210. aws_ref_count_init(&client->ref_count, client, (aws_simple_completion_callback *)s_s3_client_start_destroy);
  211. if (aws_mutex_init(&client->synced_data.lock) != AWS_OP_SUCCESS) {
  212. goto lock_init_fail;
  213. }
  214. aws_linked_list_init(&client->synced_data.pending_meta_request_work);
  215. aws_linked_list_init(&client->synced_data.prepared_requests);
  216. aws_linked_list_init(&client->threaded_data.meta_requests);
  217. aws_linked_list_init(&client->threaded_data.request_queue);
  218. aws_atomic_init_int(&client->stats.num_requests_in_flight, 0);
  219. for (uint32_t i = 0; i < (uint32_t)AWS_S3_META_REQUEST_TYPE_MAX; ++i) {
  220. aws_atomic_init_int(&client->stats.num_requests_network_io[i], 0);
  221. }
  222. aws_atomic_init_int(&client->stats.num_requests_stream_queued_waiting, 0);
  223. aws_atomic_init_int(&client->stats.num_requests_streaming, 0);
  224. *((uint32_t *)&client->max_active_connections_override) = client_config->max_active_connections_override;
  225. /* Store our client bootstrap. */
  226. client->client_bootstrap = aws_client_bootstrap_acquire(client_config->client_bootstrap);
  227. struct aws_event_loop_group *event_loop_group = client_config->client_bootstrap->event_loop_group;
  228. aws_event_loop_group_acquire(event_loop_group);
  229. client->process_work_event_loop = aws_event_loop_group_get_next_loop(event_loop_group);
  230. /* Make a copy of the region string. */
  231. client->region = aws_string_new_from_array(allocator, client_config->region.ptr, client_config->region.len);
  232. if (client_config->part_size != 0) {
  233. *((size_t *)&client->part_size) = client_config->part_size;
  234. } else {
  235. *((size_t *)&client->part_size) = s_default_part_size;
  236. }
  237. if (client_config->max_part_size != 0) {
  238. *((size_t *)&client->max_part_size) = client_config->max_part_size;
  239. } else {
  240. *((size_t *)&client->max_part_size) = (size_t)s_default_max_part_size;
  241. }
  242. if (client_config->max_part_size < client_config->part_size) {
  243. *((size_t *)&client_config->max_part_size) = client_config->part_size;
  244. }
  245. client->connect_timeout_ms = client_config->connect_timeout_ms;
  246. if (client_config->proxy_ev_settings) {
  247. client->proxy_ev_settings = aws_mem_calloc(allocator, 1, sizeof(struct proxy_env_var_settings));
  248. *client->proxy_ev_settings = *client_config->proxy_ev_settings;
  249. if (client_config->proxy_ev_settings->tls_options) {
  250. client->proxy_ev_tls_options = aws_mem_calloc(allocator, 1, sizeof(struct aws_tls_connection_options));
  251. if (aws_tls_connection_options_copy(client->proxy_ev_tls_options, client->proxy_ev_settings->tls_options)) {
  252. goto on_error;
  253. }
  254. client->proxy_ev_settings->tls_options = client->proxy_ev_tls_options;
  255. }
  256. }
  257. if (client_config->tcp_keep_alive_options) {
  258. client->tcp_keep_alive_options = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_tcp_keep_alive_options));
  259. *client->tcp_keep_alive_options = *client_config->tcp_keep_alive_options;
  260. }
  261. if (client_config->monitoring_options) {
  262. client->monitoring_options = *client_config->monitoring_options;
  263. } else {
  264. client->monitoring_options.minimum_throughput_bytes_per_second = 1;
  265. client->monitoring_options.allowable_throughput_failure_interval_seconds =
  266. s_default_throughput_failure_interval_seconds;
  267. }
  268. if (client_config->tls_mode == AWS_MR_TLS_ENABLED) {
  269. client->tls_connection_options =
  270. aws_mem_calloc(client->allocator, 1, sizeof(struct aws_tls_connection_options));
  271. if (client_config->tls_connection_options != NULL) {
  272. aws_tls_connection_options_copy(client->tls_connection_options, client_config->tls_connection_options);
  273. } else {
  274. #ifdef BYO_CRYPTO
  275. AWS_FATAL_ASSERT(false);
  276. goto on_error;
  277. #else
  278. struct aws_tls_ctx_options default_tls_ctx_options;
  279. AWS_ZERO_STRUCT(default_tls_ctx_options);
  280. aws_tls_ctx_options_init_default_client(&default_tls_ctx_options, allocator);
  281. struct aws_tls_ctx *default_tls_ctx = aws_tls_client_ctx_new(allocator, &default_tls_ctx_options);
  282. if (default_tls_ctx == NULL) {
  283. goto on_error;
  284. }
  285. aws_tls_connection_options_init_from_ctx(client->tls_connection_options, default_tls_ctx);
  286. aws_tls_ctx_release(default_tls_ctx);
  287. aws_tls_ctx_options_clean_up(&default_tls_ctx_options);
  288. #endif
  289. }
  290. }
  291. if (client_config->proxy_options) {
  292. client->proxy_config = aws_http_proxy_config_new_from_proxy_options_with_tls_info(
  293. allocator, client_config->proxy_options, client_config->tls_mode == AWS_MR_TLS_ENABLED);
  294. if (client->proxy_config == NULL) {
  295. goto on_error;
  296. }
  297. }
  298. /* Set up body streaming ELG */
  299. {
  300. uint16_t num_event_loops =
  301. (uint16_t)aws_array_list_length(&client->client_bootstrap->event_loop_group->event_loops);
  302. uint16_t num_streaming_threads = num_event_loops;
  303. if (num_streaming_threads < 1) {
  304. num_streaming_threads = 1;
  305. }
  306. struct aws_shutdown_callback_options body_streaming_elg_shutdown_options = {
  307. .shutdown_callback_fn = s_s3_client_body_streaming_elg_shutdown,
  308. .shutdown_callback_user_data = client,
  309. };
  310. if (aws_get_cpu_group_count() > 1) {
  311. client->body_streaming_elg = aws_event_loop_group_new_default_pinned_to_cpu_group(
  312. client->allocator, num_streaming_threads, 1, &body_streaming_elg_shutdown_options);
  313. } else {
  314. client->body_streaming_elg = aws_event_loop_group_new_default(
  315. client->allocator, num_streaming_threads, &body_streaming_elg_shutdown_options);
  316. }
  317. if (!client->body_streaming_elg) {
  318. /* Fail to create elg, we should fail the call */
  319. goto on_error;
  320. }
  321. client->synced_data.body_streaming_elg_allocated = true;
  322. }
  323. /* Setup cannot fail after this point. */
  324. if (client_config->throughput_target_gbps != 0.0) {
  325. *((double *)&client->throughput_target_gbps) = client_config->throughput_target_gbps;
  326. } else {
  327. *((double *)&client->throughput_target_gbps) = s_default_throughput_target_gbps;
  328. }
  329. *((enum aws_s3_meta_request_compute_content_md5 *)&client->compute_content_md5) =
  330. client_config->compute_content_md5;
  331. /* Determine how many vips are ideal by dividing target-throughput by throughput-per-vip. */
  332. {
  333. double ideal_vip_count_double = client->throughput_target_gbps / s_throughput_per_vip_gbps;
  334. *((uint32_t *)&client->ideal_vip_count) = (uint32_t)ceil(ideal_vip_count_double);
  335. }
  336. if (client_config->signing_config) {
  337. client->cached_signing_config = aws_cached_signing_config_new(client->allocator, client_config->signing_config);
  338. }
  339. client->synced_data.active = true;
  340. if (client_config->retry_strategy != NULL) {
  341. aws_retry_strategy_acquire(client_config->retry_strategy);
  342. client->retry_strategy = client_config->retry_strategy;
  343. } else {
  344. struct aws_exponential_backoff_retry_options backoff_retry_options = {
  345. .el_group = client_config->client_bootstrap->event_loop_group,
  346. .max_retries = s_default_max_retries,
  347. };
  348. struct aws_standard_retry_options retry_options = {
  349. .backoff_retry_options = backoff_retry_options,
  350. };
  351. client->retry_strategy = aws_retry_strategy_new_standard(allocator, &retry_options);
  352. }
  353. aws_hash_table_init(
  354. &client->synced_data.endpoints,
  355. client->allocator,
  356. 10,
  357. aws_hash_string,
  358. aws_hash_callback_string_eq,
  359. aws_hash_callback_string_destroy,
  360. NULL);
  361. /* Initialize shutdown options and tracking. */
  362. client->shutdown_callback = client_config->shutdown_callback;
  363. client->shutdown_callback_user_data = client_config->shutdown_callback_user_data;
  364. *((bool *)&client->enable_read_backpressure) = client_config->enable_read_backpressure;
  365. *((size_t *)&client->initial_read_window) = client_config->initial_read_window;
  366. return client;
  367. on_error:
  368. aws_string_destroy(client->region);
  369. if (client->tls_connection_options) {
  370. aws_tls_connection_options_clean_up(client->tls_connection_options);
  371. aws_mem_release(client->allocator, client->tls_connection_options);
  372. client->tls_connection_options = NULL;
  373. }
  374. if (client->proxy_config) {
  375. aws_http_proxy_config_destroy(client->proxy_config);
  376. }
  377. if (client->proxy_ev_tls_options) {
  378. aws_tls_connection_options_clean_up(client->proxy_ev_tls_options);
  379. aws_mem_release(client->allocator, client->proxy_ev_tls_options);
  380. client->proxy_ev_settings->tls_options = NULL;
  381. }
  382. aws_mem_release(client->allocator, client->proxy_ev_settings);
  383. aws_mem_release(client->allocator, client->tcp_keep_alive_options);
  384. aws_event_loop_group_release(client->client_bootstrap->event_loop_group);
  385. aws_client_bootstrap_release(client->client_bootstrap);
  386. aws_mutex_clean_up(&client->synced_data.lock);
  387. lock_init_fail:
  388. aws_mem_release(client->allocator, client);
  389. return NULL;
  390. }
  391. struct aws_s3_client *aws_s3_client_acquire(struct aws_s3_client *client) {
  392. AWS_PRECONDITION(client);
  393. aws_ref_count_acquire(&client->ref_count);
  394. return client;
  395. }
  396. struct aws_s3_client *aws_s3_client_release(struct aws_s3_client *client) {
  397. if (client != NULL) {
  398. aws_ref_count_release(&client->ref_count);
  399. }
  400. return NULL;
  401. }
  402. static void s_s3_client_start_destroy(void *user_data) {
  403. struct aws_s3_client *client = user_data;
  404. AWS_PRECONDITION(client);
  405. AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client starting destruction.", (void *)client);
  406. struct aws_linked_list local_vip_list;
  407. aws_linked_list_init(&local_vip_list);
  408. /* BEGIN CRITICAL SECTION */
  409. {
  410. aws_s3_client_lock_synced_data(client);
  411. client->synced_data.active = false;
  412. /* Prevent the client from cleaning up in between the mutex unlock/re-lock below.*/
  413. client->synced_data.start_destroy_executing = true;
  414. aws_s3_client_unlock_synced_data(client);
  415. }
  416. /* END CRITICAL SECTION */
  417. aws_event_loop_group_release(client->body_streaming_elg);
  418. client->body_streaming_elg = NULL;
  419. /* BEGIN CRITICAL SECTION */
  420. {
  421. aws_s3_client_lock_synced_data(client);
  422. client->synced_data.start_destroy_executing = false;
  423. /* Schedule the work task to clean up outstanding connections and to call s_s3_client_finish_destroy function if
  424. * everything cleaning up asynchronously has finished. */
  425. s_s3_client_schedule_process_work_synced(client);
  426. aws_s3_client_unlock_synced_data(client);
  427. }
  428. /* END CRITICAL SECTION */
  429. }
  430. static void s_s3_client_finish_destroy_default(struct aws_s3_client *client) {
  431. AWS_PRECONDITION(client);
  432. AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client finishing destruction.", (void *)client);
  433. aws_string_destroy(client->region);
  434. client->region = NULL;
  435. if (client->tls_connection_options) {
  436. aws_tls_connection_options_clean_up(client->tls_connection_options);
  437. aws_mem_release(client->allocator, client->tls_connection_options);
  438. client->tls_connection_options = NULL;
  439. }
  440. if (client->proxy_config) {
  441. aws_http_proxy_config_destroy(client->proxy_config);
  442. }
  443. if (client->proxy_ev_tls_options) {
  444. aws_tls_connection_options_clean_up(client->proxy_ev_tls_options);
  445. aws_mem_release(client->allocator, client->proxy_ev_tls_options);
  446. client->proxy_ev_settings->tls_options = NULL;
  447. }
  448. aws_mem_release(client->allocator, client->proxy_ev_settings);
  449. aws_mem_release(client->allocator, client->tcp_keep_alive_options);
  450. aws_mutex_clean_up(&client->synced_data.lock);
  451. AWS_ASSERT(aws_linked_list_empty(&client->synced_data.pending_meta_request_work));
  452. AWS_ASSERT(aws_linked_list_empty(&client->threaded_data.meta_requests));
  453. aws_hash_table_clean_up(&client->synced_data.endpoints);
  454. aws_retry_strategy_release(client->retry_strategy);
  455. aws_event_loop_group_release(client->client_bootstrap->event_loop_group);
  456. aws_client_bootstrap_release(client->client_bootstrap);
  457. aws_cached_signing_config_destroy(client->cached_signing_config);
  458. aws_s3_client_shutdown_complete_callback_fn *shutdown_callback = client->shutdown_callback;
  459. void *shutdown_user_data = client->shutdown_callback_user_data;
  460. aws_mem_release(client->allocator, client);
  461. client = NULL;
  462. if (shutdown_callback != NULL) {
  463. shutdown_callback(shutdown_user_data);
  464. }
  465. }
  466. static void s_s3_client_body_streaming_elg_shutdown(void *user_data) {
  467. struct aws_s3_client *client = user_data;
  468. AWS_PRECONDITION(client);
  469. AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Client body streaming ELG shutdown.", (void *)client);
  470. /* BEGIN CRITICAL SECTION */
  471. {
  472. aws_s3_client_lock_synced_data(client);
  473. client->synced_data.body_streaming_elg_allocated = false;
  474. s_s3_client_schedule_process_work_synced(client);
  475. aws_s3_client_unlock_synced_data(client);
  476. }
  477. /* END CRITICAL SECTION */
  478. }
  479. uint32_t aws_s3_client_queue_requests_threaded(
  480. struct aws_s3_client *client,
  481. struct aws_linked_list *request_list,
  482. bool queue_front) {
  483. AWS_PRECONDITION(client);
  484. AWS_PRECONDITION(request_list);
  485. uint32_t request_list_size = 0;
  486. for (struct aws_linked_list_node *node = aws_linked_list_begin(request_list);
  487. node != aws_linked_list_end(request_list);
  488. node = aws_linked_list_next(node)) {
  489. ++request_list_size;
  490. }
  491. if (queue_front) {
  492. aws_linked_list_move_all_front(&client->threaded_data.request_queue, request_list);
  493. } else {
  494. aws_linked_list_move_all_back(&client->threaded_data.request_queue, request_list);
  495. }
  496. client->threaded_data.request_queue_size += request_list_size;
  497. return request_list_size;
  498. }
  499. struct aws_s3_request *aws_s3_client_dequeue_request_threaded(struct aws_s3_client *client) {
  500. AWS_PRECONDITION(client);
  501. if (aws_linked_list_empty(&client->threaded_data.request_queue)) {
  502. return NULL;
  503. }
  504. struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&client->threaded_data.request_queue);
  505. struct aws_s3_request *request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node);
  506. --client->threaded_data.request_queue_size;
  507. return request;
  508. }
  509. /*
  510. * There is currently some overlap between user provided Host header and endpoint
  511. * override. This function handles the corner cases for when either or both are provided.
  512. */
  513. int s_apply_endpoint_override(
  514. const struct aws_s3_client *client,
  515. struct aws_http_headers *message_headers,
  516. const struct aws_uri *endpoint) {
  517. AWS_PRECONDITION(message_headers);
  518. const struct aws_byte_cursor *endpoint_authority = endpoint == NULL ? NULL : aws_uri_authority(endpoint);
  519. if (!aws_http_headers_has(message_headers, g_host_header_name)) {
  520. if (endpoint_authority == NULL) {
  521. AWS_LOGF_ERROR(
  522. AWS_LS_S3_CLIENT,
  523. "id=%p Cannot create meta s3 request; message provided in options does not have either 'Host' header "
  524. "set or endpoint override.",
  525. (void *)client);
  526. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  527. }
  528. if (aws_http_headers_set(message_headers, g_host_header_name, *endpoint_authority)) {
  529. AWS_LOGF_ERROR(
  530. AWS_LS_S3_CLIENT,
  531. "id=%p Cannot create meta s3 request; failed to set 'Host' header based on endpoint override.",
  532. (void *)client);
  533. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  534. }
  535. }
  536. struct aws_byte_cursor host_value;
  537. AWS_FATAL_ASSERT(aws_http_headers_get(message_headers, g_host_header_name, &host_value) == AWS_OP_SUCCESS);
  538. if (endpoint_authority != NULL && !aws_byte_cursor_eq(&host_value, endpoint_authority)) {
  539. AWS_LOGF_ERROR(
  540. AWS_LS_S3_CLIENT,
  541. "id=%p Cannot create meta s3 request; host header value " PRInSTR
  542. " does not match endpoint override " PRInSTR,
  543. (void *)client,
  544. AWS_BYTE_CURSOR_PRI(host_value),
  545. AWS_BYTE_CURSOR_PRI(*endpoint_authority));
  546. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  547. }
  548. return AWS_OP_SUCCESS;
  549. }
  550. /* Public facing make-meta-request function. */
  551. struct aws_s3_meta_request *aws_s3_client_make_meta_request(
  552. struct aws_s3_client *client,
  553. const struct aws_s3_meta_request_options *options) {
  554. AWS_LOGF_INFO(AWS_LS_S3_CLIENT, "id=%p Initiating making of meta request", (void *)client);
  555. AWS_PRECONDITION(client);
  556. AWS_PRECONDITION(client->vtable);
  557. AWS_PRECONDITION(client->vtable->meta_request_factory);
  558. AWS_PRECONDITION(options);
  559. if (options->type >= AWS_S3_META_REQUEST_TYPE_MAX) {
  560. AWS_LOGF_ERROR(
  561. AWS_LS_S3_CLIENT,
  562. "id=%p Cannot create meta s3 request; invalid meta request type specified.",
  563. (void *)client);
  564. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  565. return NULL;
  566. }
  567. if (options->message == NULL) {
  568. AWS_LOGF_ERROR(
  569. AWS_LS_S3_CLIENT,
  570. "id=%p Cannot create meta s3 request; message provided in options is invalid.",
  571. (void *)client);
  572. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  573. return NULL;
  574. }
  575. struct aws_http_headers *message_headers = aws_http_message_get_headers(options->message);
  576. if (message_headers == NULL) {
  577. AWS_LOGF_ERROR(
  578. AWS_LS_S3_CLIENT,
  579. "id=%p Cannot create meta s3 request; message provided in options does not contain headers.",
  580. (void *)client);
  581. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  582. return NULL;
  583. }
  584. if (options->checksum_config) {
  585. if (options->checksum_config->location == AWS_SCL_TRAILER) {
  586. struct aws_http_headers *headers = aws_http_message_get_headers(options->message);
  587. struct aws_byte_cursor existing_encoding;
  588. AWS_ZERO_STRUCT(existing_encoding);
  589. if (aws_http_headers_get(headers, g_content_encoding_header_name, &existing_encoding) == AWS_OP_SUCCESS) {
  590. if (aws_byte_cursor_find_exact(&existing_encoding, &g_content_encoding_header_aws_chunked, NULL) ==
  591. AWS_OP_SUCCESS) {
  592. AWS_LOGF_ERROR(
  593. AWS_LS_S3_CLIENT,
  594. "id=%p Cannot create meta s3 request; for trailer checksum, the original request cannot be "
  595. "aws-chunked encoding. The client will encode the request instead.",
  596. (void *)client);
  597. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  598. return NULL;
  599. }
  600. }
  601. }
  602. if (options->checksum_config->location == AWS_SCL_HEADER) {
  603. /* TODO: support calculate checksum to add to header */
  604. aws_raise_error(AWS_ERROR_UNSUPPORTED_OPERATION);
  605. return NULL;
  606. }
  607. if (options->checksum_config->location != AWS_SCL_NONE &&
  608. options->checksum_config->checksum_algorithm == AWS_SCA_NONE) {
  609. AWS_LOGF_ERROR(
  610. AWS_LS_S3_CLIENT,
  611. "id=%p Cannot create meta s3 request; checksum algorithm must be set to calculate checksum.",
  612. (void *)client);
  613. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  614. return NULL;
  615. }
  616. if (options->checksum_config->checksum_algorithm != AWS_SCA_NONE &&
  617. options->checksum_config->location == AWS_SCL_NONE) {
  618. AWS_LOGF_ERROR(
  619. AWS_LS_S3_CLIENT,
  620. "id=%p Cannot create meta s3 request; checksum algorithm cannot be set if not calculate checksum from "
  621. "client.",
  622. (void *)client);
  623. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  624. return NULL;
  625. }
  626. }
  627. if (s_apply_endpoint_override(client, message_headers, options->endpoint)) {
  628. return NULL;
  629. }
  630. struct aws_byte_cursor host_header_value;
  631. /* The Host header must be set from s_apply_endpoint_override, if not errored out */
  632. AWS_FATAL_ASSERT(aws_http_headers_get(message_headers, g_host_header_name, &host_header_value) == AWS_OP_SUCCESS);
  633. bool is_https = true;
  634. uint16_t port = 0;
  635. if (options->endpoint != NULL) {
  636. struct aws_byte_cursor https_scheme = aws_byte_cursor_from_c_str("https");
  637. struct aws_byte_cursor http_scheme = aws_byte_cursor_from_c_str("http");
  638. const struct aws_byte_cursor *scheme = aws_uri_scheme(options->endpoint);
  639. is_https = aws_byte_cursor_eq_ignore_case(scheme, &https_scheme);
  640. if (!is_https && !aws_byte_cursor_eq_ignore_case(scheme, &http_scheme)) {
  641. AWS_LOGF_ERROR(
  642. AWS_LS_S3_CLIENT,
  643. "id=%p Cannot create meta s3 request; unexpected scheme '" PRInSTR "' in endpoint override.",
  644. (void *)client,
  645. AWS_BYTE_CURSOR_PRI(*scheme));
  646. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  647. return NULL;
  648. }
  649. port = aws_uri_port(options->endpoint);
  650. }
  651. struct aws_s3_meta_request *meta_request = client->vtable->meta_request_factory(client, options);
  652. if (meta_request == NULL) {
  653. AWS_LOGF_ERROR(AWS_LS_S3_CLIENT, "id=%p: Could not create new meta request.", (void *)client);
  654. return NULL;
  655. }
  656. bool error_occurred = false;
  657. /* BEGIN CRITICAL SECTION */
  658. {
  659. aws_s3_client_lock_synced_data(client);
  660. struct aws_string *endpoint_host_name = NULL;
  661. if (options->endpoint != NULL) {
  662. endpoint_host_name = aws_string_new_from_cursor(client->allocator, aws_uri_host_name(options->endpoint));
  663. } else {
  664. struct aws_uri host_uri;
  665. if (aws_uri_init_parse(&host_uri, client->allocator, &host_header_value)) {
  666. error_occurred = true;
  667. goto unlock;
  668. }
  669. endpoint_host_name = aws_string_new_from_cursor(client->allocator, aws_uri_host_name(&host_uri));
  670. aws_uri_clean_up(&host_uri);
  671. }
  672. struct aws_s3_endpoint *endpoint = NULL;
  673. struct aws_hash_element *endpoint_hash_element = NULL;
  674. int was_created = 0;
  675. if (aws_hash_table_create(
  676. &client->synced_data.endpoints, endpoint_host_name, &endpoint_hash_element, &was_created)) {
  677. aws_string_destroy(endpoint_host_name);
  678. error_occurred = true;
  679. goto unlock;
  680. }
  681. if (was_created) {
  682. struct aws_s3_endpoint_options endpoint_options = {
  683. .host_name = endpoint_host_name,
  684. .client_bootstrap = client->client_bootstrap,
  685. .tls_connection_options = is_https ? client->tls_connection_options : NULL,
  686. .dns_host_address_ttl_seconds = s_dns_host_address_ttl_seconds,
  687. .client = client,
  688. .max_connections = aws_s3_client_get_max_active_connections(client, NULL),
  689. .port = port,
  690. .proxy_config = client->proxy_config,
  691. .proxy_ev_settings = client->proxy_ev_settings,
  692. .connect_timeout_ms = client->connect_timeout_ms,
  693. .tcp_keep_alive_options = client->tcp_keep_alive_options,
  694. .monitoring_options = &client->monitoring_options,
  695. };
  696. endpoint = aws_s3_endpoint_new(client->allocator, &endpoint_options);
  697. if (endpoint == NULL) {
  698. aws_hash_table_remove(&client->synced_data.endpoints, endpoint_host_name, NULL, NULL);
  699. aws_string_destroy(endpoint_host_name);
  700. error_occurred = true;
  701. goto unlock;
  702. }
  703. endpoint_hash_element->value = endpoint;
  704. ++client->synced_data.num_endpoints_allocated;
  705. } else {
  706. endpoint = endpoint_hash_element->value;
  707. aws_s3_endpoint_acquire(endpoint, true /*already_holding_lock*/);
  708. aws_string_destroy(endpoint_host_name);
  709. endpoint_host_name = NULL;
  710. }
  711. meta_request->endpoint = endpoint;
  712. s_s3_client_push_meta_request_synced(client, meta_request);
  713. s_s3_client_schedule_process_work_synced(client);
  714. unlock:
  715. aws_s3_client_unlock_synced_data(client);
  716. }
  717. /* END CRITICAL SECTION */
  718. if (error_occurred) {
  719. AWS_LOGF_ERROR(
  720. AWS_LS_S3_CLIENT,
  721. "id=%p Could not create meta request due to error %d (%s)",
  722. (void *)client,
  723. aws_last_error(),
  724. aws_error_str(aws_last_error()));
  725. meta_request = aws_s3_meta_request_release(meta_request);
  726. } else {
  727. AWS_LOGF_INFO(AWS_LS_S3_CLIENT, "id=%p: Created meta request %p", (void *)client, (void *)meta_request);
  728. }
  729. return meta_request;
  730. }
  731. static void s_s3_client_endpoint_shutdown_callback(struct aws_s3_client *client) {
  732. AWS_PRECONDITION(client);
  733. /* BEGIN CRITICAL SECTION */
  734. {
  735. aws_s3_client_lock_synced_data(client);
  736. --client->synced_data.num_endpoints_allocated;
  737. s_s3_client_schedule_process_work_synced(client);
  738. aws_s3_client_unlock_synced_data(client);
  739. }
  740. /* END CRITICAL SECTION */
  741. }
  742. static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
  743. struct aws_s3_client *client,
  744. const struct aws_s3_meta_request_options *options) {
  745. AWS_PRECONDITION(client);
  746. AWS_PRECONDITION(options);
  747. struct aws_http_headers *initial_message_headers = aws_http_message_get_headers(options->message);
  748. AWS_ASSERT(initial_message_headers);
  749. uint64_t content_length = 0;
  750. struct aws_byte_cursor content_length_cursor;
  751. bool content_length_header_found = false;
  752. if (!aws_http_headers_get(initial_message_headers, g_content_length_header_name, &content_length_cursor)) {
  753. if (aws_byte_cursor_utf8_parse_u64(content_length_cursor, &content_length)) {
  754. AWS_LOGF_ERROR(
  755. AWS_LS_S3_META_REQUEST,
  756. "Could not parse Content-Length header. header value is:" PRInSTR "",
  757. AWS_BYTE_CURSOR_PRI(content_length_cursor));
  758. aws_raise_error(AWS_ERROR_S3_INVALID_CONTENT_LENGTH_HEADER);
  759. return NULL;
  760. }
  761. content_length_header_found = true;
  762. }
  763. /* Call the appropriate meta-request new function. */
  764. switch (options->type) {
  765. case AWS_S3_META_REQUEST_TYPE_GET_OBJECT: {
  766. /* If the initial request already has partNumber, the request is not
  767. * splittable(?). Treat it as a Default request.
  768. * TODO: Still need tests to verify that the request of a part is
  769. * splittable or not */
  770. if (aws_http_headers_has(initial_message_headers, aws_byte_cursor_from_c_str("partNumber"))) {
  771. return aws_s3_meta_request_default_new(client->allocator, client, content_length, false, options);
  772. }
  773. return aws_s3_meta_request_auto_ranged_get_new(client->allocator, client, client->part_size, options);
  774. }
  775. case AWS_S3_META_REQUEST_TYPE_PUT_OBJECT: {
  776. if (!content_length_header_found) {
  777. AWS_LOGF_ERROR(
  778. AWS_LS_S3_META_REQUEST,
  779. "Could not create auto-ranged-put meta request; there is no Content-Length header present.");
  780. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  781. return NULL;
  782. }
  783. struct aws_input_stream *input_stream = aws_http_message_get_body_stream(options->message);
  784. if ((input_stream == NULL) && (options->send_filepath.len == 0)) {
  785. AWS_LOGF_ERROR(
  786. AWS_LS_S3_META_REQUEST,
  787. "Could not create auto-ranged-put meta request; filepath or body stream must be set.");
  788. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  789. return NULL;
  790. }
  791. if (options->resume_token == NULL) {
  792. size_t client_part_size = client->part_size;
  793. size_t client_max_part_size = client->max_part_size;
  794. if (client_part_size < g_s3_min_upload_part_size) {
  795. AWS_LOGF_WARN(
  796. AWS_LS_S3_META_REQUEST,
  797. "Client config part size of %" PRIu64 " is less than the minimum upload part size of %" PRIu64
  798. ". Using to the minimum part-size for upload.",
  799. (uint64_t)client_part_size,
  800. (uint64_t)g_s3_min_upload_part_size);
  801. client_part_size = g_s3_min_upload_part_size;
  802. }
  803. if (client_max_part_size < g_s3_min_upload_part_size) {
  804. AWS_LOGF_WARN(
  805. AWS_LS_S3_META_REQUEST,
  806. "Client config max part size of %" PRIu64
  807. " is less than the minimum upload part size of %" PRIu64
  808. ". Clamping to the minimum part-size for upload.",
  809. (uint64_t)client_max_part_size,
  810. (uint64_t)g_s3_min_upload_part_size);
  811. client_max_part_size = g_s3_min_upload_part_size;
  812. }
  813. if (content_length <= client_part_size) {
  814. return aws_s3_meta_request_default_new(
  815. client->allocator,
  816. client,
  817. content_length,
  818. client->compute_content_md5 == AWS_MR_CONTENT_MD5_ENABLED &&
  819. !aws_http_headers_has(initial_message_headers, g_content_md5_header_name),
  820. options);
  821. } else {
  822. if (aws_s3_message_util_check_checksum_header(options->message)) {
  823. /* The checksum header has been set and the request will be splitted. We fail the request */
  824. AWS_LOGF_ERROR(
  825. AWS_LS_S3_META_REQUEST,
  826. "Could not create auto-ranged-put meta request; checksum headers has been set for "
  827. "auto-ranged-put that will be split. Pre-calculated checksums are only supported for "
  828. "single "
  829. "part upload.");
  830. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  831. return NULL;
  832. }
  833. }
  834. uint64_t part_size_uint64 = content_length / (uint64_t)g_s3_max_num_upload_parts;
  835. if (part_size_uint64 > SIZE_MAX) {
  836. AWS_LOGF_ERROR(
  837. AWS_LS_S3_META_REQUEST,
  838. "Could not create auto-ranged-put meta request; required part size of %" PRIu64
  839. " bytes is too large for platform.",
  840. part_size_uint64);
  841. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  842. return NULL;
  843. }
  844. size_t part_size = (size_t)part_size_uint64;
  845. if (part_size > client_max_part_size) {
  846. AWS_LOGF_ERROR(
  847. AWS_LS_S3_META_REQUEST,
  848. "Could not create auto-ranged-put meta request; required part size for put request is %" PRIu64
  849. ", but current maximum part size is %" PRIu64,
  850. (uint64_t)part_size,
  851. (uint64_t)client_max_part_size);
  852. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  853. return NULL;
  854. }
  855. if (part_size < client_part_size) {
  856. part_size = client_part_size;
  857. }
  858. uint32_t num_parts = (uint32_t)(content_length / part_size);
  859. if ((content_length % part_size) > 0) {
  860. ++num_parts;
  861. }
  862. return aws_s3_meta_request_auto_ranged_put_new(
  863. client->allocator, client, part_size, content_length, num_parts, options);
  864. } else {
  865. /* dont pass part size and total num parts. constructor will pick it up from token */
  866. return aws_s3_meta_request_auto_ranged_put_new(
  867. client->allocator, client, 0, content_length, 0, options);
  868. }
  869. }
  870. case AWS_S3_META_REQUEST_TYPE_COPY_OBJECT: {
  871. /* TODO: support copy object correctly. */
  872. AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "CopyObject is not currently supported");
  873. aws_raise_error(AWS_ERROR_UNIMPLEMENTED);
  874. return NULL;
  875. }
  876. case AWS_S3_META_REQUEST_TYPE_DEFAULT:
  877. return aws_s3_meta_request_default_new(client->allocator, client, content_length, false, options);
  878. default:
  879. AWS_FATAL_ASSERT(false);
  880. }
  881. return NULL;
  882. }
  883. static void s_s3_client_push_meta_request_synced(
  884. struct aws_s3_client *client,
  885. struct aws_s3_meta_request *meta_request) {
  886. AWS_PRECONDITION(client);
  887. AWS_PRECONDITION(meta_request);
  888. ASSERT_SYNCED_DATA_LOCK_HELD(client);
  889. struct aws_s3_meta_request_work *meta_request_work =
  890. aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_meta_request_work));
  891. aws_s3_meta_request_acquire(meta_request);
  892. meta_request_work->meta_request = meta_request;
  893. aws_linked_list_push_back(&client->synced_data.pending_meta_request_work, &meta_request_work->node);
  894. }
  895. static void s_s3_client_schedule_process_work_synced(struct aws_s3_client *client) {
  896. AWS_PRECONDITION(client);
  897. AWS_PRECONDITION(client->vtable);
  898. AWS_PRECONDITION(client->vtable->schedule_process_work_synced);
  899. ASSERT_SYNCED_DATA_LOCK_HELD(client);
  900. client->vtable->schedule_process_work_synced(client);
  901. }
  902. static void s_s3_client_schedule_process_work_synced_default(struct aws_s3_client *client) {
  903. ASSERT_SYNCED_DATA_LOCK_HELD(client);
  904. if (client->synced_data.process_work_task_scheduled) {
  905. return;
  906. }
  907. aws_task_init(
  908. &client->synced_data.process_work_task, s_s3_client_process_work_task, client, "s3_client_process_work_task");
  909. aws_event_loop_schedule_task_now(client->process_work_event_loop, &client->synced_data.process_work_task);
  910. client->synced_data.process_work_task_scheduled = true;
  911. }
  /**
   * Schedules the client's process-work task. Acquires the synced-data lock itself and delegates to
   * s_s3_client_schedule_process_work_synced, so the caller must NOT already hold that lock.
   */
  void aws_s3_client_schedule_process_work(struct aws_s3_client *client) {
      AWS_PRECONDITION(client);

      /* BEGIN CRITICAL SECTION */
      aws_s3_client_lock_synced_data(client);

      s_s3_client_schedule_process_work_synced(client);

      aws_s3_client_unlock_synced_data(client);
      /* END CRITICAL SECTION */
  }
  922. static void s_s3_client_remove_meta_request_threaded(
  923. struct aws_s3_client *client,
  924. struct aws_s3_meta_request *meta_request) {
  925. AWS_PRECONDITION(client);
  926. AWS_PRECONDITION(meta_request);
  927. (void)client;
  928. aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
  929. meta_request->client_process_work_threaded_data.scheduled = false;
  930. aws_s3_meta_request_release(meta_request);
  931. }
  932. /* Task function for trying to find a request that can be processed. */
  933. static void s_s3_client_process_work_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
  934. AWS_PRECONDITION(task);
  935. (void)task;
  936. (void)task_status;
  937. /* Client keeps a reference to the event loop group; a 'canceled' status should not happen.*/
  938. AWS_ASSERT(task_status == AWS_TASK_STATUS_RUN_READY);
  939. struct aws_s3_client *client = arg;
  940. AWS_PRECONDITION(client);
  941. AWS_PRECONDITION(client->vtable);
  942. AWS_PRECONDITION(client->vtable->process_work);
  943. client->vtable->process_work(client);
  944. }
  945. static void s_s3_client_process_work_default(struct aws_s3_client *client) {
  946. AWS_PRECONDITION(client);
  947. AWS_PRECONDITION(client->vtable);
  948. AWS_PRECONDITION(client->vtable->finish_destroy);
  949. struct aws_linked_list meta_request_work_list;
  950. aws_linked_list_init(&meta_request_work_list);
  951. /*******************/
  952. /* Step 1: Move relevant data into thread local memory. */
  953. /*******************/
  954. AWS_LOGF_DEBUG(
  955. AWS_LS_S3_CLIENT,
  956. "id=%p s_s3_client_process_work_default - Moving relevant synced_data into threaded_data.",
  957. (void *)client);
  958. /* BEGIN CRITICAL SECTION */
  959. aws_s3_client_lock_synced_data(client);
  960. /* Once we exit this mutex, someone can reschedule this task. */
  961. client->synced_data.process_work_task_scheduled = false;
  962. client->synced_data.process_work_task_in_progress = true;
  963. aws_linked_list_swap_contents(&meta_request_work_list, &client->synced_data.pending_meta_request_work);
  964. uint32_t num_requests_queued =
  965. aws_s3_client_queue_requests_threaded(client, &client->synced_data.prepared_requests, false);
  966. {
  967. int sub_result = aws_sub_u32_checked(
  968. client->threaded_data.num_requests_being_prepared,
  969. num_requests_queued,
  970. &client->threaded_data.num_requests_being_prepared);
  971. AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
  972. (void)sub_result;
  973. }
  974. {
  975. int sub_result = aws_sub_u32_checked(
  976. client->threaded_data.num_requests_being_prepared,
  977. client->synced_data.num_failed_prepare_requests,
  978. &client->threaded_data.num_requests_being_prepared);
  979. client->synced_data.num_failed_prepare_requests = 0;
  980. AWS_ASSERT(sub_result == AWS_OP_SUCCESS);
  981. (void)sub_result;
  982. }
  983. uint32_t num_endpoints_in_table = (uint32_t)aws_hash_table_get_entry_count(&client->synced_data.endpoints);
  984. uint32_t num_endpoints_allocated = client->synced_data.num_endpoints_allocated;
  985. aws_s3_client_unlock_synced_data(client);
  986. /* END CRITICAL SECTION */
  987. /*******************/
  988. /* Step 2: Push meta requests into the thread local list if they haven't already been scheduled. */
  989. /*******************/
  990. AWS_LOGF_DEBUG(
  991. AWS_LS_S3_CLIENT, "id=%p s_s3_client_process_work_default - Processing any new meta requests.", (void *)client);
  992. while (!aws_linked_list_empty(&meta_request_work_list)) {
  993. struct aws_linked_list_node *node = aws_linked_list_pop_back(&meta_request_work_list);
  994. struct aws_s3_meta_request_work *meta_request_work =
  995. AWS_CONTAINER_OF(node, struct aws_s3_meta_request_work, node);
  996. AWS_FATAL_ASSERT(meta_request_work != NULL);
  997. AWS_FATAL_ASSERT(meta_request_work->meta_request != NULL);
  998. struct aws_s3_meta_request *meta_request = meta_request_work->meta_request;
  999. if (!meta_request->client_process_work_threaded_data.scheduled) {
  1000. aws_linked_list_push_back(
  1001. &client->threaded_data.meta_requests, &meta_request->client_process_work_threaded_data.node);
  1002. meta_request->client_process_work_threaded_data.scheduled = true;
  1003. } else {
  1004. meta_request = aws_s3_meta_request_release(meta_request);
  1005. }
  1006. aws_mem_release(client->allocator, meta_request_work);
  1007. }
  1008. /*******************/
  1009. /* Step 3: Update relevant meta requests and connections. */
  1010. /*******************/
  1011. {
  1012. AWS_LOGF_DEBUG(AWS_LS_S3_CLIENT, "id=%p Updating meta requests.", (void *)client);
  1013. aws_s3_client_update_meta_requests_threaded(client);
  1014. AWS_LOGF_DEBUG(
  1015. AWS_LS_S3_CLIENT, "id=%p Updating connections, assigning requests where possible.", (void *)client);
  1016. aws_s3_client_update_connections_threaded(client);
  1017. }
  1018. /*******************/
  1019. /* Step 4: Log client stats. */
  1020. /*******************/
  1021. {
  1022. uint32_t num_requests_tracked_requests = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);
  1023. uint32_t num_auto_ranged_get_network_io =
  1024. s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_GET_OBJECT);
  1025. uint32_t num_auto_ranged_put_network_io =
  1026. s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_PUT_OBJECT);
  1027. uint32_t num_auto_default_network_io =
  1028. s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_DEFAULT);
  1029. uint32_t num_requests_network_io =
  1030. s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_MAX);
  1031. uint32_t num_requests_stream_queued_waiting =
  1032. (uint32_t)aws_atomic_load_int(&client->stats.num_requests_stream_queued_waiting);
  1033. uint32_t num_requests_streaming = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_streaming);
  1034. uint32_t total_approx_requests = num_requests_network_io + num_requests_stream_queued_waiting +
  1035. num_requests_streaming + client->threaded_data.num_requests_being_prepared +
  1036. client->threaded_data.request_queue_size;
  1037. AWS_LOGF(
  1038. s_log_level_client_stats,
  1039. AWS_LS_S3_CLIENT_STATS,
  1040. "id=%p Requests-in-flight(approx/exact):%d/%d Requests-preparing:%d Requests-queued:%d "
  1041. "Requests-network(get/put/default/total):%d/%d/%d/%d Requests-streaming-waiting:%d Requests-streaming:%d "
  1042. " Endpoints(in-table/allocated):%d/%d",
  1043. (void *)client,
  1044. total_approx_requests,
  1045. num_requests_tracked_requests,
  1046. client->threaded_data.num_requests_being_prepared,
  1047. client->threaded_data.request_queue_size,
  1048. num_auto_ranged_get_network_io,
  1049. num_auto_ranged_put_network_io,
  1050. num_auto_default_network_io,
  1051. num_requests_network_io,
  1052. num_requests_stream_queued_waiting,
  1053. num_requests_streaming,
  1054. num_endpoints_in_table,
  1055. num_endpoints_allocated);
  1056. }
  1057. /*******************/
  1058. /* Step 5: Check for client shutdown. */
  1059. /*******************/
  1060. {
  1061. /* BEGIN CRITICAL SECTION */
  1062. aws_s3_client_lock_synced_data(client);
  1063. client->synced_data.process_work_task_in_progress = false;
  1064. /* This flag should never be set twice. If it was, that means a double-free could occur.*/
  1065. AWS_ASSERT(!client->synced_data.finish_destroy);
  1066. bool finish_destroy = client->synced_data.active == false &&
  1067. client->synced_data.start_destroy_executing == false &&
  1068. client->synced_data.body_streaming_elg_allocated == false &&
  1069. client->synced_data.process_work_task_scheduled == false &&
  1070. client->synced_data.process_work_task_in_progress == false &&
  1071. client->synced_data.num_endpoints_allocated == 0;
  1072. client->synced_data.finish_destroy = finish_destroy;
  1073. if (!client->synced_data.active) {
  1074. AWS_LOGF_DEBUG(
  1075. AWS_LS_S3_CLIENT,
  1076. "id=%p Client shutdown progress: starting_destroy_executing=%d body_streaming_elg_allocated=%d "
  1077. "process_work_task_scheduled=%d process_work_task_in_progress=%d num_endpoints_allocated=%d "
  1078. "finish_destroy=%d",
  1079. (void *)client,
  1080. (int)client->synced_data.start_destroy_executing,
  1081. (int)client->synced_data.body_streaming_elg_allocated,
  1082. (int)client->synced_data.process_work_task_scheduled,
  1083. (int)client->synced_data.process_work_task_in_progress,
  1084. (int)client->synced_data.num_endpoints_allocated,
  1085. (int)client->synced_data.finish_destroy);
  1086. }
  1087. aws_s3_client_unlock_synced_data(client);
  1088. /* END CRITICAL SECTION */
  1089. if (finish_destroy) {
  1090. client->vtable->finish_destroy(client);
  1091. }
  1092. }
  1093. }
  1094. static void s_s3_client_prepare_callback_queue_request(
  1095. struct aws_s3_meta_request *meta_request,
  1096. struct aws_s3_request *request,
  1097. int error_code,
  1098. void *user_data);
  1099. void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
  1100. AWS_PRECONDITION(client);
  1101. const uint32_t max_requests_in_flight = aws_s3_client_get_max_requests_in_flight(client);
  1102. const uint32_t max_requests_prepare = aws_s3_client_get_max_requests_prepare(client);
  1103. struct aws_linked_list meta_requests_work_remaining;
  1104. aws_linked_list_init(&meta_requests_work_remaining);
  1105. uint32_t num_requests_in_flight = (uint32_t)aws_atomic_load_int(&client->stats.num_requests_in_flight);
  1106. const uint32_t pass_flags[] = {
  1107. AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE,
  1108. 0,
  1109. };
  1110. const uint32_t num_passes = AWS_ARRAY_SIZE(pass_flags);
  1111. for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) {
  1112. /* While:
  1113. * * Number of being-prepared + already-prepared-and-queued requests is less than the max that can be in the
  1114. * preparation stage.
  1115. * * Total number of requests tracked by the client is less than the max tracked ("in flight") requests.
  1116. * * There are meta requests to get requests from.
  1117. *
  1118. * Then update meta requests to get new requests that can then be prepared (reading from any streams, signing,
  1119. * etc.) for sending.
  1120. */
  1121. while ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) <
  1122. max_requests_prepare &&
  1123. num_requests_in_flight < max_requests_in_flight &&
  1124. !aws_linked_list_empty(&client->threaded_data.meta_requests)) {
  1125. struct aws_linked_list_node *meta_request_node =
  1126. aws_linked_list_begin(&client->threaded_data.meta_requests);
  1127. struct aws_s3_meta_request *meta_request =
  1128. AWS_CONTAINER_OF(meta_request_node, struct aws_s3_meta_request, client_process_work_threaded_data);
  1129. struct aws_s3_endpoint *endpoint = meta_request->endpoint;
  1130. AWS_ASSERT(endpoint != NULL);
  1131. AWS_ASSERT(client->vtable->get_host_address_count);
  1132. size_t num_known_vips = client->vtable->get_host_address_count(
  1133. client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
  1134. /* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in
  1135. * ramping up requests just yet. If there is already enough in the queue for one address (even if those
  1136. * aren't for this particular endpoint) we skip over this meta request for now. */
  1137. if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared +
  1138. client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) {
  1139. aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
  1140. aws_linked_list_push_back(
  1141. &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
  1142. continue;
  1143. }
  1144. struct aws_s3_request *request = NULL;
  1145. /* Try to grab the next request from the meta request. */
  1146. bool work_remaining = aws_s3_meta_request_update(meta_request, pass_flags[pass_index], &request);
  1147. if (work_remaining) {
  1148. /* If there is work remaining, but we didn't get a request back, take the meta request out of the
  1149. * list so that we don't use it again during this function, with the intention of putting it back in
  1150. * the list before this function ends. */
  1151. if (request == NULL) {
  1152. aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
  1153. aws_linked_list_push_back(
  1154. &meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
  1155. } else {
  1156. request->tracked_by_client = true;
  1157. ++client->threaded_data.num_requests_being_prepared;
  1158. num_requests_in_flight =
  1159. (uint32_t)aws_atomic_fetch_add(&client->stats.num_requests_in_flight, 1) + 1;
  1160. aws_s3_meta_request_prepare_request(
  1161. meta_request, request, s_s3_client_prepare_callback_queue_request, client);
  1162. }
  1163. } else {
  1164. s_s3_client_remove_meta_request_threaded(client, meta_request);
  1165. }
  1166. }
  1167. aws_linked_list_move_all_front(&client->threaded_data.meta_requests, &meta_requests_work_remaining);
  1168. }
  1169. }
  1170. static void s_s3_client_meta_request_finished_request(
  1171. struct aws_s3_client *client,
  1172. struct aws_s3_meta_request *meta_request,
  1173. struct aws_s3_request *request,
  1174. int error_code) {
  1175. AWS_PRECONDITION(client);
  1176. AWS_PRECONDITION(request);
  1177. if (request->tracked_by_client) {
  1178. /* BEGIN CRITICAL SECTION */
  1179. aws_s3_client_lock_synced_data(client);
  1180. aws_atomic_fetch_sub(&client->stats.num_requests_in_flight, 1);
  1181. s_s3_client_schedule_process_work_synced(client);
  1182. aws_s3_client_unlock_synced_data(client);
  1183. /* END CRITICAL SECTION */
  1184. }
  1185. aws_s3_meta_request_finished_request(meta_request, request, error_code);
  1186. }
  1187. static void s_s3_client_prepare_callback_queue_request(
  1188. struct aws_s3_meta_request *meta_request,
  1189. struct aws_s3_request *request,
  1190. int error_code,
  1191. void *user_data) {
  1192. AWS_PRECONDITION(meta_request);
  1193. AWS_PRECONDITION(request);
  1194. struct aws_s3_client *client = user_data;
  1195. AWS_PRECONDITION(client);
  1196. if (error_code != AWS_ERROR_SUCCESS) {
  1197. s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);
  1198. aws_s3_request_release(request);
  1199. request = NULL;
  1200. }
  1201. /* BEGIN CRITICAL SECTION */
  1202. {
  1203. aws_s3_client_lock_synced_data(client);
  1204. if (error_code == AWS_ERROR_SUCCESS) {
  1205. aws_linked_list_push_back(&client->synced_data.prepared_requests, &request->node);
  1206. } else {
  1207. ++client->synced_data.num_failed_prepare_requests;
  1208. }
  1209. s_s3_client_schedule_process_work_synced(client);
  1210. aws_s3_client_unlock_synced_data(client);
  1211. }
  1212. /* END CRITICAL SECTION */
  1213. }
  1214. void aws_s3_client_update_connections_threaded(struct aws_s3_client *client) {
  1215. AWS_PRECONDITION(client);
  1216. AWS_PRECONDITION(client->vtable);
  1217. struct aws_linked_list left_over_requests;
  1218. aws_linked_list_init(&left_over_requests);
  1219. while (s_s3_client_get_num_requests_network_io(client, AWS_S3_META_REQUEST_TYPE_MAX) <
  1220. aws_s3_client_get_max_active_connections(client, NULL) &&
  1221. !aws_linked_list_empty(&client->threaded_data.request_queue)) {
  1222. struct aws_s3_request *request = aws_s3_client_dequeue_request_threaded(client);
  1223. const uint32_t max_active_connections = aws_s3_client_get_max_active_connections(client, request->meta_request);
  1224. /* Unless the request is marked "always send", if this meta request has a finish result, then finish the request
  1225. * now and release it. */
  1226. if (!request->always_send && aws_s3_meta_request_has_finish_result(request->meta_request)) {
  1227. s_s3_client_meta_request_finished_request(client, request->meta_request, request, AWS_ERROR_S3_CANCELED);
  1228. aws_s3_request_release(request);
  1229. request = NULL;
  1230. } else if (
  1231. s_s3_client_get_num_requests_network_io(client, request->meta_request->type) < max_active_connections) {
  1232. s_s3_client_create_connection_for_request(client, request);
  1233. } else {
  1234. /* Push the request into the left-over list to be used in a future call of this function. */
  1235. aws_linked_list_push_back(&left_over_requests, &request->node);
  1236. }
  1237. }
  1238. aws_s3_client_queue_requests_threaded(client, &left_over_requests, true);
  1239. }
  1240. static void s_s3_client_acquired_retry_token(
  1241. struct aws_retry_strategy *retry_strategy,
  1242. int error_code,
  1243. struct aws_retry_token *token,
  1244. void *user_data);
  1245. static void s_s3_client_retry_ready(struct aws_retry_token *token, int error_code, void *user_data);
  1246. static void s_s3_client_create_connection_for_request_default(
  1247. struct aws_s3_client *client,
  1248. struct aws_s3_request *request);
  1249. static void s_s3_client_create_connection_for_request(struct aws_s3_client *client, struct aws_s3_request *request) {
  1250. AWS_PRECONDITION(client);
  1251. AWS_PRECONDITION(client->vtable);
  1252. if (client->vtable->create_connection_for_request) {
  1253. client->vtable->create_connection_for_request(client, request);
  1254. return;
  1255. }
  1256. s_s3_client_create_connection_for_request_default(client, request);
  1257. }
  1258. static void s_s3_client_create_connection_for_request_default(
  1259. struct aws_s3_client *client,
  1260. struct aws_s3_request *request) {
  1261. AWS_PRECONDITION(client);
  1262. AWS_PRECONDITION(request);
  1263. struct aws_s3_meta_request *meta_request = request->meta_request;
  1264. AWS_PRECONDITION(meta_request);
  1265. aws_atomic_fetch_add(&client->stats.num_requests_network_io[meta_request->type], 1);
  1266. struct aws_s3_connection *connection = aws_mem_calloc(client->allocator, 1, sizeof(struct aws_s3_connection));
  1267. connection->endpoint = aws_s3_endpoint_acquire(meta_request->endpoint, false /*already_holding_lock*/);
  1268. connection->request = request;
  1269. struct aws_byte_cursor host_header_value;
  1270. AWS_ZERO_STRUCT(host_header_value);
  1271. struct aws_http_headers *message_headers = aws_http_message_get_headers(meta_request->initial_request_message);
  1272. AWS_ASSERT(message_headers);
  1273. int get_header_result = aws_http_headers_get(message_headers, g_host_header_name, &host_header_value);
  1274. AWS_ASSERT(get_header_result == AWS_OP_SUCCESS);
  1275. (void)get_header_result;
  1276. if (aws_retry_strategy_acquire_retry_token(
  1277. client->retry_strategy, &host_header_value, s_s3_client_acquired_retry_token, connection, 0)) {
  1278. AWS_LOGF_ERROR(
  1279. AWS_LS_S3_CLIENT,
  1280. "id=%p Client could not acquire retry token for request %p due to error %d (%s)",
  1281. (void *)client,
  1282. (void *)request,
  1283. aws_last_error_or_unknown(),
  1284. aws_error_str(aws_last_error_or_unknown()));
  1285. goto reset_connection;
  1286. }
  1287. return;
  1288. reset_connection:
  1289. aws_s3_client_notify_connection_finished(
  1290. client, connection, aws_last_error_or_unknown(), AWS_S3_CONNECTION_FINISH_CODE_FAILED);
  1291. }
  1292. static void s_s3_client_acquired_retry_token(
  1293. struct aws_retry_strategy *retry_strategy,
  1294. int error_code,
  1295. struct aws_retry_token *token,
  1296. void *user_data) {
  1297. AWS_PRECONDITION(retry_strategy);
  1298. (void)retry_strategy;
  1299. struct aws_s3_connection *connection = user_data;
  1300. AWS_PRECONDITION(connection);
  1301. struct aws_s3_request *request = connection->request;
  1302. AWS_PRECONDITION(request);
  1303. struct aws_s3_meta_request *meta_request = request->meta_request;
  1304. AWS_PRECONDITION(meta_request);
  1305. struct aws_s3_endpoint *endpoint = meta_request->endpoint;
  1306. AWS_ASSERT(endpoint != NULL);
  1307. struct aws_s3_client *client = endpoint->client;
  1308. AWS_ASSERT(client != NULL);
  1309. if (error_code != AWS_ERROR_SUCCESS) {
  1310. AWS_LOGF_ERROR(
  1311. AWS_LS_S3_CLIENT,
  1312. "id=%p Client could not get retry token for connection %p processing request %p due to error %d (%s)",
  1313. (void *)client,
  1314. (void *)connection,
  1315. (void *)request,
  1316. error_code,
  1317. aws_error_str(error_code));
  1318. goto error_clean_up;
  1319. }
  1320. AWS_ASSERT(token);
  1321. connection->retry_token = token;
  1322. AWS_ASSERT(client->vtable->acquire_http_connection);
  1323. /* client needs to be kept alive until s_s3_client_on_acquire_http_connection completes */
  1324. /* TODO: not a blocker, consider managing the life time of aws_s3_client from aws_s3_endpoint to simplify usage */
  1325. aws_s3_client_acquire(client);
  1326. client->vtable->acquire_http_connection(
  1327. endpoint->http_connection_manager, s_s3_client_on_acquire_http_connection, connection);
  1328. return;
  1329. error_clean_up:
  1330. aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
  1331. }
  1332. static void s_s3_client_on_acquire_http_connection(
  1333. struct aws_http_connection *incoming_http_connection,
  1334. int error_code,
  1335. void *user_data) {
  1336. struct aws_s3_connection *connection = user_data;
  1337. AWS_PRECONDITION(connection);
  1338. struct aws_s3_request *request = connection->request;
  1339. AWS_PRECONDITION(request);
  1340. struct aws_s3_meta_request *meta_request = request->meta_request;
  1341. AWS_PRECONDITION(meta_request);
  1342. struct aws_s3_endpoint *endpoint = meta_request->endpoint;
  1343. AWS_ASSERT(endpoint != NULL);
  1344. struct aws_s3_client *client = endpoint->client;
  1345. AWS_ASSERT(client != NULL);
  1346. if (error_code != AWS_ERROR_SUCCESS) {
  1347. AWS_LOGF_ERROR(
  1348. AWS_LS_S3_ENDPOINT,
  1349. "id=%p: Could not acquire connection due to error code %d (%s)",
  1350. (void *)endpoint,
  1351. error_code,
  1352. aws_error_str(error_code));
  1353. if (error_code == AWS_IO_DNS_INVALID_NAME) {
  1354. goto error_fail;
  1355. }
  1356. goto error_retry;
  1357. }
  1358. connection->http_connection = incoming_http_connection;
  1359. aws_s3_meta_request_send_request(meta_request, connection);
  1360. aws_s3_client_release(client); /* kept since this callback was registered */
  1361. return;
  1362. error_retry:
  1363. aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_RETRY);
  1364. aws_s3_client_release(client); /* kept since this callback was registered */
  1365. return;
  1366. error_fail:
  1367. aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
  1368. aws_s3_client_release(client); /* kept since this callback was registered */
  1369. }
  1370. /* Called by aws_s3_meta_request when it has finished using this connection for a single request. */
  1371. void aws_s3_client_notify_connection_finished(
  1372. struct aws_s3_client *client,
  1373. struct aws_s3_connection *connection,
  1374. int error_code,
  1375. enum aws_s3_connection_finish_code finish_code) {
  1376. AWS_PRECONDITION(client);
  1377. AWS_PRECONDITION(connection);
  1378. struct aws_s3_request *request = connection->request;
  1379. AWS_PRECONDITION(request);
  1380. struct aws_s3_meta_request *meta_request = request->meta_request;
  1381. AWS_PRECONDITION(meta_request);
  1382. AWS_PRECONDITION(meta_request->initial_request_message);
  1383. struct aws_s3_endpoint *endpoint = meta_request->endpoint;
  1384. AWS_PRECONDITION(endpoint);
  1385. /* If we're trying to setup a retry... */
  1386. if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_RETRY) {
  1387. if (connection->retry_token == NULL) {
  1388. AWS_LOGF_ERROR(
  1389. AWS_LS_S3_CLIENT,
  1390. "id=%p Client could not schedule retry of request %p for meta request %p, as retry token is NULL.",
  1391. (void *)client,
  1392. (void *)request,
  1393. (void *)meta_request);
  1394. goto reset_connection;
  1395. }
  1396. if (aws_s3_meta_request_is_finished(meta_request)) {
  1397. AWS_LOGF_DEBUG(
  1398. AWS_LS_S3_CLIENT,
  1399. "id=%p Client not scheduling retry of request %p for meta request %p with token %p because meta "
  1400. "request has been flagged as finished.",
  1401. (void *)client,
  1402. (void *)request,
  1403. (void *)meta_request,
  1404. (void *)connection->retry_token);
  1405. goto reset_connection;
  1406. }
  1407. AWS_LOGF_DEBUG(
  1408. AWS_LS_S3_CLIENT,
  1409. "id=%p Client scheduling retry of request %p for meta request %p with token %p.",
  1410. (void *)client,
  1411. (void *)request,
  1412. (void *)meta_request,
  1413. (void *)connection->retry_token);
  1414. enum aws_retry_error_type error_type = AWS_RETRY_ERROR_TYPE_TRANSIENT;
  1415. switch (error_code) {
  1416. case AWS_ERROR_S3_INTERNAL_ERROR:
  1417. error_type = AWS_RETRY_ERROR_TYPE_SERVER_ERROR;
  1418. break;
  1419. case AWS_ERROR_S3_SLOW_DOWN:
  1420. error_type = AWS_RETRY_ERROR_TYPE_THROTTLING;
  1421. break;
  1422. }
  1423. if (connection->http_connection != NULL) {
  1424. AWS_ASSERT(endpoint->http_connection_manager);
  1425. aws_http_connection_manager_release_connection(
  1426. endpoint->http_connection_manager, connection->http_connection);
  1427. connection->http_connection = NULL;
  1428. }
  1429. /* Ask the retry strategy to schedule a retry of the request. */
  1430. if (aws_retry_strategy_schedule_retry(
  1431. connection->retry_token, error_type, s_s3_client_retry_ready, connection)) {
  1432. AWS_LOGF_ERROR(
  1433. AWS_LS_S3_CLIENT,
  1434. "id=%p Client could not retry request %p for meta request %p with token %p due to error %d (%s)",
  1435. (void *)client,
  1436. (void *)request,
  1437. (void *)meta_request,
  1438. (void *)connection->retry_token,
  1439. aws_last_error_or_unknown(),
  1440. aws_error_str(aws_last_error_or_unknown()));
  1441. goto reset_connection;
  1442. }
  1443. return;
  1444. }
  1445. reset_connection:
  1446. if (connection->retry_token != NULL) {
  1447. /* If we have a retry token and successfully finished, record that success. */
  1448. if (finish_code == AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
  1449. aws_retry_token_record_success(connection->retry_token);
  1450. }
  1451. aws_retry_token_release(connection->retry_token);
  1452. connection->retry_token = NULL;
  1453. }
  1454. /* If we weren't successful, and we're here, that means this failure is not eligible for a retry. So finish the
  1455. * request, and close our HTTP connection. */
  1456. if (finish_code != AWS_S3_CONNECTION_FINISH_CODE_SUCCESS) {
  1457. if (connection->http_connection != NULL) {
  1458. aws_http_connection_close(connection->http_connection);
  1459. }
  1460. }
  1461. aws_atomic_fetch_sub(&client->stats.num_requests_network_io[meta_request->type], 1);
  1462. s_s3_client_meta_request_finished_request(client, meta_request, request, error_code);
  1463. if (connection->http_connection != NULL) {
  1464. AWS_ASSERT(endpoint->http_connection_manager);
  1465. aws_http_connection_manager_release_connection(endpoint->http_connection_manager, connection->http_connection);
  1466. connection->http_connection = NULL;
  1467. }
  1468. if (connection->request != NULL) {
  1469. aws_s3_request_release(connection->request);
  1470. connection->request = NULL;
  1471. }
  1472. aws_retry_token_release(connection->retry_token);
  1473. connection->retry_token = NULL;
  1474. aws_s3_endpoint_release(connection->endpoint);
  1475. connection->endpoint = NULL;
  1476. aws_mem_release(client->allocator, connection);
  1477. connection = NULL;
  1478. /* BEGIN CRITICAL SECTION */
  1479. {
  1480. aws_s3_client_lock_synced_data(client);
  1481. s_s3_client_schedule_process_work_synced(client);
  1482. aws_s3_client_unlock_synced_data(client);
  1483. }
  1484. /* END CRITICAL SECTION */
  1485. }
  1486. static void s_s3_client_prepare_request_callback_retry_request(
  1487. struct aws_s3_meta_request *meta_request,
  1488. struct aws_s3_request *request,
  1489. int error_code,
  1490. void *user_data);
  1491. static void s_s3_client_retry_ready(struct aws_retry_token *token, int error_code, void *user_data) {
  1492. AWS_PRECONDITION(token);
  1493. (void)token;
  1494. struct aws_s3_connection *connection = user_data;
  1495. AWS_PRECONDITION(connection);
  1496. struct aws_s3_request *request = connection->request;
  1497. AWS_PRECONDITION(request);
  1498. struct aws_s3_meta_request *meta_request = request->meta_request;
  1499. AWS_PRECONDITION(meta_request);
  1500. struct aws_s3_endpoint *endpoint = meta_request->endpoint;
  1501. AWS_PRECONDITION(endpoint);
  1502. struct aws_s3_client *client = endpoint->client;
  1503. AWS_PRECONDITION(client);
  1504. /* If we couldn't retry this request, then bail on the entire meta request. */
  1505. if (error_code != AWS_ERROR_SUCCESS) {
  1506. AWS_LOGF_ERROR(
  1507. AWS_LS_S3_CLIENT,
  1508. "id=%p Client could not retry request %p for meta request %p due to error %d (%s)",
  1509. (void *)client,
  1510. (void *)meta_request,
  1511. (void *)request,
  1512. error_code,
  1513. aws_error_str(error_code));
  1514. goto error_clean_up;
  1515. }
  1516. AWS_LOGF_DEBUG(
  1517. AWS_LS_S3_META_REQUEST,
  1518. "id=%p Client retrying request %p for meta request %p on connection %p with retry token %p",
  1519. (void *)client,
  1520. (void *)request,
  1521. (void *)meta_request,
  1522. (void *)connection,
  1523. (void *)connection->retry_token);
  1524. aws_s3_meta_request_prepare_request(
  1525. meta_request, request, s_s3_client_prepare_request_callback_retry_request, connection);
  1526. return;
  1527. error_clean_up:
  1528. aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
  1529. }
  1530. static void s_s3_client_prepare_request_callback_retry_request(
  1531. struct aws_s3_meta_request *meta_request,
  1532. struct aws_s3_request *request,
  1533. int error_code,
  1534. void *user_data) {
  1535. AWS_PRECONDITION(meta_request);
  1536. (void)meta_request;
  1537. AWS_PRECONDITION(request);
  1538. (void)request;
  1539. struct aws_s3_connection *connection = user_data;
  1540. AWS_PRECONDITION(connection);
  1541. struct aws_s3_endpoint *endpoint = meta_request->endpoint;
  1542. AWS_ASSERT(endpoint != NULL);
  1543. struct aws_s3_client *client = endpoint->client;
  1544. AWS_ASSERT(client != NULL);
  1545. if (error_code == AWS_ERROR_SUCCESS) {
  1546. AWS_ASSERT(connection->retry_token);
  1547. s_s3_client_acquired_retry_token(
  1548. client->retry_strategy, AWS_ERROR_SUCCESS, connection->retry_token, connection);
  1549. } else {
  1550. aws_s3_client_notify_connection_finished(client, connection, error_code, AWS_S3_CONNECTION_FINISH_CODE_FAILED);
  1551. }
  1552. }
  1553. static void s_resume_token_ref_count_zero_callback(void *arg) {
  1554. struct aws_s3_meta_request_resume_token *token = arg;
  1555. aws_string_destroy(token->multipart_upload_id);
  1556. aws_mem_release(token->allocator, token);
  1557. }
  1558. struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new(struct aws_allocator *allocator) {
  1559. struct aws_s3_meta_request_resume_token *token =
  1560. aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_meta_request_resume_token));
  1561. token->allocator = allocator;
  1562. aws_ref_count_init(&token->ref_count, token, s_resume_token_ref_count_zero_callback);
  1563. return token;
  1564. }
  1565. struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_new_upload(
  1566. struct aws_allocator *allocator,
  1567. const struct aws_s3_upload_resume_token_options *options) {
  1568. AWS_PRECONDITION(allocator);
  1569. AWS_PRECONDITION(options);
  1570. struct aws_s3_meta_request_resume_token *token = aws_s3_meta_request_resume_token_new(allocator);
  1571. token->multipart_upload_id = aws_string_new_from_cursor(allocator, &options->upload_id);
  1572. token->part_size = options->part_size;
  1573. token->total_num_parts = options->total_num_parts;
  1574. token->num_parts_completed = options->num_parts_completed;
  1575. token->type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT;
  1576. return token;
  1577. }
  1578. struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_acquire(
  1579. struct aws_s3_meta_request_resume_token *resume_token) {
  1580. if (resume_token) {
  1581. aws_ref_count_acquire(&resume_token->ref_count);
  1582. }
  1583. return resume_token;
  1584. }
  1585. struct aws_s3_meta_request_resume_token *aws_s3_meta_request_resume_token_release(
  1586. struct aws_s3_meta_request_resume_token *resume_token) {
  1587. if (resume_token) {
  1588. aws_ref_count_release(&resume_token->ref_count);
  1589. }
  1590. return NULL;
  1591. }
  1592. enum aws_s3_meta_request_type aws_s3_meta_request_resume_token_type(
  1593. struct aws_s3_meta_request_resume_token *resume_token) {
  1594. AWS_FATAL_PRECONDITION(resume_token);
  1595. return resume_token->type;
  1596. }
  1597. size_t aws_s3_meta_request_resume_token_part_size(struct aws_s3_meta_request_resume_token *resume_token) {
  1598. AWS_FATAL_PRECONDITION(resume_token);
  1599. return resume_token->part_size;
  1600. }
  1601. size_t aws_s3_meta_request_resume_token_total_num_parts(struct aws_s3_meta_request_resume_token *resume_token) {
  1602. AWS_FATAL_PRECONDITION(resume_token);
  1603. return resume_token->total_num_parts;
  1604. }
  1605. size_t aws_s3_meta_request_resume_token_num_parts_completed(struct aws_s3_meta_request_resume_token *resume_token) {
  1606. AWS_FATAL_PRECONDITION(resume_token);
  1607. return resume_token->num_parts_completed;
  1608. }
  1609. struct aws_byte_cursor aws_s3_meta_request_resume_token_upload_id(
  1610. struct aws_s3_meta_request_resume_token *resume_token) {
  1611. AWS_FATAL_PRECONDITION(resume_token);
  1612. if (resume_token->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT && resume_token->multipart_upload_id != NULL) {
  1613. return aws_byte_cursor_from_string(resume_token->multipart_upload_id);
  1614. }
  1615. return aws_byte_cursor_from_c_str("");
  1616. }