Просмотр исходного кода

ref: fix typing in two more tests (#70583)

<!-- Describe your PR here. -->
anthony sottile 10 месяцев назад
Родитель
Сommit
ccc988923b

+ 0 - 2
pyproject.toml

@@ -532,8 +532,6 @@ module = [
     "tests.sentry.ingest.test_slicing",
     "tests.sentry.ingest.test_slicing",
     "tests.sentry.integrations.github.test_client",
     "tests.sentry.integrations.github.test_client",
     "tests.sentry.issues.test_utils",
     "tests.sentry.issues.test_utils",
-    "tests.sentry.models.test_organizationmember",
-    "tests.sentry.replays.test_project_replay_recording_segment_index",
     "tests.sentry.tasks.test_post_process",
     "tests.sentry.tasks.test_post_process",
 ]
 ]
 disable_error_code = [
 disable_error_code = [

+ 1 - 2
tests/sentry/models/test_organizationmember.py

@@ -202,8 +202,7 @@ class OrganizationMemberTest(TestCase, HybridCloudTestMixin):
 
 
     def test_regenerate_token(self):
     def test_regenerate_token(self):
         member = OrganizationMember(organization=self.organization, email="foo@example.com")
         member = OrganizationMember(organization=self.organization, email="foo@example.com")
-        assert member.token is None
-        assert member.token_expires_at is None
+        assert (member.token, member.token_expires_at) == (None, None)
 
 
         member.regenerate_token()
         member.regenerate_token()
         assert member.token
         assert member.token

+ 27 - 38
tests/sentry/replays/test_project_replay_recording_segment_index.py

@@ -13,9 +13,31 @@ from sentry.testutils.helpers.response import close_streaming_response
 Message = namedtuple("Message", ["project_id", "replay_id"])
 Message = namedtuple("Message", ["project_id", "replay_id"])
 
 
 
 
-class ProjectReplayRecordingSegmentIndexMixin:
+# have to use TransactionTestCase because we're using threadpools
+class FilestoreProjectReplayRecordingSegmentIndexTestCase(TransactionTestCase):
     endpoint = "sentry-api-0-project-replay-recording-segment-index"
     endpoint = "sentry-api-0-project-replay-recording-segment-index"
 
 
+    def setUp(self):
+        super().setUp()
+        self.login_as(self.user)
+        self.replay_id = uuid.uuid4().hex
+        self.url = reverse(
+            self.endpoint,
+            args=(self.organization.slug, self.project.slug, self.replay_id),
+        )
+
+    def save_recording_segment(
+        self, segment_id: int, data: bytes, compressed: bool = True, is_archived: bool = False
+    ) -> None:
+        metadata = RecordingSegmentStorageMeta(
+            project_id=self.project.id,
+            replay_id=self.replay_id,
+            segment_id=segment_id,
+            retention_days=30,
+            file_id=None,
+        )
+        FilestoreBlob().set(metadata, zlib.compress(data) if compressed else data)
+
     def test_index_download_basic_compressed(self):
     def test_index_download_basic_compressed(self):
         for i in range(0, 3):
         for i in range(0, 3):
             self.save_recording_segment(i, f'[{{"test":"hello {i}"}}]'.encode())
             self.save_recording_segment(i, f'[{{"test":"hello {i}"}}]'.encode())
@@ -80,49 +102,16 @@ class ProjectReplayRecordingSegmentIndexMixin:
         assert b'[[{"test":"hello 1"}],[{"test":"hello 2"}]]' == close_streaming_response(response)
         assert b'[[{"test":"hello 1"}],[{"test":"hello 2"}]]' == close_streaming_response(response)
 
 
 
 
-class FilestoreProjectReplayRecordingSegmentIndexTestCase(
-    ProjectReplayRecordingSegmentIndexMixin, TransactionTestCase
-):
-    # have to use TransactionTestCase because we're using threadpools
-
-    endpoint = "sentry-api-0-project-replay-recording-segment-index"
-
-    def setUp(self):
-        super().setUp()
-        self.login_as(self.user)
-        self.replay_id = uuid.uuid4().hex
-        self.url = reverse(
-            self.endpoint,
-            args=(self.organization.slug, self.project.slug, self.replay_id),
-        )
-
-    def save_recording_segment(self, segment_id, data: bytes, compressed: bool = True):
-        metadata = RecordingSegmentStorageMeta(
-            project_id=self.project.id,
-            replay_id=self.replay_id,
-            segment_id=segment_id,
-            retention_days=30,
-            file_id=None,
-        )
-        FilestoreBlob().set(metadata, zlib.compress(data) if compressed else data)
-
-
 class StorageProjectReplayRecordingSegmentIndexTestCase(
 class StorageProjectReplayRecordingSegmentIndexTestCase(
-    ProjectReplayRecordingSegmentIndexMixin, APITestCase, ReplaysSnubaTestCase
+    FilestoreProjectReplayRecordingSegmentIndexTestCase, APITestCase, ReplaysSnubaTestCase
 ):
 ):
     def setUp(self):
     def setUp(self):
         super().setUp()
         super().setUp()
-        self.login_as(self.user)
-        self.replay_id = uuid.uuid4().hex
-        self.url = reverse(
-            self.endpoint,
-            args=(self.organization.slug, self.project.slug, self.replay_id),
-        )
         self.features = {"organizations:session-replay": True}
         self.features = {"organizations:session-replay": True}
 
 
     def save_recording_segment(
     def save_recording_segment(
-        self, segment_id: int, data: bytes, compressed: bool = True, **metadata
-    ):
+        self, segment_id: int, data: bytes, compressed: bool = True, is_archived: bool = False
+    ) -> None:
         # Insert the row in clickhouse.
         # Insert the row in clickhouse.
         self.store_replays(
         self.store_replays(
             mock_replay(
             mock_replay(
@@ -131,7 +120,7 @@ class StorageProjectReplayRecordingSegmentIndexTestCase(
                 self.replay_id,
                 self.replay_id,
                 segment_id=segment_id,
                 segment_id=segment_id,
                 retention_days=30,
                 retention_days=30,
-                **metadata,
+                is_archived=is_archived,
             )
             )
         )
         )