middleware.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. # https://github.com/pennersr/django-allauth/blob/main/allauth/account/middleware.py
  2. import os
  3. from types import SimpleNamespace
  4. from allauth.account.adapter import get_adapter
  5. from allauth.account.internal import flows
  6. from allauth.core import context
  7. from allauth.core.exceptions import (
  8. ImmediateHttpResponse,
  9. ReauthenticationRequired,
  10. )
  11. from asgiref.sync import iscoroutinefunction, sync_to_async
  12. from django.conf import settings
  13. from django.http import HttpResponseRedirect
  14. from django.urls import NoReverseMatch, reverse
  15. from django.utils.decorators import sync_and_async_middleware
  16. @sync_and_async_middleware
  17. def AccountMiddleware(get_response):
  18. if iscoroutinefunction(get_response):
  19. async def middleware(request):
  20. request.allauth = SimpleNamespace()
  21. with context.request_context(request):
  22. response = await get_response(request)
  23. if _should_check_dangling_login(request, response):
  24. await _acheck_dangling_login(request)
  25. if _should_redirect_accounts(request, response):
  26. response = await _aredirect_accounts(request)
  27. return response
  28. else:
  29. def middleware(request):
  30. request.allauth = SimpleNamespace()
  31. with context.request_context(request):
  32. response = get_response(request)
  33. if _should_check_dangling_login(request, response):
  34. _check_dangling_login(request)
  35. if _should_redirect_accounts(request, response):
  36. response = _redirect_accounts(request)
  37. return response
  38. def process_exception(request, exception):
  39. if isinstance(exception, ImmediateHttpResponse):
  40. return exception.response
  41. elif isinstance(exception, ReauthenticationRequired):
  42. redirect_url = reverse("account_login")
  43. methods = get_adapter().get_reauthentication_methods(request.user)
  44. if methods:
  45. redirect_url = methods[0]["url"]
  46. return flows.reauthentication.suspend_request(request, redirect_url)
  47. middleware.process_exception = process_exception
  48. return middleware
  49. def _should_check_dangling_login(request, response):
  50. sec_fetch_dest = request.headers.get("sec-fetch-dest")
  51. if sec_fetch_dest and sec_fetch_dest != "document":
  52. return False
  53. content_type = response.headers.get("content-type")
  54. if content_type:
  55. content_type = content_type.partition(";")[0]
  56. if content_type and content_type != "text/html":
  57. return False
  58. # STATIC_URL might be None, as the staticfiles app is not strictly required
  59. if (
  60. settings.STATIC_URL and request.path.startswith(settings.STATIC_URL)
  61. ) or request.path in ["/favicon.ico", "/robots.txt", "/humans.txt", "/"]:
  62. return False
  63. if response.status_code // 100 != 2:
  64. return False
  65. return True
  66. def _check_dangling_login(request):
  67. if not getattr(request, "_account_login_accessed", False):
  68. if flows.login.LOGIN_SESSION_KEY in request.session:
  69. request.session.pop(flows.login.LOGIN_SESSION_KEY)
  70. async def _acheck_dangling_login(request):
  71. await sync_to_async(_check_dangling_login)(request)
  72. def _should_redirect_accounts(request, response) -> bool:
  73. """
  74. URLs should be hackable. Yet, assuming allauth is included like this...
  75. path("accounts/", include("allauth.urls")),
  76. ... and a user would attempt to navigate to /accounts/, a 404 would be
  77. presented. This code catches that 404, and redirects to either the email
  78. management overview or the login page, depending on whether or not the user
  79. is authenticated.
  80. """
  81. if response.status_code != 404:
  82. return False
  83. try:
  84. login_path = reverse("account_login")
  85. email_path = reverse("account_email")
  86. except NoReverseMatch:
  87. # Project might have deviated URLs, let's keep out of the way.
  88. return False
  89. prefix = os.path.commonprefix([login_path, email_path])
  90. if len(prefix) <= 1 or prefix != request.path:
  91. return False
  92. # If we have a prefix that is not just '/', and that is what our request is
  93. # pointing to, redirect.
  94. return True
  95. async def _aredirect_accounts(request) -> HttpResponseRedirect:
  96. email_path = reverse("account_email")
  97. login_path = reverse("account_login")
  98. user = await request.auser()
  99. return HttpResponseRedirect(email_path if user.is_authenticated else login_path)
  100. def _redirect_accounts(request) -> HttpResponseRedirect:
  101. email_path = reverse("account_email")
  102. login_path = reverse("account_login")
  103. user = request.user
  104. return HttpResponseRedirect(email_path if user.is_authenticated else login_path)