# authentication.py (2.2 KB)
  1. from dataclasses import dataclass
  2. from typing import Any, Literal, Optional
  3. from asgiref.sync import sync_to_async
  4. from django.conf import settings
  5. from django.contrib.auth import SESSION_KEY
  6. from django.contrib.auth.models import AnonymousUser
  7. from django.http import HttpRequest
  8. from ninja.security import HttpBearer
  9. from ninja.security import SessionAuth as BaseSessionAuth
  10. from apps.api_tokens.models import APIToken
  11. @dataclass
  12. class Auth:
  13. user_id: int
  14. auth_type: Literal["session", "token"]
  15. data: Any = None
  16. class AuthHttpRequest(HttpRequest):
  17. """Django HttpRequest that is known to be authenticated by a user"""
  18. auth: Auth
  19. class SessionAuth(BaseSessionAuth):
  20. """
  21. Check if user is logged in by checking session
  22. This avoids an unnecessary database call.
  23. request.auth will result in the user id
  24. """
  25. async def authenticate(
  26. self, request: HttpRequest, key: Optional[str]
  27. ) -> Optional[Auth]:
  28. if settings.SESSION_ENGINE == "django.contrib.sessions.backends.cache":
  29. user_id = request.session.get(SESSION_KEY)
  30. # Django DB backed sessions don't support async yet
  31. else:
  32. user_id = await sync_to_async(request.session.get)(SESSION_KEY)
  33. return Auth(int(user_id), "session") if user_id else None
  34. class TokenAuth(HttpBearer):
  35. """
  36. API Token based authentication always connects to a specific user.
  37. Store the token object under data for checking scopes permissions.
  38. """
  39. def __call__(self, request: HttpRequest) -> Optional[Any]:
  40. # Workaround https://github.com/vitalik/django-ninja/issues/989
  41. if request.resolver_match and request.resolver_match.url_name in [
  42. "setup_wizard_hash"
  43. ]:
  44. request.auth = None # type: ignore
  45. return self.get_anon()
  46. return super().__call__(request)
  47. async def get_anon(self):
  48. return AnonymousUser()
  49. async def authenticate(self, request: HttpRequest, key: str) -> Optional[Auth]:
  50. try:
  51. token = await APIToken.objects.aget(token=key, user__is_active=True)
  52. except APIToken.DoesNotExist:
  53. return None
  54. return Auth(token.user_id, "token", data=token)