tasks.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import asyncio
  2. from typing import List
  3. from django.db.models import F, Q, ExpressionWrapper, DateTimeField, Subquery, OuterRef
  4. from django.utils import timezone
  5. from celery import shared_task
  6. from .models import Monitor, MonitorCheck
  7. from .utils import fetch_all
  8. @shared_task
  9. def dispatch_checks():
  10. """
  11. dispatch monitor checks tasks in batches, include start time for check
  12. """
  13. now = timezone.now()
  14. latest_check = Subquery(
  15. MonitorCheck.objects.filter(monitor_id=OuterRef("id"))
  16. .order_by("-start_check")
  17. .values("start_check")[:1]
  18. )
  19. monitor_ids = (
  20. Monitor.objects.filter(organization__is_accepting_events=True)
  21. .annotate(
  22. last_min_check=ExpressionWrapper(
  23. now - F("interval"), output_field=DateTimeField()
  24. ),
  25. latest_check=latest_check,
  26. )
  27. .filter(Q(latest_check__lte=F("last_min_check")) | Q(latest_check=None))
  28. .values_list("id", flat=True)
  29. )
  30. batch_size = 100
  31. batch_ids = []
  32. for i, monitor_id in enumerate(monitor_ids.iterator(), 1):
  33. batch_ids.append(monitor_id)
  34. if i % batch_size == 0:
  35. perform_checks.delay(batch_ids, now)
  36. batch_ids = []
  37. if len(batch_ids) > 0:
  38. perform_checks.delay(batch_ids, now)
  39. @shared_task
  40. def perform_checks(monitor_ids: List[int], now=None):
  41. """
  42. Performant check monitors and save results
  43. 1. Fetch all monitor data for ids
  44. 2. Async perform all checks
  45. 3. Save in bulk results
  46. """
  47. if now is None:
  48. now = timezone.now()
  49. # Convert queryset to raw list[dict] for asyncio operations
  50. latest_is_up = Subquery(
  51. MonitorCheck.objects.filter(monitor_id=OuterRef("id"))
  52. .order_by("-start_check")
  53. .values("is_up")[:1]
  54. )
  55. monitors = list(
  56. Monitor.objects.filter(pk__in=monitor_ids)
  57. .annotate(latest_is_up=latest_is_up)
  58. .values()
  59. )
  60. loop = asyncio.get_event_loop()
  61. results = loop.run_until_complete(fetch_all(monitors, loop))
  62. monitor_checks = MonitorCheck.objects.bulk_create(
  63. [
  64. MonitorCheck(
  65. monitor_id=result["id"],
  66. is_up=result["is_up"],
  67. start_check=now,
  68. reason=result.get("reason", None),
  69. response_time=result.get("response_time", None),
  70. )
  71. for result in results
  72. ]
  73. )
  74. for i, result in enumerate(results):
  75. if result["latest_is_up"] is True and result["is_up"] is False:
  76. send_monitor_notification.delay(monitor_checks[i].pk, True)
  77. elif result["latest_is_up"] is False and result["is_up"] is True:
  78. send_monitor_notification.delay(monitor_checks[i].pk, False)
  79. @shared_task
  80. def send_monitor_notification(monitor_check_id: int, went_down: bool):
  81. check = MonitorCheck.objects.get(pk=monitor_check_id).select_related("monitor")