Browse Source

ref: type src/sentry/web/frontend/base.py (#41514)

a lot of the types here are probably wrong, but currently decay to `Any`
-- they'll get narrowed a bit better when `django-stubs` is in play but
this should fix about ~half of the errors I see in `django-stubs` and
make it easier to get exact types here
<!-- Describe your PR here. -->
anthony sottile 2 years ago
parent
commit
1c7b9cf01c

+ 3 - 0
mypy.ini

@@ -165,6 +165,7 @@ files = fixtures/mypy-stubs,
         src/sentry/utils/time_window.py,
         src/sentry/utils/yaml.py,
         src/sentry/web/decorators.py,
+        src/sentry/web/frontend/base.py,
         src/sentry/web/helpers.py,
         tests/sentry/eventstream/kafka/test_consumer_strategy.py,
         tests/sentry/eventstream/kafka/test_synchronized.py,
@@ -196,6 +197,8 @@ no_implicit_reexport=True
 follow_imports = skip
 [mypy-bitfield.*]
 follow_imports = silent
+[mypy-sudo.*]
+follow_imports = silent
 
 [mypy-celery.*]
 ignore_missing_imports = True

+ 1 - 1
src/sentry/auth/view.py

@@ -12,7 +12,7 @@ if TYPE_CHECKING:
     from sentry.models.organization import Organization
 
 
-class AuthView(BaseView):  # type: ignore[misc]
+class AuthView(BaseView):
     """
     A segment of Provider's auth pipeline.
 

+ 1 - 1
src/sentry/integrations/slack/views/link_identity.py

@@ -32,7 +32,7 @@ def build_linking_url(
     )
 
 
-class SlackLinkIdentityView(BaseView):  # type: ignore
+class SlackLinkIdentityView(BaseView):
     @transaction_start("SlackLinkIdentityView")
     @never_cache
     def handle(self, request: Request, signed_params: str) -> Response:

+ 1 - 1
src/sentry/integrations/slack/views/link_team.py

@@ -59,7 +59,7 @@ class SelectTeamForm(forms.Form):  # type: ignore
         self.fields["team"].widget.choices = self.fields["team"].choices
 
 
-class SlackLinkTeamView(BaseView):  # type: ignore
+class SlackLinkTeamView(BaseView):
     @transaction_start("SlackLinkTeamView")
     @never_cache
     def handle(self, request: Request, signed_params: str) -> HttpResponse:

+ 1 - 1
src/sentry/integrations/slack/views/unlink_identity.py

@@ -31,7 +31,7 @@ def build_unlinking_url(
     )
 
 
-class SlackUnlinkIdentityView(BaseView):  # type: ignore
+class SlackUnlinkIdentityView(BaseView):
     @transaction_start("SlackUnlinkIdentityView")
     @never_cache
     def handle(self, request: Request, signed_params: str) -> Response:

+ 1 - 1
src/sentry/integrations/slack/views/unlink_team.py

@@ -34,7 +34,7 @@ def build_team_unlinking_url(
     )
 
 
-class SlackUnlinkTeamView(BaseView):  # type: ignore
+class SlackUnlinkTeamView(BaseView):
     @transaction_start("SlackUnlinkIdentityView")
     @never_cache
     def handle(self, request: Request, signed_params: str) -> Response:

+ 1 - 1
src/sentry/pipeline/views/base.py

@@ -12,7 +12,7 @@ if TYPE_CHECKING:
     from sentry.pipeline.base import Pipeline
 
 
-class PipelineView(BaseView, abc.ABC):  # type: ignore
+class PipelineView(BaseView, abc.ABC):
     """
     A class implementing the PipelineView may be used in a PipelineProviders
     get_pipeline_views list.

+ 105 - 65
src/sentry/web/frontend/base.py

@@ -1,5 +1,7 @@
+from __future__ import annotations
+
 import logging
-from typing import List, NoReturn, Optional, Tuple
+from typing import Any, Mapping, Protocol
 
 from django.http import (
     HttpResponse,
@@ -20,6 +22,8 @@ from sentry.api.utils import generate_organization_url, is_member_disabled_from_
 from sentry.auth import access
 from sentry.auth.superuser import is_active_superuser
 from sentry.models import Authenticator, Organization, Project, ProjectStatus, Team, TeamStatus
+from sentry.models.avatars.base import AvatarBase
+from sentry.models.user import User
 from sentry.services.hybrid_cloud.organization import (
     ApiOrganization,
     ApiUserOrganizationContext,
@@ -38,15 +42,26 @@ logger = logging.getLogger(__name__)
 audit_logger = logging.getLogger("sentry.audit.ui")
 
 
+class _HasRespond(Protocol):
+    active_organization: ApiUserOrganizationContext | None
+
+    def respond(
+        self, template: str, context: dict[str, Any] | None = None, status: int = 200
+    ) -> HttpResponse:
+        ...
+
+
 class OrganizationMixin:
     # This attribute will only be set once determine_active_organization is called.  Subclasses should likely invoke
     # that method, passing along the organization_slug context that might exist (or might not).
-    active_organization: Optional[ApiUserOrganizationContext]
+    active_organization: ApiUserOrganizationContext | None
 
     # TODO(dcramer): move the implicit organization logic into its own class
     # as it's only used in a single location and over complicates the rest of
     # the code
-    def determine_active_organization(self, request: Request, organization_slug=None) -> NoReturn:
+    def determine_active_organization(
+        self, request: Request, organization_slug: str | None = None
+    ) -> None:
         """
         Using the current request and potentially optional organization_slug, 'determines'
         the current session for this mixin object's scope, placing it into the active_organization attribute.
@@ -84,14 +99,13 @@ class OrganizationMixin:
         self.active_organization = active_organization
 
     def _lookup_organizations(
-        self, is_implicit: bool, organization_slug: Optional[str], request: Request
-    ) -> Tuple[Optional[ApiUserOrganizationContext], Optional[ApiOrganization]]:
-        active_organization: Optional[ApiUserOrganizationContext] = self._try_superuser_org_lookup(
+        self, is_implicit: bool, organization_slug: str | None, request: Request
+    ) -> tuple[ApiUserOrganizationContext | None, ApiOrganization | None]:
+        active_organization: ApiUserOrganizationContext | None = self._try_superuser_org_lookup(
             organization_slug, request
         )
-        backup_organization: Optional[ApiOrganization] = None
+        backup_organization: ApiOrganization | None = None
         if active_organization is None:
-            organizations: List[ApiOrganization]
             organizations = organization_service.get_organizations(
                 user_id=request.user.id, scope=None, only_visible=True
             )
@@ -108,11 +122,11 @@ class OrganizationMixin:
         self,
         is_implicit: bool,
         organization_slug: str,
-        organizations: List[ApiOrganization],
+        organizations: list[ApiOrganization],
         request: Request,
-    ) -> Optional[ApiUserOrganizationContext]:
+    ) -> ApiUserOrganizationContext | None:
         try:
-            backup_org: Optional[ApiOrganization] = next(
+            backup_org: ApiOrganization | None = next(
                 o for o in organizations if o.slug == organization_slug
             )
         except StopIteration:
@@ -133,9 +147,9 @@ class OrganizationMixin:
         return None
 
     def _try_superuser_org_lookup(
-        self, organization_slug: str, request: Request
-    ) -> Optional[ApiUserOrganizationContext]:
-        active_organization: Optional[ApiUserOrganizationContext] = None
+        self, organization_slug: str | None, request: Request
+    ) -> ApiUserOrganizationContext | None:
+        active_organization: ApiUserOrganizationContext | None = None
         if organization_slug is not None:
             if is_active_superuser(request):
                 active_organization = organization_service.get_organization_by_slug(
@@ -143,24 +157,28 @@ class OrganizationMixin:
                 )
         return active_organization
 
-    def _find_implicit_slug(self, request):
+    def _find_implicit_slug(self, request: Request) -> str | None:
         organization_slug = request.session.get("activeorg")
         if request.subdomain is not None and request.subdomain != organization_slug:
             # Customer domain is being used, set the subdomain as the requesting org slug.
             organization_slug = request.subdomain
-        return organization_slug
+        return organization_slug  # type: ignore[no-any-return]
 
-    def is_not_2fa_compliant(self, request: Request, organization):
+    def is_not_2fa_compliant(self, request: Request, organization: ApiOrganization) -> bool:
         return (
             organization.flags.require_2fa
             and not Authenticator.objects.user_has_2fa(request.user)
             and not is_active_superuser(request)
         )
 
-    def is_member_disabled_from_limit(self, request: Request, organization):
+    def is_member_disabled_from_limit(
+        self, request: Request, organization: ApiOrganization
+    ) -> bool:
         return is_member_disabled_from_limit(request, organization)
 
-    def get_active_team(self, request: Request, organization, team_slug):
+    def get_active_team(
+        self, request: Request, organization: ApiOrganization, team_slug: str
+    ) -> Team | None:
         """
         Returns the currently selected team for the request or None
         if no match.
@@ -175,7 +193,9 @@ class OrganizationMixin:
 
         return team
 
-    def get_active_project(self, request: Request, organization, project_slug):
+    def get_active_project(
+        self, request: Request, organization: ApiOrganization, project_slug: str
+    ) -> Project | None:
         try:
             project = Project.objects.get(slug=project_slug, organization=organization)
         except Project.DoesNotExist:
@@ -186,7 +206,7 @@ class OrganizationMixin:
 
         return project
 
-    def redirect_to_org(self, request: Request):
+    def redirect_to_org(self: _HasRespond, request: Request) -> HttpResponse:
         from sentry import features
 
         using_customer_domain = request and is_using_customer_domain(request)
@@ -221,14 +241,21 @@ class OrganizationMixin:
         return HttpResponseRedirect(url)
 
 
-class BaseView(View, OrganizationMixin):
+class BaseView(View, OrganizationMixin):  # type: ignore[misc]
     auth_required = True
     # TODO(dcramer): change sudo so it can be required only on POST
     sudo_required = False
 
     csrf_protect = True
 
-    def __init__(self, auth_required=None, sudo_required=None, csrf_protect=None, *args, **kwargs):
+    def __init__(
+        self,
+        auth_required: bool | None = None,
+        sudo_required: bool | None = None,
+        csrf_protect: bool | None = None,
+        *args: Any,
+        **kwargs: Any,
+    ) -> None:
         if auth_required is not None:
             self.auth_required = auth_required
         if sudo_required is not None:
@@ -237,8 +264,8 @@ class BaseView(View, OrganizationMixin):
             self.csrf_protect = csrf_protect
         super().__init__(*args, **kwargs)
 
-    @csrf_exempt
-    def dispatch(self, request, *args, **kwargs):
+    @csrf_exempt  # type: ignore[misc]
+    def dispatch(self, request: Request, *args: Any, **kwargs: Any) -> Response:
         """
         A note on the CSRF protection process.
 
@@ -294,23 +321,25 @@ class BaseView(View, OrganizationMixin):
 
         return self.handle(request, *args, **kwargs)
 
-    def test_csrf(self, request: Request):
+    def test_csrf(self, request: Request) -> HttpResponse:
         middleware = CsrfViewMiddleware()
         return middleware.process_view(request, self.dispatch, [request], {})
 
-    def get_access(self, request: Request, *args, **kwargs):
+    def get_access(self, request: Request, *args: Any, **kwargs: Any) -> access.Access:
         return access.DEFAULT
 
-    def convert_args(self, request: Request, *args, **kwargs):
+    def convert_args(
+        self, request: Request, *args: Any, **kwargs: Any
+    ) -> tuple[tuple[Any, ...], dict[str, Any]]:
         return (args, kwargs)
 
-    def handle(self, request: Request, *args, **kwargs) -> Response:
+    def handle(self, request: Request, *args: Any, **kwargs: Any) -> Response:
         return super().dispatch(request, *args, **kwargs)
 
-    def is_auth_required(self, request: Request, *args, **kwargs):
+    def is_auth_required(self, request: Request, *args: Any, **kwargs: Any) -> bool:
         return self.auth_required and not (request.user.is_authenticated and request.user.is_active)
 
-    def handle_auth_required(self, request: Request, *args, **kwargs):
+    def handle_auth_required(self, request: Request, *args: Any, **kwargs: Any) -> HttpResponse:
         auth.initiate_login(request, next_url=request.get_full_path())
         if "organization_slug" in kwargs:
             redirect_to = reverse("sentry-auth-organization", args=[kwargs["organization_slug"]])
@@ -318,54 +347,60 @@ class BaseView(View, OrganizationMixin):
             redirect_to = auth.get_login_url()
         return self.redirect(redirect_to, headers={"X-Robots-Tag": "noindex, nofollow"})
 
-    def is_sudo_required(self, request: Request, *args, **kwargs):
+    def is_sudo_required(self, request: Request, *args: Any, **kwargs: Any) -> bool:
         return self.sudo_required and not request.is_sudo()
 
-    def handle_sudo_required(self, request: Request, *args, **kwargs):
+    def handle_sudo_required(self, request: Request, *args: Any, **kwargs: Any) -> HttpResponse:
         return redirect_to_sudo(request.get_full_path())
 
-    def has_permission(self, request: Request, *args, **kwargs):
+    def has_permission(self, request: Request, *args: Any, **kwargs: Any) -> bool:
         return True
 
-    def handle_permission_required(self, request: Request, *args, **kwargs):
+    def handle_permission_required(
+        self, request: Request, *args: Any, **kwargs: Any
+    ) -> HttpResponse:
         redirect_uri = self.get_no_permission_url(request, *args, **kwargs)
         return self.redirect(redirect_uri)
 
-    def handle_not_2fa_compliant(self, request: Request, *args, **kwargs):
+    def handle_not_2fa_compliant(self, request: Request, *args: Any, **kwargs: Any) -> HttpResponse:
         redirect_uri = self.get_not_2fa_compliant_url(request, *args, **kwargs)
         return self.redirect(redirect_uri)
 
-    def get_no_permission_url(self, request: Request, *args, **kwargs):
-        return reverse("sentry-login")
+    def get_no_permission_url(self, request: Request, *args: Any, **kwargs: Any) -> str:
+        return reverse("sentry-login")  # type: ignore[no-any-return]
 
-    def get_not_2fa_compliant_url(self, request: Request, *args, **kwargs):
-        return reverse("sentry-account-settings-security")
+    def get_not_2fa_compliant_url(self, request: Request, *args: Any, **kwargs: Any) -> str:
+        return reverse("sentry-account-settings-security")  # type: ignore[no-any-return]
 
-    def get_context_data(self, request: Request, **kwargs):
+    def get_context_data(self, request: Request, **kwargs: Any) -> dict[str, Any]:
         context = csrf(request)
-        return context
+        return context  # type: ignore[no-any-return]
 
-    def respond(self, template, context=None, status=200):
+    def respond(
+        self, template: str, context: dict[str, Any] | None = None, status: int = 200
+    ) -> HttpResponse:
         default_context = self.default_context
         if context:
             default_context.update(context)
 
         return render_to_response(template, default_context, self.request, status=status)
 
-    def redirect(self, url, headers=None):
+    def redirect(self, url: str, headers: Mapping[str, str] | None = None) -> HttpResponse:
         res = HttpResponseRedirect(url)
         if headers:
             for k, v in headers.items():
                 res[k] = v
         return res
 
-    def get_team_list(self, user, organization):
-        return Team.objects.get_for_user(organization=organization, user=user, with_projects=True)
+    def get_team_list(self, user: User, organization: Organization) -> list[Team]:
+        return Team.objects.get_for_user(organization=organization, user=user, with_projects=True)  # type: ignore[no-any-return]
 
-    def create_audit_entry(self, request: Request, transaction_id=None, **kwargs):
+    def create_audit_entry(
+        self, request: Request, transaction_id: int | None = None, **kwargs: Any
+    ) -> object:
         return create_audit_entry(request, transaction_id, audit_logger, **kwargs)
 
-    def handle_disabled_member(self, organization):
+    def handle_disabled_member(self, organization: Organization) -> HttpResponse:
         redirect_uri = reverse("sentry-organization-disabled-member", args=[organization.slug])
         return self.redirect(redirect_uri)
 
@@ -378,22 +413,22 @@ class OrganizationView(BaseView):
     resulting dispatch.
     """
 
-    required_scope = None
+    required_scope: str | None = None
     valid_sso_required = True
 
-    def get_access(self, request: Request, organization, *args, **kwargs):
+    def get_access(self, request: Request, organization: Organization, *args: Any, **kwargs: Any) -> access.Access:  # type: ignore[override]
         if organization is None:
             return access.DEFAULT
         return access.from_request(request, organization)
 
-    def get_context_data(self, request: Request, organization, **kwargs):
+    def get_context_data(self, request: Request, organization: Organization, **kwargs: Any) -> dict[str, Any]:  # type: ignore[override]
         context = super().get_context_data(request)
         context["organization"] = organization
         context["TEAM_LIST"] = self.get_team_list(request.user, organization)
         context["ACCESS"] = request.access.to_django_context()
         return context
 
-    def has_permission(self, request: Request, organization, *args, **kwargs):
+    def has_permission(self, request: Request, organization: Organization, *args: Any, **kwargs: Any) -> bool:  # type: ignore[override]
         if organization is None:
             return False
         if self.valid_sso_required:
@@ -411,7 +446,9 @@ class OrganizationView(BaseView):
             return False
         return True
 
-    def is_auth_required(self, request: Request, organization_slug=None, *args, **kwargs):
+    def is_auth_required(
+        self, request: Request, organization_slug: str | None = None, *args: Any, **kwargs: Any
+    ) -> bool:
         result = super().is_auth_required(request, *args, **kwargs)
         if result:
             return result
@@ -434,7 +471,7 @@ class OrganizationView(BaseView):
 
         return False
 
-    def handle_permission_required(self, request: Request, organization, *args, **kwargs):
+    def handle_permission_required(self, request: Request, organization: Organization, *args: Any, **kwargs: Any) -> HttpResponse:  # type: ignore[override]
         if self.needs_sso(request, organization):
             logger.info(
                 "access.must-sso",
@@ -456,7 +493,7 @@ class OrganizationView(BaseView):
             redirect_uri = self.get_no_permission_url(request, *args, **kwargs)
         return self.redirect(redirect_uri)
 
-    def needs_sso(self, request: Request, organization):
+    def needs_sso(self, request: Request, organization: Organization) -> bool:
         if not organization:
             return False
         # XXX(dcramer): this branch should really never hit
@@ -472,13 +509,16 @@ class OrganizationView(BaseView):
             return True
         return False
 
-    def convert_args(self, request: Request, organization_slug=None, *args, **kwargs):
+    def convert_args(
+        self, request: Request, organization_slug: str | None = None, *args: Any, **kwargs: Any
+    ) -> tuple[tuple[Any, ...], dict[str, Any]]:
         # TODO:  Extract separate view base classes based on control vs region / monolith,
         # with distinct convert_args implementation.
         if SiloMode.get_current_mode() == SiloMode.CONTROL:
+            assert self.active_organization is not None
             kwargs["organization"] = self.active_organization.organization
         else:
-            organization: Optional[Organization] = None
+            organization: Organization | None = None
             if self.active_organization:
                 for org in Organization.objects.filter(id=self.active_organization.organization.id):
                     organization = org
@@ -499,13 +539,13 @@ class ProjectView(OrganizationView):
     - project
     """
 
-    def get_context_data(self, request: Request, organization, project, **kwargs):
+    def get_context_data(self, request: Request, organization: Organization, project: Project, **kwargs: Any) -> dict[str, Any]:  # type: ignore[override]
         context = super().get_context_data(request, organization)
         context["project"] = project
         context["processing_issues"] = serialize(project).get("processingIssues", 0)
         return context
 
-    def has_permission(self, request: Request, organization, project, *args, **kwargs):
+    def has_permission(self, request: Request, organization: Organization, project: Project, *args: Any, **kwargs: Any) -> bool:  # type: ignore[override]
         if project is None:
             return False
         rv = super().has_permission(request, organization)
@@ -528,9 +568,9 @@ class ProjectView(OrganizationView):
             return False
         return True
 
-    def convert_args(self, request: Request, organization_slug, project_slug, *args, **kwargs):
-        organization: Optional[Organization] = None
-        active_project: Optional[Project] = None
+    def convert_args(self, request: Request, organization_slug: str, project_slug: str, *args: Any, **kwargs: Any) -> tuple[tuple[Any, ...], dict[str, Any]]:  # type: ignore[override]
+        organization: Organization | None = None
+        active_project: Project | None = None
         if self.active_organization:
             for org in Organization.objects.filter(id=self.active_organization.organization.id):
                 organization = org
@@ -548,10 +588,10 @@ class ProjectView(OrganizationView):
         return (args, kwargs)
 
 
-class AvatarPhotoView(View):
-    model = None
+class AvatarPhotoView(View):  # type: ignore[misc]
+    model: type[AvatarBase]
 
-    def get(self, request: Request, *args, **kwargs) -> Response:
+    def get(self, request: Request, *args: Any, **kwargs: Any) -> Response:
         avatar_id = kwargs["avatar_id"]
         try:
             avatar = self.model.objects.get(ident=avatar_id)

+ 3 - 1
src/sudo/views.py

@@ -5,6 +5,8 @@ sudo.views
 :copyright: (c) 2020 by Matt Robenolt.
 :license: BSD, see LICENSE for more details.
 """
+from __future__ import annotations
+
 from urllib.parse import urlparse, urlunparse
 
 from django.contrib.auth.decorators import login_required
@@ -81,7 +83,7 @@ def sudo(request, **kwargs):
     return SudoView(**kwargs).dispatch(request)
 
 
-def redirect_to_sudo(next_url, sudo_url=None):
+def redirect_to_sudo(next_url: str, sudo_url: str | None = None) -> HttpResponseRedirect:
     """
     Redirects the user to the login page, passing the given 'next' page
     """