functional.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. General functional-style helpers for disttrial.
  5. """
  6. from functools import partial, wraps
  7. from typing import Awaitable, Callable, Iterable, Optional, TypeVar
  8. from twisted.internet.defer import Deferred, succeed
# Generic type variables used by the annotations of the helpers in this
# module.  They carry no runtime behavior; each one stands for an arbitrary
# type that must stay consistent within a single signature.
_A = TypeVar("_A")
_B = TypeVar("_B")
_C = TypeVar("_C")
  12. def fromOptional(default: _A, optional: Optional[_A]) -> _A:
  13. """
  14. Get a definite value from an optional value.
  15. @param default: The value to return if the optional value is missing.
  16. @param optional: The optional value to return if it exists.
  17. """
  18. if optional is None:
  19. return default
  20. return optional
  21. def takeWhile(condition: Callable[[_A], bool], xs: Iterable[_A]) -> Iterable[_A]:
  22. """
  23. :return: An iterable over C{xs} that stops when C{condition} returns
  24. ``False`` based on the value of iterated C{xs}.
  25. """
  26. for x in xs:
  27. if condition(x):
  28. yield x
  29. else:
  30. break
  31. async def sequence(a: Awaitable[_A], b: Awaitable[_B]) -> _B:
  32. """
  33. Wait for one action to complete and then another.
  34. If either action fails, failure is propagated. If the first action fails,
  35. the second action is not waited on.
  36. """
  37. await a
  38. return await b
  39. def flip(f: Callable[[_A, _B], _C]) -> Callable[[_B, _A], _C]:
  40. """
  41. Create a function like another but with the order of the first two
  42. arguments flipped.
  43. """
  44. @wraps(f)
  45. def g(b, a):
  46. return f(a, b)
  47. return g
  48. def compose(fx: Callable[[_B], _C], fy: Callable[[_A], _B]) -> Callable[[_A], _C]:
  49. """
  50. Create a function that calls one function with an argument and then
  51. another function with the result of the first function.
  52. """
  53. @wraps(fx)
  54. @wraps(fy)
  55. def g(a):
  56. return fx(fy(a))
  57. return g
  58. # Discard the result of an awaitable and substitute None in its place.
  59. discardResult: Callable[[Awaitable[_A]], Deferred[None]] = compose(
  60. Deferred.fromCoroutine,
  61. partial(flip(sequence), succeed(None)),
  62. )
  63. async def iterateWhile(
  64. predicate: Callable[[_A], bool],
  65. action: Callable[[], Awaitable[_A]],
  66. ) -> _A:
  67. """
  68. Call a function repeatedly until its result fails to satisfy a predicate.
  69. @param predicate: The check to apply.
  70. @param action: The function to call.
  71. @return: The result of C{action} which did not satisfy C{predicate}.
  72. """
  73. while True:
  74. result = await action()
  75. if not predicate(result):
  76. return result
  77. def countingCalls(f: Callable[[int], _A]) -> Callable[[], _A]:
  78. """
  79. Wrap a function with another that automatically passes an integer counter
  80. of the number of calls that have gone through the wrapper.
  81. """
  82. counter = 0
  83. def g() -> _A:
  84. nonlocal counter
  85. try:
  86. result = f(counter)
  87. finally:
  88. counter += 1
  89. return result
  90. return g