test_manager.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import time
  14. from concurrent.futures import ThreadPoolExecutor
  15. from __tests__ import unittest
  16. from __tests__ import TransferCoordinatorWithInterrupt
  17. from s3transfer.exceptions import CancelledError
  18. from s3transfer.exceptions import FatalError
  19. from s3transfer.futures import TransferCoordinator
  20. from s3transfer.manager import TransferConfig
  21. from s3transfer.manager import TransferCoordinatorController
  22. class FutureResultException(Exception):
  23. pass
  24. class TestTransferConfig(unittest.TestCase):
  25. def test_exception_on_zero_attr_value(self):
  26. with self.assertRaises(ValueError):
  27. TransferConfig(max_request_queue_size=0)
  28. class TestTransferCoordinatorController(unittest.TestCase):
  29. def setUp(self):
  30. self.coordinator_controller = TransferCoordinatorController()
  31. def sleep_then_announce_done(self, transfer_coordinator, sleep_time):
  32. time.sleep(sleep_time)
  33. transfer_coordinator.set_result('done')
  34. transfer_coordinator.announce_done()
  35. def assert_coordinator_is_cancelled(self, transfer_coordinator):
  36. self.assertEqual(transfer_coordinator.status, 'cancelled')
  37. def test_add_transfer_coordinator(self):
  38. transfer_coordinator = TransferCoordinator()
  39. # Add the transfer coordinator
  40. self.coordinator_controller.add_transfer_coordinator(
  41. transfer_coordinator)
  42. # Ensure that is tracked.
  43. self.assertEqual(
  44. self.coordinator_controller.tracked_transfer_coordinators,
  45. set([transfer_coordinator]))
  46. def test_remove_transfer_coordinator(self):
  47. transfer_coordinator = TransferCoordinator()
  48. # Add the coordinator
  49. self.coordinator_controller.add_transfer_coordinator(
  50. transfer_coordinator)
  51. # Now remove the coordinator
  52. self.coordinator_controller.remove_transfer_coordinator(
  53. transfer_coordinator)
  54. # Make sure that it is no longer getting tracked.
  55. self.assertEqual(
  56. self.coordinator_controller.tracked_transfer_coordinators, set())
  57. def test_cancel(self):
  58. transfer_coordinator = TransferCoordinator()
  59. # Add the transfer coordinator
  60. self.coordinator_controller.add_transfer_coordinator(
  61. transfer_coordinator)
  62. # Cancel with the canceler
  63. self.coordinator_controller.cancel()
  64. # Check that coordinator got canceled
  65. self.assert_coordinator_is_cancelled(transfer_coordinator)
  66. def test_cancel_with_message(self):
  67. message = 'my cancel message'
  68. transfer_coordinator = TransferCoordinator()
  69. self.coordinator_controller.add_transfer_coordinator(
  70. transfer_coordinator)
  71. self.coordinator_controller.cancel(message)
  72. transfer_coordinator.announce_done()
  73. with self.assertRaisesRegexp(CancelledError, message):
  74. transfer_coordinator.result()
  75. def test_cancel_with_provided_exception(self):
  76. message = 'my cancel message'
  77. transfer_coordinator = TransferCoordinator()
  78. self.coordinator_controller.add_transfer_coordinator(
  79. transfer_coordinator)
  80. self.coordinator_controller.cancel(message, exc_type=FatalError)
  81. transfer_coordinator.announce_done()
  82. with self.assertRaisesRegexp(FatalError, message):
  83. transfer_coordinator.result()
  84. def test_wait_for_done_transfer_coordinators(self):
  85. # Create a coordinator and add it to the canceler
  86. transfer_coordinator = TransferCoordinator()
  87. self.coordinator_controller.add_transfer_coordinator(
  88. transfer_coordinator)
  89. sleep_time = 0.02
  90. with ThreadPoolExecutor(max_workers=1) as executor:
  91. # In a seperate thread sleep and then set the transfer coordinator
  92. # to done after sleeping.
  93. start_time = time.time()
  94. executor.submit(
  95. self.sleep_then_announce_done, transfer_coordinator,
  96. sleep_time)
  97. # Now call wait to wait for the transfer coordinator to be done.
  98. self.coordinator_controller.wait()
  99. end_time = time.time()
  100. wait_time = end_time - start_time
  101. # The time waited should not be less than the time it took to sleep in
  102. # the seperate thread because the wait ending should be dependent on
  103. # the sleeping thread announcing that the transfer coordinator is done.
  104. self.assertTrue(sleep_time <= wait_time)
  105. def test_wait_does_not_propogate_exceptions_from_result(self):
  106. transfer_coordinator = TransferCoordinator()
  107. transfer_coordinator.set_exception(FutureResultException())
  108. transfer_coordinator.announce_done()
  109. try:
  110. self.coordinator_controller.wait()
  111. except FutureResultException as e:
  112. self.fail('%s should not have been raised.' % e)
  113. def test_wait_can_be_interrupted(self):
  114. inject_interrupt_coordinator = TransferCoordinatorWithInterrupt()
  115. self.coordinator_controller.add_transfer_coordinator(
  116. inject_interrupt_coordinator)
  117. with self.assertRaises(KeyboardInterrupt):
  118. self.coordinator_controller.wait()