stream.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/io/stream.h>
  6. #include <aws/common/file.h>
  7. #include <aws/io/file_utils.h>
  8. #include <errno.h>
  9. int aws_input_stream_seek(struct aws_input_stream *stream, int64_t offset, enum aws_stream_seek_basis basis) {
  10. AWS_ASSERT(stream && stream->vtable && stream->vtable->seek);
  11. return stream->vtable->seek(stream, offset, basis);
  12. }
  13. int aws_input_stream_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
  14. AWS_ASSERT(stream && stream->vtable && stream->vtable->read);
  15. AWS_ASSERT(dest);
  16. AWS_ASSERT(dest->len <= dest->capacity);
  17. /* Deal with this edge case here, instead of relying on every implementation to do it right. */
  18. if (dest->capacity == dest->len) {
  19. return AWS_OP_SUCCESS;
  20. }
  21. /* Prevent implementations from accidentally overwriting existing data in the buffer.
  22. * Hand them a "safe" buffer that starts where the existing data ends. */
  23. const void *safe_buf_start = dest->buffer + dest->len;
  24. const size_t safe_buf_capacity = dest->capacity - dest->len;
  25. struct aws_byte_buf safe_buf = aws_byte_buf_from_empty_array(safe_buf_start, safe_buf_capacity);
  26. int read_result = stream->vtable->read(stream, &safe_buf);
  27. /* Ensure the implementation did not commit forbidden acts upon the buffer */
  28. AWS_FATAL_ASSERT(
  29. (safe_buf.buffer == safe_buf_start) && (safe_buf.capacity == safe_buf_capacity) &&
  30. (safe_buf.len <= safe_buf_capacity));
  31. if (read_result == AWS_OP_SUCCESS) {
  32. /* Update the actual buffer */
  33. dest->len += safe_buf.len;
  34. }
  35. return read_result;
  36. }
  37. int aws_input_stream_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
  38. AWS_ASSERT(stream && stream->vtable && stream->vtable->get_status);
  39. return stream->vtable->get_status(stream, status);
  40. }
  41. int aws_input_stream_get_length(struct aws_input_stream *stream, int64_t *out_length) {
  42. AWS_ASSERT(stream && stream->vtable && stream->vtable->get_length);
  43. return stream->vtable->get_length(stream, out_length);
  44. }
  45. /*
  46. * cursor stream implementation
  47. */
  48. struct aws_input_stream_byte_cursor_impl {
  49. struct aws_input_stream base;
  50. struct aws_allocator *allocator;
  51. struct aws_byte_cursor original_cursor;
  52. struct aws_byte_cursor current_cursor;
  53. };
  54. /*
  55. * This is an ugly function that, in the absence of better guidance, is designed to handle all possible combinations of
  56. * size_t (uint32_t, uint64_t). If size_t ever exceeds 64 bits this function will fail badly.
  57. *
  58. * Safety and invariant assumptions are sprinkled via comments. The overall strategy is to cast up to 64 bits and
  59. * perform all arithmetic there, being careful with signed vs. unsigned to prevent bad operations.
  60. *
  61. * Assumption #1: size_t resolves to an unsigned integer 64 bits or smaller
  62. */
  63. AWS_STATIC_ASSERT(sizeof(size_t) <= 8);
  64. static int s_aws_input_stream_byte_cursor_seek(
  65. struct aws_input_stream *stream,
  66. int64_t offset,
  67. enum aws_stream_seek_basis basis) {
  68. struct aws_input_stream_byte_cursor_impl *impl =
  69. AWS_CONTAINER_OF(stream, struct aws_input_stream_byte_cursor_impl, base);
  70. uint64_t final_offset = 0;
  71. switch (basis) {
  72. case AWS_SSB_BEGIN:
  73. /*
  74. * (uint64_t)offset -- safe by virtue of the earlier is-negative check
  75. * (uint64_t)impl->original_cursor.len -- safe via assumption 1
  76. */
  77. if (offset < 0 || (uint64_t)offset > (uint64_t)impl->original_cursor.len) {
  78. return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
  79. }
  80. /* safe because negative offsets were turned into an error */
  81. final_offset = (uint64_t)offset;
  82. break;
  83. case AWS_SSB_END:
  84. /*
  85. * -offset -- safe as long offset is not INT64_MIN which was previously checked
  86. * (uint64_t)(-offset) -- safe because (-offset) is positive (and < INT64_MAX < UINT64_MAX)
  87. */
  88. if (offset > 0 || offset == INT64_MIN || (uint64_t)(-offset) > (uint64_t)impl->original_cursor.len) {
  89. return aws_raise_error(AWS_IO_STREAM_INVALID_SEEK_POSITION);
  90. }
  91. /* cases that would make this unsafe became errors with previous conditional */
  92. final_offset = (uint64_t)impl->original_cursor.len - (uint64_t)(-offset);
  93. break;
  94. default:
  95. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  96. }
  97. /* true because we already validated against (impl->original_cursor.len) which is <= SIZE_MAX */
  98. AWS_ASSERT(final_offset <= SIZE_MAX);
  99. /* safe via previous assert */
  100. size_t final_offset_sz = (size_t)final_offset;
  101. /* sanity */
  102. AWS_ASSERT(final_offset_sz <= impl->original_cursor.len);
  103. /* reset current_cursor to new position */
  104. impl->current_cursor = impl->original_cursor;
  105. impl->current_cursor.ptr += final_offset_sz;
  106. impl->current_cursor.len -= final_offset_sz;
  107. return AWS_OP_SUCCESS;
  108. }
  109. static int s_aws_input_stream_byte_cursor_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
  110. struct aws_input_stream_byte_cursor_impl *impl =
  111. AWS_CONTAINER_OF(stream, struct aws_input_stream_byte_cursor_impl, base);
  112. size_t actually_read = dest->capacity - dest->len;
  113. if (actually_read > impl->current_cursor.len) {
  114. actually_read = impl->current_cursor.len;
  115. }
  116. if (!aws_byte_buf_write(dest, impl->current_cursor.ptr, actually_read)) {
  117. return aws_raise_error(AWS_IO_STREAM_READ_FAILED);
  118. }
  119. aws_byte_cursor_advance(&impl->current_cursor, actually_read);
  120. return AWS_OP_SUCCESS;
  121. }
  122. static int s_aws_input_stream_byte_cursor_get_status(
  123. struct aws_input_stream *stream,
  124. struct aws_stream_status *status) {
  125. struct aws_input_stream_byte_cursor_impl *impl =
  126. AWS_CONTAINER_OF(stream, struct aws_input_stream_byte_cursor_impl, base);
  127. status->is_end_of_stream = impl->current_cursor.len == 0;
  128. status->is_valid = true;
  129. return AWS_OP_SUCCESS;
  130. }
  131. static int s_aws_input_stream_byte_cursor_get_length(struct aws_input_stream *stream, int64_t *out_length) {
  132. struct aws_input_stream_byte_cursor_impl *impl =
  133. AWS_CONTAINER_OF(stream, struct aws_input_stream_byte_cursor_impl, base);
  134. #if SIZE_MAX > INT64_MAX
  135. size_t length = impl->original_cursor.len;
  136. if (length > INT64_MAX) {
  137. return aws_raise_error(AWS_ERROR_OVERFLOW_DETECTED);
  138. }
  139. #endif
  140. *out_length = (int64_t)impl->original_cursor.len;
  141. return AWS_OP_SUCCESS;
  142. }
  143. static void s_aws_input_stream_byte_cursor_destroy(struct aws_input_stream_byte_cursor_impl *impl) {
  144. aws_mem_release(impl->allocator, impl);
  145. }
  146. static struct aws_input_stream_vtable s_aws_input_stream_byte_cursor_vtable = {
  147. .seek = s_aws_input_stream_byte_cursor_seek,
  148. .read = s_aws_input_stream_byte_cursor_read,
  149. .get_status = s_aws_input_stream_byte_cursor_get_status,
  150. .get_length = s_aws_input_stream_byte_cursor_get_length,
  151. };
  152. struct aws_input_stream *aws_input_stream_new_from_cursor(
  153. struct aws_allocator *allocator,
  154. const struct aws_byte_cursor *cursor) {
  155. struct aws_input_stream_byte_cursor_impl *impl =
  156. aws_mem_calloc(allocator, 1, sizeof(struct aws_input_stream_byte_cursor_impl));
  157. impl->allocator = allocator;
  158. impl->original_cursor = *cursor;
  159. impl->current_cursor = *cursor;
  160. impl->base.vtable = &s_aws_input_stream_byte_cursor_vtable;
  161. aws_ref_count_init(
  162. &impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_stream_byte_cursor_destroy);
  163. return &impl->base;
  164. }
  165. /*
  166. * file-based input stream
  167. */
  168. struct aws_input_stream_file_impl {
  169. struct aws_input_stream base;
  170. struct aws_allocator *allocator;
  171. FILE *file;
  172. bool close_on_clean_up;
  173. };
  174. static int s_aws_input_stream_file_seek(
  175. struct aws_input_stream *stream,
  176. int64_t offset,
  177. enum aws_stream_seek_basis basis) {
  178. struct aws_input_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_file_impl, base);
  179. int whence = (basis == AWS_SSB_BEGIN) ? SEEK_SET : SEEK_END;
  180. if (aws_fseek(impl->file, offset, whence)) {
  181. return AWS_OP_ERR;
  182. }
  183. return AWS_OP_SUCCESS;
  184. }
  185. static int s_aws_input_stream_file_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
  186. struct aws_input_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_file_impl, base);
  187. size_t max_read = dest->capacity - dest->len;
  188. size_t actually_read = fread(dest->buffer + dest->len, 1, max_read, impl->file);
  189. if (actually_read == 0) {
  190. if (ferror(impl->file)) {
  191. return aws_raise_error(AWS_IO_STREAM_READ_FAILED);
  192. }
  193. }
  194. dest->len += actually_read;
  195. return AWS_OP_SUCCESS;
  196. }
  197. static int s_aws_input_stream_file_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
  198. struct aws_input_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_file_impl, base);
  199. status->is_end_of_stream = feof(impl->file) != 0;
  200. status->is_valid = ferror(impl->file) == 0;
  201. return AWS_OP_SUCCESS;
  202. }
  203. static int s_aws_input_stream_file_get_length(struct aws_input_stream *stream, int64_t *length) {
  204. struct aws_input_stream_file_impl *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_file_impl, base);
  205. return aws_file_get_length(impl->file, length);
  206. }
  207. static void s_aws_input_stream_file_destroy(struct aws_input_stream_file_impl *impl) {
  208. if (impl->close_on_clean_up && impl->file) {
  209. fclose(impl->file);
  210. }
  211. aws_mem_release(impl->allocator, impl);
  212. }
  213. static struct aws_input_stream_vtable s_aws_input_stream_file_vtable = {
  214. .seek = s_aws_input_stream_file_seek,
  215. .read = s_aws_input_stream_file_read,
  216. .get_status = s_aws_input_stream_file_get_status,
  217. .get_length = s_aws_input_stream_file_get_length,
  218. };
  219. struct aws_input_stream *aws_input_stream_new_from_file(struct aws_allocator *allocator, const char *file_name) {
  220. struct aws_input_stream_file_impl *impl = aws_mem_calloc(allocator, 1, sizeof(struct aws_input_stream_file_impl));
  221. impl->file = aws_fopen(file_name, "r+b");
  222. if (impl->file == NULL) {
  223. goto on_error;
  224. }
  225. impl->close_on_clean_up = true;
  226. impl->allocator = allocator;
  227. impl->base.vtable = &s_aws_input_stream_file_vtable;
  228. aws_ref_count_init(&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_stream_file_destroy);
  229. return &impl->base;
  230. on_error:
  231. aws_mem_release(allocator, impl);
  232. return NULL;
  233. }
  234. struct aws_input_stream *aws_input_stream_new_from_open_file(struct aws_allocator *allocator, FILE *file) {
  235. struct aws_input_stream_file_impl *impl = aws_mem_calloc(allocator, 1, sizeof(struct aws_input_stream_file_impl));
  236. impl->file = file;
  237. impl->close_on_clean_up = false;
  238. impl->allocator = allocator;
  239. impl->base.vtable = &s_aws_input_stream_file_vtable;
  240. aws_ref_count_init(&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_stream_file_destroy);
  241. return &impl->base;
  242. }
  243. struct aws_input_stream *aws_input_stream_acquire(struct aws_input_stream *stream) {
  244. if (stream != NULL) {
  245. if (stream->vtable->acquire) {
  246. stream->vtable->acquire(stream);
  247. } else {
  248. aws_ref_count_acquire(&stream->ref_count);
  249. }
  250. }
  251. return stream;
  252. }
  253. struct aws_input_stream *aws_input_stream_release(struct aws_input_stream *stream) {
  254. if (stream != NULL) {
  255. if (stream->vtable->release) {
  256. stream->vtable->release(stream);
  257. } else {
  258. aws_ref_count_release(&stream->ref_count);
  259. }
  260. }
  261. return NULL;
  262. }
  /**
   * Equivalent to aws_input_stream_release(): drops one reference on `stream`
   * and discards the return value.
   */
  void aws_input_stream_destroy(struct aws_input_stream *stream) {
      (void)aws_input_stream_release(stream);
  }