123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- import asyncio
- from datetime import timedelta
- from typing import List
- from celery import shared_task
- from django.conf import settings
- from django.core.cache import cache
- from django.db import models
- from django.db.models import F, Q
- from django.db.models.expressions import Func
- from django.utils import timezone
- from django_redis import get_redis_connection
- from alerts.constants import RecipientType
- from alerts.models import AlertRecipient
- from .email import MonitorEmail
- from .models import Monitor, MonitorCheck, MonitorType
- from .utils import fetch_all
- from .webhooks import send_uptime_as_webhook
# Redis/cache key under which the dispatch counter is stored.
UPTIME_COUNTER_KEY = "uptime_counter"
# Largest signed 32-bit integer (2147483647). Used both as the counter value
# at which the Redis key is reset and as the timeout handed to the cache
# fallback when the key is first created.
UPTIME_TICK_EXPIRE = 2**31 - 1
# Module-level snapshot of the configured dispatch window, in second ticks.
UPTIME_CHECK_INTERVAL = settings.UPTIME_CHECK_INTERVAL
class Epoch(Func):
    """
    Database expression yielding the epoch of its argument as an integer.

    Renders as ``EXTRACT(epoch FROM <expression>)::INTEGER`` and reports an
    ``IntegerField`` result so it can take part in integer arithmetic and
    comparisons inside a queryset.
    """

    # The ``::INTEGER`` suffix is a PostgreSQL-style cast.
    template = "EXTRACT(epoch FROM %(expressions)s)::INTEGER"
    output_field = models.IntegerField()
def bucket_monitors(monitors, tick: int, check_interval=UPTIME_CHECK_INTERVAL):
    """
    Group monitors by the second tick on which they are due, then by speed.

    Every tick in ``range(tick, tick + check_interval)`` is examined. A monitor
    is due on a tick when the tick is an exact multiple of the monitor's
    interval expressed in whole seconds. Due monitors are split by timeout:
    under 30 seconds is "fast" (key ``True``), 30 seconds or more is
    potentially slow (key ``False``).

    Example: if there is one monitor with an interval of 1 and the check
    interval is 10, this monitor is due on every tick, so the result holds
    10 ticks with the same monitor in each.

    Args:
        monitors: Iterable of objects exposing ``interval`` (a ``timedelta``)
            and ``int_timeout``.
        tick: First second tick of the window.
        check_interval: Number of consecutive ticks in the window.

    Returns:
        ``{tick: {is_fast: [monitor, ...]}}`` containing only the ticks and
        speed groups that have at least one monitor, for example
        ``{1: {True: [a, b], False: [c]}, 2: {True: [a]}}``.
    """
    # Resolve each monitor's period once instead of once per tick.
    # total_seconds() is required here: timedelta.seconds is only the sub-day
    # remainder, so a 1 day interval would read as 0 (ZeroDivisionError) and
    # 1 day + 1 hour would read as 3600.
    schedule = []
    for monitor in monitors:
        period = int(monitor.interval.total_seconds())
        if period <= 0:
            # A modulo by zero would abort the whole dispatch, and such a
            # monitor can never be due on a whole-second tick anyway.
            continue
        schedule.append((monitor, period))

    result = {}
    for i in range(tick, tick + check_interval):
        fast_tick_monitors = []
        slow_tick_monitors = []
        for monitor, period in schedule:
            if i % period != 0:
                continue
            if monitor.int_timeout < 30:
                fast_tick_monitors.append(monitor)
            else:
                slow_tick_monitors.append(monitor)

        # Empty ticks and empty speed groups are omitted. The fast group is
        # inserted first so iteration yields True before False.
        bucket = {}
        if fast_tick_monitors:
            bucket[True] = fast_tick_monitors
        if slow_tick_monitors:
            bucket[False] = slow_tick_monitors
        if bucket:
            result[i] = bucket
    return result
@shared_task()
def dispatch_checks():
    """
    Dispatch monitor check tasks in batches, including the start time of each check.

    Track each "second tick". A tick is the number of seconds away from an
    arbitrary start time. Each run of this task advances a persistent counter
    by one and covers a window of UPTIME_CHECK_INTERVAL ticks.

    Fetch each monitor that would need to run in the next UPTIME_CHECK_INTERVAL.
    Determine when monitors need to run based on each second tick and whether
    their timeout is fast or slow (slow ones are grouped together).

    For example, if our check interval is 10 and the monitor should run every
    2 seconds, there should be 5 checks, run every other second.

    This method reduces the number of necessary celery tasks and sql queries,
    while keeping the timing precise and allowing for any arbitrary interval
    (to the second). It also has no need to track the state of previous checks.
    The check result DB writes are then batched for better performance.
    """
    now = timezone.now()
    try:
        with get_redis_connection() as con:
            counter = con.incr(UPTIME_COUNTER_KEY)
            # Start over before the counter outgrows a signed 32-bit integer.
            if counter >= UPTIME_TICK_EXPIRE:
                con.delete(UPTIME_COUNTER_KEY)
    except NotImplementedError:
        # No raw Redis connection is available for the configured cache
        # backend; fall back to Django's generic cache API.
        cache.add(UPTIME_COUNTER_KEY, 0, UPTIME_TICK_EXPIRE)
        counter = cache.incr(UPTIME_COUNTER_KEY)

    # First second tick of the window handled by this run.
    window_start = counter * settings.UPTIME_CHECK_INTERVAL

    # NOTE(review): this pre-filter keeps monitors whose interval last elapsed
    # fewer than UPTIME_CHECK_INTERVAL ticks before window_start. For intervals
    # that are not a multiple of the window (e.g. 15 with a window of 10) a due
    # tick inside the window appears to be missed - confirm intended behaviour.
    monitors = (
        Monitor.objects.filter(organization__is_accepting_events=True)
        .annotate(mod=window_start % Epoch(F("interval")))
        .filter(mod__lt=UPTIME_CHECK_INTERVAL)
        .exclude(Q(url="") & ~Q(monitor_type=MonitorType.HEARTBEAT))
        .only("id", "interval", "timeout")
    )

    for bucket_tick, bucket in bucket_monitors(monitors, window_start).items():
        # bucket_monitors omits ticks with nothing to run, so the delay must be
        # the tick's real offset into the window. Using the enumeration index
        # would compress sparse schedules (offsets 0 and 5 would fire at +0s
        # and +1s).
        run_time = now + timedelta(seconds=bucket_tick - window_start)
        # One task per speed group so slow monitors do not hold up fast ones.
        for monitors_to_dispatch in bucket.values():
            perform_checks.apply_async(
                args=([m.pk for m in monitors_to_dispatch], run_time),
                eta=run_time,
                expires=run_time + timedelta(minutes=1),
            )
@shared_task
def perform_checks(monitor_ids: List[int], now=None):
    """
    Run the checks for a batch of monitors and store the outcomes in bulk.

    Steps:
    1. Load the annotated monitor rows for ``monitor_ids`` as plain dicts
    2. Run every check concurrently through ``fetch_all``
    3. Bulk-insert one ``MonitorCheck`` per retained result
    4. Queue a notification for each monitor whose up/down state flipped

    ``now`` is recorded as ``start_check`` on every created row; when omitted
    the current time is used.
    """
    start_check = timezone.now() if now is None else now

    # asyncio code works on raw list[dict] data rather than a queryset
    annotated = Monitor.objects.with_check_annotations().filter(pk__in=monitor_ids)
    monitor_rows = list(annotated.values())
    outcomes = asyncio.run(fetch_all(monitor_rows))

    # A heartbeat that is "up" is not recorded; only failing heartbeats are kept
    retained = []
    for outcome in outcomes:
        if (
            outcome["monitor_type"] != MonitorType.HEARTBEAT
            or outcome["is_up"] is False
        ):
            retained.append(outcome)

    pending_checks = []
    for outcome in retained:
        pending_checks.append(
            MonitorCheck(
                monitor_id=outcome["id"],
                is_up=outcome["is_up"],
                is_change=outcome["latest_is_up"] != outcome["is_up"],
                start_check=start_check,
                reason=outcome.get("reason", None),
                response_time=outcome.get("response_time", None),
                data=outcome.get("data", None),
            )
        )
    monitor_checks = MonitorCheck.objects.bulk_create(pending_checks)

    # Created checks line up one-to-one with the retained results
    for outcome, monitor_check in zip(retained, monitor_checks):
        previously_up = outcome["latest_is_up"]
        currently_up = outcome["is_up"]
        if previously_up is True and currently_up is False:
            went_down = True
        elif previously_up is False and currently_up is True:
            went_down = False
        else:
            # No strict True <-> False transition, nothing to announce
            continue
        send_monitor_notification.delay(
            monitor_check.pk, went_down, outcome["last_change"]
        )
@shared_task
def send_monitor_notification(monitor_check_id: int, went_down: bool, last_change: str):
    """
    Notify every uptime alert recipient about a monitor state change.

    Recipients are those attached to alerts with ``uptime=True`` on the project
    owning the monitor that produced ``monitor_check_id``. Email recipients get
    a ``MonitorEmail``; webhook recipients get ``send_uptime_as_webhook``.
    Recipients of any other kind are ignored.

    ``went_down`` is True when the monitor changed from up to down.
    ``last_change`` is forwarded as-is to webhooks; for email a falsy value is
    replaced by ``None``.
    """
    email_last_change = last_change or None
    uptime_recipients = AlertRecipient.objects.filter(
        alert__project__monitor__checks=monitor_check_id, alert__uptime=True
    )
    for recipient in uptime_recipients:
        if recipient.recipient_type == RecipientType.EMAIL:
            email = MonitorEmail(
                pk=monitor_check_id,
                went_down=went_down,
                last_change=email_last_change,
            )
            email.send_users_email()
            continue
        if recipient.is_webhook:
            send_uptime_as_webhook(recipient, monitor_check_id, went_down, last_change)
@shared_task
def cleanup_old_monitor_checks():
    """
    Delete monitor checks older than the configured retention period.

    Every ``MonitorCheck`` whose ``start_check`` is more than
    ``settings.GLITCHTIP_MAX_UPTIME_CHECK_LIFE_DAYS`` days in the past is
    removed through the queryset's private raw delete.
    """
    retention = timedelta(days=settings.GLITCHTIP_MAX_UPTIME_CHECK_LIFE_DAYS)
    cutoff = timezone.now() - retention
    expired_checks = MonitorCheck.objects.filter(start_check__lt=cutoff)
    # pylint: disable=protected-access
    expired_checks._raw_delete(expired_checks.db)  # noqa
|