|
@@ -0,0 +1,303 @@
|
|
|
+#!/usr/bin/env python
|
|
|
+
|
|
|
+import argparse
|
|
|
+import configparser
|
|
|
+import hashlib
|
|
|
+import logging
|
|
|
+import os
|
|
|
+import re
|
|
|
+import subprocess
|
|
|
+import sys
|
|
|
+from collections import defaultdict
|
|
|
+from typing import Any, Mapping, MutableMapping, Optional, Set
|
|
|
+
|
|
|
+from codeowners import CodeOwners
|
|
|
+
|
|
|
+"""
|
|
|
+Calculate python typing progress by teams as determined by CODEOWNERS.
|
|
|
+"""
|
|
|
+
|
|
|
+BAR_LENGTH = 60
|
|
|
+UNOWNED_KEY = "other"
|
|
|
+TOTALS_KEY = "TOTAL"
|
|
|
+CACHE_SEPARATOR = "\t"
|
|
|
+TEAM_REGEX = re.compile(r"@\S+/\S+")
|
|
|
+# TODO pass directories and ignores as parameters
|
|
|
+ROOT = {"src/"}
|
|
|
+# TODO make these regexes
|
|
|
+IGNORE = {"src/sentry/migrations/"}
|
|
|
+
|
|
|
+# Collect config files
|
|
|
+BASE_DIR = os.getcwd()
|
|
|
+config_filename = os.path.join(BASE_DIR, "mypy.ini")
|
|
|
+codeowners_filename = os.path.join(BASE_DIR, ".github/CODEOWNERS")
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
|
|
+
|
|
|
+
|
|
|
def get_source_files() -> Set[str]:
    """Return the set of paths listed under ``[mypy] files`` in mypy.ini."""
    logger.debug(f"get_source_files {config_filename}")
    parser = configparser.ConfigParser()
    parser.read(config_filename)
    raw_files = parser["mypy"]["files"]
    logger.debug(raw_files)
    return {entry.strip() for entry in raw_files.split(",")}
|
|
|
+
|
|
|
+
|
|
|
def flatten_directories(paths: Set[str]) -> Set[str]:
    """
    For a list of files, recursively turn the directories into lists of their
    component files while passing along non-directories.
    """
    collected: Set[str] = set()
    for entry in paths:
        # Skip anything explicitly ignored (exact path match only).
        if entry in IGNORE:
            continue

        if os.path.isdir(entry):
            children = {os.path.join(entry, name) for name in os.listdir(entry)}
            collected |= flatten_directories(children)
        elif entry.endswith(".py"):
            collected.add(entry)
    return collected
|
|
|
+
|
|
|
+
|
|
|
def get_all_teams(team: Optional[str] = None) -> Set[str]:
    """
    Re-read the codeowners file looking for team names. This isn't a full
    solution because it doesn't skip commented lines. I wish the codeowners
    parse did this for us.
    """
    # When a single team was requested, skip the file scan entirely.
    if team:
        return {team}

    found: Set[str] = set()
    with open(codeowners_filename) as handle:
        for row in handle:
            found.update(TEAM_REGEX.findall(row))

    logger.debug("All teams")
    logger.debug("\n".join(found))
    return found
|
|
|
+
|
|
|
+
|
|
|
def split_files_by_codeowner(files: Set[str], codeowners: Any) -> MutableMapping[str, Set[str]]:
    """
    Given a list of filenames and a codeowners objects, split the files up by
    owner. This isn't a full solution because it doesn't handle multiple owners
    on a file.

    :param files: filenames to bucket by owner.
    :param codeowners: parsed ``CodeOwners`` object exposing ``.of(path)``.
    :returns: mapping of owner handle (or ``UNOWNED_KEY``) to its files.
    """
    files_by_codeowner: MutableMapping[str, Set[str]] = defaultdict(set)
    for filename in files:
        owners = codeowners.of(filename)
        # Fix: the debug line had lost its {filename} placeholder.
        logger.debug(f"{filename} {owners}")

        # ``of`` yields (kind, owner) tuples; unowned files go into the
        # shared "other" bucket.
        owner_names = {owner[1] for owner in owners} if owners else {UNOWNED_KEY}
        for owner in owner_names:
            files_by_codeowner[owner].add(filename)
    return files_by_codeowner
|
|
|
+
|
|
|
+
|
|
|
def load_cache(filename: Optional[str] = None) -> MutableMapping[str, int]:
    """
    Read the content-hash -> line-count cache from ``filename``.

    Returns an empty mapping when no filename is given, the file is missing,
    or any line fails to parse.
    """
    # Fix: the debug line had lost its {filename} placeholder.
    logger.debug(f"loading cache from {filename}")

    if not (filename and os.path.exists(filename)):
        logger.debug("file not found")
        return {}

    cache: MutableMapping[str, int] = {}
    with open(filename) as f:
        try:
            for line in f:
                key, value = line.split(CACHE_SEPARATOR)
                cache[key] = int(value)
        except (AttributeError, OSError, TypeError, ValueError):
            # A corrupt cache is not fatal — just recompute everything.
            return {}
    return cache
|
|
|
+
|
|
|
+
|
|
|
def store_cache(cache: Mapping[str, int], filename: str) -> None:
    """Persist the hash -> line-count cache as tab-separated lines."""
    # TODO We don't garbage collect stale hashes so the file cache will continue
    # to grow indefinitely.
    if not filename:
        return

    rows = [f"{key}{CACHE_SEPARATOR}{value}\n" for key, value in cache.items()]
    with open(filename, "w") as handle:
        handle.writelines(rows)
|
|
|
+
|
|
|
+
|
|
|
def hash_file(filename: str) -> str:
    """https://stackoverflow.com/questions/22733826"""
    digest = hashlib.md5()
    with open(filename, "rb") as handle:
        # Read in large chunks so huge files don't load entirely into memory.
        for block in iter(lambda: handle.read(1024 * digest.block_size), b""):
            digest.update(block)
    return digest.hexdigest()
|
|
|
+
|
|
|
+
|
|
|
def analyze_file(file: str, cache: MutableMapping[str, int]) -> int:
    """Evan's algorithm for grabbing LOC from a file.

    Shells out to ``pygount`` for the source-line count, memoized by the
    file's content hash so unchanged files are never re-counted.

    :param file: path relative to ``BASE_DIR``.
    :param cache: content-hash -> line-count memo, updated in place.
    """
    filename = os.path.join(BASE_DIR, file)

    key = hash_file(filename)
    cached_value = cache.get(key)
    if cached_value is not None:
        # Fix: debug lines had lost their {filename} placeholders.
        logger.debug(f"cache hit {filename}")
        return cached_value

    logger.debug(f"cache size {len(cache)}")
    logger.debug(f"cache miss {filename} {key}")
    # Fix: the pygount command had lost its {filename} placeholder and
    # could never have run as written.
    proc_cmd = f"pygount {filename} --format=summary --suffix=py"
    proc = subprocess.run(proc_cmd.split(" "), capture_output=True)
    output = proc.stdout.decode("utf-8")
    # The summary table's second-to-last row/column holds the code line count.
    value = int(output.split("\n")[-2].split()[-2])

    cache[key] = value
    return value
|
|
|
+
|
|
|
+
|
|
|
def total_lines(files: Set[str], cache: MutableMapping[str, int], status: str = "") -> int:
    """Gets the total lines and primes the cache.

    :param files: filenames to analyze.
    :param cache: content-hash -> line-count memo, updated in place.
    :param status: label shown next to the progress bar.
    """
    total = 0
    for i, file in enumerate(files):
        total += analyze_file(file, cache)
        # Fix: report i + 1 (completed items) so the bar starts above 0%
        # and actually reaches 100% on the last file.
        progress(i + 1, len(files), status)
    return total
|
|
|
+
|
|
|
+
|
|
|
def analyze_files(
    files: Set[str],
    codeowners: Any,
    cache: MutableMapping[str, int],
    teams: Set[str],
    status: str = "",
) -> Mapping[str, int]:
    """
    Count owned lines per team across ``files``; the grand total for all
    files (regardless of owner) is stored under ``TOTALS_KEY``.

    :param files: filenames to analyze.
    :param codeowners: parsed ``CodeOwners`` object.
    :param cache: content-hash -> line-count memo, updated in place.
    :param teams: only these teams are tallied.
    :param status: label shown on the progress bar.
    """
    logger.debug(f"file count {len(files)}")
    logger.debug(f"teams: {teams}")

    # This is slow.
    total = total_lines(files, cache, status)
    files_by_codeowner = split_files_by_codeowner(files, codeowners)

    count_by_team: MutableMapping[str, int] = defaultdict(int)
    for team in teams:
        # Fix: default to an empty set, not a list, to match the mapping's
        # declared value type.
        subset_of_files = files_by_codeowner.get(team, set())
        logger.debug(f"{team} {len(subset_of_files)}")
        for file in subset_of_files:
            value = analyze_file(file, cache)
            count_by_team[team] += value
            logger.debug(f"{value} {file}")

    logger.debug(count_by_team)
    count_by_team[TOTALS_KEY] = total
    return count_by_team
|
|
|
+
|
|
|
+
|
|
|
def get_result(
    covered_by_team: Mapping[str, int],
    not_covered_by_team: Mapping[str, int],
    team: str,
) -> float:
    """Return the team's typed-line percentage, or 0.0 if it owns no lines."""
    covered = covered_by_team.get(team, 0)
    uncovered = not_covered_by_team.get(team, 0)
    total = covered + uncovered
    if not total:
        return 0.0
    return (covered / total) * 100
|
|
|
+
|
|
|
+
|
|
|
def print_results(
    covered_by_team: Mapping[str, int],
    not_covered_by_team: Mapping[str, int],
    teams: Set[str],
) -> None:
    """Pretty print the results."""
    # Teams sorted ascending by coverage, with the grand total appended last.
    rows = sorted(
        ((team, get_result(covered_by_team, not_covered_by_team, team)) for team in teams),
        key=lambda pair: pair[1],
    )
    rows.append((TOTALS_KEY, get_result(covered_by_team, not_covered_by_team, TOTALS_KEY)))

    bar = "=" * int(BAR_LENGTH / 2)
    print(f"{bar} Python coverage by team {bar}")  # NOQA S002
    for team, percent in rows:
        # Teams with zero coverage are omitted from the report.
        if percent:
            print(f"{team:<32} {(percent):.2f}%")  # NOQA S002
|
|
|
+
|
|
|
+
|
|
|
def setup_args() -> Any:
    """Build and run the argument parser for the report generator."""
    # TODO take a config file
    parser = argparse.ArgumentParser(
        description="Generate a python typing report",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="run script in debug mode"
    )
    parser.add_argument(
        "--team", "-t", action="store", type=str, help="only run analytics on this team"
    )
    parser.add_argument(
        "--cache", "-c", action="store", type=str, help="the location of a cache file"
    )
    return parser.parse_args()
|
|
|
+
|
|
|
+
|
|
|
def progress(count: int, total: int, status: str = "") -> None:
    """
    Render a single-line progress bar on stdout.

    https://gist.github.com/vladignatyev/06860ec2040cb497f0f3
    """
    if logger.level == logging.DEBUG:
        # The progress bar is incompatible with debug logging, so just don't try.
        return
    if not total:
        # Fix: guard against ZeroDivisionError when there is nothing to report.
        return
    filled_len = int(round(BAR_LENGTH * count / float(total)))

    percents = round(100.0 * count / float(total), 1)
    bar = "=" * filled_len + "-" * (BAR_LENGTH - filled_len)

    # "\r" rewinds to the line start so successive calls repaint in place.
    sys.stdout.write(f"[{bar}] {percents}% ...{status}\r")
    sys.stdout.flush()
|
|
|
+
|
|
|
+
|
|
|
def main() -> None:
    """Entry point: compute and print python typing coverage per team."""
    args = setup_args()
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    with open(codeowners_filename) as f:
        # Fix: feed the file verbatim. readlines() keeps each trailing "\n",
        # so joining with "\n" doubled every newline in the CODEOWNERS text.
        codeowners = CodeOwners(f.read())

    covered_files = flatten_directories(get_source_files())
    all_files = flatten_directories(ROOT)
    cache = load_cache(args.cache)
    teams = get_all_teams(team=args.team)

    covered = analyze_files(covered_files, codeowners, cache, teams=teams, status="mypy.ini")

    # If the team has no coverage, then don't bother getting the denominator.
    teams_with_covered_lines = {t for t in teams if covered.get(t, 0) > 0}

    not_covered = analyze_files(
        all_files - covered_files, codeowners, cache, teams=teams_with_covered_lines, status="root"
    )
    store_cache(cache, args.cache)
    print_results(covered, not_covered, teams)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|