test_upload.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the 'License'). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the 'license' file accompanying this file. This file is
  10. # distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. from __future__ import division
  14. import os
  15. import tempfile
  16. import shutil
  17. import math
  18. from botocore.stub import ANY
  19. from __tests__ import unittest
  20. from __tests__ import BaseTaskTest
  21. from __tests__ import BaseSubmissionTaskTest
  22. from __tests__ import FileSizeProvider
  23. from __tests__ import RecordingSubscriber
  24. from __tests__ import RecordingExecutor
  25. from __tests__ import NonSeekableReader
  26. from s3transfer.compat import six
  27. from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
  28. from s3transfer.manager import TransferConfig
  29. from s3transfer.upload import AggregatedProgressCallback
  30. from s3transfer.upload import InterruptReader
  31. from s3transfer.upload import UploadFilenameInputManager
  32. from s3transfer.upload import UploadSeekableInputManager
  33. from s3transfer.upload import UploadNonSeekableInputManager
  34. from s3transfer.upload import UploadSubmissionTask
  35. from s3transfer.upload import PutObjectTask
  36. from s3transfer.upload import UploadPartTask
  37. from s3transfer.utils import CallArgs
  38. from s3transfer.utils import OSUtils
  39. from s3transfer.utils import MIN_UPLOAD_CHUNKSIZE
  class InterruptionError(Exception):
      """Exception the tests set on a transfer coordinator to interrupt reads."""
  class OSUtilsExceptionOnFileSize(OSUtils):
      """OSUtils variant whose file size lookup always fails the test."""

      def get_file_size(self, filename):
          """Raise AssertionError naming ``filename`` instead of stating it."""
          message = "The file %s should not have been stated" % filename
          raise AssertionError(message)
  class BaseUploadTest(BaseTaskTest):
      """Shared fixture for upload tests.

      Creates a temporary file holding ``self.content`` and records every
      request ``Body`` seen by the client in ``self.sent_bodies``.
      """

      def setUp(self):
          """Create the temp file and hook the body recorder into the client."""
          super(BaseUploadTest, self).setUp()
          self.bucket = 'mybucket'
          self.key = 'foo'
          self.osutil = OSUtils()
          self.tempdir = tempfile.mkdtemp()
          # Register the removal immediately: cleanups still run when a later
          # statement in setUp raises, whereas tearDown would be skipped and
          # the directory would be left behind.
          self.addCleanup(shutil.rmtree, self.tempdir)
          self.filename = os.path.join(self.tempdir, 'myfile')
          self.content = b'my content'
          self.subscribers = []
          with open(self.filename, 'wb') as f:
              f.write(self.content)
          # A list to keep track of all of the bodies sent over the wire
          # and their order.
          self.sent_bodies = []
          self.client.meta.events.register(
              'before-parameter-build.s3.*', self.collect_body)

      def collect_body(self, params, **kwargs):
          """Event handler: read and remember the ``Body`` parameter, if any.

          :param params: The request parameters passed by the event system.
          :param kwargs: Remaining event arguments; unused.
          """
          if 'Body' in params:
              self.sent_bodies.append(params['Body'].read())
  class TestAggregatedProgressCallback(unittest.TestCase):
      """Checks when AggregatedProgressCallback forwards its byte counts."""

      def setUp(self):
          self.threshold = 3
          self.aggregated_amounts = []
          self.aggregated_progress_callback = AggregatedProgressCallback(
              [self.callback], self.threshold)

      def callback(self, bytes_transferred):
          """Record each amount handed to the wrapped callback."""
          self.aggregated_amounts.append(bytes_transferred)

      def test_under_threshold(self):
          report = self.aggregated_progress_callback
          # One byte short of the threshold: nothing is forwarded yet.
          report(self.threshold - 1)
          self.assertEqual(self.aggregated_amounts, [])
          # The missing byte arrives and the whole amount is forwarded.
          report(1)
          self.assertEqual(self.aggregated_amounts, [self.threshold])

      def test_at_threshold(self):
          report = self.aggregated_progress_callback
          report(self.threshold)
          self.assertEqual(self.aggregated_amounts, [self.threshold])

      def test_over_threshold(self):
          amount = self.threshold + 1
          report = self.aggregated_progress_callback
          report(amount)
          self.assertEqual(self.aggregated_amounts, [amount])

      def test_flush(self):
          amount = self.threshold - 1
          aggregator = self.aggregated_progress_callback
          aggregator(amount)
          self.assertEqual(self.aggregated_amounts, [])
          # Flushing forwards the pending amount even though it is below
          # the threshold.
          aggregator.flush()
          self.assertEqual(self.aggregated_amounts, [amount])

      def test_flush_with_nothing_to_flush(self):
          amount = self.threshold - 1
          aggregator = self.aggregated_progress_callback
          aggregator(amount)
          self.assertEqual(self.aggregated_amounts, [])
          aggregator.flush()
          self.assertEqual(self.aggregated_amounts, [amount])
          # A second flush has nothing pending, so no new amount shows up.
          aggregator.flush()
          self.assertEqual(self.aggregated_amounts, [amount])
  class TestInterruptReader(BaseUploadTest):
      """Covers read, seek and tell on an InterruptReader over the temp file."""

      def test_read_raises_exception(self):
          coordinator = self.transfer_coordinator
          with open(self.filename, 'rb') as stream:
              reader = InterruptReader(stream, coordinator)
              # A read before any exception is set returns data normally.
              first_byte = reader.read(1)
              self.assertEqual(first_byte, self.content[0:1])
              # Once the coordinator holds an exception ...
              coordinator.set_exception(InterruptionError())
              # ... the next read should propagate it.
              with self.assertRaises(InterruptionError):
                  reader.read()

      def test_seek(self):
          with open(self.filename, 'rb') as stream:
              reader = InterruptReader(stream, self.transfer_coordinator)
              # After seeking to offset 1, the read starts at the second byte.
              reader.seek(1)
              second_byte = reader.read(1)
              self.assertEqual(second_byte, self.content[1:2])

      def test_tell(self):
          with open(self.filename, 'rb') as stream:
              reader = InterruptReader(stream, self.transfer_coordinator)
              # The reported position follows the seek.
              reader.seek(1)
              position = reader.tell()
              self.assertEqual(position, 1)
  class BaseUploadInputManagerTest(BaseUploadTest):
      """Common fixture for the upload input manager test classes."""

      def setUp(self):
          super(BaseUploadInputManagerTest, self).setUp()
          self.osutil = OSUtils()
          self.config = TransferConfig()
          recorder = RecordingSubscriber()
          self.recording_subscriber = recorder
          self.subscribers.append(recorder)

      def _get_expected_body_for_part(self, part_number):
          """Return the slice of ``self.content`` expected for ``part_number``.

          Parts are numbered from 1 and each covers
          ``self.config.multipart_chunksize`` bytes; the final part takes
          whatever remains.
          """
          chunk_size = self.config.multipart_chunksize
          start = (part_number - 1) * chunk_size
          stop = start + chunk_size
          if stop < len(self.content):
              return self.content[start:stop]
          return self.content[start:]
  class TestUploadFilenameInputManager(BaseUploadInputManagerTest):
      """Exercises UploadFilenameInputManager with a filename as the fileobj."""

      def setUp(self):
          super(TestUploadFilenameInputManager, self).setUp()
          self.upload_input_manager = UploadFilenameInputManager(
              self.osutil, self.transfer_coordinator)
          self.call_args = CallArgs(
              fileobj=self.filename, subscribers=self.subscribers)
          self.future = self.get_transfer_future(self.call_args)

      def test_is_compatible(self):
          fileobj = self.future.meta.call_args.fileobj
          self.assertTrue(self.upload_input_manager.is_compatible(fileobj))

      def test_stores_bodies_in_memory_put_object(self):
          manager = self.upload_input_manager
          self.assertFalse(manager.stores_body_in_memory('put_object'))

      def test_stores_bodies_in_memory_upload_part(self):
          manager = self.upload_input_manager
          self.assertFalse(manager.stores_body_in_memory('upload_part'))

      def test_provide_transfer_size(self):
          self.upload_input_manager.provide_transfer_size(self.future)
          # The size recorded on the future must match the length of the
          # content written to the file.
          expected_size = len(self.content)
          self.assertEqual(self.future.meta.size, expected_size)

      def test_requires_multipart_upload(self):
          manager = self.upload_input_manager
          content_length = len(self.content)
          self.future.meta.provide_transfer_size(content_length)
          # Under the default threshold the content is too small to need a
          # multipart transfer.
          self.assertFalse(
              manager.requires_multipart_upload(self.future, self.config))
          # Lowering the threshold to exactly the content length should
          # make a multipart upload required.
          self.config.multipart_threshold = content_length
          self.assertTrue(
              manager.requires_multipart_upload(self.future, self.config))

      def test_get_put_object_body(self):
          content_length = len(self.content)
          self.future.meta.provide_transfer_size(content_length)
          body = self.upload_input_manager.get_put_object_body(self.future)
          body.enable_callback()
          # Reading the returned file-like object gives back the file content.
          with body:
              self.assertEqual(body.read(), self.content)
          # The subscriber's progress callbacks should have seen every byte
          # that was read.
          bytes_seen = self.recording_subscriber.calculate_bytes_seen()
          self.assertEqual(bytes_seen, content_length)

      def test_get_put_object_body_is_interruptable(self):
          self.future.meta.provide_transfer_size(len(self.content))
          body = self.upload_input_manager.get_put_object_body(self.future)
          # Put an exception on the transfer coordinator ...
          self.transfer_coordinator.set_exception(InterruptionError)
          # ... and confirm that reading the body raises it.
          with self.assertRaises(InterruptionError):
              body.read()

      def test_yield_upload_part_bodies(self):
          # Use a small chunk size so the short content spans several parts.
          self.config.multipart_chunksize = 4
          content_length = len(self.content)
          self.future.meta.provide_transfer_size(content_length)
          # Iterator over (part number, body) pairs.
          part_iterator = self.upload_input_manager.yield_upload_part_bodies(
              self.future, self.config.multipart_chunksize)
          for expected_number, part in enumerate(part_iterator, 1):
              part_number, body = part
              # Part numbers count up from 1 in order.
              self.assertEqual(part_number, expected_number)
              body.enable_callback()
              # Each body holds exactly the slice for its part number.
              with body:
                  expected_body = self._get_expected_body_for_part(part_number)
                  self.assertEqual(body.read(), expected_body)
          # The subscriber's progress callbacks should have seen every byte
          # across all of the parts.
          bytes_seen = self.recording_subscriber.calculate_bytes_seen()
          self.assertEqual(bytes_seen, content_length)

      def test_yield_upload_part_bodies_are_interruptable(self):
          # Use a small chunk size so the short content spans several parts.
          self.config.multipart_chunksize = 4
          self.future.meta.provide_transfer_size(len(self.content))
          # Iterator over (part number, body) pairs.
          part_iterator = self.upload_input_manager.yield_upload_part_bodies(
              self.future, self.config.multipart_chunksize)
          # Put an exception on the transfer coordinator ...
          self.transfer_coordinator.set_exception(InterruptionError)
          for _, body in part_iterator:
              # ... and confirm that reading every yielded body raises it.
              with self.assertRaises(InterruptionError):
                  body.read()
  class TestUploadSeekableInputManager(TestUploadFilenameInputManager):
      """Reruns the filename manager tests against UploadSeekableInputManager.

      The fileobj is an open binary file handle on the fixture's temp file
      instead of a filename.
      """

      def setUp(self):
          """Swap in the seekable manager and an open handle as the fileobj."""
          super(TestUploadSeekableInputManager, self).setUp()
          self.upload_input_manager = UploadSeekableInputManager(
              self.osutil, self.transfer_coordinator)
          self.fileobj = open(self.filename, 'rb')
          # tearDown is skipped when setUp raises, so the close is also
          # registered as a cleanup to avoid leaking the handle if one of
          # the statements below fails. Closing a file twice is harmless.
          self.addCleanup(self.fileobj.close)
          self.call_args = CallArgs(
              fileobj=self.fileobj, subscribers=self.subscribers)
          self.future = self.get_transfer_future(self.call_args)

      def tearDown(self):
          """Close the handle before the parent fixture is torn down."""
          self.fileobj.close()
          super(TestUploadSeekableInputManager, self).tearDown()

      def test_is_compatible_bytes_io(self):
          self.assertTrue(
              self.upload_input_manager.is_compatible(six.BytesIO()))

      def test_not_compatible_for_non_filelike_obj(self):
          self.assertFalse(self.upload_input_manager.is_compatible(object()))

      def test_stores_bodies_in_memory_upload_part(self):
          self.assertTrue(
              self.upload_input_manager.stores_body_in_memory('upload_part'))

      def test_get_put_object_body(self):
          start_pos = 3
          self.fileobj.seek(start_pos)
          adjusted_size = len(self.content) - start_pos
          self.future.meta.provide_transfer_size(adjusted_size)
          read_file_chunk = self.upload_input_manager.get_put_object_body(
              self.future)
          read_file_chunk.enable_callback()
          # The position the handle was seeked to before the upload should
          # be reflected in both the length and the content of the body.
          with read_file_chunk:
              self.assertEqual(len(read_file_chunk), adjusted_size)
              self.assertEqual(
                  read_file_chunk.read(), self.content[start_pos:])
          self.assertEqual(
              self.recording_subscriber.calculate_bytes_seen(), adjusted_size)
  class TestUploadNonSeekableInputManager(TestUploadFilenameInputManager):
      """Reruns the filename manager tests with a non-seekable stream."""

      def setUp(self):
          super(TestUploadNonSeekableInputManager, self).setUp()
          self.upload_input_manager = UploadNonSeekableInputManager(
              self.osutil, self.transfer_coordinator)
          self.fileobj = NonSeekableReader(self.content)
          self.call_args = CallArgs(
              fileobj=self.fileobj, subscribers=self.subscribers)
          self.future = self.get_transfer_future(self.call_args)

      def assert_multipart_parts(self):
          """
          Asserts that the input manager will generate a multipart upload
          and that each part is in order and the correct size.
          """
          manager = self.upload_input_manager
          chunk_size = self.config.multipart_chunksize
          content_length = len(self.content)

          # A multipart upload must be required for this configuration.
          self.assertTrue(
              manager.requires_multipart_upload(self.future, self.config))

          # Materialize every (part number, body) pair that would be sent.
          parts = list(
              manager.yield_upload_part_bodies(self.future, chunk_size))

          # The part count follows from the content length and chunk size.
          num_parts = math.ceil(content_length / chunk_size)
          self.assertEqual(len(parts), num_parts)

          # Every part except the last is numbered in order and is exactly
          # one chunk long.
          for expected_number, part in enumerate(parts[:-1], 1):
              part_number, body = part
              self.assertEqual(part_number, expected_number)
              self.assertEqual(len(body.read()), chunk_size)

          # The last part carries whatever is left over ...
          expected_final_size = content_length - ((num_parts - 1) * chunk_size)
          final_number, final_body = parts[-1]
          self.assertEqual(len(final_body.read()), expected_final_size)
          # ... and its part number equals the total number of parts.
          self.assertEqual(final_number, len(parts))

      def test_provide_transfer_size(self):
          self.upload_input_manager.provide_transfer_size(self.future)
          # The size cannot be known without reading the entire body, so it
          # is left as None.
          self.assertEqual(self.future.meta.size, None)

      def test_stores_bodies_in_memory_upload_part(self):
          manager = self.upload_input_manager
          self.assertTrue(manager.stores_body_in_memory('upload_part'))

      def test_stores_bodies_in_memory_put_object(self):
          manager = self.upload_input_manager
          self.assertTrue(manager.stores_body_in_memory('put_object'))

      def test_initial_data_parts_threshold_lesser(self):
          # threshold < size
          self.config.multipart_threshold = 2
          self.config.multipart_chunksize = 4
          self.assert_multipart_parts()

      def test_initial_data_parts_threshold_equal(self):
          # threshold == size
          self.config.multipart_threshold = 4
          self.config.multipart_chunksize = 4
          self.assert_multipart_parts()

      def test_initial_data_parts_threshold_greater(self):
          # threshold > size
          self.config.multipart_threshold = 8
          self.config.multipart_chunksize = 4
          self.assert_multipart_parts()
  class TestUploadSubmissionTask(BaseSubmissionTaskTest):
      """Tests for UploadSubmissionTask with filename and file object inputs.

      The fixture file is three chunks long, and the multipart threshold is
      five chunks, so uploads are single-part unless a test lowers the
      threshold.
      """

      def setUp(self):
          """Create the fixture file, body recorder and submission task."""
          super(TestUploadSubmissionTask, self).setUp()
          self.tempdir = tempfile.mkdtemp()
          # Register the removal immediately: cleanups still run when a later
          # statement in setUp raises, whereas tearDown would be skipped and
          # the directory would be left behind.
          self.addCleanup(shutil.rmtree, self.tempdir)
          self.filename = os.path.join(self.tempdir, 'myfile')
          self.content = b'0' * (MIN_UPLOAD_CHUNKSIZE * 3)
          self.config.multipart_chunksize = MIN_UPLOAD_CHUNKSIZE
          self.config.multipart_threshold = MIN_UPLOAD_CHUNKSIZE * 5

          with open(self.filename, 'wb') as f:
              f.write(self.content)

          self.bucket = 'mybucket'
          self.key = 'mykey'
          self.extra_args = {}
          self.subscribers = []

          # A list to keep track of all of the bodies sent over the wire
          # and their order.
          self.sent_bodies = []
          self.client.meta.events.register(
              'before-parameter-build.s3.*', self.collect_body)

          self.call_args = self.get_call_args()
          self.transfer_future = self.get_transfer_future(self.call_args)
          self.submission_main_kwargs = {
              'client': self.client,
              'config': self.config,
              'osutil': self.osutil,
              'request_executor': self.executor,
              'transfer_future': self.transfer_future
          }
          self.submission_task = self.get_task(
              UploadSubmissionTask, main_kwargs=self.submission_main_kwargs)

      def collect_body(self, params, **kwargs):
          """Event handler: read and remember the ``Body`` parameter, if any.

          :param params: The request parameters passed by the event system.
          :param kwargs: Remaining event arguments; unused.
          """
          if 'Body' in params:
              self.sent_bodies.append(params['Body'].read())

      def get_call_args(self, **kwargs):
          """Build CallArgs from the fixture defaults.

          :param kwargs: Values that override or extend the defaults.
          :returns: A CallArgs instance.
          """
          default_call_args = {
              'fileobj': self.filename, 'bucket': self.bucket,
              'key': self.key, 'extra_args': self.extra_args,
              'subscribers': self.subscribers
          }
          default_call_args.update(kwargs)
          return CallArgs(**default_call_args)

      def add_multipart_upload_stubbed_responses(self):
          """Queue stubbed responses for a three-part multipart upload."""
          self.stubber.add_response(
              method='create_multipart_upload',
              service_response={'UploadId': 'my-id'}
          )
          self.stubber.add_response(
              method='upload_part',
              service_response={'ETag': 'etag-1'}
          )
          self.stubber.add_response(
              method='upload_part',
              service_response={'ETag': 'etag-2'}
          )
          self.stubber.add_response(
              method='upload_part',
              service_response={'ETag': 'etag-3'}
          )
          self.stubber.add_response(
              method='complete_multipart_upload',
              service_response={}
          )

      def wrap_executor_in_recorder(self):
          """Wrap the executor in a RecordingExecutor and use it for new tasks."""
          self.executor = RecordingExecutor(self.executor)
          self.submission_main_kwargs['request_executor'] = self.executor

      def use_fileobj_in_call_args(self, fileobj):
          """Rebuild the call args and transfer future around ``fileobj``."""
          self.call_args = self.get_call_args(fileobj=fileobj)
          self.transfer_future = self.get_transfer_future(self.call_args)
          self.submission_main_kwargs['transfer_future'] = self.transfer_future

      def assert_tag_value_for_put_object(self, tag_value):
          """Assert the first recorded submission carries ``tag_value``."""
          self.assertEqual(
              self.executor.submissions[0]['tag'], tag_value)

      def assert_tag_value_for_upload_parts(self, tag_value):
          """Assert every submission between the first and last has the tag."""
          for submission in self.executor.submissions[1:-1]:
              self.assertEqual(
                  submission['tag'], tag_value)

      def test_provide_file_size_on_put(self):
          self.call_args.subscribers.append(FileSizeProvider(len(self.content)))
          self.stubber.add_response(
              method='put_object',
              service_response={},
              expected_params={
                  'Body': ANY, 'Bucket': self.bucket,
                  'Key': self.key
              }
          )

          # With this osutil, the task fails if it tries to stat the file,
          # which it would only need to do if no transfer size was provided.
          self.submission_main_kwargs['osutil'] = OSUtilsExceptionOnFileSize()
          self.submission_task = self.get_task(
              UploadSubmissionTask, main_kwargs=self.submission_main_kwargs)
          self.submission_task()
          self.transfer_future.result()
          self.stubber.assert_no_pending_responses()
          self.assertEqual(self.sent_bodies, [self.content])

      def test_submits_no_tag_for_put_object_filename(self):
          self.wrap_executor_in_recorder()
          self.stubber.add_response('put_object', {})

          self.submission_task = self.get_task(
              UploadSubmissionTask, main_kwargs=self.submission_main_kwargs)
          self.submission_task()
          self.transfer_future.result()
          self.stubber.assert_no_pending_responses()

          # Make sure no limiting tag was associated with the put object
          # task submission.
          self.assert_tag_value_for_put_object(None)

      def test_submits_no_tag_for_multipart_filename(self):
          self.wrap_executor_in_recorder()

          # Set up for a multipart upload.
          self.add_multipart_upload_stubbed_responses()
          self.config.multipart_threshold = 1

          self.submission_task = self.get_task(
              UploadSubmissionTask, main_kwargs=self.submission_main_kwargs)
          self.submission_task()
          self.transfer_future.result()
          self.stubber.assert_no_pending_responses()

          # Make sure no limiting tag was associated with any of the upload
          # part tasks when they were submitted to the executor.
          self.assert_tag_value_for_upload_parts(None)

      def test_submits_no_tag_for_put_object_fileobj(self):
          self.wrap_executor_in_recorder()
          self.stubber.add_response('put_object', {})

          with open(self.filename, 'rb') as f:
              self.use_fileobj_in_call_args(f)
              self.submission_task = self.get_task(
                  UploadSubmissionTask, main_kwargs=self.submission_main_kwargs)
              self.submission_task()
              self.transfer_future.result()
              self.stubber.assert_no_pending_responses()

          # Make sure no limiting tag was associated with the put object
          # task submission.
          self.assert_tag_value_for_put_object(None)

      def test_submits_tag_for_multipart_fileobj(self):
          self.wrap_executor_in_recorder()

          # Set up for a multipart upload.
          self.add_multipart_upload_stubbed_responses()
          self.config.multipart_threshold = 1

          with open(self.filename, 'rb') as f:
              self.use_fileobj_in_call_args(f)
              self.submission_task = self.get_task(
                  UploadSubmissionTask, main_kwargs=self.submission_main_kwargs)
              self.submission_task()
              self.transfer_future.result()
              self.stubber.assert_no_pending_responses()

          # Make sure the limiting tag was associated with every upload part
          # task submitted to the executor, as these tasks will have chunks
          # of data stored with them in memory.
          self.assert_tag_value_for_upload_parts(IN_MEMORY_UPLOAD_TAG)
  class TestPutObjectTask(BaseUploadTest):
      """Runs PutObjectTask against a stubbed ``put_object`` call."""

      def test_main(self):
          extra_args = {'Metadata': {'foo': 'bar'}}
          expected_params = {
              'Body': ANY, 'Bucket': self.bucket, 'Key': self.key,
              'Metadata': {'foo': 'bar'}
          }
          with open(self.filename, 'rb') as fileobj:
              main_kwargs = {
                  'client': self.client,
                  'fileobj': fileobj,
                  'bucket': self.bucket,
                  'key': self.key,
                  'extra_args': extra_args
              }
              task = self.get_task(PutObjectTask, main_kwargs=main_kwargs)
              self.stubber.add_response(
                  method='put_object',
                  service_response={},
                  expected_params=expected_params
              )
              task()
              # The stubbed call was consumed and the file content was sent.
              self.stubber.assert_no_pending_responses()
              self.assertEqual(self.sent_bodies, [self.content])
  class TestUploadPartTask(BaseUploadTest):
      """Runs UploadPartTask against a stubbed ``upload_part`` call."""

      def test_main(self):
          upload_id = 'my-id'
          part_number = 1
          etag = 'foo'
          extra_args = {'RequestPayer': 'requester'}
          expected_params = {
              'Body': ANY, 'Bucket': self.bucket, 'Key': self.key,
              'UploadId': upload_id, 'PartNumber': part_number,
              'RequestPayer': 'requester'
          }
          with open(self.filename, 'rb') as fileobj:
              main_kwargs = {
                  'client': self.client,
                  'fileobj': fileobj,
                  'bucket': self.bucket,
                  'key': self.key,
                  'upload_id': upload_id,
                  'part_number': part_number,
                  'extra_args': extra_args
              }
              task = self.get_task(UploadPartTask, main_kwargs=main_kwargs)
              self.stubber.add_response(
                  method='upload_part',
                  service_response={'ETag': etag},
                  expected_params=expected_params
              )
              result = task()
              self.stubber.assert_no_pending_responses()
              # The task returns the ETag paired with its part number, and
              # the file content was sent as the body.
              self.assertEqual(
                  result, {'ETag': etag, 'PartNumber': part_number})
              self.assertEqual(self.sent_bodies, [self.content])