|
@@ -55,9 +55,8 @@ def trace_func(**span_kwargs):
|
|
|
|
|
|
|
|
|
def process_transaction_no_celery(
|
|
|
- data: MutableMapping[str, Any], project_id: int, start_time: float
|
|
|
+ data: MutableMapping[str, Any], project_id: int, attachments: Any, start_time: float
|
|
|
) -> None:
|
|
|
-
|
|
|
set_current_event_project(project_id)
|
|
|
|
|
|
manager = EventManager(data)
|
|
@@ -74,7 +73,8 @@ def process_transaction_no_celery(
|
|
|
data = dict(data.items())
|
|
|
|
|
|
with sentry_sdk.start_span(op="event_processing_store.store"):
|
|
|
- event_processing_store.store(data)
|
|
|
+ cache_key = event_processing_store.store(data)
|
|
|
+ save_attachments(attachments, cache_key)
|
|
|
|
|
|
|
|
|
@trace_func(name="ingest_consumer.process_event")
|
|
@@ -189,11 +189,12 @@ def process_event(
|
|
|
# The no_celery_mode version of the transactions consumer skips one trip to rc-processing
|
|
|
# Otherwise, we have to store the event in processing store here for the save_event task to
|
|
|
# fetch later
|
|
|
- if no_celery_mode and not attachments:
|
|
|
+ if no_celery_mode:
|
|
|
cache_key = None
|
|
|
else:
|
|
|
with metrics.timer("ingest_consumer._store_event"):
|
|
|
cache_key = event_processing_store.store(data)
|
|
|
+ save_attachments(attachments, cache_key)
|
|
|
|
|
|
try:
|
|
|
# Records rc-processing usage broken down by
|
|
@@ -211,23 +212,12 @@ def process_event(
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
- if attachments:
|
|
|
- with sentry_sdk.start_span(op="ingest_consumer.set_attachment_cache"):
|
|
|
- attachment_objects = [
|
|
|
- CachedAttachment(type=attachment.pop("attachment_type"), **attachment)
|
|
|
- for attachment in attachments
|
|
|
- ]
|
|
|
- assert cache_key is not None
|
|
|
- attachment_cache.set(
|
|
|
- cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT
|
|
|
- )
|
|
|
-
|
|
|
if data.get("type") == "transaction":
|
|
|
if no_celery_mode:
|
|
|
with sentry_sdk.start_span(op="ingest_consumer.process_transaction_no_celery"):
|
|
|
sentry_sdk.set_tag("no_celery_mode", True)
|
|
|
|
|
|
- process_transaction_no_celery(data, project_id, start_time)
|
|
|
+ process_transaction_no_celery(data, project_id, attachments, start_time)
|
|
|
else:
|
|
|
assert cache_key is not None
|
|
|
# No need for preprocess/process for transactions thus submit
|
|
@@ -284,6 +274,17 @@ def process_event(
|
|
|
raise Retriable(exc)
|
|
|
|
|
|
|
|
|
+def save_attachments(attachments: Any, cache_key: str) -> None:
|
|
|
+ if attachments:
|
|
|
+ with sentry_sdk.start_span(op="ingest_consumer.set_attachment_cache"):
|
|
|
+ attachment_objects = [
|
|
|
+ CachedAttachment(type=attachment.pop("attachment_type"), **attachment)
|
|
|
+ for attachment in attachments
|
|
|
+ ]
|
|
|
+ assert cache_key is not None
|
|
|
+ attachment_cache.set(cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT)
|
|
|
+
|
|
|
+
|
|
|
@trace_func(name="ingest_consumer.process_attachment_chunk")
|
|
|
@metrics.wraps("ingest_consumer.process_attachment_chunk")
|
|
|
def process_attachment_chunk(message: IngestMessage) -> None:
|