
feat: Arroyo 2.3.0 (#42392)

Lyn Nagara · 2 years ago · commit de0cdbc7df

+ 1 - 1
requirements-base.txt

@@ -54,7 +54,7 @@ requests>=2.25.1
 rfc3339-validator>=0.1.2
 rfc3986-validator>=0.1.1
 # [end] jsonschema format validators
-sentry-arroyo>=2.2.0
+sentry-arroyo>=2.3.0
 sentry-relay>=0.8.15
 sentry-sdk>=1.11.0
 snuba-sdk>=1.0.1

+ 3 - 2
requirements-dev-frozen.txt

@@ -44,6 +44,7 @@ drf-spectacular==0.22.1
 email-reply-parser==0.5.12
 exceptiongroup==1.0.0rc9
 execnet==1.9.0
+fastjsonschema==2.16.2
 fido2==0.9.2
 filelock==3.7.0
 flake8==6.0.0
@@ -132,7 +133,7 @@ pytest-sentry==0.1.11
 pytest-xdist==3.0.2
 python-dateutil==2.8.1
 python-memcached==1.59
-python-rapidjson==1.4
+python-rapidjson==1.8
 python-u2flib-server==5.0.0
 python-utils==3.3.3
 python3-saml==1.14.0
@@ -153,7 +154,7 @@ rfc3986-validator==0.1.1
 rsa==4.8
 s3transfer==0.5.2
 selenium==4.3.0
-sentry-arroyo==2.2.0
+sentry-arroyo==2.3.0
 sentry-relay==0.8.15
 sentry-sdk==1.11.0
 simplejson==3.17.6

+ 3 - 2
requirements-frozen.txt

@@ -34,6 +34,7 @@ django-pg-zero-downtime-migrations==0.11
 djangorestframework==3.12.4
 drf-spectacular==0.22.1
 email-reply-parser==0.5.12
+fastjsonschema==2.16.2
 fido2==0.9.2
 google-api-core==2.10.1
 google-auth==1.35.0
@@ -88,7 +89,7 @@ pyrsistent==0.18.1
 pysocks==1.7.1
 python-dateutil==2.8.1
 python-memcached==1.59
-python-rapidjson==1.4
+python-rapidjson==1.8
 python-u2flib-server==5.0.0
 python-utils==3.3.3
 python3-saml==1.14.0
@@ -107,7 +108,7 @@ rfc3986-validator==0.1.1
 rsa==4.8
 s3transfer==0.5.2
 selenium==4.3.0
-sentry-arroyo==2.2.0
+sentry-arroyo==2.3.0
 sentry-relay==0.8.15
 sentry-sdk==1.11.0
 simplejson==3.17.6

+ 5 - 5
src/sentry/eventstream/kafka/synchronized.py

@@ -19,7 +19,7 @@ from arroyo.backends.abstract import Consumer
 from arroyo.backends.kafka import KafkaPayload
 from arroyo.backends.kafka.commit import CommitCodec
 from arroyo.errors import ConsumerError, EndOfPartition
-from arroyo.types import BrokerValue, Partition, Position, Topic, TPayload
+from arroyo.types import BrokerValue, Partition, Topic, TPayload
 from arroyo.utils.concurrent import execute
 
 from sentry.utils import metrics
@@ -298,11 +298,11 @@ class SynchronizedConsumer(Consumer[TPayload]):
     def seek(self, offsets: Mapping[Partition, int]) -> None:
         return self.__consumer.seek(offsets)
 
-    def stage_positions(self, positions: Mapping[Partition, Position]) -> None:
-        return self.__consumer.stage_positions(positions)
+    def stage_offsets(self, offsets: Mapping[Partition, int]) -> None:
+        return self.__consumer.stage_offsets(offsets)
 
-    def commit_positions(self) -> Mapping[Partition, Position]:
-        return self.__consumer.commit_positions()
+    def commit_offsets(self) -> Mapping[Partition, int]:
+        return self.__consumer.commit_offsets()
 
     def close(self, timeout: Optional[float] = None) -> None:
         # TODO: Be careful to ensure there are not any deadlock conditions

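Arroyo 2.3 renames the consumer commit API: `stage_positions`/`commit_positions`, which took `Mapping[Partition, Position]`, become `stage_offsets`/`commit_offsets` over plain integer offsets, which is why `Position` drops out of the imports above. A minimal sketch of the new bookkeeping, assuming sentry-arroyo>=2.3.0; the `OffsetTracker` class is illustrative and not part of Sentry or Arroyo:

```python
from typing import Mapping, MutableMapping

from arroyo.types import Partition, Topic


class OffsetTracker:
    """Hypothetical stand-in for a consumer's staged-offset bookkeeping."""

    def __init__(self) -> None:
        self.__staged: MutableMapping[Partition, int] = {}

    def stage_offsets(self, offsets: Mapping[Partition, int]) -> None:
        # Offsets are now bare ints; there is no Position wrapper
        # (offset plus timestamp) to construct anymore.
        self.__staged.update(offsets)

    def commit_offsets(self) -> Mapping[Partition, int]:
        # Hand back and clear everything staged so far, the same
        # stage-then-commit shape SynchronizedConsumer delegates to.
        committed = dict(self.__staged)
        self.__staged.clear()
        return committed


tracker = OffsetTracker()
tracker.stage_offsets({Partition(Topic("events"), 0): 42})
assert tracker.commit_offsets() == {Partition(Topic("events"), 0): 42}
```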
+ 5 - 15
src/sentry/ingest/billing_metrics_consumer.py

@@ -1,16 +1,6 @@
 import logging
 from datetime import datetime, timedelta, timezone
-from typing import (
-    Any,
-    Callable,
-    Mapping,
-    MutableMapping,
-    Optional,
-    Sequence,
-    TypedDict,
-    Union,
-    cast,
-)
+from typing import Any, Mapping, MutableMapping, Optional, Sequence, TypedDict, Union, cast
 
 from arroyo import Topic
 from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
@@ -18,7 +8,7 @@ from arroyo.backends.kafka.configuration import build_kafka_consumer_configurati
 from arroyo.commit import IMMEDIATE
 from arroyo.processing import StreamProcessor
 from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
-from arroyo.types import Message, Partition, Position
+from arroyo.types import Commit, Message, Partition
 from django.conf import settings
 
 from sentry.constants import DataCategory
@@ -73,7 +63,7 @@ class BillingMetricsConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPaylo
 
     def create_with_partitions(
         self,
-        commit: Callable[[Mapping[Partition, Position]], None],
+        commit: Commit,
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
         return BillingTxCountMetricConsumerStrategy(
@@ -106,7 +96,7 @@ class BillingTxCountMetricConsumerStrategy(ProcessingStrategy[KafkaPayload]):
 
     def __init__(
         self,
-        commit: Callable[[Mapping[Partition, Position]], None],
+        commit: Commit,
         max_batch_size: int,
         max_batch_time: int,
     ) -> None:
@@ -115,7 +105,7 @@ class BillingTxCountMetricConsumerStrategy(ProcessingStrategy[KafkaPayload]):
         self.__max_batch_time = timedelta(milliseconds=max_batch_time)
         self.__messages_since_last_commit = 0
         self.__last_commit = datetime.now(timezone.utc)
-        self.__ready_to_commit: MutableMapping[Partition, Position] = {}
+        self.__ready_to_commit: MutableMapping[Partition, int] = {}
         self.__closed = False
 
     def poll(self) -> None:

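`Commit` from `arroyo.types` is the 2.3 type for the commit callback, replacing the spelled-out `Callable[[Mapping[Partition, Position]], None]` throughout this commit. A hedged sketch of the batching pattern the billing strategy uses, with committable offsets kept as plain ints; the class name and batch threshold are illustrative, not Sentry's code:

```python
from typing import Mapping, MutableMapping, Optional

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy
from arroyo.types import Commit, Message, Partition


class BatchedCommitStrategy(ProcessingStrategy[KafkaPayload]):
    """Illustrative: accumulate offsets and commit every N messages."""

    def __init__(self, commit: Commit, max_batch_size: int = 100) -> None:
        self.__commit = commit
        self.__max_batch_size = max_batch_size
        self.__ready_to_commit: MutableMapping[Partition, int] = {}
        self.__seen = 0

    def submit(self, message: Message[KafkaPayload]) -> None:
        # message.committable maps each partition to the next offset to
        # commit, already as plain ints in Arroyo 2.x.
        self.__ready_to_commit.update(message.committable)
        self.__seen += 1
        if self.__seen >= self.__max_batch_size:
            self.__commit(self.__ready_to_commit)
            self.__ready_to_commit = {}
            self.__seen = 0

    def poll(self) -> None:
        pass

    def close(self) -> None:
        pass

    def terminate(self) -> None:
        pass

    def join(self, timeout: Optional[float] = None) -> None:
        if self.__ready_to_commit:
            # Commit is a callable protocol; force=True asks it to bypass
            # commit-policy throttling on shutdown.
            self.__commit(self.__ready_to_commit, force=True)
```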
+ 4 - 4
src/sentry/issues/occurrence_consumer.py

@@ -1,5 +1,5 @@
 import logging
-from typing import Any, Callable, Mapping, Optional
+from typing import Any, Mapping, Optional
 
 from arroyo import Topic
 from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
@@ -7,7 +7,7 @@ from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload
 from arroyo.commit import ONCE_PER_SECOND
 from arroyo.processing.processor import StreamProcessor
 from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
-from arroyo.types import Message, Partition, Position
+from arroyo.types import Commit, Message, Partition
 from django.conf import settings
 
 from sentry.utils.kafka_config import get_kafka_consumer_cluster_options
@@ -46,7 +46,7 @@ def create_ingest_occurences_consumer(
 class OccurrenceStrategy(ProcessingStrategy[KafkaPayload]):
     def __init__(
         self,
-        committer: Callable[[Mapping[Partition, Position]], None],
+        committer: Commit,
         partitions: Mapping[Partition, int],
     ):
         pass
@@ -73,7 +73,7 @@ class OccurrenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
 
     def create_with_partitions(
         self,
-        commit: Callable[[Mapping[Partition, Position]], None],
+        commit: Commit,
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
         return OccurrenceStrategy(commit, partitions)

+ 1 - 2
src/sentry/profiles/consumers/process/factory.py

@@ -31,8 +31,7 @@ class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         commit: Commit,
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
-        next_step: CommitOffsets[None] = CommitOffsets(commit)
         return RunTask(
             function=process_message,
-            next_step=next_step,
+            next_step=CommitOffsets(commit),
         )

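The profiles change is purely cosmetic: the typed `next_step` temporary is inlined. The resulting shape, `RunTask` doing the per-message work and `CommitOffsets` committing each message's offsets, is the idiomatic end of an Arroyo 2.3 pipeline. A minimal sketch under the assumption that `RunTask` and `CommitOffsets` are exported from `arroyo.processing.strategies` in 2.3; `handle` and `ExampleFactory` are hypothetical names:

```python
from typing import Mapping

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import (
    CommitOffsets,
    ProcessingStrategy,
    ProcessingStrategyFactory,
    RunTask,
)
from arroyo.types import Commit, Message, Partition


def handle(message: Message[KafkaPayload]) -> None:
    # Hypothetical per-message work; RunTask forwards the message to
    # the next step after this returns.
    print(len(message.payload.value))


class ExampleFactory(ProcessingStrategyFactory[KafkaPayload]):
    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        # CommitOffsets terminates the pipeline by committing each
        # message's committable offsets through the passed-in callback.
        return RunTask(function=handle, next_step=CommitOffsets(commit))
```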
+ 3 - 3
src/sentry/replays/consumers/recording/factory.py

@@ -1,9 +1,9 @@
 import logging
-from typing import Callable, Mapping
+from typing import Mapping
 
 from arroyo.backends.kafka.consumer import KafkaPayload
 from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
-from arroyo.types import Partition, Position
+from arroyo.types import Commit, Partition
 
 from sentry.replays.consumers.recording.process_recording import ProcessRecordingSegmentStrategy
 
@@ -18,7 +18,7 @@ class ProcessReplayRecordingStrategyFactory(ProcessingStrategyFactory[KafkaPaylo
 
     def create_with_partitions(
         self,
-        commit: Callable[[Mapping[Partition, Position]], None],
+        commit: Commit,
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
         return ProcessRecordingSegmentStrategy(commit)

+ 4 - 4
src/sentry/replays/consumers/recording/process_recording.py

@@ -6,7 +6,7 @@ import random
 import time
 from collections import deque
 from concurrent.futures import Future
-from typing import Callable, Deque, Mapping, MutableMapping, NamedTuple, Optional, cast
+from typing import Deque, MutableMapping, NamedTuple, Optional, cast
 
 import msgpack
 import sentry_sdk
@@ -14,7 +14,7 @@ from arroyo import Partition
 from arroyo.backends.kafka.consumer import KafkaPayload
 from arroyo.processing.strategies import MessageRejected
 from arroyo.processing.strategies.abstract import ProcessingStrategy
-from arroyo.types import Message, Position
+from arroyo.types import Commit, Message
 from django.conf import settings
 
 from sentry.replays.cache import RecordingSegmentParts
@@ -52,13 +52,13 @@ class ReplayRecordingMessageFuture(NamedTuple):
 class ProcessRecordingSegmentStrategy(ProcessingStrategy[KafkaPayload]):
     def __init__(
         self,
-        commit: Callable[[Mapping[Partition, Position]], None],
+        commit: Commit,
     ) -> None:
         self.__closed = False
         self.__futures: Deque[ReplayRecordingMessageFuture] = deque()
         self.__threadpool = concurrent.futures.ThreadPoolExecutor(max_workers=16)
         self.__commit = commit
-        self.__commit_data: MutableMapping[Partition, Position] = {}
+        self.__commit_data: MutableMapping[Partition, int] = {}
        self.__last_committed: float = 0
         self.__max_pending_futures = 32
 

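`ProcessRecordingSegmentStrategy` keeps its shape across the upgrade: work runs on a threadpool, and offsets (now `Partition -> int`) are only handed to the `Commit` callback once the corresponding future completes. A simplified sketch of that ordering guarantee, with a no-op future standing in for the real segment processing; the class name and pool sizing are illustrative:

```python
import concurrent.futures
from collections import deque
from typing import Deque, MutableMapping, NamedTuple, Optional

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.types import Commit, Message, Partition


class PendingWork(NamedTuple):
    message: Message[KafkaPayload]
    future: "concurrent.futures.Future[None]"


class DeferredCommitStrategy(ProcessingStrategy[KafkaPayload]):
    """Illustrative: commit an offset only after its async work completes."""

    def __init__(self, commit: Commit) -> None:
        self.__commit = commit
        self.__futures: Deque[PendingWork] = deque()
        self.__pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
        self.__commit_data: MutableMapping[Partition, int] = {}

    def submit(self, message: Message[KafkaPayload]) -> None:
        future = self.__pool.submit(lambda: None)  # stand-in for real work
        self.__futures.append(PendingWork(message, future))

    def poll(self) -> None:
        # Drain completed futures in order so a commit never skips ahead
        # of unfinished work on the same partition.
        while self.__futures and self.__futures[0].future.done():
            finished = self.__futures.popleft()
            self.__commit_data.update(finished.message.committable)
        if self.__commit_data:
            self.__commit(self.__commit_data)
            self.__commit_data = {}

    def close(self) -> None:
        pass

    def terminate(self) -> None:
        self.__pool.shutdown(wait=False)

    def join(self, timeout: Optional[float] = None) -> None:
        self.__pool.shutdown(wait=True)
        self.poll()
```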
+ 5 - 5
src/sentry/sentry_metrics/consumers/indexer/multiprocess.py

@@ -1,12 +1,12 @@
 import logging
 import time
 from functools import partial
-from typing import Any, Callable, Mapping, MutableMapping, Optional
+from typing import Any, Mapping, MutableMapping, Optional
 
 from arroyo.backends.abstract import Producer as AbstractProducer
 from arroyo.backends.kafka import KafkaPayload
 from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
-from arroyo.types import Message, Partition, Position
+from arroyo.types import Commit, Message, Partition
 from confluent_kafka import Producer
 from django.conf import settings
 
@@ -19,7 +19,7 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]):
     def __init__(
         self,
         output_topic: str,
-        commit_function: Callable[[Mapping[Partition, Position]], None],
+        commit_function: Commit,
         commit_max_batch_size: int,
         commit_max_batch_time: float,
         producer: Optional[AbstractProducer[KafkaPayload]] = None,
@@ -32,7 +32,7 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]):
         self.__commit_function = commit_function
 
         self.__closed = False
-        self.__produced_message_offsets: MutableMapping[Partition, Position] = {}
+        self.__produced_message_offsets: MutableMapping[Partition, int] = {}
         self.__callbacks = 0
         self.__started = time.time()
         # TODO: Need to make these flags
@@ -102,7 +102,7 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]):
             headers=message.payload.headers,
         )
 
-    def callback(self, error: Any, message: Any, committable: Mapping[Partition, Position]) -> None:
+    def callback(self, error: Any, message: Any, committable: Mapping[Partition, int]) -> None:
         if message and error is None:
             self.__callbacks += 1
             self.__produced_message_offsets.update(committable)

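In `SimpleProduceStep`, offsets are recorded from confluent-kafka's delivery callback; binding the per-message committable mapping with `functools.partial` is how a two-argument `(error, message)` callback can carry it, matching the `callback(..., committable=...)` signature above. A standalone sketch under that assumption; the broker address, topic name, and example offsets are placeholders:

```python
from functools import partial
from typing import Any, Mapping, MutableMapping

from arroyo.types import Partition, Topic
from confluent_kafka import Producer

produced_offsets: MutableMapping[Partition, int] = {}


def delivery_callback(
    error: Any, raw_message: Any, committable: Mapping[Partition, int]
) -> None:
    # confluent-kafka calls this with (error, message); `committable` is
    # pre-bound via partial, so only acknowledged messages are recorded.
    if raw_message is not None and error is None:
        produced_offsets.update(committable)


producer = Producer({"bootstrap.servers": "127.0.0.1:9092"})  # placeholder broker
producer.produce(
    "snuba-metrics",  # placeholder topic
    value=b"{}",
    callback=partial(
        delivery_callback,
        committable={Partition(Topic("ingest-metrics"), 0): 1},  # example offsets
    ),
)
producer.flush()  # serve delivery callbacks before exiting
```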
Some files were not shown because too many files changed in this diff