Browse Source

chore(subscriptions): Add Typing to Incident Tasks (#46647)

Follow up to https://github.com/getsentry/sentry/pull/46614 in which I
add typing to the incident tasks file.

---------

Co-authored-by: Lyn <lyn.nagara@gmail.com>
Colleen O'Rourke 1 year ago
parent
commit
2924db978d

+ 2 - 0
mypy.ini

@@ -71,7 +71,9 @@ files = fixtures/mypy-stubs,
         src/sentry/grouping/strategies/template.py,
         src/sentry/grouping/strategies/template.py,
         src/sentry/grouping/strategies/utils.py,
         src/sentry/grouping/strategies/utils.py,
         src/sentry/incidents/charts.py,
         src/sentry/incidents/charts.py,
+        src/sentry/incidents/tasks.py,
         src/sentry/incidents/subscription_processor.py,
         src/sentry/incidents/subscription_processor.py,
+        src/sentry/incidents/utils/types.py,
         src/sentry/ingest/billing_metrics_consumer.py,
         src/sentry/ingest/billing_metrics_consumer.py,
         src/sentry/ingest/transaction_clusterer/,
         src/sentry/ingest/transaction_clusterer/,
         src/sentry/integrations/base.py,
         src/sentry/integrations/base.py,

+ 2 - 10
src/sentry/incidents/subscription_processor.py

@@ -4,7 +4,7 @@ import logging
 import operator
 import operator
 from copy import deepcopy
 from copy import deepcopy
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
-from typing import Any, Dict, List, Optional, Sequence, Tuple, TypedDict, TypeVar, cast
+from typing import Dict, List, Optional, Sequence, Tuple, TypeVar, cast
 
 
 from django.conf import settings
 from django.conf import settings
 from django.db import transaction
 from django.db import transaction
@@ -31,6 +31,7 @@ from sentry.incidents.models import (
     TriggerStatus,
     TriggerStatus,
 )
 )
 from sentry.incidents.tasks import handle_trigger_action
 from sentry.incidents.tasks import handle_trigger_action
+from sentry.incidents.utils.types import SubscriptionUpdate
 from sentry.models import Project
 from sentry.models import Project
 from sentry.snuba.dataset import Dataset
 from sentry.snuba.dataset import Dataset
 from sentry.snuba.entity_subscription import (
 from sentry.snuba.entity_subscription import (
@@ -62,15 +63,6 @@ CRASH_RATE_ALERT_MINIMUM_THRESHOLD: Optional[int] = None
 T = TypeVar("T")
 T = TypeVar("T")
 
 
 
 
-class SubscriptionUpdate(TypedDict):
-    subscription_id: int
-    values: Dict[str, List[Any]]
-    timestamp: datetime
-    interval: int
-    partition: int
-    offset: int
-
-
 class SubscriptionProcessor:
 class SubscriptionProcessor:
     """
     """
     Class for processing subscription updates for an alert rule. Accepts a subscription
     Class for processing subscription updates for an alert rule. Accepts a subscription

+ 32 - 18
src/sentry/incidents/tasks.py

@@ -1,4 +1,7 @@
+from __future__ import annotations
+
 import logging
 import logging
+from typing import Any, Dict, Optional
 from urllib.parse import urlencode
 from urllib.parse import urlencode
 
 
 from django.urls import reverse
 from django.urls import reverse
@@ -15,9 +18,11 @@ from sentry.incidents.models import (
     IncidentStatus,
     IncidentStatus,
     IncidentStatusMethod,
     IncidentStatusMethod,
 )
 )
+from sentry.incidents.utils.types import SubscriptionUpdate
 from sentry.models import Project
 from sentry.models import Project
-from sentry.services.hybrid_cloud.user import user_service
+from sentry.services.hybrid_cloud.user import RpcUser, user_service
 from sentry.snuba.dataset import Dataset
 from sentry.snuba.dataset import Dataset
+from sentry.snuba.models import QuerySubscription
 from sentry.snuba.query_subscription_consumer import register_subscriber
 from sentry.snuba.query_subscription_consumer import register_subscriber
 from sentry.tasks.base import instrumented_task
 from sentry.tasks.base import instrumented_task
 from sentry.utils import metrics
 from sentry.utils import metrics
@@ -31,8 +36,8 @@ INCIDENT_SNAPSHOT_BATCH_SIZE = 50
 SUBSCRIPTION_METRICS_LOGGER = "subscription_metrics_logger"
 SUBSCRIPTION_METRICS_LOGGER = "subscription_metrics_logger"
 
 
 
 
-@instrumented_task(name="sentry.incidents.tasks.send_subscriber_notifications", queue="incidents")
-def send_subscriber_notifications(activity_id):
+@instrumented_task(name="sentry.incidents.tasks.send_subscriber_notifications", queue="incidents")  # type: ignore
+def send_subscriber_notifications(activity_id: int) -> None:
     from sentry.incidents.logic import get_incident_subscribers, unsubscribe_from_incident
     from sentry.incidents.logic import get_incident_subscribers, unsubscribe_from_incident
 
 
     try:
     try:
@@ -67,7 +72,9 @@ def send_subscriber_notifications(activity_id):
             msg.send_async([subscriber_user.email])
             msg.send_async([subscriber_user.email])
 
 
 
 
-def generate_incident_activity_email(activity, user, activity_user=None):
+def generate_incident_activity_email(
+    activity: IncidentActivity, user: RpcUser, activity_user: Optional[RpcUser] = None
+) -> MessageBuilder:
     incident = activity.incident
     incident = activity.incident
     return MessageBuilder(
     return MessageBuilder(
         subject=f"Activity on Alert {incident.title} (#{incident.identifier})",
         subject=f"Activity on Alert {incident.title} (#{incident.identifier})",
@@ -78,7 +85,9 @@ def generate_incident_activity_email(activity, user, activity_user=None):
     )
     )
 
 
 
 
-def build_activity_context(activity, user, activity_user=None):
+def build_activity_context(
+    activity: IncidentActivity, user: RpcUser, activity_user: Optional[RpcUser] = None
+) -> Dict[str, Any]:
     if activity_user is None:
     if activity_user is None:
         activity_user = user_service.get_user(user_id=activity.user_id)
         activity_user = user_service.get_user(user_id=activity.user_id)
 
 
@@ -112,12 +121,11 @@ def build_activity_context(activity, user, activity_user=None):
 
 
 
 
 @register_subscriber(SUBSCRIPTION_METRICS_LOGGER)
 @register_subscriber(SUBSCRIPTION_METRICS_LOGGER)
-def handle_subscription_metrics_logger(subscription_update, subscription):
+def handle_subscription_metrics_logger(
+    subscription_update: SubscriptionUpdate, subscription: QuerySubscription
+) -> None:
     """
     """
     Logs results from a `QuerySubscription`.
     Logs results from a `QuerySubscription`.
-    :param subscription_update: dict formatted according to schemas in
-    sentry_kafka_schemas.schema_types.events_subscription_results_v1.SubscriptionResults
-    :param subscription: The `QuerySubscription` that this update is for
     """
     """
     from sentry.incidents.subscription_processor import SubscriptionProcessor
     from sentry.incidents.subscription_processor import SubscriptionProcessor
 
 
@@ -125,7 +133,7 @@ def handle_subscription_metrics_logger(subscription_update, subscription):
         if subscription.snuba_query.dataset == Dataset.Metrics.value:
         if subscription.snuba_query.dataset == Dataset.Metrics.value:
             processor = SubscriptionProcessor(subscription)
             processor = SubscriptionProcessor(subscription)
             # XXX: Temporary hack so that we can extract these values without raising an exception
             # XXX: Temporary hack so that we can extract these values without raising an exception
-            processor.reset_trigger_counts = lambda *arg, **kwargs: None
+            processor.reset_trigger_counts = lambda *arg, **kwargs: None  # type: ignore
             aggregation_value = processor.get_aggregation_value(subscription_update)
             aggregation_value = processor.get_aggregation_value(subscription_update)
 
 
             logger.info(
             logger.info(
@@ -143,12 +151,11 @@ def handle_subscription_metrics_logger(subscription_update, subscription):
 
 
 
 
 @register_subscriber(INCIDENTS_SNUBA_SUBSCRIPTION_TYPE)
 @register_subscriber(INCIDENTS_SNUBA_SUBSCRIPTION_TYPE)
-def handle_snuba_query_update(subscription_update, subscription):
+def handle_snuba_query_update(
+    subscription_update: SubscriptionUpdate, subscription: QuerySubscription
+) -> None:
     """
     """
     Handles a subscription update for a `QuerySubscription`.
     Handles a subscription update for a `QuerySubscription`.
-    :param subscription_update: dict formatted according to schemas in
-    sentry_kafka_schemas.schema_types.events_subscription_results_v1.SubscriptionResults
-    :param subscription: The `QuerySubscription` that this update is for
     """
     """
     from sentry.incidents.subscription_processor import SubscriptionProcessor
     from sentry.incidents.subscription_processor import SubscriptionProcessor
 
 
@@ -162,8 +169,15 @@ def handle_snuba_query_update(subscription_update, subscription):
     queue="incidents",
     queue="incidents",
     default_retry_delay=60,
     default_retry_delay=60,
     max_retries=5,
     max_retries=5,
-)
-def handle_trigger_action(action_id, incident_id, project_id, method, metric_value=None, **kwargs):
+)  # type: ignore
+def handle_trigger_action(
+    action_id: int,
+    incident_id: int,
+    project_id: int,
+    method: str,
+    metric_value: Optional[int] = None,
+    **kwargs: Any,
+) -> None:
     from sentry.incidents.logic import CRITICAL_TRIGGER_LABEL, WARNING_TRIGGER_LABEL
     from sentry.incidents.logic import CRITICAL_TRIGGER_LABEL, WARNING_TRIGGER_LABEL
 
 
     try:
     try:
@@ -220,8 +234,8 @@ def handle_trigger_action(action_id, incident_id, project_id, method, metric_val
     queue="incidents",
     queue="incidents",
     default_retry_delay=60,
     default_retry_delay=60,
     max_retries=2,
     max_retries=2,
-)
-def auto_resolve_snapshot_incidents(alert_rule_id, **kwargs):
+)  # type: ignore
+def auto_resolve_snapshot_incidents(alert_rule_id: int, **kwargs: Any) -> None:
     from sentry.incidents.logic import update_incident_status
     from sentry.incidents.logic import update_incident_status
     from sentry.incidents.models import AlertRule
     from sentry.incidents.models import AlertRule
 
 

+ 9 - 0
src/sentry/incidents/utils/types.py

@@ -0,0 +1,9 @@
+from datetime import datetime
+from typing import Any, TypedDict
+
+
+class SubscriptionUpdate(TypedDict):
+    entity: str
+    subscription_id: str
+    values: Any
+    timestamp: datetime

+ 9 - 6
src/sentry/snuba/query_subscription_consumer.py

@@ -27,6 +27,7 @@ from sentry_kafka_schemas.schema_types.events_subscription_results_v1 import (
 )
 )
 
 
 from sentry import options
 from sentry import options
+from sentry.incidents.utils.types import SubscriptionUpdate
 from sentry.snuba.dataset import Dataset, EntityKey
 from sentry.snuba.dataset import Dataset, EntityKey
 from sentry.snuba.models import QuerySubscription
 from sentry.snuba.models import QuerySubscription
 from sentry.snuba.tasks import _delete_from_snuba
 from sentry.snuba.tasks import _delete_from_snuba
@@ -34,8 +35,7 @@ from sentry.utils import kafka_config, metrics
 from sentry.utils.arroyo import MetricsWrapper
 from sentry.utils.arroyo import MetricsWrapper
 
 
 logger = logging.getLogger(__name__)
 logger = logging.getLogger(__name__)
-
-TQuerySubscriptionCallable = Callable[[PayloadV3, QuerySubscription], None]
+TQuerySubscriptionCallable = Callable[[SubscriptionUpdate, QuerySubscription], None]
 
 
 subscriber_registry: Dict[str, TQuerySubscriptionCallable] = {}
 subscriber_registry: Dict[str, TQuerySubscriptionCallable] = {}
 
 
@@ -68,7 +68,7 @@ def register_subscriber(
     return inner
     return inner
 
 
 
 
-def parse_message_value(value: bytes, jsoncodec: JsonCodec) -> PayloadV3:
+def parse_message_value(value: bytes, jsoncodec: JsonCodec) -> SubscriptionUpdate:
     """
     """
     Parses the value received via the Kafka consumer and verifies that it
     Parses the value received via the Kafka consumer and verifies that it
     matches the expected schema.
     matches the expected schema.
@@ -87,9 +87,12 @@ def parse_message_value(value: bytes, jsoncodec: JsonCodec) -> PayloadV3:
     # break things. This should convert the payload into a class rather than passing
     # break things. This should convert the payload into a class rather than passing
     # the dict around, but until we get time to refactor we can keep things working
     # the dict around, but until we get time to refactor we can keep things working
     # here.
     # here.
-    payload.setdefault("values", payload.get("result"))  # type: ignore
-    payload["timestamp"] = parse_date(payload["timestamp"]).replace(tzinfo=pytz.utc)  # type: ignore
-    return payload
+    return {
+        "entity": payload["entity"],
+        "subscription_id": payload["subscription_id"],
+        "values": payload["result"],
+        "timestamp": parse_date(payload["timestamp"]).replace(tzinfo=pytz.utc),
+    }
 
 
 
 
 def handle_message(
 def handle_message(

+ 2 - 0
tests/sentry/snuba/test_query_subscription_consumer.py

@@ -111,6 +111,8 @@ class HandleMessageTest(BaseQuerySubscriptionTest, TestCase):
 
 
         data = deepcopy(data)
         data = deepcopy(data)
         data["payload"]["values"] = data["payload"]["result"]
         data["payload"]["values"] = data["payload"]["result"]
+        data["payload"].pop("result")
+        data["payload"].pop("request")
         data["payload"]["timestamp"] = parse_date(data["payload"]["timestamp"]).replace(
         data["payload"]["timestamp"] = parse_date(data["payload"]["timestamp"]).replace(
             tzinfo=pytz.utc
             tzinfo=pytz.utc
         )
         )