@@ -16,46 +16,79 @@ from contextlib import contextmanager
from datetime import timedelta
from enum import Enum
from random import Random, getrandbits
-from typing import Union
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Final,
+ FrozenSet,
+ Generator,
+ List,
+ Literal,
+ NoReturn,
+ Optional,
+ Set,
+ Tuple,
+ Union,
+ overload,
import attr
from hypothesis import HealthCheck, Phase, Verbosity, settings as Settings
from hypothesis._settings import local_settings
+from hypothesis.database import ExampleDatabase
from hypothesis.errors import InvalidArgument, StopTest
from hypothesis.internal.cache import LRUReusedCache
-from hypothesis.internal.compat import ceil, int_from_bytes
+from hypothesis.internal.compat import (
+ NotRequired,
+ TypeAlias,
+ TypedDict,
+ ceil,
+ int_from_bytes,
+ override,
from hypothesis.internal.conjecture.data import (
+ Example,
+ InterestingOrigin,
+ IRNode,
+ _Overrun,
-from hypothesis.internal.conjecture.datatree import DataTree, PreviouslyUnseenBehaviour
+from hypothesis.internal.conjecture.datatree import (
+ DataTree,
+ PreviouslyUnseenBehaviour,
+ TreeRecordingObserver,
from hypothesis.internal.conjecture.junkdrawer import clamp, ensure_free_stackframes
from hypothesis.internal.conjecture.pareto import NO_SCORE, ParetoFront, ParetoOptimiser
from hypothesis.internal.conjecture.shrinker import Shrinker, sort_key
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.reporting import base_report, report
-CACHE_SIZE = 10000
-BUFFER_SIZE = 8 * 1024
+MAX_SHRINKS: Final[int] = 500
+CACHE_SIZE: Final[int] = 10000
+MUTATION_POOL_SIZE: Final[int] = 100
+MIN_TEST_CALLS: Final[int] = 10
+BUFFER_SIZE: Final[int] = 8 * 1024
# If the shrinking phase takes more than five minutes, abort it early and print
# a warning. Many CI systems will kill a build after around ten minutes with
# no output, and appearing to hang isn't great for interactive use either -
# showing partially-shrunk examples is better than quitting with no examples!
# (but make it monkeypatchable, for the rare users who need to keep on shrinking)
+MAX_SHRINKING_SECONDS: Final[int] = 300
+Ls: TypeAlias = List["Ls | int"]
@@ -63,15 +96,15 @@ class HealthCheckState:
valid_examples: int = attr.ib(default=0)
invalid_examples: int = attr.ib(default=0)
overrun_examples: int = attr.ib(default=0)
- draw_times: "defaultdict[str, list[float]]" = attr.ib(
+ draw_times: "defaultdict[str, List[float]]" = attr.ib(
factory=lambda: defaultdict(list)
- def total_draw_time(self):
+ def total_draw_time(self) -> float:
return math.fsum(sum(self.draw_times.values(), start=[]))
- def timing_report(self):
+ def timing_report(self) -> str:
"""Return a terminal report describing what was slow."""
if not self.draw_times:
return ""
@@ -109,7 +142,7 @@ class ExitReason(Enum):
flaky = "test was flaky"
very_slow_shrinking = "shrinking was very slow"
- def describe(self, settings):
+ def describe(self, settings: Settings) -> str:
return self.value.format(s=settings)
@@ -131,57 +164,90 @@ def _get_provider(backend: str) -> Union[type, PrimitiveProvider]:
+class CallStats(TypedDict):
+ status: str
+ runtime: float
+ drawtime: float
+ events: List[str]
+PhaseStatistics = TypedDict(
+ "PhaseStatistics",
+ {
+ "duration-seconds": float,
+ "test-cases": List[CallStats],
+ "distinct-failures": int,
+ "shrinks-successful": int,
+ },
+StatisticsDict = TypedDict(
+ "StatisticsDict",
+ {
+ "generate-phase": NotRequired[PhaseStatistics],
+ "reuse-phase": NotRequired[PhaseStatistics],
+ "shrink-phase": NotRequired[PhaseStatistics],
+ "stopped-because": NotRequired[str],
+ "targets": NotRequired[Dict[str, float]],
+ },
class ConjectureRunner:
def __init__(
- test_function,
+ test_function: Callable[[ConjectureData], None],
- settings=None,
- random=None,
- database_key=None,
- ignore_limits=False,
- ):
- self._test_function = test_function
- self.settings = settings or Settings()
- self.shrinks = 0
- self.finish_shrinking_deadline = None
- self.call_count = 0
- self.valid_examples = 0
- self.random = random or Random(getrandbits(128))
- self.database_key = database_key
- self.ignore_limits = ignore_limits
+ settings: Optional[Settings] = None,
+ random: Optional[Random] = None,
+ database_key: Optional[bytes] = None,
+ ignore_limits: bool = False,
+ ) -> None:
+ self._test_function: Callable[[ConjectureData], None] = test_function
+ self.settings: Settings = settings or Settings()
+ self.shrinks: int = 0
+ self.finish_shrinking_deadline: Optional[float] = None
+ self.call_count: int = 0
+ self.valid_examples: int = 0
+ self.random: Random = random or Random(getrandbits(128))
+ self.database_key: Optional[bytes] = database_key
+ self.ignore_limits: bool = ignore_limits
# Global dict of per-phase statistics, and a list of per-call stats
# which transfer to the global dict at the end of each phase.
- self._current_phase = "(not a phase)"
- self.statistics = {}
- self.stats_per_test_case = []
+ self._current_phase: str = "(not a phase)"
+ self.statistics: StatisticsDict = {}
+ self.stats_per_test_case: List[CallStats] = []
- self.interesting_examples = {}
+ # At runtime, the keys are only ever type `InterestingOrigin`, but can be `None` during tests.
+ self.interesting_examples: Dict[InterestingOrigin, ConjectureResult] = {}
# We use call_count because there may be few possible valid_examples.
- self.first_bug_found_at = None
- self.last_bug_found_at = None
+ self.first_bug_found_at: Optional[int] = None
+ self.last_bug_found_at: Optional[int] = None
- self.shrunk_examples = set()
+ # At runtime, the keys are only ever type `InterestingOrigin`, but can be `None` during tests.
+ self.shrunk_examples: Set[Optional[InterestingOrigin]] = set()
- self.health_check_state = None
+ self.health_check_state: Optional[HealthCheckState] = None
- self.tree = DataTree()
+ self.tree: DataTree = DataTree()
- self.provider = _get_provider(self.settings.backend)
+ self.provider: Union[type, PrimitiveProvider] = _get_provider(
+ self.settings.backend
+ )
- self.best_observed_targets = defaultdict(lambda: NO_SCORE)
- self.best_examples_of_observed_targets = {}
+ self.best_observed_targets: "defaultdict[str, float]" = defaultdict(
+ lambda: NO_SCORE
+ )
+ self.best_examples_of_observed_targets: Dict[str, ConjectureResult] = {}
# We keep the pareto front in the example database if we have one. This
# is only marginally useful at present, but speeds up local development
# because it means that large targets will be quickly surfaced in your
# testing.
+ self.pareto_front: Optional[ParetoFront] = None
if self.database_key is not None and self.settings.database is not None:
self.pareto_front = ParetoFront(self.random)
- else:
- self.pareto_front = None
# We want to be able to get the ConjectureData object that results
# from running a buffer without recalculating, especially during
@@ -190,24 +256,28 @@ class ConjectureRunner:
self.__data_cache = LRUReusedCache(CACHE_SIZE)
self.__data_cache_ir = LRUReusedCache(CACHE_SIZE)
- self.__pending_call_explanation = None
- self._switch_to_hypothesis_provider = False
+ self.__pending_call_explanation: Optional[str] = None
+ self._switch_to_hypothesis_provider: bool = False
- def explain_next_call_as(self, explanation):
+ def explain_next_call_as(self, explanation: str) -> None:
self.__pending_call_explanation = explanation
- def clear_call_explanation(self):
+ def clear_call_explanation(self) -> None:
self.__pending_call_explanation = None
- def _log_phase_statistics(self, phase):
+ def _log_phase_statistics(
+ self, phase: Literal["reuse", "generate", "shrink"]
+ ) -> Generator[None, None, None]:
start_time = time.perf_counter()
self._current_phase = phase
- self.statistics[phase + "-phase"] = {
+ # We ignore the mypy type error here. Because `phase` is a string literal and "-phase" is a string literal
+ # as well, the concatenation will always be valid key in the dictionary.
+ self.statistics[phase + "-phase"] = { # type: ignore
"duration-seconds": time.perf_counter() - start_time,
"test-cases": list(self.stats_per_test_case),
"distinct-failures": len(self.interesting_examples),
@@ -215,13 +285,13 @@ class ConjectureRunner:
- def should_optimise(self):
+ def should_optimise(self) -> bool:
return Phase.target in self.settings.phases
- def __tree_is_exhausted(self):
+ def __tree_is_exhausted(self) -> bool:
return self.tree.is_exhausted and self.settings.backend == "hypothesis"
- def __stoppable_test_function(self, data):
+ def __stoppable_test_function(self, data: ConjectureData) -> None:
"""Run ``self._test_function``, but convert a ``StopTest`` exception
into a normal return and avoid raising Flaky for RecursionErrors.
@@ -242,15 +312,23 @@ class ConjectureRunner:
# correct engine.
- def _cache_key_ir(self, *, nodes=None, data=None):
+ def _cache_key_ir(
+ self,
+ *,
+ nodes: Optional[List[IRNode]] = None,
+ data: Union[ConjectureData, ConjectureResult, None] = None,
+ ) -> Tuple[Tuple[Any, ...], ...]:
assert (nodes is not None) ^ (data is not None)
extension = []
if data is not None:
nodes = data.examples.ir_tree_nodes
if data.invalid_at is not None:
# if we're invalid then we should have at least one node left (the invalid one).
+ assert isinstance(data, ConjectureData)
+ assert data.ir_tree_nodes is not None
assert data._node_index < len(data.ir_tree_nodes)
extension = [data.ir_tree_nodes[data._node_index]]
+ assert nodes is not None
# intentionally drop was_forced from equality here, because the was_forced
# of node prefixes on ConjectureData has no impact on that data's result
@@ -263,7 +341,7 @@ class ConjectureRunner:
for node in nodes + extension
- def _cache(self, data):
+ def _cache(self, data: Union[ConjectureData, ConjectureResult]) -> None:
result = data.as_result()
# when we shrink, we try out of bounds things, which can lead to the same
# data.buffer having multiple outcomes. eg data.buffer=b'' is Status.OVERRUN
@@ -281,7 +359,9 @@ class ConjectureRunner:
key = self._cache_key_ir(data=data)
self.__data_cache_ir[key] = result
- def cached_test_function_ir(self, nodes):
+ def cached_test_function_ir(
+ self, nodes: List[IRNode]
+ ) -> Union[ConjectureResult, _Overrun]:
key = self._cache_key_ir(nodes=nodes)
return self.__data_cache_ir[key]
@@ -307,7 +387,7 @@ class ConjectureRunner:
return data.as_result()
- def test_function(self, data):
+ def test_function(self, data: ConjectureData) -> None:
if self.__pending_call_explanation is not None:
self.__pending_call_explanation = None
@@ -328,7 +408,7 @@ class ConjectureRunner:
# the KeyboardInterrupt, never continue to the code below.
if not interrupted: # pragma: no branch
- call_stats = {
+ call_stats: CallStats = {
"status": data.status.name.lower(),
"runtime": data.finish_time - data.start_time,
"drawtime": math.fsum(data.draw_times.values()),
@@ -355,7 +435,9 @@ class ConjectureRunner:
self.best_observed_targets[k] = max(self.best_observed_targets[k], v)
if k not in self.best_examples_of_observed_targets:
- self.best_examples_of_observed_targets[k] = data.as_result()
+ data_as_result = data.as_result()
+ assert not isinstance(data_as_result, _Overrun)
+ self.best_examples_of_observed_targets[k] = data_as_result
existing_example = self.best_examples_of_observed_targets[k]
@@ -367,7 +449,9 @@ class ConjectureRunner:
if v > existing_score or sort_key(data.buffer) < sort_key(
- self.best_examples_of_observed_targets[k] = data.as_result()
+ data_as_result = data.as_result()
+ assert not isinstance(data_as_result, _Overrun)
+ self.best_examples_of_observed_targets[k] = data_as_result
if data.status == Status.VALID:
self.valid_examples += 1
@@ -391,7 +475,7 @@ class ConjectureRunner:
key = data.interesting_origin
changed = False
- existing = self.interesting_examples[key]
+ existing = self.interesting_examples[key] # type: ignore
except KeyError:
changed = True
self.last_bug_found_at = self.call_count
@@ -406,7 +490,7 @@ class ConjectureRunner:
if changed:
- self.interesting_examples[key] = data.as_result()
+ self.interesting_examples[key] = data.as_result() # type: ignore
@@ -449,10 +533,10 @@ class ConjectureRunner:
- def on_pareto_evict(self, data):
+ def on_pareto_evict(self, data: ConjectureData) -> None:
self.settings.database.delete(self.pareto_key, data.buffer)
- def generate_novel_prefix(self):
+ def generate_novel_prefix(self) -> bytes:
"""Uses the tree to proactively generate a starting sequence of bytes
that we haven't explored yet for this test.
@@ -462,7 +546,7 @@ class ConjectureRunner:
return self.tree.generate_novel_prefix(self.random)
- def record_for_health_check(self, data):
+ def record_for_health_check(self, data: ConjectureData) -> None:
# Once we've actually found a bug, there's no point in trying to run
# health checks - they'll just mask the actually important information.
if data.status == Status.INTERESTING:
@@ -534,18 +618,20 @@ class ConjectureRunner:
- def save_buffer(self, buffer, sub_key=None):
+ def save_buffer(
+ self, buffer: Union[bytes, bytearray], sub_key: Optional[bytes] = None
+ ) -> None:
if self.settings.database is not None:
key = self.sub_key(sub_key)
if key is None:
self.settings.database.save(key, bytes(buffer))
- def downgrade_buffer(self, buffer):
+ def downgrade_buffer(self, buffer: Union[bytes, bytearray]) -> None:
if self.settings.database is not None and self.database_key is not None:
self.settings.database.move(self.database_key, self.secondary_key, buffer)
- def sub_key(self, sub_key):
+ def sub_key(self, sub_key: Optional[bytes]) -> Optional[bytes]:
if self.database_key is None:
return None
if sub_key is None:
@@ -553,34 +639,34 @@ class ConjectureRunner:
return b".".join((self.database_key, sub_key))
- def secondary_key(self):
+ def secondary_key(self) -> Optional[bytes]:
return self.sub_key(b"secondary")
- def pareto_key(self):
+ def pareto_key(self) -> Optional[bytes]:
return self.sub_key(b"pareto")
- def debug(self, message):
+ def debug(self, message: str) -> None:
if self.settings.verbosity >= Verbosity.debug:
- def report_debug_info(self):
+ def report_debug_info(self) -> bool:
return self.settings.verbosity >= Verbosity.debug
- def debug_data(self, data):
+ def debug_data(self, data: Union[ConjectureData, ConjectureResult]) -> None:
if not self.report_debug_info:
- stack = [[]]
+ stack: List[Ls] = [[]]
- def go(ex):
+ def go(ex: Example) -> None:
if ex.length == 0:
if len(ex.children) == 0:
stack[-1].append(int_from_bytes(data.buffer[ex.start : ex.end]))
- node = []
+ node: Ls = []
for v in ex.children:
@@ -601,7 +687,7 @@ class ConjectureRunner:
self.debug(f"{data.index} bytes {stack[0]!r} -> {status}, {data.output}")
- def run(self):
+ def run(self) -> None:
with local_settings(self.settings):
@@ -615,15 +701,15 @@ class ConjectureRunner:
- def database(self):
+ def database(self) -> Optional[ExampleDatabase]:
if self.database_key is None:
return None
return self.settings.database
- def has_existing_examples(self):
+ def has_existing_examples(self) -> bool:
return self.database is not None and Phase.reuse in self.settings.phases
- def reuse_existing_examples(self):
+ def reuse_existing_examples(self) -> None:
"""If appropriate (we have a database and have been told to use it),
try to reload existing examples from the database.
@@ -672,6 +758,11 @@ class ConjectureRunner:
self.settings.database.delete(self.database_key, existing)
self.settings.database.delete(self.secondary_key, existing)
+ # Because self.database is not None (because self.has_existing_examples())
+ # and self.database_key is not None (because we fetched using it above),
+ # we can guarantee self.pareto_front is not None
+ assert self.pareto_front is not None
# If we've not found any interesting examples so far we try some of
# the pareto front from the last run.
if len(corpus) < desired_size and not self.interesting_examples:
@@ -688,7 +779,7 @@ class ConjectureRunner:
if data.status == Status.INTERESTING:
- def exit_with(self, reason):
+ def exit_with(self, reason: ExitReason) -> None:
if self.ignore_limits:
self.statistics["stopped-because"] = reason.describe(self.settings)
@@ -698,7 +789,7 @@ class ConjectureRunner:
self.exit_reason = reason
raise RunIsComplete
- def should_generate_more(self):
+ def should_generate_more(self) -> bool:
# End the generation phase where we would have ended it if no bugs had
# been found. This reproduces the exit logic in `self.test_function`,
# but with the important distinction that this clause will move on to
@@ -721,6 +812,8 @@ class ConjectureRunner:
or not self.settings.report_multiple_bugs
return False
+ assert isinstance(self.first_bug_found_at, int)
+ assert isinstance(self.last_bug_found_at, int)
assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
# Otherwise, keep searching for between ten and 'a heuristic' calls.
# We cap 'calls after first bug' so errors are reported reasonably
@@ -730,7 +823,7 @@ class ConjectureRunner:
self.first_bug_found_at + 1000, self.last_bug_found_at * 2
- def generate_new_examples(self):
+ def generate_new_examples(self) -> None:
if Phase.generate not in self.settings.phases:
if self.interesting_examples:
@@ -744,10 +837,13 @@ class ConjectureRunner:
assert self.should_generate_more()
zero_data = self.cached_test_function(bytes(BUFFER_SIZE))
if zero_data.status > Status.OVERRUN:
+ assert isinstance(zero_data, ConjectureResult)
if zero_data.status == Status.OVERRUN or (
- zero_data.status == Status.VALID and len(zero_data.buffer) * 2 > BUFFER_SIZE
+ zero_data.status == Status.VALID
+ and isinstance(zero_data, ConjectureResult)
+ and len(zero_data.buffer) * 2 > BUFFER_SIZE
@@ -831,6 +927,10 @@ class ConjectureRunner:
if minimal_example.status < Status.VALID:
consecutive_zero_extend_is_invalid += 1
+ # Because the Status code is greater than Status.VALID, it cannot be
+ # Status.OVERRUN, which guarantees that the minimal_example is a
+ # ConjectureResult object.
+ assert isinstance(minimal_example, ConjectureResult)
consecutive_zero_extend_is_invalid = 0
@@ -846,10 +946,11 @@ class ConjectureRunner:
# running the test function for real here. If however we encounter
# some novel behaviour, we try again with the real test function,
# starting from the new novel prefix that has discovered.
+ trial_data = self.new_conjecture_data(
+ prefix=prefix, max_length=max_length
+ )
- trial_data = self.new_conjecture_data(
- prefix=prefix, max_length=max_length
- )
except PreviouslyUnseenBehaviour:
@@ -857,6 +958,7 @@ class ConjectureRunner:
# If the simulation entered part of the tree that has been killed,
# we don't want to run this.
+ assert isinstance(trial_data.observer, TreeRecordingObserver)
if trial_data.observer.killed:
@@ -898,7 +1000,9 @@ class ConjectureRunner:
self._current_phase = "target"
- def generate_mutations_from(self, data):
+ def generate_mutations_from(
+ self, data: Union[ConjectureData, ConjectureResult]
+ ) -> None:
# A thing that is often useful but rarely happens by accident is
# to generate the same value at multiple different points in the
# test case.
@@ -944,12 +1048,12 @@ class ConjectureRunner:
replacement = data.buffer[e.start : e.end]
- # We attempt to replace both the the examples with
+ # We attempt to replace both the examples with
# whichever choice we made. Note that this might end
# up messing up and getting the example boundaries
# wrong - labels matching are only a best guess as to
# whether the two are equivalent - but it doesn't
- # really matter. It may not achieve the desired result
+ # really matter. It may not achieve the desired result,
# but it's still a perfectly acceptable choice sequence
# to try.
new_data = self.cached_test_function(
@@ -968,6 +1072,7 @@ class ConjectureRunner:
failed_mutations += 1
+ assert isinstance(new_data, ConjectureResult)
if (
new_data.status >= data.status
and data.buffer != new_data.buffer
@@ -982,7 +1087,7 @@ class ConjectureRunner:
failed_mutations += 1
- def optimise_targets(self):
+ def optimise_targets(self) -> None:
"""If any target observations have been made, attempt to optimise them
if not self.should_optimise:
@@ -1022,11 +1127,11 @@ class ConjectureRunner:
if prev_calls == self.call_count:
- def pareto_optimise(self):
+ def pareto_optimise(self) -> None:
if self.pareto_front is not None:
- def _run(self):
+ def _run(self) -> None:
# have to use the primitive provider to interpret database bits...
self._switch_to_hypothesis_provider = True
with self._log_phase_statistics("reuse"):
@@ -1047,7 +1152,12 @@ class ConjectureRunner:
- def new_conjecture_data_ir(self, ir_tree_prefix, *, observer=None):
+ def new_conjecture_data_ir(
+ self,
+ ir_tree_prefix: List[IRNode],
+ *,
+ observer: Optional[DataObserver] = None,
+ ) -> ConjectureData:
provider = (
HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
@@ -1059,7 +1169,12 @@ class ConjectureRunner:
ir_tree_prefix, observer=observer, provider=provider
- def new_conjecture_data(self, prefix, max_length=BUFFER_SIZE, observer=None):
+ def new_conjecture_data(
+ self,
+ prefix: Union[bytes, bytearray],
+ max_length: int = BUFFER_SIZE,
+ observer: Optional[DataObserver] = None,
+ ) -> ConjectureData:
provider = (
HypothesisProvider if self._switch_to_hypothesis_provider else self.provider
@@ -1075,10 +1190,12 @@ class ConjectureRunner:
- def new_conjecture_data_for_buffer(self, buffer):
+ def new_conjecture_data_for_buffer(
+ self, buffer: Union[bytes, bytearray]
+ ) -> ConjectureData:
return self.new_conjecture_data(buffer, max_length=len(buffer))
- def shrink_interesting_examples(self):
+ def shrink_interesting_examples(self) -> None:
"""If we've found interesting examples, try to replace each of them
with a minimal interesting example with the same interesting_origin.
@@ -1119,7 +1236,7 @@ class ConjectureRunner:
self.shrink(example, lambda d: d.status == Status.INTERESTING)
- def predicate(d):
+ def predicate(d: ConjectureData) -> bool:
if d.status < Status.INTERESTING:
return False
return d.interesting_origin == target
@@ -1128,7 +1245,7 @@ class ConjectureRunner:
- def clear_secondary_key(self):
+ def clear_secondary_key(self) -> None:
if self.has_existing_examples():
# If we have any smaller examples in the secondary corpus, now is
# a good time to try them to see if they work as shrinks. They
@@ -1154,12 +1271,26 @@ class ConjectureRunner:
# of this reason for interestingness.
self.settings.database.delete(self.secondary_key, c)
- def shrink(self, example, predicate=None, allow_transition=None):
+ def shrink(
+ self,
+ example: Union[ConjectureData, ConjectureResult],
+ predicate: Optional[Callable[[ConjectureData], bool]] = None,
+ allow_transition: Optional[
+ Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
+ ] = None,
+ ) -> Union[ConjectureData, ConjectureResult]:
s = self.new_shrinker(example, predicate, allow_transition)
return s.shrink_target
- def new_shrinker(self, example, predicate=None, allow_transition=None):
+ def new_shrinker(
+ self,
+ example: Union[ConjectureData, ConjectureResult],
+ predicate: Optional[Callable[[ConjectureData], bool]] = None,
+ allow_transition: Optional[
+ Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
+ ] = None,
+ ) -> Shrinker:
return Shrinker(
@@ -1169,7 +1300,13 @@ class ConjectureRunner:
in_target_phase=self._current_phase == "target",
- def cached_test_function(self, buffer, *, error_on_discard=False, extend=0):
+ def cached_test_function(
+ self,
+ buffer: Union[bytes, bytearray],
+ *,
+ error_on_discard: bool = False,
+ extend: int = 0,
+ ) -> Union[ConjectureResult, _Overrun]:
"""Checks the tree to see if we've tested this buffer, and returns the
previous result if we have.
@@ -1187,7 +1324,13 @@ class ConjectureRunner:
max_length = min(BUFFER_SIZE, len(buffer) + extend)
- def check_result(result):
+ @overload
+ def check_result(result: _Overrun) -> _Overrun: ...
+ @overload
+ def check_result(result: ConjectureResult) -> ConjectureResult: ...
+ def check_result(
+ result: Union[_Overrun, ConjectureResult],
+ ) -> Union[_Overrun, ConjectureResult]:
assert result is Overrun or (
isinstance(result, ConjectureResult) and result.status != Status.OVERRUN
@@ -1200,10 +1343,12 @@ class ConjectureRunner:
except KeyError:
+ observer: DataObserver
if error_on_discard:
class DiscardObserver(DataObserver):
- def kill_branch(self):
+ @override
+ def kill_branch(self) -> NoReturn:
raise ContainsDiscard
observer = DiscardObserver()
@@ -1241,11 +1386,15 @@ class ConjectureRunner:
result = check_result(data.as_result())
- if extend == 0 or (result is not Overrun and len(result.buffer) <= len(buffer)):
+ if extend == 0 or (
+ result is not Overrun
+ and not isinstance(result, _Overrun)
+ and len(result.buffer) <= len(buffer)
+ ):
self.__data_cache[buffer] = result
return result
- def passing_buffers(self, prefix=b""):
+ def passing_buffers(self, prefix: bytes = b"") -> FrozenSet[bytes]:
"""Return a collection of bytestrings which cause the test to pass.
Optionally restrict this by a certain prefix, which is useful for explain mode.