Browse Source

feat(processing): Recap Servers Polling Support (#51789)

ref: https://github.com/getsentry/team-processing/issues/70
Kamil Ogórek 1 year ago
parent
commit
e3bc4e5cd7

+ 45 - 0
src/sentry/api/endpoints/project_details.py

@@ -45,6 +45,11 @@ from sentry.models import (
 from sentry.notifications.types import NotificationSettingTypes
 from sentry.notifications.utils import has_alert_integration
 from sentry.notifications.utils.legacy_mappings import get_option_value_from_boolean
+from sentry.tasks.recap_servers import (
+    RECAP_SERVER_TOKEN_OPTION,
+    RECAP_SERVER_URL_OPTION,
+    poll_project_recap_server,
+)
 from sentry.types.integrations import ExternalProviders
 from sentry.utils import json
 
@@ -127,6 +132,8 @@ class ProjectAdminSerializer(ProjectMemberSerializer):
     performanceIssueCreationRate = serializers.FloatField(required=False, min_value=0, max_value=1)
     performanceIssueCreationThroughPlatform = serializers.BooleanField(required=False)
     performanceIssueSendToPlatform = serializers.BooleanField(required=False)
+    recapServerUrl = serializers.URLField(required=False, allow_blank=True, allow_null=True)
+    recapServerToken = serializers.CharField(required=False, allow_blank=True, allow_null=True)
 
     def validate(self, data):
         max_delay = (
@@ -320,6 +327,32 @@ class ProjectAdminSerializer(ProjectMemberSerializer):
             raise serializers.ValidationError("List of sensitive fields is too long.")
         return value
 
+    def validate_recapServerUrl(self, value):
+        from sentry import features
+
+        project = self.context["project"]
+
+        # Adding recapServerUrl is only allowed if recap server polling is enabled.
+        has_recap_server_enabled = features.has("projects:recap-server", project)
+
+        if not has_recap_server_enabled:
+            raise serializers.ValidationError("Project is not allowed to set recap server url")
+
+        return value
+
+    def validate_recapServerToken(self, value):
+        from sentry import features
+
+        project = self.context["project"]
+
+        # Adding recapServerToken is only allowed if recap server polling is enabled.
+        has_recap_server_enabled = features.has("projects:recap-server", project)
+
+        if not has_recap_server_enabled:
+            raise serializers.ValidationError("Project is not allowed to set recap server token")
+
+        return value
+
 
 class RelaxedProjectPermission(ProjectPermission):
     scope_map = {
@@ -487,6 +520,18 @@ class ProjectDetailsEndpoint(ProjectEndpoint):
         elif result.get("isBookmarked") is False:
             ProjectBookmark.objects.filter(project_id=project.id, user_id=request.user.id).delete()
 
+        if result.get("recapServerUrl") is not None:
+            if result["recapServerUrl"] == "":
+                project.delete_option(RECAP_SERVER_URL_OPTION)
+            elif project.get_option(RECAP_SERVER_URL_OPTION) != result["recapServerUrl"]:
+                project.update_option(RECAP_SERVER_URL_OPTION, result["recapServerUrl"])
+                poll_project_recap_server.delay(project.id)
+        if result.get("recapServerToken") is not None:
+            if result["recapServerToken"] == "":
+                project.delete_option(RECAP_SERVER_TOKEN_OPTION)
+            elif project.get_option(RECAP_SERVER_TOKEN_OPTION) != result["recapServerToken"]:
+                project.update_option(RECAP_SERVER_TOKEN_OPTION, result["recapServerToken"])
+                poll_project_recap_server.delay(project.id)
         if result.get("digestsMinDelay"):
             project.update_option("digests:mail:minimum_delay", result["digestsMinDelay"])
         if result.get("digestsMaxDelay"):

+ 1 - 0
src/sentry/api/serializers/models/project.py

@@ -895,6 +895,7 @@ class DetailedProjectSerializer(ProjectWithTeamSerializer):
                 "dataScrubber": bool(attrs["options"].get("sentry:scrub_data", True)),
                 "dataScrubberDefaults": bool(attrs["options"].get("sentry:scrub_defaults", True)),
                 "safeFields": attrs["options"].get("sentry:safe_fields", []),
+                "recapServerUrl": attrs["options"].get("sentry:recap_server_url"),
                 "storeCrashReports": convert_crashreport_count(
                     attrs["options"].get("sentry:store_crash_reports"), allow_none=True
                 ),

+ 10 - 0
src/sentry/conf/server.py

@@ -708,6 +708,7 @@ CELERY_IMPORTS = (
     "sentry.tasks.ping",
     "sentry.tasks.post_process",
     "sentry.tasks.process_buffer",
+    "sentry.tasks.recap_servers",
     "sentry.tasks.relay",
     "sentry.tasks.release_registry",
     "sentry.tasks.weekly_reports",
@@ -851,6 +852,7 @@ CELERY_QUEUES_REGION = [
     Queue("auto_enable_codecov", routing_key="auto_enable_codecov"),
     Queue("weekly_escalating_forecast", routing_key="weekly_escalating_forecast"),
     Queue("auto_transition_issue_states", routing_key="auto_transition_issue_states"),
+    Queue("recap_servers", routing_key="recap_servers"),
 ]
 
 from celery.schedules import crontab
@@ -1097,6 +1099,12 @@ CELERYBEAT_SCHEDULE_REGION = {
         "task": "sentry.tasks.integrations.github_comment_reactions",
         "schedule": crontab(hour=16),  # 9:00 PDT, 12:00 EDT, 16:00 UTC
     },
+    "poll_recap_servers": {
+        "task": "sentry.tasks.poll_recap_servers",
+        # Run every 1 minute
+        "schedule": crontab(minute="*/1"),
+        "options": {"expires": 60},
+    },
 }
 
 # Assign the configuration keys celery uses based on our silo mode.
@@ -1662,6 +1670,8 @@ SENTRY_FEATURES = {
     "projects:race-free-group-creation": True,
     # Enable functionality for rate-limiting events on projects.
     "projects:rate-limits": True,
+    # Enable functionality for recap server polling.
+    "projects:recap-server": False,
     # Enable functionality to trigger service hooks upon event ingestion.
     "projects:servicehooks": False,
     # Enable suspect resolutions feature

+ 1 - 0
src/sentry/features/__init__.py

@@ -267,6 +267,7 @@ default_manager.add("projects:discard-groups", ProjectFeature, FeatureHandlerStr
 default_manager.add("projects:minidump", ProjectFeature, FeatureHandlerStrategy.INTERNAL)
 default_manager.add("projects:race-free-group-creation", ProjectFeature, FeatureHandlerStrategy.INTERNAL)
 default_manager.add("projects:rate-limits", ProjectFeature, FeatureHandlerStrategy.INTERNAL)
+default_manager.add("projects:recap-server", ProjectFeature, FeatureHandlerStrategy.REMOTE)
 default_manager.add("projects:servicehooks", ProjectFeature, FeatureHandlerStrategy.INTERNAL)
 default_manager.add("projects:similarity-indexing", ProjectFeature, FeatureHandlerStrategy.INTERNAL)
 default_manager.add("projects:similarity-view", ProjectFeature, FeatureHandlerStrategy.INTERNAL)

+ 2 - 0
src/sentry/models/options/project_option.py

@@ -34,6 +34,8 @@ OPTION_KEYS = frozenset(
         "sentry:releases",
         "sentry:error_messages",
         "sentry:scrape_javascript",
+        "sentry:recap_server_url",
+        "sentry:recap_server_token",
         "sentry:token",
         "sentry:token_header",
         "sentry:verify_ssl",

+ 217 - 0
src/sentry/tasks/recap_servers.py

@@ -0,0 +1,217 @@
+import logging
+import urllib.parse
+import uuid
+from typing import Any, Dict
+
+from sentry import features, http, options
+from sentry.datascrubbing import scrub_data
+from sentry.event_manager import EventManager
+from sentry.models import Project, ProjectOption
+from sentry.tasks.base import instrumented_task
+from sentry.utils import json
+from sentry.utils.safe import safe_execute
+
+# TODO(recap): Add monitor check-in to make sure that polling works as expected.
+
+# NOTE: Currently there's an assumption that we won't be serving thousands of projects using this feature.
+# If that changes in the future, we should add timing metrics to the task below and make sure to add
+# appropriate alerts for Sentry in case the transaction's duration takes significant time (~>30s).
+
+# NOTE: Should we restore `RECAP_SERVER_LATEST_ID` to 0 when recap server url changes?
+# Preferably we'd keep track of server_identity<->latest_id mappings in the future.
+
+# NOTE: Instead of using "legacy" `eventstore`, we can think about going through Relay, using project_key
+# (see: sentry/utils/sdk.py) and mimic sending data as a regular SDK event payload.
+
+
+RECAP_SERVER_URL_OPTION = "sentry:recap_server_url"
+RECAP_SERVER_TOKEN_OPTION = "sentry:recap_server_token"
+RECAP_SERVER_LATEST_ID = "sentry:recap_server_poll_id"
+
+logger = logging.getLogger(__name__)
+
+
# TODO(recap): Add feature flag?
@instrumented_task(
    name="sentry.tasks.poll_recap_servers",
    queue="recap_servers",
)
def poll_recap_servers(**kwargs):
    """Fan out one polling task per project that has a recap server URL configured."""
    configured_project_ids = (
        ProjectOption.objects.filter(key=RECAP_SERVER_URL_OPTION)
        .exclude(value__isnull=True)
        .values_list("project_id", flat=True)
    )

    for configured_project_id in configured_project_ids:
        poll_project_recap_server.delay(configured_project_id)
+
+
@instrumented_task(
    name="sentry.tasks.poll_project_recap_server",
    queue="recap_servers",
)
def poll_project_recap_server(project_id: int, **kwargs) -> None:
    """Poll a single project's configured recap server for new crashes.

    Fetches crashes newer than the last-seen crash id, stores each one as a
    Sentry event via ``store_crash``, and advances the ``RECAP_SERVER_LATEST_ID``
    project option. No-ops (with a log entry) when the project is missing, the
    feature flag is off, or no recap server URL is configured.
    """
    try:
        project = Project.objects.get(id=project_id)
    except Project.DoesNotExist:
        logger.warning("Polled project does not exist", extra={"project_id": project_id})
        return

    if not features.has("projects:recap-server", project):
        logger.info(
            "Recap server polling feature is not enabled for a given project",
            extra={"project_id": project_id},
        )
        return

    recap_server_url = project.get_option(RECAP_SERVER_URL_OPTION)
    # Just a guard in case someone removes the recap url in the exact moment we trigger polling.
    if recap_server_url is None:
        logger.warning(
            "Polled project has no recap server url configured", extra={"project": project}
        )
        return

    latest_id = project.get_option(RECAP_SERVER_LATEST_ID, 0)
    url = recap_server_url.strip().rstrip("/") + "/rest/v1/crashes;sort=id:ascending"

    # For the initial query, we limit the number of crashes to 1_000 items, which is the default
    # of Recap Server, and for all following requests, we do not limit the number, as it's
    # already capped at 10_000 by the server. For non-initial queries, we filter for all events
    # that happened _after_ our previously fetched crashes, based on the most recent ID.
    if latest_id == 0:
        url = f"{url};limit=1000"
    else:
        # Apache Solr format requires us to encode the query.
        # Exclusive bounds range - {N TO *}
        url = url + urllib.parse.quote(f";q=id:{{{latest_id} TO *}}", safe=";=:")

    headers = {
        "Accept": "application/vnd.scea.recap.crashes+json; version=1",
    }
    access_token = project.get_option(RECAP_SERVER_TOKEN_OPTION, None)
    if access_token is not None:
        headers["Authorization"] = f"Bearer {access_token}"

    result = http.fetch_file(url, headers=headers)

    try:
        crashes = json.loads(result.body)
    except json.JSONDecodeError as exc:
        logger.exception(
            "Polled project endpoint did not respond with valid json",
            exc_info=exc,
            extra={"project": project, "url": url},
        )
        return

    if not isinstance(crashes, dict):
        # Not inside an exception handler, so `error` (not `exception`) is the right call.
        logger.error(
            "Polled project endpoint did not respond with a json object",
            extra={"project": project, "url": url},
        )
        return

    # Nothing new to process — a missing or zero/empty `results` count means no crashes.
    if not crashes.get("results"):
        return

    try:
        # Tolerate malformed payloads that report results but lack the embedded crash list.
        for crash in crashes.get("_embedded", {}).get("crash", []):
            store_crash(crash, project, url)
            latest_id = max(latest_id, crash["id"])
    finally:
        # Persist progress even if storing an individual crash raises.
        project.update_option(RECAP_SERVER_LATEST_ID, latest_id)
+
+
def store_crash(crash, project: Project, url: str) -> None:
    """Translate a single recap crash payload into a Sentry event and persist it.

    Crashes with a payload missing required fields are logged and skipped.
    """
    try:
        event = translate_crash_to_event(crash, project, url)
    except KeyError as exc:
        logger.exception(
            "Crash dump data has invalid payload",
            exc_info=exc,
            extra={"project": project, "url": url},
        )
        return

    # Run PII scrubbing when enabled; fall back to the unscrubbed event on failure.
    if options.get("processing.can-use-scrubbers"):
        scrubbed = safe_execute(scrub_data, project=project, event=event, _with_transaction=False)
        if scrubbed is not None:
            event = scrubbed

    EventManager(event, project=project).save(project_id=project.id)
+
+
def translate_crash_to_event(crash, project: "Project", url: str) -> Dict[str, Any]:
    """Map a Recap Server crash payload onto a Sentry event dict.

    Raises ``KeyError`` when required fields (``stopReason``, ``id``,
    ``_links.self``, or per-frame stack trace fields) are missing; the caller
    handles that. ``url`` is currently unused but kept for interface stability.
    """
    exception_value: Dict[str, Any] = {"type": crash["stopReason"]}
    event: Dict[str, Any] = {
        "event_id": uuid.uuid4().hex,
        "project": project.id,
        "platform": "c",
        "exception": {"values": [exception_value]},
        "tags": {"id": crash["id"]},
        "contexts": {"request": {"url": crash["_links"]["self"]}},
    }

    if "uploadDate" in crash:
        event["timestamp"] = crash["uploadDate"]

    # Prefer the stop location over the return location for the exception value.
    if "stopLocation" in crash:
        exception_value["value"] = crash["stopLocation"]
    elif "returnLocation" in crash:
        exception_value["value"] = crash["returnLocation"]

    # Prefer the detailed (fully resolved) stack trace over the plain symbol list.
    if "detailedStackTrace" in crash:
        exception_value["stacktrace"] = {
            "frames": [
                {
                    "filename": frame["sourceFile"],
                    "lineno": frame["sourceLine"],
                    "instruction_addr": frame["absoluteAddress"],
                    "module": frame["moduleName"],
                    "function": frame["resolvedSymbol"],
                    "raw_function": frame["displayValue"],
                    "in_app": True,
                }
                for frame in crash["detailedStackTrace"]
            ]
        }
    elif "stackTrace" in crash:
        exception_value["stacktrace"] = {
            "frames": [{"function": frame, "in_app": True} for frame in crash["stackTrace"]]
        }

    if "titleId" in crash:
        event["tags"]["titleId"] = crash["titleId"]

    if "platform" in crash:
        if "sysVersion" in crash:
            event["contexts"]["runtime"] = {
                "name": crash["platform"],
                "version": crash["sysVersion"],
            }

        if "hardwareId" in crash:
            event["contexts"]["device"] = {
                "name": crash["platform"],
                "model_id": crash["hardwareId"],
            }

    if "appVersion" in crash:
        event["contexts"]["app"] = {"app_version": crash["appVersion"]}

    if "userData" in crash:
        event["contexts"]["userData"] = crash["userData"]

    return event

+ 17 - 0
static/app/data/forms/projectGeneralSettings.tsx

@@ -90,6 +90,23 @@ export const fields: Record<string, Field> = {
     }),
   },
 
+  // TODO(recap): Move this to a separate page or debug files one, not general settings
+  recapServerUrl: {
+    name: 'recapServerUrl',
+    type: 'string',
+    placeholder: t('URL'),
+    label: t('Recap Server URL'),
+    help: t('URL to the Recap Server events should be polled from'),
+  },
+
+  recapServerToken: {
+    name: 'recapServerToken',
+    type: 'string',
+    placeholder: t('Token'),
+    label: t('Recap Server Token'),
+    help: t('Auth Token to the configured Recap Server'),
+  },
+
   subjectPrefix: {
     name: 'subjectPrefix',
     type: 'string',

+ 15 - 1
static/app/views/settings/projectGeneralSettings/index.tsx

@@ -297,6 +297,8 @@ class ProjectGeneralSettings extends AsyncView<Props, State> {
       },
     };
 
+    const hasRecapServerFeature = project.features.includes('recap-server');
+
     return (
       <div>
         <SettingsPageHeader title={t('Project Settings')} />
@@ -306,7 +308,19 @@ class ProjectGeneralSettings extends AsyncView<Props, State> {
           <JsonForm
             {...jsonFormProps}
             title={t('Project Details')}
-            fields={[fields.name, fields.platform]}
+            // TODO(recap): Move this to a separate page or debug files one, not general settings
+            fields={[
+              fields.name,
+              fields.platform,
+              {
+                ...fields.recapServerUrl,
+                visible: hasRecapServerFeature,
+              },
+              {
+                ...fields.recapServerToken,
+                visible: hasRecapServerFeature,
+              },
+            ]}
           />
 
           <JsonForm

+ 72 - 0
tests/sentry/api/endpoints/test_project_details.py

@@ -1001,6 +1001,78 @@ class ProjectUpdateTest(APITestCase):
             assert resp.status_code == 200
             assert project.get_option("sentry:symbol_sources", json.dumps([source1]))
 
+    @mock.patch("sentry.tasks.recap_servers.poll_project_recap_server.delay")
+    def test_recap_server(self, poll_project_recap_server):
+        project = Project.objects.get(id=self.project.id)
+        with Feature({"projects:recap-server": True}):
+            resp = self.get_response(
+                self.org_slug, self.proj_slug, recapServerUrl="http://example.com"
+            )
+
+            assert resp.status_code == 200
+            assert project.get_option("sentry:recap_server_url") == "http://example.com"
+            assert poll_project_recap_server.called
+
+            resp = self.get_response(self.org_slug, self.proj_slug, recapServerToken="wat")
+
+            assert resp.status_code == 200
+            assert project.get_option("sentry:recap_server_token") == "wat"
+            assert poll_project_recap_server.called
+
+    @mock.patch("sentry.tasks.recap_servers.poll_project_recap_server.delay")
+    def test_recap_server_no_feature(self, poll_project_recap_server):
+        project = Project.objects.get(id=self.project.id)
+        with Feature({"projects:recap-server": False}):
+            resp = self.get_response(
+                self.org_slug, self.proj_slug, recapServerUrl="http://example.com"
+            )
+
+            assert resp.status_code == 400
+            assert project.get_option("sentry:recap_server_url") is None
+
+            resp = self.get_response(self.org_slug, self.proj_slug, recapServerToken="wat")
+
+            assert resp.status_code == 400
+            assert project.get_option("sentry:recap_server_token") is None
+
+    @mock.patch("sentry.tasks.recap_servers.poll_project_recap_server.delay")
+    def test_recap_server_no_modification(self, poll_project_recap_server):
+        project = Project.objects.get(id=self.project.id)
+        project.update_option("sentry:recap_server_url", "http://example.com")
+        project.update_option("sentry:recap_server_token", "wat")
+        with Feature({"projects:recap-server": True}):
+            resp = self.get_response(
+                self.org_slug, self.proj_slug, recapServerUrl="http://example.com"
+            )
+
+            assert resp.status_code == 200
+            assert project.get_option("sentry:recap_server_url") == "http://example.com"
+            assert not poll_project_recap_server.called
+
+            resp = self.get_response(self.org_slug, self.proj_slug, recapServerToken="wat")
+
+            assert resp.status_code == 200
+            assert project.get_option("sentry:recap_server_token") == "wat"
+            assert not poll_project_recap_server.called
+
+    @mock.patch("sentry.tasks.recap_servers.poll_project_recap_server.delay")
+    def test_recap_server_deletion(self, poll_project_recap_server):
+        project = Project.objects.get(id=self.project.id)
+        project.update_option("sentry:recap_server_url", "http://example.com")
+        project.update_option("sentry:recap_server_token", "wat")
+        with Feature({"projects:recap-server": True}):
+            resp = self.get_response(self.org_slug, self.proj_slug, recapServerUrl="")
+
+            assert resp.status_code == 200
+            assert project.get_option("sentry:recap_server_url") is None
+            assert not poll_project_recap_server.called
+
+            resp = self.get_response(self.org_slug, self.proj_slug, recapServerToken="")
+
+            assert resp.status_code == 200
+            assert project.get_option("sentry:recap_server_token") is None
+            assert not poll_project_recap_server.called
+
 
 @region_silo_test
 class CopyProjectSettingsTest(APITestCase):

+ 217 - 0
tests/sentry/tasks/test_recap_servers.py

@@ -0,0 +1,217 @@
+from unittest.mock import call, patch
+
+import pytest
+import responses
+
+from sentry import eventstore
+from sentry.tasks.recap_servers import (
+    RECAP_SERVER_LATEST_ID,
+    RECAP_SERVER_TOKEN_OPTION,
+    RECAP_SERVER_URL_OPTION,
+    poll_project_recap_server,
+    poll_recap_servers,
+)
+from sentry.testutils import TestCase
+from sentry.testutils.helpers import Feature
+from sentry.utils import json
+
+crash_payload = {
+    "_links": {
+        "self": {"href": "ApiBaseUrl/burp/137?field=stopReason"},
+        "files": {"href": "ApiBaseUrl/burp/137/files", "custom": True},
+    },
+    "id": 1,
+    "uploadDate": "2018-11-06T21:19:55.271Z",
+    "stopReason": "SEGFAULT",
+    "detailedStackTrace": [
+        {
+            "sourceFile": "/usr/build/src/foo.c",
+            "sourceLine": 42,
+            "moduleName": "boot.bin",
+            "moduleFingerprint": "iddqd",
+            "moduleOffset": "0x1",
+            "resolvedSymbol": "Foo::Run()+0x4",
+            "absoluteAddress": "0xaa00bb4",
+            "displayValue": "boot.bin!Foo::Update()+0x4",
+        },
+        {
+            "sourceFile": "/usr/build/src/bar.c",
+            "sourceLine": 1337,
+            "moduleName": "boot.bin",
+            "moduleFingerprint": "idkfa",
+            "moduleOffset": "0x10",
+            "resolvedSymbol": "Executor::Run()+0x30",
+            "absoluteAddress": "0xbb11aa4",
+            "displayValue": "boot.bin!Bar::Trigger()+0x30",
+        },
+    ],
+    "userData": {
+        "password": "should_be_redacted",
+    },
+}
+
+
@pytest.mark.django_db
@patch("sentry.tasks.recap_servers.poll_project_recap_server.delay")
class PollRecapServersTest(TestCase):
    """Tests for the fan-out task that schedules per-project recap polling."""

    def setUp(self):
        self.org = self.create_organization(owner=self.user)

    def _project_with_recap_url(self, name, recap_url):
        # Helper: create a project and point it at a recap server.
        project = self.create_project(organization=self.org, name=name)
        project.update_option(RECAP_SERVER_URL_OPTION, recap_url)
        return project

    def test_poll_recap_servers_no_matches(self, poll_project_recap_server):
        poll_recap_servers()

        assert poll_project_recap_server.call_count == 0

    def test_poll_recap_servers_single_project(self, poll_project_recap_server):
        project = self._project_with_recap_url("foo", "http://example.com")

        poll_recap_servers()

        assert poll_project_recap_server.call_count == 1
        poll_project_recap_server.assert_has_calls([call(project.id)], any_order=True)

    def test_poll_recap_servers_multiple_projects(self, poll_project_recap_server):
        projects = [
            self._project_with_recap_url("foo", "http://example.com"),
            self._project_with_recap_url("bar", "http://example-dos.com"),
            self._project_with_recap_url("baz", "http://example-tres.com"),
        ]

        poll_recap_servers()

        assert poll_project_recap_server.call_count == 3
        poll_project_recap_server.assert_has_calls(
            [call(project.id) for project in projects], any_order=True
        )
+
+
@pytest.mark.django_db
class PollProjectRecapServerTest(TestCase):
    """Tests for polling a single project's recap server."""

    @pytest.fixture(autouse=True)
    def initialize(self):
        # Every test in this class runs with the recap-server feature enabled.
        with Feature({"projects:recap-server": True}):
            yield  # Run test case

    def setUp(self):
        self.org = self.create_organization(owner=self.user)
        self.project = self.create_project(organization=self.org, name="foo")

    def get_crash_payload(self, id):
        # A shallow copy suffices: only the top-level `id` key is overridden.
        return {**crash_payload, "id": id}

    def test_poll_project_recap_server_incorrect_project(self):
        poll_project_recap_server(1337)  # should not error

    def test_poll_project_recap_server_missing_recap_url(self):
        poll_project_recap_server(self.project.id)  # should not error

    def test_poll_project_recap_server_disabled_feature(self):
        with Feature({"projects:recap-server": False}):
            self.project.update_option(RECAP_SERVER_URL_OPTION, "http://example.com")
            poll_project_recap_server(self.project.id)  # should not error

    @patch("sentry.tasks.recap_servers.store_crash")
    @responses.activate
    def test_poll_project_recap_server_initial_request(self, store_crash):
        body = {
            "results": 3,
            "_embedded": {
                "crash": [
                    {"id": 1},
                    {"id": 1337},
                    {"id": 42},
                ]
            },
        }
        recap_request = responses.get(
            url="http://example.com/rest/v1/crashes;sort=id:ascending;limit=1000",
            body=json.dumps(body),
            content_type="application/json",
        )

        self.project.update_option(RECAP_SERVER_URL_OPTION, "http://example.com")
        assert self.project.get_option(RECAP_SERVER_LATEST_ID) is None

        poll_project_recap_server(self.project.id)

        assert recap_request.call_count == 1
        assert store_crash.call_count == 3
        # The stored cursor is the highest crash id seen, not the last in the list.
        assert self.project.get_option(RECAP_SERVER_LATEST_ID) == 1337

    @patch("sentry.tasks.recap_servers.store_crash")
    @responses.activate
    def test_poll_project_recap_server_following_request(self, store_crash):
        body = {
            "results": 2,
            "_embedded": {
                "crash": [
                    {"id": 1337},
                    {"id": 42},
                ]
            },
        }
        # Encoded query: {8 TO *}
        recap_request = responses.get(
            url="http://example.com/rest/v1/crashes;sort=id:ascending;q=id:%7B8%20TO%20%2A%7D",
            body=json.dumps(body),
            content_type="application/json",
        )
        self.project.update_option(RECAP_SERVER_URL_OPTION, "http://example.com")
        self.project.update_option(RECAP_SERVER_LATEST_ID, 8)

        poll_project_recap_server(self.project.id)

        assert recap_request.call_count == 1
        assert store_crash.call_count == 2
        assert self.project.get_option(RECAP_SERVER_LATEST_ID) == 1337

    @patch("sentry.tasks.recap_servers.store_crash")
    @responses.activate
    def test_poll_project_recap_server_auth_token_header(self, store_crash):
        recap_request = responses.get(
            url="http://example.com/rest/v1/crashes;sort=id:ascending;limit=1000",
            body=json.dumps({"results": 0}),
            content_type="application/json",
            match=[responses.matchers.header_matcher({"Authorization": "Bearer mkey"})],
        )

        self.project.update_option(RECAP_SERVER_URL_OPTION, "http://example.com")
        self.project.update_option(RECAP_SERVER_TOKEN_OPTION, "mkey")

        poll_project_recap_server(self.project.id)

        assert recap_request.call_count == 1

    # TODO(recap): Add more assertions on `event.data` when the time comes
    @responses.activate
    def test_poll_recap_servers_store_crash(self):
        body = {
            "results": 2,
            "_embedded": {"crash": [self.get_crash_payload(1337), self.get_crash_payload(42)]},
        }
        responses.get(
            url="http://example.com/rest/v1/crashes;sort=id:ascending;limit=1000",
            body=json.dumps(body),
            content_type="application/json",
        )
        self.project.update_option(RECAP_SERVER_URL_OPTION, "http://example.com")

        poll_project_recap_server(self.project.id)

        events = eventstore.backend.get_events(
            eventstore.Filter(project_ids=[self.project.id]),
            tenant_ids={"referrer": "relay-test", "organization_id": 123},
        )

        # Make sure the events went through the normalization and pii scrubbing process
        assert events[0].data["contexts"]["userData"]["password"] == "[Filtered]"
        assert events[1].data["contexts"]["userData"]["password"] == "[Filtered]"