test_upload.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import os
  14. import time
  15. import tempfile
  16. import shutil
  17. import mock
  18. from botocore.client import Config
  19. from botocore.exceptions import ClientError
  20. from botocore.awsrequest import AWSRequest
  21. from botocore.stub import ANY
  22. from __tests__ import BaseGeneralInterfaceTest
  23. from __tests__ import RecordingSubscriber
  24. from __tests__ import RecordingOSUtils
  25. from __tests__ import NonSeekableReader
  26. from s3transfer.compat import six
  27. from s3transfer.manager import TransferManager
  28. from s3transfer.manager import TransferConfig
  29. from s3transfer.utils import ChunksizeAdjuster
  30. class BaseUploadTest(BaseGeneralInterfaceTest):
  31. def setUp(self):
  32. super(BaseUploadTest, self).setUp()
  33. # TODO: We do not want to use the real MIN_UPLOAD_CHUNKSIZE
  34. # when we're adjusting parts.
  35. # This is really wasteful and fails CI builds because self.contents
  36. # would normally use 10MB+ of memory.
  37. # Until there's an API to configure this, we're patching this with
  38. # a min size of 1. We can't patch MIN_UPLOAD_CHUNKSIZE directly
  39. # because it's already bound to a default value in the
  40. # chunksize adjuster. Instead we need to patch out the
  41. # chunksize adjuster class.
  42. self.adjuster_patch = mock.patch(
  43. 's3transfer.upload.ChunksizeAdjuster',
  44. lambda: ChunksizeAdjuster(min_size=1))
  45. self.adjuster_patch.start()
  46. self.config = TransferConfig(max_request_concurrency=1)
  47. self._manager = TransferManager(self.client, self.config)
  48. # Create a temporary directory with files to read from
  49. self.tempdir = tempfile.mkdtemp()
  50. self.filename = os.path.join(self.tempdir, 'myfile')
  51. self.content = b'my content'
  52. with open(self.filename, 'wb') as f:
  53. f.write(self.content)
  54. # Initialize some default arguments
  55. self.bucket = 'mybucket'
  56. self.key = 'mykey'
  57. self.extra_args = {}
  58. self.subscribers = []
  59. # A list to keep track of all of the bodies sent over the wire
  60. # and their order.
  61. self.sent_bodies = []
  62. self.client.meta.events.register(
  63. 'before-parameter-build.s3.*', self.collect_body)
  64. def tearDown(self):
  65. super(BaseUploadTest, self).tearDown()
  66. shutil.rmtree(self.tempdir)
  67. self.adjuster_patch.stop()
  68. def collect_body(self, params, model, **kwargs):
  69. # A handler to simulate the reading of the body including the
  70. # request-created event that signals to simulate the progress
  71. # callbacks
  72. if 'Body' in params:
  73. # TODO: This is not ideal. Need to figure out a better idea of
  74. # simulating reading of the request across the wire to trigger
  75. # progress callbacks
  76. request = AWSRequest(
  77. method='PUT', url='https://s3.amazonaws.com',
  78. data=params['Body']
  79. )
  80. self.client.meta.events.emit(
  81. 'request-created.s3.%s' % model.name,
  82. request=request, operation_name=model.name
  83. )
  84. self.sent_bodies.append(self._stream_body(params['Body']))
  85. def _stream_body(self, body):
  86. read_amt = 8 * 1024
  87. data = body.read(read_amt)
  88. collected_body = data
  89. while data:
  90. data = body.read(read_amt)
  91. collected_body += data
  92. return collected_body
  93. @property
  94. def manager(self):
  95. return self._manager
  96. @property
  97. def method(self):
  98. return self.manager.upload
  99. def create_call_kwargs(self):
  100. return {
  101. 'fileobj': self.filename,
  102. 'bucket': self.bucket,
  103. 'key': self.key
  104. }
  105. def create_invalid_extra_args(self):
  106. return {
  107. 'Foo': 'bar'
  108. }
  109. def create_stubbed_responses(self):
  110. return [{'method': 'put_object', 'service_response': {}}]
  111. def create_expected_progress_callback_info(self):
  112. return [{'bytes_transferred': 10}]
  113. def assert_expected_client_calls_were_correct(self):
  114. # We assert that expected client calls were made by ensuring that
  115. # there are no more pending responses. If there are no more pending
  116. # responses, then all stubbed responses were consumed.
  117. self.stubber.assert_no_pending_responses()
  118. class TestNonMultipartUpload(BaseUploadTest):
  119. __test__ = True
  120. def add_put_object_response_with_default_expected_params(
  121. self, extra_expected_params=None):
  122. expected_params = {
  123. 'Body': ANY, 'Bucket': self.bucket, 'Key': self.key
  124. }
  125. if extra_expected_params:
  126. expected_params.update(extra_expected_params)
  127. upload_response = self.create_stubbed_responses()[0]
  128. upload_response['expected_params'] = expected_params
  129. self.stubber.add_response(**upload_response)
  130. def assert_put_object_body_was_correct(self):
  131. self.assertEqual(self.sent_bodies, [self.content])
  132. def test_upload(self):
  133. self.extra_args['RequestPayer'] = 'requester'
  134. self.add_put_object_response_with_default_expected_params(
  135. extra_expected_params={'RequestPayer': 'requester'}
  136. )
  137. future = self.manager.upload(
  138. self.filename, self.bucket, self.key, self.extra_args)
  139. future.result()
  140. self.assert_expected_client_calls_were_correct()
  141. self.assert_put_object_body_was_correct()
  142. def test_upload_for_fileobj(self):
  143. self.add_put_object_response_with_default_expected_params()
  144. with open(self.filename, 'rb') as f:
  145. future = self.manager.upload(
  146. f, self.bucket, self.key, self.extra_args)
  147. future.result()
  148. self.assert_expected_client_calls_were_correct()
  149. self.assert_put_object_body_was_correct()
  150. def test_upload_for_seekable_filelike_obj(self):
  151. self.add_put_object_response_with_default_expected_params()
  152. bytes_io = six.BytesIO(self.content)
  153. future = self.manager.upload(
  154. bytes_io, self.bucket, self.key, self.extra_args)
  155. future.result()
  156. self.assert_expected_client_calls_were_correct()
  157. self.assert_put_object_body_was_correct()
  158. def test_upload_for_seekable_filelike_obj_that_has_been_seeked(self):
  159. self.add_put_object_response_with_default_expected_params()
  160. bytes_io = six.BytesIO(self.content)
  161. seek_pos = 5
  162. bytes_io.seek(seek_pos)
  163. future = self.manager.upload(
  164. bytes_io, self.bucket, self.key, self.extra_args)
  165. future.result()
  166. self.assert_expected_client_calls_were_correct()
  167. self.assertEqual(b''.join(self.sent_bodies), self.content[seek_pos:])
  168. def test_upload_for_non_seekable_filelike_obj(self):
  169. self.add_put_object_response_with_default_expected_params()
  170. body = NonSeekableReader(self.content)
  171. future = self.manager.upload(
  172. body, self.bucket, self.key, self.extra_args)
  173. future.result()
  174. self.assert_expected_client_calls_were_correct()
  175. self.assert_put_object_body_was_correct()
  176. def test_sigv4_progress_callbacks_invoked_once(self):
  177. # Reset the client and manager to use sigv4
  178. self.reset_stubber_with_new_client(
  179. {'config': Config(signature_version='s3v4')})
  180. self.client.meta.events.register(
  181. 'before-parameter-build.s3.*', self.collect_body)
  182. self._manager = TransferManager(self.client, self.config)
  183. # Add the stubbed response.
  184. self.add_put_object_response_with_default_expected_params()
  185. subscriber = RecordingSubscriber()
  186. future = self.manager.upload(
  187. self.filename, self.bucket, self.key, subscribers=[subscriber])
  188. future.result()
  189. self.assert_expected_client_calls_were_correct()
  190. # The amount of bytes seen should be the same as the file size
  191. self.assertEqual(subscriber.calculate_bytes_seen(), len(self.content))
  192. def test_uses_provided_osutil(self):
  193. osutil = RecordingOSUtils()
  194. # Use the recording os utility for the transfer manager
  195. self._manager = TransferManager(self.client, self.config, osutil)
  196. self.add_put_object_response_with_default_expected_params()
  197. future = self.manager.upload(self.filename, self.bucket, self.key)
  198. future.result()
  199. # The upload should have used the os utility. We check this by making
  200. # sure that the recorded opens are as expected.
  201. expected_opens = [(self.filename, 'rb')]
  202. self.assertEqual(osutil.open_records, expected_opens)
  203. def test_allowed_upload_params_are_valid(self):
  204. op_model = self.client.meta.service_model.operation_model('PutObject')
  205. for allowed_upload_arg in self._manager.ALLOWED_UPLOAD_ARGS:
  206. self.assertIn(allowed_upload_arg, op_model.input_shape.members)
  207. def test_upload_with_bandwidth_limiter(self):
  208. self.content = b'a' * 1024 * 1024
  209. with open(self.filename, 'wb') as f:
  210. f.write(self.content)
  211. self.config = TransferConfig(
  212. max_request_concurrency=1, max_bandwidth=len(self.content)/2)
  213. self._manager = TransferManager(self.client, self.config)
  214. self.add_put_object_response_with_default_expected_params()
  215. start = time.time()
  216. future = self.manager.upload(self.filename, self.bucket, self.key)
  217. future.result()
  218. # This is just a smoke test to make sure that the limiter is
  219. # being used and not necessary its exactness. So we set the maximum
  220. # bandwidth to len(content)/2 per sec and make sure that it is
  221. # noticeably slower. Ideally it will take more than two seconds, but
  222. # given tracking at the beginning of transfers are not entirely
  223. # accurate setting at the initial start of a transfer, we give us
  224. # some flexibility by setting the expected time to half of the
  225. # theoretical time to take.
  226. self.assertGreaterEqual(time.time() - start, 1)
  227. self.assert_expected_client_calls_were_correct()
  228. self.assert_put_object_body_was_correct()
  229. def test_raise_exception_on_s3_object_lambda_resource(self):
  230. s3_object_lambda_arn = (
  231. 'arn:aws:s3-object-lambda:us-west-2:123456789012:'
  232. 'accesspoint:my-accesspoint'
  233. )
  234. with self.assertRaisesRegexp(ValueError, 'methods do not support'):
  235. self.manager.upload(self.filename, s3_object_lambda_arn, self.key)
  236. class TestMultipartUpload(BaseUploadTest):
  237. __test__ = True
  238. def setUp(self):
  239. super(TestMultipartUpload, self).setUp()
  240. self.chunksize = 4
  241. self.config = TransferConfig(
  242. max_request_concurrency=1, multipart_threshold=1,
  243. multipart_chunksize=self.chunksize)
  244. self._manager = TransferManager(self.client, self.config)
  245. self.multipart_id = 'my-upload-id'
  246. def create_stubbed_responses(self):
  247. return [
  248. {'method': 'create_multipart_upload',
  249. 'service_response': {'UploadId': self.multipart_id}},
  250. {'method': 'upload_part',
  251. 'service_response': {'ETag': 'etag-1'}},
  252. {'method': 'upload_part',
  253. 'service_response': {'ETag': 'etag-2'}},
  254. {'method': 'upload_part',
  255. 'service_response': {'ETag': 'etag-3'}},
  256. {'method': 'complete_multipart_upload', 'service_response': {}}
  257. ]
  258. def create_expected_progress_callback_info(self):
  259. return [
  260. {'bytes_transferred': 4},
  261. {'bytes_transferred': 4},
  262. {'bytes_transferred': 2}
  263. ]
  264. def assert_upload_part_bodies_were_correct(self):
  265. expected_contents = []
  266. for i in range(0, len(self.content), self.chunksize):
  267. end_i = i + self.chunksize
  268. if end_i > len(self.content):
  269. expected_contents.append(self.content[i:])
  270. else:
  271. expected_contents.append(self.content[i:end_i])
  272. self.assertEqual(self.sent_bodies, expected_contents)
  273. def add_create_multipart_response_with_default_expected_params(
  274. self, extra_expected_params=None):
  275. expected_params = {'Bucket': self.bucket, 'Key': self.key}
  276. if extra_expected_params:
  277. expected_params.update(extra_expected_params)
  278. response = self.create_stubbed_responses()[0]
  279. response['expected_params'] = expected_params
  280. self.stubber.add_response(**response)
  281. def add_upload_part_responses_with_default_expected_params(
  282. self, extra_expected_params=None):
  283. num_parts = 3
  284. upload_part_responses = self.create_stubbed_responses()[1:-1]
  285. for i in range(num_parts):
  286. upload_part_response = upload_part_responses[i]
  287. expected_params = {
  288. 'Bucket': self.bucket,
  289. 'Key': self.key,
  290. 'UploadId': self.multipart_id,
  291. 'Body': ANY,
  292. 'PartNumber': i + 1,
  293. }
  294. if extra_expected_params:
  295. expected_params.update(extra_expected_params)
  296. upload_part_response['expected_params'] = expected_params
  297. self.stubber.add_response(**upload_part_response)
  298. def add_complete_multipart_response_with_default_expected_params(
  299. self, extra_expected_params=None):
  300. expected_params = {
  301. 'Bucket': self.bucket,
  302. 'Key': self.key, 'UploadId': self.multipart_id,
  303. 'MultipartUpload': {
  304. 'Parts': [
  305. {'ETag': 'etag-1', 'PartNumber': 1},
  306. {'ETag': 'etag-2', 'PartNumber': 2},
  307. {'ETag': 'etag-3', 'PartNumber': 3}
  308. ]
  309. }
  310. }
  311. if extra_expected_params:
  312. expected_params.update(extra_expected_params)
  313. response = self.create_stubbed_responses()[-1]
  314. response['expected_params'] = expected_params
  315. self.stubber.add_response(**response)
  316. def test_upload(self):
  317. self.extra_args['RequestPayer'] = 'requester'
  318. # Add requester pays to the create multipart upload and upload parts.
  319. self.add_create_multipart_response_with_default_expected_params(
  320. extra_expected_params={'RequestPayer': 'requester'})
  321. self.add_upload_part_responses_with_default_expected_params(
  322. extra_expected_params={'RequestPayer': 'requester'})
  323. self.add_complete_multipart_response_with_default_expected_params(
  324. extra_expected_params={'RequestPayer': 'requester'})
  325. future = self.manager.upload(
  326. self.filename, self.bucket, self.key, self.extra_args)
  327. future.result()
  328. self.assert_expected_client_calls_were_correct()
  329. def test_upload_for_fileobj(self):
  330. self.add_create_multipart_response_with_default_expected_params()
  331. self.add_upload_part_responses_with_default_expected_params()
  332. self.add_complete_multipart_response_with_default_expected_params()
  333. with open(self.filename, 'rb') as f:
  334. future = self.manager.upload(
  335. f, self.bucket, self.key, self.extra_args)
  336. future.result()
  337. self.assert_expected_client_calls_were_correct()
  338. self.assert_upload_part_bodies_were_correct()
  339. def test_upload_for_seekable_filelike_obj(self):
  340. self.add_create_multipart_response_with_default_expected_params()
  341. self.add_upload_part_responses_with_default_expected_params()
  342. self.add_complete_multipart_response_with_default_expected_params()
  343. bytes_io = six.BytesIO(self.content)
  344. future = self.manager.upload(
  345. bytes_io, self.bucket, self.key, self.extra_args)
  346. future.result()
  347. self.assert_expected_client_calls_were_correct()
  348. self.assert_upload_part_bodies_were_correct()
  349. def test_upload_for_seekable_filelike_obj_that_has_been_seeked(self):
  350. self.add_create_multipart_response_with_default_expected_params()
  351. self.add_upload_part_responses_with_default_expected_params()
  352. self.add_complete_multipart_response_with_default_expected_params()
  353. bytes_io = six.BytesIO(self.content)
  354. seek_pos = 1
  355. bytes_io.seek(seek_pos)
  356. future = self.manager.upload(
  357. bytes_io, self.bucket, self.key, self.extra_args)
  358. future.result()
  359. self.assert_expected_client_calls_were_correct()
  360. self.assertEqual(b''.join(self.sent_bodies), self.content[seek_pos:])
  361. def test_upload_for_non_seekable_filelike_obj(self):
  362. self.add_create_multipart_response_with_default_expected_params()
  363. self.add_upload_part_responses_with_default_expected_params()
  364. self.add_complete_multipart_response_with_default_expected_params()
  365. stream = NonSeekableReader(self.content)
  366. future = self.manager.upload(
  367. stream, self.bucket, self.key, self.extra_args)
  368. future.result()
  369. self.assert_expected_client_calls_were_correct()
  370. self.assert_upload_part_bodies_were_correct()
  371. def test_limits_in_memory_chunks_for_fileobj(self):
  372. # Limit the maximum in memory chunks to one but make number of
  373. # threads more than one. This means that the upload will have to
  374. # happen sequentially despite having many threads available because
  375. # data is sequentially partitioned into chunks in memory and since
  376. # there can only every be one in memory chunk, each upload part will
  377. # have to happen one at a time.
  378. self.config.max_request_concurrency = 10
  379. self.config.max_in_memory_upload_chunks = 1
  380. self._manager = TransferManager(self.client, self.config)
  381. # Add some default stubbed responses.
  382. # These responses are added in order of part number so if the
  383. # multipart upload is not done sequentially, which it should because
  384. # we limit the in memory upload chunks to one, the stubber will
  385. # raise exceptions for mismatching parameters for partNumber when
  386. # once the upload() method is called on the transfer manager.
  387. # If there is a mismatch, the stubber error will propogate on
  388. # the future.result()
  389. self.add_create_multipart_response_with_default_expected_params()
  390. self.add_upload_part_responses_with_default_expected_params()
  391. self.add_complete_multipart_response_with_default_expected_params()
  392. with open(self.filename, 'rb') as f:
  393. future = self.manager.upload(
  394. f, self.bucket, self.key, self.extra_args)
  395. future.result()
  396. # Make sure that the stubber had all of its stubbed responses consumed.
  397. self.assert_expected_client_calls_were_correct()
  398. # Ensure the contents were uploaded in sequentially order by checking
  399. # the sent contents were in order.
  400. self.assert_upload_part_bodies_were_correct()
  401. def test_upload_failure_invokes_abort(self):
  402. self.stubber.add_response(
  403. method='create_multipart_upload',
  404. service_response={
  405. 'UploadId': self.multipart_id
  406. },
  407. expected_params={
  408. 'Bucket': self.bucket,
  409. 'Key': self.key
  410. }
  411. )
  412. self.stubber.add_response(
  413. method='upload_part',
  414. service_response={
  415. 'ETag': 'etag-1'
  416. },
  417. expected_params={
  418. 'Bucket': self.bucket, 'Body': ANY,
  419. 'Key': self.key, 'UploadId': self.multipart_id,
  420. 'PartNumber': 1
  421. }
  422. )
  423. # With the upload part failing this should immediately initiate
  424. # an abort multipart with no more upload parts called.
  425. self.stubber.add_client_error(method='upload_part')
  426. self.stubber.add_response(
  427. method='abort_multipart_upload',
  428. service_response={},
  429. expected_params={
  430. 'Bucket': self.bucket,
  431. 'Key': self.key, 'UploadId': self.multipart_id
  432. }
  433. )
  434. future = self.manager.upload(self.filename, self.bucket, self.key)
  435. # The exception should get propogated to the future and not be
  436. # a cancelled error or something.
  437. with self.assertRaises(ClientError):
  438. future.result()
  439. self.assert_expected_client_calls_were_correct()
  440. def test_upload_passes_select_extra_args(self):
  441. self.extra_args['Metadata'] = {'foo': 'bar'}
  442. # Add metadata to expected create multipart upload call
  443. self.add_create_multipart_response_with_default_expected_params(
  444. extra_expected_params={'Metadata': {'foo': 'bar'}})
  445. self.add_upload_part_responses_with_default_expected_params()
  446. self.add_complete_multipart_response_with_default_expected_params()
  447. future = self.manager.upload(
  448. self.filename, self.bucket, self.key, self.extra_args)
  449. future.result()
  450. self.assert_expected_client_calls_were_correct()