Browse Source

feat(escalating-issues): Use bulk updates/creates (#52059)

## Objective:
Bulk update Groups to reduce load in Postgres

---------

Co-authored-by: Katie Byers <lobsterkatie@gmail.com>
NisanthanNanthakumar 1 year ago
parent
commit
f75f7bce21

+ 7 - 3
src/sentry/conf/server.py

@@ -764,6 +764,10 @@ CELERY_QUEUES_CONTROL = [
     ),
 ]
 
+CELERY_ISSUE_STATES_QUEUE = Queue(
+    "auto_transition_issue_states", routing_key="auto_transition_issue_states"
+)
+
 CELERY_QUEUES_REGION = [
     Queue("activity.notify", routing_key="activity.notify"),
     Queue("alerts", routing_key="alerts"),
@@ -852,8 +856,8 @@ CELERY_QUEUES_REGION = [
     Queue("transactions.name_clusterer", routing_key="transactions.name_clusterer"),
     Queue("auto_enable_codecov", routing_key="auto_enable_codecov"),
     Queue("weekly_escalating_forecast", routing_key="weekly_escalating_forecast"),
-    Queue("auto_transition_issue_states", routing_key="auto_transition_issue_states"),
     Queue("recap_servers", routing_key="recap_servers"),
+    CELERY_ISSUE_STATES_QUEUE,
 ]
 
 from celery.schedules import crontab
@@ -980,7 +984,7 @@ CELERYBEAT_SCHEDULE_REGION = {
     "schedule-auto-resolution": {
         "task": "sentry.tasks.schedule_auto_resolution",
         # Run every 15 minutes
-        "schedule": crontab(minute="*/15"),
+        "schedule": crontab(minute="*/10"),
         "options": {"expires": 60 * 25},
     },
     "auto-remove-inbox": {
@@ -1081,7 +1085,7 @@ CELERYBEAT_SCHEDULE_REGION = {
     "schedule_auto_transition_new": {
         "task": "sentry.tasks.schedule_auto_transition_new",
         # Run job every 6 hours
-        "schedule": crontab(minute=0, hour="*/6"),
+        "schedule": crontab(minute="*/10"),
         "options": {"expires": 3600},
     },
     "schedule_auto_transition_regressed": {

+ 22 - 24
src/sentry/issues/ongoing.py

@@ -1,44 +1,42 @@
-from typing import Any, Mapping, Optional
+from typing import Any, List, Mapping, Optional
 
 from django.db.models.signals import post_save
 
-from sentry.models import (
-    Activity,
-    Group,
-    GroupStatus,
-    record_group_history_from_activity_type,
-    remove_group_from_inbox,
-)
+from sentry.models import Group, GroupStatus, bulk_remove_groups_from_inbox
 from sentry.types.activity import ActivityType
 from sentry.types.group import GroupSubStatus
 
 
-def transition_group_to_ongoing(
+def bulk_transition_group_to_ongoing(
     from_status: GroupStatus,
     from_substatus: GroupSubStatus,
-    group: Group,
+    groups: List[Group],
     activity_data: Optional[Mapping[str, Any]] = None,
 ) -> None:
     # make sure we don't update the Group when its already updated by conditionally updating the Group
-    updated = Group.objects.filter(
-        id=group.id, status=from_status, substatus=from_substatus
-    ).update(status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.ONGOING)
-    if updated:
+    groups_to_transistion = Group.objects.filter(
+        id__in=[group.id for group in groups], status=from_status, substatus=from_substatus
+    )
+
+    Group.objects.update_group_status(
+        groups=groups_to_transistion,
+        status=GroupStatus.UNRESOLVED,  # type: ignore
+        substatus=GroupSubStatus.ONGOING,  # type: ignore
+        activity_type=ActivityType.AUTO_SET_ONGOING,
+        activity_data=activity_data,
+        send_activity_notification=False,
+    )
+
+    for group in groups_to_transistion:
         group.status = GroupStatus.UNRESOLVED
         group.substatus = GroupSubStatus.ONGOING
+
+    bulk_remove_groups_from_inbox(groups)
+
+    for group in groups_to_transistion:
         post_save.send_robust(
             sender=Group,
             instance=group,
             created=False,
             update_fields=["status", "substatus"],
         )
-
-        remove_group_from_inbox(group)
-
-        Activity.objects.create_group_activity(
-            group, ActivityType.AUTO_SET_ONGOING, data=activity_data, send_notification=False
-        )
-
-        record_group_history_from_activity_type(
-            group, activity_type=ActivityType.AUTO_SET_ONGOING.value, actor=None
-        )

+ 3 - 3
src/sentry/issues/update_inbox.py

@@ -3,7 +3,7 @@ from __future__ import annotations
 from typing import Any, Dict, List
 
 from sentry import features
-from sentry.issues.ongoing import transition_group_to_ongoing
+from sentry.issues.ongoing import bulk_transition_group_to_ongoing
 from sentry.models import Group, GroupStatus, Project, User
 from sentry.models.groupinbox import (
     GroupInboxReason,
@@ -51,10 +51,10 @@ def update_inbox(
                 and group.substatus != GroupSubStatus.ONGOING
                 and group.status == GroupStatus.UNRESOLVED
             ):
-                transition_group_to_ongoing(
+                bulk_transition_group_to_ongoing(
                     group.status,
                     group.substatus,
-                    group,
+                    [group],
                     activity_data={"manually": True},
                 )
 

+ 22 - 9
src/sentry/models/group.py

@@ -9,7 +9,7 @@ from datetime import timedelta
 from enum import Enum
 from functools import reduce
 from operator import or_
-from typing import TYPE_CHECKING, Mapping, Optional, Sequence
+from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence
 
 from django.db import models
 from django.db.models import Q, QuerySet
@@ -398,19 +398,32 @@ class GroupManager(BaseManager):
         status: GroupStatus,
         substatus: GroupSubStatus | None,
         activity_type: ActivityType,
+        activity_data: Optional[Mapping[str, Any]] = None,
+        send_activity_notification: bool = True,
     ) -> None:
         """For each groups, update status to `status` and create an Activity."""
         from sentry.models import Activity
 
-        updated_count = (
-            self.filter(id__in=[g.id for g in groups])
-            .exclude(status=status)
-            .update(status=status, substatus=substatus)
+        to_be_updated = self.filter(id__in=[g.id for g in groups]).exclude(
+            status=status, substatus=substatus
         )
-        if updated_count:
-            for group in groups:
-                Activity.objects.create_group_activity(group, activity_type)
-                record_group_history_from_activity_type(group, activity_type.value)
+
+        for group in to_be_updated:
+            group.status = status
+            group.substatus = substatus
+
+        self.bulk_update(to_be_updated, ["status", "substatus"])
+
+        for group in to_be_updated:
+            group.status = status
+            group.substatus = substatus
+            Activity.objects.create_group_activity(
+                group,
+                activity_type,
+                data=activity_data,
+                send_notification=send_activity_notification,
+            )
+            record_group_history_from_activity_type(group, activity_type.value)
 
     def from_share_id(self, share_id: str) -> Group:
         if not share_id or len(share_id) != 32:

+ 40 - 1
src/sentry/models/grouphistory.py

@@ -1,4 +1,4 @@
-from typing import TYPE_CHECKING, Optional, Union
+from typing import TYPE_CHECKING, List, Optional, Union
 
 from django.db import models
 from django.db.models import SET_NULL, Q
@@ -277,3 +277,42 @@ def record_group_history(
         prev_history=prev_history,
         prev_history_date=prev_history.date_added if prev_history else None,
     )
+
+
+def bulk_record_group_history(
+    groups: List["Group"],
+    status: int,
+    actor: Optional[Union["User", "RpcUser", "Team"]] = None,
+    release: Optional["Release"] = None,
+):
+    from sentry.models import Team, User
+    from sentry.services.hybrid_cloud.user import RpcUser
+
+    def get_prev_history_date(group, status):
+        prev_history = get_prev_history(group, status)
+        return prev_history.date_added if prev_history else None
+
+    actor_id = None
+    if actor:
+        if isinstance(actor, RpcUser) or isinstance(actor, User):
+            actor_id = get_actor_id_for_user(actor)
+        elif isinstance(actor, Team):
+            actor_id = actor.actor_id
+        else:
+            raise ValueError("record_group_history actor argument must be RPCUser or Team")
+
+    return GroupHistory.objects.bulk_create(
+        [
+            GroupHistory(
+                organization=group.project.organization,
+                group=group,
+                project=group.project,
+                release=release,
+                actor_id=actor_id,
+                status=status,
+                prev_history=get_prev_history(group, status),
+                prev_history_date=get_prev_history_date(group, status),
+            )
+            for group in groups
+        ]
+    )

+ 28 - 1
src/sentry/models/groupinbox.py

@@ -7,7 +7,11 @@ from django.utils import timezone
 
 from sentry.db.models import FlexibleForeignKey, JSONField, Model, region_silo_only_model
 from sentry.models import Activity
-from sentry.models.grouphistory import GroupHistoryStatus, record_group_history
+from sentry.models.grouphistory import (
+    GroupHistoryStatus,
+    bulk_record_group_history,
+    record_group_history,
+)
 from sentry.types.activity import ActivityType
 
 INBOX_REASON_DETAILS = {
@@ -104,6 +108,29 @@ def remove_group_from_inbox(group, action=None, user=None, referrer=None):
         pass
 
 
+def bulk_remove_groups_from_inbox(groups, action=None, user=None, referrer=None):
+    try:
+        group_inbox = GroupInbox.objects.filter(group__in=groups)
+        group_inbox.delete()
+
+        if action is GroupInboxRemoveAction.MARK_REVIEWED and user is not None:
+            Activity.objects.bulk_create(
+                [
+                    Activity(
+                        project_id=group_inbox_item.group.project_id,
+                        group_id=group_inbox_item.group.id,
+                        type=ActivityType.MARK_REVIEWED.value,
+                        user_id=user.id,
+                    )
+                    for group_inbox_item in group_inbox
+                ]
+            )
+
+            bulk_record_group_history(groups, GroupHistoryStatus.REVIEWED, actor=user)
+    except GroupInbox.DoesNotExist:
+        pass
+
+
 def get_inbox_details(group_list):
     group_ids = [g.id for g in group_list]
     group_inboxes = GroupInbox.objects.filter(group__in=group_ids)

+ 3 - 0
src/sentry/tasks/auto_archive_issues.py

@@ -19,6 +19,7 @@ from sentry.models import (
     record_group_history_from_activity_type,
     remove_group_from_inbox,
 )
+from sentry.tasks.auto_ongoing_issues import skip_if_queue_has_items
 from sentry.tasks.base import instrumented_task, retry
 from sentry.types.activity import ActivityType
 from sentry.types.group import GroupSubStatus
@@ -37,6 +38,7 @@ ITERATOR_CHUNK = 10_000
 )
 @retry
 @monitor(monitor_slug="auto-archive-job-monitor")
+@skip_if_queue_has_items
 def run_auto_archive() -> None:
     """
     Automatically transition issues that are ongoing for 14 days to archived until escalating.
@@ -64,6 +66,7 @@ def run_auto_archive() -> None:
     soft_time_limit=20 * 60,
 )
 @retry
+@skip_if_queue_has_items
 def run_auto_archive_for_project(project_ids: List[int]) -> None:
     now = datetime.now(tz=pytz.UTC)
     fourteen_days_ago = now - timedelta(days=14)

+ 49 - 15
src/sentry/tasks/auto_ongoing_issues.py

@@ -1,4 +1,6 @@
+import logging
 from datetime import datetime, timedelta
+from functools import wraps
 from typing import Optional
 
 import pytz
@@ -7,7 +9,8 @@ from django.db.models import Max
 from sentry_sdk.crons.decorator import monitor
 
 from sentry import features
-from sentry.issues.ongoing import transition_group_to_ongoing
+from sentry.conf.server import CELERY_ISSUE_STATES_QUEUE
+from sentry.issues.ongoing import bulk_transition_group_to_ongoing
 from sentry.models import (
     Group,
     GroupHistoryStatus,
@@ -16,13 +19,41 @@ from sentry.models import (
     OrganizationStatus,
     Project,
 )
+from sentry.monitoring.queues import backend
 from sentry.tasks.base import instrumented_task, retry
 from sentry.types.group import GroupSubStatus
 from sentry.utils.query import RangeQuerySetWrapper
 
+logger = logging.getLogger(__name__)
+
 TRANSITION_AFTER_DAYS = 7
 
 
+def skip_if_queue_has_items(func):
+    """
+    Prevent adding more tasks in queue if the queue is not empty.
+    We want to prevent crons from scheduling more tasks than the workers
+    are capable of processing before the next cycle.
+    """
+
+    def inner(func):
+        @wraps(func)
+        def wrapped(*args, **kwargs):
+            queue_size = backend.get_size(CELERY_ISSUE_STATES_QUEUE.name)
+            if queue_size > 0:
+                logger.exception(
+                    f"{CELERY_ISSUE_STATES_QUEUE.name} queue size greater than 0.",
+                    extra={"size": queue_size, "task": func.__name__},
+                )
+                return
+
+            func(*args, **kwargs)
+
+        return wrapped
+
+    return inner(func)
+
+
 @instrumented_task(
     name="sentry.tasks.schedule_auto_transition_new",
     queue="auto_transition_issue_states",
@@ -32,7 +63,9 @@ TRANSITION_AFTER_DAYS = 7
 )
 @retry(on=(OperationalError,))
 @monitor(monitor_slug="schedule_auto_transition_new")
+@skip_if_queue_has_items
 def schedule_auto_transition_new() -> None:
+
     now = datetime.now(tz=pytz.UTC)
     three_days_past = now - timedelta(days=TRANSITION_AFTER_DAYS)
 
@@ -58,6 +91,7 @@ def schedule_auto_transition_new() -> None:
     acks_late=True,
 )
 @retry(on=(OperationalError,))
+@skip_if_queue_has_items
 def auto_transition_issues_new_to_ongoing(
     project_id: int,
     first_seen_lte: int,
@@ -77,13 +111,12 @@ def auto_transition_issues_new_to_ongoing(
 
     new_groups = list(queryset.order_by("first_seen")[:chunk_size])
 
-    for group in new_groups:
-        transition_group_to_ongoing(
-            GroupStatus.UNRESOLVED,
-            GroupSubStatus.NEW,
-            group,
-            activity_data={"after_days": TRANSITION_AFTER_DAYS},
-        )
+    bulk_transition_group_to_ongoing(
+        GroupStatus.UNRESOLVED,
+        GroupSubStatus.NEW,
+        new_groups,
+        activity_data={"after_days": TRANSITION_AFTER_DAYS},
+    )
 
     if len(new_groups) == chunk_size:
         auto_transition_issues_new_to_ongoing.delay(
@@ -104,6 +137,7 @@ def auto_transition_issues_new_to_ongoing(
 )
 @retry(on=(OperationalError,))
 @monitor(monitor_slug="schedule_auto_transition_regressed")
+@skip_if_queue_has_items
 def schedule_auto_transition_regressed() -> None:
     now = datetime.now(tz=pytz.UTC)
     three_days_past = now - timedelta(days=TRANSITION_AFTER_DAYS)
@@ -130,6 +164,7 @@ def schedule_auto_transition_regressed() -> None:
     acks_late=True,
 )
 @retry(on=(OperationalError,))
+@skip_if_queue_has_items
 def auto_transition_issues_regressed_to_ongoing(
     project_id: int,
     date_added_lte: int,
@@ -155,13 +190,12 @@ def auto_transition_issues_regressed_to_ongoing(
 
     groups_with_regressed_history = list(queryset.order_by("recent_regressed_history")[:chunk_size])
 
-    for group in groups_with_regressed_history:
-        transition_group_to_ongoing(
-            GroupStatus.UNRESOLVED,
-            GroupSubStatus.REGRESSED,
-            group,
-            activity_data={"after_days": TRANSITION_AFTER_DAYS},
-        )
+    bulk_transition_group_to_ongoing(
+        GroupStatus.UNRESOLVED,
+        GroupSubStatus.REGRESSED,
+        groups_with_regressed_history,
+        activity_data={"after_days": TRANSITION_AFTER_DAYS},
+    )
 
     if len(groups_with_regressed_history) == chunk_size:
         auto_transition_issues_regressed_to_ongoing.delay(

+ 3 - 0
src/sentry/tasks/auto_resolve_issues.py

@@ -14,6 +14,7 @@ from sentry.models import (
     remove_group_from_inbox,
 )
 from sentry.models.grouphistory import GroupHistoryStatus, record_group_history
+from sentry.tasks.auto_ongoing_issues import skip_if_queue_has_items
 from sentry.tasks.base import instrumented_task
 from sentry.tasks.integrations import kick_off_status_syncs
 from sentry.types.activity import ActivityType
@@ -27,6 +28,7 @@ ONE_HOUR = 3600
     time_limit=75,
     soft_time_limit=60,
 )
+@skip_if_queue_has_items
 def schedule_auto_resolution():
     options = ProjectOption.objects.filter(
         key__in=["sentry:resolve_age", "sentry:_last_auto_resolve"]
@@ -56,6 +58,7 @@ def schedule_auto_resolution():
     time_limit=75,
     soft_time_limit=60,
 )
+@skip_if_queue_has_items
 def auto_resolve_project_issues(project_id, cutoff=None, chunk_size=1000, **kwargs):
     project = Project.objects.get_from_cache(id=project_id)
 

+ 3 - 3
tests/sentry/issues/test_ongoing.py

@@ -1,4 +1,4 @@
-from sentry.issues.ongoing import transition_group_to_ongoing
+from sentry.issues.ongoing import bulk_transition_group_to_ongoing
 from sentry.models import Activity, GroupHistory, GroupHistoryStatus, GroupStatus
 from sentry.testutils import TestCase
 from sentry.types.activity import ActivityType
@@ -9,7 +9,7 @@ class TransitionNewToOngoingTest(TestCase):
     def test_new_to_ongoing(self) -> None:
         group = self.create_group(status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.NEW)
 
-        transition_group_to_ongoing(GroupStatus.UNRESOLVED, GroupSubStatus.NEW, group)
+        bulk_transition_group_to_ongoing(GroupStatus.UNRESOLVED, GroupSubStatus.NEW, [group])
         assert Activity.objects.filter(
             group=group, type=ActivityType.AUTO_SET_ONGOING.value
         ).exists()
@@ -20,7 +20,7 @@ class TransitionNewToOngoingTest(TestCase):
     def test_regressed_to_ongoing(self) -> None:
         group = self.create_group(status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.REGRESSED)
 
-        transition_group_to_ongoing(GroupStatus.UNRESOLVED, GroupSubStatus.REGRESSED, group)
+        bulk_transition_group_to_ongoing(GroupStatus.UNRESOLVED, GroupSubStatus.REGRESSED, [group])
         assert Activity.objects.filter(
             group=group, type=ActivityType.AUTO_SET_ONGOING.value
         ).exists()

Some files were not shown because too many files changed in this diff