
feat(post-process-forwarder) Functioning batching_kafka_consumer (#28801)

This PR integrates the different components to build a functioning
batching kafka consumer for the post process forwarder.

The older streaming consumer is removed, and all of the processing
business logic has been moved to the PostProcessWorker class.
Nikhar Saxena, 3 years ago
Parent
Commit
5cfbd4be3d
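
As a rough illustration of where that per-message logic now lives: BatchingKafkaConsumer asserts its worker is an AbstractBatchWorker (see the isinstance assert in the batching_kafka_consumer.py diff below), so the new worker class (PostProcessForwarderWorker in the backend.py diff) is assumed to follow this shape. This is a sketch only; process_message and shutdown are referenced in this diff, while flush_batch is an assumed method name.

# Illustrative sketch only -- not the actual PostProcessForwarderWorker implementation.
# process_message() and shutdown() appear in this diff; flush_batch() is assumed.
from typing import Any, Mapping, Optional, Sequence


class SketchPostProcessWorker:
    """Holds the per-message logic formerly inlined in the streaming consumer loop."""

    def process_message(self, message: Any) -> Optional[Mapping[str, Any]]:
        # Decode the Kafka message into post_process_group task kwargs; None skips it.
        raise NotImplementedError

    def flush_batch(self, batch: Sequence[Mapping[str, Any]]) -> None:
        # Dispatch the accumulated tasks once the batch is full or times out.
        raise NotImplementedError

    def shutdown(self) -> None:
        # Release any worker-held resources (e.g. a thread pool sized by _CONCURRENCY_OPTION).
        raise NotImplementedError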

+ 3 - 0
src/sentry/conf/server.py

@@ -2388,3 +2388,6 @@ DEMO_DATA_QUICK_GEN_PARAMS = {}
 
 # adds an extra JS to HTML template
 INJECTED_SCRIPT_ASSETS = []
+
+# Whether the Sentry post process forwarder should use the batching consumer
+SENTRY_POST_PROCESS_FORWARDER_BATCHING = False

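To opt a deployment into the new code path, the flag would be flipped in the instance configuration. A minimal sketch, assuming the standard self-hosted layout; the exact config file location is deployment-specific:

# e.g. in the deployment's sentry.conf.py (assumed location)
SENTRY_POST_PROCESS_FORWARDER_BATCHING = True
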
+ 1 - 0
src/sentry/eventstream/base.py

@@ -124,6 +124,7 @@ class EventStream(Service):
         commit_log_topic,
         synchronize_commit_group,
         commit_batch_size=100,
+        commit_batch_timeout_ms=5000,
         initial_offset_reset="latest",
     ):
         assert not self.requires_post_process_forwarder()

+ 95 - 14
src/sentry/eventstream/kafka/backend.py

@@ -1,8 +1,6 @@
 import logging
-import random
 import signal
-from contextlib import contextmanager
-from typing import Any, Generator, Mapping, Optional, Tuple
+from typing import Any, Mapping, Optional, Tuple
 
 from confluent_kafka import OFFSET_INVALID, TopicPartition
 from django.conf import settings
@@ -10,12 +8,18 @@ from django.utils.functional import cached_property
 
 from sentry import options
 from sentry.eventstream.kafka.consumer import SynchronizedConsumer
+from sentry.eventstream.kafka.postprocessworker import (
+    _CONCURRENCY_OPTION,
+    PostProcessForwarderWorker,
+    _sampled_eventstream_timer,
+)
 from sentry.eventstream.kafka.protocol import (
     get_task_kwargs_for_message,
     get_task_kwargs_for_message_from_headers,
 )
 from sentry.eventstream.snuba import SnubaProtocolEventStream
 from sentry.utils import json, kafka, metrics
+from sentry.utils.batching_kafka_consumer import BatchingKafkaConsumer
 
 logger = logging.getLogger(__name__)
 
@@ -137,16 +141,72 @@ class KafkaEventStream(SnubaProtocolEventStream):
     def requires_post_process_forwarder(self):
         return True
 
-    def run_post_process_forwarder(
+    def _build_consumer(
         self,
         consumer_group,
         commit_log_topic,
         synchronize_commit_group,
         commit_batch_size=100,
+        commit_batch_timeout_ms=5000,
         initial_offset_reset="latest",
     ):
-        logger.debug("Starting post-process forwarder...")
+        cluster_name = settings.KAFKA_TOPICS[settings.KAFKA_EVENTS]["cluster"]
+
+        synchronized_consumer = SynchronizedConsumer(
+            cluster_name=cluster_name,
+            consumer_group=consumer_group,
+            commit_log_topic=commit_log_topic,
+            synchronize_commit_group=synchronize_commit_group,
+            initial_offset_reset=initial_offset_reset,
+        )
+
+        concurrency = options.get(_CONCURRENCY_OPTION)
+        worker = PostProcessForwarderWorker(concurrency=concurrency)
+
+        consumer = BatchingKafkaConsumer(
+            topics=self.topic,
+            worker=worker,
+            max_batch_size=commit_batch_size,
+            max_batch_time=commit_batch_timeout_ms,
+            consumer=synchronized_consumer,
+            commit_on_shutdown=True,
+        )
+        return consumer
+
+    def run_batched_consumer(
+        self,
+        consumer_group,
+        commit_log_topic,
+        synchronize_commit_group,
+        commit_batch_size=100,
+        commit_batch_timeout_ms=5000,
+        initial_offset_reset="latest",
+    ):
+        consumer = self._build_consumer(
+            consumer_group,
+            commit_log_topic,
+            synchronize_commit_group,
+            commit_batch_size,
+            commit_batch_timeout_ms,
+            initial_offset_reset,
+        )
+
+        def handler(signum, frame):
+            consumer.signal_shutdown()
+
+        signal.signal(signal.SIGINT, handler)
+        signal.signal(signal.SIGTERM, handler)
 
+        consumer.run()
+
+    def run_streaming_consumer(
+        self,
+        consumer_group,
+        commit_log_topic,
+        synchronize_commit_group,
+        commit_batch_size=100,
+        initial_offset_reset="latest",
+    ):
         cluster_name = settings.KAFKA_TOPICS[settings.KAFKA_EVENTS]["cluster"]
 
         consumer = SynchronizedConsumer(
@@ -280,13 +340,13 @@ class KafkaEventStream(SnubaProtocolEventStream):
 
             if use_kafka_headers is True:
                 try:
-                    with self.sampled_eventstream_timer(
+                    with _sampled_eventstream_timer(
                         instance="get_task_kwargs_for_message_from_headers"
                     ):
                         task_kwargs = get_task_kwargs_for_message_from_headers(message.headers())
 
                     if task_kwargs is not None:
-                        with self.sampled_eventstream_timer(
+                        with _sampled_eventstream_timer(
                             instance="dispatch_post_process_group_task"
                         ):
                             if task_kwargs["group_id"] is None:
@@ -334,12 +394,33 @@ class KafkaEventStream(SnubaProtocolEventStream):
             with metrics.timer("eventstream.duration", instance="dispatch_post_process_group_task"):
                 self._dispatch_post_process_group_task(**task_kwargs)
 
-    @contextmanager
-    def sampled_eventstream_timer(self, instance: str) -> Generator[None, None, None]:
+    def run_post_process_forwarder(
+        self,
+        consumer_group,
+        commit_log_topic,
+        synchronize_commit_group,
+        commit_batch_size=100,
+        commit_batch_timeout_ms=5000,
+        initial_offset_reset="latest",
+    ):
+        logger.debug("Starting post-process forwarder...")
 
-        record_metric = random.random() < 0.1
-        if record_metric is True:
-            with metrics.timer("eventstream.duration", instance=instance):
-                yield
+        if settings.SENTRY_POST_PROCESS_FORWARDER_BATCHING:
+            logger.info("Starting batching consumer")
+            self.run_batched_consumer(
+                consumer_group,
+                commit_log_topic,
+                synchronize_commit_group,
+                commit_batch_size,
+                commit_batch_timeout_ms,
+                initial_offset_reset,
+            )
         else:
-            yield
+            logger.info("Starting streaming consumer")
+            self.run_streaming_consumer(
+                consumer_group,
+                commit_log_topic,
+                synchronize_commit_group,
+                commit_batch_size,
+                initial_offset_reset,
+            )

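The sampled timer is now imported from postprocessworker instead of living on the eventstream class. Based on the method body removed above, the shared helper presumably looks roughly like this (a sketch reconstructed from the removed code, not the actual module contents):

import random
from contextlib import contextmanager
from typing import Generator

from sentry.utils import metrics


@contextmanager
def _sampled_eventstream_timer(instance: str) -> Generator[None, None, None]:
    # Record the eventstream.duration metric for roughly 10% of calls.
    record_metric = random.random() < 0.1
    if record_metric:
        with metrics.timer("eventstream.duration", instance=instance):
            yield
    else:
        yield
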
+ 7 - 0
src/sentry/runner/commands/run.py

@@ -312,6 +312,12 @@ def cron(**options):
     type=int,
     help="How many messages to process (may or may not result in an enqueued task) before committing offsets.",
 )
+@click.option(
+    "--commit-batch-timeout-ms",
+    default=5000,
+    type=int,
+    help="Time (in milliseconds) to wait before closing current batch and committing offsets.",
+)
 @click.option(
     "--initial-offset-reset",
     default="latest",
@@ -330,6 +336,7 @@ def post_process_forwarder(**options):
             commit_log_topic=options["commit_log_topic"],
             synchronize_commit_group=options["synchronize_commit_group"],
             commit_batch_size=options["commit_batch_size"],
+            commit_batch_timeout_ms=options["commit_batch_timeout_ms"],
             initial_offset_reset=options["initial_offset_reset"],
         )
     except ForwarderNotRequired:

+ 25 - 2
src/sentry/utils/batching_kafka_consumer.py

@@ -150,6 +150,21 @@ class BatchingKafkaConsumer:
     * Supports an optional "dead letter topic" where messages that raise an exception during
       `process_message` are sent so as not to block the pipeline.
 
+    A note on the commit_on_shutdown parameter:
+    If the worker's process_message method only works with in-memory state and does not
+    influence or modify any external systems, it is fine to leave the flag set to False.
+    If process_message does influence or modify external systems, the flag must be set to
+    True to avoid duplicate work on those systems.
+    Example:
+        1. A worker that deserializes the message and extracts a few needed parameters can
+        leave commit_on_shutdown as False. This is fine because the next consumer that picks
+        up the work will rebuild its state from the messages that have not been committed.
+        2. A worker that sends tasks to celery based on the message needs to set
+        commit_on_shutdown to True to avoid duplicate work.
+    This is different from the NOTE below: crash scenarios are harder to handle, and it is
+    acceptable for duplicates to occur in crash cases.
     NOTE: This does not eliminate the possibility of duplicates if the consumer process
     crashes between writing to its backend and committing Kafka offsets. This should eliminate
     the possibility of *losing* data though. An "exactly once" consumer would need to store
@@ -184,6 +199,7 @@ class BatchingKafkaConsumer:
         queued_min_messages=DEFAULT_QUEUED_MIN_MESSAGES,
         metrics_sample_rates=None,
         metrics_default_tags=None,
+        commit_on_shutdown: bool = False,
     ):
         assert isinstance(worker, AbstractBatchWorker)
         self.worker = worker
@@ -196,6 +212,7 @@ class BatchingKafkaConsumer:
         )
         self.__metrics_default_tags = metrics_default_tags or {}
         self.group_id = group_id
+        self.commit_on_shutdown = commit_on_shutdown
 
         self.shutdown = False
 
@@ -367,14 +384,20 @@ class BatchingKafkaConsumer:
     def _shutdown(self):
         logger.debug("Stopping")
 
-        # drop in-memory events, letting the next consumer take over where we left off
-        self._reset_batch()
+        if self.commit_on_shutdown:
+            self._flush(force=True)
+        else:
+            # drop in-memory events, letting the next consumer take over where we left off
+            self._reset_batch()
 
         # tell the consumer to shutdown, and close the consumer
         logger.debug("Stopping worker")
         self.worker.shutdown()
         logger.debug("Stopping consumer")
         self.consumer.close()
+        if self.dead_letter_topic:
+            logger.debug("Stopping producer")
+            self.producer.close()
         logger.debug("Stopped")
 
     def _reset_batch(self):

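To make the docstring's two examples concrete, the flag choice would look roughly like this. A sketch only: the worker classes are hypothetical and the argument list is abbreviated; see _build_consumer in the backend.py diff above for a full call.

from sentry.utils.batching_kafka_consumer import BatchingKafkaConsumer

# (1) Pure in-memory worker: dropped batches are simply re-read by the next
#     consumer, so the default commit_on_shutdown=False is fine.
consumer = BatchingKafkaConsumer(
    topics="events",
    worker=DeserializingWorker(),        # hypothetical worker
    max_batch_size=100,
    max_batch_time=5000,
    commit_on_shutdown=False,
)

# (2) Worker that enqueues celery tasks: commit the in-flight batch on shutdown
#     so the next consumer does not re-dispatch the same tasks.
consumer = BatchingKafkaConsumer(
    topics="events",
    worker=TaskDispatchingWorker(),      # hypothetical worker
    max_batch_size=100,
    max_batch_time=5000,
    commit_on_shutdown=True,
)
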
+ 113 - 0
tests/sentry/eventstream/kafka/test_consumer.py

@@ -1,10 +1,19 @@
 import os
 import subprocess
+import time
 import uuid
 from collections import defaultdict
 from contextlib import contextmanager
+from unittest.mock import patch
 
 import pytest
+from confluent_kafka.admin import AdminClient
+from django.test import override_settings
+
+from sentry.eventstream.kafka import KafkaEventStream
+from sentry.testutils import TestCase
+from sentry.utils import json, kafka_config
+from sentry.utils.batching_kafka_consumer import wait_for_topics
 
 try:
     from confluent_kafka import Consumer, KafkaError, Producer, TopicPartition
@@ -614,3 +623,107 @@ def test_consumer_rebalance_from_uncommitted_offset(requires_kafka):
         assert (
             message is None or message.error() is KafkaError._PARTITION_EOF
         ), "there should be no more messages to receive"
+
+
+def kafka_message_payload():
+    return [
+        2,
+        "insert",
+        {
+            "group_id": 43,
+            "event_id": "fe0ee9a2bc3b415497bad68aaf70dc7f",
+            "organization_id": 1,
+            "project_id": 1,
+            "primary_hash": "311ee66a5b8e697929804ceb1c456ffe",
+        },
+        {
+            "is_new": False,
+            "is_regression": None,
+            "is_new_group_environment": False,
+            "skip_consume": False,
+        },
+    ]
+
+
+@pytest.mark.usefixtures("requires_kafka")
+class BatchedConsumerTest(TestCase):
+    def _get_producer(self, topic):
+        cluster_name = settings.KAFKA_TOPICS[topic]["cluster"]
+        conf = {
+            "bootstrap.servers": settings.KAFKA_CLUSTERS[cluster_name]["common"][
+                "bootstrap.servers"
+            ],
+            "session.timeout.ms": 6000,
+        }
+        return Producer(conf)
+
+    def setUp(self):
+        super().setUp()
+        self.events_topic = f"events-{uuid.uuid4().hex}"
+        self.commit_log_topic = f"events-commit-{uuid.uuid4().hex}"
+        self.override_settings_cm = override_settings(
+            KAFKA_TOPICS={
+                "events": {"cluster": "default", "topic": self.events_topic},
+                "snuba-commit-log": {"cluster": "default", "topic": self.commit_log_topic},
+            },
+        )
+        self.override_settings_cm.__enter__()
+
+        cluster_options = kafka_config.get_kafka_admin_cluster_options(
+            "default", {"allow.auto.create.topics": "true"}
+        )
+        self.admin_client = AdminClient(cluster_options)
+        wait_for_topics(self.admin_client, [self.events_topic, self.commit_log_topic])
+
+    def tearDown(self):
+        super().tearDown()
+        self.override_settings_cm.__exit__(None, None, None)
+        self.admin_client.delete_topics([self.events_topic, self.commit_log_topic])
+
+    @patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task")
+    def test_post_process_forwarder_batch_consumer(self, dispatch_post_process_group_task):
+        consumer_group = f"consumer-{uuid.uuid1().hex}"
+        synchronize_commit_group = f"sync-consumer-{uuid.uuid1().hex}"
+
+        events_producer = self._get_producer("events")
+        commit_log_producer = self._get_producer("snuba-commit-log")
+        message = json.dumps(kafka_message_payload()).encode()
+
+        eventstream = KafkaEventStream()
+        consumer = eventstream._build_consumer(
+            consumer_group=consumer_group,
+            commit_log_topic=self.commit_log_topic,
+            synchronize_commit_group=synchronize_commit_group,
+            commit_batch_size=1,
+            initial_offset_reset="earliest",
+        )
+
+        # produce message to the events topic
+        events_producer.produce(self.events_topic, message)
+        assert events_producer.flush(5) == 0, "events producer did not successfully flush queue"
+
+        # Move the committed offset forward for our synchronizing group.
+        commit_log_producer.produce(
+            self.commit_log_topic,
+            key=f"{self.events_topic}:0:{synchronize_commit_group}".encode(),
+            value=f"{1}".encode(),
+        )
+        assert (
+            commit_log_producer.flush(5) == 0
+        ), "snuba-commit-log producer did not successfully flush queue"
+
+        # Run the consumer loop for some time so the message gets processed
+        for _ in range(3):
+            consumer._run_once()
+            time.sleep(1)
+
+        # Verify that the task gets called once
+        dispatch_post_process_group_task.assert_called_once_with(
+            event_id="fe0ee9a2bc3b415497bad68aaf70dc7f",
+            project_id=1,
+            group_id=43,
+            primary_hash="311ee66a5b8e697929804ceb1c456ffe",
+            is_new=False,
+            is_regression=None,
+            is_new_group_environment=False,
+        )