Browse Source

fix(group-attributes): Add backfill script to re-sync data to group attributes (#64138)

This adds a backfill script to send Group data to snuba again, so that
we know the attributes are all correct. Copies in a lot of external code.
Dan Fuller — 1 year ago
parent
commit
958bca49dd

+ 1 - 1
migrations_lockfile.txt

@@ -9,5 +9,5 @@ feedback: 0004_index_together
 hybridcloud: 0009_make_user_id_optional_for_slug_reservation_replica
 nodestore: 0002_nodestore_no_dictfield
 replays: 0004_index_together
-sentry: 0640_index_together
+sentry: 0641_backfill_group_attributes
 social_auth: 0002_default_auto_field

+ 168 - 0
src/sentry/migrations/0641_backfill_group_attributes.py

@@ -0,0 +1,168 @@
+# Generated by Django 4.2.8 on 2024-01-27 01:39
+import dataclasses
+from datetime import datetime
+from enum import Enum
+from typing import Optional
+
+from django.db import migrations
+from django.db.models import F, Window
+from django.db.models.functions import Rank
+
+from sentry.issues.attributes import produce_snapshot_to_kafka
+from sentry.new_migrations.migrations import CheckedMigration
+from sentry.utils import redis
+from sentry.utils.iterators import chunked
+from sentry.utils.query import RangeQuerySetWrapperWithProgressBar
+
+CHUNK_SIZE = 10000
+
+
class GroupOwnerType(Enum):
    """Discriminator for ``GroupOwner.type`` used when deduplicating owners.

    Copied locally so the migration does not depend on mutable application
    code — presumably mirrors ``sentry.models.groupowner.GroupOwnerType``;
    confirm the integer values stay in sync with the model.
    """

    SUSPECT_COMMIT = 0
    OWNERSHIP_RULE = 1
    CODEOWNERS = 2
+
+
@dataclasses.dataclass
class GroupValues:
    """Subset of ``Group`` columns needed to build an attributes snapshot."""

    id: int
    project_id: int
    status: int
    substatus: Optional[int]  # may be None for groups without a substatus
    first_seen: datetime
    num_comments: int
+
+
def _bulk_retrieve_group_values(group_ids, Group):
    """Load the snapshot-relevant columns for ``group_ids``.

    Args:
        group_ids: ids of the groups to load; every id must exist in the DB.
        Group: the historical ``sentry.Group`` model from ``apps.get_model``.

    Returns:
        A list of ``GroupValues``, ordered the same as ``group_ids``.

    Raises:
        LookupError: if any requested id has no matching row (e.g. the group
            was deleted between chunking and this query).
    """
    group_values_map = {
        group["id"]: group
        for group in Group.objects.filter(id__in=group_ids).values(
            "id", "project_id", "status", "substatus", "first_seen", "num_comments"
        )
    }
    # Explicit check instead of `assert`: asserts are stripped under
    # `python -O`, and a descriptive error makes a failed backfill debuggable.
    missing = set(group_ids) - set(group_values_map)
    if missing:
        raise LookupError(f"Groups not found during backfill: {sorted(missing)}")

    return [
        GroupValues(
            id=group_id,
            project_id=group_values_map[group_id]["project_id"],
            status=group_values_map[group_id]["status"],
            substatus=group_values_map[group_id]["substatus"],
            first_seen=group_values_map[group_id]["first_seen"],
            num_comments=group_values_map[group_id]["num_comments"],
        )
        for group_id in group_ids
    ]
+
+
def _bulk_retrieve_snapshot_values(group_values_list, GroupAssignee, GroupOwner):
    """Build one group-attributes snapshot dict per group.

    Joins each group against its assignee and the most recent owner of each
    ``GroupOwnerType``, producing payloads in the shape consumed by
    ``produce_snapshot_to_kafka``.
    """
    # Keyed by group_id; if a group somehow had multiple assignee rows the
    # last one returned would win — assumes at most one assignee per group.
    group_assignee_map = {
        ga["group_id"]: ga
        for ga in GroupAssignee.objects.filter(
            group_id__in=[gv.id for gv in group_values_list]
        ).values("group_id", "user_id", "team_id")
    }

    group_owner_map = {}

    # Rank owners within each (group_id, type) partition by recency and keep
    # only rank 1: the most recently added owner of each type per group.
    for group_owner in (
        GroupOwner.objects.annotate(
            position=Window(Rank(), partition_by=[F("group_id"), F("type")], order_by="-date_added")
        )
        .filter(position=1, group_id__in=[g.id for g in group_values_list])
        .values("group_id", "user_id", "team_id", "type")
    ):
        group_owner_map[(group_owner["group_id"], group_owner["type"])] = group_owner

    snapshots = []
    for group_value in group_values_list:
        assignee = group_assignee_map.get(group_value.id)
        suspect_owner = group_owner_map.get((group_value.id, GroupOwnerType.SUSPECT_COMMIT.value))
        ownership_owner = group_owner_map.get((group_value.id, GroupOwnerType.OWNERSHIP_RULE.value))
        codeowners_owner = group_owner_map.get((group_value.id, GroupOwnerType.CODEOWNERS.value))
        snapshot = {
            "group_deleted": False,
            "project_id": group_value.project_id,
            "group_id": group_value.id,
            "status": group_value.status,
            "substatus": group_value.substatus,
            "first_seen": group_value.first_seen.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            "num_comments": group_value.num_comments,
            # NOTE(review): naive local time, while the "Z" suffix on
            # first_seen implies UTC — confirm hosts run in UTC.
            "timestamp": datetime.now().isoformat(),
            "assignee_user_id": assignee["user_id"] if assignee else None,
            "assignee_team_id": assignee["team_id"] if assignee else None,
            # Only a user id is emitted for suspect commits; the snapshot
            # schema has no owner_suspect_commit_team_id field.
            "owner_suspect_commit_user_id": suspect_owner["user_id"] if suspect_owner else None,
            "owner_ownership_rule_user_id": ownership_owner["user_id"] if ownership_owner else None,
            "owner_ownership_rule_team_id": ownership_owner["team_id"] if ownership_owner else None,
            "owner_codeowners_user_id": codeowners_owner["user_id"] if codeowners_owner else None,
            "owner_codeowners_team_id": codeowners_owner["team_id"] if codeowners_owner else None,
        }
        snapshots.append(snapshot)

    return snapshots
+
+
def bulk_send_snapshot_values(group_ids, groups, Group, GroupAssignee, GroupOwner) -> None:
    """Produce one attributes snapshot to Kafka for each given group.

    Args:
        group_ids: ids to fetch from the database (may be None).
        groups: already-materialized ``GroupValues`` (may be None).
        Group / GroupAssignee / GroupOwner: historical models from
            ``apps.get_model``.

    Raises:
        ValueError: if both ``group_ids`` and ``groups`` are None.
    """
    if group_ids is None and groups is None:
        raise ValueError("cannot send snapshot values when group_ids and groups are None")

    # Copy rather than alias: `groups or []` returns the caller's own list,
    # and the extend() below would then mutate the caller's argument.
    group_list = list(groups) if groups else []
    if group_ids:
        group_list.extend(_bulk_retrieve_group_values(group_ids, Group))

    snapshots = _bulk_retrieve_snapshot_values(group_list, GroupAssignee, GroupOwner)

    for snapshot in snapshots:
        produce_snapshot_to_kafka(snapshot)
+
+
def backfill_group_attributes_to_snuba(apps, schema_editor):
    """Re-send an attributes snapshot for every Group to Kafka/snuba.

    Progress is checkpointed in redis after each chunk so a restarted run
    resumes from the last fully processed group id instead of starting over.
    """
    Group = apps.get_model("sentry", "Group")
    GroupAssignee = apps.get_model("sentry", "GroupAssignee")
    GroupOwner = apps.get_model("sentry", "GroupOwner")

    progress_key = "backfill_group_attributes_to_snuba_progress"
    client = redis.redis_clusters.get("default")
    last_processed_id = int(client.get(progress_key) or 0)

    id_stream = RangeQuerySetWrapperWithProgressBar(
        Group.objects.filter(id__gt=last_processed_id).values_list("id", flat=True),
        step=CHUNK_SIZE,
        result_value_getter=lambda item: item,
    )

    for batch in chunked(id_stream, CHUNK_SIZE):
        bulk_send_snapshot_values(batch, None, Group, GroupAssignee, GroupOwner)
        # Checkpoint the highest id handled so far; the key expires after a
        # week so it doesn't linger once the backfill is finished.
        client.set(progress_key, batch[-1], ex=60 * 60 * 24 * 7)
+
+
class Migration(CheckedMigration):
    """Data migration: re-send every Group's attributes snapshot to snuba."""

    # This flag is used to mark that a migration shouldn't be automatically run in production. For
    # the most part, this should only be used for operations where it's safe to run the migration
    # after your code has deployed. So this should not be used for most operations that alter the
    # schema of a table.
    # Here are some things that make sense to mark as dangerous:
    # - Large data migrations. Typically we want these to be run manually by ops so that they can
    #   be monitored and not block the deploy for a long period of time while they run.
    # - Adding indexes to large tables. Since this can take a long time, we'd generally prefer to
    #   have ops run this and not block the deploy. Note that while adding an index is a schema
    #   change, it's completely safe to run the operation after the code has deployed.
    is_dangerous = True

    dependencies = [
        ("sentry", "0640_index_together"),
    ]

    operations = [
        migrations.RunPython(
            backfill_group_attributes_to_snuba,
            reverse_code=migrations.RunPython.noop,
            # Tables this data migration touches; the visible code only reads
            # from postgres (output goes to Kafka and redis).
            hints={"tables": ["sentry_groupedmessage"]},
        )
    ]

+ 1 - 0
tests/sentry/migrations/test_0633_add_priority_locked_at_to_groupedmessage.py

@@ -4,6 +4,7 @@ from sentry.testutils.cases import TestMigrations
 from sentry.testutils.silo import no_silo_test
 
 
+@pytest.mark.skip("Migration is no longer runnable. Retain until migration is removed.")
 @no_silo_test
 class AddPriorityColumnTests(TestMigrations):
     migrate_from = "0632_apitoken_backfill_last_chars"

+ 2 - 0
tests/sentry/migrations/test_0634_backfill_github_webhook_outbox_shard_ids.py

@@ -1,3 +1,4 @@
+import pytest
 from django.test import RequestFactory
 
 from sentry.models.outbox import ControlOutbox, WebhookProviderIdentifier
@@ -5,6 +6,7 @@ from sentry.testutils.cases import TestMigrations
 from sentry.testutils.silo import no_silo_test
 
 
+@pytest.mark.skip("Migration is no longer runnable. Retain until migration is removed.")
 @no_silo_test
 class TestBackfillGithubWebhookOutboxShardIds(TestMigrations):
     migrate_from = "0633_add_priority_locked_at_to_groupedmessage"

+ 58 - 0
tests/sentry/migrations/test_0641_backfill_group_attributes.py

@@ -0,0 +1,58 @@
+from sentry_sdk import Hub
+from snuba_sdk.legacy import json_to_snql
+
+from sentry.testutils.cases import SnubaTestCase, TestMigrations
+from sentry.utils import json, redis
+from sentry.utils.snuba import _snql_query
+
+
def run_test(expected_groups):
    """Assert that exactly ``expected_groups`` exist in snuba's
    group_attributes dataset for the groups' project.

    All groups are assumed to belong to the same project (the first group's
    project is used for the query).
    """
    project = expected_groups[0].project
    json_body = {
        "selected_columns": [
            "group_id",
        ],
        "offset": 0,
        "limit": 100,
        "project": [project.id],
        "dataset": "group_attributes",
        "order_by": ["group_id"],
        "consistent": True,
        "tenant_ids": {
            "referrer": "group_attributes",
            "organization_id": project.organization_id,
        },
    }
    request = json_to_snql(json_body, "group_attributes")
    request.validate()

    # PEP 8 (E731): use a def, not a lambda bound to a name.
    def identity(x):
        return x

    resp = _snql_query(((request, identity, identity), Hub(Hub.current), {}, "test_api"))[0]
    assert resp.status == 200
    data = json.loads(resp.data)["data"]
    assert {g.id for g in expected_groups} == {d["group_id"] for d in data}
+
+
class TestBackfillGroupAttributes(SnubaTestCase, TestMigrations):
    """Running the migration backfills all pre-existing groups into snuba."""

    migrate_from = "0640_index_together"
    migrate_to = "0641_backfill_group_attributes"

    def setup_initial_state(self):
        # Two groups created before the migration runs; both should appear
        # in the group_attributes dataset afterwards.
        self.group = self.create_group()
        self.group_2 = self.create_group()

    def test(self):
        run_test([self.group, self.group_2])
+
+
class TestBackfillGroupAttributesRetry(SnubaTestCase, TestMigrations):
    """A restarted backfill resumes from the redis checkpoint and skips
    groups at or below the recorded progress id."""

    migrate_from = "0640_index_together"
    migrate_to = "0641_backfill_group_attributes"

    def setup_initial_state(self):
        self.group = self.create_group()
        self.group_2 = self.create_group()
        # Simulate a prior partial run that got as far as self.group: the
        # migration's id__gt filter should then only pick up group_2.
        redis_client = redis.redis_clusters.get("default")
        redis_client.set("backfill_group_attributes_to_snuba_progress", self.group.id)

    def test_restart(self):
        run_test([self.group_2])