tasks.py 1.2 KB

123456789101112131415161718192021222324252627282930313233
  1. from datetime import timedelta
  2. from django.db.models import Count
  3. from django.utils import timezone
  4. from celery import shared_task
  5. from projects.models import Project
  6. from .models import Notification
  @shared_task
  def process_alerts():
      """Scan every project alert and queue a notification when it triggers.

      For each alert of each project, select the project's issues that have no
      notification yet and whose events created within the last
      ``alert.timespan_minutes`` minutes number at least ``alert.quantity``.
      If any such issues exist, create one notification on the alert, attach
      those issues to it, and queue ``send_notification`` with its primary key.
      """
      current_time = timezone.now()
      for project in Project.objects.all():
          for alert in project.projectalert_set.all():
              window_start = current_time - timedelta(
                  minutes=alert.timespan_minutes
              )
              minimum_events = alert.quantity
              # filter() is applied before annotate() so that the event count
              # only covers events inside the time window.
              unnotified_recent = project.issue_set.filter(
                  notification__isnull=True,
                  event__created__gte=window_start,
              )
              triggering_issues = unnotified_recent.annotate(
                  num_events=Count("event")
              ).filter(num_events__gte=minimum_events)
              if not triggering_issues:
                  # Nothing crossed the threshold for this alert.
                  continue
              notification = alert.notification_set.create()
              notification.issues.add(*triggering_issues)
              send_notification.delay(notification.pk)
  @shared_task
  def send_notification(notification_id: int):
      """Send the notification stored under ``notification_id``.

      Looks the ``Notification`` up by primary key and calls its
      ``send_notifications()`` method. Errors from the lookup are not caught
      here. NOTE(review): presumably a missing row raises
      ``Notification.DoesNotExist`` -- confirm against the model.
      """
      Notification.objects.get(pk=notification_id).send_notifications()