Browse Source

feat(escalating-issues): Don't create new task for each project & 1000 groups (#52490)

## Objective:
Optimize auto transition tasks to prevent instantiating a bunch of tasks
in the queue.
NisanthanNanthakumar 1 year ago
parent
commit
39f67a52d2

+ 78 - 84
src/sentry/tasks/auto_ongoing_issues.py

@@ -1,7 +1,7 @@
 import logging
 from datetime import datetime, timedelta
 from functools import wraps
-from typing import Optional
+from typing import List, Optional
 
 import pytz
 from django.db import OperationalError
@@ -10,6 +10,7 @@ from sentry_sdk.crons.decorator import monitor
 
 from sentry import features
 from sentry.conf.server import CELERY_ISSUE_STATES_QUEUE
+from sentry.constants import ObjectStatus
 from sentry.issues.ongoing import bulk_transition_group_to_ongoing
 from sentry.models import (
     Group,
@@ -22,11 +23,13 @@ from sentry.models import (
 from sentry.monitoring.queues import backend
 from sentry.tasks.base import instrumented_task, retry
 from sentry.types.group import GroupSubStatus
+from sentry.utils.iterators import chunked
 from sentry.utils.query import RangeQuerySetWrapper
 
 logger = logging.getLogger(__name__)
 
 TRANSITION_AFTER_DAYS = 7
+ITERATOR_CHUNK = 10_000
 
 
 def skip_if_queue_has_items(func):
@@ -67,18 +70,21 @@ def skip_if_queue_has_items(func):
 def schedule_auto_transition_new() -> None:
 
     now = datetime.now(tz=pytz.UTC)
-    three_days_past = now - timedelta(days=TRANSITION_AFTER_DAYS)
+    seven_days_ago = now - timedelta(days=TRANSITION_AFTER_DAYS)
 
     for org in RangeQuerySetWrapper(Organization.objects.filter(status=OrganizationStatus.ACTIVE)):
         if features.has("organizations:escalating-issues", org):
-            for project_id in Project.objects.filter(organization_id=org.id).values_list(
-                "id", flat=True
-            ):
-                auto_transition_issues_new_to_ongoing.delay(
-                    project_id=project_id,
-                    first_seen_lte=int(three_days_past.timestamp()),
-                    expires=now + timedelta(hours=1),
-                )
+            project_ids = list(
+                Project.objects.filter(
+                    organization_id=org.id, status=ObjectStatus.ACTIVE
+                ).values_list("id", flat=True)
+            )
+
+            auto_transition_issues_new_to_ongoing.delay(
+                project_ids=project_ids,
+                first_seen_lte=int(seven_days_ago.timestamp()),
+                expires=now + timedelta(hours=1),
+            )
 
 
 @instrumented_task(
@@ -93,38 +99,32 @@ def schedule_auto_transition_new() -> None:
 @retry(on=(OperationalError,))
 @skip_if_queue_has_items
 def auto_transition_issues_new_to_ongoing(
-    project_id: int,
+    project_ids: Optional[List[int]] = None,  # default None so in-flight tasks queued with only project_id still deserialize
     first_seen_lte: int,
-    first_seen_gte: Optional[int] = None,
-    chunk_size: int = 1000,
+    project_id: Optional[int] = None,  # TODO(nisanthan): Remove this arg in next PR
     **kwargs,
 ) -> None:
-    queryset = Group.objects.filter(
-        project_id=project_id,
-        status=GroupStatus.UNRESOLVED,
-        substatus=GroupSubStatus.NEW,
-        first_seen__lte=datetime.fromtimestamp(first_seen_lte, pytz.UTC),
-    )
-
-    if first_seen_gte:
-        queryset = queryset.filter(first_seen__gte=datetime.fromtimestamp(first_seen_gte, pytz.UTC))
-
-    new_groups = list(queryset.order_by("first_seen")[:chunk_size])
-
-    bulk_transition_group_to_ongoing(
-        GroupStatus.UNRESOLVED,
-        GroupSubStatus.NEW,
-        new_groups,
-        activity_data={"after_days": TRANSITION_AFTER_DAYS},
-    )
-
-    if len(new_groups) == chunk_size:
-        auto_transition_issues_new_to_ongoing.delay(
-            project_id=project_id,
-            first_seen_lte=first_seen_lte,
-            first_seen_gte=new_groups[chunk_size - 1].first_seen.timestamp(),
-            chunk_size=chunk_size,
-            expires=datetime.now(tz=pytz.UTC) + timedelta(hours=1),
+    # TODO(nisanthan): Remove this conditional in next PR
+    if project_id is not None:
+        project_ids = [project_id]
+
+    for new_groups in chunked(
+        RangeQuerySetWrapper(
+            Group.objects.filter(
+                project_id__in=project_ids,
+                status=GroupStatus.UNRESOLVED,
+                substatus=GroupSubStatus.NEW,
+                first_seen__lte=datetime.fromtimestamp(first_seen_lte, pytz.UTC),
+            ),
+            step=ITERATOR_CHUNK,
+        ),
+        ITERATOR_CHUNK,
+    ):
+        bulk_transition_group_to_ongoing(
+            GroupStatus.UNRESOLVED,
+            GroupSubStatus.NEW,
+            new_groups,
+            activity_data={"after_days": TRANSITION_AFTER_DAYS},
         )
 
 
@@ -139,19 +139,23 @@ def auto_transition_issues_new_to_ongoing(
 @monitor(monitor_slug="schedule_auto_transition_regressed")
 @skip_if_queue_has_items
 def schedule_auto_transition_regressed() -> None:
+
     now = datetime.now(tz=pytz.UTC)
-    three_days_past = now - timedelta(days=TRANSITION_AFTER_DAYS)
+    seven_days_ago = now - timedelta(days=TRANSITION_AFTER_DAYS)
 
     for org in RangeQuerySetWrapper(Organization.objects.filter(status=OrganizationStatus.ACTIVE)):
         if features.has("organizations:escalating-issues", org):
-            for project_id in Project.objects.filter(organization_id=org.id).values_list(
-                "id", flat=True
-            ):
-                auto_transition_issues_regressed_to_ongoing.delay(
-                    project_id=project_id,
-                    date_added_lte=int(three_days_past.timestamp()),
-                    expires=now + timedelta(hours=1),
-                )
+            project_ids = list(
+                Project.objects.filter(
+                    organization_id=org.id, status=ObjectStatus.ACTIVE
+                ).values_list("id", flat=True)
+            )
+
+            auto_transition_issues_regressed_to_ongoing.delay(
+                project_ids=project_ids,
+                date_added_lte=int(seven_days_ago.timestamp()),
+                expires=now + timedelta(hours=1),
+            )
 
 
 @instrumented_task(
@@ -166,44 +170,34 @@ def schedule_auto_transition_regressed() -> None:
 @retry(on=(OperationalError,))
 @skip_if_queue_has_items
 def auto_transition_issues_regressed_to_ongoing(
-    project_id: int,
+    project_ids: Optional[List[int]] = None,  # default None so in-flight tasks queued with only project_id still deserialize
     date_added_lte: int,
-    date_added_gte: Optional[int] = None,
-    chunk_size: int = 1000,
+    project_id: Optional[int] = None,  # TODO(nisanthan): Remove this arg in next PR
     **kwargs,
 ) -> None:
-    queryset = (
-        Group.objects.filter(
-            project_id=project_id,
-            status=GroupStatus.UNRESOLVED,
-            substatus=GroupSubStatus.REGRESSED,
-            grouphistory__status=GroupHistoryStatus.REGRESSED,
-        )
-        .annotate(recent_regressed_history=Max("grouphistory__date_added"))
-        .filter(recent_regressed_history__lte=datetime.fromtimestamp(date_added_lte, pytz.UTC))
-    )
-
-    if date_added_gte:
-        queryset = queryset.filter(
-            recent_regressed_history__gte=datetime.fromtimestamp(date_added_gte, pytz.UTC)
-        )
 
-    groups_with_regressed_history = list(queryset.order_by("recent_regressed_history")[:chunk_size])
-
-    bulk_transition_group_to_ongoing(
-        GroupStatus.UNRESOLVED,
-        GroupSubStatus.REGRESSED,
-        groups_with_regressed_history,
-        activity_data={"after_days": TRANSITION_AFTER_DAYS},
-    )
-
-    if len(groups_with_regressed_history) == chunk_size:
-        auto_transition_issues_regressed_to_ongoing.delay(
-            project_id=project_id,
-            date_added_lte=date_added_lte,
-            date_added_gte=groups_with_regressed_history[
-                chunk_size - 1
-            ].recent_regressed_history.timestamp(),
-            chunk_size=chunk_size,
-            expires=datetime.now(tz=pytz.UTC) + timedelta(hours=1),
+    # TODO(nisanthan): Remove this conditional in next PR
+    if project_id is not None:
+        project_ids = [project_id]
+
+    for groups_with_regressed_history in chunked(
+        RangeQuerySetWrapper(
+            Group.objects.filter(
+                project_id__in=project_ids,
+                status=GroupStatus.UNRESOLVED,
+                substatus=GroupSubStatus.REGRESSED,
+                grouphistory__status=GroupHistoryStatus.REGRESSED,
+            )
+            .annotate(recent_regressed_history=Max("grouphistory__date_added"))
+            .filter(recent_regressed_history__lte=datetime.fromtimestamp(date_added_lte, pytz.UTC)),
+            step=ITERATOR_CHUNK,
+        ),
+        ITERATOR_CHUNK,
+    ):
+
+        bulk_transition_group_to_ongoing(
+            GroupStatus.UNRESOLVED,
+            GroupSubStatus.REGRESSED,
+            groups_with_regressed_history,
+            activity_data={"after_days": TRANSITION_AFTER_DAYS},
         )

+ 4 - 6
tests/sentry/tasks/test_auto_ongoing_issues.py

@@ -151,8 +151,8 @@ class ScheduleAutoNewOngoingIssuesTest(TestCase):
     @mock.patch("sentry.tasks.auto_ongoing_issues.backend")
     @mock.patch(
         "sentry.tasks.auto_ongoing_issues.auto_transition_issues_new_to_ongoing.delay",
-        wraps=lambda project_id, first_seen_lte, **kwargs: auto_transition_issues_new_to_ongoing(
-            project_id, first_seen_lte, chunk_size=10, kwargs=kwargs
+        wraps=lambda project_ids, first_seen_lte, **kwargs: auto_transition_issues_new_to_ongoing(
+            project_ids, first_seen_lte, kwargs=kwargs
         ),
     )
     def test_paginated_transition(self, mocked, mock_backend):
@@ -180,10 +180,8 @@ class ScheduleAutoNewOngoingIssuesTest(TestCase):
         with self.tasks():
             schedule_auto_transition_new()
 
-        # 1st time handles a full page
-        # 2nd time to handle 2 remaining
-        # 3rd time if for Groups with project_id=1 which shouldn't have any groups to transition
-        assert mocked.call_count == 3
+        # Should create one task per org (carrying all of that org's project ids), not one per project
+        assert mocked.call_count == 2
 
         # after
         assert (