tasks.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. from datetime import timedelta
  2. from celery import shared_task
  3. from django.conf import settings
  4. from django.db.models import Count
  5. from django.utils import timezone
  6. from django_redis import get_redis_connection
  7. from apps.issue_events.models import Issue
  8. from .constants import ISSUE_IDS_KEY
  9. from .models import Notification, ProjectAlert
  10. # Lua script for atomic smembers + del
  11. LUA_SCRIPT = """
  12. local members = redis.call('SMEMBERS', KEYS[1])
  13. redis.call('DEL', KEYS[1])
  14. return members
  15. """
  16. def process_alert(project_alert_id: int, issue_ids: list[int]):
  17. notification = Notification.objects.create(project_alert_id=project_alert_id)
  18. notification.issues.add(*issue_ids)
  19. send_notification.delay(notification.pk)
  20. @shared_task
  21. def process_event_alerts():
  22. """Inspect alerts and determine if new notifications need sent"""
  23. now = timezone.now()
  24. issue_ids: list[int] | None = None
  25. # Support not having redis, in theory
  26. if settings.CACHE_IS_REDIS:
  27. # Note all recent issue_ids at ingest time. Then we can filter by them here.
  28. issue_ids = [
  29. int(x)
  30. for x in get_redis_connection("default").eval(LUA_SCRIPT, 1, ISSUE_IDS_KEY)
  31. ]
  32. project_alerts = ProjectAlert.objects.filter(
  33. quantity__isnull=False, timespan_minutes__isnull=False
  34. )
  35. if issue_ids == []:
  36. return # There are no new issues, no work to do
  37. if issue_ids:
  38. project_alerts = project_alerts.filter(
  39. project__issues__id__in=issue_ids
  40. ).distinct()
  41. for alert in project_alerts:
  42. start_time = now - timedelta(minutes=alert.timespan_minutes)
  43. quantity_in_timespan = alert.quantity
  44. issues = (
  45. Issue.objects.filter(
  46. project_id=alert.project_id,
  47. issueevent__received__gte=start_time,
  48. )
  49. .exclude(notification__project_alert=alert)
  50. .annotate(num_events=Count("issueevent"))
  51. .filter(num_events__gte=quantity_in_timespan)
  52. )
  53. if issue_ids:
  54. issues = issues.filter(id__in=issue_ids)
  55. if issues:
  56. notification = alert.notification_set.create()
  57. notification.issues.add(*issues)
  58. send_notification.delay(notification.pk)
  59. @shared_task
  60. def send_notification(notification_id: int):
  61. notification = Notification.objects.get(pk=notification_id)
  62. notification.send_notifications()