
feat(crons): Move mark_timeout to parallel task (#56996)

Changes `mark_timeout` to run as a parallel task. The Datadog timing graphs will need to be updated after merge.
Richard Ortenberg · 1 year ago · commit 0c25b84c6e
3 changed files with 42 additions and 131 deletions
  1. src/sentry/monitors/tasks.py        +2  −30
  2. src/sentry/utils/sdk.py             +1  −0
  3. tests/sentry/monitors/test_tasks.py +39 −101

+ 2 - 30
src/sentry/monitors/tasks.py

@@ -246,41 +246,13 @@ def check_timeout(current_datetime: datetime):
     metrics.gauge("sentry.monitors.tasks.check_timeout.count", qs.count(), sample_rate=1)
     # check for any monitors which are still running and have exceeded their maximum runtime
     for checkin in qs:
-        try:
-            monitor_environment = checkin.monitor_environment
-            logger.info(
-                "monitor_environment.checkin-timeout",
-                extra={"monitor_environment_id": monitor_environment.id, "checkin_id": checkin.id},
-            )
-            affected = checkin.update(status=CheckInStatus.TIMEOUT)
-            if not affected:
-                continue
-
-            # we only mark the monitor as failed if a newer checkin wasn't responsible for the state
-            # change
-            has_newer_result = MonitorCheckIn.objects.filter(
-                monitor_environment=monitor_environment,
-                date_added__gt=checkin.date_added,
-                status__in=[CheckInStatus.OK, CheckInStatus.ERROR],
-            ).exists()
-            if not has_newer_result:
-                # TODO(epurkhiser): We also need a timestamp here, but not sure
-                # what we want it to be
-                mark_failed(checkin, ts=None)
-        except Exception:
-            logger.exception("Exception in check_monitors - mark timeout", exc_info=True)
-
-    # safety check for check-ins stuck in the backlog
-    backlog_count = MonitorCheckIn.objects.filter(
-        status=CheckInStatus.IN_PROGRESS, timeout_at__isnull=True
-    ).count()
-    if backlog_count:
-        logger.exception(f"Exception in check_monitors - backlog count {backlog_count} is > 0")
+        mark_checkin_timeout.delay(checkin.id)
 
 
 @instrumented_task(
     name="sentry.monitors.tasks.mark_checkin_timeout",
     max_retries=0,
+    record_timing=True,
 )
 def mark_checkin_timeout(checkin_id: int):
     logger.info("checkin.timeout", extra={"checkin_id": checkin_id})

+ 1 - 0
src/sentry/utils/sdk.py

@@ -130,6 +130,7 @@ SAMPLED_TASKS = {
     "sentry.monitors.tasks.check_missing": 1.0,
     "sentry.monitors.tasks.mark_environment_missing": 0.05,
     "sentry.monitors.tasks.check_timeout": 1.0,
+    "sentry.monitors.tasks.mark_checkin_timeout": 0.05,
     "sentry.monitors.tasks.clock_pulse": 1.0,
     "sentry.tasks.auto_enable_codecov": settings.SAMPLED_DEFAULT_RATE,
     "sentry.dynamic_sampling.tasks.boost_low_volume_projects": 0.2,

+ 39 - 101
tests/sentry/monitors/test_tasks.py

@@ -21,6 +21,7 @@ from sentry.monitors.tasks import (
     check_missing,
     check_timeout,
     clock_pulse,
+    mark_checkin_timeout,
     mark_environment_missing,
     try_monitor_tasks_trigger,
 )
@@ -42,7 +43,7 @@ def make_ref_time():
     # down to the minute.
     #
     # NOTE: We also remove the timezone info from the task run timestamp, since
-    # it recieves a date time object from the kafka producer. This helps test
+    # it receives a date time object from the kafka producer. This helps test
     # for bad timezone
     task_run_ts = ts.replace(second=12, microsecond=0, tzinfo=None)
 
@@ -214,7 +215,7 @@ class MonitorTaskCheckMissingTest(TestCase):
             config={"schedule": "* * * * *"},
             status=state,
         )
-        # Exepcted checkin was a full minute ago, if this monitor wasn't in the
+        # Expected checkin was a full minute ago, if this monitor wasn't in the
         # `state` the monitor would usually end up marked as timed out
         monitor_environment = MonitorEnvironment.objects.create(
             monitor=monitor,
@@ -357,8 +358,9 @@ class MonitorTaskCheckMissingTest(TestCase):
         ).exists()
 
 
-class MonitorTaskCheckTimemoutTest(TestCase):
-    def test_timeout_with_no_future_complete_checkin(self):
+class MonitorTaskCheckTimeoutTest(TestCase):
+    @mock.patch("sentry.monitors.tasks.mark_checkin_timeout")
+    def test_timeout_with_no_future_complete_checkin(self, mark_checkin_timeout_mock):
         org = self.create_organization()
         project = self.create_project(organization=org)
 
@@ -372,7 +374,7 @@ class MonitorTaskCheckTimemoutTest(TestCase):
             type=MonitorType.CRON_JOB,
             config={"schedule": "0 0 * * *"},
         )
-        # Next checkin should should have been 24 hours ago
+        # Next checkin should have been 24 hours ago
         monitor_environment = MonitorEnvironment.objects.create(
             monitor=monitor,
             environment=self.environment,
@@ -408,22 +410,30 @@ class MonitorTaskCheckTimemoutTest(TestCase):
         # the second checkin is not yet timed out.
         check_timeout(task_run_ts)
 
+        # assert that task is called for the specific checkin
+        assert mark_checkin_timeout_mock.delay.call_count == 1
+        assert mark_checkin_timeout_mock.delay.mock_calls[0] == mock.call(checkin1.id)
+
+        mark_checkin_timeout(checkin1.id)
+
         # First checkin is marked as timed out
         assert MonitorCheckIn.objects.filter(id=checkin1.id, status=CheckInStatus.TIMEOUT).exists()
 
-        # Second checkin is marked as timed out
+        # Second checkin is not marked as timed out
         assert MonitorCheckIn.objects.filter(
             id=checkin2.id, status=CheckInStatus.IN_PROGRESS
         ).exists()
 
         # XXX(epurkhiser): At the moment we mark the monitor with the MOST
-        # RECENT updated checkins status. In this scenario we actually already
-        # have checkin2 in progress, but because we just marked
+        # RECENT updated checkin's status. In this scenario we actually already
+        # have checkin2 in progress, but because we just marked checkin1
+        # as timed out it is not updated
         assert MonitorEnvironment.objects.filter(
             id=monitor_environment.id, status=MonitorStatus.TIMEOUT
         ).exists()
 
-    def test_timeout_with_future_complete_checkin(self):
+    @mock.patch("sentry.monitors.tasks.mark_checkin_timeout")
+    def test_timeout_with_future_complete_checkin(self, mark_checkin_timeout_mock):
         org = self.create_organization()
         project = self.create_project(organization=org)
 
@@ -472,6 +482,12 @@ class MonitorTaskCheckTimemoutTest(TestCase):
         # second checkin was already marked as OK.
         check_timeout(task_run_ts)
 
+        # assert that task is called for the specific checkin
+        assert mark_checkin_timeout_mock.delay.call_count == 1
+        assert mark_checkin_timeout_mock.delay.mock_calls[0] == mock.call(checkin1.id)
+
+        mark_checkin_timeout(checkin1.id)
+
         # The first checkin is marked as timed out
         assert MonitorCheckIn.objects.filter(id=checkin1.id, status=CheckInStatus.TIMEOUT).exists()
         # The second checkin has not changed status
@@ -482,7 +498,8 @@ class MonitorTaskCheckTimemoutTest(TestCase):
             id=monitor_environment.id, status=MonitorStatus.OK
         ).exists()
 
-    def test_timeout_via_max_runtime_configuration(self):
+    @mock.patch("sentry.monitors.tasks.mark_checkin_timeout")
+    def test_timeout_via_max_runtime_configuration(self, mark_checkin_timeout_mock):
         org = self.create_organization()
         project = self.create_project(organization=org)
 
@@ -517,103 +534,24 @@ class MonitorTaskCheckTimemoutTest(TestCase):
 
         # Running the check_monitors at 35 minutes does not mark the check-in as timed out, it's still allowed to be running
         check_timeout(task_run_ts + timedelta(minutes=35))
+
+        # assert that task is not called for the specific checkin
+        assert mark_checkin_timeout_mock.delay.call_count == 0
+
         assert MonitorCheckIn.objects.filter(
             id=checkin.id, status=CheckInStatus.IN_PROGRESS
         ).exists()
 
         # After 60 minutes the checkin will be marked as timed out
         check_timeout(task_run_ts + timedelta(minutes=60))
-        assert MonitorCheckIn.objects.filter(id=checkin.id, status=CheckInStatus.TIMEOUT).exists()
-
-        assert MonitorEnvironment.objects.filter(
-            id=monitor_environment.id, status=MonitorStatus.TIMEOUT
-        ).exists()
 
-    @mock.patch("sentry.monitors.tasks.logger")
-    def test_timeout_exception_handling(self, logger):
-        org = self.create_organization()
-        project = self.create_project(organization=org)
-
-        task_run_ts, ts = make_ref_time()
-        check_in_24hr_ago = ts - timedelta(hours=24)
+        # assert that task is called for the specific checkin
+        assert mark_checkin_timeout_mock.delay.call_count == 1
+        assert mark_checkin_timeout_mock.delay.mock_calls[0] == mock.call(checkin.id)
 
-        # This monitor will cause failure
-        exception_monitor = Monitor.objects.create(
-            organization_id=org.id,
-            project_id=project.id,
-            type=MonitorType.CRON_JOB,
-            config={
-                "schedule_type": ScheduleType.INTERVAL,
-                # XXX: Note the invalid schedule will cause an exception,
-                # typically the validator protects us against this
-                "schedule": [-2, "minute"],
-            },
-        )
-        exception_monitor_environment = MonitorEnvironment.objects.create(
-            monitor=exception_monitor,
-            environment=self.environment,
-            last_checkin=ts,
-            next_checkin=ts + timedelta(hours=24),
-            next_checkin_latest=ts + timedelta(hours=24, minutes=1),
-            status=MonitorStatus.OK,
-        )
-        MonitorCheckIn.objects.create(
-            monitor=exception_monitor,
-            monitor_environment=exception_monitor_environment,
-            project_id=project.id,
-            status=CheckInStatus.IN_PROGRESS,
-            date_added=check_in_24hr_ago,
-            date_updated=check_in_24hr_ago,
-            timeout_at=check_in_24hr_ago + timedelta(minutes=30),
-        )
+        mark_checkin_timeout(checkin.id)
 
-        # This monitor will be fine
-        monitor = Monitor.objects.create(
-            organization_id=org.id,
-            project_id=project.id,
-            type=MonitorType.CRON_JOB,
-            config={"schedule": "0 0 * * *"},
-            date_added=check_in_24hr_ago,
-        )
-        monitor_environment = MonitorEnvironment.objects.create(
-            monitor=monitor,
-            environment=self.environment,
-            last_checkin=ts,
-            next_checkin=ts + timedelta(hours=24),
-            next_checkin_latest=ts + timedelta(hours=24, minutes=1),
-            status=MonitorStatus.OK,
-        )
-        checkin1 = MonitorCheckIn.objects.create(
-            monitor=monitor,
-            monitor_environment=monitor_environment,
-            project_id=project.id,
-            status=CheckInStatus.IN_PROGRESS,
-            date_added=check_in_24hr_ago,
-            date_updated=check_in_24hr_ago,
-            timeout_at=check_in_24hr_ago + timedelta(minutes=30),
-        )
-        checkin2 = MonitorCheckIn.objects.create(
-            monitor=monitor,
-            monitor_environment=monitor_environment,
-            project_id=project.id,
-            status=CheckInStatus.IN_PROGRESS,
-            date_added=ts,
-            date_updated=ts,
-            timeout_at=ts + timedelta(minutes=30),
-        )
-
-        assert checkin1.date_added == checkin1.date_updated == check_in_24hr_ago
-
-        check_timeout(task_run_ts)
-
-        # Logged the exception
-        assert logger.exception.call_count == 1
-
-        assert MonitorCheckIn.objects.filter(id=checkin1.id, status=CheckInStatus.TIMEOUT).exists()
-
-        assert MonitorCheckIn.objects.filter(
-            id=checkin2.id, status=CheckInStatus.IN_PROGRESS
-        ).exists()
+        assert MonitorCheckIn.objects.filter(id=checkin.id, status=CheckInStatus.TIMEOUT).exists()
 
         assert MonitorEnvironment.objects.filter(
             id=monitor_environment.id, status=MonitorStatus.TIMEOUT
@@ -658,7 +596,7 @@ def test_monitor_task_trigger(dispatch_tasks):
     try_monitor_tasks_trigger(ts=now + timedelta(minutes=1))
     assert dispatch_tasks.call_count == 2
 
-    # A skipped minute trigges the task AND captures an error
+    # A skipped minute triggers the task AND captures an error
     with mock.patch("sentry_sdk.capture_message") as capture_message:
         assert capture_message.call_count == 0
         try_monitor_tasks_trigger(ts=now + timedelta(minutes=3, seconds=5))
@@ -675,7 +613,7 @@ def test_monitor_task_trigger_partition_desync(dispatch_tasks):
     """
     now = datetime.now().replace(second=0, microsecond=0)
 
-    # First message with timestamp just after the minute bounardary
+    # First message with timestamp just after the minute boundary
     # triggers the task
     try_monitor_tasks_trigger(ts=now + timedelta(seconds=1))
     assert dispatch_tasks.call_count == 1
@@ -685,7 +623,7 @@ def test_monitor_task_trigger_partition_desync(dispatch_tasks):
     try_monitor_tasks_trigger(ts=now - timedelta(seconds=1))
     assert dispatch_tasks.call_count == 1
 
-    # Third message again just after the minute bounadry does NOT trigger
+    # Third message again just after the minute boundary does NOT trigger
     # the task, we've already ticked at that time.
     try_monitor_tasks_trigger(ts=now + timedelta(seconds=1))
     assert dispatch_tasks.call_count == 1