
feat(metrics) report fetch metadata in indexer bulk record (#33791)

Records the source of each tag fetch (cache hit, hardcoded string, DB read, or first seen) in the results of bulk_record.
Oliver Newland · 2 years ago · commit 4ebd34aa84
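
The user-visible change: `bulk_record` now returns a `KeyResults` object instead of a plain `Mapping[int, Mapping[str, int]]`. A minimal sketch of the new surface (org ids, strings, and integer ids invented for illustration):

```python
from sentry.sentry_metrics.indexer.base import FetchType, KeyResult, KeyResults

key_results = KeyResults()
key_results.add_key_result(KeyResult(org_id=1, string="release", id=10), FetchType.CACHE_HIT)
key_results.add_key_result(KeyResult(org_id=1, string="1.0.0", id=11), FetchType.FIRST_SEEN)

# The old mapping shape is still available:
key_results.get_mapped_results()
# -> {1: {"release": 10, "1.0.0": 11}}

# New in this commit: where each id came from, keyed FetchType -> {id: string}.
key_results.get_fetch_metadata()
# -> {FetchType.CACHE_HIT: {10: "release"}, FetchType.FIRST_SEEN: {11: "1.0.0"}}
```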

+ 158 - 2
src/sentry/sentry_metrics/indexer/base.py

@@ -1,8 +1,164 @@
-from typing import Mapping, Optional, Set
+from collections import defaultdict
+from dataclasses import dataclass
+from enum import Enum
+from typing import (
+    Mapping,
+    MutableMapping,
+    MutableSequence,
+    Optional,
+    Sequence,
+    Set,
+    Tuple,
+    Type,
+    TypeVar,
+)
 
 from sentry.utils.services import Service
 
 
+class FetchType(Enum):
+    CACHE_HIT = "c"
+    HARDCODED = "h"
+    DB_READ = "d"
+    FIRST_SEEN = "f"
+
+
+KR = TypeVar("KR", bound="KeyResult")
+
+
+@dataclass(frozen=True)
+class KeyResult:
+    org_id: int
+    string: str
+    id: int
+
+    @classmethod
+    def from_string(cls: Type[KR], key: str, id: int) -> KR:
+        org_id, string = key.split(":", 1)
+        return cls(int(org_id), string, id)
+
+
+class KeyCollection:
+    """
+    A KeyCollection is a way of keeping track of a group of keys
+    used to fetch ids, whose results are stored in KeyResults.
+
+    A key is an org_id, string pair, represented either as a
+    tuple, e.g. (1, "a"), or as a string, e.g. "1:a".
+
+    The initial mapping is org_ids to sets of strings:
+        { 1: {"a", "b", "c"}, 2: {"e", "f"} }
+    """
+
+    def __init__(self, mapping: Mapping[int, Set[str]]):
+        self.mapping = mapping
+        self.size = self._size()
+
+    def _size(self) -> int:
+        total_size = 0
+        for org_id in self.mapping.keys():
+            total_size += len(self.mapping[org_id])
+        return total_size
+
+    def as_tuples(self) -> Sequence[Tuple[int, str]]:
+        """
+        Returns all the keys, each key represented as tuple -> (1, "a")
+        """
+        key_pairs: MutableSequence[Tuple[int, str]] = []
+        for org_id in self.mapping:
+            key_pairs.extend([(org_id, string) for string in self.mapping[org_id]])
+
+        return key_pairs
+
+    def as_strings(self) -> Sequence[str]:
+        """
+        Returns all the keys, each key represented as string -> "1:a"
+        """
+        keys: MutableSequence[str] = []
+        for org_id in self.mapping:
+            keys.extend([f"{org_id}:{string}" for string in self.mapping[org_id]])
+
+        return keys
+
+
+class KeyResults:
+    def __init__(self) -> None:
+        self.results: MutableMapping[int, MutableMapping[str, int]] = defaultdict(dict)
+        self.meta: MutableMapping[FetchType, MutableMapping[int, str]] = defaultdict(dict)
+
+    def add_key_result(self, key_result: KeyResult, fetch_type: Optional[FetchType] = None) -> None:
+        self.results[key_result.org_id].update({key_result.string: key_result.id})
+        if fetch_type:
+            self.meta[fetch_type].update({key_result.id: key_result.string})
+
+    def add_key_results(
+        self, key_results: Sequence[KeyResult], fetch_type: Optional[FetchType] = None
+    ) -> None:
+        for key_result in key_results:
+            self.results[key_result.org_id].update({key_result.string: key_result.id})
+            if fetch_type:
+                self.meta[fetch_type].update({key_result.id: key_result.string})
+
+    def get_mapped_results(self) -> Mapping[int, Mapping[str, int]]:
+        """
+        Only return results that have org_ids with string/int mappings.
+        """
+        mapped_results = {k: v for k, v in self.results.items() if len(v) > 0}
+        return mapped_results
+
+    def get_unmapped_keys(self, keys: KeyCollection) -> KeyCollection:
+        """
+        Takes a KeyCollection and compares it to the results. Returns
+        a new KeyCollection for any keys that don't have corresponding
+        ids in results.
+        """
+        unmapped_org_strings: MutableMapping[int, Set[str]] = defaultdict(set)
+        for org_id, strings in keys.mapping.items():
+            for string in strings:
+                if not self.results[org_id].get(string):
+                    unmapped_org_strings[org_id].add(string)
+
+        return KeyCollection(unmapped_org_strings)
+
+    def get_mapped_key_strings_to_ints(self) -> MutableMapping[str, int]:
+        """
+        Return the results, but formatted as the following:
+            {
+                "1:a": 10,
+                "1:b": 11,
+                "1:c", 12,
+                "2:e": 13
+            }
+        This is the format used when calling indexer_cache.set_many()
+        """
+        cache_key_results: MutableMapping[str, int] = {}
+        for org_id, result_dict in self.results.items():
+            for string, id in result_dict.items():
+                key = f"{org_id}:{string}"
+                cache_key_results[key] = id
+
+        return cache_key_results
+
+    def get_fetch_metadata(self) -> Mapping[FetchType, Mapping[int, str]]:
+        return self.meta
+
+    def merge(self, other: "KeyResults") -> "KeyResults":
+        new_results: "KeyResults" = KeyResults()
+
+        for org_id, strings in [*other.results.items(), *self.results.items()]:
+            new_results.results[org_id].update(strings)
+
+        for fetch_type in [*self.meta.keys(), *other.meta.keys()]:
+            new_results.meta[fetch_type].update(other.meta[fetch_type].items())
+            new_results.meta[fetch_type].update(self.meta[fetch_type].items())
+
+        return new_results
+
+    # For brevity, allow callers to address the mapping directly
+    def __getitem__(self, org_id: int) -> Mapping[str, int]:
+        return self.results[org_id]
+
+
 class StringIndexer(Service):
     """
     Provides integer IDs for metric names, tag keys and tag values
@@ -13,7 +169,7 @@ class StringIndexer(Service):
 
     __all__ = ("record", "resolve", "reverse_resolve", "bulk_record")
 
-    def bulk_record(self, org_strings: Mapping[int, Set[str]]) -> Mapping[int, Mapping[str, int]]:
+    def bulk_record(self, org_strings: Mapping[int, Set[str]]) -> KeyResults:
         raise NotImplementedError()
 
     def record(self, org_id: int, string: str) -> int:

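
The docstrings above describe the KeyCollection/KeyResults hand-off between lookup steps. A small round-trip sketch (values invented for illustration):

```python
from sentry.sentry_metrics.indexer.base import (
    FetchType,
    KeyCollection,
    KeyResult,
    KeyResults,
)

keys = KeyCollection({1: {"a", "b"}, 2: {"e"}})
keys.size                  # 3
sorted(keys.as_strings())  # ["1:a", "1:b", "2:e"]

# Suppose only "1:a" was found in the first lookup step (the cache).
cache_hits = KeyResults()
cache_hits.add_key_results([KeyResult.from_string("1:a", 10)], FetchType.CACHE_HIT)

# Everything still unmapped becomes the KeyCollection for the next step.
remaining = cache_hits.get_unmapped_keys(keys)
remaining.mapping          # {1: {"b"}, 2: {"e"}}

# Results from a later step merge back in, with fetch metadata preserved.
db_hits = KeyResults()
db_hits.add_key_results([KeyResult(1, "b", 11), KeyResult(2, "e", 12)], FetchType.DB_READ)
merged = cache_hits.merge(db_hits)
merged.get_mapped_key_strings_to_ints()  # {"1:a": 10, "1:b": 11, "2:e": 12}
```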
+ 6 - 6
src/sentry/sentry_metrics/indexer/mock.py

@@ -1,10 +1,10 @@
 import itertools
 from collections import defaultdict
-from typing import DefaultDict, Dict, Mapping, MutableMapping, Optional, Set
+from typing import DefaultDict, Dict, Mapping, Optional, Set
 
 from sentry.sentry_metrics.indexer.strings import REVERSE_SHARED_STRINGS, SHARED_STRINGS
 
-from .base import StringIndexer
+from .base import KeyResult, KeyResults, StringIndexer
 
 
 class SimpleIndexer(StringIndexer):
@@ -18,8 +18,8 @@ class SimpleIndexer(StringIndexer):
         )
         self._reverse: Dict[int, str] = {}
 
-    def bulk_record(self, org_strings: Mapping[int, Set[str]]) -> Mapping[int, Mapping[str, int]]:
-        result: MutableMapping[int, MutableMapping[str, int]] = {}
+    def bulk_record(self, org_strings: Mapping[int, Set[str]]) -> KeyResults:
+        acc = KeyResults()
         for org_id, strs in org_strings.items():
             strings_to_ints = {}
             for string in strs:
@@ -27,9 +27,9 @@ class SimpleIndexer(StringIndexer):
                     strings_to_ints[string] = SHARED_STRINGS[string]
                 else:
                     strings_to_ints[string] = self._record(org_id, string)
-            result[org_id] = strings_to_ints
+                acc.add_key_result(KeyResult(org_id, string, strings_to_ints[string]))
 
-        return result
+        return acc
 
     def record(self, org_id: int, string: str) -> int:
         if string in SHARED_STRINGS:

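
Note that the mock still never passes a `FetchType`, so its fetch metadata stays empty; only the mapping side of `KeyResults` is populated. A sketch (assuming "custom_tag" is not in SHARED_STRINGS):

```python
from sentry.sentry_metrics.indexer.mock import SimpleIndexer

results = SimpleIndexer().bulk_record({1: {"custom_tag"}})
results.get_mapped_results()   # {1: {"custom_tag": <generated id>}}
results.get_fetch_metadata()   # empty: the mock records no FetchType
```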
+ 27 - 156
src/sentry/sentry_metrics/indexer/postgres_v2.py

@@ -1,154 +1,23 @@
-from collections import defaultdict
-from dataclasses import dataclass
 from functools import reduce
 from operator import or_
-from typing import (
-    Any,
-    Mapping,
-    MutableMapping,
-    MutableSequence,
-    Optional,
-    Sequence,
-    Set,
-    Tuple,
-    Type,
-    TypeVar,
-)
+from typing import Any, Mapping, Optional, Set
 
 from django.db.models import Q
 
-from sentry.sentry_metrics.indexer.base import StringIndexer
+from sentry.sentry_metrics.indexer.base import KeyCollection, KeyResult, KeyResults, StringIndexer
 from sentry.sentry_metrics.indexer.cache import indexer_cache
 from sentry.sentry_metrics.indexer.models import StringIndexer as StringIndexerTable
 from sentry.sentry_metrics.indexer.strings import REVERSE_SHARED_STRINGS, SHARED_STRINGS
 from sentry.utils import metrics
 
+from .base import FetchType
+
 _INDEXER_CACHE_METRIC = "sentry_metrics.indexer.memcache"
 _INDEXER_DB_METRIC = "sentry_metrics.indexer.postgres"
 # only used to compare to the older version of the PGIndexer
 _INDEXER_CACHE_FETCH_METRIC = "sentry_metrics.indexer.memcache.fetch"
 
 
-class KeyCollection:
-    """
-    A KeyCollection is a way of keeping track of a group of keys
-    used to fetch ids, whose results are stored in KeyResults.
-
-    A key is a org_id, string pair, either represented as a
-    tuple e.g (1, "a"), or a string "1:a".
-
-    Initial mapping is org_id's to sets of strings:
-        { 1: {"a", "b", "c"}, 2: {"e", "f"} }
-    """
-
-    def __init__(self, mapping: Mapping[int, Set[str]]):
-        self.mapping = mapping
-        self.size = self._size()
-
-    def _size(self) -> int:
-        total_size = 0
-        for org_id in self.mapping.keys():
-            total_size += len(self.mapping[org_id])
-        return total_size
-
-    def as_tuples(self) -> Sequence[Tuple[int, str]]:
-        """
-        Returns all the keys, each key represented as tuple -> (1, "a")
-        """
-        key_pairs: MutableSequence[Tuple[int, str]] = []
-        for org_id in self.mapping:
-            key_pairs.extend([(org_id, string) for string in self.mapping[org_id]])
-
-        return key_pairs
-
-    def as_strings(self) -> Sequence[str]:
-        """
-        Returns all the keys, each key represented as string -> "1:a"
-        """
-        keys: MutableSequence[str] = []
-        for org_id in self.mapping:
-            keys.extend([f"{org_id}:{string}" for string in self.mapping[org_id]])
-
-        return keys
-
-
-KR = TypeVar("KR", bound="KeyResult")
-
-
-@dataclass(frozen=True)
-class KeyResult:
-    org_id: int
-    string: str
-    id: int
-
-    @classmethod
-    def from_string(cls: Type[KR], key: str, id: int) -> KR:
-        org_id, string = key.split(":", 1)
-        return cls(int(org_id), string, id)
-
-
-class KeyResults:
-    def __init__(self) -> None:
-        self.results: MutableMapping[int, MutableMapping[str, int]] = defaultdict(dict)
-
-    def add_key_result(self, key_result: KeyResult) -> None:
-        self.results[key_result.org_id].update({key_result.string: key_result.id})
-
-    def add_key_results(self, key_results: Sequence[KeyResult]) -> None:
-        for key_result in key_results:
-            self.results[key_result.org_id].update({key_result.string: key_result.id})
-
-    def get_mapped_results(self) -> Mapping[int, Mapping[str, int]]:
-        """
-        Only return results that have org_ids with string/int mappings.
-        """
-        mapped_results = {k: v for k, v in self.results.items() if len(v) > 0}
-        return mapped_results
-
-    def get_unmapped_keys(self, keys: KeyCollection) -> KeyCollection:
-        """
-        Takes a KeyCollection and compares it to the results. Returns
-        a new KeyCollection for any keys that don't have corresponding
-        ids in results.
-        """
-        unmapped_org_strings: MutableMapping[int, Set[str]] = defaultdict(set)
-        for org_id, strings in keys.mapping.items():
-            for string in strings:
-                if not self.results[org_id].get(string):
-                    unmapped_org_strings[org_id].add(string)
-
-        return KeyCollection(unmapped_org_strings)
-
-    def get_mapped_key_strings_to_ints(self) -> MutableMapping[str, int]:
-        """
-        Return the results, but formatted as the following:
-            {
-                "1:a": 10,
-                "1:b": 11,
-                "1:c", 12,
-                "2:e": 13
-            }
-        This is for when we use indexer_cache.set_many()
-        """
-        cache_key_results: MutableMapping[str, int] = {}
-        for org_id, result_dict in self.results.items():
-            for string, id in result_dict.items():
-                key = f"{org_id}:{string}"
-                cache_key_results[key] = id
-
-        return cache_key_results
-
-
-def merge_results(
-    result_mappings: Sequence[Mapping[int, Mapping[str, int]]],
-) -> Mapping[int, Mapping[str, int]]:
-    new_results: MutableMapping[int, MutableMapping[str, int]] = defaultdict(dict)
-    for result_map in result_mappings:
-        for org_id, strings in result_map.items():
-            new_results[org_id].update(strings)
-    return new_results
-
-
 class PGStringIndexerV2(StringIndexer):
     """
     Provides integer IDs for metric names, tag keys and tag values
@@ -165,7 +34,7 @@ class PGStringIndexerV2(StringIndexer):
 
         return StringIndexerTable.objects.filter(query_statement)
 
-    def bulk_record(self, org_strings: Mapping[int, Set[str]]) -> Mapping[int, Mapping[str, int]]:
+    def bulk_record(self, org_strings: Mapping[int, Set[str]]) -> KeyResults:
         """
         Takes in a mapping with org_ids to sets of strings.
 
@@ -184,12 +53,17 @@ class PGStringIndexerV2(StringIndexer):
         Then the work to get the ids (either from cache, db, etc)
             .... # work to add results to KeyResults()
 
-        Those results will be added to `mapped_results`
+        Those results will be added to `mapped_results`, which can
+        be retrieved with:
             key_results.get_mapped_results()
 
-        And any remaining unmapped keys get turned into a new
+        Remaining unmapped keys get turned into a new
         KeyCollection for the next step:
             new_keys = key_results.get_unmapped_keys(mapping)
+
+        When the last step is reached, or a step resolves all the remaining
+        unmapped keys, the KeyResults objects are merged and returned:
+            e.g. return cache_key_results.merge(db_read_key_results)
         """
         cache_keys = KeyCollection(org_strings)
         cache_key_strs = cache_keys.as_strings()
@@ -214,25 +88,24 @@ class PGStringIndexerV2(StringIndexer):
 
         cache_key_results = KeyResults()
         cache_key_results.add_key_results(
-            [KeyResult.from_string(k, v) for k, v in cache_results.items() if v is not None]
+            [KeyResult.from_string(k, v) for k, v in cache_results.items() if v is not None],
+            FetchType.CACHE_HIT,
         )
 
-        mapped_cache_results = cache_key_results.get_mapped_results()
         db_read_keys = cache_key_results.get_unmapped_keys(cache_keys)
 
         if db_read_keys.size == 0:
-            return mapped_cache_results
+            return cache_key_results
 
         db_read_key_results = KeyResults()
         db_read_key_results.add_key_results(
             [
                 KeyResult(org_id=db_obj.organization_id, string=db_obj.string, id=db_obj.id)
                 for db_obj in self._get_db_records(db_read_keys)
-            ]
+            ],
+            FetchType.DB_READ,
         )
         new_results_to_cache = db_read_key_results.get_mapped_key_strings_to_ints()
-
-        mapped_db_read_results = db_read_key_results.get_mapped_results()
         db_write_keys = db_read_key_results.get_unmapped_keys(db_read_keys)
 
         metrics.incr(
@@ -248,7 +121,7 @@ class PGStringIndexerV2(StringIndexer):
 
         if db_write_keys.size == 0:
             indexer_cache.set_many(new_results_to_cache)
-            return merge_results([mapped_cache_results, mapped_db_read_results])
+            return cache_key_results.merge(db_read_key_results)
 
         new_records = []
         for write_pair in db_write_keys.as_tuples():
@@ -268,17 +141,14 @@ class PGStringIndexerV2(StringIndexer):
             [
                 KeyResult(org_id=db_obj.organization_id, string=db_obj.string, id=db_obj.id)
                 for db_obj in self._get_db_records(db_write_keys)
-            ]
+            ],
+            fetch_type=FetchType.FIRST_SEEN,
         )
 
         new_results_to_cache.update(db_write_key_results.get_mapped_key_strings_to_ints())
         indexer_cache.set_many(new_results_to_cache)
 
-        mapped_db_write_results = db_write_key_results.get_mapped_results()
-
-        return merge_results(
-            [mapped_cache_results, mapped_db_read_results, mapped_db_write_results]
-        )
+        return cache_key_results.merge(db_read_key_results).merge(db_write_key_results)
 
     def record(self, org_id: int, string: str) -> int:
         """Store a string and return the integer ID generated for it"""
@@ -330,23 +200,24 @@ class StaticStringsIndexerDecorator(StringIndexer):
     def _get_db_records(self, db_keys: KeyCollection) -> Any:
         return self.indexer._get_db_records(db_keys)
 
-    def bulk_record(self, org_strings: Mapping[int, Set[str]]) -> Mapping[int, Mapping[str, int]]:
+    def bulk_record(self, org_strings: Mapping[int, Set[str]]) -> KeyResults:
         static_keys = KeyCollection(org_strings)
         static_key_results = KeyResults()
         for org_id, string in static_keys.as_tuples():
             if string in SHARED_STRINGS:
                 id = SHARED_STRINGS[string]
-                static_key_results.add_key_result(KeyResult(org_id, string, id))
+                static_key_results.add_key_result(
+                    KeyResult(org_id, string, id), FetchType.HARDCODED
+                )
 
-        static_string_results = static_key_results.get_mapped_results()
         org_strings_left = static_key_results.get_unmapped_keys(static_keys)
 
         if org_strings_left.size == 0:
-            return static_string_results
+            return static_key_results
 
         indexer_results = self.indexer.bulk_record(org_strings_left.mapping)
 
-        return merge_results([static_string_results, indexer_results])
+        return static_key_results.merge(indexer_results)
 
     def record(self, org_id: int, string: str) -> int:
         if string in SHARED_STRINGS:

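
The point of threading `FetchType` through each step is that callers can now see how every id was obtained. A hypothetical consumer (this reporter and its metric name are illustrative, not part of this commit) might emit one counter per source:

```python
from sentry.sentry_metrics.indexer.base import KeyResults
from sentry.utils import metrics

def report_fetch_sources(key_results: KeyResults) -> None:
    # meta is keyed FetchType -> {id: string}; the dict size is the
    # number of ids resolved from that source.
    for fetch_type, id_to_string in key_results.get_fetch_metadata().items():
        metrics.incr(
            "sentry_metrics.indexer.fetch_source",  # hypothetical metric name
            amount=len(id_to_string),
            tags={"fetch_type": fetch_type.value},
        )
```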
+ 1 - 1
src/sentry/sentry_metrics/multiprocess.py

@@ -379,7 +379,7 @@ def process_messages(
     metrics.incr("process_messages.total_strings_indexer_lookup", amount=len(strings))
 
     with metrics.timer("metrics_consumer.bulk_record"):
-        mapping = indexer.bulk_record(org_strings)
+        mapping = indexer.bulk_record(org_strings).get_mapped_results()
 
     new_messages: List[Message[KafkaPayload]] = []
 
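
This one-line change is the compatibility shim for existing callers: anything that wants the old `Mapping[int, Mapping[str, int]]` shape chains `.get_mapped_results()`. Sketch of the before/after (`indexer` and `org_strings` as in process_messages above):

```python
# Before this commit: bulk_record returned the mapping directly.
mapping = indexer.bulk_record(org_strings)

# After: it returns KeyResults; the old shape is one call away,
# and the fetch metadata is newly available at the same call site.
key_results = indexer.bulk_record(org_strings)
mapping = key_results.get_mapped_results()
fetch_meta = key_results.get_fetch_metadata()
```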

+ 66 - 5
tests/sentry/sentry_metrics/test_postgres_indexer.py

@@ -1,10 +1,9 @@
+from sentry.sentry_metrics.indexer.base import KeyCollection, KeyResult, KeyResults
 from sentry.sentry_metrics.indexer.cache import indexer_cache
 from sentry.sentry_metrics.indexer.models import MetricsKeyIndexer, StringIndexer
 from sentry.sentry_metrics.indexer.postgres import PGStringIndexer
 from sentry.sentry_metrics.indexer.postgres_v2 import (
-    KeyCollection,
-    KeyResult,
-    KeyResults,
+    FetchType,
     PGStringIndexerV2,
     StaticStringsIndexerDecorator,
 )
@@ -71,6 +70,10 @@ class StaticStringsIndexerTest(TestCase):
         assert results[2]["1.0.0"] == v1.id
         assert results[3]["2.0.0"] == v2.id
 
+        meta = results.get_fetch_metadata()
+        assert set(meta[FetchType.HARDCODED].values()) == {"release", "production", "environment"}
+        assert set(meta[FetchType.FIRST_SEEN].values()) == {"1.0.0", "2.0.0"}
+
 
 class PostgresIndexerV2Test(TestCase):
     def setUp(self) -> None:
@@ -93,7 +96,7 @@ class PostgresIndexerV2Test(TestCase):
             indexer_cache.get_many([f"{org1_id}:{string}" for string in self.strings]).values()
         ) == [None, None, None]
 
-        results = self.indexer.bulk_record(org_strings=org_strings)
+        results = PGStringIndexerV2().bulk_record(org_strings=org_strings).results
 
         org1_string_ids = list(
             StringIndexer.objects.filter(
@@ -171,6 +174,12 @@ class PostgresIndexerV2Test(TestCase):
         for string, id in results[org_id].items():
             assert expected_mapping[string] == id
 
+        fetch_meta = results.get_fetch_metadata()
+        assert {"v1.2.0", "v1.2.1", "v1.2.2"} == {
+            v for _, v in fetch_meta[FetchType.CACHE_HIT].items()
+        }
+        assert {"v1.2.3"} == set(fetch_meta[FetchType.FIRST_SEEN].values())
+
     def test_already_cached_plus_read_results(self) -> None:
         """
         Test that we correctly combine cached results with read results
@@ -195,6 +204,10 @@ class PostgresIndexerV2Test(TestCase):
         assert results[org_id]["boop"] == 11
         assert results[org_id]["bam"] == bam.id
 
+        fetch_meta = results.get_fetch_metadata()
+        assert {"beep", "boop"} == {v for _, v in fetch_meta[FetchType.CACHE_HIT].items()}
+        assert {"bam"} == set(fetch_meta[FetchType.DB_READ].values())
+
     def test_get_db_records(self):
         """
         Make sure that calling `_get_db_records` doesn't populate the cache
@@ -256,7 +269,6 @@ class KeyResultsTest(TestCase):
         assert key_results.get_unmapped_keys(collection).mapping == {1: {"b", "c"}, 2: {"e", "f"}}
 
         key_result_list = [
-            KeyResult(1, "a", 10),
             KeyResult(1, "b", 11),
             KeyResult(1, "c", 12),
             KeyResult(2, "e", 13),
@@ -277,3 +289,52 @@ class KeyResultsTest(TestCase):
         }
 
         assert key_results.get_unmapped_keys(collection).mapping == {}
+
+    def test_merges_with_metadata(self):
+        org_id = 1
+        cache_mappings = {"cache1": 1, "cache2": 2}
+        read_mappings = {"read3": 3, "read4": 4}
+        hardcode_mappings = {"hardcode5": 5, "hardcode6": 6}
+        write_mappings = {"write7": 7, "write8": 8}
+
+        mappings = {
+            *cache_mappings,
+            *read_mappings,
+            *hardcode_mappings,
+            *write_mappings,
+        }
+
+        kr_cache = KeyResults()
+        kr_dbread = KeyResults()
+        kr_hardcoded = KeyResults()
+        kr_write = KeyResults()
+        assert kr_cache.results == {} and kr_cache.meta == {}
+        assert kr_dbread.results == {} and kr_dbread.meta == {}
+        assert kr_hardcoded.results == {} and kr_hardcoded.meta == {}
+        assert kr_write.results == {} and kr_write.meta == {}
+
+        kr_cache.add_key_results(
+            [KeyResult(org_id=org_id, string=k, id=v) for k, v in cache_mappings.items()],
+            FetchType.CACHE_HIT,
+        )
+        kr_dbread.add_key_results(
+            [KeyResult(org_id=org_id, string=k, id=v) for k, v in read_mappings.items()],
+            FetchType.DB_READ,
+        )
+        kr_hardcoded.add_key_results(
+            [KeyResult(org_id=org_id, string=k, id=v) for k, v in hardcode_mappings.items()],
+            FetchType.HARDCODED,
+        )
+        kr_write.add_key_results(
+            [KeyResult(org_id=org_id, string=k, id=v) for k, v in write_mappings.items()],
+            FetchType.FIRST_SEEN,
+        )
+
+        kr_merged = kr_cache.merge(kr_dbread).merge(kr_hardcoded).merge(kr_write)
+
+        assert len(kr_merged.get_mapped_results()[org_id]) == len(mappings)
+        fetch_metadata = kr_merged.get_fetch_metadata()
+        assert fetch_metadata[FetchType.DB_READ].keys() == set(read_mappings.values())
+        assert fetch_metadata[FetchType.HARDCODED].keys() == set(hardcode_mappings.values())
+        assert fetch_metadata[FetchType.FIRST_SEEN].keys() == set(write_mappings.values())
+        assert fetch_metadata[FetchType.CACHE_HIT].keys() == set(cache_mappings.values())
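
A subtlety these assertions rely on: `meta` is inverted relative to `results` — it maps `FetchType -> {id: string}` — so `fetch_metadata[FetchType.DB_READ].keys()` is a set of ids, which is why it is compared against `read_mappings.values()`. In miniature:

```python
from sentry.sentry_metrics.indexer.base import FetchType, KeyResult, KeyResults

kr = KeyResults()
kr.add_key_result(KeyResult(org_id=1, string="read3", id=3), FetchType.DB_READ)

kr.results  # {1: {"read3": 3}}                  string -> id
kr.meta     # {FetchType.DB_READ: {3: "read3"}}  id -> string
```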