|
@@ -28,7 +28,7 @@ RECORDINGS_CODEC: Codec[ReplayRecording] = get_topic_codec(Topic.INGEST_REPLAYS_
|
|
|
class MessageContext:
|
|
|
message: bytes
|
|
|
transaction: Span
|
|
|
- current_hub: sentry_sdk.Hub
|
|
|
+ isolation_scope: sentry_sdk.Scope
|
|
|
|
|
|
# The message attribute can cause large log messages to be emitted which can pin the CPU
|
|
|
# to 100.
|
|
@@ -110,8 +110,8 @@ def initialize_threaded_context(message: Message[KafkaPayload]) -> MessageContex
|
|
|
sampled=random.random()
|
|
|
< getattr(settings, "SENTRY_REPLAY_RECORDINGS_CONSUMER_APM_SAMPLING", 0),
|
|
|
)
|
|
|
- current_hub = sentry_sdk.Hub(sentry_sdk.Hub.current)
|
|
|
- return MessageContext(message.payload.value, transaction, current_hub)
|
|
|
+ isolation_scope = sentry_sdk.Scope.get_isolation_scope().fork()
|
|
|
+ return MessageContext(message.payload.value, transaction, isolation_scope)
|
|
|
|
|
|
|
|
|
def process_message_threaded(message: Message[MessageContext]) -> Any:
|
|
@@ -125,7 +125,7 @@ def process_message_threaded(message: Message[MessageContext]) -> Any:
|
|
|
logger.exception("Could not decode recording message.")
|
|
|
return None
|
|
|
|
|
|
- ingest_recording(message_dict, context.transaction, context.current_hub)
|
|
|
+ ingest_recording(message_dict, context.transaction, context.isolation_scope)
|
|
|
|
|
|
|
|
|
def process_message(message: Message[KafkaPayload]) -> Any:
|
|
@@ -136,7 +136,7 @@ def process_message(message: Message[KafkaPayload]) -> Any:
|
|
|
sampled=random.random()
|
|
|
< getattr(settings, "SENTRY_REPLAY_RECORDINGS_CONSUMER_APM_SAMPLING", 0),
|
|
|
)
|
|
|
- current_hub = sentry_sdk.Hub(sentry_sdk.Hub.current)
|
|
|
+ isolation_scope = sentry_sdk.Scope.get_isolation_scope().fork()
|
|
|
|
|
|
try:
|
|
|
message_dict: ReplayRecording = RECORDINGS_CODEC.decode(message.payload.value)
|
|
@@ -145,4 +145,4 @@ def process_message(message: Message[KafkaPayload]) -> Any:
|
|
|
logger.exception("Could not decode recording message.")
|
|
|
return None
|
|
|
|
|
|
- ingest_recording(message_dict, transaction, current_hub)
|
|
|
+ ingest_recording(message_dict, transaction, isolation_scope)
|