test_futures.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import sys
  14. import time
  15. import traceback
  16. import mock
  17. from concurrent.futures import ThreadPoolExecutor
  18. from __tests__ import unittest
  19. from __tests__ import RecordingExecutor
  20. from __tests__ import TransferCoordinatorWithInterrupt
  21. from s3transfer.exceptions import CancelledError
  22. from s3transfer.exceptions import FatalError
  23. from s3transfer.exceptions import TransferNotDoneError
  24. from s3transfer.futures import TransferFuture
  25. from s3transfer.futures import TransferMeta
  26. from s3transfer.futures import TransferCoordinator
  27. from s3transfer.futures import BoundedExecutor
  28. from s3transfer.futures import ExecutorFuture
  29. from s3transfer.futures import BaseExecutor
  30. from s3transfer.futures import NonThreadedExecutor
  31. from s3transfer.futures import NonThreadedExecutorFuture
  32. from s3transfer.tasks import Task
  33. from s3transfer.utils import FunctionContainer
  34. from s3transfer.utils import TaskSemaphore
  35. from s3transfer.utils import NoResourcesAvailable
  36. def return_call_args(*args, **kwargs):
  37. return args, kwargs
  38. def raise_exception(exception):
  39. raise exception
  40. def get_exc_info(exception):
  41. try:
  42. raise_exception(exception)
  43. except:
  44. return sys.exc_info()
  45. class RecordingTransferCoordinator(TransferCoordinator):
  46. def __init__(self):
  47. self.all_transfer_futures_ever_associated = set()
  48. super(RecordingTransferCoordinator, self).__init__()
  49. def add_associated_future(self, future):
  50. self.all_transfer_futures_ever_associated.add(future)
  51. super(RecordingTransferCoordinator, self).add_associated_future(future)
  52. class ReturnFooTask(Task):
  53. def _main(self, **kwargs):
  54. return 'foo'
  55. class SleepTask(Task):
  56. def _main(self, sleep_time, **kwargs):
  57. time.sleep(sleep_time)
  58. class TestTransferFuture(unittest.TestCase):
  59. def setUp(self):
  60. self.meta = TransferMeta()
  61. self.coordinator = TransferCoordinator()
  62. self.future = self._get_transfer_future()
  63. def _get_transfer_future(self, **kwargs):
  64. components = {
  65. 'meta': self.meta,
  66. 'coordinator': self.coordinator,
  67. }
  68. for component_name, component in kwargs.items():
  69. components[component_name] = component
  70. return TransferFuture(**components)
  71. def test_meta(self):
  72. self.assertIs(self.future.meta, self.meta)
  73. def test_done(self):
  74. self.assertFalse(self.future.done())
  75. self.coordinator.set_result(None)
  76. self.assertTrue(self.future.done())
  77. def test_result(self):
  78. result = 'foo'
  79. self.coordinator.set_result(result)
  80. self.coordinator.announce_done()
  81. self.assertEqual(self.future.result(), result)
  82. def test_keyboard_interrupt_on_result_does_not_block(self):
  83. # This should raise a KeyboardInterrupt when result is called on it.
  84. self.coordinator = TransferCoordinatorWithInterrupt()
  85. self.future = self._get_transfer_future()
  86. # result() should not block and immediately raise the keyboard
  87. # interrupt exception.
  88. with self.assertRaises(KeyboardInterrupt):
  89. self.future.result()
  90. def test_cancel(self):
  91. self.future.cancel()
  92. self.assertTrue(self.future.done())
  93. self.assertEqual(self.coordinator.status, 'cancelled')
  94. def test_set_exception(self):
  95. # Set the result such that there is no exception
  96. self.coordinator.set_result('result')
  97. self.coordinator.announce_done()
  98. self.assertEqual(self.future.result(), 'result')
  99. self.future.set_exception(ValueError())
  100. with self.assertRaises(ValueError):
  101. self.future.result()
  102. def test_set_exception_only_after_done(self):
  103. with self.assertRaises(TransferNotDoneError):
  104. self.future.set_exception(ValueError())
  105. self.coordinator.set_result('result')
  106. self.coordinator.announce_done()
  107. self.future.set_exception(ValueError())
  108. with self.assertRaises(ValueError):
  109. self.future.result()
  110. class TestTransferMeta(unittest.TestCase):
  111. def setUp(self):
  112. self.transfer_meta = TransferMeta()
  113. def test_size(self):
  114. self.assertEqual(self.transfer_meta.size, None)
  115. self.transfer_meta.provide_transfer_size(5)
  116. self.assertEqual(self.transfer_meta.size, 5)
  117. def test_call_args(self):
  118. call_args = object()
  119. transfer_meta = TransferMeta(call_args)
  120. # Assert the that call args provided is the same as is returned
  121. self.assertIs(transfer_meta.call_args, call_args)
  122. def test_transfer_id(self):
  123. transfer_meta = TransferMeta(transfer_id=1)
  124. self.assertEqual(transfer_meta.transfer_id, 1)
  125. def test_user_context(self):
  126. self.transfer_meta.user_context['foo'] = 'bar'
  127. self.assertEqual(self.transfer_meta.user_context, {'foo': 'bar'})
  128. class TestTransferCoordinator(unittest.TestCase):
  129. def setUp(self):
  130. self.transfer_coordinator = TransferCoordinator()
  131. def test_transfer_id(self):
  132. transfer_coordinator = TransferCoordinator(transfer_id=1)
  133. self.assertEqual(transfer_coordinator.transfer_id, 1)
  134. def test_repr(self):
  135. transfer_coordinator = TransferCoordinator(transfer_id=1)
  136. self.assertEqual(
  137. repr(transfer_coordinator), 'TransferCoordinator(transfer_id=1)')
  138. def test_initial_status(self):
  139. # A TransferCoordinator with no progress should have the status
  140. # of not-started
  141. self.assertEqual(self.transfer_coordinator.status, 'not-started')
  142. def test_set_status_to_queued(self):
  143. self.transfer_coordinator.set_status_to_queued()
  144. self.assertEqual(self.transfer_coordinator.status, 'queued')
  145. def test_cannot_set_status_to_queued_from_done_state(self):
  146. self.transfer_coordinator.set_exception(RuntimeError)
  147. with self.assertRaises(RuntimeError):
  148. self.transfer_coordinator.set_status_to_queued()
  149. def test_status_running(self):
  150. self.transfer_coordinator.set_status_to_running()
  151. self.assertEqual(self.transfer_coordinator.status, 'running')
  152. def test_cannot_set_status_to_running_from_done_state(self):
  153. self.transfer_coordinator.set_exception(RuntimeError)
  154. with self.assertRaises(RuntimeError):
  155. self.transfer_coordinator.set_status_to_running()
  156. def test_set_result(self):
  157. success_result = 'foo'
  158. self.transfer_coordinator.set_result(success_result)
  159. self.transfer_coordinator.announce_done()
  160. # Setting result should result in a success state and the return value
  161. # that was set.
  162. self.assertEqual(self.transfer_coordinator.status, 'success')
  163. self.assertEqual(self.transfer_coordinator.result(), success_result)
  164. def test_set_exception(self):
  165. exception_result = RuntimeError
  166. self.transfer_coordinator.set_exception(exception_result)
  167. self.transfer_coordinator.announce_done()
  168. # Setting an exception should result in a failed state and the return
  169. # value should be the rasied exception
  170. self.assertEqual(self.transfer_coordinator.status, 'failed')
  171. self.assertEqual(self.transfer_coordinator.exception, exception_result)
  172. with self.assertRaises(exception_result):
  173. self.transfer_coordinator.result()
  174. def test_exception_cannot_override_done_state(self):
  175. self.transfer_coordinator.set_result('foo')
  176. self.transfer_coordinator.set_exception(RuntimeError)
  177. # It status should be success even after the exception is set because
  178. # success is a done state.
  179. self.assertEqual(self.transfer_coordinator.status, 'success')
  180. def test_exception_can_override_done_state_with_override_flag(self):
  181. self.transfer_coordinator.set_result('foo')
  182. self.transfer_coordinator.set_exception(RuntimeError, override=True)
  183. self.assertEqual(self.transfer_coordinator.status, 'failed')
  184. def test_cancel(self):
  185. self.assertEqual(self.transfer_coordinator.status, 'not-started')
  186. self.transfer_coordinator.cancel()
  187. # This should set the state to cancelled and raise the CancelledError
  188. # exception and should have also set the done event so that result()
  189. # is no longer set.
  190. self.assertEqual(self.transfer_coordinator.status, 'cancelled')
  191. with self.assertRaises(CancelledError):
  192. self.transfer_coordinator.result()
  193. def test_cancel_can_run_done_callbacks_that_uses_result(self):
  194. exceptions = []
  195. def capture_exception(transfer_coordinator, captured_exceptions):
  196. try:
  197. transfer_coordinator.result()
  198. except Exception as e:
  199. captured_exceptions.append(e)
  200. self.assertEqual(self.transfer_coordinator.status, 'not-started')
  201. self.transfer_coordinator.add_done_callback(
  202. capture_exception, self.transfer_coordinator, exceptions)
  203. self.transfer_coordinator.cancel()
  204. self.assertEqual(len(exceptions), 1)
  205. self.assertIsInstance(exceptions[0], CancelledError)
  206. def test_cancel_with_message(self):
  207. message = 'my message'
  208. self.transfer_coordinator.cancel(message)
  209. self.transfer_coordinator.announce_done()
  210. with self.assertRaisesRegexp(CancelledError, message):
  211. self.transfer_coordinator.result()
  212. def test_cancel_with_provided_exception(self):
  213. message = 'my message'
  214. self.transfer_coordinator.cancel(message, exc_type=FatalError)
  215. self.transfer_coordinator.announce_done()
  216. with self.assertRaisesRegexp(FatalError, message):
  217. self.transfer_coordinator.result()
  218. def test_cancel_cannot_override_done_state(self):
  219. self.transfer_coordinator.set_result('foo')
  220. self.transfer_coordinator.cancel()
  221. # It status should be success even after cancel is called because
  222. # succes is a done state.
  223. self.assertEqual(self.transfer_coordinator.status, 'success')
  224. def test_set_result_can_override_cancel(self):
  225. self.transfer_coordinator.cancel()
  226. # Result setting should override any cancel or set exception as this
  227. # is always invoked by the final task.
  228. self.transfer_coordinator.set_result('foo')
  229. self.transfer_coordinator.announce_done()
  230. self.assertEqual(self.transfer_coordinator.status, 'success')
  231. def test_submit(self):
  232. # Submit a callable to the transfer coordinator. It should submit it
  233. # to the executor.
  234. executor = RecordingExecutor(
  235. BoundedExecutor(1, 1, {'my-tag': TaskSemaphore(1)}))
  236. task = ReturnFooTask(self.transfer_coordinator)
  237. future = self.transfer_coordinator.submit(executor, task, tag='my-tag')
  238. executor.shutdown()
  239. # Make sure the future got submit and executed as well by checking its
  240. # result value which should include the provided future tag.
  241. self.assertEqual(
  242. executor.submissions,
  243. [{'block': True, 'tag': 'my-tag', 'task': task}]
  244. )
  245. self.assertEqual(future.result(), 'foo')
  246. def test_association_and_disassociation_on_submit(self):
  247. self.transfer_coordinator = RecordingTransferCoordinator()
  248. # Submit a callable to the transfer coordinator.
  249. executor = BoundedExecutor(1, 1)
  250. task = ReturnFooTask(self.transfer_coordinator)
  251. future = self.transfer_coordinator.submit(executor, task)
  252. executor.shutdown()
  253. # Make sure the future that got submitted was associated to the
  254. # transfer future at some point.
  255. self.assertEqual(
  256. self.transfer_coordinator.all_transfer_futures_ever_associated,
  257. set([future])
  258. )
  259. # Make sure the future got disassociated once the future is now done
  260. # by looking at the currently associated futures.
  261. self.assertEqual(
  262. self.transfer_coordinator.associated_futures, set([]))
  263. def test_done(self):
  264. # These should result in not done state:
  265. # queued
  266. self.assertFalse(self.transfer_coordinator.done())
  267. # running
  268. self.transfer_coordinator.set_status_to_running()
  269. self.assertFalse(self.transfer_coordinator.done())
  270. # These should result in done state:
  271. # failed
  272. self.transfer_coordinator.set_exception(Exception)
  273. self.assertTrue(self.transfer_coordinator.done())
  274. # success
  275. self.transfer_coordinator.set_result('foo')
  276. self.assertTrue(self.transfer_coordinator.done())
  277. # cancelled
  278. self.transfer_coordinator.cancel()
  279. self.assertTrue(self.transfer_coordinator.done())
  280. def test_result_waits_until_done(self):
  281. execution_order = []
  282. def sleep_then_set_result(transfer_coordinator, execution_order):
  283. time.sleep(0.05)
  284. execution_order.append('setting_result')
  285. transfer_coordinator.set_result(None)
  286. self.transfer_coordinator.announce_done()
  287. with ThreadPoolExecutor(max_workers=1) as executor:
  288. executor.submit(
  289. sleep_then_set_result, self.transfer_coordinator,
  290. execution_order)
  291. self.transfer_coordinator.result()
  292. execution_order.append('after_result')
  293. # The result() call should have waited until the other thread set
  294. # the result after sleeping for 0.05 seconds.
  295. self.assertTrue(execution_order, ['setting_result', 'after_result'])
  296. def test_failure_cleanups(self):
  297. args = (1, 2)
  298. kwargs = {'foo': 'bar'}
  299. second_args = (2, 4)
  300. second_kwargs = {'biz': 'baz'}
  301. self.transfer_coordinator.add_failure_cleanup(
  302. return_call_args, *args, **kwargs)
  303. self.transfer_coordinator.add_failure_cleanup(
  304. return_call_args, *second_args, **second_kwargs)
  305. # Ensure the callbacks got added.
  306. self.assertEqual(len(self.transfer_coordinator.failure_cleanups), 2)
  307. result_list = []
  308. # Ensure they will get called in the correct order.
  309. for cleanup in self.transfer_coordinator.failure_cleanups:
  310. result_list.append(cleanup())
  311. self.assertEqual(
  312. result_list, [(args, kwargs), (second_args, second_kwargs)])
  313. def test_associated_futures(self):
  314. first_future = object()
  315. # Associate one future to the transfer
  316. self.transfer_coordinator.add_associated_future(first_future)
  317. associated_futures = self.transfer_coordinator.associated_futures
  318. # The first future should be in the returned list of futures.
  319. self.assertEqual(associated_futures, set([first_future]))
  320. second_future = object()
  321. # Associate another future to the transfer.
  322. self.transfer_coordinator.add_associated_future(second_future)
  323. # The association should not have mutated the returned list from
  324. # before.
  325. self.assertEqual(associated_futures, set([first_future]))
  326. # Both futures should be in the returned list.
  327. self.assertEqual(
  328. self.transfer_coordinator.associated_futures,
  329. set([first_future, second_future]))
  330. def test_done_callbacks_on_done(self):
  331. done_callback_invocations = []
  332. callback = FunctionContainer(
  333. done_callback_invocations.append, 'done callback called')
  334. # Add the done callback to the transfer.
  335. self.transfer_coordinator.add_done_callback(callback)
  336. # Announce that the transfer is done. This should invoke the done
  337. # callback.
  338. self.transfer_coordinator.announce_done()
  339. self.assertEqual(done_callback_invocations, ['done callback called'])
  340. # If done is announced again, we should not invoke the callback again
  341. # because done has already been announced and thus the callback has
  342. # been ran as well.
  343. self.transfer_coordinator.announce_done()
  344. self.assertEqual(done_callback_invocations, ['done callback called'])
  345. def test_failure_cleanups_on_done(self):
  346. cleanup_invocations = []
  347. callback = FunctionContainer(
  348. cleanup_invocations.append, 'cleanup called')
  349. # Add the failure cleanup to the transfer.
  350. self.transfer_coordinator.add_failure_cleanup(callback)
  351. # Announce that the transfer is done. This should invoke the failure
  352. # cleanup.
  353. self.transfer_coordinator.announce_done()
  354. self.assertEqual(cleanup_invocations, ['cleanup called'])
  355. # If done is announced again, we should not invoke the cleanup again
  356. # because done has already been announced and thus the cleanup has
  357. # been ran as well.
  358. self.transfer_coordinator.announce_done()
  359. self.assertEqual(cleanup_invocations, ['cleanup called'])
  360. class TestBoundedExecutor(unittest.TestCase):
  361. def setUp(self):
  362. self.coordinator = TransferCoordinator()
  363. self.tag_semaphores = {}
  364. self.executor = self.get_executor()
  365. def get_executor(self, max_size=1, max_num_threads=1):
  366. return BoundedExecutor(max_size, max_num_threads, self.tag_semaphores)
  367. def get_task(self, task_cls, main_kwargs=None):
  368. return task_cls(self.coordinator, main_kwargs=main_kwargs)
  369. def get_sleep_task(self, sleep_time=0.01):
  370. return self.get_task(SleepTask, main_kwargs={'sleep_time': sleep_time})
  371. def add_semaphore(self, task_tag, count):
  372. self.tag_semaphores[task_tag] = TaskSemaphore(count)
  373. def assert_submit_would_block(self, task, tag=None):
  374. with self.assertRaises(NoResourcesAvailable):
  375. self.executor.submit(task, tag=tag, block=False)
  376. def assert_submit_would_not_block(self, task, tag=None, **kwargs):
  377. try:
  378. self.executor.submit(task, tag=tag, block=False)
  379. except NoResourcesAvailable:
  380. self.fail(
  381. 'Task %s should not have been blocked. Caused by:\n%s' % (
  382. task, traceback.format_exc()
  383. )
  384. )
  385. def add_done_callback_to_future(self, future, fn, *args, **kwargs):
  386. callback_for_future = FunctionContainer(fn, *args, **kwargs)
  387. future.add_done_callback(callback_for_future)
  388. def test_submit_single_task(self):
  389. # Ensure we can submit a task to the executor
  390. task = self.get_task(ReturnFooTask)
  391. future = self.executor.submit(task)
  392. # Ensure what we get back is a Future
  393. self.assertIsInstance(future, ExecutorFuture)
  394. # Ensure the callable got executed.
  395. self.assertEqual(future.result(), 'foo')
  396. def test_executor_blocks_on_full_capacity(self):
  397. first_task = self.get_sleep_task()
  398. second_task = self.get_sleep_task()
  399. self.executor.submit(first_task)
  400. # The first task should be sleeping for a substantial period of
  401. # time such that on the submission of the second task, it will
  402. # raise an error saying that it cannot be submitted as the max
  403. # capacity of the semaphore is one.
  404. self.assert_submit_would_block(second_task)
  405. def test_executor_clears_capacity_on_done_tasks(self):
  406. first_task = self.get_sleep_task()
  407. second_task = self.get_task(ReturnFooTask)
  408. # Submit a task.
  409. future = self.executor.submit(first_task)
  410. # Submit a new task when the first task finishes. This should not get
  411. # blocked because the first task should have finished clearing up
  412. # capacity.
  413. self.add_done_callback_to_future(
  414. future, self.assert_submit_would_not_block, second_task)
  415. # Wait for it to complete.
  416. self.executor.shutdown()
  417. def test_would_not_block_when_full_capacity_in_other_semaphore(self):
  418. first_task = self.get_sleep_task()
  419. # Now let's create a new task with a tag and so it uses different
  420. # semaphore.
  421. task_tag = 'other'
  422. other_task = self.get_sleep_task()
  423. self.add_semaphore(task_tag, 1)
  424. # Submit the normal first task
  425. self.executor.submit(first_task)
  426. # Even though The first task should be sleeping for a substantial
  427. # period of time, the submission of the second task should not
  428. # raise an error because it should use a different semaphore
  429. self.assert_submit_would_not_block(other_task, task_tag)
  430. # Another submission of the other task though should raise
  431. # an exception as the capacity is equal to one for that tag.
  432. self.assert_submit_would_block(other_task, task_tag)
  433. def test_shutdown(self):
  434. slow_task = self.get_sleep_task()
  435. future = self.executor.submit(slow_task)
  436. self.executor.shutdown()
  437. # Ensure that the shutdown waits until the task is done
  438. self.assertTrue(future.done())
  439. def test_shutdown_no_wait(self):
  440. slow_task = self.get_sleep_task()
  441. future = self.executor.submit(slow_task)
  442. self.executor.shutdown(False)
  443. # Ensure that the shutdown returns immediately even if the task is
  444. # not done, which it should not be because it it slow.
  445. self.assertFalse(future.done())
  446. def test_replace_underlying_executor(self):
  447. mocked_executor_cls = mock.Mock(BaseExecutor)
  448. executor = BoundedExecutor(10, 1, {}, mocked_executor_cls)
  449. executor.submit(self.get_task(ReturnFooTask))
  450. self.assertTrue(mocked_executor_cls.return_value.submit.called)
  451. class TestExecutorFuture(unittest.TestCase):
  452. def test_result(self):
  453. with ThreadPoolExecutor(max_workers=1) as executor:
  454. future = executor.submit(return_call_args, 'foo', biz='baz')
  455. wrapped_future = ExecutorFuture(future)
  456. self.assertEqual(wrapped_future.result(), (('foo',), {'biz': 'baz'}))
  457. def test_done(self):
  458. with ThreadPoolExecutor(max_workers=1) as executor:
  459. future = executor.submit(return_call_args, 'foo', biz='baz')
  460. wrapped_future = ExecutorFuture(future)
  461. self.assertTrue(wrapped_future.done())
  462. def test_add_done_callback(self):
  463. done_callbacks = []
  464. with ThreadPoolExecutor(max_workers=1) as executor:
  465. future = executor.submit(return_call_args, 'foo', biz='baz')
  466. wrapped_future = ExecutorFuture(future)
  467. wrapped_future.add_done_callback(
  468. FunctionContainer(done_callbacks.append, 'called'))
  469. self.assertEqual(done_callbacks, ['called'])
  470. class TestNonThreadedExecutor(unittest.TestCase):
  471. def test_submit(self):
  472. executor = NonThreadedExecutor()
  473. future = executor.submit(return_call_args, 1, 2, foo='bar')
  474. self.assertIsInstance(future, NonThreadedExecutorFuture)
  475. self.assertEqual(future.result(), ((1, 2), {'foo': 'bar'}))
  476. def test_submit_with_exception(self):
  477. executor = NonThreadedExecutor()
  478. future = executor.submit(raise_exception, RuntimeError())
  479. self.assertIsInstance(future, NonThreadedExecutorFuture)
  480. with self.assertRaises(RuntimeError):
  481. future.result()
  482. def test_submit_with_exception_and_captures_info(self):
  483. exception = ValueError('message')
  484. tb = get_exc_info(exception)[2]
  485. future = NonThreadedExecutor().submit(raise_exception, exception)
  486. try:
  487. future.result()
  488. # An exception should have been raised
  489. self.fail('Future should have raised a ValueError')
  490. except ValueError:
  491. actual_tb = sys.exc_info()[2]
  492. last_frame = traceback.extract_tb(actual_tb)[-1]
  493. last_expected_frame = traceback.extract_tb(tb)[-1]
  494. self.assertEqual(last_frame, last_expected_frame)
  495. class TestNonThreadedExecutorFuture(unittest.TestCase):
  496. def setUp(self):
  497. self.future = NonThreadedExecutorFuture()
  498. def test_done_starts_false(self):
  499. self.assertFalse(self.future.done())
  500. def test_done_after_setting_result(self):
  501. self.future.set_result('result')
  502. self.assertTrue(self.future.done())
  503. def test_done_after_setting_exception(self):
  504. self.future.set_exception_info(Exception(), None)
  505. self.assertTrue(self.future.done())
  506. def test_result(self):
  507. self.future.set_result('result')
  508. self.assertEqual(self.future.result(), 'result')
  509. def test_exception_result(self):
  510. exception = ValueError('message')
  511. self.future.set_exception_info(exception, None)
  512. with self.assertRaisesRegexp(ValueError, 'message'):
  513. self.future.result()
  514. def test_exception_result_doesnt_modify_last_frame(self):
  515. exception = ValueError('message')
  516. tb = get_exc_info(exception)[2]
  517. self.future.set_exception_info(exception, tb)
  518. try:
  519. self.future.result()
  520. # An exception should have been raised
  521. self.fail()
  522. except ValueError:
  523. actual_tb = sys.exc_info()[2]
  524. last_frame = traceback.extract_tb(actual_tb)[-1]
  525. last_expected_frame = traceback.extract_tb(tb)[-1]
  526. self.assertEqual(last_frame, last_expected_frame)
  527. def test_done_callback(self):
  528. done_futures = []
  529. self.future.add_done_callback(done_futures.append)
  530. self.assertEqual(done_futures, [])
  531. self.future.set_result('result')
  532. self.assertEqual(done_futures, [self.future])
  533. def test_done_callback_after_done(self):
  534. self.future.set_result('result')
  535. done_futures = []
  536. self.future.add_done_callback(done_futures.append)
  537. self.assertEqual(done_futures, [self.future])