123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- #!/usr/bin/env python
- import os
- import os.path
- import pathlib
- import shutil
- import tempfile
- from typing import Iterator
- import click
- from sentry.runner import configure
- configure()
- _manifest_files_env = {
- "SENTRY_MODEL_MANIFEST_FILE_PATH": pathlib.Path(__file__).absolute().parent.parent,
- "GETSENTRY_MODEL_MANIFEST_FILE_PATH": pathlib.Path(__file__)
- .absolute()
- .parent.parent.parent.joinpath("getsentry"),
- }
- from sentry.testutils.modelmanifest import HybridCloudTestVisitor, ModelManifest
def find_test_cases_matching(model_name: str) -> Iterator[HybridCloudTestVisitor]:
    """Yield a loaded visitor for each test the manifests connect to ``model_name``.

    For every entry of ``_manifest_files_env`` the manifest is opened from the
    path stored in the named environment variable (a missing variable raises
    ``KeyError``).  Tests enumerated by ``each_hybrid_cloud_test`` are yielded
    only when their id appears in the manifest's connections for the model;
    ``load()`` is called on each visitor before it is yielded.
    """
    for manifest_env_var, tests_root in _manifest_files_env.items():
        manifest = ModelManifest.open(os.environ[manifest_env_var])

        # Resolve the model's id, then the collection of test ids linked to it.
        connections = manifest.connections
        model_id = manifest.get_or_create_id(manifest.model_names, model_name)
        connected_test_ids = connections[model_id]

        for test_id, visitor in manifest.each_hybrid_cloud_test(tests_root):
            if test_id in connected_test_ids:
                visitor.load()
                yield visitor
def pick(prompt, options):
    """Ask on stdin until the answer is one of ``options``, then return it.

    Args:
        prompt: text shown to the user; the allowed options are appended in
            parentheses.
        options: collection of accepted answer strings.

    Returns:
        The first entered line that is a member of ``options``.

    Raises:
        EOFError: propagated from ``input`` when stdin is exhausted.
    """
    # The prompt never changes between attempts, so build it once.
    full_prompt = f"{prompt} ({', '.join(options)}) "

    # Always ask at least once: seeding the answer with "" would silently
    # accept an empty-string option without ever prompting the user.
    while True:
        choice = input(full_prompt)  # noqa
        if choice in options:
            return choice
@click.command()
@click.argument("target_model", required=True)
def main(target_model: str):
    """
    Script to decorate the given target test for silo tests, making it easier to deploy changes to given tests.
    """
    # Walk every test associated with the target model and interactively
    # decide, per test, which silo decorator to apply.
    for visitor in find_test_cases_matching(target_model):
        print(f"Trying {visitor.test_name} in {visitor.test_file_path}")  # noqa

        # Tests whose decorator is already marked stable are left alone.
        if visitor.decorator_was_stable:
            continue

        print(f"Found {visitor.test_name}")  # noqa
        try:
            if pick("Skip this test?", ["y", "n"]) == "y":
                continue
            silo_mode = pick("silo mode?", ["all", "no", "control", "region"])
            set_stable = pick("set stable?", ["y", "n"]) == "y"
        except EOFError:
            # Closed stdin at any prompt means "move on to the next test".
            continue

        _rewrite(visitor, f"{silo_mode}_silo_test", set_stable)
- def _decorate(target_test_silo_mode, set_stable, lineno, match_line):
- if not match_line:
- return False
- if not match_line[0] == lineno:
- return False
- ws = b" " * match_line[1]
- if set_stable:
- return ws + f"@{target_test_silo_mode}(stable=True)\n".encode()
- else:
- return ws + f"@{target_test_silo_mode}\n".encode()
def _rewrite(test_visitor: HybridCloudTestVisitor, target_test_silo_mode, set_stable):
    """Rewrite the visitor's test file so the test carries the given silo decorator.

    The file is copied line by line into a temporary file, which then replaces
    the original:

    * the import of ``target_test_silo_mode`` from ``sentry.testutils.silo`` is
      written in place of the line at ``import_match_line[0]``, or at the very
      top of the file when the visitor has no ``import_match_line``;
    * the line at ``decorator_match_line[0]`` is replaced by the new decorator,
      unless ``decorator_was_stable`` is set, in which case it is kept as is;
    * when there is no ``decorator_match_line``, the new decorator is inserted
      immediately before the line at ``func_match_line[0]``.

    Args:
        test_visitor: visitor describing the test and where it was matched.
        target_test_silo_mode: decorator name, without the leading ``@``.
        set_stable: whether the decorator is written with ``(stable=True)``.

    Raises:
        Exception: if the visitor has neither a decorator nor a function match.
    """
    path = test_visitor.test_file_path
    import_line = f"from sentry.testutils.silo import {target_test_silo_mode}\n".encode()
    import_match = test_visitor.import_match_line
    decorator_match = test_visitor.decorator_match_line
    func_match = test_visitor.func_match_line

    if not decorator_match and not func_match:
        raise Exception(f"Could not find test case {test_visitor.target_symbol_parts}!")

    tmp = tempfile.NamedTemporaryFile(delete=False)
    try:
        # Read explicitly as UTF-8: every line is re-encoded as UTF-8 below, so
        # relying on the locale's default encoding could corrupt the file.
        with tmp, open(path, encoding="utf8") as src:
            if not import_match:
                tmp.write(import_line)

            for lineno, line in enumerate(src, start=1):
                if import_match and import_match[0] == lineno:
                    tmp.write(import_line)
                    continue

                if new_decorator := _decorate(
                    target_test_silo_mode, set_stable, lineno, decorator_match
                ):
                    # If the decorator that existed was stable, don't replace it, keep it.
                    if test_visitor.decorator_was_stable:
                        tmp.write(line.encode("utf8"))
                    else:
                        tmp.write(new_decorator)
                    continue

                if not decorator_match and (
                    new_decorator := _decorate(
                        target_test_silo_mode, set_stable, lineno, func_match
                    )
                ):
                    tmp.write(new_decorator)

                tmp.write(line.encode("utf8"))

        # The temporary file is created with restrictive permissions; carry the
        # original file's mode over so the replacement does not change it.
        shutil.copymode(path, tmp.name)
        shutil.move(tmp.name, path)
    except BaseException:
        # delete=False means nothing else cleans the temporary file up on failure.
        try:
            os.unlink(tmp.name)
        except FileNotFoundError:
            pass
        raise
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()
|