authentication.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. from dataclasses import dataclass
  2. from typing import Any, Literal, Optional
  3. from asgiref.sync import sync_to_async
  4. from django.conf import settings
  5. from django.contrib.auth import SESSION_KEY
  6. from django.http import HttpRequest
  7. from ninja.security import HttpBearer
  8. from ninja.security import SessionAuth as BaseSessionAuth
  9. from apps.api_tokens.models import APIToken
  10. @dataclass
  11. class Auth:
  12. user_id: int
  13. auth_type: Literal["session", "token"]
  14. data: Any = None
  15. class AuthHttpRequest(HttpRequest):
  16. """Django HttpRequest that is known to be authenticated by a user"""
  17. auth: Auth
  18. class SessionAuth(BaseSessionAuth):
  19. """
  20. Check if user is logged in by checking session
  21. This avoids an unnecessary database call.
  22. request.auth will result in the user id
  23. """
  24. async def authenticate(
  25. self, request: HttpRequest, key: Optional[str]
  26. ) -> Optional[Auth]:
  27. if settings.SESSION_ENGINE == "django.contrib.sessions.backends.cache":
  28. user_id = request.session.get(SESSION_KEY)
  29. # Django DB backed sessions don't support async yet
  30. else:
  31. user_id = await sync_to_async(request.session.get)(SESSION_KEY)
  32. return Auth(int(user_id), "session") if user_id else None
  33. class TokenAuth(HttpBearer):
  34. """
  35. API Token based authentication always connects to a specific user.
  36. Store the token object under data for checking scopes permissions.
  37. """
  38. async def authenticate(self, request: HttpRequest, key: str) -> Optional[Auth]:
  39. try:
  40. token = await APIToken.objects.aget(token=key, user__is_active=True)
  41. except APIToken.DoesNotExist:
  42. return None
  43. return Auth(token.user_id, "token", data=token)