@@ -1,5 +1,7 @@
 from __future__ import absolute_import

+import random
+import functools
 import atexit
 import logging
 import msgpack
@@ -8,14 +10,18 @@ from six import BytesIO
 import multiprocessing.dummy
 import multiprocessing as _multiprocessing

+from django.conf import settings
 from django.core.cache import cache

+import sentry_sdk
+
 from sentry import eventstore, features, options
 from sentry.cache import default_cache
 from sentry.models import Project, File, EventAttachment
 from sentry.signals import event_accepted
 from sentry.tasks.store import preprocess_event
 from sentry.utils import json, metrics
+from sentry.utils.sdk import mark_scope_as_unsafe
 from sentry.utils.dates import to_datetime
 from sentry.utils.cache import cache_key_for_event
 from sentry.utils.kafka import create_batching_kafka_consumer
@@ -41,6 +47,11 @@ class IngestConsumerWorker(AbstractBatchWorker):
         return message

     def flush_batch(self, batch):
+        mark_scope_as_unsafe()
+        with metrics.timer("ingest_consumer.flush_batch"):
+            return self._flush_batch(batch)
+
+    def _flush_batch(self, batch):
         attachment_chunks = []
         other_messages = []
         transactions = []
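The flush path is split so that the public flush_batch only marks the SDK scope as unsafe and times the whole batch, while _flush_batch keeps the actual work. A minimal sketch of that wrap-and-delegate pattern, using a hypothetical stand-in for sentry.utils.metrics.timer:

    import time
    from contextlib import contextmanager

    @contextmanager
    def timer(key):
        # Hypothetical stand-in for sentry.utils.metrics.timer: here it just
        # prints the elapsed wall time; the real helper emits a timing metric.
        start = time.time()
        try:
            yield
        finally:
            print("%s took %.3fs" % (key, time.time() - start))

    class Worker(object):
        def flush_batch(self, batch):
            # Public entry point: instrument, then delegate to the real logic.
            with timer("ingest_consumer.flush_batch"):
                return self._flush_batch(batch)

        def _flush_batch(self, batch):
            return len(batch)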
@@ -84,22 +95,39 @@ class IngestConsumerWorker(AbstractBatchWorker):
         if other_messages:
             with metrics.timer("ingest_consumer.process_other_messages_batch"):
                 for _ in self.pool.imap_unordered(
-                    lambda args: args[0](args[1], projects=projects), other_messages, chunksize=100
+                    lambda args: args[0](args[1], projects=projects), other_messages, chunksize=100,
                 ):
                     pass

         if transactions:
-            process_transactions_batch(transactions, projects)
+            with metrics.timer("ingest_consumer.process_transactions"):
+                process_transactions_batch(transactions, projects)

     def shutdown(self):
         pass


+def trace_func(**span_kwargs):
+    def wrapper(f):
+        @functools.wraps(f)
+        def inner(*args, **kwargs):
+            span_kwargs["sampled"] = random.random() < getattr(
+                settings, "SENTRY_INGEST_CONSUMER_APM_SAMPLING", 0
+            )
+            with sentry_sdk.start_span(**span_kwargs):
+                return f(*args, **kwargs)
+
+        return inner
+
+    return wrapper
+
+
+@trace_func(transaction="ingest_consumer.process_transactions_batch")
 @metrics.wraps("ingest_consumer.process_transactions_batch")
 def process_transactions_batch(messages, projects):
     if options.get("store.transactions-celery") is True:
         for message in messages:
-            process_event(message, projects)
+            _do_process_event(message, projects)
         return

     jobs = []
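trace_func builds a decorator that opens a sentry_sdk span around every call and decides per call whether that span is sampled, by rolling random.random() against the SENTRY_INGEST_CONSUMER_APM_SAMPLING setting (default 0, so nothing is sampled unless it is configured). One subtlety worth noting: span_kwargs is a single dict shared by all calls, so its "sampled" key is overwritten on every invocation, which is fine for one flag but would be shared mutable state if more per-call data were added. A self-contained sketch of the same mechanics, with a print standing in for sentry_sdk.start_span:

    import functools
    import random

    def trace_func_sketch(sample_rate, **span_kwargs):
        # Same shape as trace_func above: the outer call captures the span
        # arguments, wrapper receives the function, inner runs per call.
        def wrapper(f):
            @functools.wraps(f)
            def inner(*args, **kwargs):
                span_kwargs["sampled"] = random.random() < sample_rate
                # The real code does: with sentry_sdk.start_span(**span_kwargs): ...
                print("starting span %r" % (span_kwargs,))
                return f(*args, **kwargs)
            return inner
        return wrapper

    @trace_func_sketch(0.5, transaction="demo.work")
    def work(x):
        return x * 2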
@@ -119,7 +147,7 @@ def process_transactions_batch(messages, projects):


 @metrics.wraps("ingest_consumer.process_event")
-def process_event(message, projects):
+def _do_process_event(message, projects):
     payload = message["payload"]
     start_time = float(message["start_time"])
     event_id = message["event_id"]
@@ -177,9 +205,14 @@ def process_event(message, projects):
     # Preprocess this event, which spawns either process_event or
     # save_event. Pass data explicitly to avoid fetching it again from the
     # cache.
-    preprocess_event(
-        cache_key=cache_key, data=data, start_time=start_time, event_id=event_id, project=project
-    )
+    with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
+        preprocess_event(
+            cache_key=cache_key,
+            data=data,
+            start_time=start_time,
+            event_id=event_id,
+            project=project,
+        )

     # remember for 1 hour that we saved this event (deduplication protection)
     cache.set(deduplication_key, "", CACHE_TIMEOUT)
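Wrapping preprocess_event in sentry_sdk.start_span(op=...) opens a child span under whatever transaction trace_func started for this call, so the hand-off to preprocessing shows up as its own timed block inside the trace. A minimal sketch, with a hypothetical do_preprocess in place of the real task:

    import sentry_sdk

    def do_preprocess(data):
        pass  # hypothetical stand-in for the real preprocess_event task

    def handle(data):
        # The child span nests under the span/transaction active on the
        # current scope; only sampled traces record its timing.
        with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
            do_preprocess(data)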
@@ -188,6 +221,12 @@ def process_event(message, projects):
     event_accepted.send_robust(ip=remote_addr, data=data, project=project, sender=process_event)


+@trace_func(transaction="ingest_consumer.process_event")
+def process_event(message, projects):
+    return _do_process_event(message, projects)
+
+
+@trace_func(transaction="ingest_consumer.process_attachment_chunk")
 @metrics.wraps("ingest_consumer.process_attachment_chunk")
 def process_attachment_chunk(message, projects):
     payload = message["payload"]
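The rename keeps process_event as the traced public entry point for messages coming off Kafka, while the transactions path above calls _do_process_event directly and so stays inside its own ingest_consumer.process_transactions_batch transaction instead of opening a second one per event. The decorated wrapper is functionally equivalent to rebinding by hand:

    # Functionally equivalent to the decorated process_event defined above:
    process_event = trace_func(transaction="ingest_consumer.process_event")(_do_process_event)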
@@ -201,6 +240,7 @@ def process_attachment_chunk(message, projects):
     )


+@trace_func(transaction="ingest_consumer.process_individual_attachment")
 @metrics.wraps("ingest_consumer.process_individual_attachment")
 def process_individual_attachment(message, projects):
     event_id = message["event_id"]
@@ -256,6 +296,7 @@ def process_individual_attachment(message, projects):
     attachment.delete()


+@trace_func(transaction="ingest_consumer.process_userreport")
 @metrics.wraps("ingest_consumer.process_userreport")
 def process_userreport(message, projects):
     project_id = int(message["project_id"])