Browse Source

refs(subscriptions): When creating `QuerySubscriptions`, defer creation in snuba to tasks.

This changes how subscriptions are created in Snuba. By creating them in tasks that are briefly
deferred, we give the database transaction time to commit or roll back, so that by the time a task
runs we can be sure whether we should create/update/delete the subscription in Snuba.

We add a `status` field to handle cases where the user rapidly repeats an action and causes, say, an
update to be scheduled at the same time as a create. If the subscription's status has changed by the
time a task runs, that task simply skips its operation, and the task matching the current status
completes the work as expected.
Dan Fuller 5 years ago
parent
commit
81c27f4656

+ 1 - 1
migrations_lockfile.txt

@@ -10,7 +10,7 @@ auth: 0008_alter_user_username_max_length
 contenttypes: 0002_remove_content_type_name
 jira_ac: 0001_initial
 nodestore: 0001_initial
-sentry: 0054_create_key_transaction
+sentry: 0055_query_subscription_status
 sessions: 0001_initial
 sites: 0002_alter_domain_unique
 social_auth: 0001_initial

+ 1 - 2
src/sentry/incidents/logic.py

@@ -34,12 +34,11 @@ from sentry.incidents.models import (
 )
 from sentry.models import Integration, Project
 from sentry.snuba.discover import resolve_discover_aliases
-from sentry.snuba.models import QueryAggregations, QueryDatasets
+from sentry.snuba.models import query_aggregation_to_snuba, QueryAggregations, QueryDatasets
 from sentry.snuba.subscriptions import (
     bulk_create_snuba_subscriptions,
     bulk_delete_snuba_subscriptions,
     bulk_update_snuba_subscriptions,
-    query_aggregation_to_snuba,
 )
 from sentry.utils.snuba import bulk_raw_query, SnubaQueryParams, SnubaTSResult
 from sentry.utils.compat import zip

+ 1 - 2
src/sentry/incidents/subscription_processor.py

@@ -11,7 +11,6 @@ from django.db import transaction
 
 from sentry.incidents.logic import create_incident, update_incident_status
 from sentry.incidents.endpoints.serializers import WARNING_TRIGGER_LABEL, CRITICAL_TRIGGER_LABEL
-from sentry.snuba.subscriptions import query_aggregation_to_snuba
 from sentry.incidents.models import (
     AlertRule,
     AlertRuleThresholdType,
@@ -23,7 +22,7 @@ from sentry.incidents.models import (
     TriggerStatus,
 )
 from sentry.incidents.tasks import handle_trigger_action
-from sentry.snuba.models import QueryAggregations
+from sentry.snuba.models import query_aggregation_to_snuba, QueryAggregations
 from sentry.utils import metrics, redis
 from sentry.utils.dates import to_datetime, to_timestamp
 from sentry.utils.compat import zip

+ 42 - 0
src/sentry/migrations/0055_query_subscription_status.py

@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.28 on 2020-03-17 00:30
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    # This flag is used to mark that a migration shouldn't be automatically run in
+    # production. We set this to True for operations that we think are risky and want
+    # someone from ops to run manually and monitor.
+    # General advice is that if in doubt, mark your migration as `is_dangerous`.
+    # Some things you should always mark as dangerous:
+    # - Large data migrations. Typically we want these to be run manually by ops so that
+    #   they can be monitored. Since data migrations will now hold a transaction open
+    #   this is even more important.
+    # - Adding columns to highly active tables, even ones that are NULL.
+    is_dangerous = False
+
+    # This flag is used to decide whether to run this migration in a transaction or not.
+    # By default we prefer to run in a transaction, but for migrations where you want
+    # to `CREATE INDEX CONCURRENTLY` this needs to be set to False. Typically you'll
+    # want to create an index concurrently when adding one to an existing table.
+    atomic = True
+
+
+    dependencies = [
+        ('sentry', '0054_create_key_transaction'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='querysubscription',
+            name='status',
+            field=models.SmallIntegerField(default=0),
+        ),
+        migrations.AlterField(
+            model_name='querysubscription',
+            name='subscription_id',
+            field=models.TextField(null=True, unique=True),
+        ),
+    ]

+ 14 - 1
src/sentry/snuba/models.py

@@ -15,6 +15,12 @@ class QueryAggregations(Enum):
     UNIQUE_USERS = 1
 
 
+query_aggregation_to_snuba = {
+    QueryAggregations.TOTAL: ("count()", "", "count"),
+    QueryAggregations.UNIQUE_USERS: ("uniq", "tags[sentry:user]", "unique_users"),
+}
+
+
 class QueryDatasets(Enum):
     EVENTS = "events"
 
@@ -35,12 +41,19 @@ class QuerySubscriptionEnvironment(Model):
 class QuerySubscription(Model):
     __core__ = True
 
+    class Status(Enum):
+        ACTIVE = 0
+        CREATING = 1
+        UPDATING = 2
+        DELETING = 3
+
     project = FlexibleForeignKey("sentry.Project", db_constraint=False)
     environments = models.ManyToManyField(
         "sentry.Environment", through=QuerySubscriptionEnvironment
     )
     type = models.TextField()
-    subscription_id = models.TextField(unique=True)
+    status = models.SmallIntegerField(default=Status.ACTIVE.value)
+    subscription_id = models.TextField(unique=True, null=True)
     dataset = models.TextField()
     query = models.TextField()
     # TODO: Remove this default after we migrate

+ 4 - 1
src/sentry/snuba/query_subscription_consumer.py

@@ -12,7 +12,7 @@ from django.conf import settings
 
 from sentry.snuba.json_schemas import SUBSCRIPTION_PAYLOAD_VERSIONS, SUBSCRIPTION_WRAPPER_SCHEMA
 from sentry.snuba.models import QueryDatasets, QuerySubscription
-from sentry.snuba.subscriptions import _delete_from_snuba
+from sentry.snuba.tasks import _delete_from_snuba
 from sentry.utils import metrics
 
 logger = logging.getLogger(__name__)
@@ -161,6 +161,9 @@ class QuerySubscriptionConsumer(object):
                     subscription = QuerySubscription.objects.get_from_cache(
                         subscription_id=contents["subscription_id"]
                     )
+                    if subscription.status != QuerySubscription.Status.ACTIVE.value:
+                        metrics.incr("snuba_query_subscriber.subscription_inactive")
+                        return
             except QuerySubscription.DoesNotExist:
                 metrics.incr("snuba_query_subscriber.subscription_doesnt_exist")
                 logger.error(

+ 21 - 76
src/sentry/snuba/subscriptions.py

@@ -1,24 +1,14 @@
 from __future__ import absolute_import
 
-import json
 import logging
 
-from django.db import transaction
-
-from sentry.api.event_search import get_filter
-from sentry.snuba.discover import resolve_discover_aliases
-from sentry.snuba.models import (
-    QueryAggregations,
-    QueryDatasets,
-    QuerySubscription,
-    QuerySubscriptionEnvironment,
+from sentry.snuba.models import QuerySubscription, QuerySubscriptionEnvironment
+from sentry.snuba.tasks import (
+    create_subscription_in_snuba,
+    delete_subscription_from_snuba,
+    update_subscription_in_snuba,
 )
-from sentry.utils.snuba import _snuba_pool, SnubaError
 
-query_aggregation_to_snuba = {
-    QueryAggregations.TOTAL: ("count()", "", "count"),
-    QueryAggregations.UNIQUE_USERS: ("uniq", "tags[sentry:user]", "unique_users"),
-}
 logger = logging.getLogger(__name__)
 
 
@@ -41,7 +31,7 @@ def bulk_create_snuba_subscriptions(
     :return: A list of QuerySubscriptions
     """
     subscriptions = []
-    # TODO: Batch this up properly once we move to tasks.
+    # TODO: Batch this up properly once we care about multi-project rules.
     for project in projects:
         subscriptions.append(
             create_snuba_subscription(
@@ -76,17 +66,10 @@ def create_snuba_subscription(
     :param environments: List of environments to filter by
     :return: The QuerySubscription representing the subscription
     """
-    # TODO: Move this call to snuba into a task. This lets us successfully create a
-    # subscription in postgres and rollback as needed without having to create/delete
-    # from Snuba
-    subscription_id = _create_in_snuba(
-        project, dataset, query, aggregation, time_window, resolution, environments
-    )
-
     subscription = QuerySubscription.objects.create(
+        status=QuerySubscription.Status.CREATING.value,
         project=project,
         type=subscription_type,
-        subscription_id=subscription_id,
         dataset=dataset.value,
         query=query,
         aggregation=aggregation.value,
@@ -99,6 +82,10 @@ def create_snuba_subscription(
     ]
     QuerySubscriptionEnvironment.objects.bulk_create(sub_envs)
 
+    create_subscription_in_snuba.apply_async(
+        kwargs={"query_subscription_id": subscription.id}, countdown=5
+    )
+
     return subscription
 
 
@@ -118,7 +105,7 @@ def bulk_update_snuba_subscriptions(
     :return: A list of QuerySubscriptions
     """
     updated_subscriptions = []
-    # TODO: Batch this up properly once we move to tasks.
+    # TODO: Batch this up properly once we care about multi-project rules.
     for subscription in subscriptions:
         updated_subscriptions.append(
             update_snuba_subscription(
@@ -142,16 +129,8 @@ def update_snuba_subscription(
     :param environments: List of environments to filter by
     :return: The QuerySubscription representing the subscription
     """
-    # TODO: Move this call to snuba into a task. This lets us successfully update a
-    # subscription in postgres and rollback as needed without having to create/delete
-    # from snuba
-    dataset = QueryDatasets(subscription.dataset)
-    _delete_from_snuba(dataset, subscription.subscription_id)
-    subscription_id = _create_in_snuba(
-        subscription.project, dataset, query, aggregation, time_window, resolution, environments
-    )
     subscription.update(
-        subscription_id=subscription_id,
+        status=QuerySubscription.Status.UPDATING.value,
         query=query,
         aggregation=aggregation.value,
         time_window=int(time_window.total_seconds()),
@@ -165,6 +144,10 @@ def update_snuba_subscription(
             query_subscription=subscription, environment=e
         )
 
+    update_subscription_in_snuba.apply_async(
+        kwargs={"query_subscription_id": subscription.id}, countdown=5
+    )
+
     return subscription
 
 
@@ -175,7 +158,7 @@ def bulk_delete_snuba_subscriptions(subscriptions):
     :return:
     """
     for subscription in subscriptions:
-        # TODO: Batch this up properly once we move to tasks.
+        # TODO: Batch this up properly once we care about multi-project rules.
         delete_snuba_subscription(subscription)
 
 
@@ -185,45 +168,7 @@ def delete_snuba_subscription(subscription):
     :param subscription: The subscription to delete
     :return:
     """
-    with transaction.atomic():
-        subscription.delete()
-        # TODO: Move this call to snuba into a task. This lets us successfully delete a
-        # subscription in postgres and rollback as needed without having to create/delete
-        # from snuba
-        _delete_from_snuba(QueryDatasets(subscription.dataset), subscription.subscription_id)
-
-
-def _create_in_snuba(project, dataset, query, aggregation, time_window, resolution, environments):
-    conditions = resolve_discover_aliases({"conditions": get_filter(query).conditions})[0][
-        "conditions"
-    ]
-    if environments:
-        conditions.append(["environment", "IN", [env.name for env in environments]])
-    response = _snuba_pool.urlopen(
-        "POST",
-        "/%s/subscriptions" % (dataset.value,),
-        body=json.dumps(
-            {
-                "project_id": project.id,
-                "dataset": dataset.value,
-                # We only care about conditions here. Filter keys only matter for
-                # filtering to project and groups. Projects are handled with an
-                # explicit param, and groups can't be queried here.
-                "conditions": conditions,
-                "aggregations": [query_aggregation_to_snuba[aggregation]],
-                "time_window": int(time_window.total_seconds()),
-                "resolution": int(resolution.total_seconds()),
-            }
-        ),
-    )
-    if response.status != 202:
-        raise SnubaError("HTTP %s response from Snuba!" % response.status)
-    return json.loads(response.data)["subscription_id"]
-
-
-def _delete_from_snuba(dataset, subscription_id):
-    response = _snuba_pool.urlopen(
-        "DELETE", "/%s/subscriptions/%s" % (dataset.value, subscription_id)
+    subscription.update(status=QuerySubscription.Status.DELETING.value)
+    delete_subscription_from_snuba.apply_async(
+        kwargs={"query_subscription_id": subscription.id}, countdown=5
     )
-    if response.status != 202:
-        raise SnubaError("HTTP %s response from Snuba!" % response.status)

+ 141 - 0
src/sentry/snuba/tasks.py

@@ -0,0 +1,141 @@
+from __future__ import absolute_import
+
+import json
+
+from sentry.api.event_search import get_filter
+from sentry.snuba.discover import resolve_discover_aliases
+from sentry.snuba.models import (
+    QueryAggregations,
+    QueryDatasets,
+    QuerySubscription,
+    query_aggregation_to_snuba,
+)
+from sentry.tasks.base import instrumented_task
+from sentry.utils import metrics
+from sentry.utils.snuba import _snuba_pool, SnubaError
+
+
+@instrumented_task(
+    name="sentry.snuba.tasks.create_subscription_in_snuba",
+    queue="subscriptions",
+    default_retry_delay=5,
+    max_retries=5,
+)
+def create_subscription_in_snuba(query_subscription_id):
+    """
+    Task to create a corresponding subscription in Snuba from a `QuerySubscription` in
+    Sentry. We store the snuba subscription id locally on success.
+    """
+    try:
+        subscription = QuerySubscription.objects.get(id=query_subscription_id)
+    except QuerySubscription.DoesNotExist:
+        metrics.incr("snuba.subscriptions.create.subscription_does_not_exist")
+        return
+    if subscription.status != QuerySubscription.Status.CREATING.value:
+        metrics.incr("snuba.subscriptions.create.incorrect_status")
+        return
+    if subscription.subscription_id is not None:
+        metrics.incr("snuba.subscriptions.create.already_created_in_snuba")
+        return
+
+    subscription_id = _create_in_snuba(subscription)
+    subscription.update(
+        status=QuerySubscription.Status.ACTIVE.value, subscription_id=subscription_id
+    )
+
+
+@instrumented_task(
+    name="sentry.snuba.tasks.update_subscription_in_snuba",
+    queue="subscriptions",
+    default_retry_delay=5,
+    max_retries=5,
+)
+def update_subscription_in_snuba(query_subscription_id):
+    """
+    Task to update a corresponding subscription in Snuba from a `QuerySubscription` in
+    Sentry. Updating in Snuba means deleting the existing subscription, then creating a
+    new one.
+    """
+    try:
+        subscription = QuerySubscription.objects.get(id=query_subscription_id)
+    except QuerySubscription.DoesNotExist:
+        metrics.incr("snuba.subscriptions.update.subscription_does_not_exist")
+        return
+
+    if subscription.status != QuerySubscription.Status.UPDATING.value:
+        metrics.incr("snuba.subscriptions.update.incorrect_status")
+        return
+
+    if subscription.subscription_id is not None:
+        _delete_from_snuba(QueryDatasets(subscription.dataset), subscription.subscription_id)
+
+    subscription_id = _create_in_snuba(subscription)
+    subscription.update(
+        status=QuerySubscription.Status.ACTIVE.value, subscription_id=subscription_id
+    )
+
+
+@instrumented_task(
+    name="sentry.snuba.tasks.delete_subscription_from_snuba",
+    queue="subscriptions",
+    default_retry_delay=5,
+    max_retries=5,
+)
+def delete_subscription_from_snuba(query_subscription_id):
+    """
+    Task to delete a corresponding subscription in Snuba from a `QuerySubscription` in
+    Sentry. Deletes the local subscription once we've successfully removed from Snuba.
+    """
+    try:
+        subscription = QuerySubscription.objects.get(id=query_subscription_id)
+    except QuerySubscription.DoesNotExist:
+        metrics.incr("snuba.subscriptions.delete.subscription_does_not_exist")
+        return
+
+    if subscription.status != QuerySubscription.Status.DELETING.value:
+        metrics.incr("snuba.subscriptions.delete.incorrect_status")
+        return
+
+    if subscription.subscription_id is not None:
+        _delete_from_snuba(QueryDatasets(subscription.dataset), subscription.subscription_id)
+
+    subscription.delete()
+
+
+def _create_in_snuba(subscription):
+    conditions = resolve_discover_aliases(
+        {"conditions": get_filter(subscription.query).conditions}
+    )[0]["conditions"]
+    environments = list(subscription.environments.all())
+    if environments:
+        conditions.append(["environment", "IN", [env.name for env in environments]])
+    response = _snuba_pool.urlopen(
+        "POST",
+        "/%s/subscriptions" % (subscription.dataset,),
+        body=json.dumps(
+            {
+                "project_id": subscription.project_id,
+                "dataset": subscription.dataset,
+                # We only care about conditions here. Filter keys only matter for
+                # filtering to project and groups. Projects are handled with an
+                # explicit param, and groups can't be queried here.
+                "conditions": conditions,
+                "aggregations": [
+                    query_aggregation_to_snuba[QueryAggregations(subscription.aggregation)]
+                ],
+                "time_window": subscription.time_window,
+                "resolution": subscription.resolution,
+            }
+        ),
+    )
+    if response.status != 202:
+        raise SnubaError("HTTP %s response from Snuba!" % response.status)
+    return json.loads(response.data)["subscription_id"]
+
+
+def _delete_from_snuba(dataset, subscription_id):
+    response = _snuba_pool.urlopen(
+        "DELETE", "/%s/subscriptions/%s" % (dataset.value, subscription_id)
+    )
+    if response.status != 202:
+        raise SnubaError("HTTP %s response from Snuba!" % response.status)

+ 8 - 4
tests/sentry/incidents/test_logic.py

@@ -717,7 +717,8 @@ class UpdateAlertRuleTest(TestCase, BaseIncidentsTest):
 
     def test_update_subscription(self):
         old_subscription_id = self.alert_rule.query_subscriptions.get().subscription_id
-        update_alert_rule(self.alert_rule, query="some new query")
+        with self.tasks():
+            update_alert_rule(self.alert_rule, query="some new query")
         assert old_subscription_id != self.alert_rule.query_subscriptions.get().subscription_id
 
     def test_empty_query(self):
@@ -768,7 +769,8 @@ class UpdateAlertRuleTest(TestCase, BaseIncidentsTest):
         query_update = "level:warning"
         new_project = self.create_project(fire_project_created=True)
         updated_projects = [self.project, new_project]
-        update_alert_rule(alert_rule, updated_projects, query=query_update)
+        with self.tasks():
+            update_alert_rule(alert_rule, updated_projects, query=query_update)
         updated_subscriptions = alert_rule.query_subscriptions.all()
         assert set([sub.project for sub in updated_subscriptions]) == set(updated_projects)
         for sub in updated_subscriptions:
@@ -809,7 +811,8 @@ class UpdateAlertRuleTest(TestCase, BaseIncidentsTest):
             set([sub.project for sub in QuerySubscription.objects.filter(alert_rules=alert_rule)])
             == projects
         )
-        update_alert_rule(alert_rule, excluded_projects=[self.project])
+        with self.tasks():
+            update_alert_rule(alert_rule, excluded_projects=[self.project])
         assert [
             sub.project for sub in QuerySubscription.objects.filter(alert_rules=alert_rule)
         ] == [new_project]
@@ -828,7 +831,8 @@ class UpdateAlertRuleTest(TestCase, BaseIncidentsTest):
             set([sub.project for sub in QuerySubscription.objects.filter(alert_rules=alert_rule)])
             == projects
         )
-        update_alert_rule(alert_rule, projects=[new_project], include_all_projects=False)
+        with self.tasks():
+            update_alert_rule(alert_rule, projects=[new_project], include_all_projects=False)
         assert [
             sub.project for sub in QuerySubscription.objects.filter(alert_rules=alert_rule)
         ] == [new_project]

+ 1 - 2
tests/sentry/incidents/test_subscription_processor.py

@@ -17,7 +17,6 @@ from sentry.incidents.logic import (
     create_alert_rule_trigger,
     create_alert_rule_trigger_action,
 )
-from sentry.snuba.subscriptions import query_aggregation_to_snuba
 from sentry.incidents.models import (
     AlertRule,
     AlertRuleThresholdType,
@@ -39,7 +38,7 @@ from sentry.incidents.subscription_processor import (
     SubscriptionProcessor,
     update_alert_rule_stats,
 )
-from sentry.snuba.models import QueryAggregations, QuerySubscription
+from sentry.snuba.models import query_aggregation_to_snuba, QueryAggregations, QuerySubscription
 from sentry.testutils import TestCase
 from sentry.utils.dates import to_timestamp
 from sentry.utils.compat import map

Some files were not shown because too many files changed in this diff