# plugin.py (5.2 KB)
  1. from __future__ import annotations
  2. import functools
  3. from collections.abc import Callable
  4. from mypy.errorcodes import ATTR_DEFINED
  5. from mypy.messages import format_type
  6. from mypy.nodes import ARG_POS
  7. from mypy.plugin import AttributeContext, ClassDefContext, FunctionSigContext, Plugin
  8. from mypy.plugins.common import add_attribute_to_class
  9. from mypy.subtypes import find_member
  10. from mypy.types import (
  11. AnyType,
  12. CallableType,
  13. FunctionLike,
  14. Instance,
  15. NoneType,
  16. Type,
  17. TypeOfAny,
  18. UnionType,
  19. )
  20. def _make_using_required_str(ctx: FunctionSigContext) -> CallableType:
  21. sig = ctx.default_signature
  22. using_arg = sig.argument_by_name("using")
  23. if using_arg is None or using_arg.pos is None:
  24. ctx.api.fail("The using parameter is required", ctx.context)
  25. return sig
  26. for kind in sig.arg_kinds[: using_arg.pos]:
  27. if kind != ARG_POS:
  28. ctx.api.fail("Expected using to be the first optional", ctx.context)
  29. return sig
  30. str_type = ctx.api.named_generic_type("builtins.str", [])
  31. arg_kinds = [*sig.arg_kinds[: using_arg.pos], ARG_POS, *sig.arg_kinds[using_arg.pos + 1 :]]
  32. arg_types = [*sig.arg_types[: using_arg.pos], str_type, *sig.arg_types[using_arg.pos + 1 :]]
  33. return sig.copy_modified(arg_kinds=arg_kinds, arg_types=arg_types)
  34. def replace_transaction_atomic_sig_callback(ctx: FunctionSigContext) -> CallableType:
  35. sig = ctx.default_signature
  36. if not sig.argument_by_name("using"):
  37. # No using arg in the signature, bail
  38. return sig
  39. # We care about context managers.
  40. if not isinstance(sig.ret_type, Instance):
  41. return sig
  42. return _make_using_required_str(ctx)
  43. _FUNCTION_SIGNATURE_HOOKS = {
  44. "django.db.transaction.atomic": replace_transaction_atomic_sig_callback,
  45. "django.db.transaction.get_connection": _make_using_required_str,
  46. "django.db.transaction.on_commit": _make_using_required_str,
  47. "django.db.transaction.set_rollback": _make_using_required_str,
  48. }
  49. def _adjust_http_request_members(ctx: ClassDefContext) -> None:
  50. if ctx.cls.name == "HttpRequest":
  51. # added by sentry.api.base and sentry.web.frontend.base
  52. # TODO: idk why I can't use the real type here :/
  53. add_attribute_to_class(ctx.api, ctx.cls, "access", AnyType(TypeOfAny.explicit))
  54. # added by sentry.middleware.auth
  55. # TODO: figure out how to get the real types here
  56. add_attribute_to_class(ctx.api, ctx.cls, "auth", AnyType(TypeOfAny.explicit))
  57. # added by csp.middleware.CSPMiddleware
  58. add_attribute_to_class(ctx.api, ctx.cls, "csp_nonce", ctx.api.named_type("builtins.str"))
  59. # added by sudo.middleware.SudoMiddleware
  60. # this is slightly better than a method returning bool for overriding
  61. returns_bool = CallableType(
  62. arg_types=[],
  63. arg_kinds=[],
  64. arg_names=[],
  65. ret_type=ctx.api.named_type("builtins.bool"),
  66. fallback=ctx.api.named_type("builtins.function"),
  67. name="is_sudo",
  68. )
  69. add_attribute_to_class(ctx.api, ctx.cls, "is_sudo", returns_bool)
  70. # added by sentry.middleware.subdomain
  71. subdomain_tp = UnionType([NoneType(), ctx.api.named_type("builtins.str")])
  72. add_attribute_to_class(ctx.api, ctx.cls, "subdomain", subdomain_tp)
  73. # added by sentry.middleware.superuser
  74. # TODO: figure out how to get the real types here
  75. add_attribute_to_class(ctx.api, ctx.cls, "superuser", AnyType(TypeOfAny.explicit))
  76. def _lazy_service_wrapper_attribute(ctx: AttributeContext, *, attr: str) -> Type:
  77. # we use `Any` as the `__getattr__` return value
  78. # allow existing attributes to be returned as normal if they are not `Any`
  79. if not isinstance(ctx.default_attr_type, AnyType):
  80. return ctx.default_attr_type
  81. assert isinstance(ctx.type, Instance), ctx.type
  82. assert len(ctx.type.args) == 1, ctx.type
  83. assert isinstance(ctx.type.args[0], Instance), ctx.type
  84. generic_type = ctx.type.args[0]
  85. member = find_member(attr, generic_type, generic_type)
  86. if member is None:
  87. ctx.api.fail(
  88. f'{format_type(ctx.type, ctx.api.options)} has no attribute "{attr}"',
  89. ctx.context,
  90. code=ATTR_DEFINED,
  91. )
  92. return ctx.default_attr_type
  93. else:
  94. return member
  95. class SentryMypyPlugin(Plugin):
  96. def get_function_signature_hook(
  97. self, fullname: str
  98. ) -> Callable[[FunctionSigContext], FunctionLike] | None:
  99. return _FUNCTION_SIGNATURE_HOOKS.get(fullname)
  100. def get_base_class_hook(self, fullname: str) -> Callable[[ClassDefContext], None] | None:
  101. # XXX: this is a hack -- I don't know if there's a better callback to modify a class
  102. if fullname == "io.BytesIO":
  103. return _adjust_http_request_members
  104. else:
  105. return None
  106. def get_attribute_hook(self, fullname: str) -> Callable[[AttributeContext], Type] | None:
  107. if fullname.startswith("sentry.utils.lazy_service_wrapper.LazyServiceWrapper."):
  108. _, attr = fullname.rsplit(".", 1)
  109. return functools.partial(_lazy_service_wrapper_attribute, attr=attr)
  110. else:
  111. return None
  112. def plugin(version: str) -> type[SentryMypyPlugin]:
  113. return SentryMypyPlugin