123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- from dataclasses import dataclass
- from typing import Any, Literal, Optional
- from asgiref.sync import sync_to_async
- from django.conf import settings
- from django.contrib.auth import SESSION_KEY
- from django.http import HttpRequest
- from ninja.security import HttpBearer
- from ninja.security import SessionAuth as BaseSessionAuth
- from apps.api_tokens.models import APIToken
- @dataclass
- class Auth:
- user_id: int
- auth_type: Literal["session", "token"]
- data: Any = None
- class AuthHttpRequest(HttpRequest):
- """Django HttpRequest that is known to be authenticated by a user"""
- auth: Auth
- class SessionAuth(BaseSessionAuth):
- """
- Check if user is logged in by checking session
- This avoids an unnecessary database call.
- request.auth will result in the user id
- """
- async def authenticate(
- self, request: HttpRequest, key: Optional[str]
- ) -> Optional[Auth]:
- if settings.SESSION_ENGINE == "django.contrib.sessions.backends.cache":
- user_id = request.session.get(SESSION_KEY)
- # Django DB backed sessions don't support async yet
- else:
- user_id = await sync_to_async(request.session.get)(SESSION_KEY)
- return Auth(int(user_id), "session") if user_id else None
- class TokenAuth(HttpBearer):
- """
- API Token based authentication always connects to a specific user.
- Store the token object under data for checking scopes permissions.
- """
- async def authenticate(self, request: HttpRequest, key: str) -> Optional[Auth]:
- try:
- token = await APIToken.objects.aget(token=key, user__is_active=True)
- except APIToken.DoesNotExist:
- return None
- return Auth(token.user_id, "token", data=token)
|