# tasks.py
import asyncio
from datetime import timedelta
from typing import List

from celery import shared_task
from django.conf import settings
from django.db import transaction
from django.db.models import DateTimeField, ExpressionWrapper, F, OuterRef, Q, Subquery
from django.utils import timezone
from django.utils.dateparse import parse_datetime

from alerts.models import AlertRecipient

from .email import MonitorEmail
from .models import Monitor, MonitorCheck
from .utils import fetch_all
from .webhooks import send_uptime_as_webhook
  15. @shared_task
  16. def dispatch_checks():
  17. """
  18. dispatch monitor checks tasks in batches, include start time for check
  19. """
  20. now = timezone.now()
  21. latest_check = Subquery(
  22. MonitorCheck.objects.filter(monitor_id=OuterRef("id"))
  23. .order_by("-start_check")
  24. .values("start_check")[:1]
  25. )
  26. # Use of atomic solves iterator() with pgbouncer InvalidCursorName issue
  27. # https://docs.djangoproject.com/en/3.2/ref/databases/#transaction-pooling-server-side-cursors
  28. with transaction.atomic():
  29. monitor_ids = (
  30. Monitor.objects.filter(organization__is_accepting_events=True)
  31. .annotate(
  32. last_min_check=ExpressionWrapper(
  33. now - F("interval"), output_field=DateTimeField()
  34. ),
  35. latest_check=latest_check,
  36. )
  37. .filter(Q(latest_check__lte=F("last_min_check")) | Q(latest_check=None))
  38. .values_list("id", flat=True)
  39. )
  40. batch_size = 100
  41. batch_ids = []
  42. for i, monitor_id in enumerate(monitor_ids.iterator(), 1):
  43. batch_ids.append(monitor_id)
  44. if i % batch_size == 0:
  45. perform_checks.apply_async(args=(batch_ids, now), expires=60)
  46. batch_ids = []
  47. if len(batch_ids) > 0:
  48. perform_checks.delay(batch_ids, now)
  49. @shared_task
  50. def perform_checks(monitor_ids: List[int], now=None):
  51. """
  52. Performant check monitors and save results
  53. 1. Fetch all monitor data for ids
  54. 2. Async perform all checks
  55. 3. Save in bulk results
  56. """
  57. if now is None:
  58. now = timezone.now()
  59. # Convert queryset to raw list[dict] for asyncio operations
  60. monitors = list(
  61. Monitor.objects.with_check_annotations().filter(pk__in=monitor_ids).values()
  62. )
  63. results = asyncio.run(fetch_all(monitors))
  64. monitor_checks = MonitorCheck.objects.bulk_create(
  65. [
  66. MonitorCheck(
  67. monitor_id=result["id"],
  68. is_up=result["is_up"],
  69. start_check=now,
  70. reason=result.get("reason", None),
  71. response_time=result.get("response_time", None),
  72. )
  73. for result in results
  74. ]
  75. )
  76. for i, result in enumerate(results):
  77. if result["latest_is_up"] is True and result["is_up"] is False:
  78. send_monitor_notification.delay(
  79. monitor_checks[i].pk, True, result["last_change"]
  80. )
  81. elif result["latest_is_up"] is False and result["is_up"] is True:
  82. send_monitor_notification.delay(
  83. monitor_checks[i].pk, False, result["last_change"]
  84. )
  85. @shared_task
  86. def send_monitor_notification(monitor_check_id: int, went_down: bool, last_change: str):
  87. recipients = AlertRecipient.objects.filter(
  88. alert__project__monitor__checks=monitor_check_id, alert__uptime=True
  89. )
  90. for recipient in recipients:
  91. if recipient.recipient_type == AlertRecipient.RecipientType.EMAIL:
  92. MonitorEmail(
  93. pk=monitor_check_id,
  94. went_down=went_down,
  95. last_change=parse_datetime(last_change) if last_change else None,
  96. ).send_users_email()
  97. elif recipient.recipient_type == AlertRecipient.RecipientType.WEBHOOK:
  98. send_uptime_as_webhook(
  99. recipient.url, monitor_check_id, went_down, last_change
  100. )
  101. @shared_task
  102. def cleanup_old_monitor_checks():
  103. """Delete older checks and associated data"""
  104. days = settings.GLITCHTIP_MAX_EVENT_LIFE_DAYS
  105. qs = MonitorCheck.objects.filter(created__lt=timezone.now() - timedelta(days=days))
  106. # pylint: disable=protected-access
  107. qs._raw_delete(qs.db) # noqa