|
@@ -12,12 +12,12 @@ from django.http import (
|
|
|
HttpResponseNotFound,
|
|
|
HttpResponseRedirect,
|
|
|
)
|
|
|
+from django.http.response import HttpResponseBase
|
|
|
from django.middleware.csrf import CsrfViewMiddleware
|
|
|
from django.template.context_processors import csrf
|
|
|
from django.urls import reverse
|
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
|
from django.views.generic import View
|
|
|
-from rest_framework.request import Request
|
|
|
|
|
|
from sentry import options
|
|
|
from sentry.api.utils import generate_organization_url, is_member_disabled_from_limit
|
|
@@ -52,7 +52,7 @@ class _HasRespond(Protocol):
|
|
|
|
|
|
def respond(
|
|
|
self, template: str, context: dict[str, Any] | None = None, status: int = 200
|
|
|
- ) -> HttpResponse:
|
|
|
+ ) -> HttpResponseBase:
|
|
|
...
|
|
|
|
|
|
|
|
@@ -65,7 +65,7 @@ class OrganizationMixin:
|
|
|
# as it's only used in a single location and over complicates the rest of
|
|
|
# the code
|
|
|
def determine_active_organization(
|
|
|
- self, request: Request, organization_slug: str | None = None
|
|
|
+ self, request: HttpRequest, organization_slug: str | None = None
|
|
|
) -> None:
|
|
|
"""
|
|
|
Using the current request and potentially optional organization_slug, 'determines'
|
|
@@ -100,7 +100,7 @@ class OrganizationMixin:
|
|
|
self.active_organization = active_organization
|
|
|
|
|
|
def _lookup_organizations(
|
|
|
- self, is_implicit: bool, organization_slug: str | None, request: Request
|
|
|
+ self, is_implicit: bool, organization_slug: str | None, request: HttpRequest
|
|
|
) -> tuple[RpcUserOrganizationContext | None, RpcOrganizationSummary | None]:
|
|
|
active_organization: RpcUserOrganizationContext | None = self._try_superuser_org_lookup(
|
|
|
organization_slug, request
|
|
@@ -124,7 +124,7 @@ class OrganizationMixin:
|
|
|
is_implicit: bool,
|
|
|
organization_slug: str,
|
|
|
organizations: list[RpcOrganizationSummary],
|
|
|
- request: Request,
|
|
|
+ request: HttpRequest,
|
|
|
) -> RpcUserOrganizationContext | None:
|
|
|
try:
|
|
|
backup_org: RpcOrganizationSummary | None = next(
|
|
@@ -145,7 +145,7 @@ class OrganizationMixin:
|
|
|
return None
|
|
|
|
|
|
def _try_superuser_org_lookup(
|
|
|
- self, organization_slug: str | None, request: Request
|
|
|
+ self, organization_slug: str | None, request: HttpRequest
|
|
|
) -> RpcUserOrganizationContext | None:
|
|
|
active_organization: RpcUserOrganizationContext | None = None
|
|
|
if organization_slug is not None:
|
|
@@ -155,7 +155,7 @@ class OrganizationMixin:
|
|
|
)
|
|
|
return active_organization
|
|
|
|
|
|
- def _find_implicit_slug(self, request: Request) -> str | None:
|
|
|
+ def _find_implicit_slug(self, request: HttpRequest) -> str | None:
|
|
|
organization_slug = request.session.get("activeorg")
|
|
|
if request.subdomain is not None and request.subdomain != organization_slug:
|
|
|
# Customer domain is being used, set the subdomain as the requesting org slug.
|
|
@@ -163,7 +163,7 @@ class OrganizationMixin:
|
|
|
return organization_slug
|
|
|
|
|
|
def is_not_2fa_compliant(
|
|
|
- self, request: Request, organization: RpcOrganization | Organization
|
|
|
+ self, request: HttpRequest, organization: RpcOrganization | Organization
|
|
|
) -> bool:
|
|
|
return (
|
|
|
organization.flags.require_2fa
|
|
@@ -172,12 +172,12 @@ class OrganizationMixin:
|
|
|
)
|
|
|
|
|
|
def is_member_disabled_from_limit(
|
|
|
- self, request: Request, organization: RpcUserOrganizationContext | RpcOrganization
|
|
|
+ self, request: HttpRequest, organization: RpcUserOrganizationContext | RpcOrganization
|
|
|
) -> bool:
|
|
|
return is_member_disabled_from_limit(request, organization)
|
|
|
|
|
|
def get_active_team(
|
|
|
- self, request: Request, organization: RpcOrganization, team_slug: str
|
|
|
+ self, request: HttpRequest, organization: RpcOrganization, team_slug: str
|
|
|
) -> Team | None:
|
|
|
"""
|
|
|
Returns the currently selected team for the request or None
|
|
@@ -194,7 +194,7 @@ class OrganizationMixin:
|
|
|
return team
|
|
|
|
|
|
def get_active_project(
|
|
|
- self, request: Request, organization: RpcOrganization, project_slug: str
|
|
|
+ self, request: HttpRequest, organization: RpcOrganization, project_slug: str
|
|
|
) -> Project | None:
|
|
|
try:
|
|
|
project = Project.objects.get(slug=project_slug, organization=organization)
|
|
@@ -206,7 +206,7 @@ class OrganizationMixin:
|
|
|
|
|
|
return project
|
|
|
|
|
|
- def redirect_to_org(self: _HasRespond, request: Request) -> HttpResponse:
|
|
|
+ def redirect_to_org(self: _HasRespond, request: HttpRequest) -> HttpResponseBase:
|
|
|
from sentry import features
|
|
|
|
|
|
using_customer_domain = request and is_using_customer_domain(request)
|
|
@@ -273,7 +273,7 @@ class BaseView(View, OrganizationMixin):
|
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
@csrf_exempt
|
|
|
- def dispatch(self, request: Request, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
|
+ def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponseBase:
|
|
|
"""
|
|
|
A note on the CSRF protection process.
|
|
|
|
|
@@ -337,28 +337,28 @@ class BaseView(View, OrganizationMixin):
|
|
|
|
|
|
return self.handle(request, *args, **kwargs)
|
|
|
|
|
|
- def test_csrf(self, request: Request) -> HttpResponse:
|
|
|
+ def test_csrf(self, request: HttpRequest) -> HttpResponse:
|
|
|
def _fake_get_response(request: HttpRequest) -> HttpResponse:
|
|
|
raise AssertionError("this should be unreachable")
|
|
|
|
|
|
middleware = CsrfViewMiddleware(_fake_get_response)
|
|
|
return middleware.process_view(request, self.dispatch, [request], {})
|
|
|
|
|
|
- def get_access(self, request: Request, *args: Any, **kwargs: Any) -> access.Access:
|
|
|
+ def get_access(self, request: HttpRequest, *args: Any, **kwargs: Any) -> access.Access:
|
|
|
return access.DEFAULT
|
|
|
|
|
|
def convert_args(
|
|
|
- self, request: Request, *args: Any, **kwargs: Any
|
|
|
+ self, request: HttpRequest, *args: Any, **kwargs: Any
|
|
|
) -> tuple[tuple[Any, ...], dict[str, Any]]:
|
|
|
return (args, kwargs)
|
|
|
|
|
|
- def handle(self, request: Request, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
|
+ def handle(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
|
|
|
- def is_auth_required(self, request: Request, *args: Any, **kwargs: Any) -> bool:
|
|
|
+ def is_auth_required(self, request: HttpRequest, *args: Any, **kwargs: Any) -> bool:
|
|
|
return self.auth_required and not (request.user.is_authenticated and request.user.is_active)
|
|
|
|
|
|
- def handle_auth_required(self, request: Request, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
|
+ def handle_auth_required(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
|
auth.initiate_login(request, next_url=request.get_full_path())
|
|
|
if "organization_slug" in kwargs:
|
|
|
redirect_to = reverse("sentry-auth-organization", args=[kwargs["organization_slug"]])
|
|
@@ -366,38 +366,40 @@ class BaseView(View, OrganizationMixin):
|
|
|
redirect_to = auth.get_login_url()
|
|
|
return self.redirect(redirect_to, headers={"X-Robots-Tag": "noindex, nofollow"})
|
|
|
|
|
|
- def is_sudo_required(self, request: Request, *args: Any, **kwargs: Any) -> bool:
|
|
|
+ def is_sudo_required(self, request: HttpRequest, *args: Any, **kwargs: Any) -> bool:
|
|
|
return self.sudo_required and not request.is_sudo()
|
|
|
|
|
|
- def handle_sudo_required(self, request: Request, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
|
+ def handle_sudo_required(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
|
return redirect_to_sudo(request.get_full_path())
|
|
|
|
|
|
- def has_permission(self, request: Request, *args: Any, **kwargs: Any) -> bool:
|
|
|
+ def has_permission(self, request: HttpRequest, *args: Any, **kwargs: Any) -> bool:
|
|
|
return True
|
|
|
|
|
|
def handle_permission_required(
|
|
|
- self, request: Request, *args: Any, **kwargs: Any
|
|
|
+ self, request: HttpRequest, *args: Any, **kwargs: Any
|
|
|
) -> HttpResponse:
|
|
|
redirect_uri = self.get_no_permission_url(request, *args, **kwargs)
|
|
|
return self.redirect(redirect_uri)
|
|
|
|
|
|
- def handle_not_2fa_compliant(self, request: Request, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
|
+ def handle_not_2fa_compliant(
|
|
|
+ self, request: HttpRequest, *args: Any, **kwargs: Any
|
|
|
+ ) -> HttpResponse:
|
|
|
redirect_uri = self.get_not_2fa_compliant_url(request, *args, **kwargs)
|
|
|
return self.redirect(redirect_uri)
|
|
|
|
|
|
- def get_no_permission_url(self, request: Request, *args: Any, **kwargs: Any) -> str:
|
|
|
+ def get_no_permission_url(self, request: HttpRequest, *args: Any, **kwargs: Any) -> str:
|
|
|
return reverse("sentry-login")
|
|
|
|
|
|
- def get_not_2fa_compliant_url(self, request: Request, *args: Any, **kwargs: Any) -> str:
|
|
|
+ def get_not_2fa_compliant_url(self, request: HttpRequest, *args: Any, **kwargs: Any) -> str:
|
|
|
return reverse("sentry-account-settings-security")
|
|
|
|
|
|
- def get_context_data(self, request: Request, **kwargs: Any) -> dict[str, Any]:
|
|
|
+ def get_context_data(self, request: HttpRequest, **kwargs: Any) -> dict[str, Any]:
|
|
|
context = csrf(request)
|
|
|
return context
|
|
|
|
|
|
def respond(
|
|
|
self, template: str, context: dict[str, Any] | None = None, status: int = 200
|
|
|
- ) -> HttpResponse:
|
|
|
+ ) -> HttpResponseBase:
|
|
|
default_context = self.default_context
|
|
|
if context:
|
|
|
default_context.update(context)
|
|
@@ -415,7 +417,7 @@ class BaseView(View, OrganizationMixin):
|
|
|
return Team.objects.get_for_user(organization=organization, user=user, with_projects=True)
|
|
|
|
|
|
def create_audit_entry(
|
|
|
- self, request: Request, transaction_id: int | None = None, **kwargs: Any
|
|
|
+ self, request: HttpRequest, transaction_id: int | None = None, **kwargs: Any
|
|
|
) -> object:
|
|
|
return create_audit_entry(request, transaction_id, audit_logger, **kwargs)
|
|
|
|
|
@@ -434,19 +436,19 @@ class AbstractOrganizationView(BaseView, abc.ABC):
|
|
|
required_scope: str | None = None
|
|
|
valid_sso_required = True
|
|
|
|
|
|
- def get_access(self, request: Request, *args: Any, **kwargs: Any) -> access.Access:
|
|
|
+ def get_access(self, request: HttpRequest, *args: Any, **kwargs: Any) -> access.Access:
|
|
|
if self.active_organization is None:
|
|
|
return access.DEFAULT
|
|
|
return access.from_request_org_and_scopes(
|
|
|
request=request, rpc_user_org_context=self.active_organization
|
|
|
)
|
|
|
|
|
|
- def get_context_data(self, request: Request, organization: RpcOrganization | Organization, **kwargs: Any) -> dict[str, Any]: # type: ignore[override]
|
|
|
+ def get_context_data(self, request: HttpRequest, organization: RpcOrganization | Organization, **kwargs: Any) -> dict[str, Any]: # type: ignore[override]
|
|
|
context = super().get_context_data(request)
|
|
|
context["organization"] = organization
|
|
|
return context
|
|
|
|
|
|
- def has_permission(self, request: Request, organization: RpcOrganization | Organization, *args: Any, **kwargs: Any) -> bool: # type: ignore[override]
|
|
|
+ def has_permission(self, request: HttpRequest, organization: RpcOrganization | Organization, *args: Any, **kwargs: Any) -> bool: # type: ignore[override]
|
|
|
if organization is None:
|
|
|
return False
|
|
|
if self.valid_sso_required:
|
|
@@ -465,7 +467,7 @@ class AbstractOrganizationView(BaseView, abc.ABC):
|
|
|
return True
|
|
|
|
|
|
def is_auth_required(
|
|
|
- self, request: Request, organization_slug: str | None = None, *args: Any, **kwargs: Any
|
|
|
+ self, request: HttpRequest, organization_slug: str | None = None, *args: Any, **kwargs: Any
|
|
|
) -> bool:
|
|
|
result = super().is_auth_required(request, *args, **kwargs)
|
|
|
if result:
|
|
@@ -492,7 +494,7 @@ class AbstractOrganizationView(BaseView, abc.ABC):
|
|
|
|
|
|
return False
|
|
|
|
|
|
- def handle_permission_required(self, request: Request, organization: Organization | RpcOrganization | None, *args: Any, **kwargs: Any) -> HttpResponse: # type: ignore[override]
|
|
|
+ def handle_permission_required(self, request: HttpRequest, organization: Organization | RpcOrganization | None, *args: Any, **kwargs: Any) -> HttpResponse: # type: ignore[override]
|
|
|
if organization and self.needs_sso(request, organization):
|
|
|
logger.info(
|
|
|
"access.must-sso",
|
|
@@ -529,7 +531,7 @@ class AbstractOrganizationView(BaseView, abc.ABC):
|
|
|
redirect_uri = self.get_no_permission_url(request, *args, **kwargs)
|
|
|
return self.redirect(redirect_uri)
|
|
|
|
|
|
- def needs_sso(self, request: Request, organization: Organization | RpcOrganization) -> bool:
|
|
|
+ def needs_sso(self, request: HttpRequest, organization: Organization | RpcOrganization) -> bool:
|
|
|
if not organization:
|
|
|
return False
|
|
|
# XXX(dcramer): this branch should really never hit
|
|
@@ -550,7 +552,7 @@ class AbstractOrganizationView(BaseView, abc.ABC):
|
|
|
raise NotImplementedError
|
|
|
|
|
|
def convert_args(
|
|
|
- self, request: Request, organization_slug: str | None = None, *args: Any, **kwargs: Any
|
|
|
+ self, request: HttpRequest, organization_slug: str | None = None, *args: Any, **kwargs: Any
|
|
|
) -> tuple[tuple[Any, ...], dict[str, Any]]:
|
|
|
if "organization" not in kwargs:
|
|
|
kwargs["organization"] = self._get_organization()
|
|
@@ -598,7 +600,7 @@ class ProjectView(OrganizationView):
|
|
|
- project
|
|
|
"""
|
|
|
|
|
|
- def get_context_data(self, request: Request, organization: Organization, project: Project, **kwargs: Any) -> dict[str, Any]: # type: ignore[override]
|
|
|
+ def get_context_data(self, request: HttpRequest, organization: Organization, project: Project, **kwargs: Any) -> dict[str, Any]: # type: ignore[override]
|
|
|
from sentry.api.serializers import serialize
|
|
|
|
|
|
context = super().get_context_data(request, organization)
|
|
@@ -606,7 +608,7 @@ class ProjectView(OrganizationView):
|
|
|
context["processing_issues"] = serialize(project).get("processingIssues", 0)
|
|
|
return context
|
|
|
|
|
|
- def has_permission(self, request: Request, organization: Organization, project: Project, *args: Any, **kwargs: Any) -> bool: # type: ignore[override]
|
|
|
+ def has_permission(self, request: HttpRequest, organization: Organization, project: Project, *args: Any, **kwargs: Any) -> bool: # type: ignore[override]
|
|
|
if project is None:
|
|
|
return False
|
|
|
rv = super().has_permission(request, organization)
|
|
@@ -629,7 +631,7 @@ class ProjectView(OrganizationView):
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
- def convert_args(self, request: Request, organization_slug: str, project_slug: str, *args: Any, **kwargs: Any) -> tuple[tuple[Any, ...], dict[str, Any]]: # type: ignore[override]
|
|
|
+ def convert_args(self, request: HttpRequest, organization_slug: str, project_slug: str, *args: Any, **kwargs: Any) -> tuple[tuple[Any, ...], dict[str, Any]]: # type: ignore[override]
|
|
|
organization: Organization | None = None
|
|
|
active_project: Project | None = None
|
|
|
if self.active_organization:
|
|
@@ -649,7 +651,7 @@ class ProjectView(OrganizationView):
|
|
|
class AvatarPhotoView(View):
|
|
|
model: type[AvatarBase]
|
|
|
|
|
|
- def get(self, request: Request, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
|
+ def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
|
avatar_id = kwargs["avatar_id"]
|
|
|
try:
|
|
|
avatar = self.model.objects.get(ident=avatar_id)
|