fix: Refactor how KAFKA_TOPICS is resolved to improve typing (#51284)

Markus Unterwaditzer · 1 year ago · commit ce590b9c38

pyproject.toml (+0, -14)

@@ -503,9 +503,6 @@ module = [
     "sentry.eventstore.compressor",
     "sentry.eventstore.models",
     "sentry.eventstore.snuba.backend",
-    "sentry.eventstream.base",
-    "sentry.eventstream.kafka.backend",
-    "sentry.eventstream.snuba",
     "sentry.features.handler",
     "sentry.features.helpers",
     "sentry.features.manager",
@@ -548,7 +545,6 @@ module = [
     "sentry.incidents.models",
     "sentry.incidents.subscription_processor",
     "sentry.incidents.tasks",
-    "sentry.ingest.billing_metrics_consumer",
     "sentry.ingest.consumer_v2.factory",
     "sentry.ingest.inbound_filters",
     "sentry.ingest.ingest_consumer",
@@ -676,8 +672,6 @@ module = [
     "sentry.issues.issue_occurrence",
     "sentry.issues.merge",
     "sentry.issues.occurrence_consumer",
-    "sentry.issues.producer",
-    "sentry.issues.run",
     "sentry.issues.search",
     "sentry.issues.status_change",
     "sentry.lang.dart.utils",
@@ -798,7 +792,6 @@ module = [
     "sentry.models.team",
     "sentry.models.user",
     "sentry.models.userip",
-    "sentry.monitors.consumers",
     "sentry.monitors.consumers.monitor_consumer",
     "sentry.monitors.endpoints.base",
     "sentry.monitors.endpoints.monitor_ingest_checkin_attachment",
@@ -858,8 +851,6 @@ module = [
     "sentry.plugins.sentry_webhooks.plugin",
     "sentry.plugins.utils",
     "sentry.plugins.validators.url",
-    "sentry.post_process_forwarder.post_process_forwarder",
-    "sentry.profiles.consumers",
     "sentry.profiles.task",
     "sentry.profiles.utils",
     "sentry.projectoptions.defaults",
@@ -880,7 +871,6 @@ module = [
     "sentry.release_health.sessions",
     "sentry.release_health.tasks",
     "sentry.replays.cache",
-    "sentry.replays.consumers",
     "sentry.replays.endpoints.organization_replay_count",
     "sentry.replays.endpoints.project_replay_clicks_index",
     "sentry.replays.endpoints.project_replay_recording_segment_details",
@@ -891,7 +881,6 @@ module = [
     "sentry.replays.query",
     "sentry.replays.tasks",
     "sentry.replays.testutils",
-    "sentry.replays.usecases.ingest.dom_index",
     "sentry.replays.usecases.reader",
     "sentry.reprocessing2",
     "sentry.roles",
@@ -951,8 +940,6 @@ module = [
     "sentry.sentry_apps.components",
     "sentry.sentry_apps.installations",
     "sentry.sentry_metrics.configuration",
-    "sentry.sentry_metrics.consumers.indexer.common",
-    "sentry.sentry_metrics.consumers.indexer.multiprocess",
     "sentry.sentry_metrics.consumers.indexer.slicing_router",
     "sentry.sentry_metrics.indexer.postgres.postgres_v2",
     "sentry.services.hybrid_cloud.actor",
@@ -995,7 +982,6 @@ module = [
     "sentry.snuba.metrics_performance",
     "sentry.snuba.models",
     "sentry.snuba.outcomes",
-    "sentry.snuba.query_subscriptions.run",
     "sentry.snuba.referrer",
     "sentry.snuba.sessions",
     "sentry.snuba.sessions_v2",

src/sentry/conf/server.py (+2, -18)

@@ -11,19 +11,7 @@ import socket
 import sys
 import tempfile
 from datetime import datetime, timedelta
-from typing import (
-    Any,
-    Callable,
-    Dict,
-    Iterable,
-    Mapping,
-    Optional,
-    Tuple,
-    TypedDict,
-    TypeVar,
-    Union,
-    overload,
-)
+from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Tuple, TypeVar, Union, overload
 from urllib.parse import urlparse
 
 import sentry
@@ -3024,12 +3012,8 @@ KAFKA_SUBSCRIPTION_RESULT_TOPICS = {
 }
 
 
-class TopicDefinition(TypedDict):
-    cluster: str
-
-
 # Cluster configuration for each Kafka topic by name.
-KAFKA_TOPICS: Mapping[str, Optional[TopicDefinition]] = {
+KAFKA_TOPICS: Mapping[str, Optional[sentry.conf.types.TopicDefinition]] = {
     KAFKA_EVENTS: {"cluster": "default"},
     KAFKA_EVENTS_COMMIT_LOG: {"cluster": "default"},
     KAFKA_TRANSACTIONS: {"cluster": "default"},
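
The annotation now names the `TypedDict` by its fully qualified path. For context, a minimal reproduction of the new style; note that the bare `import sentry` visible above does not by itself make `sentry.conf.types` resolvable, so an `import sentry.conf.types` statement is presumably added elsewhere in server.py, outside the hunks shown:

```python
from typing import Mapping, Optional

import sentry.conf.types  # assumed import; not visible in this truncated hunk

KAFKA_TOPICS: Mapping[str, Optional[sentry.conf.types.TopicDefinition]] = {
    "events": {"cluster": "default"},
    "transactions": {"cluster": "default"},
}
```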

src/sentry/conf/types.py (+8, -0)

@@ -0,0 +1,8 @@
+from typing import TypedDict
+
+# Do not import _anything_ by sentry from here, or mypy/django-stubs will break
+# typing for django settings globally.
+
+
+class TopicDefinition(TypedDict):
+    cluster: str
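
For illustration, here is roughly what the `TypedDict` buys over an untyped dict (hypothetical snippet, not part of the diff): mypy checks the keys and required fields at every construction site.

```python
from sentry.conf.types import TopicDefinition

ok: TopicDefinition = {"cluster": "default"}

# Both of the following would be rejected by mypy:
#   typo: TopicDefinition = {"clutser": "default"}  # unknown key
#   empty: TopicDefinition = {}                     # missing required "cluster"
```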

src/sentry/eventstream/base.py (+1, -1)

@@ -110,7 +110,7 @@ class EventStream(Service):
                 queue=queue,
             )
 
-    def _get_queue_for_post_process(self, event: Event) -> str:
+    def _get_queue_for_post_process(self, event: Event | GroupEvent) -> str:
         event_type = self._get_event_type(event)
         if event_type == EventStreamEventType.Transaction:
             return "post_process_transactions"

src/sentry/eventstream/kafka/backend.py (+3, -3)

@@ -25,7 +25,7 @@ from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtoco
 from sentry.killswitches import killswitch_matches_context
 from sentry.post_process_forwarder import PostProcessForwarder, PostProcessForwarderType
 from sentry.utils import json
-from sentry.utils.kafka_config import get_kafka_producer_cluster_options
+from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
 
 logger = logging.getLogger(__name__)
 
@@ -45,7 +45,7 @@ class KafkaEventStream(SnubaProtocolEventStream):
 
     def get_producer(self, topic: str) -> Producer:
         if topic not in self.__producers:
-            cluster_name = settings.KAFKA_TOPICS[topic]["cluster"]
+            cluster_name = get_topic_definition(topic)["cluster"]
             cluster_options = get_kafka_producer_cluster_options(cluster_name)
             self.__producers[topic] = Producer(cluster_options)
 
@@ -57,7 +57,7 @@ class KafkaEventStream(SnubaProtocolEventStream):
 
     def _get_headers_for_insert(
         self,
-        event: Event,
+        event: Event | GroupEvent,
         is_new: bool,
         is_regression: bool,
         is_new_group_environment: bool,
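
`get_topic_definition` replaces direct indexing into `settings.KAFKA_TOPICS`, whose values are `Optional[TopicDefinition]`, so `settings.KAFKA_TOPICS[topic]["cluster"]` was unsound. The helper itself lives in `src/sentry/utils/kafka_config.py`, one of the files truncated from this diff; a plausible sketch, assuming its job is to narrow away the `None` before callers index:

```python
# Hypothetical reconstruction -- the real helper in
# src/sentry/utils/kafka_config.py is not shown in this diff.
from django.conf import settings

from sentry.conf.types import TopicDefinition


def get_topic_definition(topic: str) -> TopicDefinition:
    defn = settings.KAFKA_TOPICS.get(topic)
    if defn is None:
        raise ValueError(f"no cluster configured for Kafka topic {topic!r}")
    return defn
```

Whatever its exact shape, centralizing the `None` check is what lets every call site in the remaining files read `get_topic_definition(topic)["cluster"]` without mypy objecting that the value might be `None`.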

src/sentry/eventstream/snuba.py (+2, -2)

@@ -94,7 +94,7 @@ class SnubaProtocolEventStream(EventStream):
 
     def _get_headers_for_insert(
         self,
-        event: Event,
+        event: Event | GroupEvent,
         is_new: bool,
         is_regression: bool,
         is_new_group_environment: bool,
@@ -129,7 +129,7 @@ class SnubaProtocolEventStream(EventStream):
             return
         project = event.project
         set_current_event_project(project.id)
-        retention_days = quotas.get_event_retention(organization=project.organization)
+        retention_days = quotas.backend.get_event_retention(organization=project.organization)
 
         event_data = event.get_raw_data(for_stream=True)
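
This hunk also switches `quotas.get_event_retention(...)` to `quotas.backend.get_event_retention(...)`. Sentry's service modules are populated dynamically through a lazy service wrapper, which is opaque to mypy; going through the explicit `.backend` attribute gives the checker a concrete class. A toy model of the distinction (not Sentry's actual wrapper):

```python
from __future__ import annotations

from typing import Any


class Quota:
    def get_event_retention(self, organization: Any) -> int | None:
        return 90


class LazyServiceWrapper:
    """Dynamic proxy: __getattr__ forwarding types as Any under mypy."""

    def __init__(self, backend: Quota) -> None:
        self.backend = backend

    def __getattr__(self, name: str) -> Any:
        return getattr(self.backend, name)


quotas = LazyServiceWrapper(Quota())
quotas.get_event_retention(organization=None)          # checked as Any
quotas.backend.get_event_retention(organization=None)  # checked as int | None
```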
 

src/sentry/ingest/billing_metrics_consumer.py (+2, -2)

@@ -16,7 +16,7 @@ from sentry.sentry_metrics.configuration import UseCaseKey
 from sentry.sentry_metrics.indexer.strings import SHARED_TAG_STRINGS, TRANSACTION_METRICS_NAMES
 from sentry.sentry_metrics.utils import reverse_resolve_tag_value
 from sentry.utils import json
-from sentry.utils.kafka_config import get_kafka_consumer_cluster_options
+from sentry.utils.kafka_config import get_kafka_consumer_cluster_options, get_topic_definition
 from sentry.utils.outcomes import Outcome, track_outcome
 
 logger = logging.getLogger(__name__)
@@ -49,7 +49,7 @@ def get_metrics_billing_consumer(
 
 
 def _get_bootstrap_servers(topic: str, force_cluster: Union[str, None]) -> Sequence[str]:
-    cluster = force_cluster or settings.KAFKA_TOPICS[topic]["cluster"]
+    cluster = force_cluster or get_topic_definition(topic)["cluster"]
 
     options = get_kafka_consumer_cluster_options(cluster)
     servers = options["bootstrap.servers"]

src/sentry/issues/producer.py (+2, -2)

@@ -9,11 +9,11 @@ from django.conf import settings
 from sentry.issues.issue_occurrence import IssueOccurrence
 from sentry.utils import json
 from sentry.utils.arroyo_producer import SingletonProducer
-from sentry.utils.kafka_config import get_kafka_producer_cluster_options
+from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
 
 
 def _get_occurrence_producer() -> KafkaProducer:
-    cluster_name = settings.KAFKA_TOPICS[settings.KAFKA_INGEST_OCCURRENCES]["cluster"]
+    cluster_name = get_topic_definition(settings.KAFKA_INGEST_OCCURRENCES)["cluster"]
     producer_config = get_kafka_producer_cluster_options(cluster_name)
     producer_config.pop("compression.type", None)
     producer_config.pop("message.max.bytes", None)

src/sentry/issues/run.py (+2, -3)

@@ -14,6 +14,7 @@ from arroyo.processing.strategies import (
 from arroyo.types import Commit, Message, Partition
 
 from sentry.utils.arroyo import RunTaskWithMultiprocessing
+from sentry.utils.kafka_config import get_topic_definition
 
 logger = logging.getLogger(__name__)
 
@@ -53,12 +54,10 @@ def create_ingest_occurences_consumer(
     input_block_size: int,
     output_block_size: int,
 ) -> StreamProcessor[KafkaPayload]:
-    from django.conf import settings
-
     from sentry.utils.batching_kafka_consumer import create_topics
     from sentry.utils.kafka_config import get_kafka_consumer_cluster_options
 
-    kafka_cluster = settings.KAFKA_TOPICS[topic_name]["cluster"]
+    kafka_cluster = get_topic_definition(topic_name)["cluster"]
     create_topics(kafka_cluster, [topic_name])
 
     consumer = KafkaConsumer(

src/sentry/monitors/consumers/__init__.py (+1, -2)

@@ -7,7 +7,6 @@ from arroyo.backends.kafka.configuration import build_kafka_consumer_configurati
 from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload
 from arroyo.commit import ONCE_PER_SECOND
 from arroyo.processing.processor import StreamProcessor
-from django.conf import settings
 
 from sentry.monitors.consumers.monitor_consumer import StoreMonitorCheckInStrategyFactory
 from sentry.utils import kafka_config
@@ -46,7 +45,7 @@ def get_config(
     strict_offset_reset: bool,
     force_cluster: str | None,
 ) -> MutableMapping[str, Any]:
-    cluster_name: str = force_cluster or settings.KAFKA_TOPICS[topic]["cluster"]
+    cluster_name: str = force_cluster or kafka_config.get_topic_definition(topic)["cluster"]
     create_topics(cluster_name, [topic])
     return build_kafka_consumer_configuration(
         kafka_config.get_kafka_consumer_cluster_options(

Some files were not shown because too many files changed in this diff