
feat(metrics): Add metrics consumer for billing outcomes (#39236)

Add a consumer that reads from `snuba-generic-metrics` (the topic the
metrics indexer produces to) and calls `track_outcome` for processed
transactions.

`d:transactions/duration@millisecond` is the one metric that we
unconditionally extract for every transaction, so the number of elements
in a bucket's value corresponds to the number of transactions that
contributed to that bucket. For each bucket, we report this length as the
`quantity` of the tracked outcome.
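
As an illustration, a decoded bucket and the quantity derived from it look
roughly like this (a minimal sketch with made-up values, not part of the
change itself):

```python
# Hypothetical bucket as decoded from the metrics topic; all values are illustrative.
bucket = {
    "org_id": 1,
    "project_id": 42,
    "metric_id": 9,  # assume this resolves to d:transactions/duration@millisecond
    "timestamp": 1660000000,
    "value": [212.0, 38.5, 1004.2],  # one duration value per processed transaction
}

# Three values in the distribution mean three transactions contributed to the
# bucket, so the tracked outcome is reported with quantity=3.
quantity = len(bucket["value"])
assert quantity == 3
```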

Future work:

Replace the usage of `track_outcome` with a streaming consumer that
awaits delivery of outcomes before committing offsets.

Co-authored-by: Joris Bayer <joris.bayer@sentry.io>
Iker Barriocanal · 2 years ago · commit 5399ff90c1

+ 1 - 0
mypy.ini

@@ -57,6 +57,7 @@ files = src/sentry/analytics/,
         src/sentry/grouping/strategies/template.py,
         src/sentry/grouping/strategies/utils.py,
         src/sentry/incidents/charts.py,
+        src/sentry/ingest/billing_metrics_consumer.py,
         src/sentry/integrations/base.py,
         src/sentry/integrations/github/,
         src/sentry/integrations/slack/,

+ 192 - 0
src/sentry/ingest/billing_metrics_consumer.py

@@ -0,0 +1,192 @@
+import logging
+from datetime import datetime, timedelta
+from typing import (
+    Any,
+    Callable,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Sequence,
+    TypedDict,
+    Union,
+    cast,
+)
+
+from arroyo import Topic
+from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
+from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
+from arroyo.processing import StreamProcessor
+from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
+from arroyo.types import Message, Partition, Position
+from django.conf import settings
+
+from sentry.constants import DataCategory
+from sentry.sentry_metrics.indexer.strings import TRANSACTION_METRICS_NAMES
+from sentry.utils import json
+from sentry.utils.outcomes import Outcome, track_outcome
+
+logger = logging.getLogger(__name__)
+
+
+def get_metrics_billing_consumer(
+    group_id: str,
+    auto_offset_reset: str,
+    force_topic: Union[str, None],
+    force_cluster: Union[str, None],
+    max_batch_size: int,
+    max_batch_time: int,
+) -> StreamProcessor[KafkaPayload]:
+    topic = force_topic or settings.KAFKA_SNUBA_GENERIC_METRICS
+    bootstrap_servers = _get_bootstrap_servers(topic, force_cluster)
+
+    return StreamProcessor(
+        consumer=KafkaConsumer(
+            build_kafka_consumer_configuration(
+                default_config={},
+                group_id=group_id,
+                auto_offset_reset=auto_offset_reset,
+                bootstrap_servers=bootstrap_servers,
+            ),
+        ),
+        topic=Topic(topic),
+        processor_factory=BillingMetricsConsumerStrategyFactory(max_batch_size, max_batch_time),
+    )
+
+
+def _get_bootstrap_servers(topic: str, force_cluster: Union[str, None]) -> Sequence[str]:
+    cluster = force_cluster or settings.KAFKA_TOPICS[topic]["cluster"]
+
+    options = settings.KAFKA_CLUSTERS[cluster]
+    servers = options["common"]["bootstrap.servers"]
+    if isinstance(servers, (list, tuple)):
+        return servers
+    return [servers]
+
+
+class BillingMetricsConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
+    def __init__(self, max_batch_size: int, max_batch_time: int):
+        self.__max_batch_size = max_batch_size
+        self.__max_batch_time = max_batch_time
+
+    def create_with_partitions(
+        self,
+        commit: Callable[[Mapping[Partition, Position]], None],
+        partitions: Mapping[Partition, int],
+    ) -> ProcessingStrategy[KafkaPayload]:
+        return BillingTxCountMetricConsumerStrategy(
+            commit, self.__max_batch_size, self.__max_batch_time
+        )
+
+
+class MetricsBucket(TypedDict):
+    """
+    Metrics bucket as decoded from Kafka.
+
+    Only defines the fields that are relevant for this consumer.
+    """
+
+    org_id: int
+    project_id: int
+    metric_id: int
+    timestamp: int
+    value: Any
+
+
+class BillingTxCountMetricConsumerStrategy(ProcessingStrategy[KafkaPayload]):
+    """A metrics consumer that generates a billing outcome for each processed
+    transaction, processing a bucket at a time. The transaction count is
+    computed from the amount of values from `d:transactions/duration@millisecond`
+    buckets.
+    """
+
+    #: The ID of the metric used to count transactions
+    metric_id = TRANSACTION_METRICS_NAMES["d:transactions/duration@millisecond"]
+
+    def __init__(
+        self,
+        commit: Callable[[Mapping[Partition, Position]], None],
+        max_batch_size: int,
+        max_batch_time: int,
+    ) -> None:
+        self.__commit = commit
+        self.__max_batch_size = max_batch_size
+        self.__max_batch_time = timedelta(milliseconds=max_batch_time)
+        self.__messages_since_last_commit = 0
+        self.__last_commit = datetime.now()
+        self.__ready_to_commit: MutableMapping[Partition, Position] = {}
+        self.__closed = False
+
+    def poll(self) -> None:
+        if self._should_commit():
+            self._bulk_commit()
+
+    def terminate(self) -> None:
+        self.close()
+
+    def close(self) -> None:
+        self.__closed = True
+
+    def submit(self, message: Message[KafkaPayload]) -> None:
+        assert not self.__closed
+        self.__messages_since_last_commit += 1
+
+        payload = self._get_payload(message)
+        self._produce_billing_outcomes(payload)
+        self._mark_commit_ready(message)
+
+    def _get_payload(self, message: Message[KafkaPayload]) -> MetricsBucket:
+        payload = json.loads(message.payload.value.decode("utf-8"), use_rapid_json=True)
+        return cast(MetricsBucket, payload)
+
+    def _count_processed_transactions(self, bucket_payload: MetricsBucket) -> int:
+        if bucket_payload["metric_id"] != self.metric_id:
+            return 0
+        value = bucket_payload["value"]
+        try:
+            return len(value)
+        except TypeError:
+            # Unexpected value type for this metric ID, skip.
+            return 0
+
+    def _produce_billing_outcomes(self, payload: MetricsBucket) -> None:
+        quantity = self._count_processed_transactions(payload)
+        if quantity < 1:
+            return
+
+        # track_outcome does not guarantee to deliver the outcome, making this
+        # an at-most-once delivery.
+        #
+        # If it turns out that we drop too many outcomes on shutdown,
+        # we may have to revisit this part to achieve a
+        # better approximation of exactly-once delivery.
+        track_outcome(
+            org_id=payload["org_id"],
+            project_id=payload["project_id"],
+            key_id=None,
+            outcome=Outcome.ACCEPTED,
+            reason=None,
+            timestamp=datetime.fromtimestamp(payload["timestamp"]),
+            event_id=None,
+            category=DataCategory.TRANSACTION_PROCESSED,
+            quantity=quantity,
+        )
+
+    def _mark_commit_ready(self, message: Message[KafkaPayload]) -> None:
+        self.__ready_to_commit[message.partition] = Position(message.next_offset, message.timestamp)
+
+    def join(self, timeout: Optional[float] = None) -> None:
+        self._bulk_commit()
+
+    def _should_commit(self) -> bool:
+        if not self.__ready_to_commit:
+            return False
+        if self.__messages_since_last_commit >= self.__max_batch_size:
+            return True
+        if self.__last_commit + self.__max_batch_time <= datetime.now():
+            return True
+        return False
+
+    def _bulk_commit(self) -> None:
+        self.__commit(self.__ready_to_commit)
+        self.__ready_to_commit = {}
+        self.__messages_since_last_commit = 0
+        self.__last_commit = datetime.now()

+ 6 - 1
src/sentry/runner/commands/devserver.py

@@ -50,6 +50,7 @@ _DEFAULT_DAEMONS = {
         "--ingest-profile",
         "performance",
     ],
+    "metrics-billing": ["sentry", "run", "billing-metrics-consumer"],
     "profiles": ["sentry", "run", "ingest-profiles"],
 }
 
@@ -270,7 +271,11 @@ and run `sentry devservices up kafka zookeeper`.
                     "`SENTRY_USE_METRICS_DEV` can only be used when "
                     "`SENTRY_EVENTSTREAM=sentry.eventstream.kafka.KafkaEventStream`."
                 )
-            daemons += [_get_daemon("metrics-rh"), _get_daemon("metrics-perf")]
+            daemons += [
+                _get_daemon("metrics-rh"),
+                _get_daemon("metrics-perf"),
+                _get_daemon("metrics-billing"),
+            ]
 
     if settings.SENTRY_USE_RELAY:
         daemons += [_get_daemon("ingest")]

+ 11 - 0
src/sentry/runner/commands/run.py

@@ -658,6 +658,17 @@ def metrics_parallel_consumer(**options):
     streamer.run()
 
 
+@run.command("billing-metrics-consumer")
+@log_options()
+@batching_kafka_options("billing-metrics-consumer")
+@configuration
+def metrics_billing_consumer(**options):
+    from sentry.ingest.billing_metrics_consumer import get_metrics_billing_consumer
+
+    consumer = get_metrics_billing_consumer(**options)
+    consumer.run()
+
+
 @run.command("ingest-profiles")
 @log_options()
 @click.option("--topic", default="profiles", help="Topic to get profiles data from.")

+ 2 - 1
src/sentry/testutils/skips.py

@@ -65,7 +65,8 @@ def relay_is_available():
     if "relay" in _service_status:
         return _service_status["relay"]
     try:
-        socket.create_connection(("127.0.0.1", settings.SENTRY_RELAY_PORT), 1.0)
+        with socket.create_connection(("127.0.0.1", settings.SENTRY_RELAY_PORT), 1.0):
+            pass
     except OSError:
         _service_status["relay"] = False
     else:

+ 123 - 0
tests/sentry/ingest/billing_metrics_consumer/test_billing_metrics_consumer_kafka.py

@@ -0,0 +1,123 @@
+from datetime import datetime
+from unittest import mock
+
+from arroyo.backends.kafka import KafkaPayload
+from arroyo.types import Message, Partition, Topic
+
+from sentry.constants import DataCategory
+from sentry.ingest.billing_metrics_consumer import (
+    BillingTxCountMetricConsumerStrategy,
+    MetricsBucket,
+)
+from sentry.sentry_metrics.indexer.strings import TRANSACTION_METRICS_NAMES
+from sentry.utils import json
+from sentry.utils.outcomes import Outcome
+
+
+@mock.patch("sentry.ingest.billing_metrics_consumer.track_outcome")
+def test_outcomes_consumed(track_outcome):
+    # Based on test_ingest_consumer_kafka.py
+
+    topic = Topic("snuba-generic-metrics")
+
+    buckets = [
+        {  # Counter metric with wrong ID will not generate an outcome
+            "metric_id": 123,
+            "type": "c",
+            "org_id": 1,
+            "project_id": 2,
+            "timestamp": 123,
+            "value": 123.4,
+        },
+        {  # Distribution metric with wrong ID will not generate an outcome
+            "metric_id": 123,
+            "type": "d",
+            "org_id": 1,
+            "project_id": 2,
+            "timestamp": 123,
+            "value": [1.0, 2.0],
+        },
+        {  # Empty distribution will not generate an outcome
+            # NOTE: Should not be emitted by Relay anyway
+            "metric_id": TRANSACTION_METRICS_NAMES["d:transactions/duration@millisecond"],
+            "type": "d",
+            "org_id": 1,
+            "project_id": 2,
+            "timestamp": 123,
+            "value": [],
+        },
+        {  # Valid distribution bucket emits an outcome
+            "metric_id": TRANSACTION_METRICS_NAMES["d:transactions/duration@millisecond"],
+            "type": "d",
+            "org_id": 1,
+            "project_id": 2,
+            "timestamp": 123456,
+            "value": [1.0, 2.0, 3.0],
+        },
+        {  # Another bucket to introduce some noise
+            "metric_id": 123,
+            "type": "c",
+            "org_id": 1,
+            "project_id": 2,
+            "timestamp": 123,
+            "value": 123.4,
+        },
+    ]
+
+    fake_commit = mock.MagicMock()
+
+    strategy = BillingTxCountMetricConsumerStrategy(
+        commit=fake_commit,
+        max_batch_size=2,
+        max_batch_time=10000,
+    )
+
+    def generate_kafka_message(bucket: MetricsBucket) -> Message[KafkaPayload]:
+        encoded = json.dumps(bucket).encode()
+        payload = KafkaPayload(key=None, value=encoded, headers=[])
+        message = Message(
+            Partition(topic, index=0), generate_kafka_message.counter, payload, datetime.now()
+        )
+        generate_kafka_message.counter += 1
+        return message
+
+    generate_kafka_message.counter = 0
+
+    # Mimic the behavior of StreamProcessor._run_once: call poll repeatedly,
+    # then call submit when there is a message.
+    strategy.poll()
+    strategy.poll()
+    assert track_outcome.call_count == 0
+    for i, bucket in enumerate(buckets):
+        strategy.poll()
+        strategy.submit(generate_kafka_message(bucket))
+        # commit is called for every two messages:
+        assert fake_commit.call_count == i // 2
+        if i < 3:
+            assert track_outcome.call_count == 0
+        else:
+            assert track_outcome.mock_calls == [
+                mock.call(
+                    org_id=1,
+                    project_id=2,
+                    key_id=None,
+                    outcome=Outcome.ACCEPTED,
+                    reason=None,
+                    timestamp=datetime(1970, 1, 2, 10, 17, 36),
+                    event_id=None,
+                    category=DataCategory.TRANSACTION_PROCESSED,
+                    quantity=3,
+                )
+            ]
+
+    # There have been 5 messages; 2 x 2 of them have had their offsets committed:
+    assert fake_commit.call_count == 2
+
+    # Joining should commit the offset of the last message:
+    strategy.join()
+
+    assert fake_commit.call_count == 3