|
@@ -1,6 +1,8 @@
|
|
|
import logging
|
|
|
+import random
|
|
|
import signal
|
|
|
-from typing import Any, Mapping, Optional, Tuple
|
|
|
+from contextlib import contextmanager
|
|
|
+from typing import Any, Generator, Mapping, Optional, Tuple
|
|
|
|
|
|
from confluent_kafka import OFFSET_INVALID, TopicPartition
|
|
|
from django.conf import settings
|
|
@@ -278,14 +280,14 @@ class KafkaEventStream(SnubaProtocolEventStream):
|
|
|
|
|
|
if use_kafka_headers is True:
|
|
|
try:
|
|
|
- with metrics.timer(
|
|
|
- "eventstream.duration", instance="get_task_kwargs_for_message_from_headers"
|
|
|
+ with self.sampled_eventstream_timer(
|
|
|
+ instance="get_task_kwargs_for_message_from_headers"
|
|
|
):
|
|
|
task_kwargs = get_task_kwargs_for_message_from_headers(message.headers())
|
|
|
|
|
|
if task_kwargs is not None:
|
|
|
- with metrics.timer(
|
|
|
- "eventstream.duration", instance="dispatch_post_process_group_task"
|
|
|
+ with self.sampled_eventstream_timer(
|
|
|
+ instance="dispatch_post_process_group_task"
|
|
|
):
|
|
|
self._dispatch_post_process_group_task(**task_kwargs)
|
|
|
|
|
@@ -311,3 +313,13 @@ class KafkaEventStream(SnubaProtocolEventStream):
|
|
|
if task_kwargs is not None:
|
|
|
with metrics.timer("eventstream.duration", instance="dispatch_post_process_group_task"):
|
|
|
self._dispatch_post_process_group_task(**task_kwargs)
|
|
|
+
|
|
|
+ @contextmanager
|
|
|
+ def sampled_eventstream_timer(self, instance: str) -> Generator[None, None, None]:
|
|
|
+
|
|
|
+ record_metric = random.random() < 0.1
|
|
|
+ if record_metric is True:
|
|
|
+ with metrics.timer("eventstream.duration", instance=instance):
|
|
|
+ yield
|
|
|
+ else:
|
|
|
+ yield
|