s3_chunk_stream.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include "aws/s3/private/s3_checksums.h"
  6. #include <aws/common/encoding.h>
  7. #include <aws/common/string.h>
  8. #include <aws/io/stream.h>
  9. #include <inttypes.h>
  10. AWS_STATIC_STRING_FROM_LITERAL(s_carriage_return, "\r\n");
  11. AWS_STATIC_STRING_FROM_LITERAL(s_empty_chunk, "0\r\n");
  12. AWS_STATIC_STRING_FROM_LITERAL(s_final_chunk, "\r\n0\r\n");
  13. AWS_STATIC_STRING_FROM_LITERAL(s_colon, ":");
  14. AWS_STATIC_STRING_FROM_LITERAL(s_post_trailer, "\r\n\r\n");
  15. struct aws_chunk_stream;
  16. typedef int(set_stream_fn)(struct aws_chunk_stream *parent_stream);
  17. struct aws_chunk_stream {
  18. struct aws_input_stream base;
  19. struct aws_allocator *allocator;
  20. /* aws_input_stream_byte_cursor provides our actual functionality */
  21. /* Pointing to the stream we read from */
  22. struct aws_input_stream *current_stream;
  23. struct aws_input_stream *checksum_stream;
  24. struct aws_byte_buf checksum_result;
  25. struct aws_byte_buf *checksum_result_output;
  26. struct aws_byte_buf pre_chunk_buffer;
  27. struct aws_byte_buf post_chunk_buffer;
  28. const struct aws_byte_cursor *checksum_header_name;
  29. int64_t length;
  30. set_stream_fn *set_current_stream_fn;
  31. };
  32. static int s_set_null_stream(struct aws_chunk_stream *parent_stream) {
  33. aws_input_stream_release(parent_stream->current_stream);
  34. parent_stream->current_stream = NULL;
  35. parent_stream->set_current_stream_fn = NULL;
  36. aws_byte_buf_clean_up(&parent_stream->post_chunk_buffer);
  37. return AWS_OP_SUCCESS;
  38. }
  39. static int s_set_post_chunk_stream(struct aws_chunk_stream *parent_stream) {
  40. int64_t current_stream_length;
  41. if (aws_input_stream_get_length(parent_stream->current_stream, &current_stream_length)) {
  42. aws_input_stream_release(parent_stream->current_stream);
  43. return AWS_OP_ERR;
  44. }
  45. aws_input_stream_release(parent_stream->current_stream);
  46. struct aws_byte_cursor final_chunk_cursor;
  47. if (current_stream_length > 0) {
  48. final_chunk_cursor = aws_byte_cursor_from_string(s_final_chunk);
  49. } else {
  50. final_chunk_cursor = aws_byte_cursor_from_string(s_empty_chunk);
  51. }
  52. struct aws_byte_cursor post_trailer_cursor = aws_byte_cursor_from_string(s_post_trailer);
  53. struct aws_byte_cursor colon_cursor = aws_byte_cursor_from_string(s_colon);
  54. if (parent_stream->checksum_result.len == 0) {
  55. AWS_LOGF_ERROR(AWS_LS_S3_META_REQUEST, "Failed to extract base64 encoded checksum of stream");
  56. return aws_raise_error(AWS_ERROR_S3_CHECKSUM_CALCULATION_FAILED);
  57. }
  58. struct aws_byte_cursor checksum_result_cursor = aws_byte_cursor_from_buf(&parent_stream->checksum_result);
  59. if (parent_stream->checksum_result_output &&
  60. aws_byte_buf_init_copy_from_cursor(
  61. parent_stream->checksum_result_output, aws_default_allocator(), checksum_result_cursor)) {
  62. return AWS_OP_ERR;
  63. }
  64. if (aws_byte_buf_init(
  65. &parent_stream->post_chunk_buffer,
  66. aws_default_allocator(),
  67. final_chunk_cursor.len + parent_stream->checksum_header_name->len + colon_cursor.len +
  68. checksum_result_cursor.len + post_trailer_cursor.len)) {
  69. goto error;
  70. }
  71. if (aws_byte_buf_append(&parent_stream->post_chunk_buffer, &final_chunk_cursor) ||
  72. aws_byte_buf_append(&parent_stream->post_chunk_buffer, parent_stream->checksum_header_name) ||
  73. aws_byte_buf_append(&parent_stream->post_chunk_buffer, &colon_cursor) ||
  74. aws_byte_buf_append(&parent_stream->post_chunk_buffer, &checksum_result_cursor) ||
  75. aws_byte_buf_append(&parent_stream->post_chunk_buffer, &post_trailer_cursor)) {
  76. goto error;
  77. }
  78. struct aws_byte_cursor post_chunk_cursor = aws_byte_cursor_from_buf(&parent_stream->post_chunk_buffer);
  79. parent_stream->current_stream = aws_input_stream_new_from_cursor(aws_default_allocator(), &post_chunk_cursor);
  80. parent_stream->set_current_stream_fn = s_set_null_stream;
  81. return AWS_OP_SUCCESS;
  82. error:
  83. aws_byte_buf_clean_up(parent_stream->checksum_result_output);
  84. aws_byte_buf_clean_up(&parent_stream->post_chunk_buffer);
  85. return AWS_OP_ERR;
  86. }
  87. static int s_set_chunk_stream(struct aws_chunk_stream *parent_stream) {
  88. aws_input_stream_release(parent_stream->current_stream);
  89. parent_stream->current_stream = parent_stream->checksum_stream;
  90. aws_byte_buf_clean_up(&parent_stream->pre_chunk_buffer);
  91. parent_stream->checksum_stream = NULL;
  92. parent_stream->set_current_stream_fn = s_set_post_chunk_stream;
  93. return AWS_OP_SUCCESS;
  94. }
  95. static int s_aws_input_chunk_stream_seek(
  96. struct aws_input_stream *stream,
  97. int64_t offset,
  98. enum aws_stream_seek_basis basis) {
  99. (void)stream;
  100. (void)offset;
  101. (void)basis;
  102. AWS_LOGF_ERROR(
  103. AWS_LS_S3_CLIENT,
  104. "Cannot seek on chunk stream, as it will cause the checksum output to mismatch the checksum of the stream"
  105. "contents");
  106. AWS_ASSERT(false);
  107. return aws_raise_error(AWS_ERROR_UNSUPPORTED_OPERATION);
  108. }
  109. static int s_aws_input_chunk_stream_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
  110. struct aws_chunk_stream *impl = AWS_CONTAINER_OF(stream, struct aws_chunk_stream, base);
  111. struct aws_stream_status status;
  112. AWS_ZERO_STRUCT(status);
  113. while (impl->current_stream != NULL && dest->len < dest->capacity) {
  114. int err = aws_input_stream_read(impl->current_stream, dest);
  115. if (err) {
  116. return err;
  117. }
  118. if (aws_input_stream_get_status(impl->current_stream, &status)) {
  119. return AWS_OP_ERR;
  120. }
  121. if (status.is_end_of_stream && impl->set_current_stream_fn(impl)) {
  122. return AWS_OP_ERR;
  123. }
  124. }
  125. return AWS_OP_SUCCESS;
  126. }
  127. static int s_aws_input_chunk_stream_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
  128. struct aws_chunk_stream *impl = AWS_CONTAINER_OF(stream, struct aws_chunk_stream, base);
  129. if (impl->current_stream == NULL) {
  130. status->is_end_of_stream = true;
  131. status->is_valid = true;
  132. return AWS_OP_SUCCESS;
  133. }
  134. int res = aws_input_stream_get_status(impl->current_stream, status);
  135. if (res != AWS_OP_SUCCESS) {
  136. /* Only when the current_stream is NULL, it is end of stream, as the current stream will be updated to feed to
  137. * data */
  138. status->is_end_of_stream = false;
  139. }
  140. return res;
  141. }
  142. static int s_aws_input_chunk_stream_get_length(struct aws_input_stream *stream, int64_t *out_length) {
  143. struct aws_chunk_stream *impl = AWS_CONTAINER_OF(stream, struct aws_chunk_stream, base);
  144. *out_length = impl->length;
  145. return AWS_OP_SUCCESS;
  146. }
  147. static void s_aws_input_chunk_stream_destroy(struct aws_chunk_stream *impl) {
  148. if (impl) {
  149. if (impl->current_stream) {
  150. aws_input_stream_release(impl->current_stream);
  151. }
  152. if (impl->checksum_stream) {
  153. aws_input_stream_release(impl->checksum_stream);
  154. }
  155. aws_byte_buf_clean_up(&impl->pre_chunk_buffer);
  156. aws_byte_buf_clean_up(&impl->checksum_result);
  157. aws_byte_buf_clean_up(&impl->post_chunk_buffer);
  158. aws_mem_release(impl->allocator, impl);
  159. }
  160. }
  161. static struct aws_input_stream_vtable s_aws_input_chunk_stream_vtable = {
  162. .seek = s_aws_input_chunk_stream_seek,
  163. .read = s_aws_input_chunk_stream_read,
  164. .get_status = s_aws_input_chunk_stream_get_status,
  165. .get_length = s_aws_input_chunk_stream_get_length,
  166. };
  167. struct aws_input_stream *aws_chunk_stream_new(
  168. struct aws_allocator *allocator,
  169. struct aws_input_stream *existing_stream,
  170. enum aws_s3_checksum_algorithm algorithm,
  171. struct aws_byte_buf *checksum_output) {
  172. struct aws_chunk_stream *impl = aws_mem_calloc(allocator, 1, sizeof(struct aws_chunk_stream));
  173. impl->allocator = allocator;
  174. impl->base.vtable = &s_aws_input_chunk_stream_vtable;
  175. impl->checksum_result_output = checksum_output;
  176. int64_t stream_length = 0;
  177. int64_t final_chunk_len = 0;
  178. if (aws_input_stream_get_length(existing_stream, &stream_length)) {
  179. goto error;
  180. }
  181. struct aws_byte_cursor pre_chunk_cursor = aws_byte_cursor_from_string(s_carriage_return);
  182. char stream_length_string[32];
  183. AWS_ZERO_ARRAY(stream_length_string);
  184. snprintf(stream_length_string, AWS_ARRAY_SIZE(stream_length_string), "%" PRIX64, stream_length);
  185. struct aws_string *stream_length_aws_string = aws_string_new_from_c_str(allocator, stream_length_string);
  186. struct aws_byte_cursor stream_length_cursor = aws_byte_cursor_from_string(stream_length_aws_string);
  187. if (aws_byte_buf_init(&impl->pre_chunk_buffer, allocator, stream_length_cursor.len + pre_chunk_cursor.len)) {
  188. goto error;
  189. }
  190. if (aws_byte_buf_append(&impl->pre_chunk_buffer, &stream_length_cursor)) {
  191. goto error;
  192. }
  193. aws_string_destroy(stream_length_aws_string);
  194. if (aws_byte_buf_append(&impl->pre_chunk_buffer, &pre_chunk_cursor)) {
  195. goto error;
  196. }
  197. size_t checksum_len = aws_get_digest_size_from_algorithm(algorithm);
  198. size_t encoded_checksum_len = 0;
  199. if (aws_base64_compute_encoded_len(checksum_len, &encoded_checksum_len)) {
  200. goto error;
  201. }
  202. if (aws_byte_buf_init(&impl->checksum_result, allocator, encoded_checksum_len)) {
  203. goto error;
  204. }
  205. impl->checksum_stream = aws_checksum_stream_new(allocator, existing_stream, algorithm, &impl->checksum_result);
  206. if (impl->checksum_stream == NULL) {
  207. goto error;
  208. }
  209. int64_t prechunk_stream_len = 0;
  210. int64_t colon_len = s_colon->len;
  211. int64_t post_trailer_len = s_post_trailer->len;
  212. struct aws_byte_cursor complete_pre_chunk_cursor = aws_byte_cursor_from_buf(&impl->pre_chunk_buffer);
  213. if (stream_length > 0) {
  214. impl->current_stream = aws_input_stream_new_from_cursor(allocator, &complete_pre_chunk_cursor);
  215. final_chunk_len = s_final_chunk->len;
  216. if (impl->current_stream == NULL) {
  217. goto error;
  218. }
  219. impl->set_current_stream_fn = s_set_chunk_stream;
  220. } else {
  221. impl->current_stream = impl->checksum_stream;
  222. final_chunk_len = s_empty_chunk->len;
  223. impl->checksum_stream = NULL;
  224. impl->set_current_stream_fn = s_set_post_chunk_stream;
  225. }
  226. impl->checksum_header_name = aws_get_http_header_name_from_algorithm(algorithm);
  227. if (aws_input_stream_get_length(impl->current_stream, &prechunk_stream_len)) {
  228. goto error;
  229. }
  230. /* we subtract one since aws_base64_compute_encoded_len accounts for the null terminator which won't show up in our
  231. * stream */
  232. impl->length = prechunk_stream_len + stream_length + final_chunk_len + impl->checksum_header_name->len + colon_len +
  233. encoded_checksum_len + post_trailer_len - 1;
  234. AWS_ASSERT(impl->current_stream);
  235. aws_ref_count_init(&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_chunk_stream_destroy);
  236. return &impl->base;
  237. error:
  238. aws_input_stream_release(impl->checksum_stream);
  239. aws_input_stream_release(impl->current_stream);
  240. aws_byte_buf_clean_up(&impl->pre_chunk_buffer);
  241. aws_byte_buf_clean_up(&impl->checksum_result);
  242. aws_mem_release(impl->allocator, impl);
  243. return NULL;
  244. }