tasks.py 763 B

1234567891011121314151617181920212223242526272829
  1. import uuid
  2. from celery import shared_task
  3. from celery_batches import Batches
  4. from pydantic_core import ValidationError
  5. from .schema import EventIngestSchema
# Batching settings shared by every Batches-based task in this module; they are
# passed to ``shared_task`` as ``flush_every`` / ``flush_interval``.
# NOTE(review): presumably "flush after this many queued calls" and "flush at
# least every N seconds" -- confirm against the celery-batches documentation.
FLUSH_EVERY = 100
FLUSH_INTERVAL = 2
  8. @shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
  9. def ingest_event(requests):
  10. project_events: tuple[int, list[EventIngestSchema]] = []
  11. for request in requests:
  12. try:
  13. project_events.append(
  14. (request.args[0], EventIngestSchema(**request.args[1]))
  15. )
  16. except ValidationError:
  17. # Log this!!
  18. pass
  19. print(project_events)
  20. @shared_task(base=Batches, flush_every=FLUSH_EVERY, flush_interval=FLUSH_INTERVAL)
  21. def ingest_transaction(requests):
  22. pass