# test_crt.py (14 KB)
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
  13. import io
  14. import pytest
  15. from botocore.credentials import Credentials, ReadOnlyCredentials
  16. from botocore.exceptions import ClientError, NoCredentialsError
  17. from botocore.session import Session
  18. from s3transfer.exceptions import TransferNotDoneError
  19. from s3transfer.utils import CallArgs
  20. from __tests__ import HAS_CRT, FileCreator, mock, requires_crt, unittest
  21. if HAS_CRT:
  22. import awscrt.auth
  23. import awscrt.s3
  24. import s3transfer.crt
  25. requires_crt_pytest = pytest.mark.skipif(
  26. not HAS_CRT, reason="Test requires awscrt to be installed."
  27. )
  28. @pytest.fixture
  29. def mock_crt_process_lock(monkeypatch):
  30. # The process lock is cached at the module layer whenever the
  31. # cross process lock is successfully acquired. This patch ensures that
  32. # test cases will start off with no previously cached process lock and
  33. # if a cross process is instantiated/acquired it will be the mock that
  34. # can be used for controlling lock behavior.
  35. monkeypatch.setattr('s3transfer.crt.CRT_S3_PROCESS_LOCK', None)
  36. with mock.patch('awscrt.s3.CrossProcessLock', spec=True) as mock_lock:
  37. yield mock_lock
  38. @pytest.fixture
  39. def mock_s3_crt_client():
  40. with mock.patch('s3transfer.crt.S3Client', spec=True) as mock_client:
  41. yield mock_client
  42. @pytest.fixture
  43. def mock_get_recommended_throughput_target_gbps():
  44. with mock.patch(
  45. 's3transfer.crt.get_recommended_throughput_target_gbps'
  46. ) as mock_get_target_gbps:
  47. yield mock_get_target_gbps
  48. class CustomFutureException(Exception):
  49. pass
  50. @requires_crt_pytest
  51. class TestCRTProcessLock:
  52. def test_acquire_crt_s3_process_lock(self, mock_crt_process_lock):
  53. lock = s3transfer.crt.acquire_crt_s3_process_lock('app-name')
  54. assert lock is s3transfer.crt.CRT_S3_PROCESS_LOCK
  55. assert lock is mock_crt_process_lock.return_value
  56. mock_crt_process_lock.assert_called_once_with('app-name')
  57. mock_crt_process_lock.return_value.acquire.assert_called_once_with()
  58. def test_unable_to_acquire_lock_returns_none(self, mock_crt_process_lock):
  59. mock_crt_process_lock.return_value.acquire.side_effect = RuntimeError
  60. assert s3transfer.crt.acquire_crt_s3_process_lock('app-name') is None
  61. assert s3transfer.crt.CRT_S3_PROCESS_LOCK is None
  62. mock_crt_process_lock.assert_called_once_with('app-name')
  63. mock_crt_process_lock.return_value.acquire.assert_called_once_with()
  64. def test_multiple_acquires_return_same_lock(self, mock_crt_process_lock):
  65. lock = s3transfer.crt.acquire_crt_s3_process_lock('app-name')
  66. assert s3transfer.crt.acquire_crt_s3_process_lock('app-name') is lock
  67. assert lock is s3transfer.crt.CRT_S3_PROCESS_LOCK
  68. # The process lock should have only been instantiated and acquired once
  69. mock_crt_process_lock.assert_called_once_with('app-name')
  70. mock_crt_process_lock.return_value.acquire.assert_called_once_with()
  71. @requires_crt
  72. class TestBotocoreCRTRequestSerializer(unittest.TestCase):
  73. def setUp(self):
  74. self.region = 'us-west-2'
  75. self.session = Session()
  76. self.session.set_config_variable('region', self.region)
  77. self.request_serializer = s3transfer.crt.BotocoreCRTRequestSerializer(
  78. self.session
  79. )
  80. self.bucket = "test_bucket"
  81. self.key = "test_key"
  82. self.files = FileCreator()
  83. self.filename = self.files.create_file('myfile', 'my content')
  84. self.expected_path = "/" + self.bucket + "/" + self.key
  85. self.expected_host = "s3.%s.amazonaws.com" % (self.region)
  86. def tearDown(self):
  87. self.files.remove_all()
  88. def test_upload_request(self):
  89. callargs = CallArgs(
  90. bucket=self.bucket,
  91. key=self.key,
  92. fileobj=self.filename,
  93. extra_args={},
  94. subscribers=[],
  95. )
  96. coordinator = s3transfer.crt.CRTTransferCoordinator()
  97. future = s3transfer.crt.CRTTransferFuture(
  98. s3transfer.crt.CRTTransferMeta(call_args=callargs), coordinator
  99. )
  100. crt_request = self.request_serializer.serialize_http_request(
  101. "put_object", future
  102. )
  103. self.assertEqual("PUT", crt_request.method)
  104. self.assertEqual(self.expected_path, crt_request.path)
  105. self.assertEqual(self.expected_host, crt_request.headers.get("host"))
  106. self.assertIsNone(crt_request.headers.get("Authorization"))
  107. def test_download_request(self):
  108. callargs = CallArgs(
  109. bucket=self.bucket,
  110. key=self.key,
  111. fileobj=self.filename,
  112. extra_args={},
  113. subscribers=[],
  114. )
  115. coordinator = s3transfer.crt.CRTTransferCoordinator()
  116. future = s3transfer.crt.CRTTransferFuture(
  117. s3transfer.crt.CRTTransferMeta(call_args=callargs), coordinator
  118. )
  119. crt_request = self.request_serializer.serialize_http_request(
  120. "get_object", future
  121. )
  122. self.assertEqual("GET", crt_request.method)
  123. self.assertEqual(self.expected_path, crt_request.path)
  124. self.assertEqual(self.expected_host, crt_request.headers.get("host"))
  125. self.assertIsNone(crt_request.headers.get("Authorization"))
  126. def test_delete_request(self):
  127. callargs = CallArgs(
  128. bucket=self.bucket, key=self.key, extra_args={}, subscribers=[]
  129. )
  130. coordinator = s3transfer.crt.CRTTransferCoordinator()
  131. future = s3transfer.crt.CRTTransferFuture(
  132. s3transfer.crt.CRTTransferMeta(call_args=callargs), coordinator
  133. )
  134. crt_request = self.request_serializer.serialize_http_request(
  135. "delete_object", future
  136. )
  137. self.assertEqual("DELETE", crt_request.method)
  138. self.assertEqual(self.expected_path, crt_request.path)
  139. self.assertEqual(self.expected_host, crt_request.headers.get("host"))
  140. self.assertIsNone(crt_request.headers.get("Authorization"))
  141. def _create_crt_response_error(
  142. self, status_code, body, operation_name=None
  143. ):
  144. return awscrt.s3.S3ResponseError(
  145. code=14343,
  146. name='AWS_ERROR_S3_INVALID_RESPONSE_STATUS',
  147. message='Invalid response status from request',
  148. status_code=status_code,
  149. headers=[
  150. ('x-amz-request-id', 'QSJHJJZR2EDYD4GQ'),
  151. (
  152. 'x-amz-id-2',
  153. 'xDbgdKdvYZTjgpOTzm7yNP2JPrOQl+eaQvUkFdOjdJoWkIC643fgHxdsHpUKvVAfjKf5F6otEYA=',
  154. ),
  155. ('Content-Type', 'application/xml'),
  156. ('Transfer-Encoding', 'chunked'),
  157. ('Date', 'Fri, 10 Nov 2023 23:22:47 GMT'),
  158. ('Server', 'AmazonS3'),
  159. ],
  160. body=body,
  161. operation_name=operation_name,
  162. )
  163. def test_translate_get_object_404(self):
  164. body = (
  165. b'<?xml version="1.0" encoding="UTF-8"?>\n<Error>'
  166. b'<Code>NoSuchKey</Code>'
  167. b'<Message>The specified key does not exist.</Message>'
  168. b'<Key>obviously-no-such-key.txt</Key>'
  169. b'<RequestId>SBJ7ZQY03N1WDW9T</RequestId>'
  170. b'<HostId>SomeHostId</HostId></Error>'
  171. )
  172. crt_exc = self._create_crt_response_error(404, body, 'GetObject')
  173. boto_err = self.request_serializer.translate_crt_exception(crt_exc)
  174. self.assertIsInstance(
  175. boto_err, self.session.create_client('s3').exceptions.NoSuchKey
  176. )
  177. def test_translate_head_object_404(self):
  178. # There's no body in a HEAD response, so we can't map it to a modeled S3 exception.
  179. # But it should still map to a botocore ClientError
  180. body = None
  181. crt_exc = self._create_crt_response_error(
  182. 404, body, operation_name='HeadObject'
  183. )
  184. boto_err = self.request_serializer.translate_crt_exception(crt_exc)
  185. self.assertIsInstance(boto_err, ClientError)
  186. def test_translate_unknown_operation_404(self):
  187. body = None
  188. crt_exc = self._create_crt_response_error(404, body)
  189. boto_err = self.request_serializer.translate_crt_exception(crt_exc)
  190. self.assertIsInstance(boto_err, ClientError)
  191. @requires_crt_pytest
  192. class TestBotocoreCRTCredentialsWrapper:
  193. @pytest.fixture
  194. def botocore_credentials(self):
  195. return Credentials(
  196. access_key='access_key', secret_key='secret_key', token='token'
  197. )
  198. def assert_crt_credentials(
  199. self,
  200. crt_credentials,
  201. expected_access_key='access_key',
  202. expected_secret_key='secret_key',
  203. expected_token='token',
  204. ):
  205. assert crt_credentials.access_key_id == expected_access_key
  206. assert crt_credentials.secret_access_key == expected_secret_key
  207. assert crt_credentials.session_token == expected_token
  208. def test_fetch_crt_credentials_successfully(self, botocore_credentials):
  209. wrapper = s3transfer.crt.BotocoreCRTCredentialsWrapper(
  210. botocore_credentials
  211. )
  212. crt_credentials = wrapper()
  213. self.assert_crt_credentials(crt_credentials)
  214. def test_wrapper_does_not_cache_frozen_credentials(self):
  215. mock_credentials = mock.Mock(Credentials)
  216. mock_credentials.get_frozen_credentials.side_effect = [
  217. ReadOnlyCredentials('access_key_1', 'secret_key_1', 'token_1'),
  218. ReadOnlyCredentials('access_key_2', 'secret_key_2', 'token_2'),
  219. ]
  220. wrapper = s3transfer.crt.BotocoreCRTCredentialsWrapper(
  221. mock_credentials
  222. )
  223. crt_credentials_1 = wrapper()
  224. self.assert_crt_credentials(
  225. crt_credentials_1,
  226. expected_access_key='access_key_1',
  227. expected_secret_key='secret_key_1',
  228. expected_token='token_1',
  229. )
  230. crt_credentials_2 = wrapper()
  231. self.assert_crt_credentials(
  232. crt_credentials_2,
  233. expected_access_key='access_key_2',
  234. expected_secret_key='secret_key_2',
  235. expected_token='token_2',
  236. )
  237. assert mock_credentials.get_frozen_credentials.call_count == 2
  238. def test_raises_error_when_resolved_credentials_is_none(self):
  239. wrapper = s3transfer.crt.BotocoreCRTCredentialsWrapper(None)
  240. with pytest.raises(NoCredentialsError):
  241. wrapper()
  242. def test_to_crt_credentials_provider(self, botocore_credentials):
  243. wrapper = s3transfer.crt.BotocoreCRTCredentialsWrapper(
  244. botocore_credentials
  245. )
  246. crt_credentials_provider = wrapper.to_crt_credentials_provider()
  247. assert isinstance(
  248. crt_credentials_provider, awscrt.auth.AwsCredentialsProvider
  249. )
  250. get_credentials_future = crt_credentials_provider.get_credentials()
  251. crt_credentials = get_credentials_future.result()
  252. self.assert_crt_credentials(crt_credentials)
  253. @requires_crt
  254. class TestCRTTransferFuture(unittest.TestCase):
  255. def setUp(self):
  256. self.mock_s3_request = mock.Mock(awscrt.s3.S3RequestType)
  257. self.mock_crt_future = mock.Mock(awscrt.s3.Future)
  258. self.mock_s3_request.finished_future = self.mock_crt_future
  259. self.coordinator = s3transfer.crt.CRTTransferCoordinator()
  260. self.coordinator.set_s3_request(self.mock_s3_request)
  261. self.future = s3transfer.crt.CRTTransferFuture(
  262. coordinator=self.coordinator
  263. )
  264. def test_set_exception(self):
  265. self.future.set_exception(CustomFutureException())
  266. with self.assertRaises(CustomFutureException):
  267. self.future.result()
  268. def test_set_exception_raises_error_when_not_done(self):
  269. self.mock_crt_future.done.return_value = False
  270. with self.assertRaises(TransferNotDoneError):
  271. self.future.set_exception(CustomFutureException())
  272. def test_set_exception_can_override_previous_exception(self):
  273. self.future.set_exception(Exception())
  274. self.future.set_exception(CustomFutureException())
  275. with self.assertRaises(CustomFutureException):
  276. self.future.result()
  277. @requires_crt
  278. class TestOnBodyFileObjWriter(unittest.TestCase):
  279. def test_call(self):
  280. fileobj = io.BytesIO()
  281. writer = s3transfer.crt.OnBodyFileObjWriter(fileobj)
  282. writer(chunk=b'content')
  283. self.assertEqual(fileobj.getvalue(), b'content')
  284. @requires_crt_pytest
  285. class TestCreateS3CRTClient:
  286. @pytest.mark.parametrize(
  287. 'provided_bytes_per_sec,recommended_gbps,expected_gbps',
  288. [
  289. (None, 100.0, 100.0),
  290. (None, None, 10.0),
  291. # NOTE: create_s3_crt_client() accepts target throughput as bytes
  292. # per second and it is converted to gigabits per second for the
  293. # CRT client instantiation.
  294. (1_000_000_000, None, 8.0),
  295. (1_000_000_000, 100.0, 8.0),
  296. ],
  297. )
  298. def test_target_throughput(
  299. self,
  300. provided_bytes_per_sec,
  301. recommended_gbps,
  302. expected_gbps,
  303. mock_s3_crt_client,
  304. mock_get_recommended_throughput_target_gbps,
  305. ):
  306. mock_get_recommended_throughput_target_gbps.return_value = (
  307. recommended_gbps
  308. )
  309. s3transfer.crt.create_s3_crt_client(
  310. 'us-west-2',
  311. target_throughput=provided_bytes_per_sec,
  312. )
  313. assert (
  314. mock_s3_crt_client.call_args[1]['throughput_target_gbps']
  315. == expected_gbps
  316. )