test_release.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. from sentry.models import (
  2. Environment,
  3. Project,
  4. Release,
  5. ReleaseCommit,
  6. ReleaseEnvironment,
  7. ReleaseFile,
  8. ScheduledDeletion,
  9. )
  10. from sentry.tasks.deletion.hybrid_cloud import schedule_hybrid_cloud_foreign_key_jobs
  11. from sentry.tasks.deletion.scheduled import run_deletion
  12. from sentry.testutils import TransactionTestCase
  13. from sentry.testutils.helpers import TaskRunner
  14. from sentry.testutils.outbox import outbox_runner
  15. from sentry.testutils.silo import exempt_from_silo_limits
  16. class DeleteReleaseTest(TransactionTestCase):
  17. def test_simple(self):
  18. org = self.create_organization()
  19. project = self.create_project(organization=org)
  20. env = self.create_environment(organization=org)
  21. release = self.create_release(project=project, environments=[env])
  22. file = self.create_release_file(release_id=release.id)
  23. deletion = ScheduledDeletion.schedule(release, days=0)
  24. deletion.update(in_progress=True)
  25. with self.tasks():
  26. run_deletion(deletion.id)
  27. assert not Release.objects.filter(id=release.id).exists()
  28. assert not ReleaseCommit.objects.filter(release=release).exists()
  29. assert not ReleaseEnvironment.objects.filter(release=release).exists()
  30. assert not ReleaseFile.objects.filter(id=file.id).exists()
  31. # Shared objects should continue to exist.
  32. assert Environment.objects.filter(id=env.id).exists()
  33. assert Project.objects.filter(id=project.id).exists()
  34. def test_cascade_from_user(self):
  35. org = self.create_organization()
  36. project = self.create_project(organization=org)
  37. env = self.create_environment(organization=org)
  38. release1 = self.create_release(project=project, environments=[env])
  39. release1.update(owner_id=self.user.id)
  40. release2 = self.create_release(project=project, environments=[env])
  41. release2.update(owner_id=self.create_user().id)
  42. assert release1.owner_id
  43. assert release2.owner_id
  44. with exempt_from_silo_limits(), outbox_runner():
  45. self.user.delete()
  46. with TaskRunner():
  47. schedule_hybrid_cloud_foreign_key_jobs()
  48. release1.refresh_from_db()
  49. release2.refresh_from_db()
  50. assert release1.owner_id is None
  51. assert release2.owner_id is not None