feat(txnames): Write transaction names to redis (#41921)

We want to automatically detect and remove identifiers from URL
transaction names, e.g. `/auth/login/user123`. For this purpose, we need
to collect a set of recent transaction names for every project that uses
URL transaction names:

1. Hook into the `transaction_processed` signal in post-processing to
observe indexed transactions.
2. If an observed transaction has `transaction_info.source: "url"`, add
it to a redis set.
3. As soon as the redis set exceeds its maximum size (1000), evict a random
element from the set to keep it bounded.
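
A hedged sketch of the capped-set semantics in step 3, in plain Python for
illustration only (the commit implements this atomically in redis via the Lua
script included further down):

```python
import random


def add_capped(names: set, name: str, max_size: int = 1000) -> None:
    """Add `name`, then evict random members until the set fits `max_size`."""
    names.add(name)
    while len(names) > max_size:
        # Evict a random member, like redis SPOP; the newly added name may
        # itself be evicted, which the Lua script below also allows.
        names.discard(random.choice(tuple(names)))
```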

NOTE: These redis sets are not meant to be stored on the default
cluster, so deploy a dedicated cluster before enabling the feature flag
in this PR.
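
A minimal configuration sketch for that, assuming a dedicated cluster named
"txnames" has been declared in the redis cluster configuration (the setting
name comes from the new datasource module below; the cluster name itself is
made up):

```python
# Deployment override (sketch): route the per-project transaction-name sets
# to a dedicated redis cluster instead of the default one. The new code
# falls back to "default" when this setting is absent.
SENTRY_TRANSACTION_NAMES_REDIS_CLUSTER = "txnames"
```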

Not in this PR: Connecting the redis datasource to the actual clustering
algorithm.

Co-authored-by: Oleksandr Kylymnychenko <oleksandr@sentry.io>
Co-authored-by: Iker Barriocanal <32816711+iker-barriocanal@users.noreply.github.com>
Joris Bayer, 2 years ago
Commit 96b4e6941c

+ 1 - 1
src/sentry/api/endpoints/project_transaction_names.py

@@ -5,7 +5,7 @@ from rest_framework.response import Response
 from sentry.api.base import region_silo_endpoint
 from sentry.api.bases.project import ProjectEndpoint
 from sentry.api.utils import get_date_range_from_stats_period
-from sentry.ingest.transaction_clusterer.datasource import fetch_unique_transaction_names
+from sentry.ingest.transaction_clusterer.datasource.snuba import fetch_unique_transaction_names
 from sentry.ingest.transaction_clusterer.tree import TreeClusterer
 
 

+ 2 - 0
src/sentry/conf/server.py

@@ -1038,6 +1038,8 @@ SENTRY_FEATURES = {
     "organizations:metrics-extraction": False,
     # Normalize transaction names during ingestion.
     "organizations:transaction-name-normalize": False,
+    # Try to derive normalization rules by clustering transaction names.
+    "organizations:transaction-name-clusterer": False,
     # Extraction metrics for transactions during ingestion.
     "organizations:transaction-metrics-extraction": False,
     # Allow performance alerts to be created on the metrics dataset. Allows UI to switch between

+ 1 - 0
src/sentry/features/__init__.py

@@ -165,6 +165,7 @@ default_manager.add("organizations:slack-overage-notifications", OrganizationFea
 default_manager.add("organizations:symbol-sources", OrganizationFeature)
 default_manager.add("organizations:team-roles", OrganizationFeature, True)
 default_manager.add("organizations:transaction-name-normalize", OrganizationFeature, True)
+default_manager.add("organizations:transaction-name-clusterer", OrganizationFeature, True)
 default_manager.add("organizations:transaction-metrics-extraction", OrganizationFeature)
 default_manager.add("organizations:unified-span-view", OrganizationFeature, True)
 default_manager.add("organizations:use-metrics-layer", OrganizationFeature, True)

+ 1 - 0
src/sentry/ingest/transaction_clusterer/datasource/__init__.py

@@ -0,0 +1 @@
+TRANSACTION_SOURCE = "url"

+ 57 - 0
src/sentry/ingest/transaction_clusterer/datasource/redis.py

@@ -0,0 +1,57 @@
+""" Write transactions into redis sets """
+from typing import Any, Set
+
+import sentry_sdk
+from django.conf import settings
+
+from sentry import features
+from sentry.eventstore.models import Event
+from sentry.ingest.transaction_clusterer.datasource import TRANSACTION_SOURCE
+from sentry.models import Project
+from sentry.utils import redis
+from sentry.utils.safe import safe_execute
+
+#: Maximum number of transaction names per project that we want
+#: to store in redis.
+MAX_SET_SIZE = 1000
+
+#: Retention of a set.
+#: Remove the set if it has not received any updates for 24 hours.
+SET_TTL = 24 * 60 * 60
+
+
+add_to_set = redis.load_script("utils/sadd_capped.lua")
+
+
+def _get_redis_key(project: Project) -> str:
+    return f"txnames:o:{project.organization_id}:p:{project.id}"
+
+
+def _get_redis_client() -> Any:
+    cluster_key = getattr(settings, "SENTRY_TRANSACTION_NAMES_REDIS_CLUSTER", "default")
+    return redis.redis_clusters.get(cluster_key)
+
+
+def _store_transaction_name(project: Project, transaction_name: str) -> None:
+    with sentry_sdk.start_span(op="txcluster.store_transaction_name"):
+        client = _get_redis_client()
+        redis_key = _get_redis_key(project)
+        add_to_set(client, [redis_key], [transaction_name, MAX_SET_SIZE, SET_TTL])
+
+
+def _get_transaction_names(project: Project) -> Set[str]:
+    client = _get_redis_client()
+    redis_key = _get_redis_key(project)
+
+    # TODO: Not sure if this works for large sets in production
+    return client.smembers(redis_key)  # type: ignore
+
+
+def record_transaction_name(project: Project, event: Event, **kwargs: Any) -> None:
+    source = (event.data.get("transaction_info") or {}).get("source")
+    if (
+        source == TRANSACTION_SOURCE
+        and event.transaction
+        and features.has("organizations:transaction-name-clusterer", project.organization)
+    ):
+        safe_execute(_store_transaction_name, project, event.transaction, _with_transaction=False)
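
The commit message notes that wiring this datasource into the clustering
algorithm is out of scope here. As a rough, hedged sketch of how the collected
names could later feed the existing TreeClusterer (the function name and the
constructor argument shown are assumptions, not part of this commit):

```python
from sentry.ingest.transaction_clusterer.datasource.redis import _get_transaction_names
from sentry.ingest.transaction_clusterer.tree import TreeClusterer
from sentry.models import Project


def derive_rules_from_redis(project: Project, merge_threshold: int = 100) -> list:
    # Hypothetical follow-up: cluster the URL transaction names collected in
    # redis and derive replacement rules such as "/auth/login/*/**".
    clusterer = TreeClusterer(merge_threshold=merge_threshold)
    clusterer.add_input(_get_transaction_names(project))
    return clusterer.get_rules()
```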

+ 1 - 2
src/sentry/ingest/transaction_clusterer/datasource.py → src/sentry/ingest/transaction_clusterer/datasource/snuba.py

@@ -3,11 +3,10 @@ from typing import Iterable, Tuple
 
 from snuba_sdk import Column, Condition, Entity, Limit, Op, Query, Request
 
+from sentry.ingest.transaction_clusterer.datasource import TRANSACTION_SOURCE
 from sentry.models import Project
 from sentry.utils.snuba import raw_snql_query
 
-TRANSACTION_SOURCE = "url"
-
 
 def fetch_unique_transaction_names(
     project: Project, time_range: Tuple[datetime, datetime], limit: int

+ 21 - 0
src/sentry/scripts/utils/sadd_capped.lua

@@ -0,0 +1,21 @@
+-- Add an element to a set and cap it to a certain size.
+assert(#KEYS == 1, "provide exactly one set key")
+assert(#ARGV == 3, "provide a value, max_size and a TTL")
+
+local key = KEYS[1]
+local value = ARGV[1]
+local max_size = tonumber(ARGV[2])
+local ttl = ARGV[3]
+
+local inserted = redis.call("SADD", key, value)
+if inserted then
+    local current_size = redis.call("SCARD", key)
+    local overflow = current_size - max_size
+    if overflow > 0 then
+        -- Evict random entries.
+        -- NOTE: There is a chance that we remove the same element that we inserted.
+        redis.call("SPOP", key, overflow)
+    end
+end
+
+redis.call("EXPIRE", key, ttl)

+ 4 - 3
src/sentry/tasks/post_process.py

@@ -323,12 +323,12 @@ def post_process_group(
 
     with snuba.options_override({"consistent": True}):
         from sentry.eventstore.processing import event_processing_store
+        from sentry.ingest.transaction_clusterer.datasource.redis import (
+            record_transaction_name as record_transaction_name_for_clustering,
+        )
         from sentry.models import Organization, Project
         from sentry.reprocessing2 import is_reprocessed_event
 
-        # We use the data being present/missing in the processing store
-        # to ensure that we don't duplicate work should the forwarding consumers
-        # need to rewind history.
         data = event_processing_store.get(cache_key)
         if not data:
             logger.info(
@@ -360,6 +360,7 @@ def post_process_group(
         # This should eventually be completely removed and transactions
         # will not go through any post processing.
         if is_transaction_event:
+            record_transaction_name_for_clustering(event.project, event)
             with sentry_sdk.start_span(op="tasks.post_process_group.transaction_processed_signal"):
                 transaction_processed.send_robust(
                     sender=post_process_group,

+ 78 - 0
tests/sentry/ingest/test_transaction_clusterer.py

@@ -1,4 +1,16 @@
+from unittest import mock
+
+import pytest
+
+from sentry.eventstore.models import Event
+from sentry.ingest.transaction_clusterer.datasource.redis import (
+    _get_transaction_names,
+    _store_transaction_name,
+    record_transaction_name,
+)
 from sentry.ingest.transaction_clusterer.tree import TreeClusterer
+from sentry.models.project import Project
+from sentry.testutils.helpers import Feature
 
 
 def test_multi_fanout():
@@ -27,3 +39,69 @@ def test_single_leaf():
     ]
     clusterer.add_input(transaction_names)
     assert clusterer.get_rules() == ["/a/*/**"]
+
+
+@mock.patch("sentry.ingest.transaction_clusterer.datasource.redis.MAX_SET_SIZE", 5)
+def test_collection():
+    project1 = Project(id=101, name="p1", organization_id=1)
+    project2 = Project(id=102, name="project2", organization_id=1)
+
+    for project in (project1, project2):
+        for i in range(len(project.name)):
+            _store_transaction_name(project, f"tx-{project.name}-{i}")
+            _store_transaction_name(project, f"tx-{project.name}-{i}")
+
+    set_entries1 = _get_transaction_names(project1)
+    assert set(set_entries1) == {"tx-p1-0", "tx-p1-1"}
+
+    set_entries2 = _get_transaction_names(project2)
+    assert len(set_entries2) == 5
+    # We don't know which entries made it into the final set:
+    for name in set_entries2:
+        assert name.startswith("tx-project2-")
+
+    project3 = Project(id=103, name="project3", organization_id=1)
+    assert set() == _get_transaction_names(project3)
+
+
+@mock.patch("sentry.ingest.transaction_clusterer.datasource.redis.MAX_SET_SIZE", 100)
+def test_distribution():
+    """Make sure that the redis set prefers newer entries"""
+    project = Project(id=103, name="", organization_id=1)
+    for i in range(1000):
+        _store_transaction_name(project, str(i))
+
+    freshness = sum(map(int, _get_transaction_names(project))) / 100
+
+    # The average is usually around ~900, check for > 800 to be on the safe side
+    assert freshness > 800, freshness
+
+
+@mock.patch("sentry.ingest.transaction_clusterer.datasource.redis._store_transaction_name")
+@pytest.mark.django_db
+@pytest.mark.parametrize(
+    "source,txname,feature_enabled,expected",
+    [
+        ("url", "/a/b/c", True, 1),
+        ("route", "/", True, 0),
+        ("url", None, True, 0),
+        ("url", "/", False, 0),
+        ("route", None, False, 0),
+    ],
+)
+def test_record_transactions(
+    mocked_record, default_organization, source, txname, feature_enabled, expected
+):
+    with Feature({"organizations:transaction-name-clusterer": feature_enabled}):
+        project = Project(id=111, name="project", organization_id=default_organization.id)
+        event = Event(
+            project.id,
+            "02552061b47b467cb38d1d2dd26eed21",
+            data={
+                "tags": [["transaction", txname]],
+                "transaction": txname,
+                "transaction_info": {"source": source},
+            },
+        )
+        record_transaction_name(project, event)
+        assert len(mocked_record.mock_calls) == expected

+ 35 - 0
tests/sentry/tasks/test_post_process.py

@@ -1254,3 +1254,38 @@ class PostProcessGroupPerformanceTest(
         assert event_processed_signal_mock.call_count == 0
         assert mock_processor.call_count == 0
         assert run_post_process_job_mock.call_count == 2
+
+
+class TransactionClustererTestCase(TestCase, SnubaTestCase):
+    @with_feature("organizations:transaction-name-clusterer")
+    @patch("sentry.ingest.transaction_clusterer.datasource.redis._store_transaction_name")
+    def test_process_transaction_event_clusterer(
+        self,
+        mock_store_transaction_name,
+    ):
+        min_ago = before_now(minutes=1).replace(tzinfo=pytz.utc)
+        event = self.store_event(
+            data={
+                "event_id": "b" * 32,
+                "transaction": "foo",
+                "start_timestamp": str(min_ago),
+                "timestamp": str(min_ago),
+                "type": "transaction",
+                "transaction_info": {
+                    "source": "url",
+                },
+                "contexts": {"trace": {"trace_id": "b" * 32, "span_id": "c" * 16, "op": ""}},
+            },
+            project_id=self.project.id,
+        )
+        cache_key = write_event_to_cache(event)
+        post_process_group(
+            is_new=False,
+            is_regression=False,
+            is_new_group_environment=False,
+            cache_key=cache_key,
+            group_id=None,
+            group_states=None,
+        )
+
+        assert mock_store_transaction_name.mock_calls == [mock.call(self.project, "foo")]