tasks.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import asyncio
  2. import logging
  3. from datetime import timedelta
  4. from celery import shared_task
  5. from django.conf import settings
  6. from django.core.cache import cache
  7. from django.db.models import F, Q
  8. from django.utils import timezone
  9. from django_redis import get_redis_connection
  10. from apps.alerts.constants import RecipientType
  11. from apps.alerts.models import AlertRecipient
  12. from .email import MonitorEmail
  13. from .models import Monitor, MonitorCheck, MonitorType
  14. from .utils import fetch_all
  15. from .webhooks import send_uptime_as_webhook
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Redis/cache key holding the counter that dispatch_checks increments once per run.
UPTIME_COUNTER_KEY = "uptime_counter"
# 2**31 - 1. Counter value at which dispatch_checks deletes the redis key; also
# passed as the timeout when seeding the counter in the fallback Django cache.
UPTIME_TICK_EXPIRE = 2147483647
# Number of consecutive second-ticks covered by a single dispatch_checks run,
# read from Django settings.
UPTIME_CHECK_INTERVAL = settings.UPTIME_CHECK_INTERVAL
  20. def bucket_monitors(monitors, tick: int, check_interval=UPTIME_CHECK_INTERVAL):
  21. """
  22. Sort monitors into buckets based on:
  23. Each interval group.
  24. <30 seconds timeout vs >= 30 (potentially slow)
  25. Example: if there is one monior with interval of 1 and the check interval is 10,
  26. this monitor should run every time. The return should be a list of 10 ticks with
  27. the same monitor in each
  28. Result:
  29. {tick: {is_fast: monitors[]}}
  30. {1, {True: [monitor, monitor]}}
  31. {1, {False: [monitor]}}
  32. {2, {True: [monitor]}}
  33. """
  34. result = {}
  35. for i in range(tick, tick + check_interval):
  36. fast_tick_monitors = [
  37. monitor
  38. for monitor in monitors
  39. if i % monitor.interval == 0 and monitor.int_timeout < 30
  40. ]
  41. slow_tick_monitors = [
  42. monitor
  43. for monitor in monitors
  44. if i % monitor.interval == 0 and monitor.int_timeout >= 30
  45. ]
  46. if fast_tick_monitors or slow_tick_monitors:
  47. result[i] = {}
  48. if fast_tick_monitors:
  49. result[i][True] = fast_tick_monitors
  50. if slow_tick_monitors:
  51. result[i][False] = slow_tick_monitors
  52. return result
  53. @shared_task()
  54. def dispatch_checks():
  55. """
  56. Dispatch monitor checks tasks in batches, include start time for check
  57. Track each "second tick". A tick is the number of seconds away from an arbitrary start time.
  58. Fetch each monitor that would need to run in the next UPTIME_CHECK_INTERVAL
  59. Determine when monitors need to run based on each second tick and whether it's
  60. timeout is fast or slow (group slow together)
  61. For example, if our check interval is 10 and the monitor should run every 2 seconds,
  62. there should be 5 checks run every other second
  63. This method reduces the number of necessary celery tasks and sql queries. While keeping
  64. the timing precise and allowing for any arbitrary interval (to the second).
  65. It also has no need to track state of previous checks.
  66. The check result DB writes are then batched for better performance.
  67. """
  68. now = timezone.now()
  69. try:
  70. with get_redis_connection() as con:
  71. tick = con.incr(UPTIME_COUNTER_KEY)
  72. if tick >= UPTIME_TICK_EXPIRE:
  73. con.delete(UPTIME_COUNTER_KEY)
  74. elif tick % 1000 == 0: # Set sanity check TTL
  75. con.expire(UPTIME_COUNTER_KEY, 86400)
  76. except NotImplementedError:
  77. cache.add(UPTIME_COUNTER_KEY, 0, UPTIME_TICK_EXPIRE)
  78. tick = cache.incr(UPTIME_COUNTER_KEY)
  79. tick = tick * settings.UPTIME_CHECK_INTERVAL
  80. monitors = (
  81. Monitor.objects.filter(organization__event_throttle_rate__lt=100)
  82. .annotate(mod=tick % F("interval"))
  83. .filter(mod__lt=UPTIME_CHECK_INTERVAL)
  84. .exclude(Q(url="") & ~Q(monitor_type=MonitorType.HEARTBEAT))
  85. .only("id", "interval", "timeout")
  86. )
  87. for i, (tick, bucket) in enumerate(bucket_monitors(monitors, tick).items()):
  88. for is_fast, monitors_to_dispatch in bucket.items():
  89. run_time = now + timedelta(seconds=i)
  90. perform_checks.apply_async(
  91. args=([m.pk for m in monitors_to_dispatch], run_time),
  92. eta=run_time,
  93. expires=run_time + timedelta(minutes=1),
  94. )
  95. @shared_task
  96. def perform_checks(monitor_ids: list[int], now=None):
  97. """
  98. Performant check monitors and save results
  99. 1. Fetch all monitor data for ids
  100. 2. Async perform all checks
  101. 3. Save in bulk results
  102. """
  103. if now is None:
  104. now = timezone.now()
  105. # Convert queryset to raw list[dict] for asyncio operations
  106. monitors = list(
  107. Monitor.objects.with_check_annotations().filter(pk__in=monitor_ids).values()
  108. )
  109. results = []
  110. for result in asyncio.run(fetch_all(monitors)):
  111. # Log and ignore exceptions
  112. if isinstance(result, Exception):
  113. logger.error("Critical monitor check failure", exc_info=result)
  114. # Filter out "up" heartbeats
  115. elif result["monitor_type"] != MonitorType.HEARTBEAT or result["is_up"] is False:
  116. results.append(result)
  117. monitor_checks = MonitorCheck.objects.bulk_create(
  118. [
  119. MonitorCheck(
  120. monitor_id=result["id"],
  121. is_up=result["is_up"],
  122. is_change=result["latest_is_up"] != result["is_up"],
  123. start_check=now,
  124. reason=result.get("reason", None),
  125. response_time=result.get("response_time", None),
  126. data=result.get("data", None),
  127. )
  128. for result in results
  129. ]
  130. )
  131. for i, result in enumerate(results):
  132. if result["latest_is_up"] is True and result["is_up"] is False:
  133. send_monitor_notification.delay(
  134. monitor_checks[i].pk, True, result["last_change"]
  135. )
  136. elif result["latest_is_up"] is False and result["is_up"] is True:
  137. send_monitor_notification.delay(
  138. monitor_checks[i].pk, False, result["last_change"]
  139. )
  140. @shared_task
  141. def send_monitor_notification(monitor_check_id: int, went_down: bool, last_change: str):
  142. recipients = AlertRecipient.objects.filter(
  143. alert__project__monitor__checks=monitor_check_id, alert__uptime=True
  144. )
  145. for recipient in recipients:
  146. if recipient.recipient_type == RecipientType.EMAIL:
  147. MonitorEmail(
  148. pk=monitor_check_id,
  149. went_down=went_down,
  150. last_change=last_change if last_change else None,
  151. ).send_users_email()
  152. elif recipient.is_webhook:
  153. send_uptime_as_webhook(recipient, monitor_check_id, went_down, last_change)