|
@@ -4,6 +4,8 @@ from collections import defaultdict
|
|
|
from collections.abc import Mapping
|
|
|
from typing import Any
|
|
|
|
|
|
+import orjson
|
|
|
+import rapidjson
|
|
|
import sentry_sdk
|
|
|
from arroyo import Topic as ArroyoTopic
|
|
|
from arroyo.backends.kafka import KafkaProducer, build_kafka_configuration
|
|
@@ -68,7 +70,15 @@ def prepare_buffered_segment_payload(segments) -> bytes:
|
|
|
return b'{"spans": [' + segment_str + b"]}"
|
|
|
|
|
|
|
|
|
-def _deserialize_span(value: bytes) -> Mapping[str, Any]:
|
|
|
+@metrics.wraps("spans.consumers.process.deserialize_span")
|
|
|
+def _deserialize_span(value: bytes, use_orjson=False, use_rapidjson=False) -> Mapping[str, Any]:
|
|
|
+ if use_orjson:
|
|
|
+ sentry_sdk.set_tag("json_lib", "orjson")
|
|
|
+ return orjson.loads(value)
|
|
|
+ if use_rapidjson:
|
|
|
+ sentry_sdk.set_tag("json_lib", "rapidjson")
|
|
|
+ return rapidjson.loads(value)
|
|
|
+
|
|
|
return SPAN_SCHEMA.decode(value)
|
|
|
|
|
|
|
|
@@ -97,8 +107,13 @@ def _process_message(message: Message[KafkaPayload]) -> SpanMessageWithMetadata
|
|
|
timestamp = int(message.value.timestamp.timestamp())
|
|
|
partition = message.value.partition.index
|
|
|
|
|
|
+ use_orjson = options.get("standalone-spans.deserialize-spans-orjson.enable")
|
|
|
+ use_rapidjson = options.get("standalone-spans.deserialize-spans-rapidjson.enable")
|
|
|
+
|
|
|
with txn.start_child(op="deserialize"):
|
|
|
- span = _deserialize_span(payload_value)
|
|
|
+ span = _deserialize_span(
|
|
|
+ payload_value, use_orjson=use_orjson, use_rapidjson=use_rapidjson
|
|
|
+ )
|
|
|
|
|
|
segment_id: str | None = span.get("segment_id", None)
|
|
|
if segment_id is None:
|
|
@@ -121,49 +136,52 @@ def process_message(message: Message[KafkaPayload]) -> SpanMessageWithMetadata |
|
|
|
return FILTERED_PAYLOAD
|
|
|
|
|
|
|
|
|
-def _batch_write_to_redis(
|
|
|
- message: Message[ValuesBatch[SpanMessageWithMetadata]],
|
|
|
-):
|
|
|
+def _batch_write_to_redis(message: Message[ValuesBatch[SpanMessageWithMetadata]]):
|
|
|
"""
|
|
|
Gets a batch of `SpanMessageWithMetadata` and creates a dictionary with
|
|
|
segment_id as key and a list of spans belonging to that segment_id as value.
|
|
|
Pushes the batch of spans to redis.
|
|
|
"""
|
|
|
- batch = message.payload
|
|
|
- latest_ts_by_partition: dict[int, int] = {}
|
|
|
- spans_map: dict[SegmentKey, list[bytes]] = defaultdict(list)
|
|
|
- segment_first_seen_ts: dict[SegmentKey, int] = {}
|
|
|
-
|
|
|
- for item in batch:
|
|
|
- payload = item.payload
|
|
|
- partition = payload.partition
|
|
|
- segment_id = payload.segment_id
|
|
|
- project_id = payload.project_id
|
|
|
- span = payload.span
|
|
|
- timestamp = payload.timestamp
|
|
|
-
|
|
|
- key = SegmentKey(segment_id, project_id, partition)
|
|
|
-
|
|
|
- # Collects spans for each segment_id
|
|
|
- spans_map[key].append(span)
|
|
|
-
|
|
|
- # Collects "first_seen" timestamps for each segment in batch.
|
|
|
- # Batch step doesn't guarantee order, so pick lowest ts.
|
|
|
- if key not in segment_first_seen_ts or timestamp < segment_first_seen_ts[key]:
|
|
|
- segment_first_seen_ts[key] = timestamp
|
|
|
-
|
|
|
- # Collects latest timestamps processed in each partition. It is
|
|
|
- # important to keep track of this per partition because message
|
|
|
- # timestamps are guaranteed to be monotonic per partition only.
|
|
|
- if partition not in latest_ts_by_partition or timestamp > latest_ts_by_partition[partition]:
|
|
|
- latest_ts_by_partition[partition] = timestamp
|
|
|
-
|
|
|
- client = RedisSpansBuffer()
|
|
|
- return client.batch_write_and_check_processing(
|
|
|
- spans_map=spans_map,
|
|
|
- segment_first_seen_ts=segment_first_seen_ts,
|
|
|
- latest_ts_by_partition=latest_ts_by_partition,
|
|
|
- )
|
|
|
+ with sentry_sdk.start_transaction(op="process", name="spans.process.expand_segments"):
|
|
|
+ batch = message.payload
|
|
|
+ latest_ts_by_partition: dict[int, int] = {}
|
|
|
+ spans_map: dict[SegmentKey, list[bytes]] = defaultdict(list)
|
|
|
+ segment_first_seen_ts: dict[SegmentKey, int] = {}
|
|
|
+
|
|
|
+ for item in batch:
|
|
|
+ payload = item.payload
|
|
|
+ partition = payload.partition
|
|
|
+ segment_id = payload.segment_id
|
|
|
+ project_id = payload.project_id
|
|
|
+ span = payload.span
|
|
|
+ timestamp = payload.timestamp
|
|
|
+
|
|
|
+ key = SegmentKey(segment_id, project_id, partition)
|
|
|
+
|
|
|
+ # Collects spans for each segment_id
|
|
|
+ spans_map[key].append(span)
|
|
|
+
|
|
|
+ # Collects "first_seen" timestamps for each segment in batch.
|
|
|
+ # Batch step doesn't guarantee order, so pick lowest ts.
|
|
|
+ if key not in segment_first_seen_ts or timestamp < segment_first_seen_ts[key]:
|
|
|
+ segment_first_seen_ts[key] = timestamp
|
|
|
+
|
|
|
+ # Collects latest timestamps processed in each partition. It is
|
|
|
+ # important to keep track of this per partition because message
|
|
|
+ # timestamps are guaranteed to be monotonic per partition only.
|
|
|
+ if (
|
|
|
+ partition not in latest_ts_by_partition
|
|
|
+ or timestamp > latest_ts_by_partition[partition]
|
|
|
+ ):
|
|
|
+ latest_ts_by_partition[partition] = timestamp
|
|
|
+
|
|
|
+ client = RedisSpansBuffer()
|
|
|
+
|
|
|
+ return client.batch_write_and_check_processing(
|
|
|
+ spans_map=spans_map,
|
|
|
+ segment_first_seen_ts=segment_first_seen_ts,
|
|
|
+ latest_ts_by_partition=latest_ts_by_partition,
|
|
|
+ )
|
|
|
|
|
|
|
|
|
def batch_write_to_redis(
|
|
@@ -177,19 +195,17 @@ def batch_write_to_redis(
|
|
|
|
|
|
|
|
|
def _expand_segments(should_process_segments: list[ProcessSegmentsContext]):
|
|
|
- buffered_segments: list[KafkaPayload | FilteredPayload] = []
|
|
|
+ with sentry_sdk.start_transaction(op="process", name="spans.process.expand_segments") as txn:
|
|
|
+ buffered_segments: list[KafkaPayload | FilteredPayload] = []
|
|
|
|
|
|
- for result in should_process_segments:
|
|
|
- timestamp = result.timestamp
|
|
|
- partition = result.partition
|
|
|
- should_process = result.should_process_segments
|
|
|
+ for result in should_process_segments:
|
|
|
+ timestamp = result.timestamp
|
|
|
+ partition = result.partition
|
|
|
+ should_process = result.should_process_segments
|
|
|
|
|
|
- if not should_process:
|
|
|
- continue
|
|
|
+ if not should_process:
|
|
|
+ continue
|
|
|
|
|
|
- with sentry_sdk.start_transaction(
|
|
|
- op="process", name="spans.process.expand_segments"
|
|
|
- ) as txn:
|
|
|
client = RedisSpansBuffer()
|
|
|
payload_context = {}
|
|
|
|
|
@@ -202,7 +218,7 @@ def _expand_segments(should_process_segments: list[ProcessSegmentsContext]):
|
|
|
|
|
|
# With pipelining, redis server is forced to queue replies using
|
|
|
# up memory, so batching the keys we fetch.
|
|
|
- with txn.start_child(op="process", description="produce_fetched_segments"):
|
|
|
+ with txn.start_child(op="process", description="read_and_expire_many_segments"):
|
|
|
for i in range(0, len(keys), BATCH_SIZE):
|
|
|
segments = client.read_and_expire_many_segments(keys[i : i + BATCH_SIZE])
|
|
|
|