Browse Source

feat(aci): get groups to fire actions for in delayed workflow processing (#84702)

Cathy Teng 4 weeks ago
parent
commit
8c8e672ea8

+ 9 - 12
src/sentry/workflow_engine/handlers/condition/event_frequency_handlers.py

@@ -7,12 +7,12 @@ from sentry.rules.conditions.event_frequency import (
 )
 from sentry.workflow_engine.models.data_condition import Condition
 from sentry.workflow_engine.registry import condition_handler_registry
-from sentry.workflow_engine.types import DataConditionHandler, DataConditionResult, WorkflowJob
+from sentry.workflow_engine.types import DataConditionHandler, DataConditionResult
 
 
 @condition_handler_registry.register(Condition.EVENT_FREQUENCY_COUNT)
 @condition_handler_registry.register(Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT)
-class EventFrequencyCountHandler(DataConditionHandler[WorkflowJob]):
+class EventFrequencyCountHandler(DataConditionHandler[list[int]]):
     comparison_json_schema = {
         "type": "object",
         "properties": {
@@ -24,15 +24,15 @@ class EventFrequencyCountHandler(DataConditionHandler[WorkflowJob]):
     }
 
     @staticmethod
-    def evaluate_value(value: WorkflowJob, comparison: Any) -> DataConditionResult:
-        if len(value.get("snuba_results", [])) != 1:
+    def evaluate_value(value: list[int], comparison: Any) -> DataConditionResult:
+        if not isinstance(value, list) or len(value) != 1:
             return False
-        return value["snuba_results"][0] > comparison["value"]
+        return value[0] > comparison["value"]
 
 
 @condition_handler_registry.register(Condition.EVENT_FREQUENCY_PERCENT)
 @condition_handler_registry.register(Condition.EVENT_UNIQUE_USER_FREQUENCY_PERCENT)
-class EventFrequencyPercentHandler(DataConditionHandler[WorkflowJob]):
+class EventFrequencyPercentHandler(DataConditionHandler[list[int]]):
     comparison_json_schema = {
         "type": "object",
         "properties": {
@@ -45,13 +45,10 @@ class EventFrequencyPercentHandler(DataConditionHandler[WorkflowJob]):
     }
 
     @staticmethod
-    def evaluate_value(value: WorkflowJob, comparison: Any) -> DataConditionResult:
-        if len(value.get("snuba_results", [])) != 2:
+    def evaluate_value(value: list[int], comparison: Any) -> DataConditionResult:
+        if not isinstance(value, list) or len(value) != 2:
             return False
-        return (
-            percent_increase(value["snuba_results"][0], value["snuba_results"][1])
-            > comparison["value"]
-        )
+        return percent_increase(value[0], value[1]) > comparison["value"]
 
 
 # Percent sessions values must be between 0-100 (%)

+ 49 - 4
src/sentry/workflow_engine/processors/delayed_workflow.py

@@ -25,6 +25,7 @@ from sentry.workflow_engine.models.data_condition import (
     Condition,
 )
 from sentry.workflow_engine.models.data_condition_group import get_slow_conditions
+from sentry.workflow_engine.processors.data_condition_group import evaluate_data_conditions
 from sentry.workflow_engine.types import DataConditionHandlerType
 
 logger = logging.getLogger("sentry.workflow_engine.processors.delayed_workflow")
@@ -228,6 +229,41 @@ def get_condition_group_results(
     return condition_group_results
 
 
+def get_groups_to_fire(
+    data_condition_groups: list[DataConditionGroup],
+    workflows_to_envs: WorkflowEnvMapping,
+    dcg_to_workflow: dict[int, int],
+    dcg_to_groups: DataConditionGroupGroups,
+    condition_group_results: dict[UniqueConditionQuery, dict[int, int]],
+) -> dict[int, set[DataConditionGroup]]:
+    groups_to_fire: dict[int, set[DataConditionGroup]] = defaultdict(set)
+    for dcg in data_condition_groups:
+        slow_conditions = get_slow_conditions(dcg)
+        action_match = DataConditionGroup.Type(dcg.logic_type)
+        workflow_id = dcg_to_workflow.get(dcg.id)
+        workflow_env = workflows_to_envs[workflow_id] if workflow_id else None
+
+        for group_id in dcg_to_groups[dcg.id]:
+            conditions_to_evaluate = []
+            for condition in slow_conditions:
+                unique_queries = generate_unique_queries(condition, workflow_env)
+                query_values = [
+                    condition_group_results[unique_query][group_id]
+                    for unique_query in unique_queries
+                ]
+                conditions_to_evaluate.append((condition, query_values))
+
+            passes, _ = evaluate_data_conditions(conditions_to_evaluate, action_match)
+            if (
+                passes and workflow_id is None
+            ):  # TODO: detector trigger passes. do something like create issue
+                pass
+            elif passes:
+                groups_to_fire[group_id].add(dcg)
+
+    return groups_to_fire
+
+
 @instrumented_task(
     name="sentry.workflow_engine.processors.delayed_workflow",
     queue="delayed_rules",
@@ -253,18 +289,27 @@ def process_delayed_workflows(
     dcg_to_groups, trigger_type_to_dcg_model = get_dcg_group_workflow_detector_data(
         workflow_event_dcg_data
     )
-    dcg_to_workflow = trigger_type_to_dcg_model[DataConditionHandlerType.WORKFLOW_TRIGGER]
+    dcg_to_workflow = trigger_type_to_dcg_model[DataConditionHandlerType.WORKFLOW_TRIGGER].copy()
     dcg_to_workflow.update(trigger_type_to_dcg_model[DataConditionHandlerType.ACTION_FILTER])
 
     _, workflows_to_envs = fetch_workflows_envs(list(dcg_to_workflow.values()))
     data_condition_groups = fetch_data_condition_groups(list(dcg_to_groups.keys()))
 
-    _ = get_condition_query_groups(
+    # Get unique query groups to query Snuba
+    condition_groups = get_condition_query_groups(
         data_condition_groups, dcg_to_groups, dcg_to_workflow, workflows_to_envs
     )
+    condition_group_results = get_condition_group_results(condition_groups)
+
+    # Evaluate DCGs
+    _ = get_groups_to_fire(
+        data_condition_groups,
+        workflows_to_envs,
+        dcg_to_workflow,
+        dcg_to_groups,
+        condition_group_results,
+    )
 
-    # TODO(cathy): fetch results from snuba
-    # TODO(cathy): evaluate condition groups
     # TODO(cathy): fire actions on passing groups
     # TODO(cathy): clean up redis buffer
 

+ 0 - 1
src/sentry/workflow_engine/types.py

@@ -47,7 +47,6 @@ class WorkflowJob(EventJob, total=False):
     has_alert: bool
     has_escalated: bool
     workflow: Workflow
-    snuba_results: list[int]  # TODO - @saponifi3 / TODO(cathy): audit this
 
 
 class ActionHandler:

+ 8 - 0
tests/sentry/workflow_engine/handlers/condition/test_base.py

@@ -39,6 +39,14 @@ class ConditionTestCase(BaseWorkflowTest):
     def assert_does_not_pass(self, data_condition: DataCondition, job: WorkflowJob) -> None:
         assert data_condition.evaluate_value(job) != data_condition.get_condition_result()
 
+    def assert_slow_condition_passes(self, data_condition: DataCondition, value: list[int]) -> None:
+        assert data_condition.evaluate_value(value) == data_condition.get_condition_result()
+
+    def assert_slow_condition_does_not_pass(
+        self, data_condition: DataCondition, value: list[int]
+    ) -> None:
+        assert data_condition.evaluate_value(value) != data_condition.get_condition_result()
+
     # TODO: activity
 
 

+ 8 - 17
tests/sentry/workflow_engine/handlers/condition/test_event_frequency_handlers.py

@@ -8,7 +8,6 @@ from sentry.rules.conditions.event_frequency import (
     EventUniqueUserFrequencyCondition,
 )
 from sentry.workflow_engine.models.data_condition import Condition
-from sentry.workflow_engine.types import WorkflowJob
 from tests.sentry.workflow_engine.handlers.condition.test_base import ConditionTestCase
 
 
@@ -21,10 +20,6 @@ class TestEventFrequencyCountCondition(ConditionTestCase):
         "comparisonType": ComparisonType.COUNT,
     }
 
-    def setUp(self):
-        super().setUp()
-        self.job = WorkflowJob({"event": self.group_event})
-
     def test_count(self):
         dc = self.create_data_condition(
             type=self.condition,
@@ -32,11 +27,11 @@ class TestEventFrequencyCountCondition(ConditionTestCase):
             condition_result=True,
         )
 
-        self.job["snuba_results"] = [self.payload["value"] + 1]
-        self.assert_passes(dc, self.job)
+        results = [self.payload["value"] + 1]
+        self.assert_slow_condition_passes(dc, results)
 
-        self.job["snuba_results"] = [self.payload["value"] - 1]
-        self.assert_does_not_pass(dc, self.job)
+        results = [self.payload["value"] - 1]
+        self.assert_slow_condition_does_not_pass(dc, results)
 
     def test_dual_write_count(self):
         dcg = self.create_data_condition_group()
@@ -83,10 +78,6 @@ class TestEventFrequencyPercentCondition(ConditionTestCase):
         "comparisonInterval": "1d",
     }
 
-    def setUp(self):
-        super().setUp()
-        self.job = WorkflowJob({"event": self.group_event})
-
     def test_percent(self):
         dc = self.create_data_condition(
             type=self.condition,
@@ -98,11 +89,11 @@ class TestEventFrequencyPercentCondition(ConditionTestCase):
             condition_result=True,
         )
 
-        self.job["snuba_results"] = [16, 10]
-        self.assert_passes(dc, self.job)
+        results = [16, 10]
+        self.assert_slow_condition_passes(dc, results)
 
-        self.job["snuba_results"] = [10, 10]
-        self.assert_does_not_pass(dc, self.job)
+        results = [10, 10]
+        self.assert_slow_condition_does_not_pass(dc, results)
 
     def test_dual_write_percent(self):
         dcg = self.create_data_condition_group()

+ 2 - 2
tests/sentry/workflow_engine/processors/test_data_condition_group.py

@@ -216,8 +216,8 @@ class TestEvaluateConditionGroupWithSlowConditions(TestCase):
 
     def test_execute_slow_conditions(self):
         (logic_result, condition_results), remaining_conditions = process_data_condition_group(
-            self.data_condition_group.id,
-            {"snuba_results": [10]},
+            self.data_condition_group,
+            [10],
             False,
         )
 

+ 155 - 6
tests/sentry/workflow_engine/processors/test_delayed_workflow.py

@@ -13,6 +13,10 @@ from sentry.rules.processing.delayed_processing import fetch_project
 from sentry.testutils.helpers.datetime import before_now, freeze_time
 from sentry.testutils.helpers.redis import mock_redis_buffer
 from sentry.utils import json
+from sentry.workflow_engine.handlers.condition.slow_condition_query_handlers import (
+    EventFrequencyQueryHandler,
+    EventUniqueUserFrequencyQueryHandler,
+)
 from sentry.workflow_engine.models import DataCondition, DataConditionGroup, Detector, Workflow
 from sentry.workflow_engine.models.data_condition import (
     PERCENT_CONDITIONS,
@@ -28,6 +32,7 @@ from sentry.workflow_engine.processors.delayed_workflow import (
     get_condition_group_results,
     get_condition_query_groups,
     get_dcg_group_workflow_detector_data,
+    get_groups_to_fire,
 )
 from sentry.workflow_engine.processors.workflow import WORKFLOW_ENGINE_BUFFER_LIST_KEY
 from sentry.workflow_engine.types import DataConditionHandlerType
@@ -106,6 +111,13 @@ class TestDelayedWorkflowBase(BaseWorkflowTest, BaseEventFrequencyPercentTest):
             self.detector_dcg.id
         ] = self.detector.id
 
+        self.dcg_to_workflow = self.trigger_type_to_dcg_model[
+            DataConditionHandlerType.WORKFLOW_TRIGGER
+        ].copy()
+        self.dcg_to_workflow.update(
+            self.trigger_type_to_dcg_model[DataConditionHandlerType.ACTION_FILTER]
+        )
+
         self.mock_redis_buffer = mock_redis_buffer()
         self.mock_redis_buffer.__enter__()
 
@@ -250,13 +262,8 @@ class TestDelayedWorkflowHelpers(TestDelayedWorkflowBase):
         assert trigger_type_to_dcg_model == self.trigger_type_to_dcg_model
 
     def test_fetch_workflows_envs(self):
-        dcg_to_workflow = self.trigger_type_to_dcg_model[DataConditionHandlerType.WORKFLOW_TRIGGER]
-        dcg_to_workflow.update(
-            self.trigger_type_to_dcg_model[DataConditionHandlerType.ACTION_FILTER]
-        )
-
         workflow_ids_to_workflows, workflows_to_envs = fetch_workflows_envs(
-            list(dcg_to_workflow.values())
+            list(self.dcg_to_workflow.values())
         )
         assert workflows_to_envs == {
             self.workflow1.id: self.environment.id,
@@ -490,3 +497,145 @@ class TestGetSnubaResults(BaseWorkflowTest):
             count_query: {group_id: 4},
             offset_percent_query: {group_id: 1},
         }
+
+
+class TestGetGroupsToFire(TestDelayedWorkflowBase):
+    def setUp(self):
+        super().setUp()
+
+        self.data_condition_groups = self.workflow1_dcgs + self.workflow2_dcgs + [self.detector_dcg]
+        self.dcg_to_groups[self.detector_dcg.id] = {self.group1.id}
+        self.workflows_to_envs = {self.workflow1.id: self.environment.id, self.workflow2.id: None}
+        self.condition_group_results = {
+            UniqueConditionQuery(
+                handler=EventFrequencyQueryHandler,
+                interval="1h",
+                environment_id=self.environment.id,
+            ): {self.group1.id: 101, self.group2.id: 101},
+            UniqueConditionQuery(
+                handler=EventFrequencyQueryHandler,
+                interval="1h",
+                comparison_interval="1w",
+                environment_id=self.environment.id,
+            ): {self.group1.id: 50, self.group2.id: 50},
+            UniqueConditionQuery(
+                handler=EventFrequencyQueryHandler, interval="1h", environment_id=None
+            ): {self.group1.id: 101, self.group2.id: 101},
+            UniqueConditionQuery(
+                handler=EventFrequencyQueryHandler,
+                interval="1h",
+                comparison_interval="1w",
+                environment_id=None,
+            ): {self.group1.id: 202, self.group2.id: 202},
+            UniqueConditionQuery(
+                handler=EventUniqueUserFrequencyQueryHandler,
+                interval="1h",
+                environment_id=self.environment.id,
+            ): {self.group1.id: 101, self.group2.id: 101},
+            UniqueConditionQuery(
+                handler=EventUniqueUserFrequencyQueryHandler, interval="1h", environment_id=None
+            ): {self.group1.id: 50, self.group2.id: 50},
+        }
+
+        # add slow condition to workflow1 IF dcg (ALL), passes
+        self.create_data_condition(
+            condition_group=self.workflow1_dcgs[1],
+            type=Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT,
+            comparison={"interval": "1h", "value": 100},
+            condition_result=True,
+        )
+        # add slow condition to detector WHEN dcg (ANY), passes but not in result
+        self.create_data_condition(
+            condition_group=self.detector_dcg,
+            type=Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT,
+            comparison={"interval": "1h", "value": 100},
+            condition_result=True,
+        )
+        # add slow condition to workflow2 WHEN dcg (ANY), fails but the DCG itself passes
+        self.create_data_condition(
+            condition_group=self.workflow2_dcgs[0],
+            type=Condition.EVENT_UNIQUE_USER_FREQUENCY_COUNT,
+            comparison={"interval": "1h", "value": 100},
+            condition_result=True,
+        )
+
+    def test_simple(self):
+        result = get_groups_to_fire(
+            self.data_condition_groups,
+            self.workflows_to_envs,
+            self.dcg_to_workflow,
+            self.dcg_to_groups,
+            self.condition_group_results,
+        )
+
+        assert result == {
+            self.group1.id: set(self.workflow1_dcgs),  # WHEN dcg (ANY-short), IF dcg (ALL)
+            self.group2.id: {self.workflow2_dcgs[0]},  # WHEN dcg (ANY-short)
+        }
+
+    def test_dcg_all_fails(self):
+        self.condition_group_results.update(
+            {
+                UniqueConditionQuery(
+                    handler=EventUniqueUserFrequencyQueryHandler,
+                    interval="1h",
+                    environment_id=self.environment.id,
+                ): {self.group1.id: 99}
+            }
+        )
+
+        result = get_groups_to_fire(
+            self.data_condition_groups,
+            self.workflows_to_envs,
+            self.dcg_to_workflow,
+            self.dcg_to_groups,
+            self.condition_group_results,
+        )
+
+        assert result == {
+            self.group1.id: {self.workflow1_dcgs[0]},  # WHEN dcg (ANY-short)
+            self.group2.id: {self.workflow2_dcgs[0]},  # WHEN dcg (ANY-short)
+        }
+
+    def test_dcg_any_fails(self):
+        self.condition_group_results.update(
+            {
+                UniqueConditionQuery(
+                    handler=EventFrequencyQueryHandler, interval="1h", environment_id=None
+                ): {self.group2.id: 99}
+            }
+        )
+
+        result = get_groups_to_fire(
+            self.data_condition_groups,
+            self.workflows_to_envs,
+            self.dcg_to_workflow,
+            self.dcg_to_groups,
+            self.condition_group_results,
+        )
+
+        assert result == {
+            self.group1.id: set(self.workflow1_dcgs),  # WHEN dcg (ANY-short), IF dcg (ALL)
+        }
+
+    def test_multiple_dcgs_per_group(self):
+        for dcg in self.workflow1_dcgs:
+            self.dcg_to_groups[dcg.id].add(self.group2.id)
+        for dcg in self.workflow2_dcgs:
+            self.dcg_to_groups[dcg.id].add(self.group1.id)
+
+        result = get_groups_to_fire(
+            self.data_condition_groups,
+            self.workflows_to_envs,
+            self.dcg_to_workflow,
+            self.dcg_to_groups,
+            self.condition_group_results,
+        )
+
+        # all dcgs except workflow 2 IF, which never passes
+        assert result == {
+            self.group1.id: set(self.workflow1_dcgs + [self.workflow2_dcgs[0]]),
+            self.group2.id: set(
+                self.workflow1_dcgs + [self.workflow2_dcgs[0]],
+            ),
+        }