12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- import asyncio
- from typing import List
- from django.db.models import F, Q, ExpressionWrapper, DateTimeField, Subquery, OuterRef
- from django.utils import timezone
- from celery import shared_task
- from .models import Monitor, MonitorCheck
- from .utils import fetch_all
@shared_task
def dispatch_checks():
    """
    Queue ``perform_checks`` tasks, in batches, for every monitor that is due.

    A monitor is due when it has no check yet, or when its most recent
    ``start_check`` is at least ``interval`` in the past. Only monitors whose
    organization is accepting events are considered. The same ``now``
    timestamp is handed to every batch so all checks share one start time.
    """
    now = timezone.now()
    batch_size = 100

    # Correlated subquery: start time of the newest check for each monitor.
    newest_start = (
        MonitorCheck.objects.filter(monitor_id=OuterRef("id"))
        .order_by("-start_check")
        .values("start_check")[:1]
    )
    # Latest moment at which a previous check may have started for the
    # monitor to count as due again.
    due_cutoff = ExpressionWrapper(now - F("interval"), output_field=DateTimeField())

    overdue = Q(latest_check__lte=F("last_min_check"))
    never_checked = Q(latest_check=None)

    due_monitor_ids = (
        Monitor.objects.filter(organization__is_accepting_events=True)
        .annotate(last_min_check=due_cutoff, latest_check=Subquery(newest_start))
        .filter(overdue | never_checked)
        .values_list("id", flat=True)
    )

    pending = []
    for monitor_id in due_monitor_ids.iterator():
        pending.append(monitor_id)
        if len(pending) == batch_size:
            perform_checks.delay(pending, now)
            pending = []
    # Flush whatever is left over after the last full batch.
    if pending:
        perform_checks.delay(pending, now)
@shared_task
def perform_checks(monitor_ids: List[int], now=None):
    """
    Check a batch of monitors efficiently and save the results.

    1. Fetch the monitor rows for ``monitor_ids`` as plain dicts
    2. Run all checks through ``fetch_all`` on the asyncio event loop
    3. Save one ``MonitorCheck`` per result with a single bulk insert
    4. Queue ``send_monitor_notification`` for every up/down transition

    ``now`` becomes the ``start_check`` of every created check; when omitted
    the current time is used.
    """
    start_check = timezone.now() if now is None else now

    # ``is_up`` value of the newest existing check for each monitor.
    previous_state = (
        MonitorCheck.objects.filter(monitor_id=OuterRef("id"))
        .order_by("-start_check")
        .values("is_up")[:1]
    )
    # Convert queryset to raw list[dict] for asyncio operations
    monitor_rows = Monitor.objects.filter(pk__in=monitor_ids)
    monitor_rows = monitor_rows.annotate(latest_is_up=Subquery(previous_state))
    monitors = list(monitor_rows.values())

    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(fetch_all(monitors, loop))

    new_checks = []
    for result in results:
        new_checks.append(
            MonitorCheck(
                monitor_id=result["id"],
                is_up=result["is_up"],
                start_check=start_check,
                reason=result.get("reason", None),
                response_time=result.get("response_time", None),
            )
        )
    monitor_checks = MonitorCheck.objects.bulk_create(new_checks)

    # Results and saved checks are in the same order, so pair them up.
    # NOTE(review): assumes each result still carries the annotated
    # "latest_is_up" key and that bulk_create populated the primary keys.
    for result, saved_check in zip(results, monitor_checks):
        was_up = result["latest_is_up"]
        is_up = result["is_up"]
        if was_up is True and is_up is False:
            send_monitor_notification.delay(saved_check.pk, True)
        elif was_up is False and is_up is True:
            send_monitor_notification.delay(saved_check.pk, False)
@shared_task
def send_monitor_notification(monitor_check_id: int, went_down: bool):
    """
    Load the monitor check that triggered an up/down notification.

    ``monitor_check_id`` is the primary key of the ``MonitorCheck`` to load;
    ``went_down`` is passed as True by the caller when the monitor changed
    from up to down and False when it recovered.

    Raises ``MonitorCheck.DoesNotExist`` when no check has that primary key.

    NOTE(review): only the lookup is visible here; confirm where the actual
    notification delivery happens.
    """
    # select_related() is a QuerySet method, so it must come before get():
    # get() returns a model instance, which has no select_related attribute.
    check = MonitorCheck.objects.select_related("monitor").get(pk=monitor_check_id)
|