Browse Source

test(relay): Add e2e tests for security reports and inferred titles (#19830)

Merge before #19794
Markus Unterwaditzer 4 years ago
parent
commit
3edce13b63
3 changed files with 120 additions and 23 deletions
  1. 26 0
      src/sentry/testutils/relay.py
  2. 8 0
      src/sentry/utils/pytest/relay.py
  3. 86 23
      tests/relay_integration/tests.py

+ 26 - 0
src/sentry/testutils/relay.py

@@ -93,6 +93,30 @@ class RelayStoreHelper(object):
         assert event is not None
         return event
 
+    def post_and_retrieve_security_report(self, data):
+        url = self.get_relay_security_url(self.project.id, self.projectkey.public_key)
+        responses.add_passthru(url)
+
+        event_ids = {
+            event.event_id
+            for event in eventstore.get_events(eventstore.Filter(project_ids=[self.project.id]))
+        }
+
+        def has_new_event():
+            # Hack: security report endpoint does not return event ID
+            for event in eventstore.get_events(eventstore.Filter(project_ids=[self.project.id])):
+                if event.event_id not in event_ids:
+                    return event
+
+        resp = requests.post(url, json=data)
+
+        assert resp.ok
+
+        event = self.wait_for_ingest_consumer(has_new_event)
+        # check that we found it in Snuba
+        assert event
+        return event
+
     def post_and_try_retrieve_event(self, data):
         try:
             return self.post_and_retrieve_event(data)
@@ -139,6 +163,7 @@ class RelayStoreHelper(object):
         get_relay_store_url,
         get_relay_minidump_url,
         get_relay_unreal_url,
+        get_relay_security_url,
         wait_for_ingest_consumer,
     ):
         self.auth_header = get_auth_header(
@@ -151,4 +176,5 @@ class RelayStoreHelper(object):
         self.get_relay_store_url = get_relay_store_url  # noqa
         self.get_relay_minidump_url = get_relay_minidump_url  # noqa
         self.get_relay_unreal_url = get_relay_unreal_url  # noqa
+        self.get_relay_security_url = get_relay_security_url  # noqa
         self.wait_for_ingest_consumer = wait_for_ingest_consumer(settings)  # noqa

+ 8 - 0
src/sentry/utils/pytest/relay.py

@@ -145,6 +145,14 @@ def get_relay_store_url(relay_server):
     return inner
 
 
+@pytest.fixture
+def get_relay_security_url(relay_server):
+    def inner(project_id, key):
+        return "{}/api/{}/security/?sentry_key={}".format(relay_server["url"], project_id, key)
+
+    return inner
+
+
 @pytest.fixture
 def get_relay_minidump_url(relay_server):
     def inner(project_id, key):

+ 86 - 23
tests/relay_integration/tests.py

@@ -1,36 +1,99 @@
 from __future__ import absolute_import, print_function
 
-import os
-
-from sentry import eventstore
-
-from django.core.urlresolvers import reverse
-from exam import fixture
 from sentry.testutils import TransactionTestCase, RelayStoreHelper
 from sentry.testutils.helpers.datetime import iso_format, before_now
 
 
-def get_fixture_path(name):
-    return os.path.join(os.path.dirname(__file__), "fixtures", name)
+class SentryRemoteTest(RelayStoreHelper, TransactionTestCase):
+    # used to be test_ungzipped_data
+    def test_simple_data(self):
+        event_data = {"message": "hello", "timestamp": iso_format(before_now(seconds=1))}
+        event = self.post_and_retrieve_event(event_data)
+        assert event.message == "hello"
 
+    def test_csp(self):
+        event_data = {
+            "csp-report": {
+                "document-uri": "https://example.com/foo/bar",
+                "referrer": "https://www.google.com/",
+                "violated-directive": "default-src self",
+                "original-policy": "default-src self; report-uri /csp-hotline.php",
+                "blocked-uri": "http://evilhackerscripts.com",
+            }
+        }
 
-def load_fixture(name):
-    with open(get_fixture_path(name)) as fp:
-        return fp.read()
+        event = self.post_and_retrieve_security_report(event_data)
+        assert event.message == "Blocked 'default-src' from 'evilhackerscripts.com'"
 
+    def test_hpkp(self):
+        event_data = {
+            "date-time": "2014-04-06T13:00:50Z",
+            "hostname": "www.example.com",
+            "port": 443,
+            "effective-expiration-date": "2014-05-01T12:40:50Z",
+            "include-subdomains": False,
+            "served-certificate-chain": [
+                "-----BEGIN CERTIFICATE-----\n MIIEBDCCAuygBQUAMEIxCzAJBgNVBAYTAlVT\n -----END CERTIFICATE-----"
+            ],
+            "validated-certificate-chain": [
+                "-----BEGIN CERTIFICATE-----\n MIIEBDCCAuygAwIBAgIDCzAJBgNVBAYTAlVT\n -----END CERTIFICATE-----"
+            ],
+            "known-pins": [
+                'pin-sha256="d6qzRu9zOECb90Uez27xWltNsj0e1Md7GkYYkVoZWmM="',
+                'pin-sha256="E9CZ9INDbd+2eRQozYqqbQ2yXLVKB9+xcprMF+44U1g="',
+            ],
+        }
 
-class SentryRemoteTest(RelayStoreHelper, TransactionTestCase):
-    @fixture
-    def path(self):
-        return reverse("sentry-api-store")
+        event = self.post_and_retrieve_security_report(event_data)
+        assert event.message == "Public key pinning validation failed for 'www.example.com'"
+        assert event.group.title == "Public key pinning validation failed for 'www.example.com'"
 
-    def get_event(self, event_id):
-        instance = eventstore.get_event_by_id(self.project.id, event_id)
-        return instance
+    def test_expect_ct(self):
+        event_data = {
+            "expect-ct-report": {
+                "date-time": "2014-04-06T13:00:50Z",
+                "hostname": "www.example.com",
+                "port": 443,
+                "effective-expiration-date": "2014-05-01T12:40:50Z",
+                "served-certificate-chain": [
+                    "-----BEGIN CERTIFICATE-----\nABC\n-----END CERTIFICATE-----"
+                ],
+                "validated-certificate-chain": [
+                    "-----BEGIN CERTIFICATE-----\nCDE\n-----END CERTIFICATE-----"
+                ],
+                "scts": [
+                    {
+                        "version": 1,
+                        "status": "invalid",
+                        "source": "embedded",
+                        "serialized_sct": "ABCD==",
+                    }
+                ],
+            }
+        }
 
-    # used to be test_ungzipped_data
-    def test_simple_data(self):
-        event_data = {"message": "hello", "timestamp": iso_format(before_now(seconds=1))}
-        event = self.post_and_retrieve_event(event_data)
+        event = self.post_and_retrieve_security_report(event_data)
+        assert event.message == "Expect-CT failed for 'www.example.com'"
+        assert event.group.title == "Expect-CT failed for 'www.example.com'"
 
-        assert event.message == "hello"
+    def test_expect_staple(self):
+        event_data = {
+            "expect-staple-report": {
+                "date-time": "2014-04-06T13:00:50Z",
+                "hostname": "www.example.com",
+                "port": 443,
+                "response-status": "ERROR_RESPONSE",
+                "cert-status": "REVOKED",
+                "effective-expiration-date": "2014-05-01T12:40:50Z",
+                "served-certificate-chain": [
+                    "-----BEGIN CERTIFICATE-----\nABC\n-----END CERTIFICATE-----"
+                ],
+                "validated-certificate-chain": [
+                    "-----BEGIN CERTIFICATE-----\nCDE\n-----END CERTIFICATE-----"
+                ],
+            }
+        }
+
+        event = self.post_and_retrieve_security_report(event_data)
+        assert event.message == "Expect-Staple failed for 'www.example.com'"
+        assert event.group.title == "Expect-Staple failed for 'www.example.com'"