
Revert "ref(indexer): allow spanner indexer in prod via indexer_db cli arg" (#38323)

Indexer is broken. Reverts getsentry/sentry#37745.
Nikhar Saxena, 2 years ago
parent commit c68275d798

+ 0 - 9
src/sentry/conf/server.py

@@ -1484,8 +1484,6 @@ SENTRY_METRICS_INDEXER = "sentry.sentry_metrics.indexer.postgres.postgres_v2.Pos
 SENTRY_METRICS_INDEXER_OPTIONS = {}
 SENTRY_METRICS_INDEXER_CACHE_TTL = 3600 * 2
 
-SENTRY_METRICS_INDEXER_SPANNER_OPTIONS = {}
-
 # Rate limits during string indexing for our metrics product.
 # Which cluster to use. Example: {"cluster": "default"}
 SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS = {}
@@ -2422,11 +2420,6 @@ KAFKA_INGEST_PERFORMANCE_METRICS = "ingest-performance-metrics"
 KAFKA_SNUBA_GENERIC_METRICS = "snuba-generic-metrics"
 KAFKA_INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
 
-# topic for testing multiple indexer backends in parallel
-# in production. So far just testing backends for the perf data,
-# not release health
-KAFKA_SNUBA_GENERICS_METRICS_CS = "snuba-metrics-generics-cloudspanner"
-
 KAFKA_SUBSCRIPTION_RESULT_TOPICS = {
     "events": KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
     "transactions": KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
@@ -2465,8 +2458,6 @@ KAFKA_TOPICS = {
     KAFKA_INGEST_PERFORMANCE_METRICS: {"cluster": "default"},
     KAFKA_SNUBA_GENERIC_METRICS: {"cluster": "default"},
     KAFKA_INGEST_REPLAYS_RECORDINGS: {"cluster": "default"},
-    # Metrics Testing Topics
-    KAFKA_SNUBA_GENERICS_METRICS_CS: {"cluster": "default"},
 }
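
With the revert, settings no longer define SENTRY_METRICS_INDEXER_SPANNER_OPTIONS or the Cloud Spanner testing topic, so KAFKA_TOPICS lists only the production topics. A minimal sketch of how that mapping is typically consulted (the resolve_cluster helper is hypothetical, not part of the codebase):

from django.conf import settings

def resolve_cluster(topic_name: str) -> str:
    # Each KAFKA_TOPICS entry shown above is a dict like {"cluster": "default"};
    # fall back to "default" when a topic has no explicit entry.
    return (settings.KAFKA_TOPICS.get(topic_name) or {}).get("cluster", "default")

resolve_cluster(settings.KAFKA_SNUBA_GENERIC_METRICS)  # "default"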
 
 

+ 3 - 6
src/sentry/runner/commands/run.py

@@ -560,7 +560,6 @@ def ingest_consumer(consumer_types, all_consumer_types, **options):
 @click.option("--ingest-profile", required=True)
 @click.option("commit_max_batch_size", "--commit-max-batch-size", type=int, default=25000)
 @click.option("commit_max_batch_time", "--commit-max-batch-time-ms", type=int, default=10000)
-@click.option("--indexer-db", default="postgres")
 def metrics_streaming_consumer(**options):
     import sentry_sdk
 
@@ -570,7 +569,7 @@ def metrics_streaming_consumer(**options):
 
     use_case = UseCaseKey(options["ingest_profile"])
     sentry_sdk.set_tag("sentry_metrics.use_case_key", use_case.value)
-    ingest_config = get_ingest_config(use_case, options["indexer_db"])
+    ingest_config = get_ingest_config(use_case)
 
     streamer = get_streaming_metrics_consumer(indexer_profile=ingest_config, **options)
 
@@ -596,7 +595,6 @@ def metrics_streaming_consumer(**options):
 @click.option("--input-block-size", type=int, default=DEFAULT_BLOCK_SIZE)
 @click.option("--output-block-size", type=int, default=DEFAULT_BLOCK_SIZE)
 @click.option("--ingest-profile", required=True)
-@click.option("--indexer-db", default="postgres")
 @click.option("max_msg_batch_size", "--max-msg-batch-size", type=int, default=50)
 @click.option("max_msg_batch_time", "--max-msg-batch-time-ms", type=int, default=10000)
 @click.option("max_parallel_batch_size", "--max-parallel-batch-size", type=int, default=50)
@@ -610,7 +608,7 @@ def metrics_parallel_consumer(**options):
 
     use_case = UseCaseKey(options["ingest_profile"])
     sentry_sdk.set_tag("sentry_metrics.use_case_key", use_case.value)
-    ingest_config = get_ingest_config(use_case, options["db_backend"])
+    ingest_config = get_ingest_config(use_case)
 
     streamer = get_parallel_metrics_consumer(indexer_profile=ingest_config, **options)
 
@@ -656,13 +654,12 @@ def replays_recordings_consumer(**options):
 @click.option("commit_max_batch_time", "--commit-max-batch-time-ms", type=int, default=10000)
 @click.option("--topic", default="snuba-metrics", help="Topic to read indexer output from.")
 @click.option("--ingest-profile", required=True)
-@click.option("--indexer-db", default="postgres")
 def last_seen_updater(**options):
     from sentry.sentry_metrics.configuration import UseCaseKey, get_ingest_config
     from sentry.sentry_metrics.consumers.last_seen_updater import get_last_seen_updater
     from sentry.utils.metrics import global_tags
 
-    ingest_config = get_ingest_config(UseCaseKey(options["ingest_profile"]), options["indexer_db"])
+    ingest_config = get_ingest_config(UseCaseKey(options["ingest_profile"]))
 
     consumer = get_last_seen_updater(ingest_config=ingest_config, **options)
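
Across all three commands the --indexer-db option is gone and get_ingest_config takes only the use case. A hedged sketch of the resulting wiring (example_consumer and its body are illustrative only; the real commands also register the batching and block-size options shown above):

import click

from sentry.sentry_metrics.configuration import UseCaseKey, get_ingest_config

@click.command()
@click.option("--ingest-profile", required=True)
def example_consumer(ingest_profile: str) -> None:
    # The profile string is the only selector now; "release-health" and
    # "performance" are the assumed UseCaseKey values.
    ingest_config = get_ingest_config(UseCaseKey(ingest_profile))
    click.echo(ingest_config.input_topic)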
 

+ 5 - 66
src/sentry/sentry_metrics/configuration.py

@@ -1,6 +1,6 @@
 from dataclasses import dataclass
 from enum import Enum
-from typing import Any, Mapping, MutableMapping, Optional, Tuple
+from typing import Any, Mapping, MutableMapping, Optional
 
 from django.conf import settings
 
@@ -15,20 +15,10 @@ class UseCaseKey(Enum):
 # backwards compatibility
 RELEASE_HEALTH_PG_NAMESPACE = "releasehealth"
 PERFORMANCE_PG_NAMESPACE = "performance"
-RELEASE_HEALTH_CS_NAMESPACE = "releasehealth.cs"
-PERFORMANCE_CS_NAMESPACE = "performance.cs"
-
-
-class IndexerStorage(Enum):
-    CLOUDSPANNER = "cloudspanner"
-    POSTGRES = "postgres"
-    MOCK = "mock"
 
 
 @dataclass(frozen=True)
 class MetricsIngestConfiguration:
-    db_backend: IndexerStorage
-    db_backend_options: Mapping[str, Any]
     input_topic: str
     output_topic: str
     use_case_id: UseCaseKey
@@ -37,23 +27,17 @@ class MetricsIngestConfiguration:
     writes_limiter_namespace: str
 
 
-_METRICS_INGEST_CONFIG_BY_USE_CASE: MutableMapping[
-    Tuple[UseCaseKey, IndexerStorage], MetricsIngestConfiguration
-] = dict()
+_METRICS_INGEST_CONFIG_BY_USE_CASE: MutableMapping[UseCaseKey, MetricsIngestConfiguration] = dict()
 
 
 def _register_ingest_config(config: MetricsIngestConfiguration) -> None:
-    _METRICS_INGEST_CONFIG_BY_USE_CASE[(config.use_case_id, config.db_backend)] = config
+    _METRICS_INGEST_CONFIG_BY_USE_CASE[config.use_case_id] = config
 
 
-def get_ingest_config(
-    use_case_key: UseCaseKey, db_backend: IndexerStorage
-) -> MetricsIngestConfiguration:
+def get_ingest_config(use_case_key: UseCaseKey) -> MetricsIngestConfiguration:
     if len(_METRICS_INGEST_CONFIG_BY_USE_CASE) == 0:
         _register_ingest_config(
             MetricsIngestConfiguration(
-                db_backend=IndexerStorage.POSTGRES,
-                db_backend_options={},
                 input_topic=settings.KAFKA_INGEST_METRICS,
                 output_topic=settings.KAFKA_SNUBA_METRICS,
                 use_case_id=UseCaseKey.RELEASE_HEALTH,
@@ -62,11 +46,8 @@ def get_ingest_config(
                 writes_limiter_namespace=RELEASE_HEALTH_PG_NAMESPACE,
             )
         )
-
         _register_ingest_config(
             MetricsIngestConfiguration(
-                db_backend=IndexerStorage.POSTGRES,
-                db_backend_options={},
                 input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS,
                 output_topic=settings.KAFKA_SNUBA_GENERIC_METRICS,
                 use_case_id=UseCaseKey.PERFORMANCE,
@@ -76,46 +57,4 @@ def get_ingest_config(
             )
         )
 
-        _register_ingest_config(
-            MetricsIngestConfiguration(
-                db_backend=IndexerStorage.CLOUDSPANNER,
-                # todo: set cloudspanner options of db and instance ids
-                db_backend_options=settings.SENTRY_METRICS_INDEXER_SPANNER_OPTIONS,
-                input_topic=settings.KAFKA_INGEST_METRICS,
-                output_topic=settings.KAFKA_SNUBA_GENERICS_METRICS_CS,
-                use_case_id=UseCaseKey.RELEASE_HEALTH,
-                internal_metrics_tag="release-health-spanner",
-                writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS,
-                writes_limiter_namespace=RELEASE_HEALTH_CS_NAMESPACE,
-            )
-        )
-
-        _register_ingest_config(
-            MetricsIngestConfiguration(
-                db_backend=IndexerStorage.CLOUDSPANNER,
-                # todo: set cloudspanner options of db and instance ids
-                db_backend_options=settings.SENTRY_METRICS_INDEXER_SPANNER_OPTIONS,
-                input_topic=settings.KAFKA_INGEST_PERFORMANCE_METRICS,
-                output_topic=settings.KAFKA_SNUBA_GENERICS_METRICS_CS,
-                use_case_id=UseCaseKey.PERFORMANCE,
-                internal_metrics_tag="perf-spanner",
-                writes_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_WRITES_LIMITER_OPTIONS_PERFORMANCE,
-                writes_limiter_namespace=PERFORMANCE_CS_NAMESPACE,
-            )
-        )
-
-    if db_backend == IndexerStorage.MOCK:
-        _register_ingest_config(
-            MetricsIngestConfiguration(
-                db_backend=IndexerStorage.MOCK,
-                db_backend_options={},
-                input_topic="topic",
-                output_topic="output-topic",
-                use_case_id=use_case_key,
-                internal_metrics_tag="release-health",
-                writes_limiter_cluster_options={},
-                writes_limiter_namespace="test-namespace",
-            )
-        )
-
-    return _METRICS_INGEST_CONFIG_BY_USE_CASE[(use_case_key, db_backend)]
+    return _METRICS_INGEST_CONFIG_BY_USE_CASE[use_case_key]
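
The registry is keyed by UseCaseKey alone again, and only the two Postgres-backed configurations are registered. A minimal usage sketch (the attribute names come from MetricsIngestConfiguration above; the concrete topic values are whatever the settings define):

from sentry.sentry_metrics.configuration import UseCaseKey, get_ingest_config

config = get_ingest_config(UseCaseKey.RELEASE_HEALTH)
# input/output topics come from settings.KAFKA_INGEST_METRICS / settings.KAFKA_SNUBA_METRICS
print(config.input_topic, config.output_topic, config.internal_metrics_tag)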

+ 1 - 1
src/sentry/sentry_metrics/consumers/indexer/multiprocess.py

@@ -69,7 +69,7 @@ class TransformStep(ProcessingStep[MessageBatch]):
     def __init__(
         self, next_step: ProcessingStep[KafkaPayload], config: MetricsIngestConfiguration
     ) -> None:
-        self.__message_processor: MessageProcessor = MessageProcessor(config)
+        self.__message_processor: MessageProcessor = MessageProcessor(config.use_case_id)
         self.__next_step = next_step
         self.__closed = False
 

+ 1 - 1
src/sentry/sentry_metrics/consumers/indexer/parallel.py

@@ -112,7 +112,7 @@ class MetricsConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
         parallel_strategy = ParallelTransformStep(
-            MessageProcessor(self.__config).process_messages,
+            MessageProcessor(self.__config.use_case_id).process_messages,
             Unbatcher(
                 SimpleProduceStep(
                     commit_function=commit,

+ 9 - 17
src/sentry/sentry_metrics/consumers/indexer/processing.py

@@ -1,30 +1,22 @@
 import logging
-from typing import Callable, Mapping
 
 from arroyo.types import Message
 
-from sentry.sentry_metrics.configuration import IndexerStorage, MetricsIngestConfiguration
+from sentry.sentry_metrics import indexer
+from sentry.sentry_metrics.configuration import UseCaseKey
 from sentry.sentry_metrics.consumers.indexer.batch import IndexerBatch
 from sentry.sentry_metrics.consumers.indexer.common import MessageBatch
-from sentry.sentry_metrics.indexer.base import StringIndexer
-from sentry.sentry_metrics.indexer.cloudspanner.cloudspanner import CloudSpannerIndexer
-from sentry.sentry_metrics.indexer.mock import MockIndexer
-from sentry.sentry_metrics.indexer.postgres.postgres_v2 import PostgresIndexer
 from sentry.utils import metrics
 
 logger = logging.getLogger(__name__)
 
-STORAGE_TO_INDEXER: Mapping[IndexerStorage, Callable[[], StringIndexer]] = {
-    IndexerStorage.CLOUDSPANNER: CloudSpannerIndexer,
-    IndexerStorage.POSTGRES: PostgresIndexer,
-    IndexerStorage.MOCK: MockIndexer,
-}
-
 
 class MessageProcessor:
-    def __init__(self, config: MetricsIngestConfiguration):
-        self._indexer = STORAGE_TO_INDEXER[config.db_backend](**config.db_backend_options)
-        self._config = config
+    # todo: update message processor to take config instead of just use case
+    # and use the config to initialize indexer vs using service model
+    def __init__(self, use_case_id: UseCaseKey):
+        self._use_case_id = use_case_id
+        self._indexer = indexer
 
     def process_messages(
         self,
@@ -48,13 +40,13 @@ class MessageProcessor:
         The value of the message is what we need to parse and then translate
         using the indexer.
         """
-        batch = IndexerBatch(self._config.use_case_id, outer_message)
+        batch = IndexerBatch(self._use_case_id, outer_message)
 
         org_strings = batch.extract_strings()
 
         with metrics.timer("metrics_consumer.bulk_record"):
             record_result = self._indexer.bulk_record(
-                use_case_id=self._config.use_case_id, org_strings=org_strings
+                use_case_id=self._use_case_id, org_strings=org_strings
             )
 
         mapping = record_result.get_mapped_results()
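
MessageProcessor is back to taking only a UseCaseKey and delegating to the indexer service module instead of instantiating a backend from the config. A hedged sketch of the same bulk_record call outside the consumer (the org_strings payload is an invented example, not real data):

from sentry.sentry_metrics import indexer
from sentry.sentry_metrics.configuration import UseCaseKey

org_strings = {1: {"transaction", "environment"}}  # invented example: org_id -> strings
record_result = indexer.bulk_record(
    use_case_id=UseCaseKey.PERFORMANCE, org_strings=org_strings
)
mapping = record_result.get_mapped_results()  # result shape is assumed here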

+ 2 - 2
src/sentry/sentry_metrics/indexer/postgres/postgres_v2.py

@@ -5,7 +5,7 @@ from typing import Any, Mapping, Optional, Set
 from django.conf import settings
 from django.db.models import Q
 
-from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config
+from sentry.sentry_metrics.configuration import UseCaseKey, get_ingest_config
 from sentry.sentry_metrics.indexer.base import (
     FetchType,
     KeyCollection,
@@ -77,7 +77,7 @@ class PGStringIndexerV2(StringIndexer):
         if db_write_keys.size == 0:
             return db_read_key_results
 
-        config = get_ingest_config(use_case_id, IndexerStorage.POSTGRES)
+        config = get_ingest_config(use_case_id)
         writes_limiter = writes_limiter_factory.get_ratelimiter(config)
 
         with writes_limiter.check_write_limits(use_case_id, db_write_keys) as writes_limiter_state:
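
With the Spanner configs removed, the Postgres indexer resolves its writes limiter from the use case alone. A hedged restatement of the pattern in this hunk (db_write_keys and writes_limiter_factory are names from the surrounding file; what the state object exposes is an assumption):

config = get_ingest_config(use_case_id)
writes_limiter = writes_limiter_factory.get_ratelimiter(config)
with writes_limiter.check_write_limits(use_case_id, db_write_keys) as writes_limiter_state:
    # per the docstring in ratelimiters.py below, rate limits are consumed only on
    # successful exit; writes_limiter_state is assumed to carry the grant results
    ...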

+ 2 - 5
src/sentry/sentry_metrics/indexer/ratelimiters.py

@@ -114,11 +114,7 @@ class WritesLimiter:
         self.rate_limiter: RedisSlidingWindowRateLimiter = RedisSlidingWindowRateLimiter(**options)
 
     @metrics.wraps("sentry_metrics.indexer.check_write_limits")
-    def check_write_limits(
-        self,
-        use_case_id: UseCaseKey,
-        keys: KeyCollection,
-    ) -> RateLimitState:
+    def check_write_limits(self, use_case_id: UseCaseKey, keys: KeyCollection) -> RateLimitState:
         """
         Takes a KeyCollection and applies DB write limits as configured via sentry.options.
 
@@ -131,6 +127,7 @@ class WritesLimiter:
 
         Upon (successful) exit, rate limits are consumed.
         """
+
         org_ids, requests = _construct_quota_requests(use_case_id, self.namespace, keys)
         timestamp, grants = self.rate_limiter.check_within_quotas(requests)
 

+ 1 - 2
tests/sentry/sentry_metrics/test_configuration.py

@@ -1,13 +1,12 @@
 from sentry.sentry_metrics.configuration import (
     _METRICS_INGEST_CONFIG_BY_USE_CASE,
-    IndexerStorage,
     UseCaseKey,
     get_ingest_config,
 )
 
 
 def test_unique_namespaces() -> None:
-    get_ingest_config(UseCaseKey.RELEASE_HEALTH, IndexerStorage.POSTGRES)
+    get_ingest_config(UseCaseKey.RELEASE_HEALTH)
     namespaces = [
         config.writes_limiter_namespace for config in _METRICS_INGEST_CONFIG_BY_USE_CASE.values()
     ]

+ 2 - 2
tests/sentry/sentry_metrics/test_last_seen_updater.py

@@ -7,7 +7,7 @@ from arroyo.backends.kafka import KafkaPayload
 from django.utils import timezone
 
 from sentry.metrics.dummy import DummyMetricsBackend
-from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config
+from sentry.sentry_metrics.configuration import UseCaseKey, get_ingest_config
 from sentry.sentry_metrics.consumers.last_seen_updater import (
     LastSeenUpdaterMessageFilter,
     _last_seen_updater_processing_factory,
@@ -96,7 +96,7 @@ class TestLastSeenUpdaterEndToEnd(TestCase):
     @staticmethod
     def processing_factory():
         return _last_seen_updater_processing_factory(
-            ingest_config=get_ingest_config(UseCaseKey.RELEASE_HEALTH, IndexerStorage.POSTGRES),
+            ingest_config=get_ingest_config(UseCaseKey.RELEASE_HEALTH),
             max_batch_time=1.0,
             max_batch_size=1,
         )

Some files were not shown because too many files changed in this diff