Browse Source

feat(post-process-forwarder): Remove unnecessary metric (#44685)

This metric's usefulness came from a time when errors and transactions
shared a topic and we wanted to know how many messages were processed by
each. Today these pipelines are entirely separate and this metric offers
little more than what we already know from `kafka.consumer_offset` and
`kafka.consumer_lag`, which are already broken down by group and
partition.
Lyn Nagara 2 years ago
parent
commit
265769cc69

+ 1 - 3
src/sentry/eventstream/kafka/consumer_strategy.py

@@ -12,7 +12,7 @@ from arroyo.types import Commit, Message, Partition
 
 from sentry import options
 from sentry.eventstream.base import GroupStates
-from sentry.eventstream.kafka.postprocessworker import _record_metrics, _sampled_eventstream_timer
+from sentry.eventstream.kafka.postprocessworker import _sampled_eventstream_timer
 from sentry.eventstream.kafka.protocol import (
     get_task_kwargs_for_message,
     get_task_kwargs_for_message_from_headers,
@@ -80,8 +80,6 @@ def _get_task_kwargs_and_dispatch(message: Message[KafkaPayload]) -> None:
     if not task_kwargs:
         return None
 
-    for partition in message.committable:
-        _record_metrics(partition.index, task_kwargs)
     dispatch_post_process_group_task(**task_kwargs)
 
 

+ 1 - 36
src/sentry/eventstream/kafka/postprocessworker.py

@@ -2,12 +2,9 @@ from __future__ import annotations
 
 import logging
 import random
-import time
-from collections import defaultdict
 from contextlib import contextmanager
 from enum import Enum
-from threading import Lock
-from typing import Any, Generator, Mapping, MutableMapping, Optional, Tuple
+from typing import Any, Generator, Mapping, Optional
 
 from sentry import options
 from sentry.eventstream.kafka.protocol import (
@@ -20,7 +17,6 @@ logger = logging.getLogger(__name__)
 
 Message = Any
 _DURATION_METRIC = "eventstream.duration"
-_MESSAGES_METRIC = "eventstream.messages"
 
 
 class PostProcessForwarderType(str, Enum):
@@ -53,34 +49,3 @@ def _get_task_kwargs(message: Message) -> Optional[Mapping[str, Any]]:
     else:
         with metrics.timer(_DURATION_METRIC, instance="get_task_kwargs_for_message"):
             return get_task_kwargs_for_message(message.value())
-
-
-__metrics: MutableMapping[Tuple[int, str], int] = defaultdict(int)
-__metric_record_freq_sec = 1.0
-__last_flush = time.time()
-__lock = Lock()
-
-
-def _record_metrics(partition: int, task_kwargs: Mapping[str, Any]) -> None:
-    """
-    Records the number of messages processed per partition. Metric is flushed every second.
-    """
-    global __metrics
-    global __last_flush
-    # TODO: Fix this, it's already broken for transactions with groups
-    event_type = "transactions" if task_kwargs["group_id"] is None else "errors"
-    __metrics[(partition, event_type)] += 1
-
-    current_time = time.time()
-    if current_time - __last_flush > __metric_record_freq_sec:
-        with __lock:
-            metrics_to_send = __metrics
-            __metrics = defaultdict(int)
-            __last_flush = current_time
-        for ((partition, event_type), count) in metrics_to_send.items():
-            metrics.incr(
-                _MESSAGES_METRIC,
-                amount=count,
-                tags={"partition": partition, "type": event_type},
-                sample_rate=1,
-            )