test_compat.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import multiprocessing.managers
  14. import os
  15. import tempfile
  16. import shutil
  17. import signal
  18. from botocore.compat import six
  19. from __tests__ import unittest
  20. from __tests__ import skip_if_windows
  21. from s3transfer.compat import seekable, readable
  22. from s3transfer.compat import BaseManager
  23. class ErrorRaisingSeekWrapper(object):
  24. """An object wrapper that throws an error when seeked on
  25. :param fileobj: The fileobj that it wraps
  26. :param execption: The exception to raise when seeked on.
  27. """
  28. def __init__(self, fileobj, exception):
  29. self._fileobj = fileobj
  30. self._exception = exception
  31. def seek(self, offset, whence=0):
  32. raise self._exception
  33. def tell(self):
  34. return self._fileobj.tell()
  35. class TestSeekable(unittest.TestCase):
  36. def setUp(self):
  37. self.tempdir = tempfile.mkdtemp()
  38. self.filename = os.path.join(self.tempdir, 'foo')
  39. def tearDown(self):
  40. shutil.rmtree(self.tempdir)
  41. def test_seekable_fileobj(self):
  42. with open(self.filename, 'w') as f:
  43. self.assertTrue(seekable(f))
  44. def test_non_file_like_obj(self):
  45. # Fails becase there is no seekable(), seek(), nor tell()
  46. self.assertFalse(seekable(object()))
  47. def test_non_seekable_ioerror(self):
  48. # Should return False if IOError is thrown.
  49. with open(self.filename, 'w') as f:
  50. self.assertFalse(seekable(ErrorRaisingSeekWrapper(f, IOError())))
  51. def test_non_seekable_oserror(self):
  52. # Should return False if OSError is thrown.
  53. with open(self.filename, 'w') as f:
  54. self.assertFalse(seekable(ErrorRaisingSeekWrapper(f, OSError())))
  55. class TestReadable(unittest.TestCase):
  56. def test_readable_fileobj(self):
  57. with tempfile.TemporaryFile() as f:
  58. self.assertTrue(readable(f))
  59. def test_readable_file_like_obj(self):
  60. self.assertTrue(readable(six.BytesIO()))
  61. def test_non_file_like_obj(self):
  62. self.assertFalse(readable(object()))
  63. class TestBaseManager(unittest.TestCase):
  64. def create_pid_manager(self):
  65. class PIDManager(BaseManager):
  66. pass
  67. PIDManager.register('getpid', os.getpid)
  68. return PIDManager()
  69. def get_pid(self, pid_manager):
  70. pid = pid_manager.getpid()
  71. # A proxy object is returned back. The needed value can be acquired
  72. # from the repr and converting that to an integer
  73. return int(str(pid))
  74. @skip_if_windows('os.kill() with SIGINT not supported on Windows')
  75. def test_can_provide_signal_handler_initializers_to_start(self):
  76. manager = self.create_pid_manager()
  77. manager.start(signal.signal, (signal.SIGINT, signal.SIG_IGN))
  78. pid = self.get_pid(manager)
  79. try:
  80. os.kill(pid, signal.SIGINT)
  81. except KeyboardInterrupt:
  82. pass
  83. # Try using the manager after the os.kill on the parent process. The
  84. # manager should not have died and should still be usable.
  85. self.assertEqual(pid, self.get_pid(manager))