test_download.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import copy
  14. import os
  15. import shutil
  16. import tempfile
  17. import socket
  18. import mock
  19. from __tests__ import BaseTaskTest
  20. from __tests__ import BaseSubmissionTaskTest
  21. from __tests__ import StreamWithError
  22. from __tests__ import FileCreator
  23. from __tests__ import unittest
  24. from __tests__ import RecordingExecutor
  25. from __tests__ import NonSeekableWriter
  26. from s3transfer.compat import six
  27. from s3transfer.compat import SOCKET_ERROR
  28. from s3transfer.exceptions import RetriesExceededError
  29. from s3transfer.bandwidth import BandwidthLimiter
  30. from s3transfer.download import DownloadFilenameOutputManager
  31. from s3transfer.download import DownloadSpecialFilenameOutputManager
  32. from s3transfer.download import DownloadSeekableOutputManager
  33. from s3transfer.download import DownloadNonSeekableOutputManager
  34. from s3transfer.download import DownloadSubmissionTask
  35. from s3transfer.download import GetObjectTask
  36. from s3transfer.download import ImmediatelyWriteIOGetObjectTask
  37. from s3transfer.download import IOWriteTask
  38. from s3transfer.download import IOStreamingWriteTask
  39. from s3transfer.download import IORenameFileTask
  40. from s3transfer.download import IOCloseTask
  41. from s3transfer.download import CompleteDownloadNOOPTask
  42. from s3transfer.download import DownloadChunkIterator
  43. from s3transfer.download import DeferQueue
  44. from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG
  45. from s3transfer.futures import BoundedExecutor
  46. from s3transfer.utils import OSUtils
  47. from s3transfer.utils import CallArgs
  48. class DownloadException(Exception):
  49. pass
  50. class WriteCollector(object):
  51. """A utility to collect information about writes and seeks"""
  52. def __init__(self):
  53. self._pos = 0
  54. self.writes = []
  55. def seek(self, pos, whence=0):
  56. self._pos = pos
  57. def write(self, data):
  58. self.writes.append((self._pos, data))
  59. self._pos += len(data)
  60. class AlwaysIndicatesSpecialFileOSUtils(OSUtils):
  61. """OSUtil that always returns True for is_special_file"""
  62. def is_special_file(self, filename):
  63. return True
  64. class CancelledStreamWrapper(object):
  65. """A wrapper to trigger a cancellation while stream reading
  66. Forces the transfer coordinator to cancel after a certain amount of reads
  67. :param stream: The underlying stream to read from
  68. :param transfer_coordinator: The coordinator for the transfer
  69. :param num_reads: On which read to sigal a cancellation. 0 is the first
  70. read.
  71. """
  72. def __init__(self, stream, transfer_coordinator, num_reads=0):
  73. self._stream = stream
  74. self._transfer_coordinator = transfer_coordinator
  75. self._num_reads = num_reads
  76. self._count = 0
  77. def read(self, *args, **kwargs):
  78. if self._num_reads == self._count:
  79. self._transfer_coordinator.cancel()
  80. self._stream.read(*args, **kwargs)
  81. self._count += 1
  82. class BaseDownloadOutputManagerTest(BaseTaskTest):
  83. def setUp(self):
  84. super(BaseDownloadOutputManagerTest, self).setUp()
  85. self.osutil = OSUtils()
  86. # Create a file to write to
  87. self.tempdir = tempfile.mkdtemp()
  88. self.filename = os.path.join(self.tempdir, 'myfile')
  89. self.call_args = CallArgs(fileobj=self.filename)
  90. self.future = self.get_transfer_future(self.call_args)
  91. self.io_executor = BoundedExecutor(1000, 1)
  92. def tearDown(self):
  93. super(BaseDownloadOutputManagerTest, self).tearDown()
  94. shutil.rmtree(self.tempdir)
  95. class TestDownloadFilenameOutputManager(BaseDownloadOutputManagerTest):
  96. def setUp(self):
  97. super(TestDownloadFilenameOutputManager, self).setUp()
  98. self.download_output_manager = DownloadFilenameOutputManager(
  99. self.osutil, self.transfer_coordinator,
  100. io_executor=self.io_executor)
  101. def test_is_compatible(self):
  102. self.assertTrue(
  103. self.download_output_manager.is_compatible(
  104. self.filename, self.osutil)
  105. )
  106. def test_get_download_task_tag(self):
  107. self.assertIsNone(self.download_output_manager.get_download_task_tag())
  108. def test_get_fileobj_for_io_writes(self):
  109. with self.download_output_manager.get_fileobj_for_io_writes(
  110. self.future) as f:
  111. # Ensure it is a file like object returned
  112. self.assertTrue(hasattr(f, 'read'))
  113. self.assertTrue(hasattr(f, 'seek'))
  114. # Make sure the name of the file returned is not the same as the
  115. # final filename as we should be writing to a temporary file.
  116. self.assertNotEqual(f.name, self.filename)
  117. def test_get_final_io_task(self):
  118. ref_contents = b'my_contents'
  119. with self.download_output_manager.get_fileobj_for_io_writes(
  120. self.future) as f:
  121. temp_filename = f.name
  122. # Write some data to test that the data gets moved over to the
  123. # final location.
  124. f.write(ref_contents)
  125. final_task = self.download_output_manager.get_final_io_task()
  126. # Make sure it is the appropriate task.
  127. self.assertIsInstance(final_task, IORenameFileTask)
  128. final_task()
  129. # Make sure the temp_file gets removed
  130. self.assertFalse(os.path.exists(temp_filename))
  131. # Make sure what ever was written to the temp file got moved to
  132. # the final filename
  133. with open(self.filename, 'rb') as f:
  134. self.assertEqual(f.read(), ref_contents)
  135. def test_can_queue_file_io_task(self):
  136. fileobj = WriteCollector()
  137. self.download_output_manager.queue_file_io_task(
  138. fileobj=fileobj, data='foo', offset=0)
  139. self.download_output_manager.queue_file_io_task(
  140. fileobj=fileobj, data='bar', offset=3)
  141. self.io_executor.shutdown()
  142. self.assertEqual(fileobj.writes, [(0, 'foo'), (3, 'bar')])
  143. def test_get_file_io_write_task(self):
  144. fileobj = WriteCollector()
  145. io_write_task = self.download_output_manager.get_io_write_task(
  146. fileobj=fileobj, data='foo', offset=3)
  147. self.assertIsInstance(io_write_task, IOWriteTask)
  148. io_write_task()
  149. self.assertEqual(fileobj.writes, [(3, 'foo')])
  150. class TestDownloadSpecialFilenameOutputManager(BaseDownloadOutputManagerTest):
  151. def setUp(self):
  152. super(TestDownloadSpecialFilenameOutputManager, self).setUp()
  153. self.osutil = AlwaysIndicatesSpecialFileOSUtils()
  154. self.download_output_manager = DownloadSpecialFilenameOutputManager(
  155. self.osutil, self.transfer_coordinator,
  156. io_executor=self.io_executor)
  157. def test_is_compatible_for_special_file(self):
  158. self.assertTrue(
  159. self.download_output_manager.is_compatible(
  160. self.filename, AlwaysIndicatesSpecialFileOSUtils()))
  161. def test_is_not_compatible_for_non_special_file(self):
  162. self.assertFalse(
  163. self.download_output_manager.is_compatible(
  164. self.filename, OSUtils()))
  165. def test_get_fileobj_for_io_writes(self):
  166. with self.download_output_manager.get_fileobj_for_io_writes(
  167. self.future) as f:
  168. # Ensure it is a file like object returned
  169. self.assertTrue(hasattr(f, 'read'))
  170. # Make sure the name of the file returned is the same as the
  171. # final filename as we should not be writing to a temporary file.
  172. self.assertEqual(f.name, self.filename)
  173. def test_get_final_io_task(self):
  174. self.assertIsInstance(
  175. self.download_output_manager.get_final_io_task(), IOCloseTask)
  176. def test_can_queue_file_io_task(self):
  177. fileobj = WriteCollector()
  178. self.download_output_manager.queue_file_io_task(
  179. fileobj=fileobj, data='foo', offset=0)
  180. self.download_output_manager.queue_file_io_task(
  181. fileobj=fileobj, data='bar', offset=3)
  182. self.io_executor.shutdown()
  183. self.assertEqual(fileobj.writes, [(0, 'foo'), (3, 'bar')])
  184. class TestDownloadSeekableOutputManager(BaseDownloadOutputManagerTest):
  185. def setUp(self):
  186. super(TestDownloadSeekableOutputManager, self).setUp()
  187. self.download_output_manager = DownloadSeekableOutputManager(
  188. self.osutil, self.transfer_coordinator,
  189. io_executor=self.io_executor)
  190. # Create a fileobj to write to
  191. self.fileobj = open(self.filename, 'wb')
  192. self.call_args = CallArgs(fileobj=self.fileobj)
  193. self.future = self.get_transfer_future(self.call_args)
  194. def tearDown(self):
  195. self.fileobj.close()
  196. super(TestDownloadSeekableOutputManager, self).tearDown()
  197. def test_is_compatible(self):
  198. self.assertTrue(
  199. self.download_output_manager.is_compatible(
  200. self.fileobj, self.osutil)
  201. )
  202. def test_is_compatible_bytes_io(self):
  203. self.assertTrue(
  204. self.download_output_manager.is_compatible(
  205. six.BytesIO(), self.osutil)
  206. )
  207. def test_not_compatible_for_non_filelike_obj(self):
  208. self.assertFalse(self.download_output_manager.is_compatible(
  209. object(), self.osutil)
  210. )
  211. def test_get_download_task_tag(self):
  212. self.assertIsNone(self.download_output_manager.get_download_task_tag())
  213. def test_get_fileobj_for_io_writes(self):
  214. self.assertIs(
  215. self.download_output_manager.get_fileobj_for_io_writes(
  216. self.future),
  217. self.fileobj
  218. )
  219. def test_get_final_io_task(self):
  220. self.assertIsInstance(
  221. self.download_output_manager.get_final_io_task(),
  222. CompleteDownloadNOOPTask
  223. )
  224. def test_can_queue_file_io_task(self):
  225. fileobj = WriteCollector()
  226. self.download_output_manager.queue_file_io_task(
  227. fileobj=fileobj, data='foo', offset=0)
  228. self.download_output_manager.queue_file_io_task(
  229. fileobj=fileobj, data='bar', offset=3)
  230. self.io_executor.shutdown()
  231. self.assertEqual(fileobj.writes, [(0, 'foo'), (3, 'bar')])
  232. def test_get_file_io_write_task(self):
  233. fileobj = WriteCollector()
  234. io_write_task = self.download_output_manager.get_io_write_task(
  235. fileobj=fileobj, data='foo', offset=3)
  236. self.assertIsInstance(io_write_task, IOWriteTask)
  237. io_write_task()
  238. self.assertEqual(fileobj.writes, [(3, 'foo')])
  239. class TestDownloadNonSeekableOutputManager(BaseDownloadOutputManagerTest):
  240. def setUp(self):
  241. super(TestDownloadNonSeekableOutputManager, self).setUp()
  242. self.download_output_manager = DownloadNonSeekableOutputManager(
  243. self.osutil, self.transfer_coordinator, io_executor=None)
  244. def test_is_compatible_with_seekable_stream(self):
  245. with open(self.filename, 'wb') as f:
  246. self.assertTrue(self.download_output_manager.is_compatible(
  247. f, self.osutil)
  248. )
  249. def test_not_compatible_with_filename(self):
  250. self.assertFalse(self.download_output_manager.is_compatible(
  251. self.filename, self.osutil))
  252. def test_compatible_with_non_seekable_stream(self):
  253. class NonSeekable(object):
  254. def write(self, data):
  255. pass
  256. f = NonSeekable()
  257. self.assertTrue(self.download_output_manager.is_compatible(
  258. f, self.osutil)
  259. )
  260. def test_is_compatible_with_bytesio(self):
  261. self.assertTrue(
  262. self.download_output_manager.is_compatible(
  263. six.BytesIO(), self.osutil)
  264. )
  265. def test_get_download_task_tag(self):
  266. self.assertIs(
  267. self.download_output_manager.get_download_task_tag(),
  268. IN_MEMORY_DOWNLOAD_TAG)
  269. def test_submit_writes_from_internal_queue(self):
  270. class FakeQueue(object):
  271. def request_writes(self, offset, data):
  272. return [
  273. {'offset': 0, 'data': 'foo'},
  274. {'offset': 3, 'data': 'bar'},
  275. ]
  276. q = FakeQueue()
  277. io_executor = BoundedExecutor(1000, 1)
  278. manager = DownloadNonSeekableOutputManager(
  279. self.osutil, self.transfer_coordinator, io_executor=io_executor,
  280. defer_queue=q)
  281. fileobj = WriteCollector()
  282. manager.queue_file_io_task(
  283. fileobj=fileobj, data='foo', offset=1)
  284. io_executor.shutdown()
  285. self.assertEqual(fileobj.writes, [(0, 'foo'), (3, 'bar')])
  286. def test_get_file_io_write_task(self):
  287. fileobj = WriteCollector()
  288. io_write_task = self.download_output_manager.get_io_write_task(
  289. fileobj=fileobj, data='foo', offset=1)
  290. self.assertIsInstance(io_write_task, IOStreamingWriteTask)
  291. io_write_task()
  292. self.assertEqual(fileobj.writes, [(0, 'foo')])
  293. class TestDownloadSubmissionTask(BaseSubmissionTaskTest):
  294. def setUp(self):
  295. super(TestDownloadSubmissionTask, self).setUp()
  296. self.tempdir = tempfile.mkdtemp()
  297. self.filename = os.path.join(self.tempdir, 'myfile')
  298. self.bucket = 'mybucket'
  299. self.key = 'mykey'
  300. self.extra_args = {}
  301. self.subscribers = []
  302. # Create a stream to read from
  303. self.content = b'my content'
  304. self.stream = six.BytesIO(self.content)
  305. # A list to keep track of all of the bodies sent over the wire
  306. # and their order.
  307. self.call_args = self.get_call_args()
  308. self.transfer_future = self.get_transfer_future(self.call_args)
  309. self.io_executor = BoundedExecutor(1000, 1)
  310. self.submission_main_kwargs = {
  311. 'client': self.client,
  312. 'config': self.config,
  313. 'osutil': self.osutil,
  314. 'request_executor': self.executor,
  315. 'io_executor': self.io_executor,
  316. 'transfer_future': self.transfer_future
  317. }
  318. self.submission_task = self.get_download_submission_task()
  319. def tearDown(self):
  320. super(TestDownloadSubmissionTask, self).tearDown()
  321. shutil.rmtree(self.tempdir)
  322. def get_call_args(self, **kwargs):
  323. default_call_args = {
  324. 'fileobj': self.filename, 'bucket': self.bucket,
  325. 'key': self.key, 'extra_args': self.extra_args,
  326. 'subscribers': self.subscribers
  327. }
  328. default_call_args.update(kwargs)
  329. return CallArgs(**default_call_args)
  330. def wrap_executor_in_recorder(self):
  331. self.executor = RecordingExecutor(self.executor)
  332. self.submission_main_kwargs['request_executor'] = self.executor
  333. def use_fileobj_in_call_args(self, fileobj):
  334. self.call_args = self.get_call_args(fileobj=fileobj)
  335. self.transfer_future = self.get_transfer_future(self.call_args)
  336. self.submission_main_kwargs['transfer_future'] = self.transfer_future
  337. def assert_tag_for_get_object(self, tag_value):
  338. submissions_to_compare = self.executor.submissions
  339. if len(submissions_to_compare) > 1:
  340. # If it was ranged get, make sure we do not include the join task.
  341. submissions_to_compare = submissions_to_compare[:-1]
  342. for submission in submissions_to_compare:
  343. self.assertEqual(
  344. submission['tag'], tag_value)
  345. def add_head_object_response(self):
  346. self.stubber.add_response(
  347. 'head_object', {'ContentLength': len(self.content)})
  348. def add_get_responses(self):
  349. chunksize = self.config.multipart_chunksize
  350. for i in range(0, len(self.content), chunksize):
  351. if i + chunksize > len(self.content):
  352. stream = six.BytesIO(self.content[i:])
  353. self.stubber.add_response('get_object', {'Body': stream})
  354. else:
  355. stream = six.BytesIO(self.content[i:i+chunksize])
  356. self.stubber.add_response('get_object', {'Body': stream})
  357. def configure_for_ranged_get(self):
  358. self.config.multipart_threshold = 1
  359. self.config.multipart_chunksize = 4
  360. def get_download_submission_task(self):
  361. return self.get_task(
  362. DownloadSubmissionTask, main_kwargs=self.submission_main_kwargs)
  363. def wait_and_assert_completed_successfully(self, submission_task):
  364. submission_task()
  365. self.transfer_future.result()
  366. self.stubber.assert_no_pending_responses()
  367. def test_submits_no_tag_for_get_object_filename(self):
  368. self.wrap_executor_in_recorder()
  369. self.add_head_object_response()
  370. self.add_get_responses()
  371. self.submission_task = self.get_download_submission_task()
  372. self.wait_and_assert_completed_successfully(self.submission_task)
  373. # Make sure no tag to limit that task specifically was not associated
  374. # to that task submission.
  375. self.assert_tag_for_get_object(None)
  376. def test_submits_no_tag_for_ranged_get_filename(self):
  377. self.wrap_executor_in_recorder()
  378. self.configure_for_ranged_get()
  379. self.add_head_object_response()
  380. self.add_get_responses()
  381. self.submission_task = self.get_download_submission_task()
  382. self.wait_and_assert_completed_successfully(self.submission_task)
  383. # Make sure no tag to limit that task specifically was not associated
  384. # to that task submission.
  385. self.assert_tag_for_get_object(None)
  386. def test_submits_no_tag_for_get_object_fileobj(self):
  387. self.wrap_executor_in_recorder()
  388. self.add_head_object_response()
  389. self.add_get_responses()
  390. with open(self.filename, 'wb') as f:
  391. self.use_fileobj_in_call_args(f)
  392. self.submission_task = self.get_download_submission_task()
  393. self.wait_and_assert_completed_successfully(self.submission_task)
  394. # Make sure no tag to limit that task specifically was not associated
  395. # to that task submission.
  396. self.assert_tag_for_get_object(None)
  397. def test_submits_no_tag_for_ranged_get_object_fileobj(self):
  398. self.wrap_executor_in_recorder()
  399. self.configure_for_ranged_get()
  400. self.add_head_object_response()
  401. self.add_get_responses()
  402. with open(self.filename, 'wb') as f:
  403. self.use_fileobj_in_call_args(f)
  404. self.submission_task = self.get_download_submission_task()
  405. self.wait_and_assert_completed_successfully(self.submission_task)
  406. # Make sure no tag to limit that task specifically was not associated
  407. # to that task submission.
  408. self.assert_tag_for_get_object(None)
  409. def tests_submits_tag_for_get_object_nonseekable_fileobj(self):
  410. self.wrap_executor_in_recorder()
  411. self.add_head_object_response()
  412. self.add_get_responses()
  413. with open(self.filename, 'wb') as f:
  414. self.use_fileobj_in_call_args(NonSeekableWriter(f))
  415. self.submission_task = self.get_download_submission_task()
  416. self.wait_and_assert_completed_successfully(self.submission_task)
  417. # Make sure no tag to limit that task specifically was not associated
  418. # to that task submission.
  419. self.assert_tag_for_get_object(IN_MEMORY_DOWNLOAD_TAG)
  420. def tests_submits_tag_for_ranged_get_object_nonseekable_fileobj(self):
  421. self.wrap_executor_in_recorder()
  422. self.configure_for_ranged_get()
  423. self.add_head_object_response()
  424. self.add_get_responses()
  425. with open(self.filename, 'wb') as f:
  426. self.use_fileobj_in_call_args(NonSeekableWriter(f))
  427. self.submission_task = self.get_download_submission_task()
  428. self.wait_and_assert_completed_successfully(self.submission_task)
  429. # Make sure no tag to limit that task specifically was not associated
  430. # to that task submission.
  431. self.assert_tag_for_get_object(IN_MEMORY_DOWNLOAD_TAG)
  432. class TestGetObjectTask(BaseTaskTest):
  433. def setUp(self):
  434. super(TestGetObjectTask, self).setUp()
  435. self.bucket = 'mybucket'
  436. self.key = 'mykey'
  437. self.extra_args = {}
  438. self.callbacks = []
  439. self.max_attempts = 5
  440. self.io_executor = BoundedExecutor(1000, 1)
  441. self.content = b'my content'
  442. self.stream = six.BytesIO(self.content)
  443. self.fileobj = WriteCollector()
  444. self.osutil = OSUtils()
  445. self.io_chunksize = 64 * (1024 ** 2)
  446. self.task_cls = GetObjectTask
  447. self.download_output_manager = DownloadSeekableOutputManager(
  448. self.osutil, self.transfer_coordinator, self.io_executor)
  449. def get_download_task(self, **kwargs):
  450. default_kwargs = {
  451. 'client': self.client, 'bucket': self.bucket, 'key': self.key,
  452. 'fileobj': self.fileobj, 'extra_args': self.extra_args,
  453. 'callbacks': self.callbacks,
  454. 'max_attempts': self.max_attempts,
  455. 'download_output_manager': self.download_output_manager,
  456. 'io_chunksize': self.io_chunksize,
  457. }
  458. default_kwargs.update(kwargs)
  459. self.transfer_coordinator.set_status_to_queued()
  460. return self.get_task(self.task_cls, main_kwargs=default_kwargs)
  461. def assert_io_writes(self, expected_writes):
  462. # Let the io executor process all of the writes before checking
  463. # what writes were sent to it.
  464. self.io_executor.shutdown()
  465. self.assertEqual(self.fileobj.writes, expected_writes)
  466. def test_main(self):
  467. self.stubber.add_response(
  468. 'get_object', service_response={'Body': self.stream},
  469. expected_params={'Bucket': self.bucket, 'Key': self.key}
  470. )
  471. task = self.get_download_task()
  472. task()
  473. self.stubber.assert_no_pending_responses()
  474. self.assert_io_writes([(0, self.content)])
  475. def test_extra_args(self):
  476. self.stubber.add_response(
  477. 'get_object', service_response={'Body': self.stream},
  478. expected_params={
  479. 'Bucket': self.bucket, 'Key': self.key, 'Range': 'bytes=0-'
  480. }
  481. )
  482. self.extra_args['Range'] = 'bytes=0-'
  483. task = self.get_download_task()
  484. task()
  485. self.stubber.assert_no_pending_responses()
  486. self.assert_io_writes([(0, self.content)])
  487. def test_control_chunk_size(self):
  488. self.stubber.add_response(
  489. 'get_object', service_response={'Body': self.stream},
  490. expected_params={'Bucket': self.bucket, 'Key': self.key}
  491. )
  492. task = self.get_download_task(io_chunksize=1)
  493. task()
  494. self.stubber.assert_no_pending_responses()
  495. expected_contents = []
  496. for i in range(len(self.content)):
  497. expected_contents.append((i, bytes(self.content[i:i+1])))
  498. self.assert_io_writes(expected_contents)
  499. def test_start_index(self):
  500. self.stubber.add_response(
  501. 'get_object', service_response={'Body': self.stream},
  502. expected_params={'Bucket': self.bucket, 'Key': self.key}
  503. )
  504. task = self.get_download_task(start_index=5)
  505. task()
  506. self.stubber.assert_no_pending_responses()
  507. self.assert_io_writes([(5, self.content)])
  508. def test_uses_bandwidth_limiter(self):
  509. bandwidth_limiter = mock.Mock(BandwidthLimiter)
  510. self.stubber.add_response(
  511. 'get_object', service_response={'Body': self.stream},
  512. expected_params={'Bucket': self.bucket, 'Key': self.key}
  513. )
  514. task = self.get_download_task(bandwidth_limiter=bandwidth_limiter)
  515. task()
  516. self.stubber.assert_no_pending_responses()
  517. self.assertEqual(
  518. bandwidth_limiter.get_bandwith_limited_stream.call_args_list,
  519. [mock.call(mock.ANY, self.transfer_coordinator)]
  520. )
  521. def test_retries_succeeds(self):
  522. self.stubber.add_response(
  523. 'get_object', service_response={
  524. 'Body': StreamWithError(self.stream, SOCKET_ERROR)
  525. },
  526. expected_params={'Bucket': self.bucket, 'Key': self.key}
  527. )
  528. self.stubber.add_response(
  529. 'get_object', service_response={'Body': self.stream},
  530. expected_params={'Bucket': self.bucket, 'Key': self.key}
  531. )
  532. task = self.get_download_task()
  533. task()
  534. # Retryable error should have not affected the bytes placed into
  535. # the io queue.
  536. self.stubber.assert_no_pending_responses()
  537. self.assert_io_writes([(0, self.content)])
  538. def test_retries_failure(self):
  539. for _ in range(self.max_attempts):
  540. self.stubber.add_response(
  541. 'get_object', service_response={
  542. 'Body': StreamWithError(self.stream, SOCKET_ERROR)
  543. },
  544. expected_params={'Bucket': self.bucket, 'Key': self.key}
  545. )
  546. task = self.get_download_task()
  547. task()
  548. self.transfer_coordinator.announce_done()
  549. # Should have failed out on a RetriesExceededError
  550. with self.assertRaises(RetriesExceededError):
  551. self.transfer_coordinator.result()
  552. self.stubber.assert_no_pending_responses()
  553. def test_retries_in_middle_of_streaming(self):
  554. # After the first read a retryable error will be thrown
  555. self.stubber.add_response(
  556. 'get_object', service_response={
  557. 'Body': StreamWithError(
  558. copy.deepcopy(self.stream), SOCKET_ERROR, 1)
  559. },
  560. expected_params={'Bucket': self.bucket, 'Key': self.key}
  561. )
  562. self.stubber.add_response(
  563. 'get_object', service_response={'Body': self.stream},
  564. expected_params={'Bucket': self.bucket, 'Key': self.key}
  565. )
  566. task = self.get_download_task(io_chunksize=1)
  567. task()
  568. self.stubber.assert_no_pending_responses()
  569. expected_contents = []
  570. # This is the content intially read in before the retry hit on the
  571. # second read()
  572. expected_contents.append((0, bytes(self.content[0:1])))
  573. # The rest of the content should be the entire set of data partitioned
  574. # out based on the one byte stream chunk size. Note the second
  575. # element in the list should be a copy of the first element since
  576. # a retryable exception happened in between.
  577. for i in range(len(self.content)):
  578. expected_contents.append((i, bytes(self.content[i:i+1])))
  579. self.assert_io_writes(expected_contents)
  580. def test_cancels_out_of_queueing(self):
  581. self.stubber.add_response(
  582. 'get_object',
  583. service_response={
  584. 'Body': CancelledStreamWrapper(
  585. self.stream, self.transfer_coordinator)
  586. },
  587. expected_params={'Bucket': self.bucket, 'Key': self.key}
  588. )
  589. task = self.get_download_task()
  590. task()
  591. self.stubber.assert_no_pending_responses()
  592. # Make sure that no contents were added to the queue because the task
  593. # should have been canceled before trying to add the contents to the
  594. # io queue.
  595. self.assert_io_writes([])
  596. def test_handles_callback_on_initial_error(self):
  597. # We can't use the stubber for this because we need to raise
  598. # a S3_RETRYABLE_DOWNLOAD_ERRORS, and the stubber only allows
  599. # you to raise a ClientError.
  600. self.client.get_object = mock.Mock(side_effect=SOCKET_ERROR())
  601. task = self.get_download_task()
  602. task()
  603. self.transfer_coordinator.announce_done()
  604. # Should have failed out on a RetriesExceededError because
  605. # get_object keeps raising a socket error.
  606. with self.assertRaises(RetriesExceededError):
  607. self.transfer_coordinator.result()
  608. class TestImmediatelyWriteIOGetObjectTask(TestGetObjectTask):
  609. def setUp(self):
  610. super(TestImmediatelyWriteIOGetObjectTask, self).setUp()
  611. self.task_cls = ImmediatelyWriteIOGetObjectTask
  612. # When data is written out, it should not use the io executor at all
  613. # if it does use the io executor that is a deviation from expected
  614. # behavior as the data should be written immediately to the file
  615. # object once downloaded.
  616. self.io_executor = None
  617. self.download_output_manager = DownloadSeekableOutputManager(
  618. self.osutil, self.transfer_coordinator, self.io_executor)
  619. def assert_io_writes(self, expected_writes):
  620. self.assertEqual(self.fileobj.writes, expected_writes)
  621. class BaseIOTaskTest(BaseTaskTest):
  622. def setUp(self):
  623. super(BaseIOTaskTest, self).setUp()
  624. self.files = FileCreator()
  625. self.osutil = OSUtils()
  626. self.temp_filename = os.path.join(self.files.rootdir, 'mytempfile')
  627. self.final_filename = os.path.join(self.files.rootdir, 'myfile')
  628. def tearDown(self):
  629. super(BaseIOTaskTest, self).tearDown()
  630. self.files.remove_all()
  631. class TestIOStreamingWriteTask(BaseIOTaskTest):
  632. def test_main(self):
  633. with open(self.temp_filename, 'wb') as f:
  634. task = self.get_task(
  635. IOStreamingWriteTask,
  636. main_kwargs={
  637. 'fileobj': f,
  638. 'data': b'foobar'
  639. }
  640. )
  641. task()
  642. task2 = self.get_task(
  643. IOStreamingWriteTask,
  644. main_kwargs={
  645. 'fileobj': f,
  646. 'data': b'baz'
  647. }
  648. )
  649. task2()
  650. with open(self.temp_filename, 'rb') as f:
  651. # We should just have written to the file in the order
  652. # the tasks were executed.
  653. self.assertEqual(f.read(), b'foobarbaz')
  654. class TestIOWriteTask(BaseIOTaskTest):
  655. def test_main(self):
  656. with open(self.temp_filename, 'wb') as f:
  657. # Write once to the file
  658. task = self.get_task(
  659. IOWriteTask,
  660. main_kwargs={
  661. 'fileobj': f,
  662. 'data': b'foo',
  663. 'offset': 0
  664. }
  665. )
  666. task()
  667. # Write again to the file
  668. task = self.get_task(
  669. IOWriteTask,
  670. main_kwargs={
  671. 'fileobj': f,
  672. 'data': b'bar',
  673. 'offset': 3
  674. }
  675. )
  676. task()
  677. with open(self.temp_filename, 'rb') as f:
  678. self.assertEqual(f.read(), b'foobar')
  679. class TestIORenameFileTask(BaseIOTaskTest):
  680. def test_main(self):
  681. with open(self.temp_filename, 'wb') as f:
  682. task = self.get_task(
  683. IORenameFileTask,
  684. main_kwargs={
  685. 'fileobj': f,
  686. 'final_filename': self.final_filename,
  687. 'osutil': self.osutil
  688. }
  689. )
  690. task()
  691. self.assertTrue(os.path.exists(self.final_filename))
  692. self.assertFalse(os.path.exists(self.temp_filename))
  693. class TestIOCloseTask(BaseIOTaskTest):
  694. def test_main(self):
  695. with open(self.temp_filename, 'w') as f:
  696. task = self.get_task(IOCloseTask, main_kwargs={'fileobj': f})
  697. task()
  698. self.assertTrue(f.closed)
  699. class TestDownloadChunkIterator(unittest.TestCase):
  700. def test_iter(self):
  701. content = b'my content'
  702. body = six.BytesIO(content)
  703. ref_chunks = []
  704. for chunk in DownloadChunkIterator(body, len(content)):
  705. ref_chunks.append(chunk)
  706. self.assertEqual(ref_chunks, [b'my content'])
  707. def test_iter_chunksize(self):
  708. content = b'1234'
  709. body = six.BytesIO(content)
  710. ref_chunks = []
  711. for chunk in DownloadChunkIterator(body, 3):
  712. ref_chunks.append(chunk)
  713. self.assertEqual(ref_chunks, [b'123', b'4'])
  714. def test_empty_content(self):
  715. body = six.BytesIO(b'')
  716. ref_chunks = []
  717. for chunk in DownloadChunkIterator(body, 3):
  718. ref_chunks.append(chunk)
  719. self.assertEqual(ref_chunks, [b''])
  720. class TestDeferQueue(unittest.TestCase):
  721. def setUp(self):
  722. self.q = DeferQueue()
  723. def test_no_writes_when_not_lowest_block(self):
  724. writes = self.q.request_writes(offset=1, data='bar')
  725. self.assertEqual(writes, [])
  726. def test_writes_returned_in_order(self):
  727. self.assertEqual(self.q.request_writes(offset=3, data='d'), [])
  728. self.assertEqual(self.q.request_writes(offset=2, data='c'), [])
  729. self.assertEqual(self.q.request_writes(offset=1, data='b'), [])
  730. # Everything at this point has been deferred, but as soon as we
  731. # send offset=0, that will unlock offsets 0-3.
  732. writes = self.q.request_writes(offset=0, data='a')
  733. self.assertEqual(
  734. writes,
  735. [
  736. {'offset': 0, 'data': 'a'},
  737. {'offset': 1, 'data': 'b'},
  738. {'offset': 2, 'data': 'c'},
  739. {'offset': 3, 'data': 'd'}
  740. ]
  741. )
  742. def test_unlocks_partial_range(self):
  743. self.assertEqual(self.q.request_writes(offset=5, data='f'), [])
  744. self.assertEqual(self.q.request_writes(offset=1, data='b'), [])
  745. # offset=0 unlocks 0-1, but offset=5 still needs to see 2-4 first.
  746. writes = self.q.request_writes(offset=0, data='a')
  747. self.assertEqual(
  748. writes,
  749. [
  750. {'offset': 0, 'data': 'a'},
  751. {'offset': 1, 'data': 'b'},
  752. ]
  753. )
  754. def test_data_can_be_any_size(self):
  755. self.q.request_writes(offset=5, data='hello world')
  756. writes = self.q.request_writes(offset=0, data='abcde')
  757. self.assertEqual(
  758. writes,
  759. [
  760. {'offset': 0, 'data': 'abcde'},
  761. {'offset': 5, 'data': 'hello world'},
  762. ]
  763. )
  764. def test_data_queued_in_order(self):
  765. # This immediately gets returned because offset=0 is the
  766. # next range we're waiting on.
  767. writes = self.q.request_writes(offset=0, data='hello world')
  768. self.assertEqual(writes, [{'offset': 0, 'data': 'hello world'}])
  769. # Same thing here but with offset
  770. writes = self.q.request_writes(offset=11, data='hello again')
  771. self.assertEqual(writes, [{'offset': 11, 'data': 'hello again'}])
  772. def test_writes_below_min_offset_are_ignored(self):
  773. self.q.request_writes(offset=0, data='a')
  774. self.q.request_writes(offset=1, data='b')
  775. self.q.request_writes(offset=2, data='c')
  776. # At this point we're expecting offset=3, so if a write
  777. # comes in below 3, we ignore it.
  778. self.assertEqual(self.q.request_writes(offset=0, data='a'), [])
  779. self.assertEqual(self.q.request_writes(offset=1, data='b'), [])
  780. self.assertEqual(
  781. self.q.request_writes(offset=3, data='d'),
  782. [{'offset': 3, 'data': 'd'}]
  783. )
  784. def test_duplicate_writes_are_ignored(self):
  785. self.q.request_writes(offset=2, data='c')
  786. self.q.request_writes(offset=1, data='b')
  787. # We're still waiting for offset=0, but if
  788. # a duplicate write comes in for offset=2/offset=1
  789. # it's ignored. This gives "first one wins" behavior.
  790. self.assertEqual(self.q.request_writes(offset=2, data='X'), [])
  791. self.assertEqual(self.q.request_writes(offset=1, data='Y'), [])
  792. self.assertEqual(
  793. self.q.request_writes(offset=0, data='a'),
  794. [
  795. {'offset': 0, 'data': 'a'},
  796. # Note we're seeing 'b' 'c', and not 'X', 'Y'.
  797. {'offset': 1, 'data': 'b'},
  798. {'offset': 2, 'data': 'c'},
  799. ]
  800. )