1234567891011121314151617181920212223242526272829303132 |
- import logging
- from celery import shared_task
- from celery_batches import Batches
- from glitchtip.celery import app
- from .process_event import process_issue_events, process_transaction_events
- from .schema import InterchangeIssueEvent
# Module-level logger used by the batch tasks below.
logger = logging.getLogger(__name__)

# Passed as ``flush_every`` to both batched tasks in this module.
# NOTE(review): presumably the number of queued requests that triggers a
# flush -- confirm against the celery-batches documentation.
FLUSH_EVERY = 100
# Passed as ``flush_interval`` to both batched tasks in this module.
# NOTE(review): presumably the maximum wait, in seconds, between flushes --
# confirm against the celery-batches documentation.
FLUSH_INTERVAL = 2
@shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
def ingest_event(requests):
    """Process a batch of issue event requests in a single pass.

    Args:
        requests: The batched task requests handed to this task. The first
            positional argument of each request (``request.args[0]``) is
            expanded as keyword arguments into ``InterchangeIssueEvent``.
    """
    logger.info(f"Process {len(requests)} issue event requests")
    events = [InterchangeIssueEvent(**request.args[0]) for request in requests]
    process_issue_events(events)
    # Record a ``None`` result for every request in the batch. A plain loop is
    # used because the calls are made only for their side effect.
    for request in requests:
        app.backend.mark_as_done(request.id, None, request)
@shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
def ingest_transaction(requests):
    """Process a batch of transaction event requests in a single pass.

    Args:
        requests: The batched task requests handed to this task. The first
            positional argument of each request (``request.args[0]``) is
            expanded as keyword arguments into ``InterchangeIssueEvent``.
    """
    logger.info(f"Process {len(requests)} transaction event requests")
    # NOTE(review): transaction payloads are parsed with the *issue* event
    # schema here -- confirm this is intentional and not a copy-paste slip.
    events = [InterchangeIssueEvent(**request.args[0]) for request in requests]
    process_transaction_events(events)
    # Record a ``None`` result for every request in the batch. A plain loop is
    # used because the calls are made only for their side effect.
    for request in requests:
        app.backend.mark_as_done(request.id, None, request)
|