test_tasks.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. from concurrent import futures
  14. from functools import partial
  15. from threading import Event
  16. from __tests__ import unittest
  17. from __tests__ import RecordingSubscriber
  18. from __tests__ import BaseTaskTest
  19. from __tests__ import BaseSubmissionTaskTest
  20. from s3transfer.futures import TransferCoordinator
  21. from s3transfer.futures import BoundedExecutor
  22. from s3transfer.subscribers import BaseSubscriber
  23. from s3transfer.tasks import Task
  24. from s3transfer.tasks import SubmissionTask
  25. from s3transfer.tasks import CreateMultipartUploadTask
  26. from s3transfer.tasks import CompleteMultipartUploadTask
  27. from s3transfer.utils import get_callbacks
  28. from s3transfer.utils import CallArgs
  29. from s3transfer.utils import FunctionContainer
  30. class TaskFailureException(Exception):
  31. pass
  32. class SuccessTask(Task):
  33. def _main(self, return_value='success', callbacks=None,
  34. failure_cleanups=None):
  35. if callbacks:
  36. for callback in callbacks:
  37. callback()
  38. if failure_cleanups:
  39. for failure_cleanup in failure_cleanups:
  40. self._transfer_coordinator.add_failure_cleanup(failure_cleanup)
  41. return return_value
  42. class FailureTask(Task):
  43. def _main(self, exception=TaskFailureException):
  44. raise exception()
  45. class ReturnKwargsTask(Task):
  46. def _main(self, **kwargs):
  47. return kwargs
  48. class SubmitMoreTasksTask(Task):
  49. def _main(self, executor, tasks_to_submit):
  50. for task_to_submit in tasks_to_submit:
  51. self._transfer_coordinator.submit(executor, task_to_submit)
  52. class NOOPSubmissionTask(SubmissionTask):
  53. def _submit(self, transfer_future, **kwargs):
  54. pass
  55. class ExceptionSubmissionTask(SubmissionTask):
  56. def _submit(self, transfer_future, executor=None, tasks_to_submit=None,
  57. additional_callbacks=None, exception=TaskFailureException):
  58. if executor and tasks_to_submit:
  59. for task_to_submit in tasks_to_submit:
  60. self._transfer_coordinator.submit(executor, task_to_submit)
  61. if additional_callbacks:
  62. for callback in additional_callbacks:
  63. callback()
  64. raise exception()
  65. class StatusRecordingTransferCoordinator(TransferCoordinator):
  66. def __init__(self, transfer_id=None):
  67. super(StatusRecordingTransferCoordinator, self).__init__(transfer_id)
  68. self.status_changes = [self._status]
  69. def set_status_to_queued(self):
  70. super(StatusRecordingTransferCoordinator, self).set_status_to_queued()
  71. self._record_status_change()
  72. def set_status_to_running(self):
  73. super(StatusRecordingTransferCoordinator, self).set_status_to_running()
  74. self._record_status_change()
  75. def _record_status_change(self):
  76. self.status_changes.append(self._status)
  77. class RecordingStateSubscriber(BaseSubscriber):
  78. def __init__(self, transfer_coordinator):
  79. self._transfer_coordinator = transfer_coordinator
  80. self.status_during_on_queued = None
  81. def on_queued(self, **kwargs):
  82. self.status_during_on_queued = self._transfer_coordinator.status
  83. class TestSubmissionTask(BaseSubmissionTaskTest):
  84. def setUp(self):
  85. super(TestSubmissionTask, self).setUp()
  86. self.executor = BoundedExecutor(1000, 5)
  87. self.call_args = CallArgs(subscribers=[])
  88. self.transfer_future = self.get_transfer_future(self.call_args)
  89. self.main_kwargs = {'transfer_future': self.transfer_future}
  90. def test_transitions_from_not_started_to_queued_to_running(self):
  91. self.transfer_coordinator = StatusRecordingTransferCoordinator()
  92. submission_task = self.get_task(
  93. NOOPSubmissionTask, main_kwargs=self.main_kwargs)
  94. # Status should be queued until submission task has been ran.
  95. self.assertEqual(self.transfer_coordinator.status, 'not-started')
  96. submission_task()
  97. # Once submission task has been ran, the status should now be running.
  98. self.assertEqual(self.transfer_coordinator.status, 'running')
  99. # Ensure the transitions were as expected as well.
  100. self.assertEqual(
  101. self.transfer_coordinator.status_changes,
  102. ['not-started', 'queued', 'running']
  103. )
  104. def test_on_queued_callbacks(self):
  105. submission_task = self.get_task(
  106. NOOPSubmissionTask, main_kwargs=self.main_kwargs)
  107. subscriber = RecordingSubscriber()
  108. self.call_args.subscribers.append(subscriber)
  109. submission_task()
  110. # Make sure the on_queued callback of the subscriber is called.
  111. self.assertEqual(
  112. subscriber.on_queued_calls, [{'future': self.transfer_future}])
  113. def test_on_queued_status_in_callbacks(self):
  114. submission_task = self.get_task(
  115. NOOPSubmissionTask, main_kwargs=self.main_kwargs)
  116. subscriber = RecordingStateSubscriber(self.transfer_coordinator)
  117. self.call_args.subscribers.append(subscriber)
  118. submission_task()
  119. # Make sure the status was queued during on_queued callback.
  120. self.assertEqual(subscriber.status_during_on_queued, 'queued')
  121. def test_sets_exception_from_submit(self):
  122. submission_task = self.get_task(
  123. ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
  124. submission_task()
  125. # Make sure the status of the future is failed
  126. self.assertEqual(self.transfer_coordinator.status, 'failed')
  127. # Make sure the future propogates the exception encountered in the
  128. # submission task.
  129. with self.assertRaises(TaskFailureException):
  130. self.transfer_future.result()
  131. def test_catches_and_sets_keyboard_interrupt_exception_from_submit(self):
  132. self.main_kwargs['exception'] = KeyboardInterrupt
  133. submission_task = self.get_task(
  134. ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
  135. submission_task()
  136. self.assertEqual(self.transfer_coordinator.status, 'failed')
  137. with self.assertRaises(KeyboardInterrupt):
  138. self.transfer_future.result()
  139. def test_calls_done_callbacks_on_exception(self):
  140. submission_task = self.get_task(
  141. ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
  142. subscriber = RecordingSubscriber()
  143. self.call_args.subscribers.append(subscriber)
  144. # Add the done callback to the callbacks to be invoked when the
  145. # transfer is done.
  146. done_callbacks = get_callbacks(self.transfer_future, 'done')
  147. for done_callback in done_callbacks:
  148. self.transfer_coordinator.add_done_callback(done_callback)
  149. submission_task()
  150. # Make sure the task failed to start
  151. self.assertEqual(self.transfer_coordinator.status, 'failed')
  152. # Make sure the on_done callback of the subscriber is called.
  153. self.assertEqual(
  154. subscriber.on_done_calls, [{'future': self.transfer_future}])
  155. def test_calls_failure_cleanups_on_exception(self):
  156. submission_task = self.get_task(
  157. ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
  158. # Add the callback to the callbacks to be invoked when the
  159. # transfer fails.
  160. invocations_of_cleanup = []
  161. cleanup_callback = FunctionContainer(
  162. invocations_of_cleanup.append, 'cleanup happened')
  163. self.transfer_coordinator.add_failure_cleanup(cleanup_callback)
  164. submission_task()
  165. # Make sure the task failed to start
  166. self.assertEqual(self.transfer_coordinator.status, 'failed')
  167. # Make sure the cleanup was called.
  168. self.assertEqual(invocations_of_cleanup, ['cleanup happened'])
  169. def test_cleanups_only_ran_once_on_exception(self):
  170. # We want to be able to handle the case where the final task completes
  171. # and anounces done but there is an error in the submission task
  172. # which will cause it to need to anounce done as well. In this case,
  173. # we do not want the done callbacks to be invoke more than once.
  174. final_task = self.get_task(FailureTask, is_final=True)
  175. self.main_kwargs['executor'] = self.executor
  176. self.main_kwargs['tasks_to_submit'] = [final_task]
  177. submission_task = self.get_task(
  178. ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
  179. subscriber = RecordingSubscriber()
  180. self.call_args.subscribers.append(subscriber)
  181. # Add the done callback to the callbacks to be invoked when the
  182. # transfer is done.
  183. done_callbacks = get_callbacks(self.transfer_future, 'done')
  184. for done_callback in done_callbacks:
  185. self.transfer_coordinator.add_done_callback(done_callback)
  186. submission_task()
  187. # Make sure the task failed to start
  188. self.assertEqual(self.transfer_coordinator.status, 'failed')
  189. # Make sure the on_done callback of the subscriber is called only once.
  190. self.assertEqual(
  191. subscriber.on_done_calls, [{'future': self.transfer_future}])
  192. def test_done_callbacks_only_ran_once_on_exception(self):
  193. # We want to be able to handle the case where the final task completes
  194. # and anounces done but there is an error in the submission task
  195. # which will cause it to need to anounce done as well. In this case,
  196. # we do not want the failure cleanups to be invoked more than once.
  197. final_task = self.get_task(FailureTask, is_final=True)
  198. self.main_kwargs['executor'] = self.executor
  199. self.main_kwargs['tasks_to_submit'] = [final_task]
  200. submission_task = self.get_task(
  201. ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
  202. # Add the callback to the callbacks to be invoked when the
  203. # transfer fails.
  204. invocations_of_cleanup = []
  205. cleanup_callback = FunctionContainer(
  206. invocations_of_cleanup.append, 'cleanup happened')
  207. self.transfer_coordinator.add_failure_cleanup(cleanup_callback)
  208. submission_task()
  209. # Make sure the task failed to start
  210. self.assertEqual(self.transfer_coordinator.status, 'failed')
  211. # Make sure the cleanup was called only onece.
  212. self.assertEqual(invocations_of_cleanup, ['cleanup happened'])
  213. def test_handles_cleanups_submitted_in_other_tasks(self):
  214. invocations_of_cleanup = []
  215. event = Event()
  216. cleanup_callback = FunctionContainer(
  217. invocations_of_cleanup.append, 'cleanup happened')
  218. # We want the cleanup to be added in the execution of the task and
  219. # still be executed by the submission task when it fails.
  220. task = self.get_task(
  221. SuccessTask, main_kwargs={
  222. 'callbacks': [event.set],
  223. 'failure_cleanups': [cleanup_callback]
  224. }
  225. )
  226. self.main_kwargs['executor'] = self.executor
  227. self.main_kwargs['tasks_to_submit'] = [task]
  228. self.main_kwargs['additional_callbacks'] = [event.wait]
  229. submission_task = self.get_task(
  230. ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
  231. submission_task()
  232. self.assertEqual(self.transfer_coordinator.status, 'failed')
  233. # Make sure the cleanup was called even though the callback got
  234. # added in a completely different task.
  235. self.assertEqual(invocations_of_cleanup, ['cleanup happened'])
  236. def test_waits_for_tasks_submitted_by_other_tasks_on_exception(self):
  237. # In this test, we want to make sure that any tasks that may be
  238. # submitted in another task complete before we start performing
  239. # cleanups.
  240. #
  241. # This is tested by doing the following:
  242. #
  243. # ExecutionSubmissionTask
  244. # |
  245. # +--submits-->SubmitMoreTasksTask
  246. # |
  247. # +--submits-->SuccessTask
  248. # |
  249. # +-->sleeps-->adds failure cleanup
  250. #
  251. # In the end, the failure cleanup of the SuccessTask should be ran
  252. # when the ExecutionSubmissionTask fails. If the
  253. # ExeceptionSubmissionTask did not run the failure cleanup it is most
  254. # likely that it did not wait for the SuccessTask to complete, which
  255. # it needs to because the ExeceptionSubmissionTask does not know
  256. # what failure cleanups it needs to run until all spawned tasks have
  257. # completed.
  258. invocations_of_cleanup = []
  259. event = Event()
  260. cleanup_callback = FunctionContainer(
  261. invocations_of_cleanup.append, 'cleanup happened')
  262. cleanup_task = self.get_task(
  263. SuccessTask, main_kwargs={
  264. 'callbacks': [event.set],
  265. 'failure_cleanups': [cleanup_callback]
  266. }
  267. )
  268. task_for_submitting_cleanup_task = self.get_task(
  269. SubmitMoreTasksTask, main_kwargs={
  270. 'executor': self.executor,
  271. 'tasks_to_submit': [cleanup_task]
  272. }
  273. )
  274. self.main_kwargs['executor'] = self.executor
  275. self.main_kwargs['tasks_to_submit'] = [
  276. task_for_submitting_cleanup_task]
  277. self.main_kwargs['additional_callbacks'] = [event.wait]
  278. submission_task = self.get_task(
  279. ExceptionSubmissionTask, main_kwargs=self.main_kwargs)
  280. submission_task()
  281. self.assertEqual(self.transfer_coordinator.status, 'failed')
  282. self.assertEqual(invocations_of_cleanup, ['cleanup happened'])
  283. def test_submission_task_announces_done_if_cancelled_before_main(self):
  284. invocations_of_done = []
  285. done_callback = FunctionContainer(
  286. invocations_of_done.append, 'done announced')
  287. self.transfer_coordinator.add_done_callback(done_callback)
  288. self.transfer_coordinator.cancel()
  289. submission_task = self.get_task(
  290. NOOPSubmissionTask, main_kwargs=self.main_kwargs)
  291. submission_task()
  292. # Because the submission task was cancelled before being run
  293. # it did not submit any extra tasks so a result it is responsible
  294. # for making sure it announces done as nothing else will.
  295. self.assertEqual(invocations_of_done, ['done announced'])
  296. class TestTask(unittest.TestCase):
  297. def setUp(self):
  298. self.transfer_id = 1
  299. self.transfer_coordinator = TransferCoordinator(
  300. transfer_id=self.transfer_id)
  301. def test_repr(self):
  302. main_kwargs = {
  303. 'bucket': 'mybucket',
  304. 'param_to_not_include': 'foo'
  305. }
  306. task = ReturnKwargsTask(
  307. self.transfer_coordinator, main_kwargs=main_kwargs)
  308. # The repr should not include the other parameter because it is not
  309. # a desired parameter to include.
  310. self.assertEqual(
  311. repr(task),
  312. 'ReturnKwargsTask(transfer_id=%s, %s)' % (
  313. self.transfer_id, {'bucket': 'mybucket'})
  314. )
  315. def test_transfer_id(self):
  316. task = SuccessTask(self.transfer_coordinator)
  317. # Make sure that the id is the one provided to the id associated
  318. # to the transfer coordinator.
  319. self.assertEqual(task.transfer_id, self.transfer_id)
  320. def test_context_status_transitioning_success(self):
  321. # The status should be set to running.
  322. self.transfer_coordinator.set_status_to_running()
  323. self.assertEqual(self.transfer_coordinator.status, 'running')
  324. # If a task is called, the status still should be running.
  325. SuccessTask(self.transfer_coordinator)()
  326. self.assertEqual(self.transfer_coordinator.status, 'running')
  327. # Once the final task is called, the status should be set to success.
  328. SuccessTask(self.transfer_coordinator, is_final=True)()
  329. self.assertEqual(self.transfer_coordinator.status, 'success')
  330. def test_context_status_transitioning_failed(self):
  331. self.transfer_coordinator.set_status_to_running()
  332. SuccessTask(self.transfer_coordinator)()
  333. self.assertEqual(self.transfer_coordinator.status, 'running')
  334. # A failure task should result in the failed status
  335. FailureTask(self.transfer_coordinator)()
  336. self.assertEqual(self.transfer_coordinator.status, 'failed')
  337. # Even if the final task comes in and succeeds, it should stay failed.
  338. SuccessTask(self.transfer_coordinator, is_final=True)()
  339. self.assertEqual(self.transfer_coordinator.status, 'failed')
  340. def test_result_setting_for_success(self):
  341. override_return = 'foo'
  342. SuccessTask(self.transfer_coordinator)()
  343. SuccessTask(self.transfer_coordinator, main_kwargs={
  344. 'return_value': override_return}, is_final=True)()
  345. # The return value for the transfer future should be of the final
  346. # task.
  347. self.assertEqual(self.transfer_coordinator.result(), override_return)
  348. def test_result_setting_for_error(self):
  349. FailureTask(self.transfer_coordinator)()
  350. # If another failure comes in, the result should still throw the
  351. # original exception when result() is eventually called.
  352. FailureTask(self.transfer_coordinator, main_kwargs={
  353. 'exception': Exception})()
  354. # Even if a success task comes along, the result of the future
  355. # should be the original exception
  356. SuccessTask(self.transfer_coordinator, is_final=True)()
  357. with self.assertRaises(TaskFailureException):
  358. self.transfer_coordinator.result()
  359. def test_done_callbacks_success(self):
  360. callback_results = []
  361. SuccessTask(self.transfer_coordinator, done_callbacks=[
  362. partial(callback_results.append, 'first'),
  363. partial(callback_results.append, 'second')
  364. ])()
  365. # For successful tasks, the done callbacks should get called.
  366. self.assertEqual(callback_results, ['first', 'second'])
  367. def test_done_callbacks_failure(self):
  368. callback_results = []
  369. FailureTask(self.transfer_coordinator, done_callbacks=[
  370. partial(callback_results.append, 'first'),
  371. partial(callback_results.append, 'second')
  372. ])()
  373. # For even failed tasks, the done callbacks should get called.
  374. self.assertEqual(callback_results, ['first', 'second'])
  375. # Callbacks should continue to be called even after a related failure
  376. SuccessTask(self.transfer_coordinator, done_callbacks=[
  377. partial(callback_results.append, 'third'),
  378. partial(callback_results.append, 'fourth')
  379. ])()
  380. self.assertEqual(
  381. callback_results, ['first', 'second', 'third', 'fourth'])
  382. def test_failure_cleanups_on_failure(self):
  383. callback_results = []
  384. self.transfer_coordinator.add_failure_cleanup(
  385. callback_results.append, 'first')
  386. self.transfer_coordinator.add_failure_cleanup(
  387. callback_results.append, 'second')
  388. FailureTask(self.transfer_coordinator)()
  389. # The failure callbacks should have not been called yet because it
  390. # is not the last task
  391. self.assertEqual(callback_results, [])
  392. # Now the failure callbacks should get called.
  393. SuccessTask(self.transfer_coordinator, is_final=True)()
  394. self.assertEqual(callback_results, ['first', 'second'])
  395. def test_no_failure_cleanups_on_success(self):
  396. callback_results = []
  397. self.transfer_coordinator.add_failure_cleanup(
  398. callback_results.append, 'first')
  399. self.transfer_coordinator.add_failure_cleanup(
  400. callback_results.append, 'second')
  401. SuccessTask(self.transfer_coordinator, is_final=True)()
  402. # The failure cleanups should not have been called because no task
  403. # failed for the transfer context.
  404. self.assertEqual(callback_results, [])
  405. def test_passing_main_kwargs(self):
  406. main_kwargs = {'foo': 'bar', 'baz': 'biz'}
  407. ReturnKwargsTask(
  408. self.transfer_coordinator, main_kwargs=main_kwargs,
  409. is_final=True)()
  410. # The kwargs should have been passed to the main()
  411. self.assertEqual(self.transfer_coordinator.result(), main_kwargs)
  412. def test_passing_pending_kwargs_single_futures(self):
  413. pending_kwargs = {}
  414. ref_main_kwargs = {'foo': 'bar', 'baz': 'biz'}
  415. # Pass some tasks to an executor
  416. with futures.ThreadPoolExecutor(1) as executor:
  417. pending_kwargs['foo'] = executor.submit(
  418. SuccessTask(
  419. self.transfer_coordinator,
  420. main_kwargs={'return_value': ref_main_kwargs['foo']}
  421. )
  422. )
  423. pending_kwargs['baz'] = executor.submit(
  424. SuccessTask(
  425. self.transfer_coordinator,
  426. main_kwargs={'return_value': ref_main_kwargs['baz']}
  427. )
  428. )
  429. # Create a task that depends on the tasks passed to the executor
  430. ReturnKwargsTask(
  431. self.transfer_coordinator, pending_main_kwargs=pending_kwargs,
  432. is_final=True)()
  433. # The result should have the pending keyword arg values flushed
  434. # out.
  435. self.assertEqual(self.transfer_coordinator.result(), ref_main_kwargs)
  436. def test_passing_pending_kwargs_list_of_futures(self):
  437. pending_kwargs = {}
  438. ref_main_kwargs = {'foo': ['first', 'second']}
  439. # Pass some tasks to an executor
  440. with futures.ThreadPoolExecutor(1) as executor:
  441. first_future = executor.submit(
  442. SuccessTask(
  443. self.transfer_coordinator,
  444. main_kwargs={'return_value': ref_main_kwargs['foo'][0]}
  445. )
  446. )
  447. second_future = executor.submit(
  448. SuccessTask(
  449. self.transfer_coordinator,
  450. main_kwargs={'return_value': ref_main_kwargs['foo'][1]}
  451. )
  452. )
  453. # Make the pending keyword arg value a list
  454. pending_kwargs['foo'] = [first_future, second_future]
  455. # Create a task that depends on the tasks passed to the executor
  456. ReturnKwargsTask(
  457. self.transfer_coordinator, pending_main_kwargs=pending_kwargs,
  458. is_final=True)()
  459. # The result should have the pending keyword arg values flushed
  460. # out in the expected order.
  461. self.assertEqual(self.transfer_coordinator.result(), ref_main_kwargs)
  462. def test_passing_pending_and_non_pending_kwargs(self):
  463. main_kwargs = {'nonpending_value': 'foo'}
  464. pending_kwargs = {}
  465. ref_main_kwargs = {
  466. 'nonpending_value': 'foo',
  467. 'pending_value': 'bar',
  468. 'pending_list': ['first', 'second']
  469. }
  470. # Create the pending tasks
  471. with futures.ThreadPoolExecutor(1) as executor:
  472. pending_kwargs['pending_value'] = executor.submit(
  473. SuccessTask(
  474. self.transfer_coordinator,
  475. main_kwargs={'return_value':
  476. ref_main_kwargs['pending_value']}
  477. )
  478. )
  479. first_future = executor.submit(
  480. SuccessTask(
  481. self.transfer_coordinator,
  482. main_kwargs={'return_value':
  483. ref_main_kwargs['pending_list'][0]}
  484. )
  485. )
  486. second_future = executor.submit(
  487. SuccessTask(
  488. self.transfer_coordinator,
  489. main_kwargs={'return_value':
  490. ref_main_kwargs['pending_list'][1]}
  491. )
  492. )
  493. # Make the pending keyword arg value a list
  494. pending_kwargs['pending_list'] = [first_future, second_future]
  495. # Create a task that depends on the tasks passed to the executor
  496. # and just regular nonpending kwargs.
  497. ReturnKwargsTask(
  498. self.transfer_coordinator, main_kwargs=main_kwargs,
  499. pending_main_kwargs=pending_kwargs,
  500. is_final=True)()
  501. # The result should have all of the kwargs (both pending and
  502. # nonpending)
  503. self.assertEqual(self.transfer_coordinator.result(), ref_main_kwargs)
  504. def test_single_failed_pending_future(self):
  505. pending_kwargs = {}
  506. # Pass some tasks to an executor. Make one successful and the other
  507. # a failure.
  508. with futures.ThreadPoolExecutor(1) as executor:
  509. pending_kwargs['foo'] = executor.submit(
  510. SuccessTask(
  511. self.transfer_coordinator,
  512. main_kwargs={'return_value': 'bar'}
  513. )
  514. )
  515. pending_kwargs['baz'] = executor.submit(
  516. FailureTask(self.transfer_coordinator))
  517. # Create a task that depends on the tasks passed to the executor
  518. ReturnKwargsTask(
  519. self.transfer_coordinator, pending_main_kwargs=pending_kwargs,
  520. is_final=True)()
  521. # The end result should raise the exception from the initial
  522. # pending future value
  523. with self.assertRaises(TaskFailureException):
  524. self.transfer_coordinator.result()
  525. def test_single_failed_pending_future_in_list(self):
  526. pending_kwargs = {}
  527. # Pass some tasks to an executor. Make one successful and the other
  528. # a failure.
  529. with futures.ThreadPoolExecutor(1) as executor:
  530. first_future = executor.submit(
  531. SuccessTask(
  532. self.transfer_coordinator,
  533. main_kwargs={'return_value': 'bar'}
  534. )
  535. )
  536. second_future = executor.submit(
  537. FailureTask(self.transfer_coordinator))
  538. pending_kwargs['pending_list'] = [first_future, second_future]
  539. # Create a task that depends on the tasks passed to the executor
  540. ReturnKwargsTask(
  541. self.transfer_coordinator, pending_main_kwargs=pending_kwargs,
  542. is_final=True)()
  543. # The end result should raise the exception from the initial
  544. # pending future value in the list
  545. with self.assertRaises(TaskFailureException):
  546. self.transfer_coordinator.result()
  547. class BaseMultipartTaskTest(BaseTaskTest):
  548. def setUp(self):
  549. super(BaseMultipartTaskTest, self).setUp()
  550. self.bucket = 'mybucket'
  551. self.key = 'foo'
  552. class TestCreateMultipartUploadTask(BaseMultipartTaskTest):
  553. def test_main(self):
  554. upload_id = 'foo'
  555. extra_args = {'Metadata': {'foo': 'bar'}}
  556. response = {'UploadId': upload_id}
  557. task = self.get_task(
  558. CreateMultipartUploadTask,
  559. main_kwargs={
  560. 'client': self.client,
  561. 'bucket': self.bucket,
  562. 'key': self.key,
  563. 'extra_args': extra_args
  564. }
  565. )
  566. self.stubber.add_response(
  567. method='create_multipart_upload',
  568. service_response=response,
  569. expected_params={
  570. 'Bucket': self.bucket, 'Key': self.key,
  571. 'Metadata': {'foo': 'bar'}
  572. }
  573. )
  574. result_id = task()
  575. self.stubber.assert_no_pending_responses()
  576. # Ensure the upload id returned is correct
  577. self.assertEqual(upload_id, result_id)
  578. # Make sure that the abort was added as a cleanup failure
  579. self.assertEqual(len(self.transfer_coordinator.failure_cleanups), 1)
  580. # Make sure if it is called, it will abort correctly
  581. self.stubber.add_response(
  582. method='abort_multipart_upload',
  583. service_response={},
  584. expected_params={
  585. 'Bucket': self.bucket,
  586. 'Key': self.key,
  587. 'UploadId': upload_id
  588. }
  589. )
  590. self.transfer_coordinator.failure_cleanups[0]()
  591. self.stubber.assert_no_pending_responses()
  592. class TestCompleteMultipartUploadTask(BaseMultipartTaskTest):
  593. def test_main(self):
  594. upload_id = 'my-id'
  595. parts = [{'ETag': 'etag', 'PartNumber': 0}]
  596. task = self.get_task(
  597. CompleteMultipartUploadTask,
  598. main_kwargs={
  599. 'client': self.client,
  600. 'bucket': self.bucket,
  601. 'key': self.key,
  602. 'upload_id': upload_id,
  603. 'parts': parts,
  604. 'extra_args': {}
  605. }
  606. )
  607. self.stubber.add_response(
  608. method='complete_multipart_upload',
  609. service_response={},
  610. expected_params={
  611. 'Bucket': self.bucket, 'Key': self.key,
  612. 'UploadId': upload_id,
  613. 'MultipartUpload': {
  614. 'Parts': parts
  615. }
  616. }
  617. )
  618. task()
  619. self.stubber.assert_no_pending_responses()
  620. def test_includes_extra_args(self):
  621. upload_id = 'my-id'
  622. parts = [{'ETag': 'etag', 'PartNumber': 0}]
  623. task = self.get_task(
  624. CompleteMultipartUploadTask,
  625. main_kwargs={
  626. 'client': self.client,
  627. 'bucket': self.bucket,
  628. 'key': self.key,
  629. 'upload_id': upload_id,
  630. 'parts': parts,
  631. 'extra_args': {'RequestPayer': 'requester'}
  632. }
  633. )
  634. self.stubber.add_response(
  635. method='complete_multipart_upload',
  636. service_response={},
  637. expected_params={
  638. 'Bucket': self.bucket, 'Key': self.key,
  639. 'UploadId': upload_id,
  640. 'MultipartUpload': {
  641. 'Parts': parts
  642. },
  643. 'RequestPayer': 'requester'
  644. }
  645. )
  646. task()
  647. self.stubber.assert_no_pending_responses()