parsers.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import logging
  2. import orjson
  3. from django.conf import settings
  4. from django.http import HttpRequest
  5. from ninja.parser import Parser
  6. from sentry_sdk import capture_message, set_context
  7. logger = logging.getLogger(__name__)
  8. class EnvelopeParser(Parser):
  9. def parse_body(self, request: HttpRequest):
  10. if (
  11. request.resolver_match
  12. and request.resolver_match.url_name == "event_envelope"
  13. ):
  14. if request.META.get("CONTENT_TYPE", None) in [
  15. "application/x-sentry-envelope",
  16. "application/octet-stream",
  17. "text/plain;charset=UTF-8",
  18. "text/plain",
  19. None,
  20. ]:
  21. result = [orjson.loads(line) for line in request.readlines()]
  22. if settings.EVENT_STORE_DEBUG:
  23. print(orjson.dumps(result))
  24. return result
  25. try:
  26. return orjson.loads(request.body)
  27. except orjson.JSONDecodeError:
  28. set_context(
  29. "incoming event",
  30. {"body": request.body, "headers": request.META},
  31. )
  32. message = f"Envelope API unexpected content type {request.META.get('CONTENT_TYPE')}"
  33. capture_message(message, level="warning")
  34. logger.warning(message)
  35. return orjson.loads(request.body)