# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Unit tests for the futures, coordinator and executors in s3transfer."""
import sys
import time
import traceback

import mock
from concurrent.futures import ThreadPoolExecutor

from __tests__ import unittest
from __tests__ import RecordingExecutor
from __tests__ import TransferCoordinatorWithInterrupt
from s3transfer.exceptions import CancelledError
from s3transfer.exceptions import FatalError
from s3transfer.exceptions import TransferNotDoneError
from s3transfer.futures import TransferFuture
from s3transfer.futures import TransferMeta
from s3transfer.futures import TransferCoordinator
from s3transfer.futures import BoundedExecutor
from s3transfer.futures import ExecutorFuture
from s3transfer.futures import BaseExecutor
from s3transfer.futures import NonThreadedExecutor
from s3transfer.futures import NonThreadedExecutorFuture
from s3transfer.tasks import Task
from s3transfer.utils import FunctionContainer
from s3transfer.utils import TaskSemaphore
from s3transfer.utils import NoResourcesAvailable


def return_call_args(*args, **kwargs):
    """Return the positional and keyword arguments exactly as received."""
    return args, kwargs


def raise_exception(exception):
    """Raise ``exception``; used as a callable that always fails."""
    raise exception


def get_exc_info(exception):
    """Raise ``exception`` and return the resulting ``sys.exc_info()``.

    The returned traceback lets tests compare frames against tracebacks
    produced elsewhere.
    """
    try:
        raise_exception(exception)
    except Exception:
        return sys.exc_info()


def assert_raises_regex(test_case, expected_exception, expected_regex):
    """Return an assert-raises-regex context manager for ``test_case``.

    ``assertRaisesRegexp`` was removed in Python 3.12, while Python 2.7's
    ``unittest`` only provides that spelling, so use whichever one the
    running ``unittest`` implementation has.
    """
    assert_method = getattr(test_case, 'assertRaisesRegex', None)
    if assert_method is None:
        assert_method = test_case.assertRaisesRegexp
    return assert_method(expected_exception, expected_regex)


class RecordingTransferCoordinator(TransferCoordinator):
    """TransferCoordinator that remembers every future ever associated.

    Unlike ``associated_futures``, the recorded set is never shrunk, so a
    test can check that a future was associated at some point even after
    it has been disassociated.
    """

    def __init__(self):
        self.all_transfer_futures_ever_associated = set()
        super(RecordingTransferCoordinator, self).__init__()

    def add_associated_future(self, future):
        self.all_transfer_futures_ever_associated.add(future)
        super(RecordingTransferCoordinator, self).add_associated_future(
            future)


class ReturnFooTask(Task):
    """Task whose main body simply returns the string 'foo'."""

    def _main(self, **kwargs):
        return 'foo'


class SleepTask(Task):
    """Task whose main body sleeps for ``sleep_time`` seconds."""

    def _main(self, sleep_time, **kwargs):
        time.sleep(sleep_time)


class TestTransferFuture(unittest.TestCase):
    """Tests for TransferFuture backed by a TransferCoordinator."""

    def setUp(self):
        self.meta = TransferMeta()
        self.coordinator = TransferCoordinator()
        self.future = self._get_transfer_future()

    def _get_transfer_future(self, **kwargs):
        # Default to the fixture's meta/coordinator; any keyword argument
        # overrides the component of the same name.
        components = {
            'meta': self.meta,
            'coordinator': self.coordinator,
        }
        components.update(kwargs)
        return TransferFuture(**components)

    def test_meta(self):
        self.assertIs(self.future.meta, self.meta)

    def test_done(self):
        self.assertFalse(self.future.done())
        self.coordinator.set_result(None)
        self.assertTrue(self.future.done())

    def test_result(self):
        result = 'foo'
        self.coordinator.set_result(result)
        self.coordinator.announce_done()
        self.assertEqual(self.future.result(), result)

    def test_keyboard_interrupt_on_result_does_not_block(self):
        # This should raise a KeyboardInterrupt when result is called on it.
        self.coordinator = TransferCoordinatorWithInterrupt()
        self.future = self._get_transfer_future()

        # result() should not block and immediately raise the keyboard
        # interrupt exception.
        with self.assertRaises(KeyboardInterrupt):
            self.future.result()

    def test_cancel(self):
        self.future.cancel()
        self.assertTrue(self.future.done())
        self.assertEqual(self.coordinator.status, 'cancelled')

    def test_set_exception(self):
        # Set the result such that there is no exception
        self.coordinator.set_result('result')
        self.coordinator.announce_done()
        self.assertEqual(self.future.result(), 'result')

        self.future.set_exception(ValueError())
        with self.assertRaises(ValueError):
            self.future.result()

    def test_set_exception_only_after_done(self):
        # Setting an exception before the transfer is done is rejected.
        with self.assertRaises(TransferNotDoneError):
            self.future.set_exception(ValueError())

        self.coordinator.set_result('result')
        self.coordinator.announce_done()
        self.future.set_exception(ValueError())
        with self.assertRaises(ValueError):
            self.future.result()


class TestTransferMeta(unittest.TestCase):
    """Tests for the TransferMeta attribute container."""

    def setUp(self):
        self.transfer_meta = TransferMeta()

    def test_size(self):
        self.assertIsNone(self.transfer_meta.size)
        self.transfer_meta.provide_transfer_size(5)
        self.assertEqual(self.transfer_meta.size, 5)

    def test_call_args(self):
        call_args = object()
        transfer_meta = TransferMeta(call_args)
        # Assert that the call args provided are the same as those returned
        self.assertIs(transfer_meta.call_args, call_args)

    def test_transfer_id(self):
        transfer_meta = TransferMeta(transfer_id=1)
        self.assertEqual(transfer_meta.transfer_id, 1)

    def test_user_context(self):
        self.transfer_meta.user_context['foo'] = 'bar'
        self.assertEqual(self.transfer_meta.user_context, {'foo': 'bar'})


class TestTransferCoordinator(unittest.TestCase):
    """Tests for TransferCoordinator status, results and callbacks."""

    def setUp(self):
        self.transfer_coordinator = TransferCoordinator()

    def test_transfer_id(self):
        transfer_coordinator = TransferCoordinator(transfer_id=1)
        self.assertEqual(transfer_coordinator.transfer_id, 1)

    def test_repr(self):
        transfer_coordinator = TransferCoordinator(transfer_id=1)
        self.assertEqual(
            repr(transfer_coordinator), 'TransferCoordinator(transfer_id=1)')

    def test_initial_status(self):
        # A TransferCoordinator with no progress should have the status
        # of not-started
        self.assertEqual(self.transfer_coordinator.status, 'not-started')

    def test_set_status_to_queued(self):
        self.transfer_coordinator.set_status_to_queued()
        self.assertEqual(self.transfer_coordinator.status, 'queued')

    def test_cannot_set_status_to_queued_from_done_state(self):
        self.transfer_coordinator.set_exception(RuntimeError)
        with self.assertRaises(RuntimeError):
            self.transfer_coordinator.set_status_to_queued()

    def test_status_running(self):
        self.transfer_coordinator.set_status_to_running()
        self.assertEqual(self.transfer_coordinator.status, 'running')

    def test_cannot_set_status_to_running_from_done_state(self):
        self.transfer_coordinator.set_exception(RuntimeError)
        with self.assertRaises(RuntimeError):
            self.transfer_coordinator.set_status_to_running()

    def test_set_result(self):
        success_result = 'foo'
        self.transfer_coordinator.set_result(success_result)
        self.transfer_coordinator.announce_done()
        # Setting result should result in a success state and the return value
        # that was set.
        self.assertEqual(self.transfer_coordinator.status, 'success')
        self.assertEqual(self.transfer_coordinator.result(), success_result)

    def test_set_exception(self):
        exception_result = RuntimeError
        self.transfer_coordinator.set_exception(exception_result)
        self.transfer_coordinator.announce_done()
        # Setting an exception should result in a failed state and the return
        # value should be the raised exception
        self.assertEqual(self.transfer_coordinator.status, 'failed')
        self.assertEqual(self.transfer_coordinator.exception, exception_result)
        with self.assertRaises(exception_result):
            self.transfer_coordinator.result()

    def test_exception_cannot_override_done_state(self):
        self.transfer_coordinator.set_result('foo')
        self.transfer_coordinator.set_exception(RuntimeError)
        # Its status should be success even after the exception is set
        # because success is a done state.
        self.assertEqual(self.transfer_coordinator.status, 'success')

    def test_exception_can_override_done_state_with_override_flag(self):
        self.transfer_coordinator.set_result('foo')
        self.transfer_coordinator.set_exception(RuntimeError, override=True)
        self.assertEqual(self.transfer_coordinator.status, 'failed')

    def test_cancel(self):
        self.assertEqual(self.transfer_coordinator.status, 'not-started')
        self.transfer_coordinator.cancel()
        # This should set the state to cancelled and raise the CancelledError
        # exception and should have also set the done event so that result()
        # no longer blocks.
        self.assertEqual(self.transfer_coordinator.status, 'cancelled')
        with self.assertRaises(CancelledError):
            self.transfer_coordinator.result()

    def test_cancel_can_run_done_callbacks_that_uses_result(self):
        exceptions = []

        def capture_exception(transfer_coordinator, captured_exceptions):
            try:
                transfer_coordinator.result()
            except Exception as e:
                captured_exceptions.append(e)

        self.assertEqual(self.transfer_coordinator.status, 'not-started')
        self.transfer_coordinator.add_done_callback(
            capture_exception, self.transfer_coordinator, exceptions)
        self.transfer_coordinator.cancel()

        self.assertEqual(len(exceptions), 1)
        self.assertIsInstance(exceptions[0], CancelledError)

    def test_cancel_with_message(self):
        message = 'my message'
        self.transfer_coordinator.cancel(message)
        self.transfer_coordinator.announce_done()
        with assert_raises_regex(self, CancelledError, message):
            self.transfer_coordinator.result()

    def test_cancel_with_provided_exception(self):
        message = 'my message'
        self.transfer_coordinator.cancel(message, exc_type=FatalError)
        self.transfer_coordinator.announce_done()
        with assert_raises_regex(self, FatalError, message):
            self.transfer_coordinator.result()

    def test_cancel_cannot_override_done_state(self):
        self.transfer_coordinator.set_result('foo')
        self.transfer_coordinator.cancel()
        # Its status should be success even after cancel is called because
        # success is a done state.
        self.assertEqual(self.transfer_coordinator.status, 'success')

    def test_set_result_can_override_cancel(self):
        self.transfer_coordinator.cancel()
        # Result setting should override any cancel or set exception as this
        # is always invoked by the final task.
        self.transfer_coordinator.set_result('foo')
        self.transfer_coordinator.announce_done()
        self.assertEqual(self.transfer_coordinator.status, 'success')

    def test_submit(self):
        # Submit a callable to the transfer coordinator. It should submit it
        # to the executor.
        executor = RecordingExecutor(
            BoundedExecutor(1, 1, {'my-tag': TaskSemaphore(1)}))
        task = ReturnFooTask(self.transfer_coordinator)
        future = self.transfer_coordinator.submit(executor, task, tag='my-tag')
        executor.shutdown()
        # Make sure the task was submitted with the provided tag and that
        # the returned future actually ran it by checking its result.
        self.assertEqual(
            executor.submissions,
            [{'block': True, 'tag': 'my-tag', 'task': task}]
        )
        self.assertEqual(future.result(), 'foo')

    def test_association_and_disassociation_on_submit(self):
        self.transfer_coordinator = RecordingTransferCoordinator()

        # Submit a callable to the transfer coordinator.
        executor = BoundedExecutor(1, 1)
        task = ReturnFooTask(self.transfer_coordinator)
        future = self.transfer_coordinator.submit(executor, task)
        executor.shutdown()

        # Make sure the future that got submitted was associated to the
        # transfer future at some point.
        self.assertEqual(
            self.transfer_coordinator.all_transfer_futures_ever_associated,
            set([future])
        )

        # Make sure the future got disassociated once the future is now done
        # by looking at the currently associated futures.
        self.assertEqual(
            self.transfer_coordinator.associated_futures, set())

    def test_done(self):
        # These should result in not done state:
        # not-started
        self.assertFalse(self.transfer_coordinator.done())
        # running
        self.transfer_coordinator.set_status_to_running()
        self.assertFalse(self.transfer_coordinator.done())

        # These should result in done state:
        # failed
        self.transfer_coordinator.set_exception(Exception)
        self.assertTrue(self.transfer_coordinator.done())

        # success
        self.transfer_coordinator.set_result('foo')
        self.assertTrue(self.transfer_coordinator.done())

        # cancelled
        self.transfer_coordinator.cancel()
        self.assertTrue(self.transfer_coordinator.done())

    def test_result_waits_until_done(self):
        execution_order = []

        def sleep_then_set_result(transfer_coordinator, execution_order):
            time.sleep(0.05)
            # Record the step before setting the result so the ordering
            # check below is deterministic.
            execution_order.append('setting_result')
            transfer_coordinator.set_result(None)
            transfer_coordinator.announce_done()

        with ThreadPoolExecutor(max_workers=1) as executor:
            executor.submit(
                sleep_then_set_result, self.transfer_coordinator,
                execution_order)
            self.transfer_coordinator.result()
            execution_order.append('after_result')

        # The result() call should have waited until the other thread set
        # the result after sleeping for 0.05 seconds.
        self.assertEqual(execution_order, ['setting_result', 'after_result'])

    def test_failure_cleanups(self):
        args = (1, 2)
        kwargs = {'foo': 'bar'}

        second_args = (2, 4)
        second_kwargs = {'biz': 'baz'}

        self.transfer_coordinator.add_failure_cleanup(
            return_call_args, *args, **kwargs)
        self.transfer_coordinator.add_failure_cleanup(
            return_call_args, *second_args, **second_kwargs)

        # Ensure the callbacks got added.
        self.assertEqual(len(self.transfer_coordinator.failure_cleanups), 2)

        # Ensure they will get called in the correct order.
        result_list = [
            cleanup() for cleanup in self.transfer_coordinator.failure_cleanups
        ]
        self.assertEqual(
            result_list, [(args, kwargs), (second_args, second_kwargs)])

    def test_associated_futures(self):
        first_future = object()
        # Associate one future to the transfer
        self.transfer_coordinator.add_associated_future(first_future)
        associated_futures = self.transfer_coordinator.associated_futures
        # The first future should be in the returned set of futures.
        self.assertEqual(associated_futures, set([first_future]))

        second_future = object()
        # Associate another future to the transfer.
        self.transfer_coordinator.add_associated_future(second_future)
        # The association should not have mutated the returned set from
        # before.
        self.assertEqual(associated_futures, set([first_future]))

        # Both futures should be in the returned set.
        self.assertEqual(
            self.transfer_coordinator.associated_futures,
            set([first_future, second_future]))

    def test_done_callbacks_on_done(self):
        done_callback_invocations = []
        callback = FunctionContainer(
            done_callback_invocations.append, 'done callback called')

        # Add the done callback to the transfer.
        self.transfer_coordinator.add_done_callback(callback)

        # Announce that the transfer is done. This should invoke the done
        # callback.
        self.transfer_coordinator.announce_done()
        self.assertEqual(done_callback_invocations, ['done callback called'])

        # If done is announced again, we should not invoke the callback again
        # because done has already been announced and thus the callback has
        # been run as well.
        self.transfer_coordinator.announce_done()
        self.assertEqual(done_callback_invocations, ['done callback called'])

    def test_failure_cleanups_on_done(self):
        cleanup_invocations = []
        callback = FunctionContainer(
            cleanup_invocations.append, 'cleanup called')

        # Add the failure cleanup to the transfer.
        self.transfer_coordinator.add_failure_cleanup(callback)

        # Announce that the transfer is done. This should invoke the failure
        # cleanup.
        self.transfer_coordinator.announce_done()
        self.assertEqual(cleanup_invocations, ['cleanup called'])

        # If done is announced again, we should not invoke the cleanup again
        # because done has already been announced and thus the cleanup has
        # been run as well.
        self.transfer_coordinator.announce_done()
        self.assertEqual(cleanup_invocations, ['cleanup called'])


class TestBoundedExecutor(unittest.TestCase):
    """Tests for BoundedExecutor capacity, tag semaphores and shutdown."""

    def setUp(self):
        self.coordinator = TransferCoordinator()
        self.tag_semaphores = {}
        self.executor = self.get_executor()

    def get_executor(self, max_size=1, max_num_threads=1):
        return BoundedExecutor(max_size, max_num_threads, self.tag_semaphores)

    def get_task(self, task_cls, main_kwargs=None):
        return task_cls(self.coordinator, main_kwargs=main_kwargs)

    def get_sleep_task(self, sleep_time=0.01):
        return self.get_task(SleepTask, main_kwargs={'sleep_time': sleep_time})

    def add_semaphore(self, task_tag, count):
        # Registered in the dict shared with self.executor, so the tag is
        # visible to it without rebuilding the executor.
        self.tag_semaphores[task_tag] = TaskSemaphore(count)

    def assert_submit_would_block(self, task, tag=None):
        with self.assertRaises(NoResourcesAvailable):
            self.executor.submit(task, tag=tag, block=False)

    def assert_submit_would_not_block(self, task, tag=None, **kwargs):
        # ``kwargs`` is accepted for call compatibility but is not used.
        try:
            self.executor.submit(task, tag=tag, block=False)
        except NoResourcesAvailable:
            self.fail(
                'Task %s should not have been blocked. Caused by:\n%s' % (
                    task, traceback.format_exc()
                )
            )

    def add_done_callback_to_future(self, future, fn, *args, **kwargs):
        callback_for_future = FunctionContainer(fn, *args, **kwargs)
        future.add_done_callback(callback_for_future)

    def test_submit_single_task(self):
        # Ensure we can submit a task to the executor
        task = self.get_task(ReturnFooTask)
        future = self.executor.submit(task)

        # Ensure what we get back is a Future
        self.assertIsInstance(future, ExecutorFuture)
        # Ensure the callable got executed.
        self.assertEqual(future.result(), 'foo')

    def test_executor_blocks_on_full_capacity(self):
        first_task = self.get_sleep_task()
        second_task = self.get_sleep_task()
        self.executor.submit(first_task)
        # The first task should be sleeping for a substantial period of
        # time such that on the submission of the second task, it will
        # raise an error saying that it cannot be submitted as the max
        # capacity of the semaphore is one.
        self.assert_submit_would_block(second_task)

    def test_executor_clears_capacity_on_done_tasks(self):
        first_task = self.get_sleep_task()
        second_task = self.get_task(ReturnFooTask)

        # Submit a task.
        future = self.executor.submit(first_task)

        # Submit a new task when the first task finishes. This should not get
        # blocked because the first task should have finished clearing up
        # capacity.
        self.add_done_callback_to_future(
            future, self.assert_submit_would_not_block, second_task)

        # Wait for it to complete.
        self.executor.shutdown()

    def test_would_not_block_when_full_capacity_in_other_semaphore(self):
        first_task = self.get_sleep_task()

        # Now let's create a new task with a tag so that it uses a different
        # semaphore.
        task_tag = 'other'
        other_task = self.get_sleep_task()
        self.add_semaphore(task_tag, 1)

        # Submit the normal first task
        self.executor.submit(first_task)

        # Even though the first task should be sleeping for a substantial
        # period of time, the submission of the second task should not
        # raise an error because it should use a different semaphore
        self.assert_submit_would_not_block(other_task, task_tag)

        # Another submission of the other task though should raise
        # an exception as the capacity is equal to one for that tag.
        self.assert_submit_would_block(other_task, task_tag)

    def test_shutdown(self):
        slow_task = self.get_sleep_task()
        future = self.executor.submit(slow_task)
        self.executor.shutdown()
        # Ensure that the shutdown waits until the task is done
        self.assertTrue(future.done())

    def test_shutdown_no_wait(self):
        slow_task = self.get_sleep_task()
        future = self.executor.submit(slow_task)
        self.executor.shutdown(False)
        # Ensure that the shutdown returns immediately even if the task is
        # not done, which it should not be because it is slow.
        self.assertFalse(future.done())

    def test_replace_underlying_executor(self):
        mocked_executor_cls = mock.Mock(BaseExecutor)
        executor = BoundedExecutor(10, 1, {}, mocked_executor_cls)
        executor.submit(self.get_task(ReturnFooTask))
        self.assertTrue(mocked_executor_cls.return_value.submit.called)


class TestExecutorFuture(unittest.TestCase):
    """Tests for ExecutorFuture wrapping a concurrent.futures future."""

    def test_result(self):
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(return_call_args, 'foo', biz='baz')
            wrapped_future = ExecutorFuture(future)
            self.assertEqual(
                wrapped_future.result(), (('foo',), {'biz': 'baz'}))

    def test_done(self):
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(return_call_args, 'foo', biz='baz')
            wrapped_future = ExecutorFuture(future)
        # Leaving the ``with`` block waits for the executor to finish.
        self.assertTrue(wrapped_future.done())

    def test_add_done_callback(self):
        done_callbacks = []
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(return_call_args, 'foo', biz='baz')
            wrapped_future = ExecutorFuture(future)
            wrapped_future.add_done_callback(
                FunctionContainer(done_callbacks.append, 'called'))
        self.assertEqual(done_callbacks, ['called'])


class TestNonThreadedExecutor(unittest.TestCase):
    """Tests for NonThreadedExecutor, which runs calls at submit time."""

    def test_submit(self):
        executor = NonThreadedExecutor()
        future = executor.submit(return_call_args, 1, 2, foo='bar')
        self.assertIsInstance(future, NonThreadedExecutorFuture)
        self.assertEqual(future.result(), ((1, 2), {'foo': 'bar'}))

    def test_submit_with_exception(self):
        executor = NonThreadedExecutor()
        future = executor.submit(raise_exception, RuntimeError())
        self.assertIsInstance(future, NonThreadedExecutorFuture)
        with self.assertRaises(RuntimeError):
            future.result()

    def test_submit_with_exception_and_captures_info(self):
        exception = ValueError('message')
        tb = get_exc_info(exception)[2]
        future = NonThreadedExecutor().submit(raise_exception, exception)
        try:
            future.result()
        except ValueError:
            # The innermost frame of the re-raised exception should match
            # the frame where the exception was originally raised.
            actual_tb = sys.exc_info()[2]
            last_frame = traceback.extract_tb(actual_tb)[-1]
            last_expected_frame = traceback.extract_tb(tb)[-1]
            self.assertEqual(last_frame, last_expected_frame)
        else:
            self.fail('Future should have raised a ValueError')


class TestNonThreadedExecutorFuture(unittest.TestCase):
    """Tests for NonThreadedExecutorFuture state, results and callbacks."""

    def setUp(self):
        self.future = NonThreadedExecutorFuture()

    def test_done_starts_false(self):
        self.assertFalse(self.future.done())

    def test_done_after_setting_result(self):
        self.future.set_result('result')
        self.assertTrue(self.future.done())

    def test_done_after_setting_exception(self):
        self.future.set_exception_info(Exception(), None)
        self.assertTrue(self.future.done())

    def test_result(self):
        self.future.set_result('result')
        self.assertEqual(self.future.result(), 'result')

    def test_exception_result(self):
        exception = ValueError('message')
        self.future.set_exception_info(exception, None)
        with assert_raises_regex(self, ValueError, 'message'):
            self.future.result()

    def test_exception_result_doesnt_modify_last_frame(self):
        exception = ValueError('message')
        tb = get_exc_info(exception)[2]
        self.future.set_exception_info(exception, tb)
        try:
            self.future.result()
        except ValueError:
            actual_tb = sys.exc_info()[2]
            last_frame = traceback.extract_tb(actual_tb)[-1]
            last_expected_frame = traceback.extract_tb(tb)[-1]
            self.assertEqual(last_frame, last_expected_frame)
        else:
            self.fail('Future should have raised a ValueError')

    def test_done_callback(self):
        done_futures = []
        self.future.add_done_callback(done_futures.append)
        self.assertEqual(done_futures, [])
        self.future.set_result('result')
        self.assertEqual(done_futures, [self.future])

    def test_done_callback_after_done(self):
        self.future.set_result('result')
        done_futures = []
        self.future.add_done_callback(done_futures.append)
        self.assertEqual(done_futures, [self.future])