tasks.py 781 B

123456789101112131415161718192021222324252627282930
  1. import logging
  2. from celery import shared_task
  3. from celery_batches import Batches
  4. from glitchtip.celery import app
  5. from .process_event import process_issue_events
  6. from .schema import InterchangeIssueEvent
  7. logger = logging.getLogger(__name__)
  8. FLUSH_EVERY = 100
  9. FLUSH_INTERVAL = 2
  10. @shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
  11. def ingest_event(requests):
  12. logger.info(f"Process {len(requests)} ingest_event requests")
  13. process_issue_events(
  14. [InterchangeIssueEvent(**request.args[0]) for request in requests]
  15. )
  16. [app.backend.mark_as_done(request.id, None, request) for request in requests]
  17. @shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
  18. def ingest_transaction(requests):
  19. pass