Browse Source

ref(rules): Add type hints and parameterize common functions (#33800)

* parameterize ownership rule match testing
* add type hints
Gilbert Szeto 2 years ago
parent
commit
7960f9138d
3 changed files with 91 additions and 65 deletions
  1. 3 0
      mypy.ini
  2. 87 64
      src/sentry/ownership/grammar.py
  3. 1 1
      src/sentry/search/events/builder.py

+ 3 - 0
mypy.ini

@@ -55,6 +55,7 @@ files = src/sentry/analytics/,
         src/sentry/models/options/,
         src/sentry/models/rulefirehistory.py,
         src/sentry/notifications/,
+        src/sentry/ownership/grammar.py,
         src/sentry/pipeline/,
         src/sentry/processing/realtime_metrics/,
         src/sentry/profiles/,
@@ -134,6 +135,8 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-mistune.*]
 ignore_missing_imports = True
+[mypy-parsimonious.*]
+ignore_missing_imports = True
 [mypy-rb.*]
 ignore_missing_imports = True
 [mypy-rest_framework.*]

+ 87 - 64
src/sentry/ownership/grammar.py

@@ -1,16 +1,31 @@
+from __future__ import annotations
+
 import operator
 import re
 from collections import namedtuple
 from functools import reduce
-from typing import Iterable, List, Mapping, Pattern, Tuple
+from typing import (
+    Any,
+    Callable,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    Optional,
+    Pattern,
+    Sequence,
+    Tuple,
+    Union,
+)
 
 from django.db.models import Q
 from parsimonious.exceptions import ParseError  # noqa
 from parsimonious.grammar import Grammar, NodeVisitor
+from parsimonious.nodes import Node
 from rest_framework.serializers import ValidationError
 
 from sentry.eventstore.models import EventSubjectTemplateData
-from sentry.models import ActorTuple
+from sentry.models import ActorTuple, RepositoryProjectPathConfig
 from sentry.utils.glob import glob_match
 from sentry.utils.safe import get_path
 
@@ -60,20 +75,23 @@ _       = space*
 )
 
 
+PathSearchable = Union[Mapping[str, Any], Sequence[Any]]
+
+
 class Rule(namedtuple("Rule", "matcher owners")):
     """
     A Rule represents a single line in an Ownership file.
     This line contains a Matcher and a list of Owners.
     """
 
-    def dump(self):
+    def dump(self) -> Mapping[str, Sequence[Owner]]:
         return {"matcher": self.matcher.dump(), "owners": [o.dump() for o in self.owners]}
 
     @classmethod
-    def load(cls, data):
+    def load(cls, data: Mapping[str, Any]) -> Rule:
         return cls(Matcher.load(data["matcher"]), [Owner.load(o) for o in data["owners"]])
 
-    def test(self, data):
+    def test(self, data: Mapping[str, Any]) -> Union[bool, Any]:
         return self.matcher.test(data)
 
 
@@ -92,14 +110,17 @@ class Matcher(namedtuple("Matcher", "type pattern")):
         src/*
     """
 
-    def dump(self):
+    def dump(self) -> Mapping[str, str]:
         return {"type": self.type, "pattern": self.pattern}
 
     @classmethod
-    def load(cls, data):
+    def load(cls, data: Mapping[str, str]) -> Matcher:
         return cls(data["type"], data["pattern"])
 
-    def test(self, data):
+    def test(
+        self,
+        data: PathSearchable,
+    ) -> bool:
         if self.type == URL:
             return self.test_url(data)
         elif self.type == PATH:
@@ -109,29 +130,49 @@ class Matcher(namedtuple("Matcher", "type pattern")):
         elif self.type.startswith("tags."):
             return self.test_tag(data)
         elif self.type == CODEOWNERS:
-            return self.test_codeowners(data)
+            return self.test_frames(
+                data,
+                ["filename", "abs_path"],
+                # Codeowners has a slightly different syntax compared to issue owners
+                # As such we need to match it using gitignore logic.
+                # See syntax documentation here:
+                # https://docs.github.com/en/github/creating-cloning-and-archiving-repositories/creating-a-repository-on-github/about-code-owners
+                lambda val, pattern: bool(_path_to_regex(pattern).search(val))
+                if val is not None
+                else False,
+            )
         return False
 
-    def test_url(self, data):
+    def test_url(self, data: PathSearchable) -> bool:
+        if not isinstance(data, Mapping):
+            return False
+
         try:
             url = data["request"]["url"]
         except KeyError:
             return False
-        return url and glob_match(url, self.pattern, ignorecase=True)
-
-    def test_frames(self, data, keys):
-        for frame in _iter_frames(data):
+        return url and bool(glob_match(url, self.pattern, ignorecase=True))
+
+    def test_frames(
+        self,
+        data: PathSearchable,
+        keys: Sequence[str],
+        match_frame_value_func: Callable[[Optional[str], str], bool] = lambda val, pattern: bool(
+            glob_match(val, pattern, ignorecase=True, path_normalize=True)
+        ),
+    ) -> bool:
+        for frame in (f for f in _iter_frames(data) if isinstance(f, Mapping)):
             for key in keys:
                 value = frame.get(key)
                 if not value:
                     continue
 
-                if glob_match(value, self.pattern, ignorecase=True, path_normalize=True):
+                if match_frame_value_func(value, self.pattern):
                     return True
 
         return False
 
-    def test_tag(self, data):
+    def test_tag(self, data: PathSearchable) -> bool:
         tag = self.type[5:]
 
         # inspect the event-payload User interface first before checking tags.user
@@ -160,26 +201,6 @@ class Matcher(namedtuple("Matcher", "type pattern")):
                 return True
         return False
 
-    def test_codeowners(self, data):
-        """
-        Codeowners has a slightly different syntax compared to issue owners
-        As such we need to match it using gitignore logic.
-        See syntax documentation here:
-        https://docs.github.com/en/github/creating-cloning-and-archiving-repositories/creating-a-repository-on-github/about-code-owners
-        """
-        spec = _path_to_regex(self.pattern)
-        keys = ["filename", "abs_path"]
-        for frame in _iter_frames(data):
-            value = next((frame.get(key) for key in keys if frame.get(key)), None)
-
-            if not value:
-                continue
-
-            if spec.search(value):
-                return True
-
-        return False
-
 
 class Owner(namedtuple("Owner", "type identifier")):
     """
@@ -192,46 +213,46 @@ class Owner(namedtuple("Owner", "type identifier")):
         #team
     """
 
-    def dump(self):
+    def dump(self) -> Mapping[str, str]:
         return {"type": self.type, "identifier": self.identifier}
 
     @classmethod
-    def load(cls, data):
+    def load(cls, data: Mapping[str, str]) -> Owner:
         return cls(data["type"], data["identifier"])
 
 
-class OwnershipVisitor(NodeVisitor):
+class OwnershipVisitor(NodeVisitor):  # type: ignore
     visit_comment = visit_empty = lambda *a: None
 
-    def visit_ownership(self, node, children):
+    def visit_ownership(self, node: Node, children: Sequence[Optional[Rule]]) -> Sequence[Rule]:
         return [_f for _f in children if _f]
 
-    def visit_line(self, node, children):
+    def visit_line(self, node: Node, children: Tuple[Node, Sequence[Optional[Rule]], Any]) -> Any:
         _, line, _ = children
         comment_or_rule_or_empty = line[0]
         if comment_or_rule_or_empty:
             return comment_or_rule_or_empty
 
-    def visit_rule(self, node, children):
+    def visit_rule(self, node: Node, children: Tuple[Node, Matcher, Sequence[Owner]]) -> Rule:
         _, matcher, owners = children
         return Rule(matcher, owners)
 
-    def visit_matcher(self, node, children):
+    def visit_matcher(self, node: Node, children: Tuple[Node, str, str]) -> Matcher:
         _, tag, identifier = children
         return Matcher(tag, identifier)
 
-    def visit_matcher_tag(self, node, children):
+    def visit_matcher_tag(self, node: Node, children: Sequence[Any]) -> str:
         if not children:
             return "path"
         (tag,) = children
         type, _ = tag
-        return type[0].text
+        return str(type[0].text)
 
-    def visit_owners(self, node, children):
+    def visit_owners(self, node: Node, children: Tuple[Any, Sequence[Owner]]) -> Sequence[Owner]:
         _, owners = children
         return owners
 
-    def visit_owner(self, node, children):
+    def visit_owner(self, node: Node, children: Tuple[Node, bool, str]) -> Owner:
         _, is_team, pattern = children
         type = "team" if is_team else "user"
         # User emails are case insensitive, so coerce them
@@ -240,19 +261,19 @@ class OwnershipVisitor(NodeVisitor):
             pattern = pattern.lower()
         return Owner(type, pattern)
 
-    def visit_team_prefix(self, node, children):
+    def visit_team_prefix(self, node: Node, children: Sequence[Any]) -> bool:
         return bool(children)
 
-    def visit_any_identifier(self, node, children):
+    def visit_any_identifier(self, node: Node, children: Sequence[Any]) -> Node:
         return children[0]
 
-    def visit_identifier(self, node, children):
-        return node.text
+    def visit_identifier(self, node: Node, children: Sequence[Any]) -> str:
+        return str(node.text)
 
-    def visit_quoted_identifier(self, node, children):
-        return node.text[1:-1].encode("ascii", "backslashreplace").decode("unicode-escape")
+    def visit_quoted_identifier(self, node: Node, children: Sequence[Any]) -> str:
+        return str(node.text[1:-1].encode("ascii", "backslashreplace").decode("unicode-escape"))
 
-    def generic_visit(self, node, children):
+    def generic_visit(self, node: Node, children: Sequence[Any]) -> Union[Sequence[Node], Node]:
         return children or node
 
 
@@ -342,7 +363,7 @@ def _path_to_regex(pattern: str) -> Pattern[str]:
     return re.compile(regex)
 
 
-def _iter_frames(data):
+def _iter_frames(data: PathSearchable) -> Iterator[Any]:
     try:
         yield from get_path(data, "stacktrace", "frames", filter=True) or ()
     except KeyError:
@@ -360,29 +381,29 @@ def _iter_frames(data):
             continue
 
 
-def parse_rules(data):
+def parse_rules(data: str) -> Any:
     """Convert a raw text input into a Rule tree"""
     tree = ownership_grammar.parse(data)
     return OwnershipVisitor().visit(tree)
 
 
-def dump_schema(rules):
+def dump_schema(rules: Sequence[Rule]) -> Mapping[str, Any]:
     """Convert a Rule tree into a JSON schema"""
     return {"$version": VERSION, "rules": [r.dump() for r in rules]}
 
 
-def load_schema(schema):
+def load_schema(schema: Mapping[str, Any]) -> Sequence[Rule]:
     """Convert a JSON schema into a Rule tree"""
     if schema["$version"] != VERSION:
         raise RuntimeError("Invalid schema $version: %r" % schema["$version"])
     return [Rule.load(r) for r in schema["rules"]]
 
 
-def convert_schema_to_rules_text(schema):
+def convert_schema_to_rules_text(schema: Mapping[str, Any]) -> str:
     rules = load_schema(schema)
     text = ""
 
-    def owner_prefix(type):
+    def owner_prefix(type: str) -> str:
         if type == "team":
             return "#"
         return ""
@@ -416,14 +437,16 @@ def parse_code_owners(data: str) -> Tuple[List[str], List[str], List[str]]:
     return teams, usernames, emails
 
 
-def get_codeowners_path_and_owners(rule):
+def get_codeowners_path_and_owners(rule: str) -> Tuple[str, Sequence[str]]:
     # Regex does a negative lookbehind for a backslash. Matches on whitespace without a preceding backslash.
     pattern = re.compile(r"(?<!\\)\s")
     path, *code_owners = (i for i in pattern.split(rule.strip()) if i)
     return path, code_owners
 
 
-def convert_codeowners_syntax(codeowners, associations, code_mapping):
+def convert_codeowners_syntax(
+    codeowners: str, associations: Mapping[str, Any], code_mapping: RepositoryProjectPathConfig
+) -> str:
     """Converts CODEOWNERS text into IssueOwner syntax
     codeowners: CODEOWNERS text
     associations: dict of {externalName: sentryName}
@@ -481,7 +504,7 @@ def convert_codeowners_syntax(codeowners, associations, code_mapping):
     return result
 
 
-def resolve_actors(owners: Iterable["Owner"], project_id: int) -> Mapping["Owner", "ActorTuple"]:
+def resolve_actors(owners: Iterable[Owner], project_id: int) -> Mapping[Owner, ActorTuple]:
     """Convert a list of Owner objects into a dictionary
     of {Owner: Actor} pairs. Actors not identified are returned
     as None."""
@@ -533,7 +556,7 @@ def resolve_actors(owners: Iterable["Owner"], project_id: int) -> Mapping["Owner
     return {o: actors.get((o.type, o.identifier.lower())) for o in owners}
 
 
-def create_schema_from_issue_owners(issue_owners, project_id):
+def create_schema_from_issue_owners(issue_owners: str, project_id: int) -> Mapping[str, Any]:
     try:
         rules = parse_rules(issue_owners)
     except ParseError as e:

+ 1 - 1
src/sentry/search/events/builder.py

@@ -4,7 +4,7 @@ from typing import Any, Callable, Dict, List, Mapping, Match, Optional, Set, Tup
 
 import sentry_sdk
 from django.utils.functional import cached_property
-from parsimonious.exceptions import ParseError  # type: ignore
+from parsimonious.exceptions import ParseError
 from snuba_sdk.aliased_expression import AliasedExpression
 from snuba_sdk.column import Column
 from snuba_sdk.conditions import And, BooleanCondition, Condition, Op, Or