tasks.py 1.0 KB

1234567891011121314151617181920212223242526272829303132
  1. import logging
  2. from celery import shared_task
  3. from celery_batches import Batches
  4. from glitchtip.celery import app
  5. from .process_event import process_issue_events, process_transaction_events
  6. from .schema import InterchangeIssueEvent
  7. logger = logging.getLogger(__name__)
  8. FLUSH_EVERY = 100
  9. FLUSH_INTERVAL = 2
  10. @shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
  11. def ingest_event(requests):
  12. logger.info(f"Process {len(requests)} issue event requests")
  13. process_issue_events(
  14. [InterchangeIssueEvent(**request.args[0]) for request in requests]
  15. )
  16. [app.backend.mark_as_done(request.id, None, request) for request in requests]
  17. @shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
  18. def ingest_transaction(requests):
  19. logger.info(f"Process {len(requests)} transaction event requests")
  20. process_transaction_events(
  21. [InterchangeIssueEvent(**request.args[0]) for request in requests]
  22. )
  23. [app.backend.mark_as_done(request.id, None, request) for request in requests]