123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 |
- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include <aws/s3/private/s3_paginator.h>
- #include <aws/s3/s3_client.h>
- #include <aws/common/assert.h>
- #include <aws/common/atomics.h>
- #include <aws/common/byte_buf.h>
- #include <aws/common/mutex.h>
- #include <aws/common/ref_count.h>
- #include <aws/common/string.h>
- #include <aws/common/xml_parser.h>
- #include <aws/http/request_response.h>
- static const size_t s_dynamic_body_initial_buf_size = 1024;
- enum operation_state {
- OS_NOT_STARTED,
- OS_INITIATED,
- OS_COMPLETED,
- OS_ERROR,
- };
- struct aws_s3_paginated_operation {
- struct aws_allocator *allocator;
- struct aws_string *result_xml_node_name;
- struct aws_string *continuation_xml_node_name;
- aws_s3_next_http_message_fn *next_http_message;
- aws_s3_on_result_node_encountered_fn *on_result_node_encountered;
- aws_s3_on_paginated_operation_cleanup_fn *on_paginated_operation_cleanup;
- void *user_data;
- struct aws_ref_count ref_count;
- };
- struct aws_s3_paginator {
- struct aws_allocator *allocator;
- struct aws_s3_client *client;
- /** The current, in-flight paginated request to s3. */
- struct aws_atomic_var current_request;
- struct aws_string *bucket_name;
- struct aws_string *endpoint;
- struct aws_s3_paginated_operation *operation;
- struct aws_ref_count ref_count;
- struct {
- struct aws_string *continuation_token;
- enum operation_state operation_state;
- struct aws_mutex lock;
- bool has_more_results;
- } shared_mt_state;
- struct aws_byte_buf result_body;
- aws_s3_on_page_finished_fn *on_page_finished;
- void *user_data;
- };
- static void s_operation_ref_count_zero_callback(void *arg) {
- struct aws_s3_paginated_operation *operation = arg;
- if (operation->on_paginated_operation_cleanup) {
- operation->on_paginated_operation_cleanup(operation->user_data);
- }
- if (operation->result_xml_node_name) {
- aws_string_destroy(operation->result_xml_node_name);
- }
- if (operation->continuation_xml_node_name) {
- aws_string_destroy(operation->continuation_xml_node_name);
- }
- aws_mem_release(operation->allocator, operation);
- }
- static void s_paginator_ref_count_zero_callback(void *arg) {
- struct aws_s3_paginator *paginator = arg;
- aws_s3_client_release(paginator->client);
- aws_s3_paginated_operation_release(paginator->operation);
- aws_byte_buf_clean_up(&paginator->result_body);
- struct aws_s3_meta_request *previous_request = aws_atomic_exchange_ptr(&paginator->current_request, NULL);
- if (previous_request != NULL) {
- aws_s3_meta_request_release(previous_request);
- }
- if (paginator->bucket_name) {
- aws_string_destroy(paginator->bucket_name);
- }
- if (paginator->endpoint) {
- aws_string_destroy(paginator->endpoint);
- }
- if (paginator->shared_mt_state.continuation_token) {
- aws_string_destroy(paginator->shared_mt_state.continuation_token);
- }
- aws_mem_release(paginator->allocator, paginator);
- }
- struct aws_s3_paginator *aws_s3_initiate_paginator(
- struct aws_allocator *allocator,
- const struct aws_s3_paginator_params *params) {
- AWS_FATAL_PRECONDITION(params);
- AWS_FATAL_PRECONDITION(params->client);
- struct aws_s3_paginator *paginator = aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_paginator));
- paginator->allocator = allocator;
- paginator->client = aws_s3_client_acquire(params->client);
- paginator->operation = params->operation;
- paginator->on_page_finished = params->on_page_finished_fn;
- paginator->user_data = params->user_data;
- paginator->bucket_name = aws_string_new_from_cursor(allocator, ¶ms->bucket_name);
- paginator->endpoint = aws_string_new_from_cursor(allocator, ¶ms->endpoint);
- aws_s3_paginated_operation_acquire(params->operation);
- aws_byte_buf_init(&paginator->result_body, allocator, s_dynamic_body_initial_buf_size);
- aws_ref_count_init(&paginator->ref_count, paginator, s_paginator_ref_count_zero_callback);
- aws_mutex_init(&paginator->shared_mt_state.lock);
- aws_atomic_init_ptr(&paginator->current_request, NULL);
- paginator->shared_mt_state.operation_state = OS_NOT_STARTED;
- return paginator;
- }
- void aws_s3_paginator_release(struct aws_s3_paginator *paginator) {
- if (paginator) {
- aws_ref_count_release(&paginator->ref_count);
- }
- }
- void aws_s3_paginator_acquire(struct aws_s3_paginator *paginator) {
- AWS_FATAL_PRECONDITION(paginator);
- aws_ref_count_acquire(&paginator->ref_count);
- }
- struct aws_s3_paginated_operation *aws_s3_paginated_operation_new(
- struct aws_allocator *allocator,
- const struct aws_s3_paginated_operation_params *params) {
- struct aws_s3_paginated_operation *operation =
- aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_paginated_operation));
- operation->allocator = allocator;
- operation->result_xml_node_name = aws_string_new_from_cursor(allocator, params->result_xml_node_name);
- operation->continuation_xml_node_name = aws_string_new_from_cursor(allocator, params->continuation_token_node_name);
- operation->next_http_message = params->next_message;
- operation->on_result_node_encountered = params->on_result_node_encountered_fn;
- operation->on_paginated_operation_cleanup = params->on_paginated_operation_cleanup;
- operation->user_data = params->user_data;
- aws_ref_count_init(&operation->ref_count, operation, s_operation_ref_count_zero_callback);
- return operation;
- }
- void aws_s3_paginated_operation_acquire(struct aws_s3_paginated_operation *operation) {
- AWS_FATAL_PRECONDITION(operation);
- aws_ref_count_acquire(&operation->ref_count);
- }
- void aws_s3_paginated_operation_release(struct aws_s3_paginated_operation *operation) {
- if (operation) {
- aws_ref_count_release(&operation->ref_count);
- }
- }
- bool aws_s3_paginator_has_more_results(const struct aws_s3_paginator *paginator) {
- AWS_PRECONDITION(paginator);
- bool has_more_results = false;
- struct aws_s3_paginator *paginator_mut = (struct aws_s3_paginator *)paginator;
- aws_mutex_lock(&paginator_mut->shared_mt_state.lock);
- has_more_results = paginator->shared_mt_state.has_more_results;
- aws_mutex_unlock(&paginator_mut->shared_mt_state.lock);
- AWS_LOGF_INFO(AWS_LS_S3_GENERAL, "has more %d", has_more_results);
- return has_more_results;
- }
- struct aws_string *s_paginator_get_continuation_token(const struct aws_s3_paginator *paginator) {
- AWS_PRECONDITION(paginator);
- struct aws_string *continuation_token = NULL;
- struct aws_s3_paginator *paginator_mut = (struct aws_s3_paginator *)paginator;
- aws_mutex_lock(&paginator_mut->shared_mt_state.lock);
- if (paginator->shared_mt_state.continuation_token) {
- continuation_token =
- aws_string_clone_or_reuse(paginator->allocator, paginator->shared_mt_state.continuation_token);
- }
- aws_mutex_unlock(&paginator_mut->shared_mt_state.lock);
- return continuation_token;
- }
- static inline int s_set_paginator_state_if_legal(
- struct aws_s3_paginator *paginator,
- enum operation_state expected,
- enum operation_state state) {
- aws_mutex_lock(&paginator->shared_mt_state.lock);
- if (paginator->shared_mt_state.operation_state != expected) {
- aws_mutex_unlock(&paginator->shared_mt_state.lock);
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- paginator->shared_mt_state.operation_state = state;
- aws_mutex_unlock(&paginator->shared_mt_state.lock);
- return AWS_OP_SUCCESS;
- }
- /**
- * On a successful operation, this is an xml document. Just copy the buffers over until we're ready to parse (upon
- * completion) of the response body.
- */
- static int s_receive_body_callback(
- struct aws_s3_meta_request *meta_request,
- const struct aws_byte_cursor *body,
- uint64_t range_start,
- void *user_data) {
- (void)range_start;
- (void)meta_request;
- struct aws_s3_paginator *paginator = user_data;
- if (body && body->len) {
- aws_byte_buf_append_dynamic(&paginator->result_body, body);
- }
- return AWS_OP_SUCCESS;
- }
- struct parser_wrapper {
- struct aws_s3_paginated_operation *operation;
- struct aws_string *next_continuation_token;
- bool has_more_results;
- };
- static bool s_on_result_node_encountered(struct aws_xml_parser *parser, struct aws_xml_node *node, void *user_data) {
- struct parser_wrapper *wrapper = user_data;
- struct aws_byte_cursor node_name;
- aws_xml_node_get_name(node, &node_name);
- struct aws_byte_cursor continuation_name_val =
- aws_byte_cursor_from_string(wrapper->operation->continuation_xml_node_name);
- if (aws_byte_cursor_eq_ignore_case(&node_name, &continuation_name_val)) {
- struct aws_byte_cursor continuation_token_cur;
- bool ret_val = aws_xml_node_as_body(parser, node, &continuation_token_cur) == AWS_OP_SUCCESS;
- if (ret_val) {
- wrapper->next_continuation_token =
- aws_string_new_from_cursor(wrapper->operation->allocator, &continuation_token_cur);
- }
- return ret_val;
- }
- if (aws_byte_cursor_eq_c_str_ignore_case(&node_name, "IsTruncated")) {
- struct aws_byte_cursor truncated_cur;
- bool ret_val = aws_xml_node_as_body(parser, node, &truncated_cur) == AWS_OP_SUCCESS;
- if (ret_val) {
- if (aws_byte_cursor_eq_c_str_ignore_case(&truncated_cur, "true")) {
- wrapper->has_more_results = true;
- }
- }
- return ret_val;
- }
- return wrapper->operation->on_result_node_encountered(parser, node, wrapper->operation->user_data);
- }
- static bool s_on_root_node_encountered(struct aws_xml_parser *parser, struct aws_xml_node *node, void *user_data) {
- struct parser_wrapper *wrapper = user_data;
- struct aws_byte_cursor node_name;
- aws_xml_node_get_name(node, &node_name);
- struct aws_byte_cursor result_name_val = aws_byte_cursor_from_string(wrapper->operation->result_xml_node_name);
- if (aws_byte_cursor_eq_ignore_case(&node_name, &result_name_val)) {
- return aws_xml_node_traverse(parser, node, s_on_result_node_encountered, wrapper);
- }
- return false;
- }
- static void s_on_request_finished(
- struct aws_s3_meta_request *meta_request,
- const struct aws_s3_meta_request_result *meta_request_result,
- void *user_data) {
- (void)meta_request;
- struct aws_s3_paginator *paginator = user_data;
- if (meta_request_result->response_status == 200) {
- /* clears previous continuation token */
- aws_mutex_lock(&paginator->shared_mt_state.lock);
- if (paginator->shared_mt_state.continuation_token) {
- aws_string_destroy(paginator->shared_mt_state.continuation_token);
- paginator->shared_mt_state.continuation_token = NULL;
- paginator->shared_mt_state.has_more_results = false;
- }
- aws_mutex_unlock(&paginator->shared_mt_state.lock);
- struct aws_byte_cursor result_body_cursor = aws_byte_cursor_from_buf(&paginator->result_body);
- struct aws_string *continuation_token = NULL;
- bool has_more_results = false;
- aws_s3_paginated_operation_on_response(
- paginator->operation, &result_body_cursor, &continuation_token, &has_more_results);
- aws_mutex_lock(&paginator->shared_mt_state.lock);
- if (paginator->shared_mt_state.continuation_token) {
- aws_string_destroy(paginator->shared_mt_state.continuation_token);
- }
- paginator->shared_mt_state.continuation_token = continuation_token;
- paginator->shared_mt_state.has_more_results = has_more_results;
- aws_mutex_unlock(&paginator->shared_mt_state.lock);
- if (has_more_results) {
- s_set_paginator_state_if_legal(paginator, OS_INITIATED, OS_NOT_STARTED);
- } else {
- s_set_paginator_state_if_legal(paginator, OS_INITIATED, OS_COMPLETED);
- }
- } else {
- s_set_paginator_state_if_legal(paginator, OS_INITIATED, OS_ERROR);
- }
- if (paginator->on_page_finished) {
- paginator->on_page_finished(paginator, meta_request_result->error_code, paginator->user_data);
- }
- /* this ref count was done right before we kicked off the request to keep the paginator object alive. Release it now
- * that the operation has completed. */
- aws_s3_paginator_release(paginator);
- }
- int aws_s3_paginated_operation_on_response(
- struct aws_s3_paginated_operation *operation,
- struct aws_byte_cursor *response_body,
- struct aws_string **continuation_token_out,
- bool *has_more_results_out) {
- struct aws_xml_parser_options parser_options = {
- .doc = *response_body,
- .max_depth = 16U,
- };
- struct parser_wrapper wrapper = {.operation = operation};
- /* we've got a full xml document now and the request succeeded, parse the document and fire all the callbacks
- * for each object and prefix. All of that happens in these three lines. */
- struct aws_xml_parser *parser = aws_xml_parser_new(operation->allocator, &parser_options);
- int error_code = aws_xml_parser_parse(parser, s_on_root_node_encountered, &wrapper);
- aws_xml_parser_destroy(parser);
- if (error_code == AWS_OP_SUCCESS) {
- *continuation_token_out = wrapper.next_continuation_token;
- *has_more_results_out = wrapper.has_more_results;
- }
- return error_code;
- }
- int aws_s3_construct_next_paginated_request_http_message(
- struct aws_s3_paginated_operation *operation,
- struct aws_byte_cursor *continuation_token,
- struct aws_http_message **out_message) {
- return operation->next_http_message(continuation_token, operation->user_data, out_message);
- }
- int aws_s3_paginator_continue(struct aws_s3_paginator *paginator, const struct aws_signing_config_aws *signing_config) {
- AWS_PRECONDITION(paginator);
- AWS_PRECONDITION(signing_config);
- int re_code = AWS_OP_ERR;
- if (s_set_paginator_state_if_legal(paginator, OS_NOT_STARTED, OS_INITIATED)) {
- return re_code;
- }
- struct aws_http_message *paginated_request_message = NULL;
- struct aws_string *continuation_string = NULL;
- struct aws_byte_buf host_buf;
- AWS_ZERO_STRUCT(host_buf);
- struct aws_byte_cursor host_cur = aws_byte_cursor_from_string(paginator->bucket_name);
- struct aws_byte_cursor period_cur = aws_byte_cursor_from_c_str(".");
- struct aws_byte_cursor endpoint_val = aws_byte_cursor_from_string(paginator->endpoint);
- if (aws_byte_buf_init_copy_from_cursor(&host_buf, paginator->allocator, host_cur) ||
- aws_byte_buf_append_dynamic(&host_buf, &period_cur) || aws_byte_buf_append_dynamic(&host_buf, &endpoint_val)) {
- goto done;
- }
- struct aws_http_header host_header = {
- .name = aws_byte_cursor_from_c_str("host"),
- .value = aws_byte_cursor_from_buf(&host_buf),
- };
- continuation_string = s_paginator_get_continuation_token(paginator);
- struct aws_byte_cursor continuation_cursor;
- AWS_ZERO_STRUCT(continuation_cursor);
- struct aws_byte_cursor *continuation = NULL;
- if (continuation_string) {
- continuation_cursor = aws_byte_cursor_from_string(continuation_string);
- continuation = &continuation_cursor;
- }
- if (paginator->operation->next_http_message(
- continuation, paginator->operation->user_data, &paginated_request_message)) {
- goto done;
- }
- if (aws_http_message_add_header(paginated_request_message, host_header)) {
- goto done;
- }
- struct aws_s3_meta_request_options request_options = {
- .user_data = paginator,
- .signing_config = (struct aws_signing_config_aws *)signing_config,
- .type = AWS_S3_META_REQUEST_TYPE_DEFAULT,
- .body_callback = s_receive_body_callback,
- .finish_callback = s_on_request_finished,
- .message = paginated_request_message,
- };
- /* re-use the current buffer. */
- aws_byte_buf_reset(&paginator->result_body, false);
- /* we're kicking off an asynchronous request. ref-count the paginator to keep it alive until we finish. */
- aws_s3_paginator_acquire(paginator);
- struct aws_s3_meta_request *previous_request = aws_atomic_exchange_ptr(&paginator->current_request, NULL);
- if (previous_request != NULL) {
- /* release request from previous page */
- aws_s3_meta_request_release(previous_request);
- }
- struct aws_s3_meta_request *new_request = aws_s3_client_make_meta_request(paginator->client, &request_options);
- aws_atomic_store_ptr(&paginator->current_request, new_request);
- if (new_request == NULL) {
- s_set_paginator_state_if_legal(paginator, OS_INITIATED, OS_ERROR);
- goto done;
- }
- re_code = AWS_OP_SUCCESS;
- done:
- aws_http_message_release(paginated_request_message);
- aws_string_destroy(continuation_string);
- aws_byte_buf_clean_up(&host_buf);
- return re_code;
- }
|