test_download.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import copy
  14. import os
  15. import tempfile
  16. import time
  17. import shutil
  18. import glob
  19. from botocore.exceptions import ClientError
  20. from __tests__ import StreamWithError
  21. from __tests__ import FileSizeProvider
  22. from __tests__ import RecordingSubscriber
  23. from __tests__ import RecordingOSUtils
  24. from __tests__ import NonSeekableWriter
  25. from __tests__ import BaseGeneralInterfaceTest
  26. from __tests__ import skip_if_windows
  27. from __tests__ import skip_if_using_serial_implementation
  28. from s3transfer.compat import six
  29. from s3transfer.compat import SOCKET_ERROR
  30. from s3transfer.exceptions import RetriesExceededError
  31. from s3transfer.manager import TransferManager
  32. from s3transfer.manager import TransferConfig
  33. from s3transfer.download import GetObjectTask
  34. class BaseDownloadTest(BaseGeneralInterfaceTest):
  35. def setUp(self):
  36. super(BaseDownloadTest, self).setUp()
  37. self.config = TransferConfig(max_request_concurrency=1)
  38. self._manager = TransferManager(self.client, self.config)
  39. # Create a temporary directory to write to
  40. self.tempdir = tempfile.mkdtemp()
  41. self.filename = os.path.join(self.tempdir, 'myfile')
  42. # Initialize some default arguments
  43. self.bucket = 'mybucket'
  44. self.key = 'mykey'
  45. self.extra_args = {}
  46. self.subscribers = []
  47. # Create a stream to read from
  48. self.content = b'my content'
  49. self.stream = six.BytesIO(self.content)
  50. def tearDown(self):
  51. super(BaseDownloadTest, self).tearDown()
  52. shutil.rmtree(self.tempdir)
  53. @property
  54. def manager(self):
  55. return self._manager
  56. @property
  57. def method(self):
  58. return self.manager.download
  59. def create_call_kwargs(self):
  60. return {
  61. 'bucket': self.bucket,
  62. 'key': self.key,
  63. 'fileobj': self.filename
  64. }
  65. def create_invalid_extra_args(self):
  66. return {
  67. 'Foo': 'bar'
  68. }
  69. def create_stubbed_responses(self):
  70. # We want to make sure the beginning of the stream is always used
  71. # incase this gets called twice.
  72. self.stream.seek(0)
  73. return [
  74. {
  75. 'method': 'head_object',
  76. 'service_response': {
  77. 'ContentLength': len(self.content)
  78. }
  79. },
  80. {
  81. 'method': 'get_object',
  82. 'service_response': {
  83. 'Body': self.stream
  84. }
  85. }
  86. ]
  87. def create_expected_progress_callback_info(self):
  88. # Note that last read is from the empty sentinel indicating
  89. # that the stream is done.
  90. return [
  91. {'bytes_transferred': 10}
  92. ]
  93. def add_head_object_response(self, expected_params=None):
  94. head_response = self.create_stubbed_responses()[0]
  95. if expected_params:
  96. head_response['expected_params'] = expected_params
  97. self.stubber.add_response(**head_response)
  98. def add_successful_get_object_responses(
  99. self, expected_params=None, expected_ranges=None):
  100. # Add all get_object responses needed to complete the download.
  101. # Should account for both ranged and nonranged downloads.
  102. for i, stubbed_response in enumerate(
  103. self.create_stubbed_responses()[1:]):
  104. if expected_params:
  105. stubbed_response['expected_params'] = copy.deepcopy(
  106. expected_params)
  107. if expected_ranges:
  108. stubbed_response['expected_params'][
  109. 'Range'] = expected_ranges[i]
  110. self.stubber.add_response(**stubbed_response)
  111. def add_n_retryable_get_object_responses(self, n, num_reads=0):
  112. for _ in range(n):
  113. self.stubber.add_response(
  114. method='get_object',
  115. service_response={
  116. 'Body': StreamWithError(
  117. copy.deepcopy(self.stream), SOCKET_ERROR, num_reads)
  118. }
  119. )
  120. def test_download_temporary_file_does_not_exist(self):
  121. self.add_head_object_response()
  122. self.add_successful_get_object_responses()
  123. future = self.manager.download(**self.create_call_kwargs())
  124. future.result()
  125. # Make sure the file exists
  126. self.assertTrue(os.path.exists(self.filename))
  127. # Make sure the random temporary file does not exist
  128. possible_matches = glob.glob('%s*' % self.filename + os.extsep)
  129. self.assertEqual(possible_matches, [])
  130. def test_download_for_fileobj(self):
  131. self.add_head_object_response()
  132. self.add_successful_get_object_responses()
  133. with open(self.filename, 'wb') as f:
  134. future = self.manager.download(
  135. self.bucket, self.key, f, self.extra_args)
  136. future.result()
  137. # Ensure that the contents are correct
  138. with open(self.filename, 'rb') as f:
  139. self.assertEqual(self.content, f.read())
  140. def test_download_for_seekable_filelike_obj(self):
  141. self.add_head_object_response()
  142. self.add_successful_get_object_responses()
  143. # Create a file-like object to test. In this case, it is a BytesIO
  144. # object.
  145. bytes_io = six.BytesIO()
  146. future = self.manager.download(
  147. self.bucket, self.key, bytes_io, self.extra_args)
  148. future.result()
  149. # Ensure that the contents are correct
  150. bytes_io.seek(0)
  151. self.assertEqual(self.content, bytes_io.read())
  152. def test_download_for_nonseekable_filelike_obj(self):
  153. self.add_head_object_response()
  154. self.add_successful_get_object_responses()
  155. with open(self.filename, 'wb') as f:
  156. future = self.manager.download(
  157. self.bucket, self.key, NonSeekableWriter(f), self.extra_args)
  158. future.result()
  159. # Ensure that the contents are correct
  160. with open(self.filename, 'rb') as f:
  161. self.assertEqual(self.content, f.read())
  162. def test_download_cleanup_on_failure(self):
  163. self.add_head_object_response()
  164. # Throw an error on the download
  165. self.stubber.add_client_error('get_object')
  166. future = self.manager.download(**self.create_call_kwargs())
  167. with self.assertRaises(ClientError):
  168. future.result()
  169. # Make sure the actual file and the temporary do not exist
  170. # by globbing for the file and any of its extensions
  171. possible_matches = glob.glob('%s*' % self.filename)
  172. self.assertEqual(possible_matches, [])
  173. def test_download_with_nonexistent_directory(self):
  174. self.add_head_object_response()
  175. self.add_successful_get_object_responses()
  176. call_kwargs = self.create_call_kwargs()
  177. call_kwargs['fileobj'] = os.path.join(
  178. self.tempdir, 'missing-directory', 'myfile')
  179. future = self.manager.download(**call_kwargs)
  180. with self.assertRaises(IOError):
  181. future.result()
  182. def test_retries_and_succeeds(self):
  183. self.add_head_object_response()
  184. # Insert a response that will trigger a retry.
  185. self.add_n_retryable_get_object_responses(1)
  186. # Add the normal responses to simulate the download proceeding
  187. # as normal after the retry.
  188. self.add_successful_get_object_responses()
  189. future = self.manager.download(**self.create_call_kwargs())
  190. future.result()
  191. # The retry should have been consumed and the process should have
  192. # continued using the successful responses.
  193. self.stubber.assert_no_pending_responses()
  194. with open(self.filename, 'rb') as f:
  195. self.assertEqual(self.content, f.read())
  196. def test_retry_failure(self):
  197. self.add_head_object_response()
  198. max_retries = 3
  199. self.config.num_download_attempts = max_retries
  200. self._manager = TransferManager(self.client, self.config)
  201. # Add responses that fill up the maximum number of retries.
  202. self.add_n_retryable_get_object_responses(max_retries)
  203. future = self.manager.download(**self.create_call_kwargs())
  204. # A retry exceeded error should have happened.
  205. with self.assertRaises(RetriesExceededError):
  206. future.result()
  207. # All of the retries should have been used up.
  208. self.stubber.assert_no_pending_responses()
  209. def test_retry_rewinds_callbacks(self):
  210. self.add_head_object_response()
  211. # Insert a response that will trigger a retry after one read of the
  212. # stream has been made.
  213. self.add_n_retryable_get_object_responses(1, num_reads=1)
  214. # Add the normal responses to simulate the download proceeding
  215. # as normal after the retry.
  216. self.add_successful_get_object_responses()
  217. recorder_subscriber = RecordingSubscriber()
  218. # Set the streaming to a size that is smaller than the data we
  219. # currently provide to it to simulate rewinds of callbacks.
  220. self.config.io_chunksize = 3
  221. future = self.manager.download(
  222. subscribers=[recorder_subscriber], **self.create_call_kwargs())
  223. future.result()
  224. # Ensure that there is no more remaining responses and that contents
  225. # are correct.
  226. self.stubber.assert_no_pending_responses()
  227. with open(self.filename, 'rb') as f:
  228. self.assertEqual(self.content, f.read())
  229. # Assert that the number of bytes seen is equal to the length of
  230. # downloaded content.
  231. self.assertEqual(
  232. recorder_subscriber.calculate_bytes_seen(), len(self.content))
  233. # Also ensure that the second progress invocation was negative three
  234. # becasue a retry happened on the second read of the stream and we
  235. # know that the chunk size for each read is 3.
  236. progress_byte_amts = [
  237. call['bytes_transferred'] for call in
  238. recorder_subscriber.on_progress_calls
  239. ]
  240. self.assertEqual(-3, progress_byte_amts[1])
  241. def test_can_provide_file_size(self):
  242. self.add_successful_get_object_responses()
  243. call_kwargs = self.create_call_kwargs()
  244. call_kwargs['subscribers'] = [FileSizeProvider(len(self.content))]
  245. future = self.manager.download(**call_kwargs)
  246. future.result()
  247. # The HeadObject should have not happened and should have been able
  248. # to successfully download the file.
  249. self.stubber.assert_no_pending_responses()
  250. with open(self.filename, 'rb') as f:
  251. self.assertEqual(self.content, f.read())
  252. def test_uses_provided_osutil(self):
  253. osutil = RecordingOSUtils()
  254. # Use the recording os utility for the transfer manager
  255. self._manager = TransferManager(self.client, self.config, osutil)
  256. self.add_head_object_response()
  257. self.add_successful_get_object_responses()
  258. future = self.manager.download(**self.create_call_kwargs())
  259. future.result()
  260. # The osutil should have had its open() method invoked when opening
  261. # a temporary file and its rename_file() method invoked when the
  262. # the temporary file was moved to its final location.
  263. self.assertEqual(len(osutil.open_records), 1)
  264. self.assertEqual(len(osutil.rename_records), 1)
  265. @skip_if_windows('Windows does not support UNIX special files')
  266. @skip_if_using_serial_implementation(
  267. 'A seperate thread is needed to read from the fifo')
  268. def test_download_for_fifo_file(self):
  269. self.add_head_object_response()
  270. self.add_successful_get_object_responses()
  271. # Create the fifo file
  272. os.mkfifo(self.filename)
  273. future = self.manager.download(
  274. self.bucket, self.key, self.filename, self.extra_args)
  275. # The call to open a fifo will block until there is both a reader
  276. # and a writer, so we need to open it for reading after we've
  277. # started the transfer.
  278. with open(self.filename, 'rb') as fifo:
  279. future.result()
  280. self.assertEqual(fifo.read(), self.content)
  281. def test_raise_exception_on_s3_object_lambda_resource(self):
  282. s3_object_lambda_arn = (
  283. 'arn:aws:s3-object-lambda:us-west-2:123456789012:'
  284. 'accesspoint:my-accesspoint'
  285. )
  286. with self.assertRaisesRegexp(ValueError, 'methods do not support'):
  287. self.manager.download(
  288. s3_object_lambda_arn, self.key, self.filename, self.extra_args)
  289. class TestNonRangedDownload(BaseDownloadTest):
  290. # TODO: If you want to add tests outside of this test class and still
  291. # subclass from BaseDownloadTest you need to set ``__test__ = True``. If
  292. # you do not, your tests will not get picked up by the test runner! This
  293. # needs to be done until we find a better way to ignore running test cases
  294. # from the general test base class, which we do not want ran.
  295. __test__ = True
  296. def test_download(self):
  297. self.extra_args['RequestPayer'] = 'requester'
  298. expected_params = {
  299. 'Bucket': self.bucket,
  300. 'Key': self.key,
  301. 'RequestPayer': 'requester'
  302. }
  303. self.add_head_object_response(expected_params)
  304. self.add_successful_get_object_responses(expected_params)
  305. future = self.manager.download(
  306. self.bucket, self.key, self.filename, self.extra_args)
  307. future.result()
  308. # Ensure that the contents are correct
  309. with open(self.filename, 'rb') as f:
  310. self.assertEqual(self.content, f.read())
  311. def test_allowed_copy_params_are_valid(self):
  312. op_model = self.client.meta.service_model.operation_model('GetObject')
  313. for allowed_upload_arg in self._manager.ALLOWED_DOWNLOAD_ARGS:
  314. self.assertIn(allowed_upload_arg, op_model.input_shape.members)
  315. def test_download_empty_object(self):
  316. self.content = b''
  317. self.stream = six.BytesIO(self.content)
  318. self.add_head_object_response()
  319. self.add_successful_get_object_responses()
  320. future = self.manager.download(
  321. self.bucket, self.key, self.filename, self.extra_args)
  322. future.result()
  323. # Ensure that the empty file exists
  324. with open(self.filename, 'rb') as f:
  325. self.assertEqual(b'', f.read())
  326. def test_uses_bandwidth_limiter(self):
  327. self.content = b'a' * 1024 * 1024
  328. self.stream = six.BytesIO(self.content)
  329. self.config = TransferConfig(
  330. max_request_concurrency=1, max_bandwidth=len(self.content)/2)
  331. self._manager = TransferManager(self.client, self.config)
  332. self.add_head_object_response()
  333. self.add_successful_get_object_responses()
  334. start = time.time()
  335. future = self.manager.download(
  336. self.bucket, self.key, self.filename, self.extra_args)
  337. future.result()
  338. # This is just a smoke test to make sure that the limiter is
  339. # being used and not necessary its exactness. So we set the maximum
  340. # bandwidth to len(content)/2 per sec and make sure that it is
  341. # noticeably slower. Ideally it will take more than two seconds, but
  342. # given tracking at the beginning of transfers are not entirely
  343. # accurate setting at the initial start of a transfer, we give us
  344. # some flexibility by setting the expected time to half of the
  345. # theoretical time to take.
  346. self.assertGreaterEqual(time.time() - start, 1)
  347. # Ensure that the contents are correct
  348. with open(self.filename, 'rb') as f:
  349. self.assertEqual(self.content, f.read())
  350. class TestRangedDownload(BaseDownloadTest):
  351. # TODO: If you want to add tests outside of this test class and still
  352. # subclass from BaseDownloadTest you need to set ``__test__ = True``. If
  353. # you do not, your tests will not get picked up by the test runner! This
  354. # needs to be done until we find a better way to ignore running test cases
  355. # from the general test base class, which we do not want ran.
  356. __test__ = True
  357. def setUp(self):
  358. super(TestRangedDownload, self).setUp()
  359. self.config = TransferConfig(
  360. max_request_concurrency=1, multipart_threshold=1,
  361. multipart_chunksize=4)
  362. self._manager = TransferManager(self.client, self.config)
  363. def create_stubbed_responses(self):
  364. return [
  365. {
  366. 'method': 'head_object',
  367. 'service_response': {
  368. 'ContentLength': len(self.content)
  369. }
  370. },
  371. {
  372. 'method': 'get_object',
  373. 'service_response': {
  374. 'Body': six.BytesIO(self.content[0:4])
  375. }
  376. },
  377. {
  378. 'method': 'get_object',
  379. 'service_response': {
  380. 'Body': six.BytesIO(self.content[4:8])
  381. }
  382. },
  383. {
  384. 'method': 'get_object',
  385. 'service_response': {
  386. 'Body': six.BytesIO(self.content[8:])
  387. }
  388. }
  389. ]
  390. def create_expected_progress_callback_info(self):
  391. return [
  392. {'bytes_transferred': 4},
  393. {'bytes_transferred': 4},
  394. {'bytes_transferred': 2},
  395. ]
  396. def test_download(self):
  397. self.extra_args['RequestPayer'] = 'requester'
  398. expected_params = {
  399. 'Bucket': self.bucket,
  400. 'Key': self.key,
  401. 'RequestPayer': 'requester'
  402. }
  403. expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-']
  404. self.add_head_object_response(expected_params)
  405. self.add_successful_get_object_responses(
  406. expected_params, expected_ranges)
  407. future = self.manager.download(
  408. self.bucket, self.key, self.filename, self.extra_args)
  409. future.result()
  410. # Ensure that the contents are correct
  411. with open(self.filename, 'rb') as f:
  412. self.assertEqual(self.content, f.read())