1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- from dataclasses import dataclass
- from typing import Any, Literal, Optional
- from asgiref.sync import sync_to_async
- from django.conf import settings
- from django.contrib.auth import SESSION_KEY
- from django.contrib.auth.models import AnonymousUser
- from django.http import HttpRequest
- from ninja.security import HttpBearer
- from ninja.security import SessionAuth as BaseSessionAuth
- from apps.api_tokens.models import APIToken
- @dataclass
- class Auth:
- user_id: int
- auth_type: Literal["session", "token"]
- data: Any = None
- class AuthHttpRequest(HttpRequest):
- """Django HttpRequest that is known to be authenticated by a user"""
- auth: Auth
- class SessionAuth(BaseSessionAuth):
- """
- Check if user is logged in by checking session
- This avoids an unnecessary database call.
- request.auth will result in the user id
- """
- async def authenticate(
- self, request: HttpRequest, key: Optional[str]
- ) -> Optional[Auth]:
- if settings.SESSION_ENGINE == "django.contrib.sessions.backends.cache":
- user_id = request.session.get(SESSION_KEY)
- # Django DB backed sessions don't support async yet
- else:
- user_id = await sync_to_async(request.session.get)(SESSION_KEY)
- return Auth(int(user_id), "session") if user_id else None
- class TokenAuth(HttpBearer):
- """
- API Token based authentication always connects to a specific user.
- Store the token object under data for checking scopes permissions.
- """
- def __call__(self, request: HttpRequest) -> Optional[Any]:
- # Workaround https://github.com/vitalik/django-ninja/issues/989
- if request.resolver_match and request.resolver_match.url_name in [
- "setup_wizard_hash"
- ]:
- request.auth = None # type: ignore
- return self.get_anon()
- return super().__call__(request)
- async def get_anon(self):
- return AnonymousUser()
- async def authenticate(self, request: HttpRequest, key: str) -> Optional[Auth]:
- try:
- token = await APIToken.objects.aget(token=key, user__is_active=True)
- except APIToken.DoesNotExist:
- return None
- return Auth(token.user_id, "token", data=token)
|