typed_code.py

#!/usr/bin/env python
"""
Calculate python typing progress by teams as determined by CODEOWNERS.
"""
import argparse
import configparser
import hashlib
import logging
import os
import re
import subprocess
import sys
from collections import defaultdict
from typing import Any, Mapping, MutableMapping, Optional, Set

from codeowners import CodeOwners

BAR_LENGTH = 60
UNOWNED_KEY = "other"
TOTALS_KEY = "TOTAL"
CACHE_SEPARATOR = "\t"
TEAM_REGEX = re.compile(r"@\S+/\S+")

# TODO pass directories and ignores as parameters
ROOT = {"src/"}
# TODO make these regexes
IGNORE = {"src/sentry/migrations/"}

# Collect config files
BASE_DIR = os.getcwd()
config_filename = os.path.join(BASE_DIR, "mypy.ini")
codeowners_filename = os.path.join(BASE_DIR, ".github/CODEOWNERS")

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
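
# For reference, the two inputs this script reads look roughly like the
# following. The paths and team name are hypothetical examples, not taken
# from a real config:
#
#   mypy.ini:
#       [mypy]
#       files = src/sentry/api/,
#           src/sentry/utils/json.py
#
#   .github/CODEOWNERS:
#       src/sentry/api/  @getsentry/owners-api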


def get_source_files() -> Set[str]:
    logger.debug(f"get_source_files {config_filename}")
    config = configparser.ConfigParser()
    config.read(config_filename)
    files = config["mypy"]["files"]
    logger.debug(files)
    return {filename.strip() for filename in files.split(",")}


def flatten_directories(paths: Set[str]) -> Set[str]:
    """
    For a set of paths, recursively expand each directory into its component
    files while passing non-directory paths through.
    """
    result = set()
    for path in paths:
        # NOTE: IGNORE entries carry a trailing slash but os.path.join() does
        # not produce one, so normalize before comparing.
        if path.rstrip("/") in {ignored.rstrip("/") for ignored in IGNORE}:
            continue
        if os.path.isdir(path):
            next_level = {os.path.join(path, x) for x in os.listdir(path)}
            flattened = flatten_directories(next_level)
            result.update(flattened)
        elif path.endswith(".py"):
            result.add(path)
    return result
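
# Hypothetical example of the expansion (paths invented for illustration):
#   flatten_directories({"src/"})
#   => {"src/sentry/api/endpoint.py", "src/sentry/utils/json.py", ...}
# Non-.py files are dropped and anything matching IGNORE is skipped.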


def get_all_teams(team: Optional[str] = None) -> Set[str]:
    """
    Re-read the codeowners file looking for team names. This isn't a full
    solution because it doesn't skip commented lines. I wish the codeowners
    parser did this for us.
    """
    if team:
        return {team}

    teams = set()
    with open(codeowners_filename) as f:
        for line in f.readlines():
            teams.update(TEAM_REGEX.findall(line))
    logger.debug("All teams")
    logger.debug("\n".join(teams))
    return teams


def split_files_by_codeowner(files: Set[str], codeowners: Any) -> MutableMapping[str, Set[str]]:
    """
    Given a set of filenames and a codeowners object, split the files up by
    owner. This isn't a full solution because it doesn't handle multiple owners
    on a file.
    """
    files_by_codeowner = defaultdict(set)
    for filename in files:
        owners = codeowners.of(filename)
        logger.debug(f"{filename} {owners}")
        owners = {owner[1] for owner in owners} if owners else {UNOWNED_KEY}
        for owner in owners:
            files_by_codeowner[owner].add(filename)
    return files_by_codeowner
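
# Note: the codeowners package's CodeOwners.of() returns (kind, owner) tuples,
# e.g. ("TEAM", "@getsentry/owners-api"), which is why only owner[1] is kept
# above. The example owner here is hypothetical.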


def load_cache(filename: Optional[str] = None) -> MutableMapping[str, int]:
    logger.debug(f"loading cache from {filename}")
    if not (filename and os.path.exists(filename)):
        logger.debug("file not found")
        return {}

    cache = {}
    with open(filename) as f:
        try:
            for line in f.readlines():
                key, value = line.split(CACHE_SEPARATOR)
                cache[key] = int(value)
        except (AttributeError, OSError, TypeError, ValueError):
            return {}
    return cache


def store_cache(cache: Mapping[str, int], filename: str) -> None:
    # TODO We don't garbage collect stale hashes so the file cache will
    # continue to grow indefinitely.
    if not filename:
        return
    with open(filename, "w") as f:
        for key, value in cache.items():
            f.write(f"{key}{CACHE_SEPARATOR}{value}\n")
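
# Each cache line is an md5 digest and a line count separated by a tab,
# e.g. (values invented for illustration):
#   0cc175b9c0f1b6a831c399e269772661	1024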


def hash_file(filename: str) -> str:
    """https://stackoverflow.com/questions/22733826"""
    func = hashlib.md5()
    with open(filename, "rb") as f:
        while True:
            block = f.read(1024 * func.block_size)
            if not block:
                break
            func.update(block)
    return func.hexdigest()


def analyze_file(file: str, cache: MutableMapping[str, int]) -> int:
    """Evan's algorithm for grabbing LOC from a file."""
    filename = os.path.join(BASE_DIR, file)
    key = hash_file(filename)
    cached_value = cache.get(key)
    if cached_value is not None:
        logger.debug(f"cache hit {filename}")
        return cached_value

    logger.debug(f"cache size {len(cache.keys())}")
    logger.debug(f"cache miss {filename} {key}")
    proc_cmd = f"pygount {filename} --format=summary --suffix=py"
    proc = subprocess.run(proc_cmd.split(" "), capture_output=True)
    output = proc.stdout.decode("utf-8")
    # Pull the line count out of pygount's summary table.
    value = int(output.split("\n")[-2].split()[-2])
    cache[key] = value
    return value
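
# The index-based parse above assumes pygount's summary layout: the last
# non-empty output line is the totals row, and its second-to-last column is
# the count being extracted. A pygount release that reshapes the summary
# table would break this.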


def total_lines(files: Set[str], cache: MutableMapping[str, int], status: str = "") -> int:
    """Gets the total lines and primes the cache."""
    total = 0
    for i, file in enumerate(files):
        total += analyze_file(file, cache)
        progress(i, len(files), status)
    return total


def analyze_files(
    files: Set[str],
    codeowners: Any,
    cache: MutableMapping[str, int],
    teams: Set[str],
    status: str = "",
) -> Mapping[str, int]:
    logger.debug(f"file count {len(files)}")
    logger.debug(f"teams: {teams}")

    # This is slow.
    total = total_lines(files, cache, status)

    files_by_codeowner = split_files_by_codeowner(files, codeowners)
    count_by_team = defaultdict(int)
    for team in teams:
        subset_of_files = files_by_codeowner.get(team, set())
        logger.debug(f"{team} {len(subset_of_files)}")
        for file in subset_of_files:
            value = analyze_file(file, cache)
            count_by_team[team] += value
            logger.debug(f"{value} {file}")

    logger.debug(count_by_team)
    count_by_team[TOTALS_KEY] = total
    return count_by_team


def get_result(
    covered_by_team: Mapping[str, int],
    not_covered_by_team: Mapping[str, int],
    team: str,
) -> float:
    covered = covered_by_team.get(team, 0)
    total = covered + not_covered_by_team.get(team, 0)
    return ((float(covered) / float(total)) * 100) if total else 0.0
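
# Worked example (numbers invented): a team with 300 covered lines and 100
# uncovered lines scores 300 / (300 + 100) * 100 = 75.0.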


def print_results(
    covered_by_team: Mapping[str, int],
    not_covered_by_team: Mapping[str, int],
    teams: Set[str],
) -> None:
    """Pretty print the results."""
    tuples = sorted(
        ((team, get_result(covered_by_team, not_covered_by_team, team)) for team in teams),
        key=lambda x: x[1],
    ) + [(TOTALS_KEY, get_result(covered_by_team, not_covered_by_team, TOTALS_KEY))]

    bar = "=" * int(BAR_LENGTH / 2)
    print(f"{bar} Python coverage by team {bar}")  # NOQA S002
    for team, percent in tuples:
        if percent:
            print(f"{team:<32} {percent:.2f}%")  # NOQA S002


def setup_args() -> Any:
    # TODO take a config file
    parser = argparse.ArgumentParser(
        description="Generate a python typing report",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="run script in debug mode",
    )
    parser.add_argument(
        "--team",
        "-t",
        action="store",
        type=str,
        help="only run analytics on this team",
    )
    parser.add_argument(
        "--cache",
        "-c",
        action="store",
        type=str,
        help="the location of a cache file",
    )
    return parser.parse_args()
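
# A typical invocation, run from the repository root (team name and cache
# path are hypothetical):
#   python typed_code.py --team @getsentry/owners-api --cache .typing-cache -v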


def progress(count: int, total: int, status: str = "") -> None:
    """
    https://gist.github.com/vladignatyev/06860ec2040cb497f0f3
    """
    if logger.level == logging.DEBUG:
        # The progress bar is incompatible with logger output, so just don't try.
        return

    filled_len = int(round(BAR_LENGTH * count / float(total)))
    percents = round(100.0 * count / float(total), 1)
    bar = "=" * filled_len + "-" * (BAR_LENGTH - filled_len)

    sys.stdout.write(f"[{bar}] {percents}% ...{status}\r")
    sys.stdout.flush()


def main() -> None:
    args = setup_args()
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    with open(codeowners_filename) as f:
        codeowners = CodeOwners("\n".join(f.readlines()))

    covered_files = flatten_directories(get_source_files())
    all_files = flatten_directories(ROOT)
    cache = load_cache(args.cache)
    teams = get_all_teams(team=args.team)

    covered = analyze_files(covered_files, codeowners, cache, teams=teams, status="mypy.ini")

    # If the team has no coverage, then don't bother getting the denominator.
    teams_with_covered_lines = {t for t in teams if covered.get(t, 0) > 0}

    not_covered = analyze_files(
        all_files - covered_files, codeowners, cache, teams=teams_with_covered_lines, status="root"
    )

    store_cache(cache, args.cache)
    print_results(covered, not_covered, teams)


if __name__ == "__main__":
    main()