plugin.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. from __future__ import annotations
  2. import functools
  3. from collections.abc import Callable
  4. from mypy.errorcodes import ATTR_DEFINED
  5. from mypy.messages import format_type
  6. from mypy.nodes import ARG_POS
  7. from mypy.plugin import AttributeContext, ClassDefContext, FunctionSigContext, Plugin
  8. from mypy.plugins.common import add_attribute_to_class
  9. from mypy.subtypes import find_member
  10. from mypy.types import (
  11. AnyType,
  12. CallableType,
  13. FunctionLike,
  14. Instance,
  15. NoneType,
  16. Type,
  17. TypeOfAny,
  18. TypeType,
  19. TypeVarType,
  20. UnionType,
  21. )
  22. def _make_using_required_str(ctx: FunctionSigContext) -> CallableType:
  23. sig = ctx.default_signature
  24. using_arg = sig.argument_by_name("using")
  25. if using_arg is None or using_arg.pos is None:
  26. ctx.api.fail("The using parameter is required", ctx.context)
  27. return sig
  28. for kind in sig.arg_kinds[: using_arg.pos]:
  29. if kind != ARG_POS:
  30. ctx.api.fail("Expected using to be the first optional", ctx.context)
  31. return sig
  32. str_type = ctx.api.named_generic_type("builtins.str", [])
  33. arg_kinds = [*sig.arg_kinds[: using_arg.pos], ARG_POS, *sig.arg_kinds[using_arg.pos + 1 :]]
  34. arg_types = [*sig.arg_types[: using_arg.pos], str_type, *sig.arg_types[using_arg.pos + 1 :]]
  35. return sig.copy_modified(arg_kinds=arg_kinds, arg_types=arg_types)
  36. def replace_transaction_atomic_sig_callback(ctx: FunctionSigContext) -> CallableType:
  37. sig = ctx.default_signature
  38. if not sig.argument_by_name("using"):
  39. # No using arg in the signature, bail
  40. return sig
  41. # We care about context managers.
  42. if not isinstance(sig.ret_type, Instance):
  43. return sig
  44. return _make_using_required_str(ctx)
# Maps the fully qualified name of a function to the signature hook that should
# be applied to it.  Consulted by `SentryMypyPlugin.get_function_signature_hook`.
_FUNCTION_SIGNATURE_HOOKS = {
    # `atomic` goes through an extra filter before `using` is rewritten
    "django.db.transaction.atomic": replace_transaction_atomic_sig_callback,
    # these have their `using` argument rewritten directly
    "django.db.transaction.get_connection": _make_using_required_str,
    "django.db.transaction.on_commit": _make_using_required_str,
    "django.db.transaction.set_rollback": _make_using_required_str,
}
  51. def _adjust_http_request_members(ctx: ClassDefContext) -> None:
  52. if ctx.cls.name == "HttpRequest":
  53. # added by sentry.api.base and sentry.web.frontend.base
  54. # TODO: idk why I can't use the real type here :/
  55. add_attribute_to_class(ctx.api, ctx.cls, "access", AnyType(TypeOfAny.explicit))
  56. # added by sentry.middleware.auth
  57. # TODO: figure out how to get the real types here
  58. add_attribute_to_class(ctx.api, ctx.cls, "auth", AnyType(TypeOfAny.explicit))
  59. # added by csp.middleware.CSPMiddleware
  60. add_attribute_to_class(ctx.api, ctx.cls, "csp_nonce", ctx.api.named_type("builtins.str"))
  61. # added by sudo.middleware.SudoMiddleware
  62. # this is slightly better than a method returning bool for overriding
  63. returns_bool = CallableType(
  64. arg_types=[],
  65. arg_kinds=[],
  66. arg_names=[],
  67. ret_type=ctx.api.named_type("builtins.bool"),
  68. fallback=ctx.api.named_type("builtins.function"),
  69. name="is_sudo",
  70. )
  71. add_attribute_to_class(ctx.api, ctx.cls, "is_sudo", returns_bool)
  72. # added by sentry.middleware.subdomain
  73. subdomain_tp = UnionType([NoneType(), ctx.api.named_type("builtins.str")])
  74. add_attribute_to_class(ctx.api, ctx.cls, "subdomain", subdomain_tp)
  75. # added by sentry.middleware.superuser
  76. # TODO: figure out how to get the real types here
  77. add_attribute_to_class(ctx.api, ctx.cls, "superuser", AnyType(TypeOfAny.explicit))
  78. def _lazy_service_wrapper_attribute(ctx: AttributeContext, *, attr: str) -> Type:
  79. # we use `Any` as the `__getattr__` return value
  80. # allow existing attributes to be returned as normal if they are not `Any`
  81. if not isinstance(ctx.default_attr_type, AnyType):
  82. return ctx.default_attr_type
  83. assert isinstance(ctx.type, Instance), ctx.type
  84. assert len(ctx.type.args) == 1, ctx.type
  85. assert isinstance(ctx.type.args[0], Instance), ctx.type
  86. generic_type = ctx.type.args[0]
  87. member = find_member(attr, generic_type, generic_type)
  88. if member is None:
  89. ctx.api.fail(
  90. f'{format_type(ctx.type, ctx.api.options)} has no attribute "{attr}"',
  91. ctx.context,
  92. code=ATTR_DEFINED,
  93. )
  94. return ctx.default_attr_type
  95. else:
  96. return member
  97. def _resolve_objects_for_typevars(ctx: AttributeContext) -> Type:
  98. # XXX: hack around python/mypy#17395
  99. # self: type[<TypeVar>]
  100. # default_attr_type: BaseManager[ConcreteTypeVarBound]
  101. if (
  102. isinstance(ctx.type, TypeType)
  103. and isinstance(ctx.type.item, TypeVarType)
  104. and isinstance(ctx.default_attr_type, Instance)
  105. and ctx.default_attr_type.type.fullname == "sentry.db.models.manager.base.BaseManager"
  106. ):
  107. tvar = ctx.type.item
  108. return ctx.default_attr_type.copy_modified(args=(tvar,))
  109. else:
  110. return ctx.default_attr_type
  111. class SentryMypyPlugin(Plugin):
  112. def get_function_signature_hook(
  113. self, fullname: str
  114. ) -> Callable[[FunctionSigContext], FunctionLike] | None:
  115. return _FUNCTION_SIGNATURE_HOOKS.get(fullname)
  116. def get_base_class_hook(self, fullname: str) -> Callable[[ClassDefContext], None] | None:
  117. # XXX: this is a hack -- I don't know if there's a better callback to modify a class
  118. if fullname == "io.BytesIO":
  119. return _adjust_http_request_members
  120. else:
  121. return None
  122. def get_class_attribute_hook(self, fullname: str) -> Callable[[AttributeContext], Type] | None:
  123. if fullname.startswith("sentry.") and fullname.endswith(".objects"):
  124. return _resolve_objects_for_typevars
  125. else:
  126. return None
  127. def get_attribute_hook(self, fullname: str) -> Callable[[AttributeContext], Type] | None:
  128. if fullname.startswith("sentry.utils.lazy_service_wrapper.LazyServiceWrapper."):
  129. _, attr = fullname.rsplit(".", 1)
  130. return functools.partial(_lazy_service_wrapper_attribute, attr=attr)
  131. else:
  132. return None
def plugin(version: str) -> type[SentryMypyPlugin]:
    """Return the plugin class; ``version`` is accepted but not consulted.

    NOTE(review): this looks like the module-level entry point mypy calls to
    load a plugin -- confirm against the mypy plugin documentation.
    """
    return SentryMypyPlugin