# tasks.py
  1. import asyncio
  2. from datetime import timedelta
  3. from typing import List
  4. from celery import shared_task
  5. from django.conf import settings
  6. from django.db import transaction
  7. from django.db.models import DateTimeField, ExpressionWrapper, F, OuterRef, Q, Subquery
  8. from django.utils import timezone
  9. from django.utils.dateparse import parse_datetime
  10. from .email import MonitorEmail
  11. from .models import Monitor, MonitorCheck
  12. from .utils import fetch_all
  13. @shared_task
  14. def dispatch_checks():
  15. """
  16. dispatch monitor checks tasks in batches, include start time for check
  17. """
  18. now = timezone.now()
  19. latest_check = Subquery(
  20. MonitorCheck.objects.filter(monitor_id=OuterRef("id"))
  21. .order_by("-start_check")
  22. .values("start_check")[:1]
  23. )
  24. # Use of atomic solves iterator() with pgbouncer InvalidCursorName issue
  25. # https://docs.djangoproject.com/en/3.2/ref/databases/#transaction-pooling-server-side-cursors
  26. with transaction.atomic():
  27. monitor_ids = (
  28. Monitor.objects.filter(organization__is_accepting_events=True)
  29. .annotate(
  30. last_min_check=ExpressionWrapper(
  31. now - F("interval"), output_field=DateTimeField()
  32. ),
  33. latest_check=latest_check,
  34. )
  35. .filter(Q(latest_check__lte=F("last_min_check")) | Q(latest_check=None))
  36. .values_list("id", flat=True)
  37. )
  38. batch_size = 100
  39. batch_ids = []
  40. for i, monitor_id in enumerate(monitor_ids.iterator(), 1):
  41. batch_ids.append(monitor_id)
  42. if i % batch_size == 0:
  43. perform_checks.delay(batch_ids, now)
  44. batch_ids = []
  45. if len(batch_ids) > 0:
  46. perform_checks.delay(batch_ids, now)
  47. @shared_task
  48. def perform_checks(monitor_ids: List[int], now=None):
  49. """
  50. Performant check monitors and save results
  51. 1. Fetch all monitor data for ids
  52. 2. Async perform all checks
  53. 3. Save in bulk results
  54. """
  55. if now is None:
  56. now = timezone.now()
  57. # Convert queryset to raw list[dict] for asyncio operations
  58. monitors = list(
  59. Monitor.objects.with_check_annotations().filter(pk__in=monitor_ids).values()
  60. )
  61. loop = asyncio.get_event_loop()
  62. results = loop.run_until_complete(fetch_all(monitors, loop))
  63. monitor_checks = MonitorCheck.objects.bulk_create(
  64. [
  65. MonitorCheck(
  66. monitor_id=result["id"],
  67. is_up=result["is_up"],
  68. start_check=now,
  69. reason=result.get("reason", None),
  70. response_time=result.get("response_time", None),
  71. )
  72. for result in results
  73. ]
  74. )
  75. for i, result in enumerate(results):
  76. if result["latest_is_up"] is True and result["is_up"] is False:
  77. send_monitor_notification.delay(
  78. monitor_checks[i].pk, True, result["last_change"]
  79. )
  80. elif result["latest_is_up"] is False and result["is_up"] is True:
  81. send_monitor_notification.delay(
  82. monitor_checks[i].pk, False, result["last_change"]
  83. )
  84. @shared_task
  85. def send_monitor_notification(monitor_check_id: int, went_down: bool, last_change: str):
  86. MonitorEmail(
  87. pk=monitor_check_id,
  88. went_down=went_down,
  89. last_change=parse_datetime(last_change) if last_change else None,
  90. ).send_users_email()
  91. @shared_task
  92. def cleanup_old_monitor_checks():
  93. """ Delete older checks and associated data """
  94. days = settings.GLITCHTIP_MAX_EVENT_LIFE_DAYS
  95. qs = MonitorCheck.objects.filter(created__lt=timezone.now() - timedelta(days=days))
  96. # pylint: disable=protected-access
  97. qs._raw_delete(qs.db) # noqa