Browse Source

perf(seer): add multithreading for making seer requests (#73611)

We have multiple seer instances, so let's parallelize the requests we
make to seer.
Josh Ferge 8 months ago
parent
commit
c2ccd17ae0

+ 10 - 0
src/sentry/options/defaults.py

@@ -2622,3 +2622,13 @@ register(
     default=20,
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
+# Number of (group, stacktrace) records sent to seer per bulk request
+# on the multithreaded backfill path.
+register(
+    "similarity.backfill_seer_chunk_size",
+    default=30,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)
+# Worker-thread count for parallel seer backfill requests; any value > 1
+# routes the backfill through send_group_and_stacktrace_to_seer_multithreaded.
+register(
+    "similarity.backfill_seer_threads",
+    default=1,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)

+ 12 - 5
src/sentry/tasks/embeddings_grouping/backfill_seer_grouping_records_for_project.py

@@ -21,6 +21,7 @@ from sentry.tasks.embeddings_grouping.utils import (
     make_backfill_grouping_index_redis_key,
     make_backfill_project_index_redis_key,
     send_group_and_stacktrace_to_seer,
+    send_group_and_stacktrace_to_seer_multithreaded,
     update_groups,
 )
 
@@ -170,11 +171,17 @@ def backfill_seer_grouping_records_for_project(
         if group_id in group_hashes_dict
     ]
 
-    seer_response = send_group_and_stacktrace_to_seer(
-        project,
-        groups_to_backfill_with_no_embedding_has_snuba_row_and_nodestore_row,
-        nodestore_results,
-    )
+    if options.get("similarity.backfill_seer_threads") > 1:
+        seer_response = send_group_and_stacktrace_to_seer_multithreaded(
+            groups_to_backfill_with_no_embedding_has_snuba_row_and_nodestore_row,
+            nodestore_results,
+        )
+    else:
+        seer_response = send_group_and_stacktrace_to_seer(
+            groups_to_backfill_with_no_embedding_has_snuba_row_and_nodestore_row,
+            nodestore_results,
+        )
+
     if not seer_response.get("success"):
         logger.info(
             "backfill_seer_grouping_records.seer_failed",

+ 60 - 1
src/sentry/tasks/embeddings_grouping/utils.py

@@ -317,7 +317,7 @@ def get_events_from_nodestore(
 @sentry_sdk.tracing.trace
 @metrics.wraps(f"{BACKFILL_NAME}.send_group_and_stacktrace_to_seer", sample_rate=1.0)
 def send_group_and_stacktrace_to_seer(
-    project, groups_to_backfill_with_no_embedding_has_snuba_row_and_nodestore_row, nodestore_results
+    groups_to_backfill_with_no_embedding_has_snuba_row_and_nodestore_row, nodestore_results
 ):
     seer_response = post_bulk_grouping_records(
         CreateGroupingRecordsRequest(
@@ -329,6 +329,65 @@ def send_group_and_stacktrace_to_seer(
     return seer_response
 
 
@sentry_sdk.tracing.trace
# NOTE: shares the single-threaded path's metric name so both variants roll up
# into one timer — presumably intentional; confirm before renaming.
@metrics.wraps(f"{BACKFILL_NAME}.send_group_and_stacktrace_to_seer", sample_rate=1.0)
def send_group_and_stacktrace_to_seer_multithreaded(
    groups_to_backfill_with_no_embedding_has_snuba_row_and_nodestore_row, nodestore_results
):
    """Send grouping records to seer in parallel chunks and aggregate the results.

    The group id list, ``nodestore_results["data"]`` and
    ``nodestore_results["stacktrace_list"]`` are assumed to be parallel lists
    (same length, same order — TODO confirm against get_events_from_nodestore).
    They are sliced with identical chunk boundaries so each request stays
    aligned, and the chunks are posted concurrently via a thread pool sized by
    the ``similarity.backfill_seer_threads`` option.

    Returns a dict with:
        success (bool): False if any chunk's response is not successful.
        groups_with_neighbor (dict): merged neighbor mappings from all
            successful chunk responses (empty on failure).
    """

    def _post_chunk(group_ids, data, stacktraces):
        # One bulk request to seer for a single aligned chunk.
        return post_bulk_grouping_records(
            CreateGroupingRecordsRequest(
                group_id_list=group_ids,
                data=data,
                stacktrace_list=stacktraces,
            )
        )

    chunk_size = options.get("similarity.backfill_seer_chunk_size")
    group_ids = groups_to_backfill_with_no_embedding_has_snuba_row_and_nodestore_row
    data_list = nodestore_results["data"]
    stacktrace_list = nodestore_results["stacktrace_list"]

    seer_responses = []
    with ThreadPoolExecutor(
        max_workers=options.get("similarity.backfill_seer_threads")
    ) as executor:
        # Slice all three parallel lists with the same boundaries instead of
        # building separately-indexed chunk lists, so the group ids, event
        # data, and stacktraces for a request cannot drift out of alignment.
        futures = [
            executor.submit(
                _post_chunk,
                group_ids[start : start + chunk_size],
                data_list[start : start + chunk_size],
                stacktrace_list[start : start + chunk_size],
            )
            for start in range(0, len(group_ids), chunk_size)
        ]
        for future in as_completed(futures):
            seer_responses.append(future.result())

    aggregated_response: dict[str, Any] = {
        "success": True,
        "groups_with_neighbor": {},
    }
    for seer_response in seer_responses:
        # Use .get for parity with the caller's success check: a response
        # missing the "success" key counts as a failure instead of raising
        # KeyError mid-aggregation.
        if not seer_response.get("success"):
            aggregated_response["success"] = False
            return aggregated_response

        aggregated_response["groups_with_neighbor"].update(seer_response["groups_with_neighbor"])

    return aggregated_response
+
+
 @sentry_sdk.tracing.trace
 def update_groups(project, seer_response, group_id_batch_filtered, group_hashes_dict):
     groups_with_neighbor = seer_response["groups_with_neighbor"]

+ 3 - 0
tests/sentry/tasks/test_backfill_seer_grouping_records.py

@@ -475,6 +475,9 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
     @with_feature("projects:similarity-embeddings-backfill")
     @patch("sentry.tasks.embeddings_grouping.backfill_seer_grouping_records_for_project.logger")
     @patch("sentry.tasks.embeddings_grouping.utils.post_bulk_grouping_records")
+    @override_options(
+        {"similarity.backfill_seer_threads": 2, "similarity.backfill_seer_chunk_size": 10}
+    )
     def test_backfill_seer_grouping_records_success_cohorts_simple(
         self, mock_post_bulk_grouping_records, mock_logger
     ):