_common.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. from __future__ import unicode_literals
  2. import os
  3. import time
  4. import subprocess
  5. import warnings
  6. import tempfile
  7. import pickle
  8. import pytest
  9. class PicklableMixin(object):
  10. def _get_nobj_bytes(self, obj, dump_kwargs, load_kwargs):
  11. """
  12. Pickle and unpickle an object using ``pickle.dumps`` / ``pickle.loads``
  13. """
  14. pkl = pickle.dumps(obj, **dump_kwargs)
  15. return pickle.loads(pkl, **load_kwargs)
  16. def _get_nobj_file(self, obj, dump_kwargs, load_kwargs):
  17. """
  18. Pickle and unpickle an object using ``pickle.dump`` / ``pickle.load`` on
  19. a temporary file.
  20. """
  21. with tempfile.TemporaryFile('w+b') as pkl:
  22. pickle.dump(obj, pkl, **dump_kwargs)
  23. pkl.seek(0) # Reset the file to the beginning to read it
  24. nobj = pickle.load(pkl, **load_kwargs)
  25. return nobj
  26. def assertPicklable(self, obj, singleton=False, asfile=False,
  27. dump_kwargs=None, load_kwargs=None):
  28. """
  29. Assert that an object can be pickled and unpickled. This assertion
  30. assumes that the desired behavior is that the unpickled object compares
  31. equal to the original object, but is not the same object.
  32. """
  33. get_nobj = self._get_nobj_file if asfile else self._get_nobj_bytes
  34. dump_kwargs = dump_kwargs or {}
  35. load_kwargs = load_kwargs or {}
  36. nobj = get_nobj(obj, dump_kwargs, load_kwargs)
  37. if not singleton:
  38. self.assertIsNot(obj, nobj)
  39. self.assertEqual(obj, nobj)
  40. class TZContextBase(object):
  41. """
  42. Base class for a context manager which allows changing of time zones.
  43. Subclasses may define a guard variable to either block or or allow time
  44. zone changes by redefining ``_guard_var_name`` and ``_guard_allows_change``.
  45. The default is that the guard variable must be affirmatively set.
  46. Subclasses must define ``get_current_tz`` and ``set_current_tz``.
  47. """
  48. _guard_var_name = "DATEUTIL_MAY_CHANGE_TZ"
  49. _guard_allows_change = True
  50. def __init__(self, tzval):
  51. self.tzval = tzval
  52. self._old_tz = None
  53. @classmethod
  54. def tz_change_allowed(cls):
  55. """
  56. Class method used to query whether or not this class allows time zone
  57. changes.
  58. """
  59. guard = bool(os.environ.get(cls._guard_var_name, False))
  60. # _guard_allows_change gives the "default" behavior - if True, the
  61. # guard is overcoming a block. If false, the guard is causing a block.
  62. # Whether tz_change is allowed is therefore the XNOR of the two.
  63. return guard == cls._guard_allows_change
  64. @classmethod
  65. def tz_change_disallowed_message(cls):
  66. """ Generate instructions on how to allow tz changes """
  67. msg = ('Changing time zone not allowed. Set {envar} to {gval} '
  68. 'if you would like to allow this behavior')
  69. return msg.format(envar=cls._guard_var_name,
  70. gval=cls._guard_allows_change)
  71. def __enter__(self):
  72. if not self.tz_change_allowed():
  73. msg = self.tz_change_disallowed_message()
  74. pytest.skip(msg)
  75. # If this is used outside of a test suite, we still want an error.
  76. raise ValueError(msg) # pragma: no cover
  77. self._old_tz = self.get_current_tz()
  78. self.set_current_tz(self.tzval)
  79. def __exit__(self, type, value, traceback):
  80. if self._old_tz is not None:
  81. self.set_current_tz(self._old_tz)
  82. self._old_tz = None
  83. def get_current_tz(self):
  84. raise NotImplementedError
  85. def set_current_tz(self):
  86. raise NotImplementedError
  87. class TZEnvContext(TZContextBase):
  88. """
  89. Context manager that temporarily sets the `TZ` variable (for use on
  90. *nix-like systems). Because the effect is local to the shell anyway, this
  91. will apply *unless* a guard is set.
  92. If you do not want the TZ environment variable set, you may set the
  93. ``DATEUTIL_MAY_NOT_CHANGE_TZ_VAR`` variable to a truthy value.
  94. """
  95. _guard_var_name = "DATEUTIL_MAY_NOT_CHANGE_TZ_VAR"
  96. _guard_allows_change = False
  97. def get_current_tz(self):
  98. return os.environ.get('TZ', UnsetTz)
  99. def set_current_tz(self, tzval):
  100. if tzval is UnsetTz and 'TZ' in os.environ:
  101. del os.environ['TZ']
  102. else:
  103. os.environ['TZ'] = tzval
  104. time.tzset()
  105. class TZWinContext(TZContextBase):
  106. """
  107. Context manager for changing local time zone on Windows.
  108. Because the effect of this is system-wide and global, it may have
  109. unintended side effect. Set the ``DATEUTIL_MAY_CHANGE_TZ`` environment
  110. variable to a truthy value before using this context manager.
  111. """
  112. def get_current_tz(self):
  113. p = subprocess.Popen(['tzutil', '/g'], stdout=subprocess.PIPE)
  114. ctzname, err = p.communicate()
  115. ctzname = ctzname.decode() # Popen returns
  116. if p.returncode:
  117. raise OSError('Failed to get current time zone: ' + err)
  118. return ctzname
  119. def set_current_tz(self, tzname):
  120. p = subprocess.Popen('tzutil /s "' + tzname + '"')
  121. out, err = p.communicate()
  122. if p.returncode:
  123. raise OSError('Failed to set current time zone: ' +
  124. (err or 'Unknown error.'))
  125. ###
  126. # Utility classes
  127. class NotAValueClass(object):
  128. """
  129. A class analogous to NaN that has operations defined for any type.
  130. """
  131. def _op(self, other):
  132. return self # Operation with NotAValue returns NotAValue
  133. def _cmp(self, other):
  134. return False
  135. __add__ = __radd__ = _op
  136. __sub__ = __rsub__ = _op
  137. __mul__ = __rmul__ = _op
  138. __div__ = __rdiv__ = _op
  139. __truediv__ = __rtruediv__ = _op
  140. __floordiv__ = __rfloordiv__ = _op
  141. __lt__ = __rlt__ = _op
  142. __gt__ = __rgt__ = _op
  143. __eq__ = __req__ = _op
  144. __le__ = __rle__ = _op
  145. __ge__ = __rge__ = _op
  146. NotAValue = NotAValueClass()
  147. class ComparesEqualClass(object):
  148. """
  149. A class that is always equal to whatever you compare it to.
  150. """
  151. def __eq__(self, other):
  152. return True
  153. def __ne__(self, other):
  154. return False
  155. def __le__(self, other):
  156. return True
  157. def __ge__(self, other):
  158. return True
  159. def __lt__(self, other):
  160. return False
  161. def __gt__(self, other):
  162. return False
  163. __req__ = __eq__
  164. __rne__ = __ne__
  165. __rle__ = __le__
  166. __rge__ = __ge__
  167. __rlt__ = __lt__
  168. __rgt__ = __gt__
  169. ComparesEqual = ComparesEqualClass()
  170. class UnsetTzClass(object):
  171. """ Sentinel class for unset time zone variable """
  172. pass
  173. UnsetTz = UnsetTzClass()