|
@@ -16,7 +16,6 @@ from sentry_kafka_schemas.schema_types.snuba_spans_v1 import SpanEvent
|
|
|
from sentry import options
|
|
|
from sentry.spans.buffer.redis import RedisSpansBuffer
|
|
|
from sentry.spans.produce_segment import produce_segment_to_kafka
|
|
|
-from sentry.utils import metrics
|
|
|
from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
@@ -60,7 +59,6 @@ def _process_message(message: Message[KafkaPayload]) -> ProduceSegmentContext |
|
|
|
return None
|
|
|
|
|
|
assert isinstance(message.value, BrokerValue)
|
|
|
- metrics.incr("process_spans.spans.received.count")
|
|
|
|
|
|
with sentry_sdk.start_transaction(op="process", name="spans.process.process_message") as txn:
|
|
|
payload_value = message.payload.value
|
|
@@ -86,8 +84,6 @@ def _process_message(message: Message[KafkaPayload]) -> ProduceSegmentContext |
|
|
|
project_id, segment_id, timestamp, partition, payload_value
|
|
|
)
|
|
|
|
|
|
- metrics.incr("process_spans.spans.write.count")
|
|
|
-
|
|
|
return ProduceSegmentContext(
|
|
|
should_process_segments=should_process_segments, timestamp=timestamp, partition=partition
|
|
|
)
|
|
@@ -105,7 +101,7 @@ def _produce_segment(message: Message[ProduceSegmentContext | None]):
|
|
|
if message.payload is None:
|
|
|
return
|
|
|
|
|
|
- context: ProduceSegmentContext = message.payload
|
|
|
+ context = message.payload
|
|
|
|
|
|
if context.should_process_segments:
|
|
|
with sentry_sdk.start_transaction(
|
|
@@ -134,7 +130,6 @@ def _produce_segment(message: Message[ProduceSegmentContext | None]):
|
|
|
total_spans_read += len(segment)
|
|
|
produce_segment_to_kafka(segment)
|
|
|
|
|
|
- metrics.incr("process_spans.spans.read.count", total_spans_read)
|
|
|
sentry_sdk.set_context("payload", payload_context)
|
|
|
|
|
|
|
|
@@ -159,13 +154,14 @@ class ProcessSpansStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
|
|
|
self.max_batch_time = max_batch_time
|
|
|
self.input_block_size = input_block_size
|
|
|
self.output_block_size = output_block_size
|
|
|
- self.pool = MultiprocessingPool(num_processes)
|
|
|
+ self.__pool = MultiprocessingPool(num_processes)
|
|
|
|
|
|
def create_with_partitions(
|
|
|
self,
|
|
|
commit: Commit,
|
|
|
partitions: Mapping[Partition, int],
|
|
|
) -> ProcessingStrategy[KafkaPayload]:
|
|
|
+
|
|
|
next_step = RunTask(function=produce_segment, next_step=CommitOffsets(commit))
|
|
|
|
|
|
return RunTaskWithMultiprocessing(
|
|
@@ -173,7 +169,10 @@ class ProcessSpansStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
|
|
|
next_step=next_step,
|
|
|
max_batch_size=self.max_batch_size,
|
|
|
max_batch_time=self.max_batch_time,
|
|
|
- pool=self.pool,
|
|
|
+ pool=self.__pool,
|
|
|
input_block_size=self.input_block_size,
|
|
|
output_block_size=self.output_block_size,
|
|
|
)
|
|
|
+
|
|
|
+ def shutdown(self) -> None:
|
|
|
+ self.__pool.close()
|