Browse Source

ref(auditlog): Add audit log event manager (#33337)

* ref(auditlog): Add audit log event manager

* Add tests for audit log event manager

* Move get_note  method to render  on the dataclass

* Adjust typeing for render field
Maggie Bauer 2 years ago
parent
commit
9619d44752

+ 1 - 0
src/sentry/audit_log/__init__.py

@@ -0,0 +1 @@
+from .manager import *  # NOQA

+ 57 - 0
src/sentry/audit_log/manager.py

@@ -0,0 +1,57 @@
+from dataclasses import dataclass
+from typing import Callable, List
+
+import sentry_sdk
+
+from sentry.models.auditlogentry import AuditLogEntry
+
+
+@dataclass
+class AuditLogEvent:
+    event_id: int
+    name: str
+    api_name: str
+    # render holds a function that will determine the message to be displayed in the audit log
+    render: Callable[[AuditLogEntry], str]
+
+
+class AuditLogEventManager:
+    _event_registry = {}
+    _event_id_lookup = {}
+
+    def add(self, audit_log_event: AuditLogEvent):
+        if (
+            audit_log_event.name in self._event_registry
+            or audit_log_event.event_id in self._event_id_lookup
+        ):
+            # TODO(mbauer404): raise exception once getsentry specific audit logs
+            # have been permanently moved from sentry into getsentry
+            with sentry_sdk.push_scope() as scope:
+                scope.level = "warning"
+                sentry_sdk.capture_message(
+                    f"Duplicate audit log: {audit_log_event.name} with ID {audit_log_event.event_id}"
+                )
+            return
+
+        self._event_registry[audit_log_event.name] = audit_log_event
+        self._event_id_lookup[audit_log_event.event_id] = audit_log_event
+
+    def get(self, event_id: int) -> "AuditLogEvent":
+        if event_id not in self._event_id_lookup:
+            raise Exception("Event ID does not exist")
+        return self._event_id_lookup[event_id]
+
+    def get_event_id(self, name: str) -> int:
+        if name not in self._event_registry:
+            raise Exception("Event does not exist")
+        return self._event_registry[name].event_id
+
+    def get_api_names(self) -> List[str]:
+        # returns a list of all the api names
+        api_names: list = []
+        for audit_log_event in self._event_registry.values():
+            api_names.append(audit_log_event.api_name)
+        return api_names
+
+
+audit_log_manager = AuditLogEventManager()

+ 31 - 0
tests/sentry/audit_log/test_manager.py

@@ -0,0 +1,31 @@
+from django.utils import timezone
+
+from sentry.audit_log import AuditLogEvent, audit_log_manager
+from sentry.models import AuditLogEntry, AuditLogEntryEvent
+from sentry.testutils import TestCase
+
+
+class AuditLogEventManagerTest(TestCase):
+    def test_audit_log_manager(self):
+        log_event = AuditLogEvent(
+            event_id=1,
+            name="member_invite",
+            api_name="member.invite",
+            render=lambda audit_log_entry: "invited member {email}".format(**audit_log_entry.data),
+        )
+
+        log_entry = AuditLogEntry.objects.create(
+            organization=self.organization,
+            event=AuditLogEntryEvent.MEMBER_INVITE,
+            actor=self.user,
+            datetime=timezone.now(),
+            data={"email": "my_email@mail.com"},
+        )
+
+        audit_log_manager.add(log_event)
+
+        assert audit_log_manager.get(event_id=1) == log_event
+        assert audit_log_manager.get_event_id(name="member_invite") == 1
+        assert audit_log_manager.get_api_names() == ["member.invite"]
+
+        assert log_event.render(log_entry) == "invited member my_email@mail.com"