base.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. from __future__ import annotations
  2. import logging
  3. import sys
  4. from typing import Any, NoReturn
  5. import sentry_plugins
  6. from sentry.exceptions import InvalidIdentity, PluginError
  7. from sentry.shared_integrations.constants import (
  8. ERR_INTERNAL,
  9. ERR_UNAUTHORIZED,
  10. ERR_UNSUPPORTED_RESPONSE_TYPE,
  11. )
  12. from sentry.shared_integrations.exceptions import (
  13. ApiError,
  14. ApiHostError,
  15. ApiUnauthorized,
  16. UnsupportedResponseType,
  17. )
  18. class CorePluginMixin:
  19. author: str | None = "Sentry Team"
  20. author_url: str | None = "https://github.com/getsentry/sentry"
  21. version: str | None = sentry_plugins.VERSION
  22. resource_links = [
  23. ("Report Issue", "https://github.com/getsentry/sentry/issues"),
  24. ("View Source", "https://github.com/getsentry/sentry/tree/master/src/sentry_plugins"),
  25. ]
  26. # HACK(dcramer): work around MRO issue with plugin metaclass
  27. logger: logging.Logger | None = None
  28. # TODO(dcramer): The following is a possible "better implementation" of the
  29. # core issue implementation, though it would need a compat layer to push
  30. # it upstream
  31. def error_message_from_json(self, data):
  32. return data.get("message", "unknown error")
  33. def message_from_error(self, exc):
  34. if isinstance(exc, ApiUnauthorized):
  35. return ERR_UNAUTHORIZED
  36. elif isinstance(exc, ApiHostError):
  37. return exc.text
  38. elif isinstance(exc, UnsupportedResponseType):
  39. return ERR_UNSUPPORTED_RESPONSE_TYPE.format(content_type=exc.content_type)
  40. elif isinstance(exc, ApiError):
  41. if exc.json:
  42. msg = self.error_message_from_json(exc.json) or "unknown error"
  43. else:
  44. msg = getattr(exc, "text", "unknown error")
  45. return f"Error Communicating with {self.title} (HTTP {exc.code}): {msg}"
  46. else:
  47. return ERR_INTERNAL
  48. def raise_error(self, exc: BaseException, identity: Any = None) -> NoReturn:
  49. if isinstance(exc, ApiUnauthorized):
  50. raise InvalidIdentity(self.message_from_error(exc), identity=identity).with_traceback(
  51. sys.exc_info()[2]
  52. )
  53. elif isinstance(exc, ApiError):
  54. raise PluginError(self.message_from_error(exc)).with_traceback(sys.exc_info()[2])
  55. elif isinstance(exc, PluginError):
  56. raise
  57. else:
  58. self.logger.exception(str(exc))
  59. raise PluginError(self.message_from_error(exc)).with_traceback(sys.exc_info()[2])