# utils.py (1.7 KB)
  1. import json
  2. from typing import Union
  3. from django.test import TestCase
  4. from model_bakery import baker
  5. from apps.organizations_ext.constants import OrganizationUserRole
  6. from glitchtip.test_utils.test_case import GlitchTipTestCaseMixin
  7. from ..process_event import process_issue_events
  8. from ..schema import (
  9. InterchangeIssueEvent,
  10. IssueEventSchema,
  11. )
  12. class EventIngestTestCase(GlitchTipTestCaseMixin, TestCase):
  13. """
  14. Base class for event ingest tests with helper functions
  15. """
  16. def setUp(self):
  17. self.create_project()
  18. self.params = f"?sentry_key={self.projectkey.public_key}"
  19. def get_json_data(self, filename: str):
  20. with open(filename) as json_file:
  21. return json.load(json_file)
  22. def create_logged_in_user(self):
  23. self.user = baker.make("users.user")
  24. self.client.force_login(self.user)
  25. self.org_user = self.organization.add_user(
  26. self.user, OrganizationUserRole.ADMIN
  27. )
  28. self.team = baker.make("teams.Team", organization=self.organization)
  29. self.team.members.add(self.org_user)
  30. self.project = baker.make("projects.Project", organization=self.organization)
  31. self.project.teams.add(self.team)
  32. def process_events(self, data: Union[dict, list[dict]]) -> list:
  33. if isinstance(data, dict):
  34. data = [data]
  35. events = [
  36. InterchangeIssueEvent(
  37. project_id=self.project.id,
  38. organization_id=self.organization.id if self.organization else None,
  39. payload=IssueEventSchema(**dat),
  40. )
  41. for dat in data
  42. ]
  43. process_issue_events(events)
  44. return events