@@ -1,301 +0,0 @@
-#!/usr/bin/env python
-"""
-Calculate python typing progress by teams as determined by CODEOWNERS.
-"""
-from __future__ import annotations
-
-import argparse
-import configparser
-import hashlib
-import logging
-import os
-import re
-import subprocess
-import sys
-from collections import defaultdict
-from typing import Any, Mapping, MutableMapping, Optional, Set
-
-from codeowners import CodeOwners
-
-BAR_LENGTH = 60
-UNOWNED_KEY = "other"
-TOTALS_KEY = "TOTAL"
-CACHE_SEPARATOR = "\t"
-TEAM_REGEX = re.compile(r"@\S+/\S+")
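-# Matches GitHub team handles of the form "@org/team" anywhere on a line.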
-# TODO pass directories and ignores as parameters
-ROOT = {"src/"}
-# TODO make these regexes
-IGNORE = {"src/sentry/migrations/"}
-
-# Collect config files
-BASE_DIR = os.getcwd()
-config_filename = os.path.join(BASE_DIR, "mypy.ini")
-codeowners_filename = os.path.join(BASE_DIR, ".github/CODEOWNERS")
-
-logger = logging.getLogger(__name__)
-logging.basicConfig(stream=sys.stdout, level=logging.INFO)
-
-
-def get_source_files() -> Set[str]:
-    logger.debug(f"get_source_files {config_filename}")
-    config = configparser.ConfigParser()
-    config.read(config_filename)
-    files = config["mypy"]["files"]
-    logger.debug(files)
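-    # mypy.ini's "files" option is a single comma-separated string of paths,
-    # hence the split and strip below.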
-    return {filename.strip() for filename in files.split(",")}
-
-
-def flatten_directories(paths: Set[str]) -> Set[str]:
-    """
-    For a list of files, recursively turn the directories into lists of their
-    component files while passing along non-directories.
-    """
-    result = set()
-    for path in paths:
-        if path in IGNORE:
-            continue
-
-        if os.path.isdir(path):
-            next_level = {os.path.join(path, x) for x in os.listdir(path)}
-            flattened = flatten_directories(next_level)
-            result.update(flattened)
-        elif path.endswith(".py"):
-            result.add(path)
-    return result
-
-
-def get_all_teams(team: Optional[str] = None) -> Set[str]:
-    """
-    Re-read the codeowners file looking for team names. This isn't a full
-    solution because it doesn't skip commented lines. I wish the codeowners
-    parser did this for us.
-    """
-    if team:
-        return {team}
-
-    teams = set()
-    with open(codeowners_filename) as f:
-        for line in f.readlines():
-            teams.update(TEAM_REGEX.findall(line))
-
-    logger.debug("All teams")
-    logger.debug("\n".join(teams))
-    return teams
-
-
-def split_files_by_codeowner(files: Set[str], codeowners: Any) -> MutableMapping[str, Set[str]]:
-    """
-    Given a list of filenames and a codeowners object, split the files up by
-    owner. This isn't a full solution because it doesn't handle multiple owners
-    on a file.
-    """
-    files_by_codeowner = defaultdict(set)
-    for filename in files:
-        owners = codeowners.of(filename)
-        logger.debug(f"{filename} {owners}")
-
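-        # Each owner is a (kind, handle) tuple like ("TEAM", "@org/team");
-        # keep just the handle.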
-        owners = {owner[1] for owner in owners} if owners else {UNOWNED_KEY}
-        for owner in owners:
-            files_by_codeowner[owner].add(filename)
-    return files_by_codeowner
-
-
-def load_cache(filename: Optional[str] = None) -> MutableMapping[str, int]:
-    logger.debug(f"loading cache from {filename}")
-
-    if not (filename and os.path.exists(filename)):
-        logger.debug("file not found")
-        return {}
-
-    cache = {}
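-    # Cache lines are "<md5 of file contents>\t<line count>"; see store_cache
-    # and hash_file.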
-    with open(filename) as f:
-        try:
-            for line in f.readlines():
-                key, value = line.split(CACHE_SEPARATOR)
-                cache[key] = int(value)
-        except (AttributeError, OSError, TypeError, ValueError):
-            return {}
-    return cache
-
-
-def store_cache(cache: Mapping[str, int], filename: str) -> None:
-    # TODO We don't garbage collect stale hashes so the file cache will continue
-    # to grow indefinitely.
-    if not filename:
-        return
-
-    with open(filename, "w") as f:
-        for key, value in cache.items():
-            f.write(f"{key}{CACHE_SEPARATOR}{value}\n")
-
-
-def hash_file(filename: str) -> str:
-    """https://stackoverflow.com/questions/22733826"""
-    func = hashlib.md5()
-    with open(filename, "rb") as f:
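-        # md5's block_size is 64 bytes, so this reads the file in 64 KiB
-        # chunks rather than loading it into memory all at once.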
-        while True:
-            block = f.read(1024 * func.block_size)
-            if not block:
-                break
-            func.update(block)
-    return func.hexdigest()
-
-
-def analyze_file(file: str, cache: MutableMapping[str, int]) -> int:
-    """Evan's algorithm for grabbing LOC from a file."""
-    filename = os.path.join(BASE_DIR, file)
-
-    key = hash_file(filename)
-    cached_value = cache.get(key)
-    if cached_value is not None:
-        logger.debug(f"cache hit {filename}")
-        return cached_value
-
-    logger.debug(f"cache size {len(cache.keys())}")
-    logger.debug(f"cache miss {filename} {key}")
-    proc_cmd = f"pygount {filename} --format=summary --suffix=py"
-    proc = subprocess.run(proc_cmd.split(" "), capture_output=True)
-    output = proc.stdout.decode("utf-8")
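-    # Assumes pygount's summary layout: the code-line count is the
-    # second-to-last field on the second-to-last row of the table.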
-    value = int(output.split("\n")[-2].split()[-2])
-
-    cache[key] = value
-    return value
-
-
-def total_lines(files: Set[str], cache: MutableMapping[str, int], status: str = "") -> int:
-    """Gets the total lines and primes the cache."""
-    total = 0
-    for i, file in enumerate(files):
-        total += analyze_file(file, cache)
-        progress(i + 1, len(files), status)
-    return total
-
-
-def analyze_files(
-    files: Set[str],
-    codeowners: Any,
-    cache: MutableMapping[str, int],
-    teams: Set[str],
-    status: str = "",
-) -> Mapping[str, int]:
-    logger.debug(f"file count {len(files)}")
-    logger.debug(f"teams: {teams}")
-
-    # This is slow.
-    total = total_lines(files, cache, status)
-    files_by_codeowner = split_files_by_codeowner(files, codeowners)
-
-    count_by_team: defaultdict[str, int] = defaultdict(int)
-    for team in teams:
-        subset_of_files: set[str] = files_by_codeowner.get(team, set())
-        logger.debug(f"{team} {len(subset_of_files)}")
-        for file in subset_of_files:
-            value = analyze_file(file, cache)
-            count_by_team[team] += value
-            logger.debug(f"{value} {file}")
-
-    logger.debug(count_by_team)
-    count_by_team[TOTALS_KEY] = total
-    return count_by_team
-
-
-def get_result(
-    covered_by_team: Mapping[str, int],
-    not_covered_by_team: Mapping[str, int],
-    team: str,
-) -> float:
-    covered = covered_by_team.get(team, 0)
-    total = covered + not_covered_by_team.get(team, 0)
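-    # Covered plus not-covered is the team's total line count, so this is the
-    # percentage of the team's lines living in typed (mypy-covered) files.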
-    return ((float(covered) / float(total)) * 100) if total else 0.0
-
-
-def print_results(
-    covered_by_team: Mapping[str, int],
-    not_covered_by_team: Mapping[str, int],
-    teams: Set[str],
-) -> None:
-    """Pretty print the results."""
-    tuples = sorted(
-        ((team, get_result(covered_by_team, not_covered_by_team, team)) for team in teams),
-        key=lambda x: x[1],
-    ) + [(TOTALS_KEY, get_result(covered_by_team, not_covered_by_team, TOTALS_KEY))]
-
-    bar = "=" * int(BAR_LENGTH / 2)
-    print(f"{bar} Python coverage by team {bar}")  # NOQA S002
-    for team, percent in tuples:
-        if percent:
-            print(f"{team:<32} {percent:.2f}%")  # NOQA S002
-
-
-def setup_args() -> Any:
-    # TODO take a config file
-    parser = argparse.ArgumentParser(
-        description="Generate a python typing report",
-    )
-    parser.add_argument(
-        "--verbose",
-        "-v",
-        action="store_true",
-        help="run script in debug mode",
-    )
-    parser.add_argument(
-        "--team",
-        "-t",
-        action="store",
-        type=str,
-        help="only run analytics on this team",
-    )
-    parser.add_argument(
-        "--cache",
-        "-c",
-        action="store",
-        type=str,
-        help="the location of a cache file",
-    )
-    return parser.parse_args()
-
-
-def progress(count: int, total: int, status: str = "") -> None:
-    """
-    https://gist.github.com/vladignatyev/06860ec2040cb497f0f3
-    """
-    if logger.level == logging.DEBUG:
-        # The progress bar is incompatible with debug logging, so just don't try.
-        return
-    filled_len = int(round(BAR_LENGTH * count / float(total)))
-
-    percents = round(100.0 * count / float(total), 1)
-    bar = "=" * filled_len + "-" * (BAR_LENGTH - filled_len)
-
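-    # The trailing \r moves the cursor back to the start of the line so the
-    # bar redraws in place on the next call.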
-    sys.stdout.write(f"[{bar}] {percents}% ...{status}\r")
-    sys.stdout.flush()
-
-
-def main() -> None:
-    args = setup_args()
-    if args.verbose:
-        logger.setLevel(logging.DEBUG)
-
-    with open(codeowners_filename) as f:
-        codeowners = CodeOwners("\n".join(f.readlines()))
-
-    covered_files = flatten_directories(get_source_files())
-    all_files = flatten_directories(ROOT)
-    cache = load_cache(args.cache)
-    teams = get_all_teams(team=args.team)
-
-    covered = analyze_files(covered_files, codeowners, cache, teams=teams, status="mypy.ini")
-
-    # If the team has no coverage, then don't bother getting the denominator.
-    teams_with_covered_lines = {t for t in teams if covered.get(t, 0) > 0}
-
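-    # Everything under ROOT that isn't in mypy.ini's file list counts toward
-    # the untyped remainder (the denominator's other half).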
-    not_covered = analyze_files(
-        all_files - covered_files, codeowners, cache, teams=teams_with_covered_lines, status="root"
-    )
-    store_cache(cache, args.cache)
-    print_results(covered, not_covered, teams)
-
-
-if __name__ == "__main__":
-    main()