
feat(snql-sdk) Add a flag to use the SnQL SDK to run a snuba query (#24755)

SnQL and the SDK are still in the testing phase, so turning on the flag doesn't
actually submit the query using SnQL. Instead, for now, it runs the query as
normal through the legacy endpoint and asynchronously calls the SnQL endpoint as
a dry run. If the SQL generated by the two calls differs, it logs an error. That
way we can turn the flag on and ensure there are no errors before we flip over
to the SnQL endpoint.

Dry-run calls do not count against rate limits in Snuba, so the extra query
adds only a small amount of overhead.
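
For illustration, a minimal sketch of how a caller opts in, using the new
use_snql kwarg on sentry.utils.snuba.query (the project id and time range
below are placeholder values):

    from datetime import datetime, timedelta

    from sentry.utils import snuba

    now = datetime.now()
    # The result still comes from the legacy endpoint; with use_snql=True the
    # SnQL endpoint is also called asynchronously as a dry run, and any SQL
    # mismatch is logged under "snuba.snql.dryrun.mismatch.error".
    result = snuba.query(
        start=now - timedelta(days=1),
        end=now,
        groupby=["project_id"],
        filter_keys={"project_id": [1]},  # placeholder project id
        use_snql=True,
    )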
Evan Hicks committed 4 years ago · commit ae618b27d2
4 changed files with 185 additions and 17 deletions:

  1. requirements-base.txt (+1, -0)
  2. src/sentry/utils/snuba.py (+134, -11)
  3. tests/snuba/search/test_backend.py (+1, -0)
  4. tests/snuba/test_snuba.py (+49, -6)

requirements-base.txt (+1, -0)

@@ -50,6 +50,7 @@ rfc3986-validator==0.1.1
 # [end] jsonschema format validators
 sentry-relay==0.8.4
 sentry-sdk>=1.0.0,<1.2.0
+snuba-sdk>=0.0.8,<1.0.0
 simplejson==3.17.2
 statsd==3.1
 structlog==17.1.0

src/sentry/utils/snuba.py (+134, -11)

@@ -4,6 +4,7 @@ from contextlib import contextmanager
 from datetime import datetime, timedelta
 from hashlib import sha1
 from operator import itemgetter
+from typing import Any, Callable, List, MutableMapping, Mapping, Optional, Sequence, Tuple
 
 from dateutil.parser import parse as parse_datetime
 import logging
@@ -15,6 +16,8 @@ import time
 import urllib3
 import sentry_sdk
 from sentry_sdk import Hub
+from snuba_sdk.legacy import json_to_snql
+from snuba_sdk.query import Query
 
 from concurrent.futures import ThreadPoolExecutor
 from django.conf import settings
@@ -617,6 +620,7 @@ def raw_query(
     referrer=None,
     is_grouprelease=False,
     use_cache=False,
+    use_snql=False,
     **kwargs,
 ):
     """
@@ -635,16 +639,30 @@ def raw_query(
         is_grouprelease=is_grouprelease,
         **kwargs,
     )
-    return bulk_raw_query([snuba_params], referrer=referrer, use_cache=use_cache)[0]
+    return bulk_raw_query(
+        [snuba_params], referrer=referrer, use_cache=use_cache, use_snql=use_snql
+    )[0]
 
 
-def bulk_raw_query(snuba_param_list, referrer=None, use_cache=False):
+SnubaQuery = Tuple[MutableMapping[str, Any], Callable[[Any], Any], Callable[[Any], Any]]
+SnQLQuery = Tuple[Tuple[Query, Mapping[str, Any]], Callable[[Any], Any], Callable[[Any], Any]]
+ResultSet = List[Mapping[str, Any]]  # TODO: Would be nice to make this a concrete structure
+
+
+def bulk_raw_query(
+    snuba_param_list: Sequence[SnubaQueryParams],
+    referrer: Optional[str] = None,
+    use_cache: Optional[bool] = False,
+    use_snql: Optional[bool] = False,
+) -> ResultSet:
     headers = {}
     if referrer:
         headers["referer"] = referrer
 
     # Store the original position of the query so that we can maintain the order
-    query_param_list = list(enumerate(map(_prepare_query_params, snuba_param_list)))
+    query_param_list: List[Tuple[int, SnubaQuery]] = list(
+        enumerate(map(_prepare_query_params, snuba_param_list))
+    )
 
     results = []
 
@@ -655,7 +673,7 @@ def bulk_raw_query(snuba_param_list, referrer=None, use_cache=False):
             for _, query_params in query_param_list
         ]
         cache_data = cache.get_many(cache_keys)
-        to_query = []
+        to_query: List[Tuple[int, SnubaQuery, Optional[str]]] = []
         for (query_pos, query_params), cache_key in zip(query_param_list, cache_keys):
             cached_result = cache_data.get(cache_key)
             metric_tags = {"referrer": referrer} if referrer else None
@@ -669,7 +687,7 @@ def bulk_raw_query(snuba_param_list, referrer=None, use_cache=False):
         to_query = [(query_pos, query_params, None) for query_pos, query_params in query_param_list]
 
     if to_query:
-        query_results = _bulk_snuba_query(map(itemgetter(1), to_query), headers)
+        query_results = _bulk_snuba_query(map(itemgetter(1), to_query), headers, use_snql)
         for result, (query_pos, _, cache_key) in zip(query_results, to_query):
             if cache_key:
                 cache.set(cache_key, json.dumps(result), settings.SENTRY_SNUBA_CACHE_TTL_SECONDS)
@@ -681,23 +699,27 @@ def bulk_raw_query(snuba_param_list, referrer=None, use_cache=False):
     return map(itemgetter(1), results)
 
 
-def _bulk_snuba_query(snuba_param_list, headers):
+def _bulk_snuba_query(
+    snuba_param_list: List[SnubaQuery], headers: Mapping[str, str], use_snql: Optional[bool] = False
+) -> ResultSet:
     with sentry_sdk.start_span(
         op="start_snuba_query",
         description=f"running {len(snuba_param_list)} snuba queries",
     ) as span:
         span.set_tag("referrer", headers.get("referer", "<unknown>"))
+        query_fn = _snql_query if use_snql else _snuba_query
+
         if len(snuba_param_list) > 1:
             query_results = list(
                 _query_thread_pool.map(
-                    _snuba_query,
-                    [params + (Hub(Hub.current), headers) for params in snuba_param_list],
+                    query_fn,
+                    [(params, Hub(Hub.current), headers) for params in snuba_param_list],
                 )
             )
         else:
             # No need to submit to the thread pool if we're just performing a
             # single query
-            query_results = [_snuba_query(snuba_param_list[0] + (Hub(Hub.current), headers))]
+            query_results = [query_fn((snuba_param_list[0], Hub(Hub.current), headers))]
 
     results = []
     for response, _, reverse in query_results:
@@ -741,8 +763,12 @@ def _bulk_snuba_query(snuba_param_list, headers):
     return results
 
 
-def _snuba_query(params):
-    query_params, forward, reverse, thread_hub, headers = params
+RawResult = Tuple[urllib3.response.HTTPResponse, Callable[[Any], Any], Callable[[Any], Any]]
+
+
+def _snuba_query(params: Tuple[SnubaQuery, Hub, Mapping[str, str]]) -> RawResult:
+    query_data, thread_hub, headers = params
+    query_params, forward, reverse = query_data
     try:
         with timer("snuba_query"):
             referrer = headers.get("referer", "<unknown>")
@@ -764,6 +790,99 @@ def _snuba_query(params):
         raise SnubaError(err)
 
 
+def _snql_query(params: Tuple[SnubaQuery, Hub, Mapping[str, str]]) -> RawResult:
+    # Run the SnQL query in debug/dry_run mode
+    # Run the legacy query in debug mode
+    # Log any errors in SnQL execution and log if the returned SQL is not the same
+    query_data, thread_hub, headers = params
+    query_params, forward, reverse = query_data
+    og_debug = query_params.get("debug", False)
+    try:
+        query = json_to_snql(query_params, query_params["dataset"])
+        query.validate()  # Call this here just to avoid it happening in the async call
+    except Exception as e:
+        logger.warning(
+            "snuba.snql.parsing.error",
+            extra={"error": str(e), "params": json.dumps(query_params)},
+        )
+        return _snuba_query(params)
+
+    query = query.set_dry_run(True).set_debug(True)
+    query_params["debug"] = True
+
+    snql_future = _query_thread_pool.submit(_raw_snql_query, query, Hub(thread_hub), headers)
+    # If this fails then there's no point doing anything else, so let any exception get reraised
+    legacy_result = _snuba_query(params)
+
+    query_params["debug"] = og_debug
+    legacy_resp = legacy_result[0]
+
+    try:
+        snql_resp = snql_future.result()
+    except Exception as e:
+        logger.warning(
+            "snuba.snql.dryrun.sending.error",
+            extra={"error": str(e), "params": json.dumps(query_params), "query": str(query)},
+        )
+        return legacy_result
+
+    legacy_data = json.loads(legacy_resp.data)
+    try:
+        snql_data = json.loads(snql_resp.data)
+    except Exception as e:
+        logger.warning(
+            "snuba.snql.dryrun.json.error",
+            extra={
+                "error": str(e),
+                "params": json.dumps(query_params),
+                "query": str(query),
+                "resp": snql_resp.data,
+            },
+        )
+        return legacy_result
+
+    if "sql" not in snql_data or "sql" not in legacy_data:
+        logger.warning(
+            "snuba.snql.dryrun.error",
+            extra={
+                "params": json.dumps(query_params),
+                "query": str(query),
+                "snql": "sql" in snql_data,
+                "legacy": "sql" in legacy_data,
+            },
+        )
+        return legacy_result
+
+    if snql_data["sql"] != legacy_data["sql"]:
+        logger.warning(
+            "snuba.snql.dryrun.mismatch.error",
+            extra={
+                "params": json.dumps(query_params),
+                "query": str(query),
+                "snql": snql_data["sql"],
+                "legacy": legacy_data["sql"],
+            },
+        )
+
+    return legacy_result
+
+
+def _raw_snql_query(
+    query: Query, thread_hub: Hub, headers: Mapping[str, str]
+) -> urllib3.response.HTTPResponse:
+    with timer("snql_query"):
+        referrer = headers.get("referer", "<unknown>")
+        if SNUBA_INFO:
+            logger.info(f"{referrer}.body: {query}")
+            query = query.set_debug(True)
+
+        body = query.snuba()
+        with thread_hub.start_span(op="snuba_snql", description=f"query {referrer}") as span:
+            span.set_tag("referrer", referrer)
+            span.set_tag("snql", str(query))
+            return _snuba_pool.urlopen("POST", f"/{query.dataset}/snql", body=body, headers=headers)
+
+
 def query(
     dataset=None,
     start=None,
@@ -775,6 +894,7 @@ def query(
     selected_columns=None,
     totals=None,
     use_cache=False,
+    use_snql=False,
     **kwargs,
 ):
 
@@ -795,6 +915,7 @@ def query(
             selected_columns=selected_columns,
             totals=totals,
             use_cache=use_cache,
+            use_snql=use_snql,
             **kwargs,
         )
     except (QueryOutsideRetentionError, QueryOutsideGroupActivityError):
@@ -966,6 +1087,7 @@ def _aliased_query_impl(
     dataset=None,
     orderby=None,
     condition_resolver=None,
+    use_snql=False,
     **kwargs,
 ):
     if dataset is None:
@@ -1018,6 +1140,7 @@ def _aliased_query_impl(
         having=having,
         dataset=dataset,
         orderby=orderby,
+        use_snql=use_snql,
         **kwargs,
     )
 

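The dry-run path hinges on snuba-sdk's legacy translator. Below is a minimal
standalone sketch of that conversion step, assuming only the json_to_snql and
Query APIs exercised in the diff above (the request body is a made-up example):

    from snuba_sdk.legacy import json_to_snql

    # A made-up legacy-style request body, purely for illustration.
    legacy_body = {
        "dataset": "events",
        "project": [1],
        "selected_columns": ["event_id"],
        "from_date": "2021-03-01T00:00:00",
        "to_date": "2021-03-02T00:00:00",
    }

    query = json_to_snql(legacy_body, legacy_body["dataset"])
    query.validate()  # raises if the translation produced an invalid query
    query = query.set_dry_run(True).set_debug(True)

    # query.snuba() serializes the query into the body that _raw_snql_query
    # POSTs to /{dataset}/snql.
    print(query.snuba())
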
tests/snuba/search/test_backend.py (+1, -0)

@@ -1010,6 +1010,7 @@ class EventsSnubaSearchTest(TestCase, SnubaTestCase):
             "totals": True,
             "turbo": False,
             "sample": 1,
+            "use_snql": False,
         }
 
         self.make_query(search_filter_query="status:unresolved")

tests/snuba/test_snuba.py (+49, -6)

@@ -13,6 +13,8 @@ from sentry.utils import snuba
 
 
 class SnubaTest(TestCase, SnubaTestCase):
+    should_use_snql = False
+
     def _insert_event_for_time(self, ts, hash="a" * 32, group_id=None):
         self.snuba_insert(
             (
@@ -31,7 +33,7 @@ class SnubaTest(TestCase, SnubaTestCase):
             )
         )
 
-    def test(self):
+    def test(self) -> None:
         "This is just a simple 'hello, world' example test."
 
         now = datetime.now()
@@ -61,11 +63,12 @@ class SnubaTest(TestCase, SnubaTestCase):
                 end=now + timedelta(days=1),
                 groupby=["project_id"],
                 filter_keys={"project_id": [self.project.id]},
+                use_snql=self.should_use_snql,
             )
             == {self.project.id: 1}
         )
 
-    def test_fail(self):
+    def test_fail(self) -> None:
         now = datetime.now()
         with pytest.raises(snuba.SnubaError):
             snuba.query(
@@ -73,9 +76,10 @@ class SnubaTest(TestCase, SnubaTestCase):
                 end=now + timedelta(days=1),
                 filter_keys={"project_id": [self.project.id]},
                 groupby=[")("],
+                use_snql=self.should_use_snql,
             )
 
-    def test_organization_retention_respected(self):
+    def test_organization_retention_respected(self) -> None:
         base_time = datetime.utcnow()
 
         self._insert_event_for_time(base_time - timedelta(minutes=1))
@@ -88,13 +92,14 @@ class SnubaTest(TestCase, SnubaTestCase):
                 end=base_time + timedelta(days=1),
                 groupby=["project_id"],
                 filter_keys={"project_id": [self.project.id]},
+                use_snql=self.should_use_snql,
             )
 
         assert _get_event_count() == {self.project.id: 2}
         with self.options({"system.event-retention-days": 1}):
             assert _get_event_count() == {self.project.id: 1}
 
-    def test_organization_retention_larger_than_end_date(self):
+    def test_organization_retention_larger_than_end_date(self) -> None:
         base_time = datetime.utcnow()
 
         with self.options({"system.event-retention-days": 1}):
@@ -104,13 +109,50 @@ class SnubaTest(TestCase, SnubaTestCase):
                     end=base_time - timedelta(days=60),
                     groupby=["project_id"],
                     filter_keys={"project_id": [self.project.id]},
+                    use_snql=self.should_use_snql,
                 )
                 == {}
             )
 
 
+class SnQLSnubaTest(SnubaTest):
+    should_use_snql = True
+
+
 class BulkRawQueryTest(TestCase, SnubaTestCase):
-    def test_simple(self):
+    def test_simple(self) -> None:
+        one_min_ago = iso_format(before_now(minutes=1))
+        event_1 = self.store_event(
+            data={"fingerprint": ["group-1"], "message": "hello", "timestamp": one_min_ago},
+            project_id=self.project.id,
+        )
+        event_2 = self.store_event(
+            data={"fingerprint": ["group-2"], "message": "hello", "timestamp": one_min_ago},
+            project_id=self.project.id,
+        )
+
+        results = snuba.bulk_raw_query(
+            [
+                snuba.SnubaQueryParams(
+                    start=timezone.now() - timedelta(days=1),
+                    end=timezone.now(),
+                    selected_columns=["event_id", "group_id", "timestamp"],
+                    filter_keys={"project_id": [self.project.id], "group_id": [event_1.group.id]},
+                ),
+                snuba.SnubaQueryParams(
+                    start=timezone.now() - timedelta(days=1),
+                    end=timezone.now(),
+                    selected_columns=["event_id", "group_id", "timestamp"],
+                    filter_keys={"project_id": [self.project.id], "group_id": [event_2.group.id]},
+                ),
+            ],
+        )
+        assert [{(item["group_id"], item["event_id"]) for item in r["data"]} for r in results] == [
+            {(event_1.group.id, event_1.event_id)},
+            {(event_2.group.id, event_2.event_id)},
+        ]
+
+    def test_simple_use_snql(self) -> None:
         one_min_ago = iso_format(before_now(minutes=1))
         event_1 = self.store_event(
             data={"fingerprint": ["group-1"], "message": "hello", "timestamp": one_min_ago},
@@ -135,7 +177,8 @@ class BulkRawQueryTest(TestCase, SnubaTestCase):
                     selected_columns=["event_id", "group_id", "timestamp"],
                     filter_keys={"project_id": [self.project.id], "group_id": [event_2.group.id]},
                 ),
-            ]
+            ],
+            use_snql=True,
         )
         assert [{(item["group_id"], item["event_id"]) for item in r["data"]} for r in results] == [
             {(event_1.group.id, event_1.event_id)},
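
A design note on the tests: because SnQLSnubaTest only flips should_use_snql,
every inherited SnubaTest case re-runs end to end with the flag enabled. The
same pattern generalizes; a condensed, illustrative sketch (class and method
names here are hypothetical, not part of the change):

    class MyQueryTest(TestCase, SnubaTestCase):
        should_use_snql = False

        def _count_events(self, **kwargs):
            # Every test funnels the class-level flag into the query call.
            return snuba.query(use_snql=self.should_use_snql, **kwargs)


    class MySnQLQueryTest(MyQueryTest):
        should_use_snql = True  # inherits and re-runs every test with SnQL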