s3_auto_ranged_put.c

  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include "aws/s3/private/s3_auto_ranged_put.h"
  6. #include "aws/s3/private/s3_checksums.h"
  7. #include "aws/s3/private/s3_list_parts.h"
  8. #include "aws/s3/private/s3_request_messages.h"
  9. #include "aws/s3/private/s3_util.h"
  10. #include <aws/common/encoding.h>
  11. #include <aws/common/string.h>
  12. #include <aws/io/stream.h>
  13. static const struct aws_byte_cursor s_upload_id = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("UploadId");
  14. static const size_t s_complete_multipart_upload_init_body_size_bytes = 512;
  15. static const size_t s_abort_multipart_upload_init_body_size_bytes = 512;
  16. static const struct aws_byte_cursor s_create_multipart_upload_copy_headers[] = {
  17. AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("x-amz-server-side-encryption-customer-algorithm"),
  18. AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("x-amz-server-side-encryption-customer-key-MD5"),
  19. AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("x-amz-server-side-encryption-context"),
  20. };
  21. static void s_s3_meta_request_auto_ranged_put_destroy(struct aws_s3_meta_request *meta_request);
  22. static bool s_s3_auto_ranged_put_update(
  23. struct aws_s3_meta_request *meta_request,
  24. uint32_t flags,
  25. struct aws_s3_request **out_request);
  26. static int s_s3_auto_ranged_put_prepare_request(
  27. struct aws_s3_meta_request *meta_request,
  28. struct aws_s3_request *request);
  29. static void s_s3_auto_ranged_put_request_finished(
  30. struct aws_s3_meta_request *meta_request,
  31. struct aws_s3_request *request,
  32. int error_code);
  33. static void s_s3_auto_ranged_put_send_request_finish(
  34. struct aws_s3_connection *connection,
  35. struct aws_http_stream *stream,
  36. int error_code);
  37. static int s_s3_auto_ranged_put_pause(
  38. struct aws_s3_meta_request *meta_request,
  39. struct aws_s3_meta_request_resume_token **resume_token);
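/* ListParts result callback used when resuming an upload: for each previously uploaded part, record its
 * ETag (and, if a checksum algorithm is configured, its encoded checksum) in the slot for that part number,
 * so the part can later be skipped and verified instead of re-uploaded. */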
  40. static bool s_process_part_info(const struct aws_s3_part_info *info, void *user_data) {
  41. struct aws_s3_auto_ranged_put *auto_ranged_put = user_data;
  42. struct aws_string *etag = aws_strip_quotes(auto_ranged_put->base.allocator, info->e_tag);
  43. const struct aws_byte_cursor *checksum_cur = NULL;
  44. switch (auto_ranged_put->base.checksum_config.checksum_algorithm) {
  45. case AWS_SCA_CRC32:
  46. checksum_cur = &info->checksumCRC32;
  47. break;
  48. case AWS_SCA_CRC32C:
  49. checksum_cur = &info->checksumCRC32C;
  50. break;
  51. case AWS_SCA_SHA1:
  52. checksum_cur = &info->checksumSHA1;
  53. break;
  54. case AWS_SCA_SHA256:
  55. checksum_cur = &info->checksumSHA256;
  56. break;
  57. case AWS_SCA_NONE:
  58. break;
  59. default:
  60. AWS_ASSERT(false);
  61. break;
  62. }
  63. if (checksum_cur) {
  64. aws_byte_buf_init_copy_from_cursor(
  65. &auto_ranged_put->encoded_checksum_list[info->part_number - 1],
  66. auto_ranged_put->base.allocator,
  67. *checksum_cur);
  68. }
  69. aws_array_list_set_at(&auto_ranged_put->synced_data.etag_list, &etag, info->part_number - 1);
  70. return true;
  71. }
  72. /*
  73. * Validates token and updates part variables. Noop if token is null.
  74. */
  75. static int s_try_update_part_info_from_resume_token(
  76. uint64_t content_length,
  77. const struct aws_s3_meta_request_resume_token *resume_token,
  78. size_t *out_part_size,
  79. uint32_t *out_total_num_parts) {
  80. if (!resume_token) {
  81. return AWS_OP_SUCCESS;
  82. }
  83. if (resume_token->type != AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
  84. AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Could not load persisted state. Invalid token type.");
  85. goto invalid_argument_cleanup;
  86. }
  87. if (resume_token->multipart_upload_id == NULL) {
  88. AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Could not load persisted state. Multipart upload id missing.");
  89. goto invalid_argument_cleanup;
  90. }
  91. if (resume_token->part_size < g_s3_min_upload_part_size) {
  92. AWS_LOGF_ERROR(
  93. AWS_LS_S3_META_REQUEST,
  94. "Could not create resume auto-ranged-put meta request; part size of %" PRIu64
  95. " specified in the token is below minimum threshold for multi-part.",
  96. (uint64_t)resume_token->part_size);
  97. goto invalid_argument_cleanup;
  98. }
  99. if ((uint32_t)resume_token->total_num_parts > g_s3_max_num_upload_parts) {
  100. AWS_LOGF_ERROR(
  101. AWS_LS_S3_META_REQUEST,
  102. "Could not create resume auto-ranged-put meta request; total number of parts %" PRIu32
  103. " specified in the token is too large for platform.",
  104. (uint32_t)resume_token->total_num_parts);
  105. goto invalid_argument_cleanup;
  106. }
  107. uint32_t num_parts = (uint32_t)(content_length / resume_token->part_size);
  108. if ((content_length % resume_token->part_size) > 0) {
  109. ++num_parts;
  110. }
  111. if (resume_token->total_num_parts != num_parts) {
  112. AWS_LOGF_ERROR(
  113. AWS_LS_S3_META_REQUEST,
  114. "Could not create auto-ranged-put meta request; persisted number of parts %zu"
  115. " does not match expected number of parts based on length of the body.",
  116. resume_token->total_num_parts);
  117. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  118. }
  119. *out_part_size = resume_token->part_size;
  120. *out_total_num_parts = (uint32_t)resume_token->total_num_parts;
  121. return AWS_OP_SUCCESS;
  122. invalid_argument_cleanup:
  123. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  124. }
  125. /**
  126. * Initializes state necessary to resume upload. Noop if token is null.
  127. */
  128. static int s_try_init_resume_state_from_persisted_data(
  129. struct aws_allocator *allocator,
  130. struct aws_s3_auto_ranged_put *auto_ranged_put,
  131. const struct aws_s3_meta_request_resume_token *resume_token) {
  132. if (resume_token == NULL) {
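/* No resume token: mark the ListParts step as already started and completed so that the update
 * function proceeds straight to create-multipart-upload. */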
  133. auto_ranged_put->synced_data.list_parts_operation = NULL;
  134. auto_ranged_put->synced_data.list_parts_state.completed = true;
  135. auto_ranged_put->synced_data.list_parts_state.started = true;
  136. return AWS_OP_SUCCESS;
  137. }
  138. struct aws_byte_cursor request_path;
  139. if (aws_http_message_get_request_path(auto_ranged_put->base.initial_request_message, &request_path)) {
  140. AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Could not load persisted state. Request path could not be read.");
  141. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  142. }
  143. auto_ranged_put->synced_data.num_parts_sent = 0;
  144. auto_ranged_put->synced_data.num_parts_completed = 0;
  145. auto_ranged_put->synced_data.create_multipart_upload_sent = true;
  146. auto_ranged_put->synced_data.create_multipart_upload_completed = true;
  147. auto_ranged_put->upload_id = aws_string_clone_or_reuse(allocator, resume_token->multipart_upload_id);
  148. struct aws_s3_list_parts_params list_parts_params = {
  149. .key = request_path,
  150. .upload_id = aws_byte_cursor_from_string(auto_ranged_put->upload_id),
  151. .on_part = s_process_part_info,
  152. .user_data = auto_ranged_put,
  153. };
  154. auto_ranged_put->synced_data.list_parts_operation = aws_s3_list_parts_operation_new(allocator, &list_parts_params);
  155. struct aws_http_headers *needed_response_headers = aws_http_headers_new(allocator);
  156. const size_t copy_header_count = AWS_ARRAY_SIZE(s_create_multipart_upload_copy_headers);
  157. struct aws_http_headers *initial_headers =
  158. aws_http_message_get_headers(auto_ranged_put->base.initial_request_message);
  159. /* Copy the headers that create-multipart-upload would have used from the initial message, since create will
  160. * never be called in this flow. */
  161. for (size_t header_index = 0; header_index < copy_header_count; ++header_index) {
  162. const struct aws_byte_cursor *header_name = &s_create_multipart_upload_copy_headers[header_index];
  163. struct aws_byte_cursor header_value;
  164. AWS_ZERO_STRUCT(header_value);
  165. if (aws_http_headers_get(initial_headers, *header_name, &header_value) == AWS_OP_SUCCESS) {
  166. aws_http_headers_set(needed_response_headers, *header_name, header_value);
  167. }
  168. }
  169. auto_ranged_put->synced_data.needed_response_headers = needed_response_headers;
  170. return AWS_OP_SUCCESS;
  171. }
  172. static struct aws_s3_meta_request_vtable s_s3_auto_ranged_put_vtable = {
  173. .update = s_s3_auto_ranged_put_update,
  174. .send_request_finish = s_s3_auto_ranged_put_send_request_finish,
  175. .prepare_request = s_s3_auto_ranged_put_prepare_request,
  176. .init_signing_date_time = aws_s3_meta_request_init_signing_date_time_default,
  177. .sign_request = aws_s3_meta_request_sign_request_default,
  178. .finished_request = s_s3_auto_ranged_put_request_finished,
  179. .destroy = s_s3_meta_request_auto_ranged_put_destroy,
  180. .finish = aws_s3_meta_request_finish_default,
  181. .pause = s_s3_auto_ranged_put_pause,
  182. };
  183. /* Allocate a new auto-ranged put meta request */
  184. struct aws_s3_meta_request *aws_s3_meta_request_auto_ranged_put_new(
  185. struct aws_allocator *allocator,
  186. struct aws_s3_client *client,
  187. size_t part_size,
  188. uint64_t content_length,
  189. uint32_t num_parts,
  190. const struct aws_s3_meta_request_options *options) {
  191. /* These should already have been validated by the caller. */
  192. AWS_PRECONDITION(allocator);
  193. AWS_PRECONDITION(client);
  194. AWS_PRECONDITION(options);
  195. AWS_PRECONDITION(options->message);
  196. AWS_PRECONDITION(aws_http_message_get_body_stream(options->message));
  197. if (s_try_update_part_info_from_resume_token(content_length, options->resume_token, &part_size, &num_parts)) {
  198. return NULL;
  199. }
  200. struct aws_s3_auto_ranged_put *auto_ranged_put =
  201. aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_auto_ranged_put));
  202. if (aws_s3_meta_request_init_base(
  203. allocator,
  204. client,
  205. part_size,
  206. client->compute_content_md5 == AWS_MR_CONTENT_MD5_ENABLED ||
  207. aws_http_headers_has(aws_http_message_get_headers(options->message), g_content_md5_header_name),
  208. options,
  209. auto_ranged_put,
  210. &s_s3_auto_ranged_put_vtable,
  211. &auto_ranged_put->base)) {
  212. aws_mem_release(allocator, auto_ranged_put);
  213. return NULL;
  214. }
  215. auto_ranged_put->content_length = content_length;
  216. auto_ranged_put->synced_data.total_num_parts = num_parts;
  217. auto_ranged_put->upload_id = NULL;
  218. auto_ranged_put->resume_token = options->resume_token;
  219. aws_s3_meta_request_resume_token_acquire(auto_ranged_put->resume_token);
  220. auto_ranged_put->threaded_update_data.next_part_number = 1;
  221. auto_ranged_put->prepare_data.num_parts_read_from_stream = 0;
  222. struct aws_string **etag_c_array = aws_mem_calloc(allocator, sizeof(struct aws_string *), num_parts);
  223. aws_array_list_init_static(
  224. &auto_ranged_put->synced_data.etag_list, etag_c_array, num_parts, sizeof(struct aws_string *));
  225. auto_ranged_put->encoded_checksum_list = aws_mem_calloc(allocator, sizeof(struct aws_byte_buf), num_parts);
  226. if (s_try_init_resume_state_from_persisted_data(allocator, auto_ranged_put, options->resume_token)) {
  227. goto error_clean_up;
  228. }
  229. AWS_LOGF_DEBUG(
  230. AWS_LS_S3_META_REQUEST, "id=%p Created new Auto-Ranged Put Meta Request.", (void *)&auto_ranged_put->base);
  231. return &auto_ranged_put->base;
  232. error_clean_up:
  233. aws_s3_meta_request_release(&auto_ranged_put->base);
  234. return NULL;
  235. }
  236. /* Destroy our auto-ranged put meta request */
  237. static void s_s3_meta_request_auto_ranged_put_destroy(struct aws_s3_meta_request *meta_request) {
  238. AWS_PRECONDITION(meta_request);
  239. AWS_PRECONDITION(meta_request->impl);
  240. struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
  241. aws_string_destroy(auto_ranged_put->upload_id);
  242. auto_ranged_put->upload_id = NULL;
  243. auto_ranged_put->resume_token = aws_s3_meta_request_resume_token_release(auto_ranged_put->resume_token);
  244. aws_s3_paginated_operation_release(auto_ranged_put->synced_data.list_parts_operation);
  245. for (size_t etag_index = 0; etag_index < auto_ranged_put->synced_data.total_num_parts; ++etag_index) {
  246. struct aws_string *etag = NULL;
  247. aws_array_list_get_at(&auto_ranged_put->synced_data.etag_list, &etag, etag_index);
  248. aws_string_destroy(etag);
  249. }
  250. aws_string_destroy(auto_ranged_put->synced_data.list_parts_continuation_token);
  251. for (size_t checksum_index = 0; checksum_index < auto_ranged_put->synced_data.total_num_parts; ++checksum_index) {
  252. aws_byte_buf_clean_up(&auto_ranged_put->encoded_checksum_list[checksum_index]);
  253. }
  254. aws_mem_release(meta_request->allocator, auto_ranged_put->synced_data.etag_list.data);
  255. aws_mem_release(meta_request->allocator, auto_ranged_put->encoded_checksum_list);
  256. aws_array_list_clean_up(&auto_ranged_put->synced_data.etag_list);
  257. aws_http_headers_release(auto_ranged_put->synced_data.needed_response_headers);
  258. aws_mem_release(meta_request->allocator, auto_ranged_put);
  259. }
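/* State-machine style "update" function: each call hands back the next request to send, if any.
 * For a resumed upload it first drives ListParts (possibly over several pages), then
 * create-multipart-upload, one upload-part request per remaining part, and finally
 * complete-multipart-upload; once the meta request has a finish result it instead winds down and,
 * where appropriate, issues abort-multipart-upload. Returns true while work remains. */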
  260. static bool s_s3_auto_ranged_put_update(
  261. struct aws_s3_meta_request *meta_request,
  262. uint32_t flags,
  263. struct aws_s3_request **out_request) {
  264. AWS_PRECONDITION(meta_request);
  265. AWS_PRECONDITION(out_request);
  266. struct aws_s3_request *request = NULL;
  267. bool work_remaining = false;
  268. struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
  269. /* BEGIN CRITICAL SECTION */
  270. {
  271. aws_s3_meta_request_lock_synced_data(meta_request);
  272. if (!aws_s3_meta_request_has_finish_result_synced(meta_request)) {
  273. /* If resuming and ListParts has not been sent yet, send it now. */
  274. if (!auto_ranged_put->synced_data.list_parts_state.started) {
  275. request = aws_s3_request_new(
  276. meta_request,
  277. AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_LIST_PARTS,
  278. 0,
  279. AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);
  280. auto_ranged_put->synced_data.list_parts_state.started = true;
  281. goto has_work_remaining;
  282. }
  283. if (auto_ranged_put->synced_data.list_parts_state.continues) {
  284. /* If ListParts needs to continue, send another ListParts request. */
  285. AWS_ASSERT(auto_ranged_put->synced_data.list_parts_continuation_token != NULL);
  286. request = aws_s3_request_new(
  287. meta_request,
  288. AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_LIST_PARTS,
  289. 0,
  290. AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);
  291. auto_ranged_put->synced_data.list_parts_state.continues = false;
  292. goto has_work_remaining;
  293. }
  294. if (!auto_ranged_put->synced_data.list_parts_state.completed) {
  295. /* waiting on list parts to finish. */
  296. goto has_work_remaining;
  297. }
  298. /* If we haven't already sent a create-multipart-upload message, do so now. */
  299. if (!auto_ranged_put->synced_data.create_multipart_upload_sent) {
  300. request = aws_s3_request_new(
  301. meta_request,
  302. AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_CREATE_MULTIPART_UPLOAD,
  303. 0,
  304. AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);
  305. auto_ranged_put->synced_data.create_multipart_upload_sent = true;
  306. goto has_work_remaining;
  307. }
  308. /* If the create-multipart-upload message hasn't been completed, then there is still additional work to do,
  309. * but it can't be done yet. */
  310. if (!auto_ranged_put->synced_data.create_multipart_upload_completed) {
  311. goto has_work_remaining;
  312. }
  313. /* If we haven't sent all of the parts yet, then set up to send a new part now. */
  314. if (auto_ranged_put->synced_data.num_parts_sent < auto_ranged_put->synced_data.total_num_parts) {
  315. /* Check if the etag/checksum list has the result already */
  316. int part_index = auto_ranged_put->threaded_update_data.next_part_number - 1;
  317. for (size_t etag_index = part_index;
  318. etag_index < aws_array_list_length(&auto_ranged_put->synced_data.etag_list);
  319. ++etag_index) {
  320. struct aws_string *etag = NULL;
  321. if (!aws_array_list_get_at(&auto_ranged_put->synced_data.etag_list, &etag, etag_index) && etag) {
  322. /* part was already uploaded (resume case); skip it here, and prepare will take care of advancing the input stream past it */
  323. ++auto_ranged_put->threaded_update_data.next_part_number;
  324. } else {
  325. /* incomplete part found; break out and create a request for it */
  326. break;
  327. }
  328. }
  329. /* Something went really wrong if we still have parts to send but already have etags for every part. */
  330. AWS_FATAL_ASSERT(
  331. auto_ranged_put->threaded_update_data.next_part_number <=
  332. auto_ranged_put->synced_data.total_num_parts);
  333. if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) {
  334. uint32_t num_parts_in_flight =
  335. (auto_ranged_put->synced_data.num_parts_sent -
  336. auto_ranged_put->synced_data.num_parts_completed);
  337. /* Because uploads must read from their streams serially, we try to limit the number of in-flight
  338. * requests for a given multipart upload if we can. */
  339. if (num_parts_in_flight > 0) {
  340. goto has_work_remaining;
  341. }
  342. }
  343. /* Allocate a request for another part. */
  344. request = aws_s3_request_new(
  345. meta_request,
  346. AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART,
  347. 0,
  348. AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);
  349. request->part_number = auto_ranged_put->threaded_update_data.next_part_number;
  350. ++auto_ranged_put->threaded_update_data.next_part_number;
  351. ++auto_ranged_put->synced_data.num_parts_sent;
  352. AWS_LOGF_DEBUG(
  353. AWS_LS_S3_META_REQUEST,
  354. "id=%p: Returning request %p for part %d",
  355. (void *)meta_request,
  356. (void *)request,
  357. request->part_number);
  358. goto has_work_remaining;
  359. }
  360. /* There is one more request to send after all of the parts (the complete-multipart-upload) but it can't be
  361. * done until all of the parts have been completed.*/
  362. if (auto_ranged_put->synced_data.num_parts_completed != auto_ranged_put->synced_data.total_num_parts) {
  363. goto has_work_remaining;
  364. }
  365. /* If the complete-multipart-upload request hasn't been sent yet, then send it now. */
  366. if (!auto_ranged_put->synced_data.complete_multipart_upload_sent) {
  367. request = aws_s3_request_new(
  368. meta_request,
  369. AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_COMPLETE_MULTIPART_UPLOAD,
  370. 0,
  371. AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);
  372. auto_ranged_put->synced_data.complete_multipart_upload_sent = true;
  373. goto has_work_remaining;
  374. }
  375. /* Wait for the complete-multipart-upload request to finish. */
  376. if (!auto_ranged_put->synced_data.complete_multipart_upload_completed) {
  377. goto has_work_remaining;
  378. }
  379. goto no_work_remaining;
  380. } else {
  381. /* If the create multipart upload hasn't been sent, then there is nothing left to do when canceling. */
  382. if (!auto_ranged_put->synced_data.create_multipart_upload_sent) {
  383. goto no_work_remaining;
  384. }
  385. /* If the create-multipart-upload request is still in flight, wait for it to finish. */
  386. if (!auto_ranged_put->synced_data.create_multipart_upload_completed) {
  387. goto has_work_remaining;
  388. }
  389. /* If the number of parts completed is less than the number of parts sent, then we need to wait until all of
  390. * those parts are done sending before aborting. */
  391. if (auto_ranged_put->synced_data.num_parts_completed < auto_ranged_put->synced_data.num_parts_sent) {
  392. goto has_work_remaining;
  393. }
  394. /* If the complete-multipart-upload is already in flight, then we can't necessarily send an abort. */
  395. if (auto_ranged_put->synced_data.complete_multipart_upload_sent &&
  396. !auto_ranged_put->synced_data.complete_multipart_upload_completed) {
  397. goto has_work_remaining;
  398. }
  399. /* If the upload was paused or resume failed, we don't abort the multipart upload. */
  400. if (meta_request->synced_data.finish_result.error_code == AWS_ERROR_S3_PAUSED ||
  401. meta_request->synced_data.finish_result.error_code == AWS_ERROR_S3_RESUME_FAILED) {
  402. goto no_work_remaining;
  403. }
  404. /* If the complete-multipart-upload completed successfully, then there is nothing to abort since the
  405. * transfer has already finished. */
  406. if (auto_ranged_put->synced_data.complete_multipart_upload_completed &&
  407. auto_ranged_put->synced_data.complete_multipart_upload_error_code == AWS_ERROR_SUCCESS) {
  408. goto no_work_remaining;
  409. }
  410. /* If we made it here, and the abort-multipart-upload message hasn't been sent yet, then do so now. */
  411. if (!auto_ranged_put->synced_data.abort_multipart_upload_sent) {
  412. if (auto_ranged_put->upload_id == NULL) {
  413. goto no_work_remaining;
  414. }
  415. if (auto_ranged_put->base.synced_data.finish_result.error_code == AWS_ERROR_SUCCESS) {
  416. /* Don't send an abort on success, even if we haven't sent complete-multipart-upload, in case we are
  417. * resuming after the MPU already completed. */
  418. goto no_work_remaining;
  419. }
  420. request = aws_s3_request_new(
  421. meta_request,
  422. AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_ABORT_MULTIPART_UPLOAD,
  423. 0,
  424. AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_ALWAYS_SEND);
  425. auto_ranged_put->synced_data.abort_multipart_upload_sent = true;
  426. goto has_work_remaining;
  427. }
  428. /* Wait for the abort-multipart-upload request to complete. */
  429. if (!auto_ranged_put->synced_data.abort_multipart_upload_completed) {
  430. goto has_work_remaining;
  431. }
  432. goto no_work_remaining;
  433. }
  434. has_work_remaining:
  435. work_remaining = true;
  436. no_work_remaining:
  437. if (!work_remaining) {
  438. aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
  439. }
  440. aws_s3_meta_request_unlock_synced_data(meta_request);
  441. }
  442. /* END CRITICAL SECTION */
  443. if (work_remaining) {
  444. *out_request = request;
  445. } else {
  446. AWS_ASSERT(request == NULL);
  447. aws_s3_meta_request_finish(meta_request);
  448. }
  449. return work_remaining;
  450. }
  451. /**
  452. * Helper to compute the request body size.
  453. * Returns the part size, or, when the content length is not evenly divisible into parts, the size of the
  454. * remaining final part.
  455. */
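/* For example, with a 22 MiB body and an 8 MiB part size, parts 1 and 2 are 8 MiB each and part 3 (the last part) is 6 MiB. */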
  456. static size_t s_compute_request_body_size(struct aws_s3_meta_request *meta_request, uint32_t part_number) {
  457. AWS_PRECONDITION(meta_request);
  458. struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
  459. size_t request_body_size = meta_request->part_size;
  460. /* Last part--adjust size to match remaining content length. */
  461. if (part_number == auto_ranged_put->synced_data.total_num_parts) {
  462. size_t content_remainder = (size_t)(auto_ranged_put->content_length % (uint64_t)meta_request->part_size);
  463. if (content_remainder > 0) {
  464. request_body_size = content_remainder;
  465. }
  466. }
  467. return request_body_size;
  468. }
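/* Re-compute the checksum of a locally re-read (skipped) part and compare its base64 encoding against the
 * checksum recorded for the previously uploaded part. Raises AWS_ERROR_S3_RESUMED_PART_CHECKSUM_MISMATCH on
 * mismatch, or AWS_ERROR_S3_RESUME_FAILED if the checksum cannot be computed or encoded. */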
  469. static int s_verify_part_matches_checksum(
  470. struct aws_allocator *allocator,
  471. struct aws_byte_buf part_body,
  472. enum aws_s3_checksum_algorithm algorithm,
  473. struct aws_byte_buf part_checksum) {
  474. AWS_PRECONDITION(allocator);
  475. if (algorithm == AWS_SCA_NONE) {
  476. return AWS_OP_SUCCESS;
  477. }
  478. struct aws_byte_buf checksum;
  479. if (aws_byte_buf_init(&checksum, allocator, aws_get_digest_size_from_algorithm(algorithm))) {
  480. return AWS_OP_ERR;
  481. }
  482. struct aws_byte_buf encoded_checksum = {0};
  483. int return_status = AWS_OP_SUCCESS;
  484. struct aws_byte_cursor body_cur = aws_byte_cursor_from_buf(&part_body);
  485. size_t encoded_len = 0;
  486. if (aws_base64_compute_encoded_len(aws_get_digest_size_from_algorithm(algorithm), &encoded_len)) {
  487. AWS_LOGF_ERROR(
  488. AWS_LS_S3_META_REQUEST, "Failed to resume upload. Unable to determine length of encoded checksum.");
  489. return_status = aws_raise_error(AWS_ERROR_S3_RESUME_FAILED);
  490. goto on_done;
  491. }
  492. if (aws_checksum_compute(allocator, algorithm, &body_cur, &checksum, 0)) {
  493. AWS_LOGF_ERROR(
  494. AWS_LS_S3_META_REQUEST, "Failed to resume upload. Unable to compute checksum for the skipped part.");
  495. return_status = aws_raise_error(AWS_ERROR_S3_RESUME_FAILED);
  496. goto on_done;
  497. }
  498. if (aws_byte_buf_init(&encoded_checksum, allocator, encoded_len)) {
  499. AWS_LOGF_ERROR(
  500. AWS_LS_S3_META_REQUEST, "Failed to resume upload. Unable to allocate buffer for encoded checksum.");
  501. return_status = aws_raise_error(AWS_ERROR_S3_RESUME_FAILED);
  502. goto on_done;
  503. }
  504. struct aws_byte_cursor checksum_cur = aws_byte_cursor_from_buf(&checksum);
  505. if (aws_base64_encode(&checksum_cur, &encoded_checksum)) {
  506. AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Failed to resume upload. Unable to encode checksum.");
  507. return_status = aws_raise_error(AWS_ERROR_S3_RESUME_FAILED);
  508. goto on_done;
  509. }
  510. if (!aws_byte_buf_eq(&encoded_checksum, &part_checksum)) {
  511. AWS_LOGF_ERROR(
  512. AWS_LS_S3_META_REQUEST, "Failed to resume upload. Checksum for previously uploaded part does not match");
  513. return_status = aws_raise_error(AWS_ERROR_S3_RESUMED_PART_CHECKSUM_MISMATCH);
  514. goto on_done;
  515. }
  516. on_done:
  517. aws_byte_buf_clean_up(&checksum);
  518. aws_byte_buf_clean_up(&encoded_checksum);
  519. return return_status;
  520. }
  521. /**
  522. * Skips parts from input stream that were previously uploaded.
  523. * Assumes the input stream is currently positioned after num_parts_read_from_stream parts,
  524. * and reads into a temporary buffer until it has consumed part skip_until_part_number (i.e. the skip
  525. * includes that part). If a checksum is configured on the request and parts with checksums were uploaded
  526. * before, each skipped part's checksum is verified.
  527. */
  528. static int s_skip_parts_from_stream(
  529. struct aws_s3_meta_request *meta_request,
  530. uint32_t num_parts_read_from_stream,
  531. uint32_t skip_until_part_number) {
  532. AWS_PRECONDITION(meta_request);
  533. AWS_PRECONDITION(num_parts_read_from_stream <= skip_until_part_number);
  534. struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
  535. AWS_PRECONDITION(skip_until_part_number <= auto_ranged_put->synced_data.total_num_parts);
  536. if (num_parts_read_from_stream == skip_until_part_number) {
  537. return AWS_OP_SUCCESS;
  538. }
  539. struct aws_byte_buf temp_body_buf;
  540. if (aws_byte_buf_init(&temp_body_buf, meta_request->allocator, 0)) {
  541. return AWS_OP_ERR;
  542. }
  543. AWS_LOGF_DEBUG(
  544. AWS_LS_S3_META_REQUEST,
  545. "id=%p: Skipping parts %d through %d",
  546. (void *)meta_request,
  547. num_parts_read_from_stream,
  548. skip_until_part_number);
  549. int return_status = AWS_OP_SUCCESS;
  550. for (uint32_t part_index = num_parts_read_from_stream; part_index < skip_until_part_number; ++part_index) {
  551. size_t request_body_size = s_compute_request_body_size(meta_request, part_index + 1);
  552. if (temp_body_buf.capacity != request_body_size) {
  553. // reinit with correct size
  554. aws_byte_buf_clean_up(&temp_body_buf);
  555. if (aws_byte_buf_init(&temp_body_buf, meta_request->allocator, request_body_size)) {
  556. return AWS_OP_ERR;
  557. }
  558. } else {
  559. // reuse buffer
  560. aws_byte_buf_reset(&temp_body_buf, false);
  561. }
  562. if (aws_s3_meta_request_read_body(meta_request, &temp_body_buf)) {
  563. AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Failed to resume upload. Input stream cannot be read.");
  564. return_status = AWS_OP_ERR;
  565. goto on_done;
  566. }
  567. /* compare the checksum of the skipped part to the checksum of the previously uploaded part */
  568. if (auto_ranged_put->encoded_checksum_list[part_index].len > 0 &&
  569. s_verify_part_matches_checksum(
  570. meta_request->allocator,
  571. temp_body_buf,
  572. meta_request->checksum_config.checksum_algorithm,
  573. auto_ranged_put->encoded_checksum_list[part_index])) {
  574. return_status = AWS_OP_ERR;
  575. goto on_done;
  576. }
  577. }
  578. on_done:
  579. aws_byte_buf_clean_up(&temp_body_buf);
  580. return return_status;
  581. }
  582. /* Given a request, prepare it for sending based on its description. */
  583. static int s_s3_auto_ranged_put_prepare_request(
  584. struct aws_s3_meta_request *meta_request,
  585. struct aws_s3_request *request) {
  586. AWS_PRECONDITION(meta_request);
  587. struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
  588. AWS_PRECONDITION(auto_ranged_put);
  589. struct aws_http_message *message = NULL;
  590. switch (request->request_tag) {
  591. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_LIST_PARTS: {
  592. int message_creation_result = AWS_OP_ERR;
  593. /* BEGIN CRITICAL SECTION */
  594. {
  595. aws_s3_meta_request_lock_synced_data(meta_request);
  596. if (auto_ranged_put->synced_data.list_parts_continuation_token) {
  597. AWS_LOGF_DEBUG(
  598. AWS_LS_S3_META_REQUEST,
  599. "id=%p ListParts for Multi-part Upload, with ID:%s, continues with token:%s.",
  600. (void *)meta_request,
  601. aws_string_c_str(auto_ranged_put->upload_id),
  602. aws_string_c_str(auto_ranged_put->synced_data.list_parts_continuation_token));
  603. struct aws_byte_cursor continuation_cur =
  604. aws_byte_cursor_from_string(auto_ranged_put->synced_data.list_parts_continuation_token);
  605. message_creation_result = aws_s3_construct_next_paginated_request_http_message(
  606. auto_ranged_put->synced_data.list_parts_operation, &continuation_cur, &message);
  607. } else {
  608. message_creation_result = aws_s3_construct_next_paginated_request_http_message(
  609. auto_ranged_put->synced_data.list_parts_operation, NULL, &message);
  610. }
  611. aws_s3_meta_request_unlock_synced_data(meta_request);
  612. }
  613. /* END CRITICAL SECTION */
  614. if (message_creation_result) {
  615. goto message_create_failed;
  616. }
  617. if (meta_request->checksum_config.checksum_algorithm == AWS_SCA_NONE) {
  618. /* We don't need to worry about a checksum pre-calculated by the user: for multipart upload, the only
  619. * way to calculate the checksum is on the client. */
  620. aws_s3_message_util_copy_headers(
  621. meta_request->initial_request_message,
  622. message,
  623. g_s3_list_parts_excluded_headers,
  624. g_s3_list_parts_excluded_headers_count,
  625. true);
  626. } else {
  627. aws_s3_message_util_copy_headers(
  628. meta_request->initial_request_message,
  629. message,
  630. g_s3_list_parts_with_checksum_excluded_headers,
  631. g_s3_list_parts_with_checksum_excluded_headers_count,
  632. true);
  633. }
  634. break;
  635. }
  636. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_CREATE_MULTIPART_UPLOAD: {
  637. /* Create the message to create a new multipart upload. */
  638. message = aws_s3_create_multipart_upload_message_new(
  639. meta_request->allocator,
  640. meta_request->initial_request_message,
  641. meta_request->checksum_config.checksum_algorithm);
  642. break;
  643. }
  644. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART: {
  645. size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number);
  646. if (request->num_times_prepared == 0) {
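/* First time preparing this part: fast-forward the body stream past any parts that were already
 * uploaded in a previous session (resume case), then read this part's bytes from the stream. */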
  647. if (s_skip_parts_from_stream(
  648. meta_request,
  649. auto_ranged_put->prepare_data.num_parts_read_from_stream,
  650. request->part_number - 1)) {
  651. goto message_create_failed;
  652. }
  653. auto_ranged_put->prepare_data.num_parts_read_from_stream = request->part_number - 1;
  654. aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size);
  655. if (aws_s3_meta_request_read_body(meta_request, &request->request_body)) {
  656. goto message_create_failed;
  657. }
  658. ++auto_ranged_put->prepare_data.num_parts_read_from_stream;
  659. }
  660. /* Create a new put-object message to upload a part. */
  661. message = aws_s3_upload_part_message_new(
  662. meta_request->allocator,
  663. meta_request->initial_request_message,
  664. &request->request_body,
  665. request->part_number,
  666. auto_ranged_put->upload_id,
  667. meta_request->should_compute_content_md5,
  668. &meta_request->checksum_config,
  669. &auto_ranged_put->encoded_checksum_list[request->part_number - 1]);
  670. break;
  671. }
  672. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_COMPLETE_MULTIPART_UPLOAD: {
  673. if (request->num_times_prepared == 0) {
  674. /* Corner case: the last part was previously uploaded and we are resuming.
  675. * Read it from the input stream and, if applicable, verify its checksum. */
  676. if (s_skip_parts_from_stream(
  677. meta_request,
  678. auto_ranged_put->prepare_data.num_parts_read_from_stream,
  679. auto_ranged_put->synced_data.total_num_parts)) {
  680. goto message_create_failed;
  681. }
  682. auto_ranged_put->prepare_data.num_parts_read_from_stream = auto_ranged_put->synced_data.total_num_parts;
  683. aws_byte_buf_init(
  684. &request->request_body, meta_request->allocator, s_complete_multipart_upload_init_body_size_bytes);
  685. } else {
  686. aws_byte_buf_reset(&request->request_body, false);
  687. }
  688. /* BEGIN CRITICAL SECTION */
  689. {
  690. aws_s3_meta_request_lock_synced_data(meta_request);
  691. AWS_FATAL_ASSERT(auto_ranged_put->upload_id);
  692. AWS_ASSERT(request->request_body.capacity > 0);
  693. aws_byte_buf_reset(&request->request_body, false);
  694. /* Build the message to complete our multipart upload, which includes a payload describing all of
  695. * our completed parts. */
  696. message = aws_s3_complete_multipart_message_new(
  697. meta_request->allocator,
  698. meta_request->initial_request_message,
  699. &request->request_body,
  700. auto_ranged_put->upload_id,
  701. &auto_ranged_put->synced_data.etag_list,
  702. auto_ranged_put->encoded_checksum_list,
  703. meta_request->checksum_config.checksum_algorithm);
  704. aws_s3_meta_request_unlock_synced_data(meta_request);
  705. }
  706. /* END CRITICAL SECTION */
  707. break;
  708. }
  709. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_ABORT_MULTIPART_UPLOAD: {
  710. AWS_FATAL_ASSERT(auto_ranged_put->upload_id);
  711. AWS_LOGF_DEBUG(
  712. AWS_LS_S3_META_REQUEST,
  713. "id=%p Abort multipart upload request for upload id %s.",
  714. (void *)meta_request,
  715. aws_string_c_str(auto_ranged_put->upload_id));
  716. if (request->num_times_prepared == 0) {
  717. aws_byte_buf_init(
  718. &request->request_body, meta_request->allocator, s_abort_multipart_upload_init_body_size_bytes);
  719. } else {
  720. aws_byte_buf_reset(&request->request_body, false);
  721. }
  722. /* Build the message to abort our multipart upload */
  723. message = aws_s3_abort_multipart_upload_message_new(
  724. meta_request->allocator, meta_request->initial_request_message, auto_ranged_put->upload_id);
  725. break;
  726. }
  727. }
  728. if (message == NULL) {
  729. AWS_LOGF_ERROR(
  730. AWS_LS_S3_META_REQUEST,
  731. "id=%p Could not allocate message for request with tag %d for auto-ranged-put meta request.",
  732. (void *)meta_request,
  733. request->request_tag);
  734. goto message_create_failed;
  735. }
  736. aws_s3_request_setup_send_data(request, message);
  737. aws_http_message_release(message);
  738. AWS_LOGF_DEBUG(
  739. AWS_LS_S3_META_REQUEST,
  740. "id=%p: Prepared request %p for part %d",
  741. (void *)meta_request,
  742. (void *)request,
  743. request->part_number);
  744. return AWS_OP_SUCCESS;
  745. message_create_failed:
  746. return AWS_OP_ERR;
  747. }
  748. /* Invoked before retry */
  749. static void s_s3_auto_ranged_put_send_request_finish(
  750. struct aws_s3_connection *connection,
  751. struct aws_http_stream *stream,
  752. int error_code) {
  753. struct aws_s3_request *request = connection->request;
  754. AWS_PRECONDITION(request);
  755. /* Request tags differ between different types of meta requests */
  756. switch (request->request_tag) {
  757. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_COMPLETE_MULTIPART_UPLOAD: {
  758. /* For complete multipart upload, the server may return an async error. */
  759. aws_s3_meta_request_send_request_finish_handle_async_error(connection, stream, error_code);
  760. break;
  761. }
  762. default:
  763. aws_s3_meta_request_send_request_finish_default(connection, stream, error_code);
  764. break;
  765. }
  766. }
  767. /* Invoked when no retry will happen */
  768. static void s_s3_auto_ranged_put_request_finished(
  769. struct aws_s3_meta_request *meta_request,
  770. struct aws_s3_request *request,
  771. int error_code) {
  772. AWS_PRECONDITION(meta_request);
  773. AWS_PRECONDITION(meta_request->impl);
  774. AWS_PRECONDITION(request);
  775. struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
  776. switch (request->request_tag) {
  777. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_LIST_PARTS: {
  778. /* BEGIN CRITICAL SECTION */
  779. {
  780. aws_s3_meta_request_lock_synced_data(meta_request);
  781. bool has_more_results = false;
  782. if (error_code == AWS_ERROR_SUCCESS) {
  783. struct aws_byte_cursor body_cursor = aws_byte_cursor_from_buf(&request->send_data.response_body);
  784. /* Clear the previous token before parsing the new response */
  785. aws_string_destroy(auto_ranged_put->synced_data.list_parts_continuation_token);
  786. auto_ranged_put->synced_data.list_parts_continuation_token = NULL;
  787. if (aws_s3_paginated_operation_on_response(
  788. auto_ranged_put->synced_data.list_parts_operation,
  789. &body_cursor,
  790. &auto_ranged_put->synced_data.list_parts_continuation_token,
  791. &has_more_results)) {
  792. AWS_LOGF_ERROR(
  793. AWS_LS_S3_META_REQUEST, "id=%p Failed to parse list parts response.", (void *)meta_request);
  794. error_code = AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED;
  795. } else if (!has_more_results) {
  796. for (size_t etag_index = 0;
  797. etag_index < aws_array_list_length(&auto_ranged_put->synced_data.etag_list);
  798. etag_index++) {
  799. struct aws_string *etag = NULL;
  800. aws_array_list_get_at(&auto_ranged_put->synced_data.etag_list, &etag, etag_index);
  801. if (etag != NULL) {
  802. /* Update the number of parts sent/completed previously */
  803. ++auto_ranged_put->synced_data.num_parts_sent;
  804. ++auto_ranged_put->synced_data.num_parts_completed;
  805. }
  806. }
  807. AWS_LOGF_DEBUG(
  808. AWS_LS_S3_META_REQUEST,
  809. "id=%p: Resuming PutObject. %d out of %d parts have completed during previous request.",
  810. (void *)meta_request,
  811. auto_ranged_put->synced_data.num_parts_completed,
  812. auto_ranged_put->synced_data.total_num_parts);
  813. }
  814. }
  815. if (has_more_results) {
  816. /* If ListParts has more results, make sure it continues */
  817. auto_ranged_put->synced_data.list_parts_state.continues = true;
  818. auto_ranged_put->synced_data.list_parts_state.completed = false;
  819. } else {
  820. /* No more results; ListParts is complete */
  821. auto_ranged_put->synced_data.list_parts_state.continues = false;
  822. auto_ranged_put->synced_data.list_parts_state.completed = true;
  823. }
  824. auto_ranged_put->synced_data.list_parts_error_code = error_code;
  825. if (error_code != AWS_ERROR_SUCCESS) {
  826. if (request->send_data.response_status == AWS_HTTP_STATUS_CODE_404_NOT_FOUND &&
  827. auto_ranged_put->resume_token->num_parts_completed ==
  828. auto_ranged_put->resume_token->total_num_parts) {
  829. AWS_LOGF_DEBUG(
  830. AWS_LS_S3_META_REQUEST,
  831. "id=%p: Resuming PutObject ended early, since there is nothing to resume"
  832. "(request finished prior to being paused?)",
  833. (void *)meta_request);
  834. aws_s3_meta_request_set_success_synced(meta_request, AWS_S3_RESPONSE_STATUS_SUCCESS);
  835. } else {
  836. aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
  837. }
  838. }
  839. aws_s3_meta_request_unlock_synced_data(meta_request);
  840. }
  841. /* END CRITICAL SECTION */
  842. break;
  843. }
  844. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_CREATE_MULTIPART_UPLOAD: {
  845. struct aws_http_headers *needed_response_headers = NULL;
  846. if (error_code == AWS_ERROR_SUCCESS) {
  847. needed_response_headers = aws_http_headers_new(meta_request->allocator);
  848. const size_t copy_header_count = AWS_ARRAY_SIZE(s_create_multipart_upload_copy_headers);
  849. /* Copy any headers now that we'll need later for the final, transformed headers. */
  850. for (size_t header_index = 0; header_index < copy_header_count; ++header_index) {
  851. const struct aws_byte_cursor *header_name = &s_create_multipart_upload_copy_headers[header_index];
  852. struct aws_byte_cursor header_value;
  853. AWS_ZERO_STRUCT(header_value);
  854. if (aws_http_headers_get(request->send_data.response_headers, *header_name, &header_value) ==
  855. AWS_OP_SUCCESS) {
  856. aws_http_headers_set(needed_response_headers, *header_name, header_value);
  857. }
  858. }
  859. struct aws_byte_cursor buffer_byte_cursor = aws_byte_cursor_from_buf(&request->send_data.response_body);
  860. /* Find the upload id for this multipart upload. */
  861. struct aws_string *upload_id =
  862. aws_xml_get_top_level_tag(meta_request->allocator, &s_upload_id, &buffer_byte_cursor);
  863. if (upload_id == NULL) {
  864. AWS_LOGF_ERROR(
  865. AWS_LS_S3_META_REQUEST,
  866. "id=%p Could not find upload-id in create-multipart-upload response",
  867. (void *)meta_request);
  868. aws_raise_error(AWS_ERROR_S3_MISSING_UPLOAD_ID);
  869. error_code = AWS_ERROR_S3_MISSING_UPLOAD_ID;
  870. } else {
  871. /* Store the multipart upload id. */
  872. auto_ranged_put->upload_id = upload_id;
  873. }
  874. }
  875. /* BEGIN CRITICAL SECTION */
  876. {
  877. aws_s3_meta_request_lock_synced_data(meta_request);
  878. AWS_ASSERT(auto_ranged_put->synced_data.needed_response_headers == NULL);
  879. auto_ranged_put->synced_data.needed_response_headers = needed_response_headers;
  880. auto_ranged_put->synced_data.create_multipart_upload_completed = true;
  881. auto_ranged_put->synced_data.create_multipart_upload_error_code = error_code;
  882. if (error_code != AWS_ERROR_SUCCESS) {
  883. aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
  884. }
  885. aws_s3_meta_request_unlock_synced_data(meta_request);
  886. }
  887. /* END CRITICAL SECTION */
  888. break;
  889. }
  890. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART: {
  891. size_t part_number = request->part_number;
  892. AWS_FATAL_ASSERT(part_number > 0);
  893. size_t part_index = part_number - 1;
  894. struct aws_string *etag = NULL;
  895. if (error_code == AWS_ERROR_SUCCESS) {
  896. /* Find the ETag header if it exists and cache it. */
  897. struct aws_byte_cursor etag_within_quotes;
  898. AWS_ASSERT(request->send_data.response_headers);
  899. if (aws_http_headers_get(
  900. request->send_data.response_headers, g_etag_header_name, &etag_within_quotes) !=
  901. AWS_OP_SUCCESS) {
  902. AWS_LOGF_ERROR(
  903. AWS_LS_S3_META_REQUEST,
  904. "id=%p Could not find ETag header for request %p",
  905. (void *)meta_request,
  906. (void *)request);
  907. error_code = AWS_ERROR_S3_MISSING_ETAG;
  908. } else {
  909. /* The ETag value arrives in quotes, but we don't want it in quotes when we send it back up
  910. * later, so just get rid of the quotes now. */
  911. etag = aws_strip_quotes(meta_request->allocator, etag_within_quotes);
  912. }
  913. }
  914. if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) {
  915. struct aws_s3_meta_request_progress progress = {
  916. .bytes_transferred = meta_request->part_size,
  917. .content_length = auto_ranged_put->content_length,
  918. };
  919. meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
  920. }
  921. /* BEGIN CRITICAL SECTION */
  922. {
  923. aws_s3_meta_request_lock_synced_data(meta_request);
  924. ++auto_ranged_put->synced_data.num_parts_completed;
  925. AWS_LOGF_DEBUG(
  926. AWS_LS_S3_META_REQUEST,
  927. "id=%p: %d out of %d parts have completed.",
  928. (void *)meta_request,
  929. auto_ranged_put->synced_data.num_parts_completed,
  930. auto_ranged_put->synced_data.total_num_parts);
  931. if (error_code == AWS_ERROR_SUCCESS) {
  932. AWS_ASSERT(etag != NULL);
  933. ++auto_ranged_put->synced_data.num_parts_successful;
  934. /* ETags need to be associated with their part number, so we keep the etag indices consistent with
  935. * part numbers. This means we may have to add padding to the list in the case that parts finish out
  936. * of order. */
  937. aws_array_list_set_at(&auto_ranged_put->synced_data.etag_list, &etag, part_index);
  938. } else {
  939. ++auto_ranged_put->synced_data.num_parts_failed;
  940. aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
  941. }
  942. aws_s3_meta_request_unlock_synced_data(meta_request);
  943. }
  944. /* END CRITICAL SECTION */
  945. break;
  946. }
  947. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_COMPLETE_MULTIPART_UPLOAD: {
  948. if (error_code == AWS_ERROR_SUCCESS && meta_request->headers_callback != NULL) {
  949. struct aws_http_headers *final_response_headers = aws_http_headers_new(meta_request->allocator);
  950. /* Copy all the response headers from this request. */
  951. copy_http_headers(request->send_data.response_headers, final_response_headers);
  952. /* Copy over any response headers that we've previously determined are needed for this final
  953. * response.
  954. */
  955. /* BEGIN CRITICAL SECTION */
  956. {
  957. aws_s3_meta_request_lock_synced_data(meta_request);
  958. copy_http_headers(auto_ranged_put->synced_data.needed_response_headers, final_response_headers);
  959. aws_s3_meta_request_unlock_synced_data(meta_request);
  960. }
  961. /* END CRITICAL SECTION */
  962. struct aws_byte_cursor response_body_cursor =
  963. aws_byte_cursor_from_buf(&request->send_data.response_body);
  964. /**
  965. * TODO: The body of the response can contain an Error even when the status code is 200; check for the Error element described at
  966. * https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#AmazonS3-CompleteMultipartUpload-response-CompleteMultipartUploadOutput
  967. * We need to handle this case.
  968. * TODO: does the checksum returned within the complete-multipart-upload response need to be exposed?
  969. */
  970. /* Grab the ETag for the entire object, and set it as a header. */
  971. struct aws_string *etag_header_value =
  972. aws_xml_get_top_level_tag(meta_request->allocator, &g_etag_header_name, &response_body_cursor);
  973. if (etag_header_value != NULL) {
  974. struct aws_byte_buf etag_header_value_byte_buf;
  975. AWS_ZERO_STRUCT(etag_header_value_byte_buf);
  976. replace_quote_entities(meta_request->allocator, etag_header_value, &etag_header_value_byte_buf);
  977. aws_http_headers_set(
  978. final_response_headers,
  979. g_etag_header_name,
  980. aws_byte_cursor_from_buf(&etag_header_value_byte_buf));
  981. aws_string_destroy(etag_header_value);
  982. aws_byte_buf_clean_up(&etag_header_value_byte_buf);
  983. }
  984. /* Notify the user of the headers. */
  985. if (meta_request->headers_callback(
  986. meta_request,
  987. final_response_headers,
  988. request->send_data.response_status,
  989. meta_request->user_data)) {
  990. error_code = aws_last_error_or_unknown();
  991. }
  992. meta_request->headers_callback = NULL;
  993. aws_http_headers_release(final_response_headers);
  994. }
  995. /* BEGIN CRITICAL SECTION */
  996. {
  997. aws_s3_meta_request_lock_synced_data(meta_request);
  998. auto_ranged_put->synced_data.complete_multipart_upload_completed = true;
  999. auto_ranged_put->synced_data.complete_multipart_upload_error_code = error_code;
  1000. if (error_code != AWS_ERROR_SUCCESS) {
  1001. aws_s3_meta_request_set_fail_synced(meta_request, request, error_code);
  1002. }
  1003. aws_s3_meta_request_unlock_synced_data(meta_request);
  1004. }
  1005. /* END CRITICAL SECTION */
  1006. break;
  1007. }
  1008. case AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_ABORT_MULTIPART_UPLOAD: {
  1009. /* BEGIN CRITICAL SECTION */
  1010. {
  1011. aws_s3_meta_request_lock_synced_data(meta_request);
  1012. auto_ranged_put->synced_data.abort_multipart_upload_error_code = error_code;
  1013. auto_ranged_put->synced_data.abort_multipart_upload_completed = true;
  1014. aws_s3_meta_request_unlock_synced_data(meta_request);
  1015. }
  1016. /* END CRITICAL SECTION */
  1017. break;
  1018. }
  1019. }
  1020. }
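/* Pause: capture the upload id, part size and progress in a resume token, then fail the meta request
 * with AWS_ERROR_S3_PAUSED so that the already-uploaded parts are left in place for a later resume. */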
  1021. static int s_s3_auto_ranged_put_pause(
  1022. struct aws_s3_meta_request *meta_request,
  1023. struct aws_s3_meta_request_resume_token **out_resume_token) {
  1024. *out_resume_token = NULL;
  1025. /* lock */
  1026. aws_s3_meta_request_lock_synced_data(meta_request);
  1027. struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
  1028. AWS_LOGF_DEBUG(
  1029. AWS_LS_S3_META_REQUEST,
  1030. "id=%p: Pausing request with %u out of %u parts have completed.",
  1031. (void *)meta_request,
  1032. auto_ranged_put->synced_data.num_parts_completed,
  1033. auto_ranged_put->synced_data.total_num_parts);
  1034. /* The upload can be in one of several states:
  1035. * - not started, i.e. we haven't even called create-multipart-upload yet: return success,
  1036. * the token is NULL, and cancel the upload
  1037. * - in the middle of the upload: return success, create a token, and cancel the
  1038. * upload
  1039. * - complete MPU started: return success, generate a token, and try to cancel the
  1040. * complete MPU
  1041. */
  1042. if (auto_ranged_put->synced_data.create_multipart_upload_completed) {
  1043. *out_resume_token = aws_s3_meta_request_resume_token_new(meta_request->allocator);
  1044. (*out_resume_token)->type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT;
  1045. (*out_resume_token)->multipart_upload_id =
  1046. aws_string_clone_or_reuse(meta_request->allocator, auto_ranged_put->upload_id);
  1047. (*out_resume_token)->part_size = meta_request->part_size;
  1048. (*out_resume_token)->total_num_parts = auto_ranged_put->synced_data.total_num_parts;
  1049. (*out_resume_token)->num_parts_completed = auto_ranged_put->synced_data.num_parts_completed;
  1050. }
  1051. /**
  1052. * Cancels the meta request with the AWS_ERROR_S3_PAUSED error code to avoid deletion of the uploaded parts.
  1053. * This allows the client to resume the upload later by passing the resume token in the meta request options.
  1054. */
  1055. aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_PAUSED);
  1056. /* unlock */
  1057. aws_s3_meta_request_unlock_synced_data(meta_request);
  1058. return AWS_OP_SUCCESS;
  1059. }