test_copy.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. from botocore.exceptions import ClientError
  14. from botocore.stub import Stubber
  15. from __tests__ import BaseGeneralInterfaceTest
  16. from __tests__ import FileSizeProvider
  17. from s3transfer.manager import TransferManager
  18. from s3transfer.manager import TransferConfig
  19. from s3transfer.utils import MIN_UPLOAD_CHUNKSIZE
  class BaseCopyTest(BaseGeneralInterfaceTest):
      """Shared fixtures, stubbing helpers and tests for ``manager.copy``.

      Subclasses may override ``create_stubbed_responses`` (and ``setUp``)
      to describe the service calls expected for a given copy strategy.
      The first stubbed response is always the HeadObject call; everything
      after it is the copy itself.
      """

      def setUp(self):
          super(BaseCopyTest, self).setUp()
          # The content created below is two and a half chunks long, which
          # is under the four-chunk multipart threshold configured here.
          self.config = TransferConfig(
              max_request_concurrency=1,
              multipart_chunksize=MIN_UPLOAD_CHUNKSIZE,
              multipart_threshold=MIN_UPLOAD_CHUNKSIZE * 4
          )
          self._manager = TransferManager(self.client, self.config)

          # Initialize some default arguments
          self.bucket = 'mybucket'
          self.key = 'mykey'
          self.copy_source = {
              'Bucket': 'mysourcebucket',
              'Key': 'mysourcekey'
          }
          self.extra_args = {}
          self.subscribers = []

          # Floor division keeps this an int without a float round trip.
          self.half_chunksize = MIN_UPLOAD_CHUNKSIZE // 2
          self.content = b'0' * (
              2 * MIN_UPLOAD_CHUNKSIZE + self.half_chunksize)

      @property
      def manager(self):
          """The ``TransferManager`` built in ``setUp``."""
          return self._manager

      @property
      def method(self):
          """The transfer method under test (``manager.copy``)."""
          return self.manager.copy

      def create_call_kwargs(self):
          """Return the default keyword arguments for ``manager.copy``."""
          return {
              'copy_source': self.copy_source,
              'bucket': self.bucket,
              'key': self.key,
          }

      def create_invalid_extra_args(self):
          """Return an ``extra_args`` dict with an unsupported key."""
          return {
              'Foo': 'bar'
          }

      def create_stubbed_responses(self):
          """Return fresh stub definitions: HeadObject, then CopyObject."""
          return [
              {
                  'method': 'head_object',
                  'service_response': {
                      'ContentLength': len(self.content)
                  }
              },
              {
                  'method': 'copy_object',
                  'service_response': {}
              }
          ]

      def create_expected_progress_callback_info(self):
          """Return the expected progress callbacks (one, for all bytes)."""
          return [
              {'bytes_transferred': len(self.content)},
          ]

      def add_head_object_response(self, expected_params=None, stubber=None):
          """Queue the stubbed HeadObject response.

          :param expected_params: Optional parameters the stubber should
              expect for the HeadObject call.
          :param stubber: Stubber to queue the response on; defaults to
              ``self.stubber``.
          """
          if stubber is None:
              stubber = self.stubber
          head_response = self.create_stubbed_responses()[0]
          if expected_params:
              head_response['expected_params'] = expected_params
          stubber.add_response(**head_response)

      def add_successful_copy_responses(
              self, expected_copy_params=None,
              expected_create_mpu_params=None,
              expected_complete_mpu_params=None):
          """Queue every non-HeadObject response needed for a full copy.

          :param expected_copy_params: Expected parameters for the copy
              call(s). Either a single dict applied to every copy response
              or a list with one dict per copy response.
          :param expected_create_mpu_params: Expected parameters attached
              to the first queued response.
          :param expected_complete_mpu_params: Expected parameters attached
              to the last queued response.
          """
          # Everything after the HeadObject response makes up the copy.
          # This accounts for both multipart and non-multipart copies.
          stubbed_responses = self.create_stubbed_responses()[1:]

          # If there is more than one response it is a multipart copy: the
          # first and last responses bracket the per-part copy responses.
          if len(stubbed_responses) > 1:
              copy_responses = stubbed_responses[1:-1]
          else:
              copy_responses = stubbed_responses[0:1]

          # Add the expected create multipart upload params.
          if expected_create_mpu_params:
              stubbed_responses[0][
                  'expected_params'] = expected_create_mpu_params

          # Add any expected copy parameters.
          if expected_copy_params:
              per_response = isinstance(expected_copy_params, list)
              for i, copy_response in enumerate(copy_responses):
                  if per_response:
                      expected = expected_copy_params[i]
                  else:
                      expected = expected_copy_params
                  copy_response['expected_params'] = expected

          # Add the expected complete multipart upload params.
          if expected_complete_mpu_params:
              stubbed_responses[-1][
                  'expected_params'] = expected_complete_mpu_params

          # Add the responses to the stubber.
          for stubbed_response in stubbed_responses:
              self.stubber.add_response(**stubbed_response)

      def test_can_provide_file_size(self):
          # No HeadObject response is queued here; only the copy responses.
          self.add_successful_copy_responses()

          call_kwargs = self.create_call_kwargs()
          call_kwargs['subscribers'] = [FileSizeProvider(len(self.content))]

          future = self.manager.copy(**call_kwargs)
          future.result()

          # The HeadObject should not have happened and the copy should
          # have completed successfully using the provided size.
          self.stubber.assert_no_pending_responses()

      def test_provide_copy_source_as_dict(self):
          self.copy_source['VersionId'] = 'mysourceversionid'
          expected_params = {
              'Bucket': 'mysourcebucket',
              'Key': 'mysourcekey',
              'VersionId': 'mysourceversionid'
          }

          self.add_head_object_response(expected_params=expected_params)
          self.add_successful_copy_responses()

          future = self.manager.copy(**self.create_call_kwargs())
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_invalid_copy_source(self):
          # A list is not an accepted copy source; the error surfaces when
          # the future's result is requested.
          self.copy_source = ['bucket', 'key']
          future = self.manager.copy(**self.create_call_kwargs())
          with self.assertRaises(TypeError):
              future.result()

      def test_provide_copy_source_client(self):
          source_client = self.session.create_client(
              's3', 'eu-central-1', aws_access_key_id='foo',
              aws_secret_access_key='bar')
          source_stubber = Stubber(source_client)
          source_stubber.activate()
          self.addCleanup(source_stubber.deactivate)

          # The HeadObject goes to the source client's stubber while the
          # copy responses are queued on the default stubber.
          self.add_head_object_response(stubber=source_stubber)
          self.add_successful_copy_responses()

          call_kwargs = self.create_call_kwargs()
          call_kwargs['source_client'] = source_client
          future = self.manager.copy(**call_kwargs)
          future.result()

          # Make sure that all of the responses were properly
          # used for both clients.
          source_stubber.assert_no_pending_responses()
          self.stubber.assert_no_pending_responses()
  class TestNonMultipartCopy(BaseCopyTest):
      """Copy tests where the stubbed copy is a single CopyObject call."""

      __test__ = True

      def _assert_raises_regex(self, expected_exception, expected_regex):
          """Return a context manager asserting a matching exception.

          ``assertRaisesRegexp`` is a deprecated alias that was removed
          from ``unittest`` in Python 3.12. Prefer ``assertRaisesRegex``
          when the test case provides it and fall back to the old name on
          interpreters that only have the alias.
          """
          assert_method = (
              getattr(self, 'assertRaisesRegex', None) or
              self.assertRaisesRegexp
          )
          return assert_method(expected_exception, expected_regex)

      def test_copy(self):
          expected_head_params = {
              'Bucket': 'mysourcebucket',
              'Key': 'mysourcekey'
          }
          expected_copy_object = {
              'Bucket': self.bucket,
              'Key': self.key,
              'CopySource': self.copy_source
          }
          self.add_head_object_response(expected_params=expected_head_params)
          self.add_successful_copy_responses(
              expected_copy_params=expected_copy_object)

          future = self.manager.copy(**self.create_call_kwargs())
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_copy_with_extra_args(self):
          # MetadataDirective is expected on CopyObject but not HeadObject.
          self.extra_args['MetadataDirective'] = 'REPLACE'

          expected_head_params = {
              'Bucket': 'mysourcebucket',
              'Key': 'mysourcekey'
          }
          expected_copy_object = {
              'Bucket': self.bucket,
              'Key': self.key,
              'CopySource': self.copy_source,
              'MetadataDirective': 'REPLACE'
          }

          self.add_head_object_response(expected_params=expected_head_params)
          self.add_successful_copy_responses(
              expected_copy_params=expected_copy_object)

          call_kwargs = self.create_call_kwargs()
          call_kwargs['extra_args'] = self.extra_args
          future = self.manager.copy(**call_kwargs)
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_copy_maps_extra_args_to_head_object(self):
          self.extra_args['CopySourceSSECustomerAlgorithm'] = 'AES256'

          # The CopySource-prefixed argument is expected under its
          # unprefixed name on HeadObject ...
          expected_head_params = {
              'Bucket': 'mysourcebucket',
              'Key': 'mysourcekey',
              'SSECustomerAlgorithm': 'AES256'
          }
          # ... and unchanged on CopyObject.
          expected_copy_object = {
              'Bucket': self.bucket,
              'Key': self.key,
              'CopySource': self.copy_source,
              'CopySourceSSECustomerAlgorithm': 'AES256'
          }

          self.add_head_object_response(expected_params=expected_head_params)
          self.add_successful_copy_responses(
              expected_copy_params=expected_copy_object)

          call_kwargs = self.create_call_kwargs()
          call_kwargs['extra_args'] = self.extra_args
          future = self.manager.copy(**call_kwargs)
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_allowed_copy_params_are_valid(self):
          # Every allowed copy argument must be an input member of the
          # CopyObject operation model.
          service_model = self.client.meta.service_model
          op_model = service_model.operation_model('CopyObject')
          for allowed_upload_arg in self._manager.ALLOWED_COPY_ARGS:
              self.assertIn(allowed_upload_arg, op_model.input_shape.members)

      def test_copy_with_tagging(self):
          extra_args = {
              'Tagging': 'tag1=val1', 'TaggingDirective': 'REPLACE'
          }
          self.add_head_object_response()
          self.add_successful_copy_responses(
              expected_copy_params={
                  'Bucket': self.bucket,
                  'Key': self.key,
                  'CopySource': self.copy_source,
                  'Tagging': 'tag1=val1',
                  'TaggingDirective': 'REPLACE'
              }
          )
          future = self.manager.copy(
              self.copy_source, self.bucket, self.key, extra_args)
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_raise_exception_on_s3_object_lambda_resource(self):
          s3_object_lambda_arn = (
              'arn:aws:s3-object-lambda:us-west-2:123456789012:'
              'accesspoint:my-accesspoint'
          )
          # The error is raised by the copy() call itself, not the future.
          with self._assert_raises_regex(
                  ValueError, 'methods do not support'):
              self.manager.copy(
                  self.copy_source, s3_object_lambda_arn, self.key)

      def test_raise_exception_on_s3_object_lambda_resource_as_source(self):
          source = {
              'Bucket': 'arn:aws:s3-object-lambda:us-west-2:123456789012:'
                        'accesspoint:my-accesspoint'
          }
          with self._assert_raises_regex(
                  ValueError, 'methods do not support'):
              self.manager.copy(source, self.bucket, self.key)
  class TestMultipartCopy(BaseCopyTest):
      """Copy tests where the stubbed copy is a three-part multipart copy."""

      __test__ = True

      def setUp(self):
          super(TestMultipartCopy, self).setUp()
          # A one-byte threshold puts the content above the multipart
          # threshold. NOTE(review): the configured chunksize is 4, yet the
          # expected ranges and progress below are MIN_UPLOAD_CHUNKSIZE
          # sized -- presumably the manager raises undersized chunks to the
          # minimum; confirm against the TransferManager implementation.
          self.config = TransferConfig(
              max_request_concurrency=1, multipart_threshold=1,
              multipart_chunksize=4)
          self._manager = TransferManager(self.client, self.config)

      def _assert_raises_regex(self, expected_exception, expected_regex):
          """Return a context manager asserting a matching exception.

          ``assertRaisesRegexp`` is a deprecated alias that was removed
          from ``unittest`` in Python 3.12. Prefer ``assertRaisesRegex``
          when the test case provides it and fall back to the old name on
          interpreters that only have the alias.
          """
          assert_method = (
              getattr(self, 'assertRaisesRegex', None) or
              self.assertRaisesRegexp
          )
          return assert_method(expected_exception, expected_regex)

      def create_stubbed_responses(self):
          """Return fresh stub definitions for a three-part multipart copy.

          Order: HeadObject, CreateMultipartUpload, three UploadPartCopy
          responses, CompleteMultipartUpload.
          """
          return [
              {
                  'method': 'head_object',
                  'service_response': {
                      'ContentLength': len(self.content)
                  }
              },
              {
                  'method': 'create_multipart_upload',
                  'service_response': {
                      'UploadId': 'my-upload-id'
                  }
              },
              {
                  'method': 'upload_part_copy',
                  'service_response': {
                      'CopyPartResult': {
                          'ETag': 'etag-1'
                      }
                  }
              },
              {
                  'method': 'upload_part_copy',
                  'service_response': {
                      'CopyPartResult': {
                          'ETag': 'etag-2'
                      }
                  }
              },
              {
                  'method': 'upload_part_copy',
                  'service_response': {
                      'CopyPartResult': {
                          'ETag': 'etag-3'
                      }
                  }
              },
              {
                  'method': 'complete_multipart_upload',
                  'service_response': {}
              }
          ]

      def create_expected_progress_callback_info(self):
          """Return the expected progress callbacks, one per copied part."""
          # Two full-size parts followed by the trailing half-size part.
          return [
              {'bytes_transferred': MIN_UPLOAD_CHUNKSIZE},
              {'bytes_transferred': MIN_UPLOAD_CHUNKSIZE},
              {'bytes_transferred': self.half_chunksize}
          ]

      def add_create_multipart_upload_response(self):
          """Queue only the stubbed CreateMultipartUpload response."""
          self.stubber.add_response(**self.create_stubbed_responses()[1])

      def _get_expected_params(self):
          """Build the expected request parameters for a full multipart copy.

          :returns: A two-tuple of the expected HeadObject parameters and a
              dict of keyword arguments for
              ``add_successful_copy_responses``.
          """
          upload_id = 'my-upload-id'

          # Add expected parameters to the head object
          expected_head_params = {
              'Bucket': 'mysourcebucket',
              'Key': 'mysourcekey',
          }

          # Add expected parameters for the create multipart
          expected_create_mpu_params = {
              'Bucket': self.bucket,
              'Key': self.key,
          }

          # Add expected parameters to the copy part.
          # NOTE(review): these literals assume MIN_UPLOAD_CHUNKSIZE is
          # 5242880 bytes; update them if that constant ever changes.
          ranges = ['bytes=0-5242879', 'bytes=5242880-10485759',
                    'bytes=10485760-13107199']
          expected_copy_params = []
          for part_number, range_val in enumerate(ranges, 1):
              expected_copy_params.append(
                  {
                      'Bucket': self.bucket,
                      'Key': self.key,
                      'CopySource': self.copy_source,
                      'UploadId': upload_id,
                      'PartNumber': part_number,
                      'CopySourceRange': range_val
                  }
              )

          # Add expected parameters for the complete multipart
          expected_complete_mpu_params = {
              'Bucket': self.bucket,
              'Key': self.key, 'UploadId': upload_id,
              'MultipartUpload': {
                  'Parts': [
                      {'ETag': 'etag-1', 'PartNumber': 1},
                      {'ETag': 'etag-2', 'PartNumber': 2},
                      {'ETag': 'etag-3', 'PartNumber': 3}
                  ]
              }
          }

          return expected_head_params, {
              'expected_create_mpu_params': expected_create_mpu_params,
              'expected_copy_params': expected_copy_params,
              'expected_complete_mpu_params': expected_complete_mpu_params,
          }

      def _add_params_to_expected_params(
              self, add_copy_kwargs, operation_types, new_params):
          """Merge ``new_params`` into the selected expected-parameter dicts.

          :param add_copy_kwargs: The kwargs dict returned by
              ``_get_expected_params``; its dicts are updated in place.
          :param operation_types: Names such as ``'create_mpu'``, ``'copy'``
              or ``'complete_mpu'`` selecting the
              ``expected_<name>_params`` entries to update.
          :param new_params: Parameters to add to each selected dict.
          """
          expected_params_to_update = []
          for operation_type in operation_types:
              add_copy_kwargs_key = 'expected_' + operation_type + '_params'
              expected_params = add_copy_kwargs[add_copy_kwargs_key]
              # The copy entry is a list with one dict per part; the others
              # are single dicts.
              if isinstance(expected_params, list):
                  expected_params_to_update.extend(expected_params)
              else:
                  expected_params_to_update.append(expected_params)

          for expected_params in expected_params_to_update:
              expected_params.update(new_params)

      def test_copy(self):
          head_params, add_copy_kwargs = self._get_expected_params()
          self.add_head_object_response(expected_params=head_params)
          self.add_successful_copy_responses(**add_copy_kwargs)

          future = self.manager.copy(**self.create_call_kwargs())
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_copy_with_extra_args(self):
          # This extra argument should be added to the head object, the
          # create multipart upload, the upload part copy, and the
          # complete multipart upload.
          self.extra_args['RequestPayer'] = 'requester'

          head_params, add_copy_kwargs = self._get_expected_params()
          head_params.update(self.extra_args)
          self.add_head_object_response(expected_params=head_params)

          self._add_params_to_expected_params(
              add_copy_kwargs, ['create_mpu', 'copy', 'complete_mpu'],
              self.extra_args)
          self.add_successful_copy_responses(**add_copy_kwargs)

          call_kwargs = self.create_call_kwargs()
          call_kwargs['extra_args'] = self.extra_args
          future = self.manager.copy(**call_kwargs)
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_copy_blacklists_args_to_create_multipart(self):
          # This argument can never be used for multipart uploads, so none
          # of the expected parameters include it.
          self.extra_args['MetadataDirective'] = 'COPY'

          head_params, add_copy_kwargs = self._get_expected_params()
          self.add_head_object_response(expected_params=head_params)
          self.add_successful_copy_responses(**add_copy_kwargs)

          call_kwargs = self.create_call_kwargs()
          call_kwargs['extra_args'] = self.extra_args
          future = self.manager.copy(**call_kwargs)
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_copy_args_to_only_create_multipart(self):
          # ACL is only expected on the create multipart upload call.
          self.extra_args['ACL'] = 'private'

          head_params, add_copy_kwargs = self._get_expected_params()
          self.add_head_object_response(expected_params=head_params)

          self._add_params_to_expected_params(
              add_copy_kwargs, ['create_mpu'], self.extra_args)
          self.add_successful_copy_responses(**add_copy_kwargs)

          call_kwargs = self.create_call_kwargs()
          call_kwargs['extra_args'] = self.extra_args
          future = self.manager.copy(**call_kwargs)
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_copy_passes_args_to_create_multipart_and_upload_part(self):
          # This will only be used for the create multipart upload
          # and upload part copy.
          self.extra_args['SSECustomerAlgorithm'] = 'AES256'

          head_params, add_copy_kwargs = self._get_expected_params()
          self.add_head_object_response(expected_params=head_params)

          self._add_params_to_expected_params(
              add_copy_kwargs, ['create_mpu', 'copy'], self.extra_args)
          self.add_successful_copy_responses(**add_copy_kwargs)

          call_kwargs = self.create_call_kwargs()
          call_kwargs['extra_args'] = self.extra_args
          future = self.manager.copy(**call_kwargs)
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_copy_maps_extra_args_to_head_object(self):
          self.extra_args['CopySourceSSECustomerAlgorithm'] = 'AES256'

          head_params, add_copy_kwargs = self._get_expected_params()

          # The CopySourceSSECustomerAlgorithm needs to get mapped to
          # SSECustomerAlgorithm for HeadObject
          head_params['SSECustomerAlgorithm'] = 'AES256'
          self.add_head_object_response(expected_params=head_params)

          # However, it needs to remain the same for UploadPartCopy.
          self._add_params_to_expected_params(
              add_copy_kwargs, ['copy'], self.extra_args)
          self.add_successful_copy_responses(**add_copy_kwargs)

          call_kwargs = self.create_call_kwargs()
          call_kwargs['extra_args'] = self.extra_args
          future = self.manager.copy(**call_kwargs)
          future.result()
          self.stubber.assert_no_pending_responses()

      def test_abort_on_failure(self):
          # First add the head object and create multipart upload
          self.add_head_object_response()
          self.add_create_multipart_upload_response()

          # Cause an error on upload_part_copy
          self.stubber.add_client_error(
              'upload_part_copy', 'ArbitraryFailure')

          # Add the abort multipart to ensure it gets cleaned up on failure
          self.stubber.add_response(
              'abort_multipart_upload',
              service_response={},
              expected_params={
                  'Bucket': self.bucket,
                  'Key': self.key,
                  'UploadId': 'my-upload-id'
              }
          )

          future = self.manager.copy(**self.create_call_kwargs())
          with self._assert_raises_regex(ClientError, 'ArbitraryFailure'):
              future.result()
          self.stubber.assert_no_pending_responses()

      def test_mp_copy_with_tagging_directive(self):
          extra_args = {
              'Tagging': 'tag1=val1', 'TaggingDirective': 'REPLACE'
          }
          self.add_head_object_response()
          # Only Tagging is expected on the create multipart upload call;
          # TaggingDirective is not part of the expected parameters.
          self.add_successful_copy_responses(
              expected_create_mpu_params={
                  'Bucket': self.bucket,
                  'Key': self.key,
                  'Tagging': 'tag1=val1',
              }
          )
          future = self.manager.copy(
              self.copy_source, self.bucket, self.key, extra_args)
          future.result()
          self.stubber.assert_no_pending_responses()