tasks.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import asyncio
  2. from datetime import timedelta
  3. from typing import List
  4. from celery import shared_task
  5. from django.conf import settings
  6. from django.core.cache import cache
  7. from django.db.models import F, Q
  8. from django.utils import timezone
  9. from django_redis import get_redis_connection
  10. from apps.alerts.constants import RecipientType
  11. from apps.alerts.models import AlertRecipient
  12. from .email import MonitorEmail
  13. from .models import Monitor, MonitorCheck, MonitorType
  14. from .utils import fetch_all
  15. from .webhooks import send_uptime_as_webhook
  16. UPTIME_COUNTER_KEY = "uptime_counter"
  17. UPTIME_TICK_EXPIRE = 2147483647
  18. UPTIME_CHECK_INTERVAL = settings.UPTIME_CHECK_INTERVAL
  19. def bucket_monitors(monitors, tick: int, check_interval=UPTIME_CHECK_INTERVAL):
  20. """
  21. Sort monitors into buckets based on:
  22. Each interval group.
  23. <30 seconds timeout vs >= 30 (potentially slow)
  24. Example: if there is one monior with interval of 1 and the check interval is 10,
  25. this monitor should run every time. The return should be a list of 10 ticks with
  26. the same monitor in each
  27. Result:
  28. {tick: {is_fast: monitors[]}}
  29. {1, {True: [monitor, monitor]}}
  30. {1, {False: [monitor]}}
  31. {2, {True: [monitor]}}
  32. """
  33. result = {}
  34. for i in range(tick, tick + check_interval):
  35. fast_tick_monitors = [
  36. monitor
  37. for monitor in monitors
  38. if i % monitor.interval == 0 and monitor.int_timeout < 30
  39. ]
  40. slow_tick_monitors = [
  41. monitor
  42. for monitor in monitors
  43. if i % monitor.interval == 0 and monitor.int_timeout >= 30
  44. ]
  45. if fast_tick_monitors or slow_tick_monitors:
  46. result[i] = {}
  47. if fast_tick_monitors:
  48. result[i][True] = fast_tick_monitors
  49. if slow_tick_monitors:
  50. result[i][False] = slow_tick_monitors
  51. return result
  52. @shared_task()
  53. def dispatch_checks():
  54. """
  55. Dispatch monitor checks tasks in batches, include start time for check
  56. Track each "second tick". A tick is the number of seconds away from an arbitrary start time.
  57. Fetch each monitor that would need to run in the next UPTIME_CHECK_INTERVAL
  58. Determine when monitors need to run based on each second tick and whether it's
  59. timeout is fast or slow (group slow together)
  60. For example, if our check interval is 10 and the monitor should run every 2 seconds,
  61. there should be 5 checks run every other second
  62. This method reduces the number of necessary celery tasks and sql queries. While keeping
  63. the timing precise and allowing for any arbitrary interval (to the second).
  64. It also has no need to track state of previous checks.
  65. The check result DB writes are then batched for better performance.
  66. """
  67. now = timezone.now()
  68. try:
  69. with get_redis_connection() as con:
  70. tick = con.incr(UPTIME_COUNTER_KEY)
  71. if tick >= UPTIME_TICK_EXPIRE:
  72. con.delete(UPTIME_COUNTER_KEY)
  73. elif tick % 1000 == 0: # Set sanity check TTL
  74. con.expire(UPTIME_COUNTER_KEY, 86400)
  75. except NotImplementedError:
  76. cache.add(UPTIME_COUNTER_KEY, 0, UPTIME_TICK_EXPIRE)
  77. tick = cache.incr(UPTIME_COUNTER_KEY)
  78. tick = tick * settings.UPTIME_CHECK_INTERVAL
  79. monitors = (
  80. Monitor.objects.filter(organization__event_throttle_rate__lt=100)
  81. .annotate(mod=tick % F("interval"))
  82. .filter(mod__lt=UPTIME_CHECK_INTERVAL)
  83. .exclude(Q(url="") & ~Q(monitor_type=MonitorType.HEARTBEAT))
  84. .only("id", "interval", "timeout")
  85. )
  86. for i, (tick, bucket) in enumerate(bucket_monitors(monitors, tick).items()):
  87. for is_fast, monitors_to_dispatch in bucket.items():
  88. run_time = now + timedelta(seconds=i)
  89. perform_checks.apply_async(
  90. args=([m.pk for m in monitors_to_dispatch], run_time),
  91. eta=run_time,
  92. expires=run_time + timedelta(minutes=1),
  93. )
  94. @shared_task
  95. def perform_checks(monitor_ids: List[int], now=None):
  96. """
  97. Performant check monitors and save results
  98. 1. Fetch all monitor data for ids
  99. 2. Async perform all checks
  100. 3. Save in bulk results
  101. """
  102. if now is None:
  103. now = timezone.now()
  104. # Convert queryset to raw list[dict] for asyncio operations
  105. monitors = list(
  106. Monitor.objects.with_check_annotations().filter(pk__in=monitor_ids).values()
  107. )
  108. results = asyncio.run(fetch_all(monitors))
  109. # Filter out "up" heartbeats
  110. results = [
  111. result
  112. for result in results
  113. if result["monitor_type"] != MonitorType.HEARTBEAT or result["is_up"] is False
  114. ]
  115. monitor_checks = MonitorCheck.objects.bulk_create(
  116. [
  117. MonitorCheck(
  118. monitor_id=result["id"],
  119. is_up=result["is_up"],
  120. is_change=result["latest_is_up"] != result["is_up"],
  121. start_check=now,
  122. reason=result.get("reason", None),
  123. response_time=result.get("response_time", None),
  124. data=result.get("data", None),
  125. )
  126. for result in results
  127. ]
  128. )
  129. for i, result in enumerate(results):
  130. if result["latest_is_up"] is True and result["is_up"] is False:
  131. send_monitor_notification.delay(
  132. monitor_checks[i].pk, True, result["last_change"]
  133. )
  134. elif result["latest_is_up"] is False and result["is_up"] is True:
  135. send_monitor_notification.delay(
  136. monitor_checks[i].pk, False, result["last_change"]
  137. )
  138. @shared_task
  139. def send_monitor_notification(monitor_check_id: int, went_down: bool, last_change: str):
  140. recipients = AlertRecipient.objects.filter(
  141. alert__project__monitor__checks=monitor_check_id, alert__uptime=True
  142. )
  143. for recipient in recipients:
  144. if recipient.recipient_type == RecipientType.EMAIL:
  145. MonitorEmail(
  146. pk=monitor_check_id,
  147. went_down=went_down,
  148. last_change=last_change if last_change else None,
  149. ).send_users_email()
  150. elif recipient.is_webhook:
  151. send_uptime_as_webhook(recipient, monitor_check_id, went_down, last_change)