@@ -1,8 +1,10 @@
+import dataclasses
 import logging
 from collections.abc import Mapping
 from typing import Any
 
-from arroyo.backends.kafka.consumer import KafkaPayload
+from arroyo.backends.kafka.consumer import Headers, KafkaPayload
+from arroyo.processing.strategies import RunTask
 from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
 from arroyo.processing.strategies.commit import CommitOffsets
 from arroyo.types import BrokerValue, Commit, Message, Partition
@@ -18,29 +20,55 @@ from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
 logger = logging.getLogger(__name__)
 SPAN_SCHEMA: Codec[SpanEvent] = get_codec("snuba-spans")
 
-PROCESS_SEGMENT_DELAY = 2 * 60  # 2 minutes
 BATCH_SIZE = 100
 
 
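+# Output of the parse/filter step (process_message), handed to produce_segment,
+# the next strategy in the pipeline.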
+@dataclasses.dataclass
+class ProduceSegmentContext:
+    should_process_segments: bool
+    timestamp: int
+    partition: int
+
+
+def get_project_id(headers: Headers) -> int | None:
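+    # Kafka headers are (key, bytes) pairs; project_id arrives UTF-8 encoded.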
+    for k, v in headers:
+        if k == "project_id":
+            return int(v.decode("utf-8"))
+
+    return None
+
+
 def _deserialize_span(value: bytes) -> Mapping[str, Any]:
     return SPAN_SCHEMA.decode(value)
 
 
-def process_message(message: Message[KafkaPayload]):
+def process_message(message: Message[KafkaPayload]) -> ProduceSegmentContext | None:
     if not options.get("standalone-spans.process-spans-consumer.enable"):
-        return
+        return None
+
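+    # Read project_id from the message headers so messages from projects outside
+    # the allowlist can be dropped before paying for payload deserialization.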
+    try:
+        project_id = get_project_id(message.payload.headers)
+    except Exception:
+        logger.exception("Failed to parse span message header")
+        return None
+
+    if project_id is None or project_id not in options.get(
+        "standalone-spans.process-spans-consumer.project-allowlist"
+    ):
+        return None
 
     assert isinstance(message.value, BrokerValue)
     try:
         span = _deserialize_span(message.payload.value)
         segment_id = span["segment_id"]
-        project_id = span["project_id"]
     except Exception:
         logger.exception("Failed to process span payload")
-        return
-
-    if project_id not in options.get("standalone-spans.process-spans-consumer.project-allowlist"):
-        return
+        return None
 
     timestamp = int(message.value.timestamp.timestamp())
     partition = message.value.partition.index
@@ -51,8 +79,25 @@ def process_message(message: Message[KafkaPayload]):
         project_id, segment_id, timestamp, partition, message.payload.value
     )
 
-    if should_process_segments:
-        keys = client.get_unprocessed_segments_and_prune_bucket(timestamp, partition)
+    return ProduceSegmentContext(
+        should_process_segments=should_process_segments, timestamp=timestamp, partition=partition
+    )
+
+
+def produce_segment(message: Message[ProduceSegmentContext | None]):
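+    # process_message returns None for messages that were filtered out;
+    # there is nothing to produce for those.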
+    if message.payload is None:
+        return
+
+    context: ProduceSegmentContext = message.payload
+    if context.should_process_segments:
+        client = RedisSpansBuffer()
+
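+        # Collect the segment keys that are due and prune them from the bucket.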
+        keys = client.get_unprocessed_segments_and_prune_bucket(
+            context.timestamp, context.partition
+        )
         # With pipelining, redis server is forced to queue replies using
         # up memory, so batching the keys we fetch.
         for i in range(0, len(keys), BATCH_SIZE):
@@ -83,9 +128,13 @@ class ProcessSpansStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         commit: Commit,
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
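+        # Spans are parsed and filtered in the subprocess pool; segment
+        # production and offset commits then run on the main process.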
+        next_step = RunTask(function=produce_segment, next_step=CommitOffsets(commit))
+
         return RunTaskWithMultiprocessing(
             function=process_message,
-            next_step=CommitOffsets(commit),
+            next_step=next_step,
             max_batch_size=self.max_batch_size,
             max_batch_time=self.max_batch_time,
             pool=self.pool,