test_manager.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the 'License'). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the 'license' file accompanying this file. This file is
  10. # distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. from io import BytesIO
  14. from botocore.awsrequest import create_request_object
  15. import mock
  16. from __tests__ import skip_if_using_serial_implementation
  17. from __tests__ import StubbedClientTest
  18. from s3transfer.exceptions import CancelledError
  19. from s3transfer.exceptions import FatalError
  20. from s3transfer.futures import BaseExecutor
  21. from s3transfer.manager import TransferManager
  22. from s3transfer.manager import TransferConfig
  23. class ArbitraryException(Exception):
  24. pass
  25. class SignalTransferringBody(BytesIO):
  26. """A mocked body with the ability to signal when transfers occur"""
  27. def __init__(self):
  28. super(SignalTransferringBody, self).__init__()
  29. self.signal_transferring_call_count = 0
  30. self.signal_not_transferring_call_count = 0
  31. def signal_transferring(self):
  32. self.signal_transferring_call_count += 1
  33. def signal_not_transferring(self):
  34. self.signal_not_transferring_call_count += 1
  35. def seek(self, where, whence=0):
  36. pass
  37. def tell(self):
  38. return 0
  39. def read(self, amount=0):
  40. return b''
  41. class TestTransferManager(StubbedClientTest):
  42. @skip_if_using_serial_implementation(
  43. 'Exception is thrown once all transfers are submitted. '
  44. 'However for the serial implementation, transfers are performed '
  45. 'in main thread meaning all transfers will complete before the '
  46. 'exception being thrown.'
  47. )
  48. def test_error_in_context_manager_cancels_incomplete_transfers(self):
  49. # The purpose of this test is to make sure if an error is raised
  50. # in the body of the context manager, incomplete transfers will
  51. # be cancelled with value of the exception wrapped by a CancelledError
  52. # NOTE: The fact that delete() was chosen to test this is arbitrary
  53. # other than it is the easiet to set up for the stubber.
  54. # The specific operation is not important to the purpose of this test.
  55. num_transfers = 100
  56. futures = []
  57. ref_exception_msg = 'arbitrary exception'
  58. for _ in range(num_transfers):
  59. self.stubber.add_response('delete_object', {})
  60. manager = TransferManager(
  61. self.client,
  62. TransferConfig(
  63. max_request_concurrency=1, max_submission_concurrency=1)
  64. )
  65. try:
  66. with manager:
  67. for i in range(num_transfers):
  68. futures.append(manager.delete('mybucket', 'mykey'))
  69. raise ArbitraryException(ref_exception_msg)
  70. except ArbitraryException:
  71. # At least one of the submitted futures should have been
  72. # cancelled.
  73. with self.assertRaisesRegexp(FatalError, ref_exception_msg):
  74. for future in futures:
  75. future.result()
  76. @skip_if_using_serial_implementation(
  77. 'Exception is thrown once all transfers are submitted. '
  78. 'However for the serial implementation, transfers are performed '
  79. 'in main thread meaning all transfers will complete before the '
  80. 'exception being thrown.'
  81. )
  82. def test_cntrl_c_in_context_manager_cancels_incomplete_transfers(self):
  83. # The purpose of this test is to make sure if an error is raised
  84. # in the body of the context manager, incomplete transfers will
  85. # be cancelled with value of the exception wrapped by a CancelledError
  86. # NOTE: The fact that delete() was chosen to test this is arbitrary
  87. # other than it is the easiet to set up for the stubber.
  88. # The specific operation is not important to the purpose of this test.
  89. num_transfers = 100
  90. futures = []
  91. for _ in range(num_transfers):
  92. self.stubber.add_response('delete_object', {})
  93. manager = TransferManager(
  94. self.client,
  95. TransferConfig(
  96. max_request_concurrency=1, max_submission_concurrency=1)
  97. )
  98. try:
  99. with manager:
  100. for i in range(num_transfers):
  101. futures.append(manager.delete('mybucket', 'mykey'))
  102. raise KeyboardInterrupt()
  103. except KeyboardInterrupt:
  104. # At least one of the submitted futures should have been
  105. # cancelled.
  106. with self.assertRaisesRegexp(
  107. CancelledError, 'KeyboardInterrupt()'):
  108. for future in futures:
  109. future.result()
  110. def test_enable_disable_callbacks_only_ever_registered_once(self):
  111. body = SignalTransferringBody()
  112. request = create_request_object({
  113. 'method': 'PUT',
  114. 'url': 'https://s3.amazonaws.com',
  115. 'body': body,
  116. 'headers': {},
  117. 'context': {}
  118. })
  119. # Create two TransferManager's using the same client
  120. TransferManager(self.client)
  121. TransferManager(self.client)
  122. self.client.meta.events.emit(
  123. 'request-created.s3', request=request, operation_name='PutObject')
  124. # The client should have only have the enable/disable callback
  125. # handlers registered once depite being used for two different
  126. # TransferManagers.
  127. self.assertEqual(
  128. body.signal_transferring_call_count, 1,
  129. 'The enable_callback() should have only ever been registered once')
  130. self.assertEqual(
  131. body.signal_not_transferring_call_count, 1,
  132. 'The disable_callback() should have only ever been registered '
  133. 'once')
  134. def test_use_custom_executor_implementation(self):
  135. mocked_executor_cls = mock.Mock(BaseExecutor)
  136. transfer_manager = TransferManager(
  137. self.client, executor_cls=mocked_executor_cls)
  138. transfer_manager.delete('bucket', 'key')
  139. self.assertTrue(mocked_executor_cls.return_value.submit.called)
  140. def test_unicode_exception_in_context_manager(self):
  141. with self.assertRaises(ArbitraryException):
  142. with TransferManager(self.client):
  143. raise ArbitraryException(u'\u2713')
  144. def test_client_property(self):
  145. manager = TransferManager(self.client)
  146. self.assertIs(manager.client, self.client)
  147. def test_config_property(self):
  148. config = TransferConfig()
  149. manager = TransferManager(self.client, config)
  150. self.assertIs(manager.config, config)
  151. def test_can_disable_bucket_validation(self):
  152. s3_object_lambda_arn = (
  153. 'arn:aws:s3-object-lambda:us-west-2:123456789012:'
  154. 'accesspoint:my-accesspoint'
  155. )
  156. config = TransferConfig()
  157. manager = TransferManager(self.client, config)
  158. manager.VALIDATE_SUPPORTED_BUCKET_VALUES = False
  159. manager.delete(s3_object_lambda_arn, 'my-key')