# tasks.py — uptime monitor check dispatch, bulk result saving, and alert notification tasks
  1. import asyncio
  2. from datetime import timedelta
  3. from typing import List
  4. from celery import shared_task
  5. from django.conf import settings
  6. from django.core.cache import cache
  7. from django.db import models
  8. from django.db.models import F, Q
  9. from django.db.models.expressions import Func
  10. from django.utils import timezone
  11. from django_redis import get_redis_connection
  12. from alerts.constants import RecipientType
  13. from alerts.models import AlertRecipient
  14. from .email import MonitorEmail
  15. from .models import Monitor, MonitorCheck, MonitorType
  16. from .utils import fetch_all
  17. from .webhooks import send_uptime_as_webhook
# Redis/cache key holding the ever-increasing dispatch tick counter
UPTIME_COUNTER_KEY = "uptime_counter"
# 2**31 - 1; used both as the counter reset threshold (in dispatch_checks)
# and as the cache-entry timeout in the non-redis fallback path
UPTIME_TICK_EXPIRE = 2147483647
# Seconds between dispatch_checks runs; also the size of the bucketing window
UPTIME_CHECK_INTERVAL = settings.UPTIME_CHECK_INTERVAL
class Epoch(Func):
    """Database function that extracts an interval/timestamp as integer epoch seconds.

    NOTE(review): the ``::INTEGER`` cast is PostgreSQL syntax — this will not
    run on other database backends.
    """

    template = "EXTRACT(epoch FROM %(expressions)s)::INTEGER"
    output_field = models.IntegerField()
  24. def bucket_monitors(monitors, tick: int, check_interval=UPTIME_CHECK_INTERVAL):
  25. """
  26. Sort monitors into buckets based on:
  27. Each interval group.
  28. <30 seconds timeout vs >= 30 (potentially slow)
  29. Example: if there is one monior with interval of 1 and the check interval is 10,
  30. this monitor should run every time. The return should be a list of 10 ticks with
  31. the same monitor in each
  32. Result:
  33. {tick: {is_fast: monitors[]}}
  34. {1, {True: [monitor, monitor]}}
  35. {1, {False: [monitor]}}
  36. {2, {True: [monitor]}}
  37. """
  38. result = {}
  39. for i in range(tick, tick + check_interval):
  40. fast_tick_monitors = [
  41. monitor
  42. for monitor in monitors
  43. if i % monitor.interval.seconds == 0 and monitor.int_timeout < 30
  44. ]
  45. slow_tick_monitors = [
  46. monitor
  47. for monitor in monitors
  48. if i % monitor.interval.seconds == 0 and monitor.int_timeout >= 30
  49. ]
  50. if fast_tick_monitors or slow_tick_monitors:
  51. result[i] = {}
  52. if fast_tick_monitors:
  53. result[i][True] = fast_tick_monitors
  54. if slow_tick_monitors:
  55. result[i][False] = slow_tick_monitors
  56. return result
  57. @shared_task()
  58. def dispatch_checks():
  59. """
  60. Dispatch monitor checks tasks in batches, include start time for check
  61. Track each "second tick". A tick is the number of seconds away from an arbitrary start time.
  62. Fetch each monitor that would need to run in the next UPTIME_CHECK_INTERVAL
  63. Determine when monitors need to run based on each second tick and whether it's
  64. timeout is fast or slow (group slow together)
  65. For example, if our check interval is 10 and the monitor should run every 2 seconds,
  66. there should be 5 checks run every other second
  67. This method reduces the number of necessary celery tasks and sql queries. While keeping
  68. the timing percise and allowing for any arbitrary interval (to the second).
  69. It also has no need to track state of previous checks.
  70. The check result DB writes are then batched for better performance.
  71. """
  72. now = timezone.now()
  73. try:
  74. with get_redis_connection() as con:
  75. tick = con.incr(UPTIME_COUNTER_KEY)
  76. if tick >= UPTIME_TICK_EXPIRE:
  77. con.delete(UPTIME_COUNTER_KEY)
  78. except NotImplementedError:
  79. cache.add(UPTIME_COUNTER_KEY, 0, UPTIME_TICK_EXPIRE)
  80. tick = cache.incr(UPTIME_COUNTER_KEY)
  81. tick = tick * settings.UPTIME_CHECK_INTERVAL
  82. monitors = (
  83. Monitor.objects.filter(organization__is_accepting_events=True)
  84. .annotate(mod=tick % Epoch(F("interval")))
  85. .filter(mod__lt=UPTIME_CHECK_INTERVAL)
  86. .exclude(Q(url="") & ~Q(monitor_type=MonitorType.HEARTBEAT))
  87. .only("id", "interval", "timeout")
  88. )
  89. for i, (tick, bucket) in enumerate(bucket_monitors(monitors, tick).items()):
  90. for is_fast, monitors_to_dispatch in bucket.items():
  91. run_time = now + timedelta(seconds=i)
  92. perform_checks.apply_async(
  93. args=([m.pk for m in monitors_to_dispatch], run_time),
  94. eta=run_time,
  95. expires=run_time + timedelta(minutes=1),
  96. )
  97. @shared_task
  98. def perform_checks(monitor_ids: List[int], now=None):
  99. """
  100. Performant check monitors and save results
  101. 1. Fetch all monitor data for ids
  102. 2. Async perform all checks
  103. 3. Save in bulk results
  104. """
  105. if now is None:
  106. now = timezone.now()
  107. # Convert queryset to raw list[dict] for asyncio operations
  108. monitors = list(
  109. Monitor.objects.with_check_annotations().filter(pk__in=monitor_ids).values()
  110. )
  111. results = asyncio.run(fetch_all(monitors))
  112. # Filter out "up" heartbeats
  113. results = [
  114. result
  115. for result in results
  116. if result["monitor_type"] != MonitorType.HEARTBEAT or result["is_up"] is False
  117. ]
  118. monitor_checks = MonitorCheck.objects.bulk_create(
  119. [
  120. MonitorCheck(
  121. monitor_id=result["id"],
  122. is_up=result["is_up"],
  123. is_change=result["latest_is_up"] != result["is_up"],
  124. start_check=now,
  125. reason=result.get("reason", None),
  126. response_time=result.get("response_time", None),
  127. data=result.get("data", None),
  128. )
  129. for result in results
  130. ]
  131. )
  132. for i, result in enumerate(results):
  133. if result["latest_is_up"] is True and result["is_up"] is False:
  134. send_monitor_notification.delay(
  135. monitor_checks[i].pk, True, result["last_change"]
  136. )
  137. elif result["latest_is_up"] is False and result["is_up"] is True:
  138. send_monitor_notification.delay(
  139. monitor_checks[i].pk, False, result["last_change"]
  140. )
  141. @shared_task
  142. def send_monitor_notification(monitor_check_id: int, went_down: bool, last_change: str):
  143. recipients = AlertRecipient.objects.filter(
  144. alert__project__monitor__checks=monitor_check_id, alert__uptime=True
  145. )
  146. for recipient in recipients:
  147. if recipient.recipient_type == RecipientType.EMAIL:
  148. MonitorEmail(
  149. pk=monitor_check_id,
  150. went_down=went_down,
  151. last_change=last_change if last_change else None,
  152. ).send_users_email()
  153. elif recipient.is_webhook:
  154. send_uptime_as_webhook(recipient, monitor_check_id, went_down, last_change)
  155. @shared_task
  156. def cleanup_old_monitor_checks():
  157. """Delete older checks and associated data"""
  158. days = settings.GLITCHTIP_MAX_UPTIME_CHECK_LIFE_DAYS
  159. qs = MonitorCheck.objects.filter(
  160. start_check__lt=timezone.now() - timedelta(days=days)
  161. )
  162. # pylint: disable=protected-access
  163. qs._raw_delete(qs.db) # noqa