s3_paginator.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/s3/private/s3_paginator.h>
  6. #include <aws/s3/s3_client.h>
  7. #include <aws/common/assert.h>
  8. #include <aws/common/atomics.h>
  9. #include <aws/common/byte_buf.h>
  10. #include <aws/common/mutex.h>
  11. #include <aws/common/ref_count.h>
  12. #include <aws/common/string.h>
  13. #include <aws/common/xml_parser.h>
  14. #include <aws/http/request_response.h>
  /* Initial capacity requested for the paginator's response-body buffer when it is created. The buffer is
   * appended to dynamically as body chunks arrive, so this is only a starting size. */
  static const size_t s_dynamic_body_initial_buf_size = 1024;
  /**
   * Life-cycle of a paginator, stored in shared_mt_state.operation_state and changed only through
   * s_set_paginator_state_if_legal().
   */
  enum operation_state {
      /* No request is in flight and a page may be requested. This is the initial state, and the state the
       * paginator returns to after a page that reported more results. */
      OS_NOT_STARTED,
      /* aws_s3_paginator_continue() has claimed the paginator for a page request. */
      OS_INITIATED,
      /* The last page finished with status 200 and reported no further results. */
      OS_COMPLETED,
      /* The meta request could not be created, or it finished with a status other than 200. */
      OS_ERROR,
  };
  /**
   * Ref-counted description of one kind of paginated S3 call: which XML nodes matter in the response and which
   * callbacks build requests and consume results.
   */
  struct aws_s3_paginated_operation {
      /* Allocator used for this struct and for the strings it owns. */
      struct aws_allocator *allocator;

      /* Name the document's root node must match (case-insensitively) for the response to be traversed. */
      struct aws_string *result_xml_node_name;
      /* Name of the child node whose body is captured as the continuation token for the next page. */
      struct aws_string *continuation_xml_node_name;

      /* Builds the HTTP message for the next page; receives the continuation token (NULL when there is none). */
      aws_s3_next_http_message_fn *next_http_message;
      /* Invoked for every child of the result node that is neither the continuation node nor "IsTruncated". */
      aws_s3_on_result_node_encountered_fn *on_result_node_encountered;

      /* Optional; invoked with user_data right before the operation is freed. */
      aws_s3_on_paginated_operation_cleanup_fn *on_paginated_operation_cleanup;

      /* Opaque pointer handed to the callbacks above. */
      void *user_data;

      struct aws_ref_count ref_count;
  };
  /**
   * Ref-counted driver that issues one meta request per page for a paginated operation and tracks the
   * continuation state between pages.
   */
  struct aws_s3_paginator {
      /* Allocator used for this struct, its strings and its body buffer. */
      struct aws_allocator *allocator;
      /* Client the page requests are made on; a reference is held for the paginator's lifetime. */
      struct aws_s3_client *client;
      /** The current, in-flight paginated request to s3. */
      struct aws_atomic_var current_request;
      /* Bucket name and endpoint; joined as "<bucket_name>.<endpoint>" to form the host header. */
      struct aws_string *bucket_name;
      struct aws_string *endpoint;
      /* The operation being paginated; a reference is held for the paginator's lifetime. */
      struct aws_s3_paginated_operation *operation;
      struct aws_ref_count ref_count;
      /* State read and written while holding `lock`. */
      struct {
          /* Token parsed from the most recent page, or NULL when there is none. */
          struct aws_string *continuation_token;
          enum operation_state operation_state;
          struct aws_mutex lock;
          /* Whether the most recent page reported that more results are available. */
          bool has_more_results;
      } shared_mt_state;

      /* Accumulates the response body of the page currently being received; reset before each request. */
      struct aws_byte_buf result_body;
      /* Optional; invoked once per page after the request finishes. */
      aws_s3_on_page_finished_fn *on_page_finished;
      /* Opaque pointer handed to on_page_finished. */
      void *user_data;
  };
  51. static void s_operation_ref_count_zero_callback(void *arg) {
  52. struct aws_s3_paginated_operation *operation = arg;
  53. if (operation->on_paginated_operation_cleanup) {
  54. operation->on_paginated_operation_cleanup(operation->user_data);
  55. }
  56. if (operation->result_xml_node_name) {
  57. aws_string_destroy(operation->result_xml_node_name);
  58. }
  59. if (operation->continuation_xml_node_name) {
  60. aws_string_destroy(operation->continuation_xml_node_name);
  61. }
  62. aws_mem_release(operation->allocator, operation);
  63. }
  64. static void s_paginator_ref_count_zero_callback(void *arg) {
  65. struct aws_s3_paginator *paginator = arg;
  66. aws_s3_client_release(paginator->client);
  67. aws_s3_paginated_operation_release(paginator->operation);
  68. aws_byte_buf_clean_up(&paginator->result_body);
  69. struct aws_s3_meta_request *previous_request = aws_atomic_exchange_ptr(&paginator->current_request, NULL);
  70. if (previous_request != NULL) {
  71. aws_s3_meta_request_release(previous_request);
  72. }
  73. if (paginator->bucket_name) {
  74. aws_string_destroy(paginator->bucket_name);
  75. }
  76. if (paginator->endpoint) {
  77. aws_string_destroy(paginator->endpoint);
  78. }
  79. if (paginator->shared_mt_state.continuation_token) {
  80. aws_string_destroy(paginator->shared_mt_state.continuation_token);
  81. }
  82. aws_mem_release(paginator->allocator, paginator);
  83. }
  84. struct aws_s3_paginator *aws_s3_initiate_paginator(
  85. struct aws_allocator *allocator,
  86. const struct aws_s3_paginator_params *params) {
  87. AWS_FATAL_PRECONDITION(params);
  88. AWS_FATAL_PRECONDITION(params->client);
  89. struct aws_s3_paginator *paginator = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_paginator));
  90. paginator->allocator = allocator;
  91. paginator->client = aws_s3_client_acquire(params->client);
  92. paginator->operation = params->operation;
  93. paginator->on_page_finished = params->on_page_finished_fn;
  94. paginator->user_data = params->user_data;
  95. paginator->bucket_name = aws_string_new_from_cursor(allocator, &params->bucket_name);
  96. paginator->endpoint = aws_string_new_from_cursor(allocator, &params->endpoint);
  97. aws_s3_paginated_operation_acquire(params->operation);
  98. aws_byte_buf_init(&paginator->result_body, allocator, s_dynamic_body_initial_buf_size);
  99. aws_ref_count_init(&paginator->ref_count, paginator, s_paginator_ref_count_zero_callback);
  100. aws_mutex_init(&paginator->shared_mt_state.lock);
  101. aws_atomic_init_ptr(&paginator->current_request, NULL);
  102. paginator->shared_mt_state.operation_state = OS_NOT_STARTED;
  103. return paginator;
  104. }
  105. void aws_s3_paginator_release(struct aws_s3_paginator *paginator) {
  106. if (paginator) {
  107. aws_ref_count_release(&paginator->ref_count);
  108. }
  109. }
  110. void aws_s3_paginator_acquire(struct aws_s3_paginator *paginator) {
  111. AWS_FATAL_PRECONDITION(paginator);
  112. aws_ref_count_acquire(&paginator->ref_count);
  113. }
  114. struct aws_s3_paginated_operation *aws_s3_paginated_operation_new(
  115. struct aws_allocator *allocator,
  116. const struct aws_s3_paginated_operation_params *params) {
  117. struct aws_s3_paginated_operation *operation =
  118. aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_paginated_operation));
  119. operation->allocator = allocator;
  120. operation->result_xml_node_name = aws_string_new_from_cursor(allocator, params->result_xml_node_name);
  121. operation->continuation_xml_node_name = aws_string_new_from_cursor(allocator, params->continuation_token_node_name);
  122. operation->next_http_message = params->next_message;
  123. operation->on_result_node_encountered = params->on_result_node_encountered_fn;
  124. operation->on_paginated_operation_cleanup = params->on_paginated_operation_cleanup;
  125. operation->user_data = params->user_data;
  126. aws_ref_count_init(&operation->ref_count, operation, s_operation_ref_count_zero_callback);
  127. return operation;
  128. }
  129. void aws_s3_paginated_operation_acquire(struct aws_s3_paginated_operation *operation) {
  130. AWS_FATAL_PRECONDITION(operation);
  131. aws_ref_count_acquire(&operation->ref_count);
  132. }
  133. void aws_s3_paginated_operation_release(struct aws_s3_paginated_operation *operation) {
  134. if (operation) {
  135. aws_ref_count_release(&operation->ref_count);
  136. }
  137. }
  138. bool aws_s3_paginator_has_more_results(const struct aws_s3_paginator *paginator) {
  139. AWS_PRECONDITION(paginator);
  140. bool has_more_results = false;
  141. struct aws_s3_paginator *paginator_mut = (struct aws_s3_paginator *)paginator;
  142. aws_mutex_lock(&paginator_mut->shared_mt_state.lock);
  143. has_more_results = paginator->shared_mt_state.has_more_results;
  144. aws_mutex_unlock(&paginator_mut->shared_mt_state.lock);
  145. AWS_LOGF_INFO(AWS_LS_S3_GENERAL, "has more %d", has_more_results);
  146. return has_more_results;
  147. }
  148. struct aws_string *s_paginator_get_continuation_token(const struct aws_s3_paginator *paginator) {
  149. AWS_PRECONDITION(paginator);
  150. struct aws_string *continuation_token = NULL;
  151. struct aws_s3_paginator *paginator_mut = (struct aws_s3_paginator *)paginator;
  152. aws_mutex_lock(&paginator_mut->shared_mt_state.lock);
  153. if (paginator->shared_mt_state.continuation_token) {
  154. continuation_token =
  155. aws_string_clone_or_reuse(paginator->allocator, paginator->shared_mt_state.continuation_token);
  156. }
  157. aws_mutex_unlock(&paginator_mut->shared_mt_state.lock);
  158. return continuation_token;
  159. }
  160. static inline int s_set_paginator_state_if_legal(
  161. struct aws_s3_paginator *paginator,
  162. enum operation_state expected,
  163. enum operation_state state) {
  164. aws_mutex_lock(&paginator->shared_mt_state.lock);
  165. if (paginator->shared_mt_state.operation_state != expected) {
  166. aws_mutex_unlock(&paginator->shared_mt_state.lock);
  167. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  168. }
  169. paginator->shared_mt_state.operation_state = state;
  170. aws_mutex_unlock(&paginator->shared_mt_state.lock);
  171. return AWS_OP_SUCCESS;
  172. }
  173. /**
  174. * On a successful operation, this is an xml document. Just copy the buffers over until we're ready to parse (upon
  175. * completion) of the response body.
  176. */
  177. static int s_receive_body_callback(
  178. struct aws_s3_meta_request *meta_request,
  179. const struct aws_byte_cursor *body,
  180. uint64_t range_start,
  181. void *user_data) {
  182. (void)range_start;
  183. (void)meta_request;
  184. struct aws_s3_paginator *paginator = user_data;
  185. if (body && body->len) {
  186. aws_byte_buf_append_dynamic(&paginator->result_body, body);
  187. }
  188. return AWS_OP_SUCCESS;
  189. }
  /**
   * Context handed to the XML node callbacks while one response document is parsed.
   */
  struct parser_wrapper {
      /* Operation whose node names and result callback drive the parse. */
      struct aws_s3_paginated_operation *operation;
      /* Set when the continuation node is found; allocated with the operation's allocator. */
      struct aws_string *next_continuation_token;
      /* Set to true when an "IsTruncated" node with the body "true" is found. */
      bool has_more_results;
  };
  195. static bool s_on_result_node_encountered(struct aws_xml_parser *parser, struct aws_xml_node *node, void *user_data) {
  196. struct parser_wrapper *wrapper = user_data;
  197. struct aws_byte_cursor node_name;
  198. aws_xml_node_get_name(node, &node_name);
  199. struct aws_byte_cursor continuation_name_val =
  200. aws_byte_cursor_from_string(wrapper->operation->continuation_xml_node_name);
  201. if (aws_byte_cursor_eq_ignore_case(&node_name, &continuation_name_val)) {
  202. struct aws_byte_cursor continuation_token_cur;
  203. bool ret_val = aws_xml_node_as_body(parser, node, &continuation_token_cur) == AWS_OP_SUCCESS;
  204. if (ret_val) {
  205. wrapper->next_continuation_token =
  206. aws_string_new_from_cursor(wrapper->operation->allocator, &continuation_token_cur);
  207. }
  208. return ret_val;
  209. }
  210. if (aws_byte_cursor_eq_c_str_ignore_case(&node_name, "IsTruncated")) {
  211. struct aws_byte_cursor truncated_cur;
  212. bool ret_val = aws_xml_node_as_body(parser, node, &truncated_cur) == AWS_OP_SUCCESS;
  213. if (ret_val) {
  214. if (aws_byte_cursor_eq_c_str_ignore_case(&truncated_cur, "true")) {
  215. wrapper->has_more_results = true;
  216. }
  217. }
  218. return ret_val;
  219. }
  220. return wrapper->operation->on_result_node_encountered(parser, node, wrapper->operation->user_data);
  221. }
  222. static bool s_on_root_node_encountered(struct aws_xml_parser *parser, struct aws_xml_node *node, void *user_data) {
  223. struct parser_wrapper *wrapper = user_data;
  224. struct aws_byte_cursor node_name;
  225. aws_xml_node_get_name(node, &node_name);
  226. struct aws_byte_cursor result_name_val = aws_byte_cursor_from_string(wrapper->operation->result_xml_node_name);
  227. if (aws_byte_cursor_eq_ignore_case(&node_name, &result_name_val)) {
  228. return aws_xml_node_traverse(parser, node, s_on_result_node_encountered, wrapper);
  229. }
  230. return false;
  231. }
  232. static void s_on_request_finished(
  233. struct aws_s3_meta_request *meta_request,
  234. const struct aws_s3_meta_request_result *meta_request_result,
  235. void *user_data) {
  236. (void)meta_request;
  237. struct aws_s3_paginator *paginator = user_data;
  238. if (meta_request_result->response_status == 200) {
  239. /* clears previous continuation token */
  240. aws_mutex_lock(&paginator->shared_mt_state.lock);
  241. if (paginator->shared_mt_state.continuation_token) {
  242. aws_string_destroy(paginator->shared_mt_state.continuation_token);
  243. paginator->shared_mt_state.continuation_token = NULL;
  244. paginator->shared_mt_state.has_more_results = false;
  245. }
  246. aws_mutex_unlock(&paginator->shared_mt_state.lock);
  247. struct aws_byte_cursor result_body_cursor = aws_byte_cursor_from_buf(&paginator->result_body);
  248. struct aws_string *continuation_token = NULL;
  249. bool has_more_results = false;
  250. aws_s3_paginated_operation_on_response(
  251. paginator->operation, &result_body_cursor, &continuation_token, &has_more_results);
  252. aws_mutex_lock(&paginator->shared_mt_state.lock);
  253. if (paginator->shared_mt_state.continuation_token) {
  254. aws_string_destroy(paginator->shared_mt_state.continuation_token);
  255. }
  256. paginator->shared_mt_state.continuation_token = continuation_token;
  257. paginator->shared_mt_state.has_more_results = has_more_results;
  258. aws_mutex_unlock(&paginator->shared_mt_state.lock);
  259. if (has_more_results) {
  260. s_set_paginator_state_if_legal(paginator, OS_INITIATED, OS_NOT_STARTED);
  261. } else {
  262. s_set_paginator_state_if_legal(paginator, OS_INITIATED, OS_COMPLETED);
  263. }
  264. } else {
  265. s_set_paginator_state_if_legal(paginator, OS_INITIATED, OS_ERROR);
  266. }
  267. if (paginator->on_page_finished) {
  268. paginator->on_page_finished(paginator, meta_request_result->error_code, paginator->user_data);
  269. }
  270. /* this ref count was done right before we kicked off the request to keep the paginator object alive. Release it now
  271. * that the operation has completed. */
  272. aws_s3_paginator_release(paginator);
  273. }
  274. int aws_s3_paginated_operation_on_response(
  275. struct aws_s3_paginated_operation *operation,
  276. struct aws_byte_cursor *response_body,
  277. struct aws_string **continuation_token_out,
  278. bool *has_more_results_out) {
  279. struct aws_xml_parser_options parser_options = {
  280. .doc = *response_body,
  281. .max_depth = 16U,
  282. };
  283. struct parser_wrapper wrapper = {.operation = operation};
  284. /* we've got a full xml document now and the request succeeded, parse the document and fire all the callbacks
  285. * for each object and prefix. All of that happens in these three lines. */
  286. struct aws_xml_parser *parser = aws_xml_parser_new(operation->allocator, &parser_options);
  287. int error_code = aws_xml_parser_parse(parser, s_on_root_node_encountered, &wrapper);
  288. aws_xml_parser_destroy(parser);
  289. if (error_code == AWS_OP_SUCCESS) {
  290. *continuation_token_out = wrapper.next_continuation_token;
  291. *has_more_results_out = wrapper.has_more_results;
  292. }
  293. return error_code;
  294. }
  295. int aws_s3_construct_next_paginated_request_http_message(
  296. struct aws_s3_paginated_operation *operation,
  297. struct aws_byte_cursor *continuation_token,
  298. struct aws_http_message **out_message) {
  299. return operation->next_http_message(continuation_token, operation->user_data, out_message);
  300. }
  301. int aws_s3_paginator_continue(struct aws_s3_paginator *paginator, const struct aws_signing_config_aws *signing_config) {
  302. AWS_PRECONDITION(paginator);
  303. AWS_PRECONDITION(signing_config);
  304. int re_code = AWS_OP_ERR;
  305. if (s_set_paginator_state_if_legal(paginator, OS_NOT_STARTED, OS_INITIATED)) {
  306. return re_code;
  307. }
  308. struct aws_http_message *paginated_request_message = NULL;
  309. struct aws_string *continuation_string = NULL;
  310. struct aws_byte_buf host_buf;
  311. AWS_ZERO_STRUCT(host_buf);
  312. struct aws_byte_cursor host_cur = aws_byte_cursor_from_string(paginator->bucket_name);
  313. struct aws_byte_cursor period_cur = aws_byte_cursor_from_c_str(".");
  314. struct aws_byte_cursor endpoint_val = aws_byte_cursor_from_string(paginator->endpoint);
  315. if (aws_byte_buf_init_copy_from_cursor(&host_buf, paginator->allocator, host_cur) ||
  316. aws_byte_buf_append_dynamic(&host_buf, &period_cur) || aws_byte_buf_append_dynamic(&host_buf, &endpoint_val)) {
  317. goto done;
  318. }
  319. struct aws_http_header host_header = {
  320. .name = aws_byte_cursor_from_c_str("host"),
  321. .value = aws_byte_cursor_from_buf(&host_buf),
  322. };
  323. continuation_string = s_paginator_get_continuation_token(paginator);
  324. struct aws_byte_cursor continuation_cursor;
  325. AWS_ZERO_STRUCT(continuation_cursor);
  326. struct aws_byte_cursor *continuation = NULL;
  327. if (continuation_string) {
  328. continuation_cursor = aws_byte_cursor_from_string(continuation_string);
  329. continuation = &continuation_cursor;
  330. }
  331. if (paginator->operation->next_http_message(
  332. continuation, paginator->operation->user_data, &paginated_request_message)) {
  333. goto done;
  334. }
  335. if (aws_http_message_add_header(paginated_request_message, host_header)) {
  336. goto done;
  337. }
  338. struct aws_s3_meta_request_options request_options = {
  339. .user_data = paginator,
  340. .signing_config = (struct aws_signing_config_aws *)signing_config,
  341. .type = AWS_S3_META_REQUEST_TYPE_DEFAULT,
  342. .body_callback = s_receive_body_callback,
  343. .finish_callback = s_on_request_finished,
  344. .message = paginated_request_message,
  345. };
  346. /* re-use the current buffer. */
  347. aws_byte_buf_reset(&paginator->result_body, false);
  348. /* we're kicking off an asynchronous request. ref-count the paginator to keep it alive until we finish. */
  349. aws_s3_paginator_acquire(paginator);
  350. struct aws_s3_meta_request *previous_request = aws_atomic_exchange_ptr(&paginator->current_request, NULL);
  351. if (previous_request != NULL) {
  352. /* release request from previous page */
  353. aws_s3_meta_request_release(previous_request);
  354. }
  355. struct aws_s3_meta_request *new_request = aws_s3_client_make_meta_request(paginator->client, &request_options);
  356. aws_atomic_store_ptr(&paginator->current_request, new_request);
  357. if (new_request == NULL) {
  358. s_set_paginator_state_if_legal(paginator, OS_INITIATED, OS_ERROR);
  359. goto done;
  360. }
  361. re_code = AWS_OP_SUCCESS;
  362. done:
  363. aws_http_message_release(paginated_request_message);
  364. aws_string_destroy(continuation_string);
  365. aws_byte_buf_clean_up(&host_buf);
  366. return re_code;
  367. }