test_plugin.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. from functools import cached_property
  2. import responses
  3. from sentry.testutils import PluginTestCase
  4. from sentry.utils import json
  5. from sentry_plugins.segment.plugin import SegmentPlugin
  6. class SegmentPluginTest(PluginTestCase):
  7. @cached_property
  8. def plugin(self):
  9. return SegmentPlugin()
  10. def test_conf_key(self):
  11. assert self.plugin.conf_key == "segment"
  12. def test_entry_point(self):
  13. self.assertPluginInstalled("segment", self.plugin)
  14. @responses.activate
  15. def test_simple_notification(self):
  16. responses.add(responses.POST, "https://api.segment.io/v1/track")
  17. self.plugin.set_option("write_key", "secret-api-key", self.project)
  18. event = self.store_event(
  19. data={
  20. "exception": {"type": "ValueError", "value": "foo bar"},
  21. "user": {"id": "1", "email": "foo@example.com"},
  22. "type": "error",
  23. "metadata": {"type": "ValueError", "value": "foo bar"},
  24. "level": "warning",
  25. },
  26. project_id=self.project.id,
  27. )
  28. with self.options({"system.url-prefix": "http://example.com"}):
  29. self.plugin.post_process(event)
  30. request = responses.calls[0].request
  31. payload = json.loads(request.body)
  32. assert {
  33. "userId": "1",
  34. "event": "Error Captured",
  35. "context": {"library": {"name": "sentry", "version": self.plugin.version}},
  36. "properties": {
  37. "environment": "",
  38. "eventId": event.event_id,
  39. "exceptionType": "ValueError",
  40. "level": "warning",
  41. "release": "",
  42. "transaction": "",
  43. },
  44. "integration": {"name": "sentry", "version": self.plugin.version},
  45. "timestamp": event.datetime.isoformat() + "Z",
  46. } == payload