123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- import os
- from typing import MutableSet
- import pytest
- from django.db.transaction import get_connection
- from sentry.silo import SiloMode
- pytest_plugins = ["sentry.utils.pytest"]
- # XXX: The below code is vendored code from https://github.com/utgwkk/pytest-github-actions-annotate-failures
- # so that we can add support for pytest_rerunfailures
- # retried tests will no longer be annotated in GHA
- #
- # Reference:
- # https://docs.pytest.org/en/latest/writing_plugins.html#hookwrapper-executing-around-other-hooks
- # https://docs.pytest.org/en/latest/writing_plugins.html#hook-function-ordering-call-example
- # https://docs.pytest.org/en/stable/reference.html#pytest.hookspec.pytest_runtest_makereport
- #
- # Inspired by:
- # https://github.com/pytest-dev/pytest/blob/master/src/_pytest/terminal.py
- @pytest.hookimpl(tryfirst=True, hookwrapper=True)
- def pytest_runtest_makereport(item, call):
- # execute all other hooks to obtain the report object
- outcome = yield
- report = outcome.get_result()
- # enable only in a workflow of GitHub Actions
- # ref: https://help.github.com/en/actions/configuring-and-managing-workflows/using-environment-variables#default-environment-variables
- if os.environ.get("GITHUB_ACTIONS") != "true":
- return
- # If we have the pytest_rerunfailures plugin,
- # and there are still retries to be run,
- # then do not return the error
- if hasattr(item, "execution_count"):
- import pytest_rerunfailures
- if item.execution_count <= pytest_rerunfailures.get_reruns_count(item):
- return
- if report.when == "call" and report.failed:
- # collect information to be annotated
- filesystempath, lineno, _ = report.location
- # try to convert to absolute path in GitHub Actions
- workspace = os.environ.get("GITHUB_WORKSPACE")
- if workspace:
- full_path = os.path.abspath(filesystempath)
- try:
- rel_path = os.path.relpath(full_path, workspace)
- except ValueError:
- # os.path.relpath() will raise ValueError on Windows
- # when full_path and workspace have different mount points.
- # https://github.com/utgwkk/pytest-github-actions-annotate-failures/issues/20
- rel_path = filesystempath
- if not rel_path.startswith(".."):
- filesystempath = rel_path
- if lineno is not None:
- # 0-index to 1-index
- lineno += 1
- # get the name of the current failed test, with parametrize info
- longrepr = report.head_line or item.name
- # get the error message and line number from the actual error
- try:
- longrepr += "\n\n" + report.longrepr.reprcrash.message
- lineno = report.longrepr.reprcrash.lineno
- except AttributeError:
- pass
- print(_error_workflow_command(filesystempath, lineno, longrepr)) # noqa: S002
- def _error_workflow_command(filesystempath, lineno, longrepr):
- # Build collection of arguments. Ordering is strict for easy testing
- details_dict = {"file": filesystempath}
- if lineno is not None:
- details_dict["line"] = lineno
- details = ",".join(f"{k}={v}" for k, v in details_dict.items())
- if longrepr is None:
- return f"\n::error {details}"
- else:
- longrepr = _escape(longrepr)
- return f"\n::error {details}::{longrepr}"
- def _escape(s):
- return s.replace("%", "%25").replace("\r", "%0D").replace("\n", "%0A")
- _MODEL_MANIFEST_FILE_PATH = "./model-manifest.json" # os.getenv("SENTRY_MODEL_MANIFEST_FILE_PATH")
- _model_manifest = None
- @pytest.fixture(scope="session", autouse=True)
- def create_model_manifest_file():
- """Audit which models are touched by each test case and write it to file."""
- # We have to construct the ModelManifest lazily, because importing
- # sentry.testutils.modelmanifest too early causes a dependency cycle.
- from sentry.testutils.modelmanifest import ModelManifest
- if _MODEL_MANIFEST_FILE_PATH:
- global _model_manifest
- _model_manifest = ModelManifest.open(_MODEL_MANIFEST_FILE_PATH)
- with _model_manifest.write():
- yield
- else:
- yield
- @pytest.fixture(scope="class", autouse=True)
- def register_class_in_model_manifest(request: pytest.FixtureRequest):
- if _model_manifest:
- with _model_manifest.register(request.node.nodeid):
- yield
- else:
- yield
- @pytest.fixture(autouse=True)
- def validate_silo_mode():
- # NOTE! Hybrid cloud uses many mechanisms to simulate multiple different configurations of the application
- # during tests. It depends upon `override_settings` using the correct contextmanager behaviors and correct
- # thread handling in acceptance tests. If you hit one of these, it's possible either that cleanup logic has
- # a bug, or you may be using a contextmanager incorrectly. Let us know and we can help!
- if SiloMode.get_current_mode() != SiloMode.MONOLITH:
- raise Exception(
- "Possible test leak bug! SiloMode was not reset to Monolith between tests. Please read the comment for validate_silo_mode() in tests/conftest.py."
- )
- yield
- if SiloMode.get_current_mode() != SiloMode.MONOLITH:
- raise Exception(
- "Possible test leak bug! SiloMode was not reset to Monolith between tests. Please read the comment for validate_silo_mode() in tests/conftest.py."
- )
- @pytest.fixture(autouse=True)
- def setup_simulate_on_commit(request):
- from sentry.testutils.hybrid_cloud import simulate_on_commit
- with simulate_on_commit(request):
- yield
- @pytest.fixture(autouse=True)
- def protect_hybrid_cloud_writes_and_deletes(request):
- """
- Ensure the deletions on any hybrid cloud foreign keys would be recorded to an outbox
- by preventing any deletes that do not pass through a special 'connection'.
- This logic creates an additional database role which cannot make deletions on special
- restricted hybrid cloud objects, forcing code that would delete it in tests to explicitly
- escalate their role -- the hope being that only codepaths that are smart about outbox
- creation will do so.
- If you are running into issues with permissions to delete objects, consider whether
- you are deleting an object with a hybrid cloud foreign key pointing to it, and whether
- there is an 'expected' way to delete it (usually through the ORM .delete() method, but
- not the QuerySet.delete() or raw SQL delete).
- If you are certain you need to delete the objects in a new codepath, check out User.delete
- logic to see how to escalate the connection's role in tests. Make absolutely sure that you
- create Outbox objects in the same transaction that matches what you delete.
- """
- from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
- from sentry.models import (
- Organization,
- OrganizationMapping,
- OrganizationMember,
- OrganizationMemberMapping,
- )
- from sentry.testutils.silo import iter_models, reset_test_role, restrict_role
- try:
- with get_connection().cursor() as conn:
- conn.execute("SET ROLE 'postgres'")
- except (RuntimeError, AssertionError) as e:
- # Tests that do not have access to the database should pass through.
- # Ideally we'd use request.fixture names to infer this, but there didn't seem to be a single stable
- # fixture name that fully covered all cases of database access, so this approach is "try and then fail".
- if "Database access not allowed" in str(e) or "Database queries to" in str(e):
- yield
- return
- reset_test_role(role="postgres_unprivileged")
- # "De-escalate" the default connection's permission level to prevent queryset level deletions of HCFK.
- seen_models: MutableSet[type] = set()
- for model in iter_models():
- for field in model._meta.fields:
- if not isinstance(field, HybridCloudForeignKey):
- continue
- fk_model = field.foreign_model
- if fk_model is None or fk_model in seen_models:
- continue
- seen_models.add(fk_model)
- restrict_role(role="postgres_unprivileged", model=fk_model, revocation_type="DELETE")
- # Protect organization members from being updated without also invoking the correct outbox logic.
- # If you hit test failures as a result of lacking these privileges, first ensure that you create the correct
- # outboxes in a transaction, and cover that transaction with `in_test_psql_role_override`
- restrict_role(role="postgres_unprivileged", model=OrganizationMember, revocation_type="INSERT")
- restrict_role(role="postgres_unprivileged", model=OrganizationMember, revocation_type="UPDATE")
- restrict_role(role="postgres_unprivileged", model=Organization, revocation_type="INSERT")
- restrict_role(role="postgres_unprivileged", model=Organization, revocation_type="UPDATE")
- restrict_role(role="postgres_unprivileged", model=OrganizationMapping, revocation_type="INSERT")
- restrict_role(role="postgres_unprivileged", model=OrganizationMapping, revocation_type="UPDATE")
- # OrganizationMember objects need to cascade, but they can't use the standard hybrid cloud foreign key because the
- # identifiers are not snowflake ids.
- restrict_role(role="postgres_unprivileged", model=OrganizationMember, revocation_type="DELETE")
- restrict_role(
- role="postgres_unprivileged", model=OrganizationMemberMapping, revocation_type="INSERT"
- )
- with get_connection().cursor() as conn:
- conn.execute("SET ROLE 'postgres_unprivileged'")
- try:
- yield
- finally:
- with get_connection().cursor() as conn:
- conn.execute("SET ROLE 'postgres'")
|