123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- from __future__ import annotations
- from datetime import datetime, timedelta
- from typing import Any, Mapping, Optional
- import pytest
- from django.utils.functional import cached_property
- from sentry.eventstore.models import Event
- from sentry.incidents.models import IncidentActivityType
- from sentry.models.activity import Activity
- from sentry.models.actor import Actor, get_actor_id_for_user
- from sentry.models.grouprelease import GroupRelease
- from sentry.models.identity import Identity, IdentityProvider
- from sentry.models.integrations.integration import Integration
- from sentry.models.integrations.organization_integration import OrganizationIntegration
- from sentry.models.organization import Organization
- from sentry.models.organizationmember import OrganizationMember
- from sentry.models.organizationmemberteam import OrganizationMemberTeam
- from sentry.models.project import Project
- from sentry.models.user import User
- from sentry.services.hybrid_cloud.organization import RpcOrganization
- from sentry.services.hybrid_cloud.user import RpcUser
- from sentry.silo import SiloMode
- from sentry.testutils.factories import Factories
- from sentry.testutils.helpers.datetime import before_now, iso_format
- from sentry.testutils.silo import assume_test_silo_mode
- # XXX(dcramer): this is a compatibility layer to transition to pytest-based fixtures
- # all of the memoized fixtures are copypasta due to our inability to use pytest fixtures
- # on a per-class method basis
- from sentry.types.activity import ActivityType
class Fixtures:
    """Memoized default test objects plus thin wrappers around ``Factories``.

    The ``cached_property`` attributes lazily build one default object each
    (user, organization, team, project, ...). The ``create_*`` helpers forward
    to the matching ``Factories`` method, filling in those defaults when the
    caller omits an argument.
    """

    @cached_property
    def session(self):
        """Memoized session from ``Factories.create_session``."""
        return Factories.create_session()

    @cached_property
    def projectkey(self):
        """Memoized project key for the default ``project``."""
        return self.create_project_key(project=self.project)

    @cached_property
    def user(self):
        """Memoized default superuser (``admin@localhost``)."""
        return self.create_user("admin@localhost", is_superuser=True)

    @cached_property
    def organization(self):
        """Memoized default organization ``baz`` owned by the default ``user``."""
        # XXX(dcramer): ensure that your org slug doesnt match your team slug
        # and the same for your project slug
        return self.create_organization(name="baz", slug="baz", owner=self.user)

    @cached_property
    @assume_test_silo_mode(SiloMode.REGION)
    def team(self):
        """Memoized default team ``foo`` in the default organization.

        Every existing ``OrganizationMember`` of the organization is added to
        the team as an active member.
        """
        team = self.create_team(organization=self.organization, name="foo", slug="foo")
        # XXX: handle legacy team fixture
        members = OrganizationMember.objects.filter(organization=self.organization)
        for member in members:
            OrganizationMemberTeam.objects.create(
                team=team, organizationmember=member, is_active=True
            )
        return team

    @cached_property
    def project(self):
        """Memoized default project ``bar`` attached to the default ``team``."""
        return self.create_project(
            name="Bar", slug="bar", teams=[self.team], fire_project_created=True
        )

    @cached_property
    def release(self):
        """Memoized release ``foo-1.0`` for the default ``project``."""
        return self.create_release(project=self.project, version="foo-1.0")

    @cached_property
    def environment(self):
        """Memoized ``development`` environment for the default ``project``."""
        return self.create_environment(name="development", project=self.project)

    @cached_property
    def group(self):
        """Memoized group with a non-ASCII message in the default ``project``."""
        return self.create_group(message="\u3053\u3093\u306b\u3061\u306f")

    @cached_property
    def event(self):
        """Memoized event stored in the default ``project``, timestamped one second ago."""
        return self.store_event(
            data={
                "event_id": "a" * 32,
                "message": "\u3053\u3093\u306b\u3061\u306f",
                "timestamp": iso_format(before_now(seconds=1)),
            },
            project_id=self.project.id,
        )

    @cached_property
    @assume_test_silo_mode(SiloMode.REGION)
    def activity(self):
        """Memoized NOTE ``Activity`` on the default group, project and user."""
        return Activity.objects.create(
            group=self.group,
            project=self.project,
            type=ActivityType.NOTE.value,
            user_id=self.user.id,
            data={},
        )

    @cached_property
    @assume_test_silo_mode(SiloMode.CONTROL)
    def integration(self):
        """Memoized GitHub ``Integration`` added to the default organization.

        The metadata carries a placeholder access token and an ``expires_at``
        14 days after the current UTC time.
        """
        integration = Integration.objects.create(
            provider="github",
            name="GitHub",
            external_id="github:1",
            metadata={
                "access_token": "xxxxx-xxxxxxxxx-xxxxxxxxxx-xxxxxxxxxxxx",
                "expires_at": iso_format(datetime.utcnow() + timedelta(days=14)),
            },
        )
        integration.add_organization(self.organization, self.user)
        return integration

    @cached_property
    @assume_test_silo_mode(SiloMode.CONTROL)
    def organization_integration(self):
        """Memoized result of adding the default organization to ``integration``."""
        return self.integration.add_organization(self.organization, self.user)

    def create_organization(self, *args, **kwargs):
        """Delegate to ``Factories.create_organization``."""
        return Factories.create_organization(*args, **kwargs)

    def create_member(self, *args, **kwargs):
        """Delegate to ``Factories.create_member``."""
        return Factories.create_member(*args, **kwargs)

    def create_api_key(self, *args, **kwargs):
        """Delegate to ``Factories.create_api_key``."""
        return Factories.create_api_key(*args, **kwargs)

    def create_auth_provider(self, *args, **kwargs):
        """Delegate to ``Factories.create_auth_provider``."""
        return Factories.create_auth_provider(*args, **kwargs)

    def create_auth_identity(self, *args, **kwargs):
        """Delegate to ``Factories.create_auth_identity``."""
        return Factories.create_auth_identity(*args, **kwargs)

    def create_user_auth_token(self, *args, **kwargs):
        """Delegate to ``Factories.create_user_auth_token``."""
        return Factories.create_user_auth_token(*args, **kwargs)

    def create_team_membership(self, *args, **kwargs):
        """Delegate to ``Factories.create_team_membership``."""
        return Factories.create_team_membership(*args, **kwargs)

    def create_team(self, organization=None, **kwargs):
        """Create a team; ``organization`` defaults to ``self.organization``."""
        if organization is None:
            organization = self.organization
        return Factories.create_team(organization=organization, **kwargs)

    def create_environment(self, project=None, **kwargs):
        """Create an environment; ``project`` defaults to ``self.project``."""
        if project is None:
            project = self.project
        return Factories.create_environment(project=project, **kwargs)

    def create_project(self, **kwargs):
        """Create a project; ``teams`` defaults to ``[self.team]`` when not given."""
        if "teams" not in kwargs:
            kwargs["teams"] = [self.team]
        return Factories.create_project(**kwargs)

    def create_project_bookmark(self, project=None, *args, **kwargs):
        """Create a project bookmark; ``project`` defaults to ``self.project``."""
        if project is None:
            project = self.project
        return Factories.create_project_bookmark(*args, project=project, **kwargs)

    def create_project_key(self, project=None, *args, **kwargs):
        """Create a project key; ``project`` defaults to ``self.project``."""
        if project is None:
            project = self.project
        return Factories.create_project_key(*args, project=project, **kwargs)

    def create_project_rule(
        self, project=None, action_match=None, condition_match=None, *args, **kwargs
    ):
        """Create a project rule; ``project`` defaults to ``self.project``.

        ``action_match`` and ``condition_match`` are forwarded to the factory
        as ``action_data`` and ``condition_data`` respectively.
        """
        if project is None:
            project = self.project
        return Factories.create_project_rule(
            *args,
            project=project,
            action_data=action_match,
            condition_data=condition_match,
            **kwargs,
        )

    def create_slack_project_rule(
        self, project=None, integration_id=None, channel_id=None, channel_name=None, *args, **kwargs
    ):
        """Create a Slack project rule; ``project`` defaults to ``self.project``."""
        if project is None:
            project = self.project
        return Factories.create_slack_project_rule(
            project,
            *args,
            integration_id=integration_id,
            channel_id=channel_id,
            channel_name=channel_name,
            **kwargs,
        )

    def create_release(self, project=None, user=None, *args, **kwargs):
        """Create a release; ``project`` defaults to ``self.project``."""
        if project is None:
            project = self.project
        return Factories.create_release(*args, project=project, user=user, **kwargs)

    def create_group_release(self, project: Project | None = None, *args, **kwargs) -> GroupRelease:
        """Create a group release; ``project`` defaults to ``self.project``."""
        if project is None:
            project = self.project
        return Factories.create_group_release(project, *args, **kwargs)

    def create_release_file(self, release_id=None, file=None, name=None, dist_id=None):
        """Create a release file; ``release_id`` defaults to ``self.release.id``."""
        if release_id is None:
            release_id = self.release.id
        return Factories.create_release_file(release_id, file, name, dist_id)

    def create_artifact_bundle_zip(self, org=None, release=None, *args, **kwargs):
        """Delegate to ``Factories.create_artifact_bundle_zip`` (no defaults filled here)."""
        return Factories.create_artifact_bundle_zip(org, release, *args, **kwargs)

    def create_release_archive(self, org=None, release=None, *args, **kwargs):
        """Create a release archive.

        ``org`` defaults to ``self.organization.slug`` and ``release`` to
        ``self.release.version``.
        """
        if org is None:
            org = self.organization.slug
        if release is None:
            release = self.release.version
        return Factories.create_release_archive(org, release, *args, **kwargs)

    def create_artifact_bundle(self, org=None, *args, **kwargs):
        """Create an artifact bundle; ``org`` defaults to ``self.organization``."""
        if org is None:
            org = self.organization
        return Factories.create_artifact_bundle(org, *args, **kwargs)

    def create_code_mapping(self, project=None, repo=None, organization_integration=None, **kwargs):
        """Create a code mapping.

        ``project`` defaults to ``self.project`` and
        ``organization_integration`` to ``self.organization_integration``;
        ``repo`` is passed through unchanged.
        """
        if project is None:
            project = self.project
        if organization_integration is None:
            organization_integration = self.organization_integration
        return Factories.create_code_mapping(project, repo, organization_integration, **kwargs)

    def create_repo(self, project=None, *args, **kwargs):
        """Create a repository; ``project`` defaults to ``self.project``."""
        if project is None:
            project = self.project
        return Factories.create_repo(*args, project=project, **kwargs)

    def create_commit(self, *args, **kwargs):
        """Delegate to ``Factories.create_commit``."""
        return Factories.create_commit(*args, **kwargs)

    def create_commit_author(self, *args, **kwargs):
        """Delegate to ``Factories.create_commit_author``."""
        return Factories.create_commit_author(*args, **kwargs)

    def create_commit_file_change(self, *args, **kwargs):
        """Delegate to ``Factories.create_commit_file_change``."""
        return Factories.create_commit_file_change(*args, **kwargs)

    def create_user(self, *args, **kwargs):
        """Delegate to ``Factories.create_user``."""
        return Factories.create_user(*args, **kwargs)

    def create_useremail(self, *args, **kwargs):
        """Delegate to ``Factories.create_useremail``."""
        return Factories.create_useremail(*args, **kwargs)

    def create_usersocialauth(
        self,
        user: User | None = None,
        provider: str | None = None,
        uid: str | None = None,
        extra_data: Mapping[str, Any] | None = None,
    ):
        """Create a user social auth record; ``user`` defaults to ``self.user``."""
        if not user:
            user = self.user
        return Factories.create_usersocialauth(
            user=user, provider=provider, uid=uid, extra_data=extra_data
        )

    def store_event(self, *args, **kwargs) -> Event:
        """Delegate to ``Factories.store_event``."""
        return Factories.store_event(*args, **kwargs)

    def create_group(self, project=None, *args, **kwargs):
        """Create a group; ``project`` defaults to ``self.project``."""
        if project is None:
            project = self.project
        return Factories.create_group(*args, project=project, **kwargs)

    def create_file(self, **kwargs):
        """Delegate to ``Factories.create_file``."""
        return Factories.create_file(**kwargs)

    def create_file_from_path(self, *args, **kwargs):
        """Delegate to ``Factories.create_file_from_path``."""
        return Factories.create_file_from_path(*args, **kwargs)

    def create_event_attachment(self, event=None, *args, **kwargs):
        """Create an event attachment; ``event`` defaults to ``self.event``."""
        if event is None:
            event = self.event
        return Factories.create_event_attachment(*args, event=event, **kwargs)

    def create_dif_file(self, project=None, *args, **kwargs):
        """Create a debug information file; ``project`` defaults to ``self.project``."""
        if project is None:
            project = self.project
        return Factories.create_dif_file(*args, project=project, **kwargs)

    def create_dif_from_path(self, project=None, *args, **kwargs):
        """Create a debug information file from a path; ``project`` defaults to ``self.project``."""
        if project is None:
            project = self.project
        return Factories.create_dif_from_path(*args, project=project, **kwargs)

    def add_user_permission(self, *args, **kwargs):
        """Delegate to ``Factories.add_user_permission``."""
        return Factories.add_user_permission(*args, **kwargs)

    def create_sentry_app(self, *args, **kwargs):
        """Delegate to ``Factories.create_sentry_app``."""
        return Factories.create_sentry_app(*args, **kwargs)

    def create_internal_integration(self, *args, **kwargs):
        """Delegate to ``Factories.create_internal_integration``."""
        return Factories.create_internal_integration(*args, **kwargs)

    def create_internal_integration_token(self, *args, **kwargs):
        """Delegate to ``Factories.create_internal_integration_token``."""
        return Factories.create_internal_integration_token(*args, **kwargs)

    def create_sentry_app_installation(self, *args, **kwargs):
        """Delegate to ``Factories.create_sentry_app_installation``."""
        return Factories.create_sentry_app_installation(*args, **kwargs)

    def create_sentry_app_installation_for_provider(self, *args, **kwargs):
        """Delegate to ``Factories.create_sentry_app_installation_for_provider``."""
        return Factories.create_sentry_app_installation_for_provider(*args, **kwargs)

    def create_stacktrace_link_schema(self, *args, **kwargs):
        """Delegate to ``Factories.create_stacktrace_link_schema``."""
        return Factories.create_stacktrace_link_schema(*args, **kwargs)

    def create_issue_link_schema(self, *args, **kwargs):
        """Delegate to ``Factories.create_issue_link_schema``."""
        return Factories.create_issue_link_schema(*args, **kwargs)

    def create_alert_rule_action_schema(self, *args, **kwargs):
        """Delegate to ``Factories.create_alert_rule_action_schema``."""
        return Factories.create_alert_rule_action_schema(*args, **kwargs)

    def create_sentry_app_feature(self, *args, **kwargs):
        """Delegate to ``Factories.create_sentry_app_feature``."""
        return Factories.create_sentry_app_feature(*args, **kwargs)

    def create_doc_integration(self, *args, **kwargs):
        """Delegate to ``Factories.create_doc_integration``."""
        return Factories.create_doc_integration(*args, **kwargs)

    def create_doc_integration_features(self, *args, **kwargs):
        """Delegate to ``Factories.create_doc_integration_features``."""
        return Factories.create_doc_integration_features(*args, **kwargs)

    def create_doc_integration_avatar(self, *args, **kwargs):
        """Delegate to ``Factories.create_doc_integration_avatar``."""
        return Factories.create_doc_integration_avatar(*args, **kwargs)

    def create_service_hook(self, *args, **kwargs):
        """Delegate to ``Factories.create_service_hook``."""
        return Factories.create_service_hook(*args, **kwargs)

    def create_userreport(self, *args, **kwargs):
        """Delegate to ``Factories.create_userreport``."""
        return Factories.create_userreport(*args, **kwargs)

    def create_platform_external_issue(self, *args, **kwargs):
        """Delegate to ``Factories.create_platform_external_issue``."""
        return Factories.create_platform_external_issue(*args, **kwargs)

    def create_integration_external_issue(self, *args, **kwargs):
        """Delegate to ``Factories.create_integration_external_issue``."""
        return Factories.create_integration_external_issue(*args, **kwargs)

    def create_incident(self, organization=None, projects=None, *args, **kwargs):
        """Create an incident.

        ``organization`` defaults to ``self.organization`` and ``projects`` to
        ``[self.project]``.
        """
        if not organization:
            organization = self.organization
        if projects is None:
            projects = [self.project]
        return Factories.create_incident(
            *args, organization=organization, projects=projects, **kwargs
        )

    def create_incident_activity(self, incident, *args, **kwargs):
        """Delegate to ``Factories.create_incident_activity`` for ``incident``."""
        return Factories.create_incident_activity(*args, incident=incident, **kwargs)

    def create_incident_comment(self, incident, *args, **kwargs):
        """Create an incident activity of type ``IncidentActivityType.COMMENT``."""
        return self.create_incident_activity(
            incident, *args, type=IncidentActivityType.COMMENT.value, **kwargs
        )

    def create_incident_trigger(self, incident, alert_rule_trigger, status):
        """Delegate to ``Factories.create_incident_trigger`` with the given ``status``."""
        return Factories.create_incident_trigger(incident, alert_rule_trigger, status=status)

    def create_alert_rule(self, organization=None, projects=None, *args, **kwargs):
        """Create an alert rule.

        ``organization`` defaults to ``self.organization`` and ``projects`` to
        ``[self.project]``.
        """
        if not organization:
            organization = self.organization
        if projects is None:
            projects = [self.project]
        return Factories.create_alert_rule(organization, projects, *args, **kwargs)

    def create_alert_rule_trigger(self, alert_rule=None, *args, **kwargs):
        """Create an alert rule trigger; a fresh alert rule is created when none is given."""
        if not alert_rule:
            alert_rule = self.create_alert_rule()
        return Factories.create_alert_rule_trigger(alert_rule, *args, **kwargs)

    def create_alert_rule_trigger_action(
        self,
        alert_rule_trigger=None,
        target_identifier=None,
        triggered_for_incident=None,
        *args,
        **kwargs,
    ):
        """Create an alert rule trigger action.

        ``alert_rule_trigger`` defaults to a freshly created trigger and
        ``target_identifier`` to ``str(self.user.id)``. When
        ``triggered_for_incident`` is given, an incident trigger linking it to
        the alert rule trigger is created first.
        """
        if not alert_rule_trigger:
            alert_rule_trigger = self.create_alert_rule_trigger()

        if not target_identifier:
            target_identifier = str(self.user.id)

        if triggered_for_incident is not None:
            Factories.create_incident_trigger(triggered_for_incident, alert_rule_trigger)

        # NOTE(review): ``*args`` is accepted but not forwarded to the factory.
        # Kept as-is so existing callers see no change in behaviour.
        return Factories.create_alert_rule_trigger_action(
            alert_rule_trigger, target_identifier=target_identifier, **kwargs
        )

    def create_notification_action(self, organization=None, projects=None, **kwargs):
        """Delegate to ``Factories.create_notification_action`` (no defaults filled here)."""
        return Factories.create_notification_action(
            organization=organization, projects=projects, **kwargs
        )

    def create_notification_settings_provider(self, *args, **kwargs):
        """Delegate to ``Factories.create_notification_settings_provider``."""
        return Factories.create_notification_settings_provider(*args, **kwargs)

    def create_external_user(self, user=None, organization=None, integration=None, **kwargs):
        """Create an external user.

        ``user``, ``organization`` and ``integration`` default to the
        corresponding memoized fixtures; only the integration's ``id`` is
        passed to the factory.
        """
        if not user:
            user = self.user
        if not organization:
            organization = self.organization  # Force creation.
        if not integration:
            integration = self.integration

        return Factories.create_external_user(
            user=user, organization=organization, integration_id=integration.id, **kwargs
        )

    def create_external_team(self, team=None, integration=None, **kwargs):
        """Create an external team.

        ``team`` defaults to ``self.team`` and ``integration`` to
        ``self.integration``; the organization is taken from the team.
        """
        if not team:
            team = self.team
        if not integration:
            integration = self.integration

        return Factories.create_external_team(
            team=team, organization=team.organization, integration_id=integration.id, **kwargs
        )

    def create_codeowners(self, project=None, code_mapping=None, **kwargs):
        """Create a codeowners record.

        ``project`` defaults to ``self.project``. When ``code_mapping`` is not
        given, a repo is created for the project (stored on ``self.repo``) and
        a code mapping is created for that project and repo.
        """
        if not project:
            project = self.project
        if not code_mapping:
            # Build the default repo and mapping for the project actually in
            # use, rather than unconditionally for ``self.project``.
            self.repo = self.create_repo(project)
            code_mapping = self.create_code_mapping(project, self.repo)

        return Factories.create_codeowners(project=project, code_mapping=code_mapping, **kwargs)

    def create_slack_integration(
        self,
        organization: Organization,
        external_id: str = "TXXXXXXX1",
        user: Optional[RpcUser] = None,
        identity_external_id: str = "UXXXXXXX1",
        **kwargs: Any,
    ):
        """Create a Slack integration with an identity provider and an identity.

        ``user`` defaults to the organization's default owner, looked up in
        REGION silo mode. Returns the integration.
        """
        if user is None:
            with assume_test_silo_mode(SiloMode.REGION):
                user = organization.get_default_owner()

        integration = Factories.create_slack_integration(
            organization=organization, external_id=external_id, **kwargs
        )
        idp = Factories.create_identity_provider(integration=integration)
        Factories.create_identity(user, idp, identity_external_id)

        return integration

    def create_integration(
        self,
        organization: Organization,
        external_id: str,
        oi_params: Mapping[str, Any] | None = None,
        **kwargs: Any,
    ) -> Integration:
        """Create an integration and add an organization."""
        return Factories.create_integration(organization, external_id, oi_params, **kwargs)

    def create_provider_integration(self, **integration_params: Any) -> Integration:
        """Create an integration tied to a provider but no particular organization."""
        return Factories.create_provider_integration(**integration_params)

    def create_provider_integration_for(
        self,
        organization: Organization | RpcOrganization,
        user: User | RpcUser | None,
        **integration_params: Any,
    ) -> tuple[Integration, OrganizationIntegration]:
        """Create an integration tied to a provider, then add an organization."""
        return Factories.create_provider_integration_for(organization, user, **integration_params)

    def create_identity_integration(
        self,
        user: User | RpcUser,
        organization: Organization | RpcOrganization,
        integration_params: Mapping[Any, Any],
        identity_params: Mapping[Any, Any],
    ) -> tuple[Integration, OrganizationIntegration, Identity, IdentityProvider]:
        """Delegate to ``Factories.create_identity_integration``."""
        return Factories.create_identity_integration(
            user, organization, integration_params, identity_params
        )

    def create_organization_integration(self, **integration_params: Any) -> OrganizationIntegration:
        """Create an OrganizationIntegration entity."""
        return Factories.create_organization_integration(**integration_params)

    def create_identity(self, *args, **kwargs):
        """Delegate to ``Factories.create_identity``."""
        return Factories.create_identity(*args, **kwargs)

    def create_identity_provider(
        self,
        integration: Integration | None = None,
        config: Mapping[str, Any] | None = None,
        **kwargs: Any,
    ) -> IdentityProvider:
        """Delegate to ``Factories.create_identity_provider``."""
        return Factories.create_identity_provider(integration=integration, config=config, **kwargs)

    def create_group_history(self, *args, **kwargs):
        """Create a group history entry; ``actor`` defaults to the actor of ``self.user``."""
        if "actor" not in kwargs:
            kwargs["actor"] = Actor.objects.get(id=get_actor_id_for_user(self.user))
        return Factories.create_group_history(*args, **kwargs)

    def create_comment(self, *args, **kwargs):
        """Delegate to ``Factories.create_comment``."""
        return Factories.create_comment(*args, **kwargs)

    def create_sentry_function(self, *args, **kwargs):
        """Delegate to ``Factories.create_sentry_function``."""
        return Factories.create_sentry_function(*args, **kwargs)

    def create_saved_search(self, *args, **kwargs):
        """Delegate to ``Factories.create_saved_search``."""
        return Factories.create_saved_search(*args, **kwargs)

    def create_organization_mapping(self, *args, **kwargs):
        """Delegate to ``Factories.create_org_mapping``."""
        return Factories.create_org_mapping(*args, **kwargs)

    def create_basic_auth_header(self, *args, **kwargs):
        """Delegate to ``Factories.create_basic_auth_header``."""
        return Factories.create_basic_auth_header(*args, **kwargs)

    def snooze_rule(self, *args, **kwargs):
        """Delegate to ``Factories.snooze_rule``."""
        return Factories.snooze_rule(*args, **kwargs)

    @pytest.fixture(autouse=True)
    def _init_insta_snapshot(self, insta_snapshot):
        """Autouse fixture that stores ``insta_snapshot`` on the instance."""
        self.insta_snapshot = insta_snapshot
|