test_message_filters.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. from sentry.ingest.inbound_filters import (
  2. _browser_extensions_filter,
  3. _legacy_browsers_filter,
  4. _localhost_filter,
  5. _web_crawlers_filter,
  6. )
  7. from sentry.models.options.project_option import ProjectOption
  8. from sentry.testutils.cases import TransactionTestCase
  9. from sentry.testutils.relay import RelayStoreHelper
  10. from sentry.testutils.skips import requires_kafka
  11. from sentry.utils.safe import set_path
  12. pytestmark = [requires_kafka]  # module-wide pytest marker list; presumably skips these tests when Kafka is unavailable -- confirm in sentry.testutils.skips
  13. class FilterTests(RelayStoreHelper, TransactionTestCase):
  14. def _get_message(self):
  15. return {}
  16. def _set_filter_state(self, flt, state):
  17. ProjectOption.objects.set_value(project=self.project, key=f"filters:{flt.id}", value=state)
  18. def test_should_not_filter_simple_messages(self):
  19. # baseline test (so we know everything works as expected)
  20. message = self._get_message()
  21. self.post_and_retrieve_event(message)
  22. def _get_message_with_bad_ip(self):
  23. message = self._get_message()
  24. set_path(message, "user", "ip_address", value="127.0.0.1")
  25. return message
  26. def test_should_filter_local_ip_addresses_when_enabled(self):
  27. self._set_filter_state(_localhost_filter, "1")
  28. message = self._get_message_with_bad_ip()
  29. event = self.post_and_try_retrieve_event(message)
  30. assert event is None
  31. def test_should_not_filter_bad_ip_addresses_when_disabled(self):
  32. self._set_filter_state(_localhost_filter, "0")
  33. message = self._get_message_with_bad_ip()
  34. self.post_and_retrieve_event(message)
  35. def _get_message_with_bad_extension(self):
  36. message = self._get_message()
  37. set_path(message, "platform", value="javascript")
  38. set_path(
  39. message,
  40. "exception",
  41. value={"values": [{"type": "Error", "value": "http://loading.retry.widdit.com/"}]},
  42. )
  43. return message
  44. def test_should_filter_browser_extensions_when_enabled(self):
  45. self._set_filter_state(_browser_extensions_filter, "1")
  46. message = self._get_message_with_bad_extension()
  47. event = self.post_and_try_retrieve_event(message)
  48. assert event is None
  49. def test_should_not_filter_browser_extensions_when_disabled(self):
  50. self._set_filter_state(_browser_extensions_filter, "0")
  51. message = self._get_message_with_bad_extension()
  52. self.post_and_retrieve_event(message)
  53. def _get_message_from_webcrawler(self):
  54. message = self._get_message()
  55. set_path(
  56. message,
  57. "request",
  58. value={
  59. "url": "http://example.com",
  60. "method": "GET",
  61. "headers": [["User-Agent", "Mediapartners-Google"]],
  62. },
  63. )
  64. return message
  65. def test_should_filter_web_crawlers_when_enabled(self):
  66. self._set_filter_state(_web_crawlers_filter, "1")
  67. message = self._get_message_from_webcrawler()
  68. event = self.post_and_try_retrieve_event(message)
  69. assert event is None
  70. def test_should_not_filter_web_crawlers_when_disabled(self):
  71. self._set_filter_state(_web_crawlers_filter, "0")
  72. message = self._get_message_from_webcrawler()
  73. self.post_and_retrieve_event(message)
  74. def _get_message_from_legacy_browser(self):
  75. ie_5_user_agent = (
  76. "Mozilla/4.0 (compatible; MSIE 5.50; Windows NT; SiteKiosk 4.9; SiteCoach 1.0)"
  77. )
  78. message = self._get_message()
  79. set_path(message, "platform", value="javascript")
  80. set_path(
  81. message,
  82. "request",
  83. value={
  84. "url": "http://example.com",
  85. "method": "GET",
  86. "headers": [["User-Agent", ie_5_user_agent]],
  87. },
  88. )
  89. return message
  90. def test_should_filter_legacy_browsers(self):
  91. self._set_filter_state(_legacy_browsers_filter, "1")
  92. message = self._get_message_from_legacy_browser()
  93. event = self.post_and_try_retrieve_event(message)
  94. assert event is None
  95. def test_should_not_filter_legacy_browsers_when_disabled(self):
  96. self._set_filter_state(_legacy_browsers_filter, "0")
  97. message = self._get_message_from_legacy_browser()
  98. self.post_and_retrieve_event(message)