test_processpool.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
  1. # Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import os
  14. import signal
  15. import time
  16. import threading
  17. import mock
  18. from six.moves import queue
  19. from botocore.exceptions import ClientError
  20. from botocore.exceptions import ReadTimeoutError
  21. from botocore.client import BaseClient
  22. from botocore.config import Config
  23. from __tests__ import unittest
  24. from __tests__ import skip_if_windows
  25. from __tests__ import FileCreator
  26. from __tests__ import StreamWithError
  27. from __tests__ import StubbedClientTest
  28. from s3transfer.compat import six
  29. from s3transfer.constants import PROCESS_USER_AGENT
  30. from s3transfer.exceptions import RetriesExceededError
  31. from s3transfer.exceptions import CancelledError
  32. from s3transfer.utils import OSUtils
  33. from s3transfer.utils import CallArgs
  34. from s3transfer.processpool import SHUTDOWN_SIGNAL
  35. from s3transfer.processpool import ignore_ctrl_c
  36. from s3transfer.processpool import DownloadFileRequest
  37. from s3transfer.processpool import GetObjectJob
  38. from s3transfer.processpool import ProcessTransferConfig
  39. from s3transfer.processpool import ProcessPoolDownloader
  40. from s3transfer.processpool import ProcessPoolTransferFuture
  41. from s3transfer.processpool import ProcessPoolTransferMeta
  42. from s3transfer.processpool import TransferMonitor
  43. from s3transfer.processpool import TransferState
  44. from s3transfer.processpool import ClientFactory
  45. from s3transfer.processpool import GetObjectSubmitter
  46. from s3transfer.processpool import GetObjectWorker
  47. class RenameFailingOSUtils(OSUtils):
  48. def __init__(self, exception):
  49. self.exception = exception
  50. def rename_file(self, current_filename, new_filename):
  51. raise self.exception
  52. class TestIgnoreCtrlC(unittest.TestCase):
  53. @skip_if_windows('os.kill() with SIGINT not supported on Windows')
  54. def test_ignore_ctrl_c(self):
  55. with ignore_ctrl_c():
  56. try:
  57. os.kill(os.getpid(), signal.SIGINT)
  58. except KeyboardInterrupt:
  59. self.fail('The ignore_ctrl_c context manager should have '
  60. 'ignored the KeyboardInterrupt exception')
  61. class TestProcessPoolDownloader(unittest.TestCase):
  62. def test_uses_client_kwargs(self):
  63. with mock.patch('s3transfer.processpool.ClientFactory') as factory:
  64. ProcessPoolDownloader(client_kwargs={'region_name': 'myregion'})
  65. self.assertEqual(
  66. factory.call_args[0][0], {'region_name': 'myregion'})
  67. class TestProcessPoolTransferFuture(unittest.TestCase):
  68. def setUp(self):
  69. self.monitor = TransferMonitor()
  70. self.transfer_id = self.monitor.notify_new_transfer()
  71. self.meta = ProcessPoolTransferMeta(
  72. transfer_id=self.transfer_id, call_args=CallArgs())
  73. self.future = ProcessPoolTransferFuture(
  74. monitor=self.monitor, meta=self.meta)
  75. def test_meta(self):
  76. self.assertEqual(self.future.meta, self.meta)
  77. def test_done(self):
  78. self.assertFalse(self.future.done())
  79. self.monitor.notify_done(self.transfer_id)
  80. self.assertTrue(self.future.done())
  81. def test_result(self):
  82. self.monitor.notify_done(self.transfer_id)
  83. self.assertIsNone(self.future.result())
  84. def test_result_with_exception(self):
  85. self.monitor.notify_exception(self.transfer_id, RuntimeError())
  86. self.monitor.notify_done(self.transfer_id)
  87. with self.assertRaises(RuntimeError):
  88. self.future.result()
  89. def test_result_with_keyboard_interrupt(self):
  90. mock_monitor = mock.Mock(TransferMonitor)
  91. mock_monitor._connect = mock.Mock()
  92. mock_monitor.poll_for_result.side_effect = KeyboardInterrupt()
  93. future = ProcessPoolTransferFuture(
  94. monitor=mock_monitor, meta=self.meta)
  95. with self.assertRaises(KeyboardInterrupt):
  96. future.result()
  97. self.assertTrue(mock_monitor._connect.called)
  98. self.assertTrue(mock_monitor.notify_exception.called)
  99. call_args = mock_monitor.notify_exception.call_args[0]
  100. self.assertEqual(call_args[0], self.transfer_id)
  101. self.assertIsInstance(call_args[1], CancelledError)
  102. def test_cancel(self):
  103. self.future.cancel()
  104. self.monitor.notify_done(self.transfer_id)
  105. with self.assertRaises(CancelledError):
  106. self.future.result()
  107. class TestProcessPoolTransferMeta(unittest.TestCase):
  108. def test_transfer_id(self):
  109. meta = ProcessPoolTransferMeta(1, CallArgs())
  110. self.assertEqual(meta.transfer_id, 1)
  111. def test_call_args(self):
  112. call_args = CallArgs()
  113. meta = ProcessPoolTransferMeta(1, call_args)
  114. self.assertEqual(meta.call_args, call_args)
  115. def test_user_context(self):
  116. meta = ProcessPoolTransferMeta(1, CallArgs())
  117. self.assertEqual(meta.user_context, {})
  118. meta.user_context['mykey'] = 'myvalue'
  119. self.assertEqual(meta.user_context, {'mykey': 'myvalue'})
  120. class TestClientFactory(unittest.TestCase):
  121. def test_create_client(self):
  122. client = ClientFactory().create_client()
  123. self.assertIsInstance(client, BaseClient)
  124. self.assertEqual(client.meta.service_model.service_name, 's3')
  125. self.assertIn(PROCESS_USER_AGENT, client.meta.config.user_agent)
  126. def test_create_client_with_client_kwargs(self):
  127. client = ClientFactory({'region_name': 'myregion'}).create_client()
  128. self.assertEqual(client.meta.region_name, 'myregion')
  129. def test_user_agent_with_config(self):
  130. client = ClientFactory({'config': Config()}).create_client()
  131. self.assertIn(PROCESS_USER_AGENT, client.meta.config.user_agent)
  132. def test_user_agent_with_existing_user_agent_extra(self):
  133. config = Config(user_agent_extra='foo/1.0')
  134. client = ClientFactory({'config': config}).create_client()
  135. self.assertIn(PROCESS_USER_AGENT, client.meta.config.user_agent)
  136. def test_user_agent_with_existing_user_agent(self):
  137. config = Config(user_agent='foo/1.0')
  138. client = ClientFactory({'config': config}).create_client()
  139. self.assertIn(PROCESS_USER_AGENT, client.meta.config.user_agent)
  140. class TestTransferMonitor(unittest.TestCase):
  141. def setUp(self):
  142. self.monitor = TransferMonitor()
  143. self.transfer_id = self.monitor.notify_new_transfer()
  144. def test_notify_new_transfer_creates_new_state(self):
  145. monitor = TransferMonitor()
  146. transfer_id = monitor.notify_new_transfer()
  147. self.assertFalse(monitor.is_done(transfer_id))
  148. self.assertIsNone(monitor.get_exception(transfer_id))
  149. def test_notify_new_transfer_increments_transfer_id(self):
  150. monitor = TransferMonitor()
  151. self.assertEqual(monitor.notify_new_transfer(), 0)
  152. self.assertEqual(monitor.notify_new_transfer(), 1)
  153. def test_notify_get_exception(self):
  154. exception = Exception()
  155. self.monitor.notify_exception(self.transfer_id, exception)
  156. self.assertEqual(
  157. self.monitor.get_exception(self.transfer_id), exception)
  158. def test_get_no_exception(self):
  159. self.assertIsNone(self.monitor.get_exception(self.transfer_id))
  160. def test_notify_jobs(self):
  161. self.monitor.notify_expected_jobs_to_complete(self.transfer_id, 2)
  162. self.assertEqual(self.monitor.notify_job_complete(self.transfer_id), 1)
  163. self.assertEqual(self.monitor.notify_job_complete(self.transfer_id), 0)
  164. def test_notify_jobs_for_multiple_transfers(self):
  165. self.monitor.notify_expected_jobs_to_complete(self.transfer_id, 2)
  166. other_transfer_id = self.monitor.notify_new_transfer()
  167. self.monitor.notify_expected_jobs_to_complete(other_transfer_id, 2)
  168. self.assertEqual(self.monitor.notify_job_complete(self.transfer_id), 1)
  169. self.assertEqual(
  170. self.monitor.notify_job_complete(other_transfer_id), 1)
  171. def test_done(self):
  172. self.assertFalse(self.monitor.is_done(self.transfer_id))
  173. self.monitor.notify_done(self.transfer_id)
  174. self.assertTrue(self.monitor.is_done(self.transfer_id))
  175. def test_poll_for_result(self):
  176. self.monitor.notify_done(self.transfer_id)
  177. self.assertIsNone(self.monitor.poll_for_result(self.transfer_id))
  178. def test_poll_for_result_raises_error(self):
  179. self.monitor.notify_exception(self.transfer_id, RuntimeError())
  180. self.monitor.notify_done(self.transfer_id)
  181. with self.assertRaises(RuntimeError):
  182. self.monitor.poll_for_result(self.transfer_id)
  183. def test_poll_for_result_waits_till_done(self):
  184. event_order = []
  185. def sleep_then_notify_done():
  186. time.sleep(0.05)
  187. event_order.append('notify_done')
  188. self.monitor.notify_done(self.transfer_id)
  189. t = threading.Thread(target=sleep_then_notify_done)
  190. t.start()
  191. self.monitor.poll_for_result(self.transfer_id)
  192. event_order.append('done_polling')
  193. self.assertEqual(event_order, ['notify_done', 'done_polling'])
  194. def test_notify_cancel_all_in_progress(self):
  195. monitor = TransferMonitor()
  196. transfer_ids = []
  197. for _ in range(10):
  198. transfer_ids.append(monitor.notify_new_transfer())
  199. monitor.notify_cancel_all_in_progress()
  200. for transfer_id in transfer_ids:
  201. self.assertIsInstance(
  202. monitor.get_exception(transfer_id), CancelledError)
  203. # Cancelling a transfer does not mean it is done as there may
  204. # be cleanup work left to do.
  205. self.assertFalse(monitor.is_done(transfer_id))
  206. def test_notify_cancel_does_not_affect_done_transfers(self):
  207. self.monitor.notify_done(self.transfer_id)
  208. self.monitor.notify_cancel_all_in_progress()
  209. self.assertTrue(self.monitor.is_done(self.transfer_id))
  210. self.assertIsNone(self.monitor.get_exception(self.transfer_id))
  211. class TestTransferState(unittest.TestCase):
  212. def setUp(self):
  213. self.state = TransferState()
  214. def test_done(self):
  215. self.assertFalse(self.state.done)
  216. self.state.set_done()
  217. self.assertTrue(self.state.done)
  218. def test_waits_till_done_is_set(self):
  219. event_order = []
  220. def sleep_then_set_done():
  221. time.sleep(0.05)
  222. event_order.append('set_done')
  223. self.state.set_done()
  224. t = threading.Thread(target=sleep_then_set_done)
  225. t.start()
  226. self.state.wait_till_done()
  227. event_order.append('done_waiting')
  228. self.assertEqual(event_order, ['set_done', 'done_waiting'])
  229. def test_exception(self):
  230. exception = RuntimeError()
  231. self.state.exception = exception
  232. self.assertEqual(self.state.exception, exception)
  233. def test_jobs_to_complete(self):
  234. self.state.jobs_to_complete = 5
  235. self.assertEqual(self.state.jobs_to_complete, 5)
  236. def test_decrement_jobs_to_complete(self):
  237. self.state.jobs_to_complete = 5
  238. self.assertEqual(self.state.decrement_jobs_to_complete(), 4)
  239. class TestGetObjectSubmitter(StubbedClientTest):
  240. def setUp(self):
  241. super(TestGetObjectSubmitter, self).setUp()
  242. self.transfer_config = ProcessTransferConfig()
  243. self.client_factory = mock.Mock(ClientFactory)
  244. self.client_factory.create_client.return_value = self.client
  245. self.transfer_monitor = TransferMonitor()
  246. self.osutil = mock.Mock(OSUtils)
  247. self.download_request_queue = queue.Queue()
  248. self.worker_queue = queue.Queue()
  249. self.submitter = GetObjectSubmitter(
  250. transfer_config=self.transfer_config,
  251. client_factory=self.client_factory,
  252. transfer_monitor=self.transfer_monitor,
  253. osutil=self.osutil,
  254. download_request_queue=self.download_request_queue,
  255. worker_queue=self.worker_queue,
  256. )
  257. self.transfer_id = self.transfer_monitor.notify_new_transfer()
  258. self.bucket = 'bucket'
  259. self.key = 'key'
  260. self.filename = 'myfile'
  261. self.temp_filename = 'myfile.temp'
  262. self.osutil.get_temp_filename.return_value = self.temp_filename
  263. self.extra_args = {}
  264. self.expected_size = None
  265. def add_download_file_request(self, **override_kwargs):
  266. kwargs = {
  267. 'transfer_id': self.transfer_id,
  268. 'bucket': self.bucket,
  269. 'key': self.key,
  270. 'filename': self.filename,
  271. 'extra_args': self.extra_args,
  272. 'expected_size': self.expected_size
  273. }
  274. kwargs.update(override_kwargs)
  275. self.download_request_queue.put(DownloadFileRequest(**kwargs))
  276. def add_shutdown(self):
  277. self.download_request_queue.put(SHUTDOWN_SIGNAL)
  278. def assert_submitted_get_object_jobs(self, expected_jobs):
  279. actual_jobs = []
  280. while not self.worker_queue.empty():
  281. actual_jobs.append(self.worker_queue.get())
  282. self.assertEqual(actual_jobs, expected_jobs)
  283. def test_run_for_non_ranged_download(self):
  284. self.add_download_file_request(expected_size=1)
  285. self.add_shutdown()
  286. self.submitter.run()
  287. self.osutil.allocate.assert_called_with(self.temp_filename, 1)
  288. self.assert_submitted_get_object_jobs([
  289. GetObjectJob(
  290. transfer_id=self.transfer_id,
  291. bucket=self.bucket,
  292. key=self.key,
  293. temp_filename=self.temp_filename,
  294. offset=0,
  295. extra_args={},
  296. filename=self.filename,
  297. )
  298. ])
  299. def test_run_for_ranged_download(self):
  300. self.transfer_config.multipart_chunksize = 2
  301. self.transfer_config.multipart_threshold = 4
  302. self.add_download_file_request(expected_size=4)
  303. self.add_shutdown()
  304. self.submitter.run()
  305. self.osutil.allocate.assert_called_with(self.temp_filename, 4)
  306. self.assert_submitted_get_object_jobs([
  307. GetObjectJob(
  308. transfer_id=self.transfer_id,
  309. bucket=self.bucket,
  310. key=self.key,
  311. temp_filename=self.temp_filename,
  312. offset=0,
  313. extra_args={'Range': 'bytes=0-1'},
  314. filename=self.filename,
  315. ),
  316. GetObjectJob(
  317. transfer_id=self.transfer_id,
  318. bucket=self.bucket,
  319. key=self.key,
  320. temp_filename=self.temp_filename,
  321. offset=2,
  322. extra_args={'Range': 'bytes=2-'},
  323. filename=self.filename,
  324. ),
  325. ])
  326. def test_run_when_expected_size_not_provided(self):
  327. self.stubber.add_response(
  328. 'head_object', {'ContentLength': 1},
  329. expected_params={
  330. 'Bucket': self.bucket,
  331. 'Key': self.key
  332. }
  333. )
  334. self.add_download_file_request(expected_size=None)
  335. self.add_shutdown()
  336. self.submitter.run()
  337. self.stubber.assert_no_pending_responses()
  338. self.osutil.allocate.assert_called_with(self.temp_filename, 1)
  339. self.assert_submitted_get_object_jobs([
  340. GetObjectJob(
  341. transfer_id=self.transfer_id,
  342. bucket=self.bucket,
  343. key=self.key,
  344. temp_filename=self.temp_filename,
  345. offset=0,
  346. extra_args={},
  347. filename=self.filename,
  348. )
  349. ])
  350. def test_run_with_extra_args(self):
  351. self.stubber.add_response(
  352. 'head_object', {'ContentLength': 1},
  353. expected_params={
  354. 'Bucket': self.bucket,
  355. 'Key': self.key,
  356. 'VersionId': 'versionid'
  357. }
  358. )
  359. self.add_download_file_request(
  360. extra_args={'VersionId': 'versionid'},
  361. expected_size=None
  362. )
  363. self.add_shutdown()
  364. self.submitter.run()
  365. self.stubber.assert_no_pending_responses()
  366. self.osutil.allocate.assert_called_with(self.temp_filename, 1)
  367. self.assert_submitted_get_object_jobs([
  368. GetObjectJob(
  369. transfer_id=self.transfer_id,
  370. bucket=self.bucket,
  371. key=self.key,
  372. temp_filename=self.temp_filename,
  373. offset=0,
  374. extra_args={'VersionId': 'versionid'},
  375. filename=self.filename,
  376. )
  377. ])
  378. def test_run_with_exception(self):
  379. self.stubber.add_client_error('head_object', 'NoSuchKey', 404)
  380. self.add_download_file_request(expected_size=None)
  381. self.add_shutdown()
  382. self.submitter.run()
  383. self.stubber.assert_no_pending_responses()
  384. self.assert_submitted_get_object_jobs([])
  385. self.assertIsInstance(
  386. self.transfer_monitor.get_exception(self.transfer_id), ClientError)
  387. def test_run_with_error_in_allocating_temp_file(self):
  388. self.osutil.allocate.side_effect = OSError()
  389. self.add_download_file_request(expected_size=1)
  390. self.add_shutdown()
  391. self.submitter.run()
  392. self.assert_submitted_get_object_jobs([])
  393. self.assertIsInstance(
  394. self.transfer_monitor.get_exception(self.transfer_id), OSError)
  395. @skip_if_windows('os.kill() with SIGINT not supported on Windows')
  396. def test_submitter_cannot_be_killed(self):
  397. self.add_download_file_request(expected_size=None)
  398. self.add_shutdown()
  399. def raise_ctrl_c(**kwargs):
  400. os.kill(os.getpid(), signal.SIGINT)
  401. mock_client = mock.Mock()
  402. mock_client.head_object = raise_ctrl_c
  403. self.client_factory.create_client.return_value = mock_client
  404. try:
  405. self.submitter.run()
  406. except KeyboardInterrupt:
  407. self.fail(
  408. 'The submitter should have not been killed by the '
  409. 'KeyboardInterrupt'
  410. )
  411. class TestGetObjectWorker(StubbedClientTest):
  412. def setUp(self):
  413. super(TestGetObjectWorker, self).setUp()
  414. self.files = FileCreator()
  415. self.queue = queue.Queue()
  416. self.client_factory = mock.Mock(ClientFactory)
  417. self.client_factory.create_client.return_value = self.client
  418. self.transfer_monitor = TransferMonitor()
  419. self.osutil = OSUtils()
  420. self.worker = GetObjectWorker(
  421. queue=self.queue,
  422. client_factory=self.client_factory,
  423. transfer_monitor=self.transfer_monitor,
  424. osutil=self.osutil
  425. )
  426. self.transfer_id = self.transfer_monitor.notify_new_transfer()
  427. self.bucket = 'bucket'
  428. self.key = 'key'
  429. self.remote_contents = b'my content'
  430. self.temp_filename = self.files.create_file('tempfile', '')
  431. self.extra_args = {}
  432. self.offset = 0
  433. self.final_filename = self.files.full_path('final_filename')
  434. self.stream = six.BytesIO(self.remote_contents)
  435. self.transfer_monitor.notify_expected_jobs_to_complete(
  436. self.transfer_id, 1000)
  437. def tearDown(self):
  438. super(TestGetObjectWorker, self).tearDown()
  439. self.files.remove_all()
  440. def add_get_object_job(self, **override_kwargs):
  441. kwargs = {
  442. 'transfer_id': self.transfer_id,
  443. 'bucket': self.bucket,
  444. 'key': self.key,
  445. 'temp_filename': self.temp_filename,
  446. 'extra_args': self.extra_args,
  447. 'offset': self.offset,
  448. 'filename': self.final_filename
  449. }
  450. kwargs.update(override_kwargs)
  451. self.queue.put(GetObjectJob(**kwargs))
  452. def add_shutdown(self):
  453. self.queue.put(SHUTDOWN_SIGNAL)
  454. def add_stubbed_get_object_response(self, body=None, expected_params=None):
  455. if body is None:
  456. body = self.stream
  457. get_object_response = {'Body': body}
  458. if expected_params is None:
  459. expected_params = {
  460. 'Bucket': self.bucket,
  461. 'Key': self.key
  462. }
  463. self.stubber.add_response(
  464. 'get_object', get_object_response, expected_params)
  465. def assert_contents(self, filename, contents):
  466. self.assertTrue(os.path.exists(filename))
  467. with open(filename, 'rb') as f:
  468. self.assertEqual(f.read(), contents)
  469. def assert_does_not_exist(self, filename):
  470. self.assertFalse(os.path.exists(filename))
  471. def test_run_is_final_job(self):
  472. self.add_get_object_job()
  473. self.add_shutdown()
  474. self.add_stubbed_get_object_response()
  475. self.transfer_monitor.notify_expected_jobs_to_complete(
  476. self.transfer_id, 1)
  477. self.worker.run()
  478. self.stubber.assert_no_pending_responses()
  479. self.assert_does_not_exist(self.temp_filename)
  480. self.assert_contents(self.final_filename, self.remote_contents)
  481. def test_run_jobs_is_not_final_job(self):
  482. self.add_get_object_job()
  483. self.add_shutdown()
  484. self.add_stubbed_get_object_response()
  485. self.transfer_monitor.notify_expected_jobs_to_complete(
  486. self.transfer_id, 1000)
  487. self.worker.run()
  488. self.stubber.assert_no_pending_responses()
  489. self.assert_contents(self.temp_filename, self.remote_contents)
  490. self.assert_does_not_exist(self.final_filename)
  491. def test_run_with_extra_args(self):
  492. self.add_get_object_job(extra_args={'VersionId': 'versionid'})
  493. self.add_shutdown()
  494. self.add_stubbed_get_object_response(
  495. expected_params={
  496. 'Bucket': self.bucket,
  497. 'Key': self.key,
  498. 'VersionId': 'versionid'
  499. }
  500. )
  501. self.worker.run()
  502. self.stubber.assert_no_pending_responses()
  503. def test_run_with_offset(self):
  504. offset = 1
  505. self.add_get_object_job(offset=offset)
  506. self.add_shutdown()
  507. self.add_stubbed_get_object_response()
  508. self.worker.run()
  509. with open(self.temp_filename, 'rb') as f:
  510. f.seek(offset)
  511. self.assertEqual(f.read(), self.remote_contents)
  512. def test_run_error_in_get_object(self):
  513. self.add_get_object_job()
  514. self.add_shutdown()
  515. self.stubber.add_client_error('get_object', 'NoSuchKey', 404)
  516. self.add_stubbed_get_object_response()
  517. self.worker.run()
  518. self.assertIsInstance(
  519. self.transfer_monitor.get_exception(self.transfer_id), ClientError)
  520. def test_run_does_retries_for_get_object(self):
  521. self.add_get_object_job()
  522. self.add_shutdown()
  523. self.add_stubbed_get_object_response(
  524. body=StreamWithError(
  525. self.stream, ReadTimeoutError(endpoint_url='')))
  526. self.add_stubbed_get_object_response()
  527. self.worker.run()
  528. self.stubber.assert_no_pending_responses()
  529. self.assert_contents(self.temp_filename, self.remote_contents)
  530. def test_run_can_exhaust_retries_for_get_object(self):
  531. self.add_get_object_job()
  532. self.add_shutdown()
  533. # 5 is the current setting for max number of GetObject attempts
  534. for _ in range(5):
  535. self.add_stubbed_get_object_response(
  536. body=StreamWithError(
  537. self.stream, ReadTimeoutError(endpoint_url='')))
  538. self.worker.run()
  539. self.stubber.assert_no_pending_responses()
  540. self.assertIsInstance(
  541. self.transfer_monitor.get_exception(self.transfer_id),
  542. RetriesExceededError
  543. )
  544. def test_run_skips_get_object_on_previous_exception(self):
  545. self.add_get_object_job()
  546. self.add_shutdown()
  547. self.transfer_monitor.notify_exception(self.transfer_id, Exception())
  548. self.worker.run()
  549. # Note we did not add a stubbed response for get_object
  550. self.stubber.assert_no_pending_responses()
  551. def test_run_final_job_removes_file_on_previous_exception(self):
  552. self.add_get_object_job()
  553. self.add_shutdown()
  554. self.transfer_monitor.notify_exception(self.transfer_id, Exception())
  555. self.transfer_monitor.notify_expected_jobs_to_complete(
  556. self.transfer_id, 1)
  557. self.worker.run()
  558. self.stubber.assert_no_pending_responses()
  559. self.assert_does_not_exist(self.temp_filename)
  560. self.assert_does_not_exist(self.final_filename)
  561. def test_run_fails_to_rename_file(self):
  562. exception = OSError()
  563. osutil = RenameFailingOSUtils(exception)
  564. self.worker = GetObjectWorker(
  565. queue=self.queue,
  566. client_factory=self.client_factory,
  567. transfer_monitor=self.transfer_monitor,
  568. osutil=osutil
  569. )
  570. self.add_get_object_job()
  571. self.add_shutdown()
  572. self.add_stubbed_get_object_response()
  573. self.transfer_monitor.notify_expected_jobs_to_complete(
  574. self.transfer_id, 1)
  575. self.worker.run()
  576. self.assertEqual(
  577. self.transfer_monitor.get_exception(self.transfer_id), exception)
  578. self.assert_does_not_exist(self.temp_filename)
  579. self.assert_does_not_exist(self.final_filename)
  580. @skip_if_windows('os.kill() with SIGINT not supported on Windows')
  581. def test_worker_cannot_be_killed(self):
  582. self.add_get_object_job()
  583. self.add_shutdown()
  584. self.transfer_monitor.notify_expected_jobs_to_complete(
  585. self.transfer_id, 1)
  586. def raise_ctrl_c(**kwargs):
  587. os.kill(os.getpid(), signal.SIGINT)
  588. mock_client = mock.Mock()
  589. mock_client.get_object = raise_ctrl_c
  590. self.client_factory.create_client.return_value = mock_client
  591. try:
  592. self.worker.run()
  593. except KeyboardInterrupt:
  594. self.fail(
  595. 'The worker should have not been killed by the '
  596. 'KeyboardInterrupt'
  597. )