typed_code.py

#!/usr/bin/env python
"""
Calculate python typing progress by teams as determined by CODEOWNERS.
"""
from __future__ import annotations

import argparse
import configparser
import hashlib
import logging
import os
import re
import subprocess
import sys
from collections import defaultdict
from typing import Any, Mapping, MutableMapping, Optional, Set

from codeowners import CodeOwners

BAR_LENGTH = 60
UNOWNED_KEY = "other"
TOTALS_KEY = "TOTAL"
CACHE_SEPARATOR = "\t"
TEAM_REGEX = re.compile(r"@\S+/\S+")

# TODO pass directories and ignores as parameters
ROOT = {"src/"}
# TODO make these regexes
IGNORE = {"src/sentry/migrations/"}

# Collect config files
BASE_DIR = os.getcwd()
config_filename = os.path.join(BASE_DIR, "mypy.ini")
codeowners_filename = os.path.join(BASE_DIR, ".github/CODEOWNERS")

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)


def get_source_files() -> Set[str]:
    logger.debug(f"get_source_files {config_filename}")
    config = configparser.ConfigParser()
    config.read(config_filename)
    files = config["mypy"]["files"]
    logger.debug(files)
    return {filename.strip() for filename in files.split(",")}
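
# For reference, get_source_files expects mypy.ini to carry a comma-separated
# "files" entry under the [mypy] section, roughly like this (the paths are
# illustrative, not taken from the real config):
#
#   [mypy]
#   files = src/sentry/api/endpoints/,
#           src/sentry/utils/dates.py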


def flatten_directories(paths: Set[str]) -> Set[str]:
    """
    For a set of paths, recursively turn the directories into sets of their
    component files while passing along non-directories.
    """
    result = set()
    for path in paths:
        # Normalize to a trailing slash so walked subdirectories (which
        # os.path.join produces without one) still match the IGNORE entries.
        if path.rstrip("/") + "/" in IGNORE:
            continue
        if os.path.isdir(path):
            next_level = {os.path.join(path, x) for x in os.listdir(path)}
            flattened = flatten_directories(next_level)
            result.update(flattened)
        elif path.endswith(".py"):
            result.add(path)
    return result
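
# Example (paths illustrative): flatten_directories({"src/sentry/"}) walks the
# tree and returns every .py file under src/sentry/, skipping any directory
# listed in IGNORE.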


def get_all_teams(team: Optional[str] = None) -> Set[str]:
    """
    Re-read the codeowners file looking for team names. This isn't a full
    solution because it doesn't skip commented lines. I wish the codeowners
    parser did this for us.
    """
    if team:
        return {team}
    teams = set()
    with open(codeowners_filename) as f:
        for line in f:
            teams.update(TEAM_REGEX.findall(line))
    logger.debug("All teams")
    logger.debug("\n".join(teams))
    return teams
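
# A CODEOWNERS line pairs a path pattern with one or more owners, e.g.
# (illustrative):
#   src/sentry/api/   @getsentry/owners-api
# TEAM_REGEX picks out every "@org/team" token on each line.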


def split_files_by_codeowner(files: Set[str], codeowners: Any) -> MutableMapping[str, Set[str]]:
    """
    Given a set of filenames and a codeowners object, split the files up by
    owner. This isn't a full solution because it doesn't handle multiple owners
    on a file.
    """
    files_by_codeowner = defaultdict(set)
    for filename in files:
        owners = codeowners.of(filename)
        logger.debug(f"{filename} {owners}")
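        # The codeowners package's .of() returns (kind, name) tuples such as
        # ("TEAM", "@org/team"); keep only the name. (This assumes the pip
        # `codeowners` package's API; adjust if a different parser is used.)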
        owners = {owner[1] for owner in owners} if owners else {UNOWNED_KEY}
        for owner in owners:
            files_by_codeowner[owner].add(filename)
    return files_by_codeowner


def load_cache(filename: Optional[str] = None) -> MutableMapping[str, int]:
    logger.debug(f"loading cache from {filename}")
    if not (filename and os.path.exists(filename)):
        logger.debug("file not found")
        return {}
    cache: MutableMapping[str, int] = {}
    with open(filename) as f:
        try:
            for line in f:
                key, value = line.split(CACHE_SEPARATOR)
                cache[key] = int(value)
        except (AttributeError, OSError, TypeError, ValueError):
            return {}
    return cache
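
# The cache file is a flat TSV of "<md5 hex digest>\t<line count>" rows, one
# per unique file content, e.g. (hash illustrative):
#   79054025255fb1a26e4bc422aef54eb4	153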


def store_cache(cache: Mapping[str, int], filename: Optional[str]) -> None:
    # TODO We don't garbage collect stale hashes so the file cache will
    # continue to grow indefinitely.
    if not filename:
        return
    with open(filename, "w") as f:
        for key, value in cache.items():
            f.write(f"{key}{CACHE_SEPARATOR}{value}\n")


def hash_file(filename: str) -> str:
    """https://stackoverflow.com/questions/22733826"""
    func = hashlib.md5()
    with open(filename, "rb") as f:
        while True:
            block = f.read(1024 * func.block_size)
            if not block:
                break
            func.update(block)
    return func.hexdigest()
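
# md5 here is only a cheap content fingerprint used as a cache key; nothing
# security-sensitive depends on it.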


def analyze_file(file: str, cache: MutableMapping[str, int]) -> int:
    """Evan's algorithm for grabbing LOC from a file."""
    filename = os.path.join(BASE_DIR, file)
    key = hash_file(filename)
    cached_value = cache.get(key)
    if cached_value is not None:
        logger.debug(f"cache hit {filename}")
        return cached_value
    logger.debug(f"cache size {len(cache)}")
    logger.debug(f"cache miss {filename} {key}")
    proc_cmd = f"pygount {filename} --format=summary --suffix=py"
    proc = subprocess.run(proc_cmd.split(" "), capture_output=True)
    output = proc.stdout.decode("utf-8")
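    # Parse pygount's summary table: this assumes the second-to-last output
    # line is the totals row and its second-to-last column is the line count.
    # It is tied to pygount's table layout and will break if that changes.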
    value = int(output.split("\n")[-2].split()[-2])
    cache[key] = value
    return value


def total_lines(files: Set[str], cache: MutableMapping[str, int], status: str = "") -> int:
    """Gets the total lines and primes the cache."""
    total = 0
    for i, file in enumerate(files):
        total += analyze_file(file, cache)
        # i + 1 so the bar can reach 100% on the final file.
        progress(i + 1, len(files), status)
    return total


def analyze_files(
    files: Set[str],
    codeowners: Any,
    cache: MutableMapping[str, int],
    teams: Set[str],
    status: str = "",
) -> Mapping[str, int]:
    logger.debug(f"file count {len(files)}")
    logger.debug(f"teams: {teams}")
    # This is slow.
    total = total_lines(files, cache, status)
    files_by_codeowner = split_files_by_codeowner(files, codeowners)
    count_by_team: defaultdict[str, int] = defaultdict(int)
    for team in teams:
        subset_of_files: set[str] = files_by_codeowner.get(team, set())
        logger.debug(f"{team} {len(subset_of_files)}")
        for file in subset_of_files:
            value = analyze_file(file, cache)
            count_by_team[team] += value
            logger.debug(f"{value} {file}")
    logger.debug(count_by_team)
    count_by_team[TOTALS_KEY] = total
    return count_by_team


def get_result(
    covered_by_team: Mapping[str, int],
    not_covered_by_team: Mapping[str, int],
    team: str,
) -> float:
    covered = covered_by_team.get(team, 0)
    total = covered + not_covered_by_team.get(team, 0)
    return ((float(covered) / float(total)) * 100) if total else 0.0
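
# Worked example (numbers illustrative): 30 covered lines and 70 uncovered
# lines gives get_result(...) == 30.0, i.e. 30% of that team's lines live in
# files checked by mypy.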


def print_results(
    covered_by_team: Mapping[str, int],
    not_covered_by_team: Mapping[str, int],
    teams: Set[str],
) -> None:
    """Pretty print the results."""
    tuples = sorted(
        ((team, get_result(covered_by_team, not_covered_by_team, team)) for team in teams),
        key=lambda x: x[1],
    ) + [(TOTALS_KEY, get_result(covered_by_team, not_covered_by_team, TOTALS_KEY))]
    bar = "=" * int(BAR_LENGTH / 2)
    print(f"{bar} Python coverage by team {bar}")  # NOQA S002
    for team, percent in tuples:
        if percent:
            print(f"{team:<32} {percent:.2f}%")  # NOQA S002
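
# Sample output, ascending by coverage with the total last (teams and numbers
# illustrative):
#   ============================== Python coverage by team ==============================
#   @org/team-a                      12.41%
#   @org/team-b                      87.60%
#   TOTAL                            54.32%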


def setup_args() -> Any:
    # TODO take a config file
    parser = argparse.ArgumentParser(
        description="Generate a python typing report",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="run script in debug mode",
    )
    parser.add_argument(
        "--team",
        "-t",
        action="store",
        type=str,
        help="only run analytics on this team",
    )
    parser.add_argument(
        "--cache",
        "-c",
        action="store",
        type=str,
        help="the location of a cache file",
    )
    return parser.parse_args()


def progress(count: int, total: int, status: str = "") -> None:
    """
    https://gist.github.com/vladignatyev/06860ec2040cb497f0f3
    """
    if logger.level == logging.DEBUG:
        # The progress bar is incompatible with debug logging, so just don't try.
        return
    filled_len = int(round(BAR_LENGTH * count / float(total)))
    percents = round(100.0 * count / float(total), 1)
    bar = "=" * filled_len + "-" * (BAR_LENGTH - filled_len)
    sys.stdout.write(f"[{bar}] {percents}% ...{status}\r")
    sys.stdout.flush()


def main() -> None:
    args = setup_args()
    if args.verbose:
        logger.setLevel(logging.DEBUG)
    with open(codeowners_filename) as f:
        codeowners = CodeOwners(f.read())
    covered_files = flatten_directories(get_source_files())
    all_files = flatten_directories(ROOT)
    cache = load_cache(args.cache)
    teams = get_all_teams(team=args.team)
    covered = analyze_files(covered_files, codeowners, cache, teams=teams, status="mypy.ini")
    # If the team has no coverage, then don't bother getting the denominator.
    teams_with_covered_lines = {t for t in teams if covered.get(t, 0) > 0}
    not_covered = analyze_files(
        all_files - covered_files, codeowners, cache, teams=teams_with_covered_lines, status="root"
    )
    store_cache(cache, args.cache)
    print_results(covered, not_covered, teams)


if __name__ == "__main__":
    main()
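
# Typical invocations from the repo root (cache path and team illustrative;
# flags per setup_args above):
#   python typed_code.py --cache .typing-cache
#   python typed_code.py -v -t @org/team-a --cache .typing-cache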