
ref(similarity-embedding): Make backfill script smarter (#70992)

Sort groups to backfill by times_seen
Remove groups with times_seen = 1, and add metadata for these groups
Add different metadata for groups where a nearest neighbor is found
Jodi Jang committed fb16c798fb, 9 months ago
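
In brief, the task now pages through groups by list index rather than by group id: groups with times_seen > 1 are ordered by times_seen and sliced into BATCH_SIZE chunks, with the last processed index kept in Redis. A minimal sketch of that slicing, using names from the task diff below (illustrative only, not the committed code):

    # Sketch of the index-based batching introduced below; `group_id_message_data`
    # stands in for the queryset of (id, message, data) tuples ordered by times_seen,
    # and batch_size mirrors the BATCH_SIZE constant used by the task.
    def next_batch(group_id_message_data, last_processed_index, batch_size=20):
        # The resume point is an index into the ordered list, not a group id.
        batch_end_index = min(last_processed_index + batch_size, len(group_id_message_data))
        return group_id_message_data[last_processed_index:batch_end_index], batch_end_index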

+ 4 - 4
src/sentry/api/endpoints/project_backfill_similar_issues_embeddings_records.py

@@ -29,13 +29,13 @@ class ProjectBackfillSimilarIssuesEmbeddingsRecords(ProjectEndpoint):
 
         # needs to either be a superuser or be in single org mode
 
-        last_processed_id = None
+        last_processed_index = None
         dry_run = False
-        if request.data.get("last_processed_id"):
-            last_processed_id = int(request.data["last_processed_id"])
+        if request.data.get("last_processed_index"):
+            last_processed_index = int(request.data["last_processed_index"])
 
         if request.data.get("dry_run"):
             dry_run = True
 
-        backfill_seer_grouping_records.delay(project.id, last_processed_id, dry_run)
+        backfill_seer_grouping_records.delay(project.id, last_processed_index, dry_run)
         return Response(status=204)
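
The endpoint now reads last_processed_index from the request body; the tests further down exercise it like this (a sketch using Django's test client, with `client` and `url` standing in for the test client and whatever route resolves to this endpoint):

    # Sketch of a call with the renamed parameter, mirroring the endpoint tests below.
    response = client.post(url, data={"last_processed_index": "8", "dry_run": "true"})
    assert response.status_code == 204
    # The view then enqueues backfill_seer_grouping_records.delay(project.id, 8, True).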

+ 2 - 0
src/sentry/seer/utils.py

@@ -210,6 +210,7 @@ class SeerSimilarIssuesMetadata:
 
 
 class CreateGroupingRecordData(TypedDict):
+    group_id: int
     hash: str
     project_id: int
     message: str
@@ -223,6 +224,7 @@ class CreateGroupingRecordsRequest(TypedDict):
 
 class BulkCreateGroupingRecordsResponse(TypedDict):
     success: bool
+    groups_with_neighbor: NotRequired[dict[str, RawSeerSimilarIssueData]]
 
 
 # TODO: Handle non-200 responses
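
The two additions above surface in the seer tests below: each record now carries its group_id, and a successful bulk-create response may include neighbor data keyed by the group id as a string. A sketch of the resulting shapes, based on the test fixtures in this commit:

    from sentry.seer.utils import (
        BulkCreateGroupingRecordsResponse,
        CreateGroupingRecordData,
        RawSeerSimilarIssueData,
    )

    # A grouping record now includes the group id it was created for.
    record = CreateGroupingRecordData(
        group_id=1, hash="hash-1", project_id=1, message="message"
    )

    # A successful response may map child group ids (as strings) to their neighbor data.
    neighbor = RawSeerSimilarIssueData(
        stacktrace_distance=0.01,
        message_distance=0.01,
        should_group=True,
        parent_hash="<parent group hash>",
    )
    response = BulkCreateGroupingRecordsResponse(
        success=True, groups_with_neighbor={"1": neighbor}
    )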

+ 109 - 44
src/sentry/tasks/backfill_seer_grouping_records.py

@@ -1,5 +1,6 @@
 import logging
 import time
+from dataclasses import asdict
 from datetime import datetime, timedelta
 from typing import Any, TypedDict
 
@@ -11,6 +12,7 @@ from snuba_sdk.orderby import Direction, OrderBy
 
 from sentry import features, nodestore
 from sentry.api.endpoints.group_similar_issues_embeddings import get_stacktrace_string
+from sentry.conf.server import SEER_SIMILARITY_MODEL_VERSION
 from sentry.eventstore.models import Event
 from sentry.grouping.grouping_info import get_grouping_info
 from sentry.issues.grouptype import ErrorGroupType
@@ -21,6 +23,9 @@ from sentry.models.project import Project
 from sentry.seer.utils import (
     CreateGroupingRecordData,
     CreateGroupingRecordsRequest,
+    IncompleteSeerDataError,
+    SeerSimilarIssueData,
+    SimilarGroupNotFoundError,
     delete_grouping_records,
     post_bulk_grouping_records,
 )
@@ -58,17 +63,22 @@ class GroupStacktraceData(TypedDict):
     time_limit=60 * 15 + 5,
 )
 def backfill_seer_grouping_records(
-    project_id: int, last_processed_id: int | None, dry_run: bool = False, *args: Any, **kwargs: Any
+    project_id: int,
+    last_processed_index: int | None,
+    dry_run: bool = False,
+    *args: Any,
+    **kwargs: Any,
 ) -> None:
     """
     Task to backfill seer grouping_records table.
-    Pass in last_processed_id = 0 if running project for the first time, else None
+    Pass in last_processed_index = None when calling for the first time. This function will spawn
+    child tasks that pass along the last_processed_index.
     """
     logger.info(
         "backfill_seer_grouping_records.start",
         extra={
             "project_id": project_id,
-            "last_processed_id": last_processed_id,
+            "last_processed_index": last_processed_index,
             "dry_run": dry_run,
         },
     )
@@ -81,10 +91,11 @@ def backfill_seer_grouping_records(
         return
 
     redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
-    if last_processed_id is None:
-        last_processed_id = int(redis_client.get(make_backfill_redis_key(project_id)) or 0)
 
-    if last_processed_id == 0 and dry_run:
+    if last_processed_index is None:
+        last_processed_index = int(redis_client.get(make_backfill_redis_key(project_id)) or 0)
+
+    if last_processed_index == 0 and dry_run:
         logger.info(
             "backfill_seer_grouping_records.delete_all_seer_records",
             extra={"project_id": project.id},
@@ -92,19 +103,36 @@ def backfill_seer_grouping_records(
         delete_grouping_records(project_id)
         redis_client.delete(make_backfill_redis_key(project_id))
 
-    group_id_message_data_batch = (
-        Group.objects.filter(
-            project_id=project.id, id__gt=last_processed_id, type=ErrorGroupType.type_id
+    if last_processed_index == 0:
+        # Set the metadata of groups where times_seen = 1
+        # Do not set the version number, so we can consider it for future backfills later
+        groups_seen_once = Group.objects.filter(
+            project_id=project_id, type=ErrorGroupType.type_id, times_seen=1
         )
+        for group in groups_seen_once:
+            seer_similarity_seen_once = {"times_seen_once": True}
+            if group.data.get("metadata"):
+                group.data["metadata"]["seer_similarity"] = seer_similarity_seen_once
+            else:
+                group.data["metadata"] = {"seer_similarity": seer_similarity_seen_once}
+
+        if not dry_run:
+            Group.objects.bulk_update(groups_seen_once, ["data"])
+
+    group_id_message_data = (
+        Group.objects.filter(project_id=project.id, type=ErrorGroupType.type_id, times_seen__gt=1)
         .values_list("id", "message", "data")
-        .order_by("id")[:BATCH_SIZE]
+        .order_by("times_seen")
     )
+    batch_end_index = min(last_processed_index + BATCH_SIZE, len(group_id_message_data))
+    group_id_message_data_batch = group_id_message_data[last_processed_index:batch_end_index]
+
     logger.info(
         "backfill_seer_grouping_records.batch",
         extra={
             "project_id": project.id,
             "batch_len": len(group_id_message_data_batch),
-            "last_processed_id": last_processed_id,
+            "last_processed_index": last_processed_index,
         },
     )
 
@@ -118,7 +146,7 @@ def backfill_seer_grouping_records(
     group_id_message_batch_filtered = {
         group_id: message
         for (group_id, message, data) in group_id_message_data_batch
-        if get_path(data, "metadata", "embeddings_info", "nn_model_version") is None
+        if get_path(data, "metadata", "seer_similarity", "similarity_model_version") is None
     }
     if len(group_id_message_data_batch) != len(group_id_message_batch_filtered):
         logger.info(
@@ -174,6 +202,10 @@ def backfill_seer_grouping_records(
             project, rows, group_id_message_batch_filtered, group_hashes_dict
         )
 
+        # If nodestore is down, we should stop
+        if data["data"] == [] and data["stacktrace_list"] == []:
+            return
+
         with metrics.timer(f"{BACKFILL_NAME}.post_bulk_grouping_records", sample_rate=1.0):
             response = post_bulk_grouping_records(
                 CreateGroupingRecordsRequest(
@@ -182,21 +214,44 @@ def backfill_seer_grouping_records(
                     stacktrace_list=data["stacktrace_list"],
                 )
             )
-        if response["success"]:
+
+        if response.get("success"):
+            groups_with_neighbor = response["groups_with_neighbor"]
             groups = Group.objects.filter(project_id=project.id, id__in=group_id_batch)
             for group in groups:
-                if group.data.get("metadata"):
-                    group.data["metadata"]["embeddings_info"] = {
-                        "nn_model_version": 0,
-                        "group_hash": json.dumps([group_hashes_dict[group.id]]),
-                    }
-                else:
-                    group.data["metadata"] = {
-                        "embeddings_info": {
-                            "nn_model_version": 0,
-                            "group_hash": json.dumps([group_hashes_dict[group.id]]),
-                        }
-                    }
+                seer_similarity = {
+                    "similarity_model_version": SEER_SIMILARITY_MODEL_VERSION,
+                    "request_hash": group_hashes_dict[group.id],
+                }
+                if str(group.id) in groups_with_neighbor:
+                    # TODO: remove this try catch once the helper is made
+                    try:
+                        seer_similarity["results"] = [
+                            asdict(
+                                SeerSimilarIssueData.from_raw(
+                                    project_id, groups_with_neighbor[str(group.id)]
+                                )
+                            )
+                        ]
+                    # TODO: if we reach this exception, we need to delete the record from seer, or this will keep happening.
+                    # We should not update the similarity data for this group, because we want to try again once the record is deleted.
+                    except (IncompleteSeerDataError, SimilarGroupNotFoundError):
+                        logger.exception(
+                            "tasks.backfill_seer_grouping_records.invalid_parent_group",
+                            extra={
+                                "project_id": project_id,
+                                "group_id": group.id,
+                                "parent_hash": groups_with_neighbor[str(group.id)]["parent_hash"],
+                            },
+                        )
+                        seer_similarity = {}
+
+                if seer_similarity:
+                    if group.data.get("metadata"):
+                        group.data["metadata"]["seer_similarity"] = seer_similarity
+                    else:
+                        group.data["metadata"] = {"seer_similarity": seer_similarity}
+
             if not dry_run:
                 num_updated = Group.objects.bulk_update(groups, ["data"])
                 logger.info(
@@ -204,24 +259,31 @@ def backfill_seer_grouping_records(
                     extra={"project_id": project.id, "num_updated": num_updated},
                 )
 
-        last_processed_id = group_id_message_data_batch[len(group_id_message_data_batch) - 1][0]
-        redis_client.set(
-            f"{make_backfill_redis_key(project_id)}",
-            last_processed_id if last_processed_id is not None else 0,
-            ex=60 * 60 * 24 * 7,
-        )
+            last_processed_index = batch_end_index
+            redis_client.set(
+                f"{make_backfill_redis_key(project_id)}",
+                last_processed_index if last_processed_index is not None else 0,
+                ex=60 * 60 * 24 * 7,
+            )
 
-        logger.info(
-            "calling next backfill task",
-            extra={
-                "project_id": project.id,
-                "last_processed_id": last_processed_id,
-                "dry_run": dry_run,
-            },
-        )
-        backfill_seer_grouping_records.apply_async(
-            args=[project.id, last_processed_id, dry_run],
-        )
+            if last_processed_index <= len(group_id_message_data):
+                logger.info(
+                    "calling next backfill task",
+                    extra={
+                        "project_id": project.id,
+                        "last_processed_index": last_processed_index,
+                        "dry_run": dry_run,
+                    },
+                )
+                backfill_seer_grouping_records.apply_async(
+                    args=[project.id, last_processed_index, dry_run],
+                )
+        else:
+            # If seer is down, we should stop
+            logger.info(
+                "backfill_seer_bulk_insert_returned_invald_result",
+                extra={"project_id": project.id},
+            )
     else:
         logger.info(
             "backfill_seer_snuba_returned_empty_result",
@@ -327,6 +389,7 @@ def lookup_group_data_stacktrace_bulk(
                         continue
                     group_data.append(
                         CreateGroupingRecordData(
+                            group_id=group_id,
                             project_id=project_id,
                             message=messages[group_id],
                             hash=hashes[group_id],
@@ -383,7 +446,9 @@ def lookup_group_data_stacktrace_single(
             grouping_info = get_grouping_info(None, project=project, event=event)
         stacktrace_string = get_stacktrace_string(grouping_info)
         group_data = (
-            CreateGroupingRecordData(hash=hash, project_id=project_id, message=message)
+            CreateGroupingRecordData(
+                group_id=group_id, hash=hash, project_id=project_id, message=message
+            )
             if stacktrace_string != ""
             else None
         )
@@ -403,5 +468,5 @@ def lookup_event(project_id: int, event_id: str, group_id: int) -> Event:
 
 
 def make_backfill_redis_key(project_id):
-    redis_key = "grouping_record_backfill.last_processed_id"
+    redis_key = "grouping_record_backfill.last_processed_index"
     return f"{redis_key}-{project_id}"

+ 5 - 5
tests/sentry/api/endpoints/test_project_backfill_similar_issues_embeddings_records.py

@@ -41,7 +41,7 @@ class ProjectBackfillSimilarIssuesEmbeddingsRecordsTest(APITestCase):
         "sentry.api.endpoints.project_backfill_similar_issues_embeddings_records.backfill_seer_grouping_records.delay"
     )
     @with_feature("projects:similarity-embeddings-backfill")
-    def test_post_success_no_last_processed_id(
+    def test_post_success_no_last_processed_index(
         self, mock_backfill_seer_grouping_records, mock_is_active_superuser
     ):
         response = self.client.post(self.url, data={})
@@ -53,7 +53,7 @@ class ProjectBackfillSimilarIssuesEmbeddingsRecordsTest(APITestCase):
     )
     @with_feature("projects:similarity-embeddings-backfill")
     @override_settings(SENTRY_SINGLE_ORGANIZATION=True)
-    def test_post_success_no_last_processed_id_single_org(
+    def test_post_success_no_last_processed_index_single_org(
         self, mock_backfill_seer_grouping_records
     ):
         response = self.client.post(self.url, data={})
@@ -68,10 +68,10 @@ class ProjectBackfillSimilarIssuesEmbeddingsRecordsTest(APITestCase):
         "sentry.api.endpoints.project_backfill_similar_issues_embeddings_records.backfill_seer_grouping_records.delay"
     )
     @with_feature("projects:similarity-embeddings-backfill")
-    def test_post_success_last_processed_id(
+    def test_post_success_last_processed_index(
         self, mock_backfill_seer_grouping_records, mock_is_active_superuser
     ):
-        response = self.client.post(self.url, data={"last_processed_id": "8"})
+        response = self.client.post(self.url, data={"last_processed_index": "8"})
         assert response.status_code == 204, response.content
         mock_backfill_seer_grouping_records.assert_called_with(self.project.id, 8, False)
 
@@ -86,6 +86,6 @@ class ProjectBackfillSimilarIssuesEmbeddingsRecordsTest(APITestCase):
     def test_post_success_dry_run(
         self, mock_backfill_seer_grouping_records, mock_is_active_superuser
     ):
-        response = self.client.post(self.url, data={"last_processed_id": "8", "dry_run": "true"})
+        response = self.client.post(self.url, data={"last_processed_index": "8", "dry_run": "true"})
         assert response.status_code == 204, response.content
         mock_backfill_seer_grouping_records.assert_called_with(self.project.id, 8, True)

+ 6 - 3
tests/sentry/seer/test_utils.py

@@ -29,8 +29,8 @@ DUMMY_POOL = ConnectionPool("dummy")
 CREATE_GROUPING_RECORDS_REQUEST_PARAMS: CreateGroupingRecordsRequest = {
     "group_id_list": [1, 2],
     "data": [
-        {"hash": "hash-1", "project_id": 1, "message": "message"},
-        {"hash": "hash-2", "project_id": 1, "message": "message 2"},
+        {"group_id": 1, "hash": "hash-1", "project_id": 1, "message": "message"},
+        {"group_id": 2, "hash": "hash-2", "project_id": 1, "message": "message 2"},
     ],
     "stacktrace_list": ["stacktrace 1", "stacktrace 2"],
 }
@@ -381,7 +381,10 @@ def test_from_raw_nonexistent_group(default_project):
 @mock.patch("sentry.seer.utils.logger")
 @mock.patch("sentry.seer.utils.seer_grouping_connection_pool.urlopen")
 def test_post_bulk_grouping_records_success(mock_seer_request, mock_logger):
-    expected_return_value = {"success": True}
+    expected_return_value = {
+        "success": True,
+        "groups_with_neighbor": {"1": "00000000000000000000000000000000"},
+    }
     mock_seer_request.return_value = HTTPResponse(
         json.dumps(expected_return_value).encode("utf-8"), status=200
     )

+ 283 - 39
tests/sentry/tasks/test_backfill_seer_grouping_records.py

@@ -10,11 +10,12 @@ from django.conf import settings
 from google.api_core.exceptions import DeadlineExceeded, ServiceUnavailable
 
 from sentry.api.endpoints.group_similar_issues_embeddings import get_stacktrace_string
+from sentry.conf.server import SEER_SIMILARITY_MODEL_VERSION
 from sentry.grouping.grouping_info import get_grouping_info
 from sentry.issues.occurrence_consumer import EventLookupError
 from sentry.models.group import Group
 from sentry.models.grouphash import GroupHash
-from sentry.seer.utils import CreateGroupingRecordData
+from sentry.seer.utils import CreateGroupingRecordData, RawSeerSimilarIssueData
 from sentry.tasks.backfill_seer_grouping_records import (
     GroupStacktraceData,
     backfill_seer_grouping_records,
@@ -80,7 +81,10 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         }
 
     def create_group_event_rows(self, num: int) -> Mapping[str, Any]:
-        """Create num events and their corresponding group rows"""
+        """
+        Create num events and their corresponding group rows, and set times_seen to 5 for each
+        group.
+        """
         rows, events, messages = [], [], {}
         function_names = [f"function_{str(i)}" for i in range(num)]
         type_names = [f"Error{str(i)}" for i in range(num)]
@@ -93,6 +97,8 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
             }
             event = self.store_event(data=data, project_id=self.project.id, assert_no_errors=False)
             events.append(event)
+            event.group.times_seen = 5
+            event.group.save()
             messages.update({event.group.id: event.group.message})
             rows.append(
                 {
@@ -119,6 +125,8 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         self.event = self.store_event(
             data={"exception": EXCEPTION}, project_id=self.project.id, assert_no_errors=False
         )
+        self.event.group.times_seen = 5
+        self.event.group.save()
         group_hashes = GroupHash.objects.all().distinct("group_id")
         self.group_hashes = {group_hash.group_id: group_hash.hash for group_hash in group_hashes}
 
@@ -146,7 +154,10 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
             self.project, event.event_id, event.group_id, event.group.message, hash
         )
         expected_group_data = CreateGroupingRecordData(
-            hash=hash, project_id=self.project.id, message=event.group.message
+            group_id=event.group.id,
+            hash=hash,
+            project_id=self.project.id,
+            message=event.group.message,
         )
         assert group_data == expected_group_data
         assert stacktrace_string == EXCEPTION_STACKTRACE_STRING
@@ -218,6 +229,7 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         expected_event_ids = {event.event_id for event in events}
         expected_group_data = [
             CreateGroupingRecordData(
+                group_id=event.group.id,
                 hash=self.group_hashes[event.group.id],
                 project_id=self.project.id,
                 message=event.group.message,
@@ -236,9 +248,12 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
             "backfill_grouping_records._lookup_event_bulk.hit_ratio", 100, sample_rate=1.0
         )
 
+    @patch("time.sleep", return_value=None)
     @patch("sentry.nodestore.backend.get_multi")
     @patch("sentry.tasks.backfill_seer_grouping_records.logger")
-    def test_lookup_group_data_stacktrace_bulk_exceptions(self, mock_logger, mock_get_multi):
+    def test_lookup_group_data_stacktrace_bulk_exceptions(
+        self, mock_logger, mock_get_multi, mock_sleep
+    ):
         """
         Test cases where ServiceUnavailable or DeadlineExceeded exceptions occur in bulk data
         lookup
@@ -298,7 +313,10 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         ) = lookup_group_data_stacktrace_bulk(self.project, rows, messages, hashes)
         expected_group_data = [
             CreateGroupingRecordData(
-                hash=hashes[event.group.id], project_id=self.project.id, message=event.group.message
+                group_id=event.group.id,
+                hash=hashes[event.group.id],
+                project_id=self.project.id,
+                message=event.group.message,
             )
             for event in events
         ]
@@ -335,7 +353,10 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         ) = lookup_group_data_stacktrace_bulk(self.project, rows, messages, hashes)
         expected_group_data = [
             CreateGroupingRecordData(
-                hash=hashes[event.group.id], project_id=self.project.id, message=event.group.message
+                group_id=event.group.id,
+                hash=hashes[event.group.id],
+                project_id=self.project.id,
+                message=event.group.message,
             )
             for event in events
         ]
@@ -371,7 +392,10 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         ) = lookup_group_data_stacktrace_bulk(self.project, rows, messages, hashes)
         expected_group_data = [
             CreateGroupingRecordData(
-                hash=hashes[event.group.id], project_id=self.project.id, message=event.group.message
+                group_id=event.group.id,
+                hash=hashes[event.group.id],
+                project_id=self.project.id,
+                message=event.group.message,
             )
             for event in events
         ]
@@ -398,7 +422,10 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
 
         expected_group_data = [
             CreateGroupingRecordData(
-                hash=hashes[event.group.id], project_id=self.project.id, message=event.group.message
+                group_id=event.group.id,
+                hash=hashes[event.group.id],
+                project_id=self.project.id,
+                message=event.group.message,
             )
             for event in events
         ]
@@ -425,6 +452,7 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
             stacktrace_string = get_stacktrace_string(grouping_info)
             group_data.append(
                 CreateGroupingRecordData(
+                    group_id=event.group.id,
                     hash=self.group_hashes[event.group.id],
                     project_id=self.project.id,
                     message=event.group.message,
@@ -445,7 +473,10 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         events = self.bulk_events
         expected_group_data = [
             CreateGroupingRecordData(
-                hash=hashes[event.group.id], project_id=self.project.id, message=event.group.message
+                group_id=event.group.id,
+                hash=hashes[event.group.id],
+                project_id=self.project.id,
+                message=event.group.message,
             )
             for event in events
         ]
@@ -473,6 +504,7 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
             stacktrace_string = get_stacktrace_string(grouping_info)
             group_data.append(
                 CreateGroupingRecordData(
+                    group_id=event.group.id,
                     hash=self.group_hashes[event.group.id],
                     project_id=self.project.id,
                     message=event.group.message,
@@ -497,7 +529,10 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         events = self.bulk_events[:-1]
         expected_group_data = [
             CreateGroupingRecordData(
-                hash=hashes[event.group.id], project_id=self.project.id, message=event.group.message
+                group_id=event.group.id,
+                hash=hashes[event.group.id],
+                project_id=self.project.id,
+                message=event.group.message,
             )
             for event in events
         ]
@@ -539,7 +574,10 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         events = self.bulk_events[:-1]
         expected_group_data = [
             CreateGroupingRecordData(
-                hash=hashes[event.group.id], project_id=self.project.id, message=event.group.message
+                group_id=event.group.id,
+                hash=hashes[event.group.id],
+                project_id=self.project.id,
+                message=event.group.message,
             )
             for event in events
         ]
@@ -562,47 +600,47 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
     @django_db_all
     @with_feature("projects:similarity-embeddings-backfill")
     @patch("sentry.tasks.backfill_seer_grouping_records.post_bulk_grouping_records")
-    def test_backfill_seer_grouping_records_success(self, mock_post_bulk_grouping_records):
+    def test_backfill_seer_grouping_records_success_simple(self, mock_post_bulk_grouping_records):
         """
         Test that the metadata is set for all groups showing that the record has been created.
         """
-        mock_post_bulk_grouping_records.return_value = {"success": True}
+        mock_post_bulk_grouping_records.return_value = {"success": True, "groups_with_neighbor": {}}
 
         with TaskRunner():
-            backfill_seer_grouping_records(self.project.id, 0)
+            backfill_seer_grouping_records(self.project.id, None)
 
-        for group in Group.objects.filter(project_id=self.project.id):
-            assert group.data["metadata"].get("embeddings_info") == {
-                "nn_model_version": 0,
-                "group_hash": json.dumps([self.group_hashes[group.id]]),
+        groups = Group.objects.filter(project_id=self.project.id)
+        for group in groups:
+            assert group.data["metadata"].get("seer_similarity") == {
+                "similarity_model_version": SEER_SIMILARITY_MODEL_VERSION,
+                "request_hash": self.group_hashes[group.id],
             }
         redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
-        last_processed_id = int(redis_client.get(make_backfill_redis_key(self.project.id)) or 0)
-        assert last_processed_id != 0
+        last_processed_index = int(redis_client.get(make_backfill_redis_key(self.project.id)) or 0)
+        assert last_processed_index == len(groups)
 
     @django_db_all
-    @patch(
-        "sentry.tasks.backfill_seer_grouping_records.lookup_group_data_stacktrace_bulk_with_fallback"
-    )
+    @patch("time.sleep", return_value=None)
+    @patch("sentry.nodestore.backend.get_multi")
+    @patch("sentry.tasks.backfill_seer_grouping_records.lookup_event")
     def test_backfill_seer_grouping_records_failure(
-        self, mock_lookup_group_data_stacktrace_bulk_with_fallback
+        self, mock_lookup_event, mock_get_multi, mock_sleep
     ):
         """
         Test that the group metadata and redis last processed id aren't updated on a failure.
         """
-        mock_lookup_group_data_stacktrace_bulk_with_fallback.side_effect = ServiceUnavailable(
-            message="Service Unavailable"
-        )
+        mock_lookup_event.side_effect = ServiceUnavailable(message="Service Unavailable")
+        mock_get_multi.side_effect = ServiceUnavailable(message="Service Unavailable")
 
         with TaskRunner():
-            backfill_seer_grouping_records(self.project.id, 0)
+            backfill_seer_grouping_records(self.project.id, None)
 
         redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
-        last_processed_id = int(redis_client.get(make_backfill_redis_key(self.project.id)) or 0)
-        assert last_processed_id == 0
+        last_processed_index = int(redis_client.get(make_backfill_redis_key(self.project.id)) or 0)
+        assert last_processed_index == 0
 
         for group in Group.objects.filter(project_id=self.project.id):
-            assert not group.data["metadata"].get("embeddings_info")
+            assert not group.data["metadata"].get("seer_similarity")
 
     @django_db_all
     def test_backfill_seer_grouping_records_no_feature(self):
@@ -612,10 +650,10 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         project = self.create_project(organization=self.organization)
 
         with TaskRunner():
-            backfill_seer_grouping_records(project, 0)
+            backfill_seer_grouping_records(project, None)
 
         for group in Group.objects.filter(project_id=self.project.id):
-            assert not group.data["metadata"].get("embeddings_info")
+            assert not group.data["metadata"].get("seer_similarity")
 
     @django_db_all
     @with_feature("projects:similarity-embeddings-backfill")
@@ -627,16 +665,222 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
         """
         Test that the metadata is set for all groups showing that the record has been created.
         """
-        mock_post_bulk_grouping_records.return_value = {"success": True}
+        mock_post_bulk_grouping_records.return_value = {"success": True, "groups_with_neighbor": []}
         mock_delete_grouping_records.return_value = True
         with TaskRunner():
             backfill_seer_grouping_records(self.project.id, 0, dry_run=True)
 
+        groups = Group.objects.filter(project_id=self.project.id)
+        for group in groups:
+            assert not group.data["metadata"].get("seer_similarity") == {
+                "similarity_model_version": SEER_SIMILARITY_MODEL_VERSION,
+                "request_hash": self.group_hashes[group.id],
+            }
+        redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
+        last_processed_index = int(redis_client.get(make_backfill_redis_key(self.project.id)) or 0)
+        assert last_processed_index == len(groups)
+
+    @with_feature("projects:similarity-embeddings-backfill")
+    @patch("sentry.tasks.backfill_seer_grouping_records.post_bulk_grouping_records")
+    def test_backfill_seer_grouping_records_groups_1_times_seen(
+        self, mock_post_bulk_grouping_records
+    ):
+        """
+        Test that different metadata is set for groups where times_seen > 1 and times_seen == 1.
+        """
+        mock_post_bulk_grouping_records.return_value = {"success": True, "groups_with_neighbor": {}}
+
+        function_names = [f"new_function_{str(i)}" for i in range(5)]
+        type_names = [f"NewError{str(i)}" for i in range(5)]
+        value_names = ["error with value" for i in range(5)]
+        groups_seen_once = []
+        for i in range(5):
+            data = {
+                "exception": self.create_exception_values(
+                    function_names[i], type_names[i], value_names[i]
+                )
+            }
+            event = self.store_event(data=data, project_id=self.project.id, assert_no_errors=False)
+            groups_seen_once.append(event.group)
+
+        with TaskRunner():
+            backfill_seer_grouping_records(self.project.id, None)
+
         for group in Group.objects.filter(project_id=self.project.id):
-            assert not group.data["metadata"].get("embeddings_info") == {
-                "nn_model_version": 0,
-                "group_hash": json.dumps([self.group_hashes[group.id]]),
+            if group not in groups_seen_once:
+                assert group.data["metadata"].get("seer_similarity") == {
+                    "similarity_model_version": SEER_SIMILARITY_MODEL_VERSION,
+                    "request_hash": self.group_hashes[group.id],
+                }
+            else:
+                assert group.data["metadata"].get("seer_similarity") == {"times_seen_once": True}
+
+        redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
+        last_processed_index = int(redis_client.get(make_backfill_redis_key(self.project.id)) or 0)
+        assert last_processed_index == len(
+            Group.objects.filter(project_id=self.project.id, times_seen__gt=1)
+        )
+
+    @with_feature("projects:similarity-embeddings-backfill")
+    @patch("sentry.tasks.backfill_seer_grouping_records.post_bulk_grouping_records")
+    def test_backfill_seer_grouping_records_groups_have_neighbor(
+        self, mock_post_bulk_grouping_records
+    ):
+        """
+        Test that groups that have nearest neighbors do not get records created for them in
+        grouping_records.
+        Test that the metadata of groups with nearest neighbors differs from the metadata of
+        groups that had records created.
+        """
+        # Create groups with 1 < times_seen < 5
+        # The groups that will be similar to these groups, have times_seen = 5
+        function_names = [f"another_function_{str(i)}" for i in range(5)]
+        type_names = [f"AnotherError{str(i)}" for i in range(5)]
+        value_names = ["error with value" for i in range(5)]
+        groups_with_neighbor = {}
+        for i in range(5):
+            data = {
+                "exception": self.create_exception_values(
+                    function_names[i], type_names[i], value_names[i]
+                )
+            }
+            event = self.store_event(data=data, project_id=self.project.id, assert_no_errors=False)
+            event.group.times_seen = 2
+            event.group.save()
+            # Arbitrarily choose a parent group's hash that has times_seen = 5
+            parent_group = Group.objects.filter(times_seen__gt=2).first()
+            parent_group_hash = GroupHash.objects.filter(group_id=parent_group.id).first()
+            groups_with_neighbor[str(event.group.id)] = RawSeerSimilarIssueData(
+                stacktrace_distance=0.01,
+                message_distance=0.01,
+                should_group=True,
+                parent_hash=parent_group_hash.hash,
+            )
+
+        mock_post_bulk_grouping_records.return_value = {
+            "success": True,
+            "groups_with_neighbor": groups_with_neighbor,
+        }
+
+        with TaskRunner():
+            backfill_seer_grouping_records(self.project.id, None)
+
+        groups = Group.objects.filter(project_id=self.project.id, times_seen__gt=1)
+        for group in groups:
+            if str(group.id) not in groups_with_neighbor:
+                assert group.data["metadata"].get("seer_similarity") == {
+                    "similarity_model_version": SEER_SIMILARITY_MODEL_VERSION,
+                    "request_hash": self.group_hashes[group.id],
+                }
+            else:
+                request_hash = GroupHash.objects.get(group_id=group.id).hash
+                parent_group_id = Group.objects.filter(times_seen__gt=2).first().id
+                assert group.data["metadata"].get("seer_similarity") == {
+                    "similarity_model_version": SEER_SIMILARITY_MODEL_VERSION,
+                    "request_hash": request_hash,
+                    "results": [
+                        {
+                            "stacktrace_distance": 0.01,
+                            "message_distance": 0.01,
+                            "should_group": True,
+                            "parent_hash": groups_with_neighbor[str(group.id)]["parent_hash"],
+                            "parent_group_id": parent_group_id,
+                        }
+                    ],
+                }
+
+        redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
+        last_processed_index = int(redis_client.get(make_backfill_redis_key(self.project.id)) or 0)
+        assert last_processed_index == len(groups)
+
+    @with_feature("projects:similarity-embeddings-backfill")
+    @patch("sentry.tasks.backfill_seer_grouping_records.logger")
+    @patch("sentry.tasks.backfill_seer_grouping_records.post_bulk_grouping_records")
+    def test_backfill_seer_grouping_records_groups_has_invalid_neighbor(
+        self, mock_post_bulk_grouping_records, mock_logger
+    ):
+        """
+        Test that groups whose nearest neighbor does not exist do not have their metadata
+        updated.
+        """
+        # Create group with 1 < times_seen < 5
+        group_with_neighbor = {}
+        data = {
+            "exception": self.create_exception_values(
+                "another_function!", "AnotherError!", "error with value"
+            )
+        }
+        event = self.store_event(data=data, project_id=self.project.id, assert_no_errors=False)
+        event.group.times_seen = 2
+        event.group.save()
+        # Make the similar group a hash that does not exist
+        group_with_neighbor[str(event.group.id)] = RawSeerSimilarIssueData(
+            stacktrace_distance=0.01,
+            message_distance=0.01,
+            should_group=True,
+            parent_hash="00000000000000000000000000000000",
+        )
+
+        mock_post_bulk_grouping_records.return_value = {
+            "success": True,
+            "groups_with_neighbor": group_with_neighbor,
+        }
+
+        with TaskRunner():
+            backfill_seer_grouping_records(self.project.id, None)
+
+        groups = Group.objects.filter(project_id=self.project.id, times_seen__gt=1)
+        for group in groups:
+            if str(group.id) not in group_with_neighbor:
+                assert group.data["metadata"].get("seer_similarity") == {
+                    "similarity_model_version": SEER_SIMILARITY_MODEL_VERSION,
+                    "request_hash": self.group_hashes[group.id],
+                }
+            else:
+                assert group.data["metadata"].get("seer_similarity") is None
+                mock_logger.exception.assert_called_with(
+                    "tasks.backfill_seer_grouping_records.invalid_parent_group",
+                    extra={
+                        "project_id": self.project.id,
+                        "group_id": group.id,
+                        "parent_hash": "00000000000000000000000000000000",
+                    },
+                )
+
+        redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
+        last_processed_index = int(redis_client.get(make_backfill_redis_key(self.project.id)) or 0)
+        assert last_processed_index == len(groups)
+
+    @django_db_all
+    @with_feature("projects:similarity-embeddings-backfill")
+    @patch("sentry.tasks.backfill_seer_grouping_records.post_bulk_grouping_records")
+    def test_backfill_seer_grouping_records_multiple_batches(self, mock_post_bulk_grouping_records):
+        """
+        Test that the metadata is set for all 21 groups, showing that records have been created,
+        where 21 is greater than the batch size of 20.
+        """
+        function_names = [f"another_function_{str(i)}" for i in range(10)]
+        type_names = [f"AnotherError{str(i)}" for i in range(10)]
+        value_names = ["error with value" for _ in range(10)]
+        for i in range(10):
+            data = {
+                "exception": self.create_exception_values(
+                    function_names[i], type_names[i], value_names[i]
+                )
             }
+            event = self.store_event(data=data, project_id=self.project.id, assert_no_errors=False)
+            event.group.times_seen = 2
+            event.group.save()
+
+        mock_post_bulk_grouping_records.return_value = {"success": True, "groups_with_neighbor": {}}
+
+        with TaskRunner():
+            backfill_seer_grouping_records(self.project.id, None)
+
+        groups = Group.objects.filter(project_id=self.project.id)
+        for group in groups:
+            assert group.data["metadata"].get("seer_similarity") is not None
+
         redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
-        last_processed_id = int(redis_client.get(make_backfill_redis_key(self.project.id)) or 0)
-        assert last_processed_id != 0
+        last_processed_index = int(redis_client.get(make_backfill_redis_key(self.project.id)) or 0)
+        assert last_processed_index == len(groups)