123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- import os
- from typing import MutableMapping
- import psutil
- import pytest
- import responses
- from django.db import connections
- from sentry.silo import SiloMode
- pytest_plugins = ["sentry.testutils.pytest"]
- @pytest.fixture(autouse=True)
- def unclosed_files():
- fds = frozenset(psutil.Process().open_files())
- yield
- assert frozenset(psutil.Process().open_files()) == fds
- @pytest.hookimpl(tryfirst=True, hookwrapper=True)
- def pytest_runtest_makereport(item, call):
-
- outcome = yield
- report = outcome.get_result()
-
-
- if os.environ.get("GITHUB_ACTIONS") != "true":
- return
-
-
-
- if hasattr(item, "execution_count"):
- import pytest_rerunfailures
- if item.execution_count <= pytest_rerunfailures.get_reruns_count(item):
- return
- if report.when == "call" and report.failed:
-
- filesystempath, lineno, _ = report.location
-
- workspace = os.environ.get("GITHUB_WORKSPACE")
- if workspace:
- full_path = os.path.abspath(filesystempath)
- try:
- rel_path = os.path.relpath(full_path, workspace)
- except ValueError:
-
-
-
- rel_path = filesystempath
- if not rel_path.startswith(".."):
- filesystempath = rel_path
- if lineno is not None:
-
- lineno += 1
-
- longrepr = report.head_line or item.name
-
- try:
- longrepr += "\n\n" + report.longrepr.reprcrash.message
- lineno = report.longrepr.reprcrash.lineno
- except AttributeError:
- pass
- print(_error_workflow_command(filesystempath, lineno, longrepr))
- def _error_workflow_command(filesystempath, lineno, longrepr):
-
- details_dict = {"file": filesystempath}
- if lineno is not None:
- details_dict["line"] = lineno
- details = ",".join(f"{k}={v}" for k, v in details_dict.items())
- if longrepr is None:
- return f"\n::error {details}"
- else:
- longrepr = _escape(longrepr)
- return f"\n::error {details}::{longrepr}"
- def _escape(s):
- return s.replace("%", "%25").replace("\r", "%0D").replace("\n", "%0A")
- @pytest.fixture(autouse=True)
- def validate_silo_mode():
-
-
-
-
- if SiloMode.get_current_mode() != SiloMode.MONOLITH:
- raise Exception(
- "Possible test leak bug! SiloMode was not reset to Monolith between tests. Please read the comment for validate_silo_mode() in tests/conftest.py."
- )
- yield
- if SiloMode.get_current_mode() != SiloMode.MONOLITH:
- raise Exception(
- "Possible test leak bug! SiloMode was not reset to Monolith between tests. Please read the comment for validate_silo_mode() in tests/conftest.py."
- )
- @pytest.fixture(autouse=True)
- def setup_simulate_on_commit(request):
- from sentry.testutils.hybrid_cloud import simulate_on_commit
- with simulate_on_commit(request):
- yield
- @pytest.fixture(autouse=True)
- def setup_enforce_monotonic_transactions(request):
- from sentry.testutils.hybrid_cloud import enforce_no_cross_transaction_interactions
- with enforce_no_cross_transaction_interactions():
- yield
- @pytest.fixture(autouse=True)
- def audit_hybrid_cloud_writes_and_deletes(request):
- """
- Ensure that write operations on hybrid cloud foreign keys are recorded
- alongside outboxes or use a context manager to indicate that the
- caller has considered outbox and didn't accidentally forget.
- Generally you can avoid assertion errors from these checks by:
- 1. Running deletion/write logic within an `outbox_context`.
- 2. Using Model.delete()/save methods that create outbox messages in the
- same transaction as a delete operation.
- Scenarios that are generally always unsafe are using
- `QuerySet.delete()`, `QuerySet.update()` or raw SQL to perform
- writes.
- The User.delete() method is a good example of how to safely
- delete records and generate outbox messages.
- """
- from sentry.testutils.silo import validate_protected_queries
- debug_cursor_state: MutableMapping[str, bool] = {}
- for conn in connections.all():
- debug_cursor_state[conn.alias] = conn.force_debug_cursor
- conn.queries_log.clear()
- conn.force_debug_cursor = True
- try:
- yield
- finally:
- for conn in connections.all():
- conn.force_debug_cursor = debug_cursor_state[conn.alias]
- validate_protected_queries(conn.queries)
- @pytest.fixture(autouse=True)
- def check_leaked_responses_mocks():
- yield
- leaked = responses.registered()
- if leaked:
- responses.reset()
- leaked_s = "".join(f"- {item}\n" for item in leaked)
- raise AssertionError(
- f"`responses` were leaked outside of the test context:\n{leaked_s}"
- f"(make sure to use `@responses.activate` or `with responses.mock:`)"
- )
|