|
@@ -15,21 +15,33 @@ import warnings
|
|
|
import types
|
|
|
import numbers
|
|
|
import inspect
|
|
|
+from typing import TYPE_CHECKING, overload
|
|
|
+from typing import Any, Awaitable, Callable, Dict, Iterator, List, Optional, Set, Type, TypeVar, Tuple, Union
|
|
|
|
|
|
from dateutil import parser
|
|
|
from dateutil.tz import tzlocal
|
|
|
|
|
|
try:
|
|
|
- from maya import MayaDT
|
|
|
+ from maya import MayaDT # type: ignore
|
|
|
except ImportError:
|
|
|
MayaDT = None
|
|
|
|
|
|
+if TYPE_CHECKING:
|
|
|
+ from typing_extensions import ParamSpec
|
|
|
+
|
|
|
+ P = ParamSpec("P")
|
|
|
+
|
|
|
+T = TypeVar("T")
|
|
|
+
|
|
|
_TIME_NS_PRESENT = hasattr(time, 'time_ns')
|
|
|
_MONOTONIC_NS_PRESENT = hasattr(time, 'monotonic_ns')
|
|
|
_PERF_COUNTER_NS_PRESENT = hasattr(time, 'perf_counter_ns')
|
|
|
_EPOCH = datetime.datetime(1970, 1, 1)
|
|
|
_EPOCHTZ = datetime.datetime(1970, 1, 1, tzinfo=dateutil.tz.UTC)
|
|
|
|
|
|
+T2 = TypeVar("T2")
|
|
|
+_Freezable = Union[str, datetime.datetime, datetime.date, datetime.timedelta, types.FunctionType, Callable[[], Union[str, datetime.datetime, datetime.date, datetime.timedelta]], Iterator[datetime.datetime]]
|
|
|
+
|
|
|
real_time = time.time
|
|
|
real_localtime = time.localtime
|
|
|
real_gmtime = time.gmtime
|
|
@@ -57,38 +69,40 @@ _real_time_object_ids = {id(obj) for obj in real_date_objects}
|
|
|
# time.clock is deprecated and was removed in Python 3.8
|
|
|
real_clock = getattr(time, 'clock', None)
|
|
|
|
|
|
-freeze_factories = []
|
|
|
-tz_offsets = []
|
|
|
-ignore_lists = []
|
|
|
-tick_flags = []
|
|
|
+freeze_factories: List[Union["StepTickTimeFactory", "TickingDateTimeFactory", "FrozenDateTimeFactory"]] = []
|
|
|
+tz_offsets: List[datetime.timedelta] = []
|
|
|
+ignore_lists: List[Tuple[str, ...]] = []
|
|
|
+tick_flags: List[bool] = []
|
|
|
|
|
|
try:
|
|
|
# noinspection PyUnresolvedReferences
|
|
|
- real_uuid_generate_time = uuid._uuid_generate_time
|
|
|
+ real_uuid_generate_time = uuid._uuid_generate_time # type: ignore
|
|
|
uuid_generate_time_attr = '_uuid_generate_time'
|
|
|
except AttributeError:
|
|
|
# noinspection PyUnresolvedReferences
|
|
|
- uuid._load_system_functions()
|
|
|
+ if hasattr(uuid, '_load_system_functions'):
|
|
|
+ # A no-op after Python ~3.9, being removed in 3.13.
|
|
|
+ uuid._load_system_functions()
|
|
|
# noinspection PyUnresolvedReferences
|
|
|
- real_uuid_generate_time = uuid._generate_time_safe
|
|
|
+ real_uuid_generate_time = uuid._generate_time_safe # type: ignore
|
|
|
uuid_generate_time_attr = '_generate_time_safe'
|
|
|
except ImportError:
|
|
|
real_uuid_generate_time = None
|
|
|
- uuid_generate_time_attr = None
|
|
|
+ uuid_generate_time_attr = None # type: ignore
|
|
|
|
|
|
try:
|
|
|
# noinspection PyUnresolvedReferences
|
|
|
- real_uuid_create = uuid._UuidCreate
|
|
|
+ real_uuid_create = uuid._UuidCreate # type: ignore
|
|
|
except (AttributeError, ImportError):
|
|
|
real_uuid_create = None
|
|
|
|
|
|
|
|
|
# keep a cache of module attributes otherwise freezegun will need to analyze too many modules all the time
|
|
|
-_GLOBAL_MODULES_CACHE = {}
|
|
|
+_GLOBAL_MODULES_CACHE: Dict[str, Tuple[str, List[Tuple[str, Any]]]] = {}
|
|
|
|
|
|
|
|
|
-def _get_module_attributes(module):
|
|
|
- result = []
|
|
|
+def _get_module_attributes(module: types.ModuleType) -> List[Tuple[str, Any]]:
|
|
|
+ result: List[Tuple[str, Any]] = []
|
|
|
try:
|
|
|
module_attributes = dir(module)
|
|
|
except (ImportError, TypeError):
|
|
@@ -104,7 +118,7 @@ def _get_module_attributes(module):
|
|
|
return result
|
|
|
|
|
|
|
|
|
-def _setup_module_cache(module):
|
|
|
+def _setup_module_cache(module: types.ModuleType) -> None:
|
|
|
date_attrs = []
|
|
|
all_module_attributes = _get_module_attributes(module)
|
|
|
for attribute_name, attribute_value in all_module_attributes:
|
|
@@ -113,7 +127,7 @@ def _setup_module_cache(module):
|
|
|
_GLOBAL_MODULES_CACHE[module.__name__] = (_get_module_attributes_hash(module), date_attrs)
|
|
|
|
|
|
|
|
|
-def _get_module_attributes_hash(module):
|
|
|
+def _get_module_attributes_hash(module: types.ModuleType) -> str:
|
|
|
try:
|
|
|
module_dir = dir(module)
|
|
|
except (ImportError, TypeError):
|
|
@@ -121,7 +135,7 @@ def _get_module_attributes_hash(module):
|
|
|
return f'{id(module)}-{hash(frozenset(module_dir))}'
|
|
|
|
|
|
|
|
|
-def _get_cached_module_attributes(module):
|
|
|
+def _get_cached_module_attributes(module: types.ModuleType) -> List[Tuple[str, Any]]:
|
|
|
module_hash, cached_attrs = _GLOBAL_MODULES_CACHE.get(module.__name__, ('0', []))
|
|
|
if _get_module_attributes_hash(module) == module_hash:
|
|
|
return cached_attrs
|
|
@@ -142,7 +156,7 @@ _is_cpython = (
|
|
|
call_stack_inspection_limit = 5
|
|
|
|
|
|
|
|
|
-def _should_use_real_time():
|
|
|
+def _should_use_real_time() -> bool:
|
|
|
if not call_stack_inspection_limit:
|
|
|
return False
|
|
|
|
|
@@ -153,38 +167,38 @@ def _should_use_real_time():
|
|
|
if not ignore_lists[-1]:
|
|
|
return False
|
|
|
|
|
|
- frame = inspect.currentframe().f_back.f_back
|
|
|
+ frame = inspect.currentframe().f_back.f_back # type: ignore
|
|
|
|
|
|
for _ in range(call_stack_inspection_limit):
|
|
|
- module_name = frame.f_globals.get('__name__')
|
|
|
+ module_name = frame.f_globals.get('__name__') # type: ignore
|
|
|
if module_name and module_name.startswith(ignore_lists[-1]):
|
|
|
return True
|
|
|
|
|
|
- frame = frame.f_back
|
|
|
+ frame = frame.f_back # type: ignore
|
|
|
if frame is None:
|
|
|
break
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
-def get_current_time():
|
|
|
+def get_current_time() -> datetime.datetime:
|
|
|
return freeze_factories[-1]()
|
|
|
|
|
|
|
|
|
-def fake_time():
|
|
|
+def fake_time() -> float:
|
|
|
if _should_use_real_time():
|
|
|
return real_time()
|
|
|
current_time = get_current_time()
|
|
|
return calendar.timegm(current_time.timetuple()) + current_time.microsecond / 1000000.0
|
|
|
|
|
|
if _TIME_NS_PRESENT:
|
|
|
- def fake_time_ns():
|
|
|
+ def fake_time_ns() -> int:
|
|
|
if _should_use_real_time():
|
|
|
return real_time_ns()
|
|
|
- return int(int(fake_time()) * 1e9)
|
|
|
+ return int(fake_time() * 1e9)
|
|
|
|
|
|
|
|
|
-def fake_localtime(t=None):
|
|
|
+def fake_localtime(t: Optional[float]=None) -> time.struct_time:
|
|
|
if t is not None:
|
|
|
return real_localtime(t)
|
|
|
if _should_use_real_time():
|
|
@@ -193,7 +207,7 @@ def fake_localtime(t=None):
|
|
|
return shifted_time.timetuple()
|
|
|
|
|
|
|
|
|
-def fake_gmtime(t=None):
|
|
|
+def fake_gmtime(t: Optional[float]=None) -> time.struct_time:
|
|
|
if t is not None:
|
|
|
return real_gmtime(t)
|
|
|
if _should_use_real_time():
|
|
@@ -201,7 +215,7 @@ def fake_gmtime(t=None):
|
|
|
return get_current_time().timetuple()
|
|
|
|
|
|
|
|
|
-def _get_fake_monotonic():
|
|
|
+def _get_fake_monotonic() -> float:
|
|
|
# For monotonic timers like .monotonic(), .perf_counter(), etc
|
|
|
current_time = get_current_time()
|
|
|
return (
|
|
@@ -210,7 +224,7 @@ def _get_fake_monotonic():
|
|
|
)
|
|
|
|
|
|
|
|
|
-def _get_fake_monotonic_ns():
|
|
|
+def _get_fake_monotonic_ns() -> int:
|
|
|
# For monotonic timers like .monotonic(), .perf_counter(), etc
|
|
|
current_time = get_current_time()
|
|
|
return (
|
|
@@ -219,14 +233,14 @@ def _get_fake_monotonic_ns():
|
|
|
) * 1000
|
|
|
|
|
|
|
|
|
-def fake_monotonic():
|
|
|
+def fake_monotonic() -> float:
|
|
|
if _should_use_real_time():
|
|
|
return real_monotonic()
|
|
|
|
|
|
return _get_fake_monotonic()
|
|
|
|
|
|
|
|
|
-def fake_perf_counter():
|
|
|
+def fake_perf_counter() -> float:
|
|
|
if _should_use_real_time():
|
|
|
return real_perf_counter()
|
|
|
|
|
@@ -234,7 +248,7 @@ def fake_perf_counter():
|
|
|
|
|
|
|
|
|
if _MONOTONIC_NS_PRESENT:
|
|
|
- def fake_monotonic_ns():
|
|
|
+ def fake_monotonic_ns() -> int:
|
|
|
if _should_use_real_time():
|
|
|
return real_monotonic_ns()
|
|
|
|
|
@@ -242,13 +256,13 @@ if _MONOTONIC_NS_PRESENT:
|
|
|
|
|
|
|
|
|
if _PERF_COUNTER_NS_PRESENT:
|
|
|
- def fake_perf_counter_ns():
|
|
|
+ def fake_perf_counter_ns() -> int:
|
|
|
if _should_use_real_time():
|
|
|
return real_perf_counter_ns()
|
|
|
return _get_fake_monotonic_ns()
|
|
|
|
|
|
|
|
|
-def fake_strftime(format, time_to_format=None):
|
|
|
+def fake_strftime(format: Any, time_to_format: Any=None) -> str:
|
|
|
if time_to_format is None:
|
|
|
if not _should_use_real_time():
|
|
|
time_to_format = fake_localtime()
|
|
@@ -259,12 +273,12 @@ def fake_strftime(format, time_to_format=None):
|
|
|
return real_strftime(format, time_to_format)
|
|
|
|
|
|
if real_clock is not None:
|
|
|
- def fake_clock():
|
|
|
+ def fake_clock() -> Any:
|
|
|
if _should_use_real_time():
|
|
|
- return real_clock()
|
|
|
+ return real_clock() # type: ignore
|
|
|
|
|
|
if len(freeze_factories) == 1:
|
|
|
- return 0.0 if not tick_flags[-1] else real_clock()
|
|
|
+ return 0.0 if not tick_flags[-1] else real_clock() # type: ignore
|
|
|
|
|
|
first_frozen_time = freeze_factories[0]()
|
|
|
last_frozen_time = get_current_time()
|
|
@@ -273,22 +287,22 @@ if real_clock is not None:
|
|
|
total_seconds = timedelta.total_seconds()
|
|
|
|
|
|
if tick_flags[-1]:
|
|
|
- total_seconds += real_clock()
|
|
|
+ total_seconds += real_clock() # type: ignore
|
|
|
|
|
|
return total_seconds
|
|
|
|
|
|
|
|
|
class FakeDateMeta(type):
|
|
|
@classmethod
|
|
|
- def __instancecheck__(self, obj):
|
|
|
+ def __instancecheck__(self, obj: Any) -> bool:
|
|
|
return isinstance(obj, real_date)
|
|
|
|
|
|
@classmethod
|
|
|
- def __subclasscheck__(cls, subclass):
|
|
|
+ def __subclasscheck__(cls, subclass: Any) -> bool:
|
|
|
return issubclass(subclass, real_date)
|
|
|
|
|
|
|
|
|
-def datetime_to_fakedatetime(datetime):
|
|
|
+def datetime_to_fakedatetime(datetime: datetime.datetime) -> "FakeDatetime":
|
|
|
return FakeDatetime(datetime.year,
|
|
|
datetime.month,
|
|
|
datetime.day,
|
|
@@ -299,39 +313,39 @@ def datetime_to_fakedatetime(datetime):
|
|
|
datetime.tzinfo)
|
|
|
|
|
|
|
|
|
-def date_to_fakedate(date):
|
|
|
+def date_to_fakedate(date: datetime.date) -> "FakeDate":
|
|
|
return FakeDate(date.year,
|
|
|
date.month,
|
|
|
date.day)
|
|
|
|
|
|
|
|
|
class FakeDate(real_date, metaclass=FakeDateMeta):
|
|
|
- def __add__(self, other):
|
|
|
+ def __add__(self, other: Any) -> "FakeDate":
|
|
|
result = real_date.__add__(self, other)
|
|
|
if result is NotImplemented:
|
|
|
return result
|
|
|
return date_to_fakedate(result)
|
|
|
|
|
|
- def __sub__(self, other):
|
|
|
+ def __sub__(self, other: Any) -> "FakeDate": # type: ignore
|
|
|
result = real_date.__sub__(self, other)
|
|
|
if result is NotImplemented:
|
|
|
- return result
|
|
|
+ return result # type: ignore
|
|
|
if isinstance(result, real_date):
|
|
|
return date_to_fakedate(result)
|
|
|
else:
|
|
|
- return result
|
|
|
+ return result # type: ignore
|
|
|
|
|
|
@classmethod
|
|
|
- def today(cls):
|
|
|
+ def today(cls: Type["FakeDate"]) -> "FakeDate":
|
|
|
result = cls._date_to_freeze() + cls._tz_offset()
|
|
|
return date_to_fakedate(result)
|
|
|
|
|
|
@staticmethod
|
|
|
- def _date_to_freeze():
|
|
|
+ def _date_to_freeze() -> datetime.datetime:
|
|
|
return get_current_time()
|
|
|
|
|
|
@classmethod
|
|
|
- def _tz_offset(cls):
|
|
|
+ def _tz_offset(cls) -> datetime.timedelta:
|
|
|
return tz_offsets[-1]
|
|
|
|
|
|
FakeDate.min = date_to_fakedate(real_date.min)
|
|
@@ -340,50 +354,51 @@ FakeDate.max = date_to_fakedate(real_date.max)
|
|
|
|
|
|
class FakeDatetimeMeta(FakeDateMeta):
|
|
|
@classmethod
|
|
|
- def __instancecheck__(self, obj):
|
|
|
+ def __instancecheck__(self, obj: Any) -> bool:
|
|
|
return isinstance(obj, real_datetime)
|
|
|
|
|
|
@classmethod
|
|
|
- def __subclasscheck__(cls, subclass):
|
|
|
+ def __subclasscheck__(cls, subclass: Any) -> bool:
|
|
|
return issubclass(subclass, real_datetime)
|
|
|
|
|
|
|
|
|
class FakeDatetime(real_datetime, FakeDate, metaclass=FakeDatetimeMeta):
|
|
|
- def __add__(self, other):
|
|
|
+ def __add__(self, other: Any) -> "FakeDatetime": # type: ignore
|
|
|
result = real_datetime.__add__(self, other)
|
|
|
if result is NotImplemented:
|
|
|
return result
|
|
|
return datetime_to_fakedatetime(result)
|
|
|
|
|
|
- def __sub__(self, other):
|
|
|
+ def __sub__(self, other: Any) -> "FakeDatetime": # type: ignore
|
|
|
result = real_datetime.__sub__(self, other)
|
|
|
if result is NotImplemented:
|
|
|
- return result
|
|
|
+ return result # type: ignore
|
|
|
if isinstance(result, real_datetime):
|
|
|
return datetime_to_fakedatetime(result)
|
|
|
else:
|
|
|
- return result
|
|
|
+ return result # type: ignore
|
|
|
|
|
|
- def astimezone(self, tz=None):
|
|
|
+ def astimezone(self, tz: Optional[datetime.tzinfo]=None) -> "FakeDatetime":
|
|
|
if tz is None:
|
|
|
tz = tzlocal()
|
|
|
return datetime_to_fakedatetime(real_datetime.astimezone(self, tz))
|
|
|
|
|
|
@classmethod
|
|
|
- def fromtimestamp(cls, t, tz=None):
|
|
|
+ def fromtimestamp(cls, t: float, tz: Optional[datetime.tzinfo]=None) -> "FakeDatetime":
|
|
|
if tz is None:
|
|
|
- return real_datetime.fromtimestamp(
|
|
|
- t, tz=dateutil.tz.tzoffset("freezegun", cls._tz_offset())
|
|
|
- ).replace(tzinfo=None)
|
|
|
- return datetime_to_fakedatetime(real_datetime.fromtimestamp(t, tz))
|
|
|
+ tz = dateutil.tz.tzoffset("freezegun", cls._tz_offset())
|
|
|
+ result = real_datetime.fromtimestamp(t, tz=tz).replace(tzinfo=None)
|
|
|
+ else:
|
|
|
+ result = real_datetime.fromtimestamp(t, tz)
|
|
|
+ return datetime_to_fakedatetime(result)
|
|
|
|
|
|
- def timestamp(self):
|
|
|
+ def timestamp(self) -> float:
|
|
|
if self.tzinfo is None:
|
|
|
- return (self - _EPOCH - self._tz_offset()).total_seconds()
|
|
|
- return (self - _EPOCHTZ).total_seconds()
|
|
|
+ return (self - _EPOCH - self._tz_offset()).total_seconds() # type: ignore
|
|
|
+ return (self - _EPOCHTZ).total_seconds() # type: ignore
|
|
|
|
|
|
@classmethod
|
|
|
- def now(cls, tz=None):
|
|
|
+ def now(cls, tz: Optional[datetime.tzinfo] = None) -> "FakeDatetime":
|
|
|
now = cls._time_to_freeze() or real_datetime.now()
|
|
|
if tz:
|
|
|
result = tz.fromutc(now.replace(tzinfo=tz)) + cls._tz_offset()
|
|
@@ -391,33 +406,34 @@ class FakeDatetime(real_datetime, FakeDate, metaclass=FakeDatetimeMeta):
|
|
|
result = now + cls._tz_offset()
|
|
|
return datetime_to_fakedatetime(result)
|
|
|
|
|
|
- def date(self):
|
|
|
+ def date(self) -> "FakeDate":
|
|
|
return date_to_fakedate(self)
|
|
|
|
|
|
@property
|
|
|
- def nanosecond(self):
|
|
|
+ def nanosecond(self) -> int:
|
|
|
try:
|
|
|
# noinspection PyUnresolvedReferences
|
|
|
- return real_datetime.nanosecond
|
|
|
+ return real_datetime.nanosecond # type: ignore
|
|
|
except AttributeError:
|
|
|
return 0
|
|
|
|
|
|
@classmethod
|
|
|
- def today(cls):
|
|
|
+ def today(cls) -> "FakeDatetime":
|
|
|
return cls.now(tz=None)
|
|
|
|
|
|
@classmethod
|
|
|
- def utcnow(cls):
|
|
|
+ def utcnow(cls) -> "FakeDatetime":
|
|
|
result = cls._time_to_freeze() or real_datetime.now(datetime.timezone.utc)
|
|
|
return datetime_to_fakedatetime(result)
|
|
|
|
|
|
@staticmethod
|
|
|
- def _time_to_freeze():
|
|
|
+ def _time_to_freeze() -> Optional[datetime.datetime]:
|
|
|
if freeze_factories:
|
|
|
return get_current_time()
|
|
|
+ return None
|
|
|
|
|
|
@classmethod
|
|
|
- def _tz_offset(cls):
|
|
|
+ def _tz_offset(cls) -> datetime.timedelta:
|
|
|
return tz_offsets[-1]
|
|
|
|
|
|
|
|
@@ -425,17 +441,17 @@ FakeDatetime.min = datetime_to_fakedatetime(real_datetime.min)
|
|
|
FakeDatetime.max = datetime_to_fakedatetime(real_datetime.max)
|
|
|
|
|
|
|
|
|
-def convert_to_timezone_naive(time_to_freeze):
|
|
|
+def convert_to_timezone_naive(time_to_freeze: datetime.datetime) -> datetime.datetime:
|
|
|
"""
|
|
|
Converts a potentially timezone-aware datetime to be a naive UTC datetime
|
|
|
"""
|
|
|
if time_to_freeze.tzinfo:
|
|
|
- time_to_freeze -= time_to_freeze.utcoffset()
|
|
|
+ time_to_freeze -= time_to_freeze.utcoffset() # type: ignore
|
|
|
time_to_freeze = time_to_freeze.replace(tzinfo=None)
|
|
|
return time_to_freeze
|
|
|
|
|
|
|
|
|
-def pickle_fake_date(datetime_):
|
|
|
+def pickle_fake_date(datetime_: datetime.date) -> Tuple[Type[FakeDate], Tuple[int, int, int]]:
|
|
|
# A pickle function for FakeDate
|
|
|
return FakeDate, (
|
|
|
datetime_.year,
|
|
@@ -444,7 +460,7 @@ def pickle_fake_date(datetime_):
|
|
|
)
|
|
|
|
|
|
|
|
|
-def pickle_fake_datetime(datetime_):
|
|
|
+def pickle_fake_datetime(datetime_: datetime.datetime) -> Tuple[Type[FakeDatetime], Tuple[int, int, int, int, int, int, int, Optional[datetime.tzinfo]]]:
|
|
|
# A pickle function for FakeDatetime
|
|
|
return FakeDatetime, (
|
|
|
datetime_.year,
|
|
@@ -458,7 +474,7 @@ def pickle_fake_datetime(datetime_):
|
|
|
)
|
|
|
|
|
|
|
|
|
-def _parse_time_to_freeze(time_to_freeze_str):
|
|
|
+def _parse_time_to_freeze(time_to_freeze_str: Optional[_Freezable]) -> datetime.datetime:
|
|
|
"""Parses all the possible inputs for freeze_time
|
|
|
:returns: a naive ``datetime.datetime`` object
|
|
|
"""
|
|
@@ -472,12 +488,12 @@ def _parse_time_to_freeze(time_to_freeze_str):
|
|
|
elif isinstance(time_to_freeze_str, datetime.timedelta):
|
|
|
time_to_freeze = datetime.datetime.now(datetime.timezone.utc) + time_to_freeze_str
|
|
|
else:
|
|
|
- time_to_freeze = parser.parse(time_to_freeze_str)
|
|
|
+ time_to_freeze = parser.parse(time_to_freeze_str) # type: ignore
|
|
|
|
|
|
return convert_to_timezone_naive(time_to_freeze)
|
|
|
|
|
|
|
|
|
-def _parse_tz_offset(tz_offset):
|
|
|
+def _parse_tz_offset(tz_offset: Union[datetime.timedelta, float]) -> datetime.timedelta:
|
|
|
if isinstance(tz_offset, datetime.timedelta):
|
|
|
return tz_offset
|
|
|
else:
|
|
@@ -486,30 +502,44 @@ def _parse_tz_offset(tz_offset):
|
|
|
|
|
|
class TickingDateTimeFactory:
|
|
|
|
|
|
- def __init__(self, time_to_freeze, start):
|
|
|
+ def __init__(self, time_to_freeze: datetime.datetime, start: datetime.datetime):
|
|
|
self.time_to_freeze = time_to_freeze
|
|
|
self.start = start
|
|
|
|
|
|
- def __call__(self):
|
|
|
+ def __call__(self) -> datetime.datetime:
|
|
|
return self.time_to_freeze + (real_datetime.now() - self.start)
|
|
|
|
|
|
+ def tick(self, delta: Union[datetime.timedelta, int]=datetime.timedelta(seconds=1)) -> datetime.datetime:
|
|
|
+ if isinstance(delta, numbers.Real):
|
|
|
+ # noinspection PyTypeChecker
|
|
|
+ self.move_to(self.time_to_freeze + datetime.timedelta(seconds=delta))
|
|
|
+ else:
|
|
|
+ self.move_to(self.time_to_freeze + delta) # type: ignore
|
|
|
+ return self.time_to_freeze
|
|
|
+
|
|
|
+ def move_to(self, target_datetime: _Freezable) -> None:
|
|
|
+ """Moves frozen date to the given ``target_datetime``"""
|
|
|
+ self.start = real_datetime.now()
|
|
|
+ self.time_to_freeze = _parse_time_to_freeze(target_datetime)
|
|
|
+
|
|
|
|
|
|
class FrozenDateTimeFactory:
|
|
|
|
|
|
- def __init__(self, time_to_freeze):
|
|
|
+ def __init__(self, time_to_freeze: datetime.datetime):
|
|
|
self.time_to_freeze = time_to_freeze
|
|
|
|
|
|
- def __call__(self):
|
|
|
+ def __call__(self) -> datetime.datetime:
|
|
|
return self.time_to_freeze
|
|
|
|
|
|
- def tick(self, delta=datetime.timedelta(seconds=1)):
|
|
|
+ def tick(self, delta: Union[datetime.timedelta, int]=datetime.timedelta(seconds=1)) -> datetime.datetime:
|
|
|
if isinstance(delta, numbers.Real):
|
|
|
# noinspection PyTypeChecker
|
|
|
self.time_to_freeze += datetime.timedelta(seconds=delta)
|
|
|
else:
|
|
|
- self.time_to_freeze += delta
|
|
|
+ self.time_to_freeze += delta # type: ignore
|
|
|
+ return self.time_to_freeze
|
|
|
|
|
|
- def move_to(self, target_datetime):
|
|
|
+ def move_to(self, target_datetime: _Freezable) -> None:
|
|
|
"""Moves frozen date to the given ``target_datetime``"""
|
|
|
target_datetime = _parse_time_to_freeze(target_datetime)
|
|
|
delta = target_datetime - self.time_to_freeze
|
|
@@ -518,24 +548,25 @@ class FrozenDateTimeFactory:
|
|
|
|
|
|
class StepTickTimeFactory:
|
|
|
|
|
|
- def __init__(self, time_to_freeze, step_width):
|
|
|
+ def __init__(self, time_to_freeze: datetime.datetime, step_width: float):
|
|
|
self.time_to_freeze = time_to_freeze
|
|
|
self.step_width = step_width
|
|
|
|
|
|
- def __call__(self):
|
|
|
+ def __call__(self) -> datetime.datetime:
|
|
|
return_time = self.time_to_freeze
|
|
|
self.tick()
|
|
|
return return_time
|
|
|
|
|
|
- def tick(self, delta=None):
|
|
|
+ def tick(self, delta: Union[datetime.timedelta, int, None]=None) -> datetime.datetime:
|
|
|
if not delta:
|
|
|
delta = datetime.timedelta(seconds=self.step_width)
|
|
|
- self.time_to_freeze += delta
|
|
|
+ self.time_to_freeze += delta # type: ignore
|
|
|
+ return self.time_to_freeze
|
|
|
|
|
|
- def update_step_width(self, step_width):
|
|
|
+ def update_step_width(self, step_width: float) -> None:
|
|
|
self.step_width = step_width
|
|
|
|
|
|
- def move_to(self, target_datetime):
|
|
|
+ def move_to(self, target_datetime: _Freezable) -> None:
|
|
|
"""Moves frozen date to the given ``target_datetime``"""
|
|
|
target_datetime = _parse_time_to_freeze(target_datetime)
|
|
|
delta = target_datetime - self.time_to_freeze
|
|
@@ -544,26 +575,48 @@ class StepTickTimeFactory:
|
|
|
|
|
|
class _freeze_time:
|
|
|
|
|
|
- def __init__(self, time_to_freeze_str, tz_offset, ignore, tick, as_arg, as_kwarg, auto_tick_seconds, real_asyncio):
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ time_to_freeze_str: Optional[_Freezable],
|
|
|
+ tz_offset: Union[int, datetime.timedelta],
|
|
|
+ ignore: List[str],
|
|
|
+ tick: bool,
|
|
|
+ as_arg: bool,
|
|
|
+ as_kwarg: str,
|
|
|
+ auto_tick_seconds: float,
|
|
|
+ real_asyncio: Optional[bool],
|
|
|
+ ):
|
|
|
self.time_to_freeze = _parse_time_to_freeze(time_to_freeze_str)
|
|
|
self.tz_offset = _parse_tz_offset(tz_offset)
|
|
|
self.ignore = tuple(ignore)
|
|
|
self.tick = tick
|
|
|
self.auto_tick_seconds = auto_tick_seconds
|
|
|
- self.undo_changes = []
|
|
|
- self.modules_at_start = set()
|
|
|
+ self.undo_changes: List[Tuple[types.ModuleType, str, Any]] = []
|
|
|
+ self.modules_at_start: Set[str] = set()
|
|
|
self.as_arg = as_arg
|
|
|
self.as_kwarg = as_kwarg
|
|
|
self.real_asyncio = real_asyncio
|
|
|
|
|
|
- def __call__(self, func):
|
|
|
+ @overload
|
|
|
+ def __call__(self, func: Type[T2]) -> Type[T2]:
|
|
|
+ ...
|
|
|
+
|
|
|
+ @overload
|
|
|
+ def __call__(self, func: "Callable[P, Awaitable[Any]]") -> "Callable[P, Awaitable[Any]]":
|
|
|
+ ...
|
|
|
+
|
|
|
+ @overload
|
|
|
+ def __call__(self, func: "Callable[P, T]") -> "Callable[P, T]":
|
|
|
+ ...
|
|
|
+
|
|
|
+ def __call__(self, func: Union[Type[T2], "Callable[P, Awaitable[Any]]", "Callable[P, T]"]) -> Union[Type[T2], "Callable[P, Awaitable[Any]]", "Callable[P, T]"]: # type: ignore
|
|
|
if inspect.isclass(func):
|
|
|
return self.decorate_class(func)
|
|
|
elif inspect.iscoroutinefunction(func):
|
|
|
return self.decorate_coroutine(func)
|
|
|
- return self.decorate_callable(func)
|
|
|
+ return self.decorate_callable(func) # type: ignore
|
|
|
|
|
|
- def decorate_class(self, klass):
|
|
|
+ def decorate_class(self, klass: Type[T2]) -> Type[T2]:
|
|
|
if issubclass(klass, unittest.TestCase):
|
|
|
# If it's a TestCase, we freeze time around setup and teardown, as well
|
|
|
# as for every test case. This requires some care to avoid freezing
|
|
@@ -574,41 +627,39 @@ class _freeze_time:
|
|
|
orig_tearDownClass = klass.tearDownClass
|
|
|
|
|
|
# noinspection PyDecorator
|
|
|
- @classmethod
|
|
|
- def setUpClass(cls):
|
|
|
+ @classmethod # type: ignore
|
|
|
+ def setUpClass(cls: type) -> None:
|
|
|
self.start()
|
|
|
if orig_setUpClass is not None:
|
|
|
orig_setUpClass()
|
|
|
self.stop()
|
|
|
|
|
|
# noinspection PyDecorator
|
|
|
- @classmethod
|
|
|
- def tearDownClass(cls):
|
|
|
+ @classmethod # type: ignore
|
|
|
+ def tearDownClass(cls: type) -> None:
|
|
|
self.start()
|
|
|
if orig_tearDownClass is not None:
|
|
|
orig_tearDownClass()
|
|
|
self.stop()
|
|
|
|
|
|
- klass.setUpClass = setUpClass
|
|
|
- klass.tearDownClass = tearDownClass
|
|
|
+ klass.setUpClass = setUpClass # type: ignore
|
|
|
+ klass.tearDownClass = tearDownClass # type: ignore
|
|
|
|
|
|
orig_setUp = klass.setUp
|
|
|
orig_tearDown = klass.tearDown
|
|
|
|
|
|
- def setUp(*args, **kwargs):
|
|
|
+ def setUp(*args: Any, **kwargs: Any) -> None:
|
|
|
self.start()
|
|
|
if orig_setUp is not None:
|
|
|
orig_setUp(*args, **kwargs)
|
|
|
|
|
|
- def tearDown(*args, **kwargs):
|
|
|
+ def tearDown(*args: Any, **kwargs: Any) -> None:
|
|
|
if orig_tearDown is not None:
|
|
|
orig_tearDown(*args, **kwargs)
|
|
|
self.stop()
|
|
|
|
|
|
- klass.setUp = setUp
|
|
|
- klass.tearDown = tearDown
|
|
|
-
|
|
|
- return klass
|
|
|
+ klass.setUp = setUp # type: ignore[method-assign]
|
|
|
+ klass.tearDown = tearDown # type: ignore[method-assign]
|
|
|
|
|
|
else:
|
|
|
|
|
@@ -629,18 +680,18 @@ class _freeze_time:
|
|
|
except (AttributeError, TypeError):
|
|
|
# Sometimes we can't set this for built-in types and custom callables
|
|
|
continue
|
|
|
- return klass
|
|
|
+ return klass
|
|
|
|
|
|
- def __enter__(self):
|
|
|
+ def __enter__(self) -> Union[StepTickTimeFactory, TickingDateTimeFactory, FrozenDateTimeFactory]:
|
|
|
return self.start()
|
|
|
|
|
|
- def __exit__(self, *args):
|
|
|
+ def __exit__(self, *args: Any) -> None:
|
|
|
self.stop()
|
|
|
|
|
|
- def start(self):
|
|
|
+ def start(self) -> Union[StepTickTimeFactory, TickingDateTimeFactory, FrozenDateTimeFactory]:
|
|
|
|
|
|
if self.auto_tick_seconds:
|
|
|
- freeze_factory = StepTickTimeFactory(self.time_to_freeze, self.auto_tick_seconds)
|
|
|
+ freeze_factory: Union[StepTickTimeFactory, TickingDateTimeFactory, FrozenDateTimeFactory] = StepTickTimeFactory(self.time_to_freeze, self.auto_tick_seconds)
|
|
|
elif self.tick:
|
|
|
freeze_factory = TickingDateTimeFactory(self.time_to_freeze, real_datetime.now())
|
|
|
else:
|
|
@@ -656,19 +707,19 @@ class _freeze_time:
|
|
|
return freeze_factory
|
|
|
|
|
|
# Change the modules
|
|
|
- datetime.datetime = FakeDatetime
|
|
|
- datetime.date = FakeDate
|
|
|
+ datetime.datetime = FakeDatetime # type: ignore[misc]
|
|
|
+ datetime.date = FakeDate # type: ignore[misc]
|
|
|
|
|
|
time.time = fake_time
|
|
|
time.monotonic = fake_monotonic
|
|
|
time.perf_counter = fake_perf_counter
|
|
|
- time.localtime = fake_localtime
|
|
|
- time.gmtime = fake_gmtime
|
|
|
- time.strftime = fake_strftime
|
|
|
+ time.localtime = fake_localtime # type: ignore
|
|
|
+ time.gmtime = fake_gmtime # type: ignore
|
|
|
+ time.strftime = fake_strftime # type: ignore
|
|
|
if uuid_generate_time_attr:
|
|
|
setattr(uuid, uuid_generate_time_attr, None)
|
|
|
- uuid._UuidCreate = None
|
|
|
- uuid._last_timestamp = None
|
|
|
+ uuid._UuidCreate = None # type: ignore[attr-defined]
|
|
|
+ uuid._last_timestamp = None # type: ignore[attr-defined]
|
|
|
|
|
|
copyreg.dispatch_table[real_datetime] = pickle_fake_datetime
|
|
|
copyreg.dispatch_table[real_date] = pickle_fake_date
|
|
@@ -699,10 +750,10 @@ class _freeze_time:
|
|
|
|
|
|
if real_clock is not None:
|
|
|
# time.clock is deprecated and was removed in Python 3.8
|
|
|
- time.clock = fake_clock
|
|
|
+ time.clock = fake_clock # type: ignore[attr-defined]
|
|
|
to_patch.append(('real_clock', real_clock, fake_clock))
|
|
|
|
|
|
- self.fake_names = tuple(fake.__name__ for real_name, real, fake in to_patch)
|
|
|
+ self.fake_names = tuple(fake.__name__ for real_name, real, fake in to_patch) # type: ignore
|
|
|
self.reals = {id(fake): real for real_name, real, fake in to_patch}
|
|
|
fakes = {id(real): fake for real_name, real, fake in to_patch}
|
|
|
add_change = self.undo_changes.append
|
|
@@ -741,20 +792,20 @@ class _freeze_time:
|
|
|
event_loop = asyncio.new_event_loop()
|
|
|
event_loop.close()
|
|
|
EventLoopClass = type(event_loop)
|
|
|
- add_change((EventLoopClass, "time", EventLoopClass.time))
|
|
|
- EventLoopClass.time = lambda self: real_monotonic()
|
|
|
+ add_change((EventLoopClass, "time", EventLoopClass.time)) # type: ignore
|
|
|
+ EventLoopClass.time = lambda self: real_monotonic() # type: ignore[method-assign]
|
|
|
|
|
|
return freeze_factory
|
|
|
|
|
|
- def stop(self):
|
|
|
+ def stop(self) -> None:
|
|
|
freeze_factories.pop()
|
|
|
ignore_lists.pop()
|
|
|
tick_flags.pop()
|
|
|
tz_offsets.pop()
|
|
|
|
|
|
if not freeze_factories:
|
|
|
- datetime.datetime = real_datetime
|
|
|
- datetime.date = real_date
|
|
|
+ datetime.datetime = real_datetime # type: ignore[misc]
|
|
|
+ datetime.date = real_date # type: ignore[misc]
|
|
|
copyreg.dispatch_table.pop(real_datetime)
|
|
|
copyreg.dispatch_table.pop(real_date)
|
|
|
for module_or_object, attribute, original_value in self.undo_changes:
|
|
@@ -794,7 +845,7 @@ class _freeze_time:
|
|
|
time.gmtime = real_gmtime
|
|
|
time.localtime = real_localtime
|
|
|
time.strftime = real_strftime
|
|
|
- time.clock = real_clock
|
|
|
+ time.clock = real_clock # type: ignore[attr-defined]
|
|
|
|
|
|
if _TIME_NS_PRESENT:
|
|
|
time.time_ns = real_time_ns
|
|
@@ -807,33 +858,33 @@ class _freeze_time:
|
|
|
|
|
|
if uuid_generate_time_attr:
|
|
|
setattr(uuid, uuid_generate_time_attr, real_uuid_generate_time)
|
|
|
- uuid._UuidCreate = real_uuid_create
|
|
|
- uuid._last_timestamp = None
|
|
|
+ uuid._UuidCreate = real_uuid_create # type: ignore[attr-defined]
|
|
|
+ uuid._last_timestamp = None # type: ignore[attr-defined]
|
|
|
|
|
|
- def decorate_coroutine(self, coroutine):
|
|
|
+ def decorate_coroutine(self, coroutine: "Callable[P, Awaitable[T]]") -> "Callable[P, Awaitable[T]]":
|
|
|
return wrap_coroutine(self, coroutine)
|
|
|
|
|
|
- def decorate_callable(self, func):
|
|
|
- def wrapper(*args, **kwargs):
|
|
|
+ def decorate_callable(self, func: "Callable[P, T]") -> "Callable[P, T]":
|
|
|
+ @functools.wraps(func)
|
|
|
+ def wrapper(*args: "P.args", **kwargs: "P.kwargs") -> T:
|
|
|
with self as time_factory:
|
|
|
if self.as_arg and self.as_kwarg:
|
|
|
assert False, "You can't specify both as_arg and as_kwarg at the same time. Pick one."
|
|
|
elif self.as_arg:
|
|
|
- result = func(time_factory, *args, **kwargs)
|
|
|
+ result = func(time_factory, *args, **kwargs) # type: ignore
|
|
|
elif self.as_kwarg:
|
|
|
kwargs[self.as_kwarg] = time_factory
|
|
|
result = func(*args, **kwargs)
|
|
|
else:
|
|
|
result = func(*args, **kwargs)
|
|
|
return result
|
|
|
- functools.update_wrapper(wrapper, func)
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
-def freeze_time(time_to_freeze=None, tz_offset=0, ignore=None, tick=False, as_arg=False, as_kwarg='',
|
|
|
- auto_tick_seconds=0, real_asyncio=False):
|
|
|
- acceptable_times = (type(None), str, datetime.date, datetime.timedelta,
|
|
|
+def freeze_time(time_to_freeze: Optional[_Freezable]=None, tz_offset: Union[int, datetime.timedelta]=0, ignore: Optional[List[str]]=None, tick: bool=False, as_arg: bool=False, as_kwarg: str='',
|
|
|
+ auto_tick_seconds: float=0, real_asyncio: bool=False) -> _freeze_time:
|
|
|
+ acceptable_times: Any = (type(None), str, datetime.date, datetime.timedelta,
|
|
|
types.FunctionType, types.GeneratorType)
|
|
|
|
|
|
if MayaDT is not None:
|
|
@@ -883,10 +934,10 @@ except ImportError:
|
|
|
pass
|
|
|
else:
|
|
|
# These are copied from Python sqlite3.dbapi2
|
|
|
- def adapt_date(val):
|
|
|
+ def adapt_date(val: datetime.date) -> str:
|
|
|
return val.isoformat()
|
|
|
|
|
|
- def adapt_datetime(val):
|
|
|
+ def adapt_datetime(val: datetime.datetime) -> str:
|
|
|
return val.isoformat(" ")
|
|
|
|
|
|
sqlite3.register_adapter(FakeDate, adapt_date)
|