api.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
import json
import uuid
from urllib.parse import urlparse

from django.conf import settings
from django.http import HttpRequest
from ninja import NinjaAPI, Schema
from ninja.errors import AuthenticationError, HttpError, ValidationError

from projects.models import Project

from .authentication import event_auth, get_project
from .data_models import EnvelopeSchema, EventIngestSchema
from .exceptions import ThrottleException
from .parsers import EnvelopeParser
from .renderers import ORJSONRenderer
from .tasks import ingest_event
# Module-level API object: incoming requests are parsed with EnvelopeParser
# and responses are rendered with ORJSONRenderer. The handlers and routes
# below register themselves on this instance.
api = NinjaAPI(parser=EnvelopeParser(), renderer=ORJSONRenderer())
  14. @api.exception_handler(ThrottleException)
  15. def throttled(request: HttpRequest, exc: ThrottleException):
  16. response = api.create_response(
  17. request,
  18. {"message": "Please retry later"},
  19. status=429,
  20. )
  21. if retry_after := exc.retry_after:
  22. if isinstance(retry_after, int):
  23. response["Retry-After"] = retry_after
  24. else:
  25. response["Retry-After"] = retry_after.strftime("%a, %d %b %Y %H:%M:%S GMT")
  26. return response
# Response schema declared as `response=` by the ingest endpoints in this
# module (store, envelope, security).
class EventIngestOut(Schema):
    # Event identifier returned to the client, as a string.
    event_id: str
  29. def check_status():
  30. if settings.EVENT_STORE_DEBUG:
  31. print(json.dumps(self.request.data))
  32. @api.post("/{project_id}/store/", response=EventIngestOut)
  33. async def event_store(
  34. request: HttpRequest,
  35. payload: EventIngestSchema,
  36. project_id: int,
  37. ):
  38. check_status()
  39. project = await get_project(request)
  40. res = ingest_event.delay(project.id, payload.dict())
  41. return {"event_id": payload.event_id.hex}
  42. @api.post("/{project_id}/envelope/", response=EventIngestOut, auth=event_auth)
  43. async def event_envelope(
  44. request: HttpRequest,
  45. payload: EnvelopeSchema,
  46. project_id: int,
  47. ):
  48. check_status()
  49. return {"event_id": "aaaaaaaaaaa"}
# POST /{project_id}/security/
#
# Currently only runs check_status(); the payload is not processed.
# NOTE(review): no return statement is visible here although the route
# declares response=EventIngestOut - this looks like an unfinished stub (or
# the rest of the body lies outside this view); confirm before relying on it.
@api.post("/{project_id}/security/", response=EventIngestOut)
async def event_security(
    request: HttpRequest,
    payload: EventIngestSchema,
    project_id: int,
):
    check_status()