12345678910111213141516171819202122232425262728 |
- import logging
- from celery import shared_task
- from celery_batches import Batches
- from glitchtip.celery import app
- from .process_event import process_issue_events
- from .schema import InterchangeIssueEvent
- logger = logging.getLogger(__name__)
- FLUSH_EVERY = 100
- FLUSH_INTERVAL = 2
- @shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
- def ingest_event(requests):
- logger.info(f"Process {len(requests)} ingest_event requests")
- process_issue_events(
- [InterchangeIssueEvent(**request.args[0]) for request in requests]
- )
- [app.backend.mark_as_done(request.id, None, request) for request in requests]
- @shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
- def ingest_transaction(requests):
- pass
|