tasks.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435
  1. from datetime import timedelta
  2. from django.db.models import Count
  3. from django.utils import timezone
  4. from celery import shared_task
  5. from projects.models import Project
  6. from .models import Notification
  7. @shared_task
  8. def process_event_alerts():
  9. """ Inspect alerts and determine if new notifications need sent """
  10. now = timezone.now()
  11. for project in Project.objects.all():
  12. for alert in project.projectalert_set.filter(
  13. quantity__isnull=False, timespan_minutes__isnull=False
  14. ):
  15. start_time = now - timedelta(minutes=alert.timespan_minutes)
  16. quantity_in_timespan = alert.quantity
  17. issues = (
  18. project.issue_set.filter(
  19. notification__isnull=True, event__created__gte=start_time,
  20. )
  21. .annotate(num_events=Count("event"))
  22. .filter(num_events__gte=quantity_in_timespan)
  23. )
  24. if issues:
  25. notification = alert.notification_set.create()
  26. notification.issues.add(*issues)
  27. send_notification.delay(notification.pk)
  28. @shared_task
  29. def send_notification(notification_id: int):
  30. notification = Notification.objects.get(pk=notification_id)
  31. notification.send_notifications()