add_mode_limits.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #!.venv/bin/python
  2. from __future__ import annotations
  3. import os
  4. import re
  5. import sys
  6. from dataclasses import dataclass
  7. from enum import Enum, auto
  8. from typing import Iterable
  9. from sentry.utils import json
"""
Instructions for use:

1. Commit or stash any Git changes in progress.
2. Scroll down to "Fill these predicates in..." and write what you want to do.
3. From the Sentry project root, run:

       ./scripts/servermode/audit_mode_limits.py | ./scripts/servermode/add_mode_limits.py

4. Do `git status` or `git diff` to observe the results. Commit if you're happy.
"""
  18. class ClassCategory(Enum):
  19. MODEL = auto()
  20. VIEW = auto()
  21. @dataclass
  22. class LimitedClass:
  23. package: str
  24. name: str
  25. category: ClassCategory
  26. is_decorated: bool
  27. def parse_audit(audit) -> Iterable[LimitedClass]:
  28. def split_qualname(value):
  29. dot_index = value.rindex(".")
  30. package = value[:dot_index]
  31. name = value[dot_index + 1 :]
  32. return package, name
  33. def parse_group(category, dec_group):
  34. is_decorated = dec_group["decorator"] is not None
  35. for value in dec_group["values"]:
  36. package, name = split_qualname(value)
  37. yield LimitedClass(package, name, category, is_decorated)
  38. for dec_group in audit["models"]["decorators"]:
  39. yield from parse_group(ClassCategory.MODEL, dec_group)
  40. for dec_group in audit["views"]["decorators"]:
  41. yield from parse_group(ClassCategory.VIEW, dec_group)
  42. def read_audit():
  43. pipe_input = sys.stdin.read()
  44. brace_index = pipe_input.index("{")
  45. pipe_input = pipe_input[brace_index:] # strip leading junk
  46. server_mode_audit = json.loads(pipe_input)
  47. return list(parse_audit(server_mode_audit))
  48. def find_source_paths():
  49. for (dirpath, dirnames, filenames) in os.walk("./src/sentry"):
  50. for filename in filenames:
  51. if filename.endswith(".py"):
  52. yield os.path.join(dirpath, filename)
  53. def find_class_declarations():
  54. for src_path in find_source_paths():
  55. with open(src_path) as f:
  56. src_code = f.read()
  57. for match in re.findall(r"\nclass\s+(\w+)\(", src_code):
  58. yield src_path, match
  59. def insert_import(src_code: str, import_stmt: str) -> str:
  60. future_import = None
  61. for future_import in re.finditer(r"from\s+__future__\s+.*\n+", src_code):
  62. pass # iterate to last match
  63. if future_import:
  64. start, end = future_import.span()
  65. return src_code[:end] + import_stmt + "\n" + src_code[end:]
  66. else:
  67. return import_stmt + "\n" + src_code
  68. def apply_decorators(
  69. decorator_name: str,
  70. import_stmt: str,
  71. target_classes: Iterable[LimitedClass],
  72. ) -> None:
  73. target_names = {c.name for c in target_classes if not c.is_decorated}
  74. for src_path, class_name in find_class_declarations():
  75. if class_name in target_names:
  76. with open(src_path) as f:
  77. src_code = f.read()
  78. new_code = re.sub(
  79. rf"\nclass\s+{class_name}\(",
  80. f"\n@{decorator_name}\nclass {class_name}(",
  81. src_code,
  82. )
  83. new_code = insert_import(new_code, import_stmt)
  84. with open(src_path, mode="w") as f:
  85. f.write(new_code)
  86. def main():
  87. classes = read_audit()
  88. def filter_classes(category, predicate):
  89. return (c for c in classes if c.category == category and predicate(c))
  90. ####################################################################
  91. # Fill these predicates in with the logic you want to apply
  92. def control_model_predicate(c: LimitedClass) -> bool:
  93. return False
  94. def customer_model_predicate(c: LimitedClass) -> bool:
  95. return False
  96. def control_endpoint_predicate(c: LimitedClass) -> bool:
  97. return False
  98. def customer_endpoint_predicate(c: LimitedClass) -> bool:
  99. return False
  100. ####################################################################
  101. apply_decorators(
  102. "control_silo_model",
  103. "from sentry.db.models import control_silo_model",
  104. filter_classes(ClassCategory.MODEL, control_model_predicate),
  105. )
  106. apply_decorators(
  107. "customer_silo_model",
  108. "from sentry.db.models import customer_silo_model",
  109. filter_classes(ClassCategory.MODEL, customer_model_predicate),
  110. )
  111. apply_decorators(
  112. "control_silo_endpoint",
  113. "from sentry.api.base import control_silo_endpoint",
  114. filter_classes(ClassCategory.VIEW, control_endpoint_predicate),
  115. )
  116. apply_decorators(
  117. "customer_silo_endpoint",
  118. "from sentry.api.base import customer_silo_endpoint",
  119. filter_classes(ClassCategory.VIEW, customer_endpoint_predicate),
  120. )
  121. if __name__ == "__main__":
  122. main()