Browse Source

fix(spans): Close multiprocess pool (#68890)

Close multiprocess pool on detect performance issues consumer
Add some tests to check that offsets are being committed as expected
Remove some unnecessary metrics
Shruthi 11 months ago
parent
commit
b5038f5810

+ 5 - 4
src/sentry/spans/consumers/detect_performance_issues/factory.py

@@ -13,7 +13,6 @@ from sentry_kafka_schemas.schema_types.buffered_segments_v1 import BufferedSegme
 
 from sentry import options
 from sentry.spans.consumers.detect_performance_issues.message import process_segment
-from sentry.utils import metrics
 from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
 
 BUFFERED_SEGMENT_SCHEMA: Codec[BufferedSegment] = get_codec("buffered-segments")
@@ -28,16 +27,15 @@ def _deserialize_segment(value: bytes) -> Mapping[str, Any]:
 def process_message(message: Message[KafkaPayload]):
     value = message.payload.value
     segment = _deserialize_segment(value)
-    metrics.incr("detect_performance_issues.spans.count", len(segment["spans"]))
 
-    assert len(segment["spans"]) > 0
+    assert segment["spans"]
 
     process_segment(segment["spans"])
 
 
 def _process_message(message: Message[KafkaPayload]):
     if not options.get("standalone-spans.detect-performance-issues-consumer.enable"):
-        return None
+        return
 
     assert isinstance(message.value, BrokerValue)
 
@@ -81,3 +79,6 @@ class DetectPerformanceIssuesStrategyFactory(ProcessingStrategyFactory[KafkaPayl
             input_block_size=self.input_block_size,
             output_block_size=self.output_block_size,
         )
+
+    def shutdown(self):
+        self.pool.close()

+ 25 - 5
tests/sentry/spans/consumers/detect_performance_issues/test_factory.py

@@ -31,15 +31,17 @@ def build_mock_message(data, topic=None):
 @mock.patch("sentry.spans.consumers.detect_performance_issues.factory.process_segment")
 def test_segment_deserialized_correctly(mock_process_segment):
     topic = ArroyoTopic(get_topic_definition(Topic.BUFFERED_SEGMENTS)["real_topic_name"])
-    partition = Partition(topic, 0)
+    partition_1 = Partition(topic, 0)
+    partition_2 = Partition(topic, 1)
+    mock_commit = mock.Mock()
     strategy = DetectPerformanceIssuesStrategyFactory(
         num_processes=2,
         input_block_size=1,
-        max_batch_size=1,
+        max_batch_size=2,
         max_batch_time=1,
         output_block_size=1,
     ).create_with_partitions(
-        commit=mock.Mock(),
+        commit=mock_commit,
         partitions={},
     )
 
@@ -51,15 +53,33 @@ def test_segment_deserialized_correctly(mock_process_segment):
         Message(
             BrokerValue(
                 KafkaPayload(b"key", message.value().encode("utf-8"), []),
-                partition,
+                partition_1,
                 1,
                 datetime.now(),
             )
         )
     )
 
+    strategy.submit(
+        Message(
+            BrokerValue(
+                KafkaPayload(b"key", message.value().encode("utf-8"), []),
+                partition_2,
+                1,
+                datetime.now(),
+            )
+        )
+    )
+
+    calls = [
+        mock.call({partition_1: 2}),
+        mock.call({partition_2: 2}),
+    ]
+
+    mock_commit.assert_has_calls(calls=calls, any_order=True)
+
     strategy.poll()
     strategy.join(1)
     strategy.terminate()
 
-    mock_process_segment.assert_called_once_with(segment_data["spans"])
+    assert mock_process_segment.call_args.args[0] == segment_data["spans"]