123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301 |
- #!/usr/bin/env python
- from __future__ import annotations
- import argparse
- import configparser
- import hashlib
- import logging
- import os
- import re
- import subprocess
- import sys
- from collections import defaultdict
- from typing import Any, Mapping, MutableMapping, Optional, Set
- from codeowners import CodeOwners
- """
- Calculate python typing progress by teams as determined by CODEOWNERS.
- """
- BAR_LENGTH = 60
- UNOWNED_KEY = "other"
- TOTALS_KEY = "TOTAL"
- CACHE_SEPARATOR = "\t"
- TEAM_REGEX = re.compile(r"@\S+/\S+")
- # TODO pass directories and ignores as parameters
- ROOT = {"src/"}
- # TODO make these regexes
- IGNORE = {"src/sentry/migrations/"}
- # Collect config files
- BASE_DIR = os.getcwd()
- config_filename = os.path.join(BASE_DIR, "mypy.ini")
- codeowners_filename = os.path.join(BASE_DIR, ".github/CODEOWNERS")
- logger = logging.getLogger(__name__)
- logging.basicConfig(stream=sys.stdout, level=logging.INFO)
def get_source_files() -> Set[str]:
    """Read mypy.ini and return the set of file patterns mypy covers."""
    logger.debug(f"get_source_files {config_filename}")
    parser = configparser.ConfigParser()
    parser.read(config_filename)
    raw = parser["mypy"]["files"]
    logger.debug(raw)
    return {entry.strip() for entry in raw.split(",")}
def flatten_directories(paths: Set[str]) -> Set[str]:
    """
    For a list of files, recursively turn the directories into lists of their
    component files while passing along non-directories.

    Paths listed in IGNORE are skipped entirely; only ``.py`` files survive.
    """
    collected: Set[str] = set()
    for entry in paths:
        if entry in IGNORE:
            continue
        if os.path.isdir(entry):
            children = {os.path.join(entry, child) for child in os.listdir(entry)}
            collected |= flatten_directories(children)
        elif entry.endswith(".py"):
            collected.add(entry)
    return collected
def get_all_teams(team: Optional[str] = None) -> Set[str]:
    """
    Re-read the codeowners file looking for team names. This isn't a full
    solution because it doesn't skip commented lines. I wish the codeowners
    parse did this for us.

    When ``team`` is provided, skip the file scan and return just that team.
    """
    if team:
        return {team}
    found: Set[str] = set()
    with open(codeowners_filename) as handle:
        for line in handle:
            found.update(TEAM_REGEX.findall(line))
    logger.debug("All teams")
    logger.debug("\n".join(found))
    return found
def split_files_by_codeowner(files: Set[str], codeowners: Any) -> MutableMapping[str, Set[str]]:
    """
    Given a list of filenames and a codeowners objects, split the files up by
    owner. This isn't a full solution because it doesn't handle multiple owners
    on a file.

    Files that match no CODEOWNERS rule are grouped under UNOWNED_KEY.
    """
    grouped: MutableMapping[str, Set[str]] = defaultdict(set)
    for path in files:
        matches = codeowners.of(path)
        logger.debug(f"(unknown) {matches}")
        # `of()` yields (kind, owner) tuples; keep just the owner handles.
        owner_names = {match[1] for match in matches} if matches else {UNOWNED_KEY}
        for owner in owner_names:
            grouped[owner].add(path)
    return grouped
def load_cache(filename: Optional[str] = None) -> MutableMapping[str, int]:
    """
    Load the content-hash -> line-count cache from ``filename``.

    Returns an empty mapping when no filename is given, the file is missing,
    or the file is malformed/unreadable — a broken cache is deliberately
    discarded rather than crashing the run.
    """
    # NOTE(review): the original debug message had its interpolation stripped
    # ("loading cache from (unknown)"); restored to log the actual filename.
    logger.debug(f"loading cache from {filename}")
    if not (filename and os.path.exists(filename)):
        logger.debug("file not found")
        return {}
    cache: MutableMapping[str, int] = {}
    try:
        # Fix: open() was outside the try even though OSError is handled below,
        # so a permission error would crash instead of falling back to {}.
        with open(filename) as f:
            for line in f:
                key, value = line.split(CACHE_SEPARATOR)
                cache[key] = int(value)
    except (AttributeError, OSError, TypeError, ValueError):
        return {}
    return cache
def store_cache(cache: Mapping[str, int], filename: str) -> None:
    """Persist the hash -> line-count cache, one tab-separated entry per line."""
    # TODO We don't garbage collect stale hashes so the file cache will continue
    # to grow indefinitely.
    if not filename:
        return
    lines = [f"{key}{CACHE_SEPARATOR}{value}\n" for key, value in cache.items()]
    with open(filename, "w") as f:
        f.writelines(lines)
def hash_file(filename: str) -> str:
    """Return the md5 hex digest of the file's contents, read in chunks.

    https://stackoverflow.com/questions/22733826
    """
    digest = hashlib.md5()
    chunk_size = 1024 * digest.block_size
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
def analyze_file(file: str, cache: MutableMapping[str, int]) -> int:
    """Evan's algorithm for grabbing LOC from a file.

    Returns the pygount line count for ``file``, memoized in ``cache`` keyed
    by the file's content hash so unchanged files are never re-counted across
    runs.
    """
    filename = os.path.join(BASE_DIR, file)
    key = hash_file(filename)
    cached_value = cache.get(key)
    if cached_value is not None:
        logger.debug(f"cache hit {filename}")
        return cached_value
    logger.debug(f"cache size {len(cache)}")
    logger.debug(f"cache miss {filename} {key}")
    # Fix: the command f-string had lost its {filename} interpolation, so
    # pygount was invoked on the literal path "(unknown)". Also build argv as
    # a list (shell=False) instead of str.split(" "), which broke on paths
    # containing spaces.
    proc = subprocess.run(
        ["pygount", filename, "--format=summary", "--suffix=py"],
        capture_output=True,
    )
    output = proc.stdout.decode("utf-8")
    # The summary table's penultimate line is the totals row; its
    # second-to-last column is the line count we want.
    value = int(output.split("\n")[-2].split()[-2])
    cache[key] = value
    return value
def total_lines(files: Set[str], cache: MutableMapping[str, int], status: str = "") -> int:
    """Gets the total lines and primes the cache."""
    file_count = len(files)
    running_total = 0
    for index, filename in enumerate(files):
        running_total += analyze_file(filename, cache)
        progress(index, file_count, status)
    return running_total
def analyze_files(
    files: Set[str],
    codeowners: Any,
    cache: MutableMapping[str, int],
    teams: Set[str],
    status: str = "",
) -> Mapping[str, int]:
    """Count lines per team for ``files`` and add a TOTALS_KEY grand total."""
    logger.debug(f"file count {len(files)}")
    logger.debug(f"teams: {teams}")
    # This is slow.
    grand_total = total_lines(files, cache, status)
    by_owner = split_files_by_codeowner(files, codeowners)
    counts: defaultdict[str, int] = defaultdict(int)
    for team in teams:
        team_files: set[str] = by_owner.get(team, set())
        logger.debug(f"{team} {len(team_files)}")
        for team_file in team_files:
            line_count = analyze_file(team_file, cache)
            counts[team] += line_count
            logger.debug(f"{line_count} {team_file}")
    logger.debug(counts)
    counts[TOTALS_KEY] = grand_total
    return counts
def get_result(
    covered_by_team: Mapping[str, int],
    not_covered_by_team: Mapping[str, int],
    team: str,
) -> float:
    """Return the team's covered-line percentage; 0.0 when it owns no lines."""
    numerator = covered_by_team.get(team, 0)
    denominator = numerator + not_covered_by_team.get(team, 0)
    if not denominator:
        return 0.0
    return (float(numerator) / float(denominator)) * 100
def print_results(
    covered_by_team: Mapping[str, int],
    not_covered_by_team: Mapping[str, int],
    teams: Set[str],
) -> None:
    """Pretty print the results."""
    scores = [(team, get_result(covered_by_team, not_covered_by_team, team)) for team in teams]
    scores.sort(key=lambda pair: pair[1])
    scores.append((TOTALS_KEY, get_result(covered_by_team, not_covered_by_team, TOTALS_KEY)))
    bar = "=" * int(BAR_LENGTH / 2)
    print(f"{bar} Python coverage by team {bar}")  # NOQA S002
    # Teams with 0% coverage are omitted from the report.
    for team, percent in scores:
        if percent:
            print(f"{team:<32} {(percent):.2f}%")  # NOQA S002
def setup_args() -> Any:
    """Parse and return the command-line arguments."""
    # TODO take a config file
    parser = argparse.ArgumentParser(
        description="Generate a python typing report",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="run script in debug mode",
    )
    parser.add_argument(
        "--team", "-t",
        action="store",
        type=str,
        help="only run analytics on this team",
    )
    parser.add_argument(
        "--cache", "-c",
        action="store",
        type=str,
        help="the location of a cache file",
    )
    return parser.parse_args()
def progress(count: int, total: int, status: str = "") -> None:
    """
    Draw a one-line progress bar on stdout (overwrites itself via ``\\r``).

    https://gist.github.com/vladignatyev/06860ec2040cb497f0f3
    """
    if logger.level == logging.DEBUG:
        # progress is incompatible with logger for just don't try.
        return
    filled_len = int(round(BAR_LENGTH * count / float(total)))
    percents = round(100.0 * count / float(total), 1)
    rendered = "=" * filled_len + "-" * (BAR_LENGTH - filled_len)
    sys.stdout.write(f"[{rendered}] {percents}% ...{status}\r")
    sys.stdout.flush()
def main() -> None:
    """Entry point: compute per-team typing coverage and print the report."""
    args = setup_args()
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    with open(codeowners_filename) as f:
        codeowners = CodeOwners("\n".join(f.readlines()))

    covered_files = flatten_directories(get_source_files())
    all_files = flatten_directories(ROOT)
    cache = load_cache(args.cache)
    teams = get_all_teams(team=args.team)

    covered = analyze_files(covered_files, codeowners, cache, teams=teams, status="mypy.ini")
    # If the team has no coverage, then don't bother getting the denominator.
    teams_with_covered_lines = {t for t in teams if covered.get(t, 0) > 0}
    not_covered = analyze_files(
        all_files - covered_files, codeowners, cache, teams=teams_with_covered_lines, status="root"
    )

    store_cache(cache, args.cache)
    print_results(covered, not_covered, teams)


if __name__ == "__main__":
    main()
|