|
@@ -266,28 +266,23 @@ class QuerySubscriptionConsumer:
|
|
)
|
|
)
|
|
return
|
|
return
|
|
|
|
|
|
- logger.debug(
|
|
|
|
- "query-subscription-consumer.handle_message",
|
|
|
|
- extra={
|
|
|
|
- "timestamp": contents["timestamp"],
|
|
|
|
- "query_subscription_id": contents["subscription_id"],
|
|
|
|
- "project_id": subscription.project_id,
|
|
|
|
- "subscription_dataset": subscription.snuba_query.dataset,
|
|
|
|
- "subscription_query": subscription.snuba_query.query,
|
|
|
|
- "subscription_aggregation": subscription.snuba_query.aggregate,
|
|
|
|
- "subscription_time_window": subscription.snuba_query.time_window,
|
|
|
|
- "subscription_resolution": subscription.snuba_query.resolution,
|
|
|
|
- "offset": message.offset(),
|
|
|
|
- "partition": message.partition(),
|
|
|
|
- "value": message.value(),
|
|
|
|
- },
|
|
|
|
- )
|
|
|
|
|
|
+ sentry_sdk.set_tag("project_id", subscription.project_id)
|
|
|
|
+ sentry_sdk.set_tag("query_subscription_id", contents["subscription_id"])
|
|
|
|
|
|
callback = subscriber_registry[subscription.type]
|
|
callback = subscriber_registry[subscription.type]
|
|
with sentry_sdk.start_span(op="process_message") as span, metrics.timer(
|
|
with sentry_sdk.start_span(op="process_message") as span, metrics.timer(
|
|
"snuba_query_subscriber.callback.duration", instance=subscription.type
|
|
"snuba_query_subscriber.callback.duration", instance=subscription.type
|
|
):
|
|
):
|
|
span.set_data("payload", contents)
|
|
span.set_data("payload", contents)
|
|
|
|
+ span.set_data("subscription_dataset", subscription.snuba_query.dataset)
|
|
|
|
+ span.set_data("subscription_query", subscription.snuba_query.query)
|
|
|
|
+ span.set_data("subscription_aggregation", subscription.snuba_query.aggregate)
|
|
|
|
+ span.set_data("subscription_time_window", subscription.snuba_query.time_window)
|
|
|
|
+ span.set_data("subscription_resolution", subscription.snuba_query.resolution)
|
|
|
|
+ span.set_data("message_offset", message.offset())
|
|
|
|
+ span.set_data("message_partition", message.partition())
|
|
|
|
+ span.set_data("message_value", message.value())
|
|
|
|
+
|
|
callback(contents, subscription)
|
|
callback(contents, subscription)
|
|
|
|
|
|
def parse_message_value(self, value):
|
|
def parse_message_value(self, value):
|