1234567891011121314151617181920212223242526272829 |
- import uuid
- from celery import shared_task
- from celery_batches import Batches
- from pydantic_core import ValidationError
- from .schema import EventIngestSchema
- FLUSH_EVERY = 100
- FLUSH_INTERVAL = 2
- @shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
- def ingest_event(requests):
- project_events: tuple[int, list[EventIngestSchema]] = []
- for request in requests:
- try:
- project_events.append(
- (request.args[0], EventIngestSchema(**request.args[1]))
- )
- except ValidationError:
- # Log this!!
- pass
- print(project_events)
- @shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
- def ingest_transaction(requests):
- pass
|