
perf(similarity): use thread pool to do nodestore calls concurrently (#73394)

Use a thread pool for the Bigtable reads: chunk the node keys into batches of 5 and fetch them concurrently with up to 5 worker threads.
Josh Ferge, 6 days ago (commit ba7fcabdee)
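
The change is a plain chunk-and-fan-out over concurrent.futures. A minimal, generic sketch of the pattern, with hypothetical names rather than the exact code from the diff below:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def fetch_in_chunks(keys, fetch_chunk, chunk_size=5, max_workers=5):
        """Split keys into chunks and fetch each chunk on a worker thread.

        Per-chunk dicts are merged into one result. An exception raised in a
        worker is re-raised by future.result(), so one failed chunk aborts
        the whole lookup.
        """
        chunks = [keys[i : i + chunk_size] for i in range(0, len(keys), chunk_size)]
        merged = {}
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(fetch_chunk, chunk) for chunk in chunks]
            for future in as_completed(futures):
                merged.update(future.result())
        return merged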

+ 6 - 0
src/sentry/options/defaults.py

@@ -2603,3 +2603,9 @@ register(
     default=0.0,
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
+
+register(
+    "similarity.backfill_nodestore_use_multithread",
+    default=False,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)

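The option defaults to False, so the threaded path stays dormant until the flag is flipped; FLAG_AUTOMATOR_MODIFIABLE means it can be changed at runtime through the options automator. A hedged sketch of toggling it by hand, for example from a console, assuming the usual options API rather than the official rollout procedure:

    from sentry import options

    # Assumed manual toggle; a real rollout would go through the options automator.
    options.set("similarity.backfill_nodestore_use_multithread", True)
    assert options.get("similarity.backfill_nodestore_use_multithread") is True
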
+ 45 - 19
src/sentry/tasks/embeddings_grouping/utils.py

@@ -1,5 +1,6 @@
 import logging
 import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from dataclasses import asdict
 from datetime import UTC, datetime, timedelta
 from typing import Any, TypedDict
@@ -11,7 +12,7 @@ from redis.client import StrictRedis
 from rediscluster import RedisCluster
 from snuba_sdk import Column, Condition, Entity, Limit, Op, Query, Request
 
-from sentry import features, nodestore
+from sentry import features, nodestore, options
 from sentry.conf.server import SEER_SIMILARITY_MODEL_VERSION
 from sentry.eventstore.models import Event
 from sentry.grouping.grouping_info import get_grouping_info
@@ -377,26 +378,11 @@ def update_groups(project, seer_response, group_id_batch_filtered, group_hashes_
     )
 
 
-@metrics.wraps(f"{BACKFILL_NAME}.lookup_event_bulk", sample_rate=1.0)
-@sentry_sdk.tracing.trace
-def lookup_group_data_stacktrace_bulk(
-    project: Project, rows: list[GroupEventRow]
-) -> dict[int, Event]:
-    project_id = project.id
-    node_id_to_group_data = {
-        Event.generate_node_id(project_id, event_id=row["event_id"]): (
-            row["event_id"],
-            row["group_id"],
-        )
-        for row in rows
-    }
-
-    groups_to_event = {}
-
+def _make_nodestore_call(project, node_keys):
     try:
         bulk_data = _retry_operation(
             nodestore.backend.get_multi,
-            list(node_id_to_group_data.keys()),
+            node_keys,
             retries=3,
             delay=2,
         )
@@ -404,7 +390,7 @@ def lookup_group_data_stacktrace_bulk(
         extra = {
             "organization_id": project.organization.id,
             "project_id": project.id,
-            "group_data": json.dumps(rows),
+            "node_keys": json.dumps(node_keys),
             "error": e.message,
             "error": e.message,
         }
         }
         logger.exception(
         logger.exception(
@@ -413,6 +399,46 @@ def lookup_group_data_stacktrace_bulk(
         )
         raise
 
+    return bulk_data
+
+
+def make_nodestore_call_multithreaded(project, node_keys):
+    def process_chunk(chunk):
+        return _make_nodestore_call(project, chunk)
+
+    chunk_size = 5
+    chunks = [node_keys[i : i + chunk_size] for i in range(0, len(node_keys), chunk_size)]
+
+    bulk_data = {}
+    with ThreadPoolExecutor(max_workers=5) as executor:
+        future_to_chunk = {executor.submit(process_chunk, chunk): chunk for chunk in chunks}
+        for future in as_completed(future_to_chunk):
+            bulk_data.update(future.result())
+
+    return bulk_data
+
+
+@metrics.wraps(f"{BACKFILL_NAME}.lookup_event_bulk", sample_rate=1.0)
+@sentry_sdk.tracing.trace
+def lookup_group_data_stacktrace_bulk(
+    project: Project, rows: list[GroupEventRow]
+) -> dict[int, Event]:
+    project_id = project.id
+    node_id_to_group_data = {
+        Event.generate_node_id(project_id, event_id=row["event_id"]): (
+            row["event_id"],
+            row["group_id"],
+        )
+        for row in rows
+    }
+
+    groups_to_event = {}
+
+    if options.get("similarity.backfill_nodestore_use_multithread"):
+        bulk_data = make_nodestore_call_multithreaded(project, list(node_id_to_group_data.keys()))
+    else:
+        bulk_data = _make_nodestore_call(project, list(node_id_to_group_data.keys()))
+
     for node_id, data in bulk_data.items():
         if node_id in node_id_to_group_data:
             event_id, group_id = (

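One behavioral note on the new path: _make_nodestore_call logs and re-raises on failure, and future.result() re-raises worker exceptions in the caller, so a single failed chunk still fails the whole bulk lookup, matching the previous single-call behavior. A hedged sketch of exercising the helper directly, where project and rows are assumed to already exist (this is not an official entry point):

    from sentry.eventstore.models import Event
    from sentry.tasks.embeddings_grouping.utils import make_nodestore_call_multithreaded

    # Build node keys the same way lookup_group_data_stacktrace_bulk does,
    # then fetch them concurrently in chunks of 5 with up to 5 threads.
    node_keys = [
        Event.generate_node_id(project.id, event_id=row["event_id"]) for row in rows
    ]
    bulk_data = make_nodestore_call_multithreaded(project, node_keys)  # node_id -> event payload
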
+ 29 - 0
tests/sentry/tasks/test_backfill_seer_grouping_records.py

@@ -196,6 +196,35 @@ class TestBackfillSeerGroupingRecords(SnubaTestCase, TestCase):
             "backfill_grouping_records._lookup_event_bulk.hit_ratio", 100, sample_rate=1.0
             "backfill_grouping_records._lookup_event_bulk.hit_ratio", 100, sample_rate=1.0
         )
         )
 
 
+    @patch("sentry.tasks.embeddings_grouping.utils.metrics")
+    @override_options({"similarity.backfill_nodestore_use_multithread": True})
+    def test_lookup_group_data_stacktrace_bulk_success_multithread(self, mock_metrics):
+        """Test successful bulk group data and stacktrace lookup"""
+        rows, events = self.bulk_rows, self.bulk_events
+        nodestore_results, _ = get_events_from_nodestore(
+            self.project, rows, self.group_hashes.keys()
+        )
+
+        expected_group_data = [
+            CreateGroupingRecordData(
+                group_id=event.group.id,
+                hash=self.group_hashes[event.group.id],
+                project_id=self.project.id,
+                message=event.title,
+                exception_type=get_path(event.data, "exception", "values", -1, "type"),
+            )
+            for event in events
+        ]
+        expected_stacktraces = [
+            f'Error{i}: error with value\n  File "function_{i}.py", function function_{i}'
+            for i in range(5)
+        ]
+        assert nodestore_results["data"] == expected_group_data
+        assert nodestore_results["stacktrace_list"] == expected_stacktraces
+        mock_metrics.gauge.assert_called_with(
+            "backfill_grouping_records._lookup_event_bulk.hit_ratio", 100, sample_rate=1.0
+        )
+
     @patch("time.sleep", return_value=None)
     @patch("time.sleep", return_value=None)
     @patch("sentry.tasks.embeddings_grouping.utils.logger")
     @patch("sentry.tasks.embeddings_grouping.utils.logger")
     @patch("sentry.nodestore.backend.get_multi")
     @patch("sentry.nodestore.backend.get_multi")