
ref(indexer): allow spanner indexer in prod via indexer_db cli arg (#37745)

**context**
We want to test out alternative backends for the metrics indexer and run them in parallel with the current indexer. I've added a cli arg `--indexer-db` that defaults to `postgres` and can be used to pick which backend the consumer uses.
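
To make the selection concrete, here is a minimal sketch (not part of this change) of how a use case plus the `--indexer-db` value resolve to a `MetricsIngestConfiguration`; coercing the raw CLI string into the `IndexerStorage` enum is an assumption made for illustration:

```python
# Illustrative only: mirrors what the consumer commands in this diff do with
# --ingest-profile and --indexer-db. The IndexerStorage(...) coercion of the
# raw CLI string is an assumption made for this sketch.
from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config

use_case = UseCaseKey("performance")      # from --ingest-profile
storage = IndexerStorage("cloudspanner")  # from --indexer-db (defaults to "postgres")
ingest_config = get_ingest_config(use_case, storage)
```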

There is no problem with having another consumer group consume from the ingest topic (`ingest-metrics` or `ingest-performance-metrics`). However, we don't want to double-produce to the snuba topic, so we have to make a new topic to produce to. I've only added the logic for the performance metrics, but the same could be done for release health (it would need a new dummy topic and logic to override the `output_topic`).
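
As a rough sanity check (based on the configuration changes below), both performance configs read the same ingest topic but produce to different output topics, so only the postgres backend writes to the real snuba topic:

```python
# Hedged sketch against the new configuration.py; assumes Django settings are loaded.
from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config

pg = get_ingest_config(UseCaseKey.PERFORMANCE, IndexerStorage.POSTGRES)
cs = get_ingest_config(UseCaseKey.PERFORMANCE, IndexerStorage.CLOUDSPANNER)

assert pg.input_topic == cs.input_topic    # both consume ingest-performance-metrics
assert pg.output_topic != cs.output_topic  # cs produces to snuba-metrics-generics-cloudspanner
```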

**other changes**
* ~~I got rid of the `Db.Key` because it looked like that mapping could be replaced by using the `use_case_id`. Open to feedback on that, can always put it back.~~ Got pulled out by @lynnagara in https://github.com/getsentry/sentry/pull/37790
* I had to update the `WritesLimiter`, but I think that makes sense so that the backend implementations are kept separate in redis (see the namespace sketch after this list).
    * **[edit]:** update rate limiter in https://github.com/getsentry/sentry/pull/38192
* I now pass the full `MetricsIngestConfiguration` to the message processor instead of just the `use_case_id`. That seemed to make sense since both of the args I would have been passing were already on the config.
    *  **[edit]:** update the message processor in https://github.com/getsentry/sentry/pull/38225
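
For reference, the redis separation mentioned above comes from giving each (use case, backend) pair its own writes-limiter namespace; the constants below are copied from the `configuration.py` changes in this diff:

```python
# Namespace constants from src/sentry/sentry_metrics/configuration.py in this change.
# Each (use case, storage backend) pair tracks its write-rate-limit state under its
# own redis namespace, so the cloudspanner test path is limited independently of postgres.
RELEASE_HEALTH_PG_NAMESPACE = "releasehealth"
PERFORMANCE_PG_NAMESPACE = "performance"
RELEASE_HEALTH_CS_NAMESPACE = "releasehealth.cs"
PERFORMANCE_CS_NAMESPACE = "performance.cs"
```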

This _should_ be runnable in prod, provided the ops config for the consumer and topic is added.

**notes:**
The Last Seen Updater currently only works with the postgres backend, as it is coupled to the postgres tables; future changes would be needed to support other backends for the last seen updater: https://github.com/getsentry/sentry/blob/fb1bc70261712ce11ee609123d77a4f7065d2ab3/src/sentry/sentry_metrics/consumers/last_seen_updater.py#L128
MeredithAnya · 2 years ago · commit 453e977949

+ 9 - 0
src/sentry/conf/server.py

@@ -1482,6 +1482,8 @@ SENTRY_METRICS_INDEXER = "sentry.sentry_metrics.indexer.postgres.postgres_v2.Pos
 SENTRY_METRICS_INDEXER_OPTIONS = {}
 SENTRY_METRICS_INDEXER_CACHE_TTL = 3600 * 2
 
+SENTRY_METRICS_INDEXER_SPANNER_OPTIONS = {}
+
 # Rate limits during string indexing for our metrics product.
 # Which cluster to use. Example: {"cluster": "default"}
 SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS = {}
@@ -2418,6 +2420,11 @@ KAFKA_INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics"
 KAFKA_SNUBA_GENERIC_METRICS = "snuba-generic-metrics"
 KAFKA_INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
 
+# topic for testing multiple indexer backends in parallel
+# in production. So far just testing backends for the perf data,
+# not release health
+KAFKA_SNUBA_GENERICS_METRICS_CS = "snuba-metrics-generics-cloudspanner"
+
 KAFKA_SUBSCRIPTION_RESULT_TOPICS = {
     "events": KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
     "transactions": KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
@@ -2456,6 +2463,8 @@ KAFKA_TOPICS = {
     KAFKA_INGEST_PERFORMANCE_METRICS: {"cluster": "default"},
     KAFKA_SNUBA_GENERIC_METRICS: {"cluster": "default"},
     KAFKA_INGEST_REPLAYS_RECORDINGS: {"cluster": "default"},
+    # Metrics Testing Topics
+    KAFKA_SNUBA_GENERICS_METRICS_CS: {"cluster": "default"},
 }
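
The new `SENTRY_METRICS_INDEXER_SPANNER_OPTIONS` setting ships empty; per the todo in `configuration.py`, it is meant to carry the Cloud Spanner instance and database ids, and whatever it contains is passed as keyword arguments to `CloudSpannerIndexer`. A hypothetical override (the key names are illustrative, not defined by this diff):

```python
# Hypothetical server/ops config override; the actual option keys expected by
# CloudSpannerIndexer are not specified in this change.
SENTRY_METRICS_INDEXER_SPANNER_OPTIONS = {
    "instance_id": "my-spanner-instance",    # illustrative key name
    "database_id": "my-spanner-database",    # illustrative key name
}
```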
 
 
 
 

+ 6 - 3
src/sentry/runner/commands/run.py

@@ -560,6 +560,7 @@ def ingest_consumer(consumer_types, all_consumer_types, **options):
 @click.option("--ingest-profile", required=True)
 @click.option("commit_max_batch_size", "--commit-max-batch-size", type=int, default=25000)
 @click.option("commit_max_batch_time", "--commit-max-batch-time-ms", type=int, default=10000)
+@click.option("--indexer-db", default="postgres")
 def metrics_streaming_consumer(**options):
     import sentry_sdk
 
@@ -569,7 +570,7 @@ def metrics_streaming_consumer(**options):
 
 
     use_case = UseCaseKey(options["ingest_profile"])
     sentry_sdk.set_tag("sentry_metrics.use_case_key", use_case.value)
-    ingest_config = get_ingest_config(use_case)
+    ingest_config = get_ingest_config(use_case, options["indexer_db"])
 
     streamer = get_streaming_metrics_consumer(indexer_profile=ingest_config, **options)
 
@@ -595,6 +596,7 @@ def metrics_streaming_consumer(**options):
 @click.option("--input-block-size", type=int, default=DEFAULT_BLOCK_SIZE)
 @click.option("--output-block-size", type=int, default=DEFAULT_BLOCK_SIZE)
 @click.option("--ingest-profile", required=True)
+@click.option("--indexer-db", default="postgres")
 @click.option("max_msg_batch_size", "--max-msg-batch-size", type=int, default=50)
 @click.option("max_msg_batch_time", "--max-msg-batch-time-ms", type=int, default=10000)
 @click.option("max_parallel_batch_size", "--max-parallel-batch-size", type=int, default=50)
@@ -608,7 +610,7 @@ def metrics_parallel_consumer(**options):
 
 
     use_case = UseCaseKey(options["ingest_profile"])
     sentry_sdk.set_tag("sentry_metrics.use_case_key", use_case.value)
-    ingest_config = get_ingest_config(use_case)
+    ingest_config = get_ingest_config(use_case, options["indexer_db"])
 
     streamer = get_parallel_metrics_consumer(indexer_profile=ingest_config, **options)
 
@@ -654,12 +656,13 @@ def replays_recordings_consumer(**options):
 @click.option("commit_max_batch_time", "--commit-max-batch-time-ms", type=int, default=10000)
 @click.option("--topic", default="snuba-metrics", help="Topic to read indexer output from.")
 @click.option("--ingest-profile", required=True)
+@click.option("--indexer-db", default="postgres")
 def last_seen_updater(**options):
     from sentry.sentry_metrics.configuration import UseCaseKey, get_ingest_config
     from sentry.sentry_metrics.consumers.last_seen_updater import get_last_seen_updater
     from sentry.utils.metrics import global_tags
 
-    ingest_config = get_ingest_config(UseCaseKey(options["ingest_profile"]))
+    ingest_config = get_ingest_config(UseCaseKey(options["ingest_profile"]), options["indexer_db"])
 
     consumer = get_last_seen_updater(ingest_config=ingest_config, **options)
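
A side note on the option naming above: click exposes a dashed option under an underscored key in `**options`, which is why the commands read `options["indexer_db"]`. A standalone illustration (not Sentry code):

```python
# Minimal click example showing the name normalization.
import click


@click.command()
@click.option("--indexer-db", default="postgres")
def show(**options):
    # click turns "--indexer-db" into the key "indexer_db"
    click.echo(options["indexer_db"])


if __name__ == "__main__":
    show()
```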
 
 

+ 66 - 5
src/sentry/sentry_metrics/configuration.py

@@ -1,6 +1,6 @@
 from dataclasses import dataclass
 from enum import Enum
-from typing import Any, Mapping, MutableMapping, Optional
+from typing import Any, Mapping, MutableMapping, Optional, Tuple
 
 from django.conf import settings
 
@@ -15,10 +15,20 @@ class UseCaseKey(Enum):
 # backwards compatibility
 RELEASE_HEALTH_PG_NAMESPACE = "releasehealth"
 PERFORMANCE_PG_NAMESPACE = "performance"
+RELEASE_HEALTH_CS_NAMESPACE = "releasehealth.cs"
+PERFORMANCE_CS_NAMESPACE = "performance.cs"
+
+
+class IndexerStorage(Enum):
+    CLOUDSPANNER = "cloudspanner"
+    POSTGRES = "postgres"
+    MOCK = "mock"
 
 
 @dataclass(frozen=True)
 class MetricsIngestConfiguration:
+    db_backend: IndexerStorage
+    db_backend_options: Mapping[str, Any]
     input_topic: str
     output_topic: str
     use_case_id: UseCaseKey
@@ -27,17 +37,23 @@ class MetricsIngestConfiguration:
     writes_limiter_namespace: str
 
 
-_METRICS_INGEST_CONFIG_BY_USE_CASE: MutableMapping[UseCaseKey, MetricsIngestConfiguration] = dict()
+_METRICS_INGEST_CONFIG_BY_USE_CASE: MutableMapping[
+    Tuple[UseCaseKey, IndexerStorage], MetricsIngestConfiguration
+] = dict()
 
 
 def _register_ingest_config(config: MetricsIngestConfiguration) -> None:
-    _METRICS_INGEST_CONFIG_BY_USE_CASE[config.use_case_id] = config
+    _METRICS_INGEST_CONFIG_BY_USE_CASE[(config.use_case_id, config.db_backend)] = config
 
 
-def get_ingest_config(use_case_key: UseCaseKey) -> MetricsIngestConfiguration:
+def get_ingest_config(
+    use_case_key: UseCaseKey, db_backend: IndexerStorage
+) -> MetricsIngestConfiguration:
     if len(_METRICS_INGEST_CONFIG_BY_USE_CASE) == 0:
         _register_ingest_config(
             MetricsIngestConfiguration(
+                db_backend=IndexerStorage.POSTGRES,
+                db_backend_options={},
                 input_topic=settings.KAFKA_INGEST_METRICS,
                 output_topic=settings.KAFKA_SNUBA_METRICS,
                 use_case_id=UseCaseKey.RELEASE_HEALTH,
@@ -46,8 +62,11 @@ def get_ingest_config(use_case_key: UseCaseKey) -> MetricsIngestConfiguration:
                 writes_limiter_namespace=RELEASE_HEALTH_PG_NAMESPACE,
             )
         )
+
         _register_ingest_config(
             MetricsIngestConfiguration(
+                db_backend=IndexerStorage.POSTGRES,
+                db_backend_options={},
                 input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS,
                 output_topic=settings.KAFKA_SNUBA_GENERIC_METRICS,
                 use_case_id=UseCaseKey.PERFORMANCE,
@@ -57,4 +76,46 @@ def get_ingest_config(use_case_key: UseCaseKey) -> MetricsIngestConfiguration:
             )
         )
 
-    return _METRICS_INGEST_CONFIG_BY_USE_CASE[use_case_key]
+        _register_ingest_config(
+            MetricsIngestConfiguration(
+                db_backend=IndexerStorage.CLOUDSPANNER,
+                # todo: set cloudspanner options of db and instance ids
+                db_backend_options=settings.SENTRY_METRICS_INDEXER_SPANNER_OPTIONS,
+                input_topic=settings.KAFKA_INGEST_METRICS,
+                output_topic=settings.KAFKA_SNUBA_GENERICS_METRICS_CS,
+                use_case_id=UseCaseKey.RELEASE_HEALTH,
+                internal_metrics_tag="release-health-spanner",
+                writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS,
+                writes_limiter_namespace=RELEASE_HEALTH_CS_NAMESPACE,
+            )
+        )
+
+        _register_ingest_config(
+            MetricsIngestConfiguration(
+                db_backend=IndexerStorage.CLOUDSPANNER,
+                # todo: set cloudspanner options of db and instance ids
+                db_backend_options=settings.SENTRY_METRICS_INDEXER_SPANNER_OPTIONS,
+                input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS,
+                output_topic=settings.KAFKA_SNUBA_GENERICS_METRICS_CS,
+                use_case_id=UseCaseKey.PERFORMANCE,
+                internal_metrics_tag="perf-spanner",
+                writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS_PERFORMANCE,
+                writes_limiter_namespace=PERFORMANCE_CS_NAMESPACE,
+            )
+        )
+
+    if db_backend == IndexerStorage.MOCK:
+        _register_ingest_config(
+            MetricsIngestConfiguration(
+                db_backend=IndexerStorage.MOCK,
+                db_backend_options={},
+                input_topic="topic",
+                output_topic="output-topic",
+                use_case_id=use_case_key,
+                internal_metrics_tag="release-health",
+                writes_limiter_cluster_options={},
+                writes_limiter_namespace="test-namespace",
+            )
+        )
+
+    return _METRICS_INGEST_CONFIG_BY_USE_CASE[(use_case_key, db_backend)]
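
With this change the registry is keyed by `(use case, storage)` pairs, so one use case can have several backend configurations registered side by side. A hedged sketch of the lookup behaviour (the same pattern the updated test below uses):

```python
# Assumes a configured Django/Sentry environment; the first lookup populates the registry.
from sentry.sentry_metrics.configuration import (
    _METRICS_INGEST_CONFIG_BY_USE_CASE,
    IndexerStorage,
    UseCaseKey,
    get_ingest_config,
)

get_ingest_config(UseCaseKey.RELEASE_HEALTH, IndexerStorage.POSTGRES)
assert (UseCaseKey.PERFORMANCE, IndexerStorage.CLOUDSPANNER) in _METRICS_INGEST_CONFIG_BY_USE_CASE
```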

+ 1 - 1
src/sentry/sentry_metrics/consumers/indexer/multiprocess.py

@@ -69,7 +69,7 @@ class TransformStep(ProcessingStep[MessageBatch]):
     def __init__(
         self, next_step: ProcessingStep[KafkaPayload], config: MetricsIngestConfiguration
     ) -> None:
-        self.__message_processor: MessageProcessor = MessageProcessor(config.use_case_id)
+        self.__message_processor: MessageProcessor = MessageProcessor(config)
         self.__next_step = next_step
         self.__closed = False
 

+ 1 - 1
src/sentry/sentry_metrics/consumers/indexer/parallel.py

@@ -112,7 +112,7 @@ class MetricsConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
         parallel_strategy = ParallelTransformStep(
-            MessageProcessor(self.__config.use_case_id).process_messages,
+            MessageProcessor(self.__config).process_messages,
             Unbatcher(
                 SimpleProduceStep(
                     commit_function=commit,

+ 17 - 9
src/sentry/sentry_metrics/consumers/indexer/processing.py

@@ -1,22 +1,30 @@
 import logging
+from typing import Callable, Mapping
 
 from arroyo.types import Message
 
-from sentry.sentry_metrics import indexer
-from sentry.sentry_metrics.configuration import UseCaseKey
+from sentry.sentry_metrics.configuration import IndexerStorage, MetricsIngestConfiguration
 from sentry.sentry_metrics.consumers.indexer.batch import IndexerBatch
 from sentry.sentry_metrics.consumers.indexer.common import MessageBatch
+from sentry.sentry_metrics.indexer.base import StringIndexer
+from sentry.sentry_metrics.indexer.cloudspanner.cloudspanner import CloudSpannerIndexer
+from sentry.sentry_metrics.indexer.mock import MockIndexer
+from sentry.sentry_metrics.indexer.postgres.postgres_v2 import PostgresIndexer
 from sentry.utils import metrics
 
 logger = logging.getLogger(__name__)
 
+STORAGE_TO_INDEXER: Mapping[IndexerStorage, Callable[[], StringIndexer]] = {
+    IndexerStorage.CLOUDSPANNER: CloudSpannerIndexer,
+    IndexerStorage.POSTGRES: PostgresIndexer,
+    IndexerStorage.MOCK: MockIndexer,
+}
+
 
 class MessageProcessor:
-    # todo: update message processor to take config instead of just use case
-    # and use the config to initialize indexer vs using service model
-    def __init__(self, use_case_id: UseCaseKey):
-        self._use_case_id = use_case_id
-        self._indexer = indexer
+    def __init__(self, config: MetricsIngestConfiguration):
+        self._indexer = STORAGE_TO_INDEXER[config.db_backend](**config.db_backend_options)
+        self._config = config
 
     def process_messages(
         self,
@@ -40,13 +48,13 @@ class MessageProcessor:
         The value of the message is what we need to parse and then translate
         using the indexer.
         """
-        batch = IndexerBatch(self._use_case_id, outer_message)
+        batch = IndexerBatch(self._config.use_case_id, outer_message)
 
         org_strings = batch.extract_strings()
 
         with metrics.timer("metrics_consumer.bulk_record"):
             record_result = self._indexer.bulk_record(
-                use_case_id=self._use_case_id, org_strings=org_strings
+                use_case_id=self._config.use_case_id, org_strings=org_strings
             )
 
         mapping = record_result.get_mapped_results()
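
Tying the two hunks above together: the processor now derives its indexer instance from the config's `db_backend` via `STORAGE_TO_INDEXER` instead of the global indexer service. A hedged sketch using the mock backend (assumes a configured Django/Sentry environment):

```python
from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config
from sentry.sentry_metrics.consumers.indexer.processing import MessageProcessor

# The MOCK backend registers a throwaway config (dummy topics), and MessageProcessor
# then builds MockIndexer(**config.db_backend_options) from it.
config = get_ingest_config(UseCaseKey.RELEASE_HEALTH, IndexerStorage.MOCK)
processor = MessageProcessor(config)
```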

+ 2 - 2
src/sentry/sentry_metrics/indexer/postgres/postgres_v2.py

@@ -5,7 +5,7 @@ from typing import Any, Mapping, Optional, Set
 from django.conf import settings
 from django.db.models import Q
 
-from sentry.sentry_metrics.configuration import UseCaseKey, get_ingest_config
+from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config
 from sentry.sentry_metrics.indexer.base import (
     FetchType,
     KeyCollection,
@@ -77,7 +77,7 @@ class PGStringIndexerV2(StringIndexer):
         if db_write_keys.size == 0:
             return db_read_key_results
 
-        config = get_ingest_config(use_case_id)
+        config = get_ingest_config(use_case_id, IndexerStorage.POSTGRES)
         writes_limiter = writes_limiter_factory.get_ratelimiter(config)
 
         with writes_limiter.check_write_limits(use_case_id, db_write_keys) as writes_limiter_state:

+ 5 - 2
src/sentry/sentry_metrics/indexer/ratelimiters.py

@@ -114,7 +114,11 @@ class WritesLimiter:
         self.rate_limiter: RedisSlidingWindowRateLimiter = RedisSlidingWindowRateLimiter(**options)
 
     @metrics.wraps("sentry_metrics.indexer.check_write_limits")
-    def check_write_limits(self, use_case_id: UseCaseKey, keys: KeyCollection) -> RateLimitState:
+    def check_write_limits(
+        self,
+        use_case_id: UseCaseKey,
+        keys: KeyCollection,
+    ) -> RateLimitState:
         """
         Takes a KeyCollection and applies DB write limits as configured via sentry.options.
 
@@ -127,7 +131,6 @@ class WritesLimiter:
 
 
         Upon (successful) exit, rate limits are consumed.
         """
-
         org_ids, requests = _construct_quota_requests(use_case_id, self.namespace, keys)
         timestamp, grants = self.rate_limiter.check_within_quotas(requests)
 

+ 2 - 1
tests/sentry/sentry_metrics/test_configuration.py

@@ -1,12 +1,13 @@
 from sentry.sentry_metrics.configuration import (
     _METRICS_INGEST_CONFIG_BY_USE_CASE,
+    IndexerStorage,
     UseCaseKey,
     get_ingest_config,
 )
 
 
 def test_unique_namespaces() -> None:
-    get_ingest_config(UseCaseKey.RELEASE_HEALTH)
+    get_ingest_config(UseCaseKey.RELEASE_HEALTH, IndexerStorage.POSTGRES)
     namespaces = [
         config.writes_limiter_namespace for config in _METRICS_INGEST_CONFIG_BY_USE_CASE.values()
     ]

+ 2 - 2
tests/sentry/sentry_metrics/test_last_seen_updater.py

@@ -7,7 +7,7 @@ from arroyo.backends.kafka import KafkaPayload
 from django.utils import timezone
 
 from sentry.metrics.dummy import DummyMetricsBackend
-from sentry.sentry_metrics.configuration import UseCaseKey, get_ingest_config
+from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config
 from sentry.sentry_metrics.consumers.last_seen_updater import (
     LastSeenUpdaterMessageFilter,
     _last_seen_updater_processing_factory,
@@ -96,7 +96,7 @@ class TestLastSeenUpdaterEndToEnd(TestCase):
     @staticmethod
     def processing_factory():
         return _last_seen_updater_processing_factory(
-            ingest_config=get_ingest_config(UseCaseKey.RELEASE_HEALTH),
+            ingest_config=get_ingest_config(UseCaseKey.RELEASE_HEALTH, IndexerStorage.POSTGRES),
             max_batch_time=1.0,
             max_batch_size=1,
         )

Some files were not shown because too many files changed in this diff