123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- #!/usr/bin/env sentry exec
- from __future__ import annotations
- import abc
- import json # noqa - I want the `indent` param
- import sys
- from collections import defaultdict
- import django.apps
- import django.urls
def audit_silo_limits(format="json"):
    """List which classes have had silo decorators applied.

    Builds a table of models and a table of endpoint classes (via
    ``create_model_table`` and ``create_endpoint_table``) and writes both to
    standard output in the requested format.

    Args:
        format: Output format, either ``"json"`` (the default) or
            ``"markdown"``.

    Raises:
        ValueError: If ``format`` is not one of the supported formats.
    """
    # Reject an unsupported format up front, before configuring Sentry and
    # building the tables, and say what was wrong instead of raising a bare
    # ValueError.
    supported_formats = ("json", "markdown")
    if format not in supported_formats:
        raise ValueError(
            f"Unsupported format {format!r}; expected one of {supported_formats}"
        )

    # Imported lazily so that merely importing this module does not pull in
    # the Sentry runner.
    from sentry.runner import configure

    configure()

    model_table = create_model_table()
    endpoint_table = create_endpoint_table()

    if format == "json":
        json_repr = {
            "models": ModelPresentation().as_json_repr(model_table),
            "endpoints": EndpointPresentation().as_json_repr(endpoint_table),
        }
        json.dump(json_repr, sys.stdout, indent=4)
    else:
        # Only "markdown" can reach this branch after the validation above.
        ModelPresentation().print_markdown(model_table)
        EndpointPresentation().print_markdown(endpoint_table)
def create_model_table():
    """Group the models of the ``sentry`` app by their silo limit.

    Returns:
        A ``defaultdict(list)`` whose keys are ``(limit.modes,
        limit.read_only)`` tuples and whose values are the model classes
        carrying that limit. Models with no (or a falsy) limit are collected
        under the key ``None``.
    """
    grouped = defaultdict(list)
    sentry_models = (
        model
        for model in django.apps.apps.get_models()
        if model._meta.app_label == "sentry"
    )
    for model in sentry_models:
        # The limit is looked up on the model's ``_meta`` under this
        # name-mangled attribute.
        silo_limit = getattr(model._meta, "_ModelSiloLimit__silo_limit", None)
        if silo_limit:
            group_key = (silo_limit.modes, silo_limit.read_only)
        else:
            group_key = None
        grouped[group_key].append(model)
    return grouped
def create_endpoint_table():
    """Group the URL-routed ``Endpoint`` classes by their silo limit.

    Returns:
        A ``defaultdict(list)`` mapping ``limit.modes`` to the endpoint
        classes carrying that limit. Endpoints with no (or a falsy)
        ``__silo_limit`` attribute are collected under the key ``None``.
    """
    from sentry.api.base import Endpoint

    def iter_endpoint_classes():
        # Snapshot the resolver's reverse dict before iterating it.
        resolver_entries = list(django.urls.get_resolver().reverse_dict.items())
        for callback, _bindings in resolver_entries:
            # Only entries exposing a ``view_class`` that subclasses
            # Endpoint are of interest; everything else is skipped.
            candidate = getattr(callback, "view_class", None)
            if candidate and issubclass(candidate, Endpoint):
                yield candidate

    grouped = defaultdict(list)
    for endpoint_cls in iter_endpoint_classes():
        silo_limit = getattr(endpoint_cls, "__silo_limit", None)
        if silo_limit:
            group_key = silo_limit.modes
        else:
            group_key = None
        grouped[group_key].append(endpoint_cls)
    return grouped
class ConsolePresentation(abc.ABC):
    """Render a table of grouped classes as JSON-ready data or Markdown text.

    A table maps a group key to a collection of classes. Subclasses decide
    how groups are ordered and how a group key is labelled or serialized.
    """

    @property
    @abc.abstractmethod
    def table_label(self):
        """Heading text printed above the whole table."""
        raise NotImplementedError

    @abc.abstractmethod
    def order(self, group):
        """Return the sort key for one ``(key, members)`` table item."""
        raise NotImplementedError

    @abc.abstractmethod
    def get_group_label(self, key):
        """Return the Markdown heading label for the group ``key``."""
        raise NotImplementedError

    @abc.abstractmethod
    def get_key_repr(self, key):
        """Return the JSON-ready representation of the group ``key``."""
        raise NotImplementedError

    @staticmethod
    def format_mode_set(modes):
        """Return the sorted string forms of ``modes``, or None for None."""
        if modes is not None:
            return sorted(map(str, modes))
        return None

    @staticmethod
    def format_value(value):
        """Return the dotted ``module.name`` path of ``value``."""
        module_name = value.__module__
        short_name = value.__name__
        return f"{module_name}.{short_name}"

    def normalize_table(self, table):
        """Return ``table`` with ordered groups and sorted, unique names.

        Groups are ordered by ``self.order``; every member is replaced by
        its dotted path, de-duplicated and sorted.
        """
        ordered_items = sorted(table.items(), key=self.order)
        normalized = {}
        for group_key, members in ordered_items:
            unique_names = {self.format_value(member) for member in members}
            normalized[group_key] = sorted(unique_names)
        return normalized

    def as_json_repr(self, table):
        """Return a JSON-serializable summary of ``table``.

        The result holds the overall ``total_count`` and one entry per group
        under ``decorators`` with its key representation, count and values.
        """
        normalized = self.normalize_table(table)
        total = sum(map(len, normalized.values()))
        decorators = []
        for group_key, names in normalized.items():
            entry = {
                "decorator": self.get_key_repr(group_key),
                "count": len(names),
                "values": names,
            }
            decorators.append(entry)
        return {"total_count": total, "decorators": decorators}

    def print_markdown(self, table):
        """Print ``table`` to standard output as underlined sections."""
        normalized = self.normalize_table(table)
        total = sum(map(len, normalized.values()))

        title = f"{self.table_label} ({total})"
        print("\n" + title)  # noqa
        print("=" * len(title), end="\n\n")  # noqa

        for group_key, names in normalized.items():
            label = self.get_group_label(group_key)
            heading = f"{label} ({len(names)})"
            print(heading)  # noqa
            print("-" * len(heading), end="\n\n")  # noqa
            for name in names:
                print(" - " + name)  # noqa
            print()  # noqa
class ModelPresentation(ConsolePresentation):
    """Presentation of the model table.

    Group keys are either ``None`` or a two-item tuple that is unpacked
    here as ``(write_modes, read_modes)``.
    """

    @property
    def table_label(self):
        """Heading text for the model table."""
        return "MODELS"

    def order(self, group):
        """Sort the ``None`` group first, then by mode counts and names."""
        key, _members = group
        if key is None:
            # The empty tuple sorts before every non-empty key tuple.
            return ()
        writable, readable = key
        sizes = (len(writable), len(readable))
        names = (self.format_mode_set(writable), self.format_mode_set(readable))
        return sizes + names

    def get_key_repr(self, key):
        """Return the write/read mode lists for ``key``, or None for None."""
        if key is None:
            return None
        writable, readable = key
        formatted_write = self.format_mode_set(writable)
        formatted_read = self.format_mode_set(readable)
        return {"write_modes": formatted_write, "read_modes": formatted_read}

    def get_group_label(self, key):
        """Return the heading label for the group ``key``."""
        if key is None:
            return "No decorator"
        writable, readable = key
        if not readable:
            return self.format_mode_set(writable)
        formatted_write = self.format_mode_set(writable)
        formatted_read = self.format_mode_set(readable)
        return f"{formatted_write}, read_only={formatted_read}"
class EndpointPresentation(ConsolePresentation):
    """Presentation of the endpoint table.

    Group keys are either ``None`` (no silo limit found on the endpoint) or
    a collection of modes.
    """

    @property
    def table_label(self):
        """Heading text for the endpoint table."""
        return "VIEWS"

    def order(self, group):
        """Return the sort key for one ``(mode_set, endpoints)`` item.

        Groups sort by number of modes, then by the sorted mode names.
        """
        mode_set, _endpoint_group = group
        # Fall back to an empty list rather than None for the undecorated
        # group: a (0, None) key cannot be compared with the (0, []) key of
        # an empty mode set, and sorting would raise TypeError.
        return len(mode_set or ()), self.format_mode_set(mode_set) or []

    def get_group_label(self, key):
        """Return the heading label for the group ``key``."""
        # Only a missing limit counts as "No decorator"; an empty mode set
        # still comes from a decorator and is labelled with its (empty) mode
        # list, matching ModelPresentation's ``key is None`` check.
        if key is None:
            return "No decorator"
        return self.format_mode_set(key)

    def get_key_repr(self, key):
        """Return the sorted mode names for ``key``, or None for None."""
        return self.format_mode_set(key)
# Script entry point: run the audit with its default output format when the
# file is executed directly rather than imported.
if __name__ == "__main__":
    audit_silo_limits()
|