Browse Source

ref: Don't create topics in consumers (#51915)

We no longer do this pattern in our consumers - devserver creates all
the topics required for dev, and tests should create any topics that are
needed for that test.
Lyn Nagara 1 year ago
parent
commit
a2416e276f

+ 0 - 2
src/sentry/issues/run.py

@@ -54,11 +54,9 @@ def create_ingest_occurences_consumer(
     input_block_size: int,
     output_block_size: int,
 ) -> StreamProcessor[KafkaPayload]:
-    from sentry.utils.batching_kafka_consumer import create_topics
     from sentry.utils.kafka_config import get_kafka_consumer_cluster_options
 
     kafka_cluster = get_topic_definition(topic_name)["cluster"]
-    create_topics(kafka_cluster, [topic_name])
 
     consumer = KafkaConsumer(
         build_kafka_consumer_configuration(

+ 0 - 2
src/sentry/monitors/consumers/__init__.py

@@ -10,7 +10,6 @@ from arroyo.processing.processor import StreamProcessor
 
 from sentry.monitors.consumers.monitor_consumer import StoreMonitorCheckInStrategyFactory
 from sentry.utils import kafka_config
-from sentry.utils.batching_kafka_consumer import create_topics
 
 
 def get_monitor_check_ins_consumer(
@@ -46,7 +45,6 @@ def get_config(
     force_cluster: str | None,
 ) -> MutableMapping[str, Any]:
     cluster_name: str = force_cluster or kafka_config.get_topic_definition(topic)["cluster"]
-    create_topics(cluster_name, [topic])
     return build_kafka_consumer_configuration(
         kafka_config.get_kafka_consumer_cluster_options(
             cluster_name,

+ 0 - 2
src/sentry/profiles/consumers/__init__.py

@@ -10,7 +10,6 @@ from arroyo.processing.processor import StreamProcessor
 
 from sentry.profiles.consumers.process.factory import ProcessProfileStrategyFactory
 from sentry.utils import kafka_config
-from sentry.utils.batching_kafka_consumer import create_topics
 
 
 def get_profiles_process_consumer(
@@ -46,7 +45,6 @@ def get_config(
     force_cluster: str | None,
 ) -> MutableMapping[str, Any]:
     cluster_name: str = force_cluster or kafka_config.get_topic_definition(topic)["cluster"]
-    create_topics(cluster_name, [topic])
     return build_kafka_consumer_configuration(
         kafka_config.get_kafka_consumer_cluster_options(
             cluster_name,