|
@@ -3,6 +3,7 @@ from typing import Any, Dict, Mapping, Optional, Tuple
|
|
|
|
|
|
import jsonschema
|
|
|
import rapidjson
|
|
|
+import sentry_sdk
|
|
|
from arroyo import Topic
|
|
|
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
|
|
|
from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload
|
|
@@ -234,28 +235,41 @@ def _process_message(
|
|
|
:raises EventLookupError: when the provided event_id in the message couldn't be found.
|
|
|
"""
|
|
|
metrics.incr("occurrence_ingest.messages", sample_rate=1.0)
|
|
|
-
|
|
|
- try:
|
|
|
- kwargs = _get_kwargs(message)
|
|
|
- occurrence_data = kwargs["occurrence_data"]
|
|
|
- if occurrence_data["type"] not in INGEST_ALLOWED_ISSUE_TYPES:
|
|
|
- return None
|
|
|
-
|
|
|
- project = Project.objects.get_from_cache(id=occurrence_data["project_id"])
|
|
|
- organization = Organization.objects.get_from_cache(id=project.organization_id)
|
|
|
-
|
|
|
- if not features.has("organizations:profile-blocked-main-thread-ingest", organization):
|
|
|
- metrics.incr("occurrence_ingest.dropped_feature_disabled", sample_rate=1.0)
|
|
|
- return None
|
|
|
-
|
|
|
- if "event_data" in kwargs:
|
|
|
- return process_event_and_issue_occurrence(
|
|
|
- kwargs["occurrence_data"], kwargs["event_data"]
|
|
|
- )
|
|
|
- else:
|
|
|
- return lookup_event_and_process_issue_occurrence(kwargs["occurrence_data"])
|
|
|
- except (ValueError, KeyError) as e:
|
|
|
- raise InvalidEventPayloadError(e)
|
|
|
+ with sentry_sdk.start_transaction(
|
|
|
+ op="_process_message",
|
|
|
+ name="issues.occurrence_consumer",
|
|
|
+ sampled=True,
|
|
|
+ ):
|
|
|
+ try:
|
|
|
+ kwargs = _get_kwargs(message)
|
|
|
+ occurrence_data = kwargs["occurrence_data"]
|
|
|
+ if occurrence_data["type"] not in INGEST_ALLOWED_ISSUE_TYPES:
|
|
|
+ return None
|
|
|
+
|
|
|
+ project = Project.objects.get_from_cache(id=occurrence_data["project_id"])
|
|
|
+ organization = Organization.objects.get_from_cache(id=project.organization_id)
|
|
|
+
|
|
|
+ sentry_sdk.set_tag("organization_id", organization.id)
|
|
|
+ sentry_sdk.set_tag("organization_slug", organization.slug)
|
|
|
+ sentry_sdk.set_tag("project_id", project.id)
|
|
|
+ sentry_sdk.set_tag("project_slug", project.slug)
|
|
|
+
|
|
|
+ if not features.has("organizations:profile-blocked-main-thread-ingest", organization):
|
|
|
+ metrics.incr("occurrence_ingest.dropped_feature_disabled", sample_rate=1.0)
|
|
|
+ sentry_sdk.set_tag("result", "dropped_feature_disabled")
|
|
|
+ return None
|
|
|
+
|
|
|
+ if "event_data" in kwargs:
|
|
|
+ sentry_sdk.set_tag("result", "success")
|
|
|
+ return process_event_and_issue_occurrence(
|
|
|
+ kwargs["occurrence_data"], kwargs["event_data"]
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ sentry_sdk.set_tag("result", "success")
|
|
|
+ return lookup_event_and_process_issue_occurrence(kwargs["occurrence_data"])
|
|
|
+ except (ValueError, KeyError) as e:
|
|
|
+ sentry_sdk.set_tag("result", "error")
|
|
|
+ raise InvalidEventPayloadError(e)
|
|
|
|
|
|
|
|
|
class OccurrenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
|