cache.py

from collections import deque
from functools import wraps
from typing import Any, Callable, Deque, Dict, Generic, Hashable, Tuple, TypeVar, cast

__all__ = [
    "SimpleCache",
    "FastDictCache",
    "memoized",
]

_T = TypeVar("_T", bound=Hashable)
_U = TypeVar("_U")


class SimpleCache(Generic[_T, _U]):
    """
    Very simple cache that discards the oldest item when the cache size is
    exceeded.

    :param maxsize: Maximum size of the cache. (Don't make it too big.)
    """

    def __init__(self, maxsize: int = 8) -> None:
        assert maxsize > 0
        self._data: Dict[_T, _U] = {}
        self._keys: Deque[_T] = deque()
        self.maxsize: int = maxsize

    def get(self, key: _T, getter_func: Callable[[], _U]) -> _U:
        """
        Get object from the cache.
        If not found, call `getter_func` to resolve it, and put that on the top
        of the cache instead.
        """
        # Look in cache first.
        try:
            return self._data[key]
        except KeyError:
            # Not found? Get it.
            value = getter_func()
            self._data[key] = value
            self._keys.append(key)

            # Remove the oldest key when the size is exceeded.
            if len(self._data) > self.maxsize:
                key_to_remove = self._keys.popleft()
                if key_to_remove in self._data:
                    del self._data[key_to_remove]

            return value

    def clear(self) -> None:
        "Clear cache."
        self._data = {}
        self._keys = deque()
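
# Illustrative usage sketch (not part of the original module); `title_cache`
# and the literal values are made up for the example:
#
#     title_cache: SimpleCache[str, str] = SimpleCache(maxsize=2)
#     title_cache.get("a", lambda: "A")   # miss: getter runs, "A" is stored
#     title_cache.get("a", lambda: "??")  # hit: returns "A", getter not called
#     title_cache.get("b", lambda: "B")
#     title_cache.get("c", lambda: "C")   # third key exceeds maxsize: "a" is evicted
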
_K = TypeVar("_K", bound=Tuple[Hashable, ...])
_V = TypeVar("_V")


class FastDictCache(Dict[_K, _V]):
    """
    Fast, lightweight cache which keeps at most `size` items.
    It will discard the oldest items in the cache first.

    The cache is a dictionary, which doesn't keep track of access counts.
    It is perfect to cache little immutable objects which are not expensive to
    create, but where a dictionary lookup is still much faster than an object
    instantiation.

    :param get_value: Callable that's called in case of a missing key.
    """

    # NOTE: This cache is used to cache `prompt_toolkit.layout.screen.Char` and
    #       `prompt_toolkit.Document`. Make sure to keep this really lightweight.
    #       Accessing the cache should stay faster than instantiating new
    #       objects. (Dictionary lookups are really fast.)
    #
    #       SimpleCache is still required for cases where the cache key is not
    #       the same as the arguments given to the function that creates the
    #       value.
    def __init__(self, get_value: Callable[..., _V], size: int = 1000000) -> None:
        assert size > 0

        self._keys: Deque[_K] = deque()
        self.get_value = get_value
        self.size = size

    def __missing__(self, key: _K) -> _V:
        # Remove the oldest key when the size is exceeded.
        if len(self) > self.size:
            key_to_remove = self._keys.popleft()
            if key_to_remove in self:
                del self[key_to_remove]

        result = self.get_value(*key)
        self[key] = result
        self._keys.append(key)
        return result
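
# Illustrative usage sketch (not part of the original module); `point_cache` is
# a made-up name. Keys are tuples of hashables, unpacked as the positional
# arguments of `get_value`:
#
#     point_cache: FastDictCache[Tuple[int, int], Tuple[int, int]] = FastDictCache(
#         get_value=lambda x, y: (x, y), size=1000
#     )
#     p1 = point_cache[1, 2]  # missing key: calls get_value(1, 2), stores the result
#     p2 = point_cache[1, 2]  # cached: plain dict lookup, p1 is p2
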
_F = TypeVar("_F", bound=Callable[..., object])


def memoized(maxsize: int = 1024) -> Callable[[_F], _F]:
    """
    Memoization decorator for immutable classes and pure functions.
    """

    def decorator(obj: _F) -> _F:
        cache: SimpleCache[Hashable, Any] = SimpleCache(maxsize=maxsize)

        @wraps(obj)
        def new_callable(*a: Any, **kw: Any) -> Any:
            def create_new() -> Any:
                return obj(*a, **kw)

            key = (a, tuple(sorted(kw.items())))
            return cache.get(key, create_new)

        return cast(_F, new_callable)

    return decorator
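
# Illustrative usage sketch (not part of the original module); `add` is a
# made-up pure function:
#
#     @memoized(maxsize=128)
#     def add(x: int, y: int) -> int:
#         return x + y
#
#     add(1, 2)  # computed once, cached under the key ((1, 2), ())
#     add(1, 2)  # served from the cache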