- """util.py - General utilities for running, loading, and processing benchmarks"""
- import json
- import os
- import re
- import subprocess
- import sys
- import tempfile
- # Input file type enumeration
- IT_Invalid = 0
- IT_JSON = 1
- IT_Executable = 2
- _num_magic_bytes = 2 if sys.platform.startswith("win") else 4


def is_executable_file(filename):
    """
    Return 'True' if 'filename' names a valid file which is likely
    an executable. A file is considered an executable if it starts with the
    magic bytes for an EXE, Mach-O, or ELF file.
    """
    if not os.path.isfile(filename):
        return False
    with open(filename, mode="rb") as f:
        magic_bytes = f.read(_num_magic_bytes)
    if sys.platform == "darwin":
        return magic_bytes in [
            b"\xfe\xed\xfa\xce",  # MH_MAGIC
            b"\xce\xfa\xed\xfe",  # MH_CIGAM
            b"\xfe\xed\xfa\xcf",  # MH_MAGIC_64
            b"\xcf\xfa\xed\xfe",  # MH_CIGAM_64
            b"\xca\xfe\xba\xbe",  # FAT_MAGIC
            b"\xbe\xba\xfe\xca",  # FAT_CIGAM
        ]
    elif sys.platform.startswith("win"):
        return magic_bytes == b"MZ"
    else:
        return magic_bytes == b"\x7fELF"
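
# Illustrative usage (the paths below are hypothetical, not part of this
# module): a native benchmark binary starts with executable magic bytes
# (b"MZ" on Windows, b"\x7fELF" on Linux), while a results file does not.
#
#     is_executable_file("./bm_vector")   # True for a native binary
#     is_executable_file("results.json")  # False: no executable magic bytes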


def is_json_file(filename):
    """
    Return 'True' if 'filename' names a valid JSON output file,
    'False' otherwise.
    """
    try:
        with open(filename, "r") as f:
            json.load(f)
        return True
    except Exception:
        pass
    return False


def classify_input_file(filename):
    """
    Return a tuple (type, msg) where 'type' specifies the classified type
    of 'filename'. If 'type' is 'IT_Invalid' then 'msg' is a human-readable
    string representing the error.
    """
    ftype = IT_Invalid
    err_msg = None
    if not os.path.exists(filename):
        err_msg = "'%s' does not exist" % filename
    elif not os.path.isfile(filename):
        err_msg = "'%s' does not name a file" % filename
    elif is_executable_file(filename):
        ftype = IT_Executable
    elif is_json_file(filename):
        ftype = IT_JSON
    else:
        err_msg = (
            "'%s' does not name a valid benchmark executable or JSON file"
            % filename
        )
    return ftype, err_msg
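
# Sketch of the (type, msg) contract, using hypothetical file names:
#
#     classify_input_file("missing.json")  # -> (IT_Invalid, "'missing.json' does not exist")
#     classify_input_file("results.json")  # -> (IT_JSON, None) for valid JSON output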


def check_input_file(filename):
    """
    Classify the file named by 'filename' and return the classification.
    If the file is classified as 'IT_Invalid' print an error message and
    exit the program.
    """
    ftype, msg = classify_input_file(filename)
    if ftype == IT_Invalid:
        print("Invalid input file: %s" % msg)
        sys.exit(1)
    return ftype


def find_benchmark_flag(prefix, benchmark_flags):
    """
    Search the specified list of flags for a flag matching `<prefix><arg>`
    and, if it is found, return the arg it specifies. If the flag is
    specified more than once the last value is returned. If the flag is not
    found None is returned.
    """
    assert prefix.startswith("--") and prefix.endswith("=")
    result = None
    for f in benchmark_flags:
        if f.startswith(prefix):
            result = f[len(prefix) :]
    return result


def remove_benchmark_flags(prefix, benchmark_flags):
    """
    Return a new list containing the specified benchmark_flags except those
    with the specified prefix.
    """
    assert prefix.startswith("--") and prefix.endswith("=")
    return [f for f in benchmark_flags if not f.startswith(prefix)]
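
# Example of the two flag helpers above (the flag values are hypothetical):
#
#     flags = ["--benchmark_filter=BM_Foo", "--benchmark_filter=BM_Bar", "-v"]
#     find_benchmark_flag("--benchmark_filter=", flags)     # "BM_Bar" (last one wins)
#     remove_benchmark_flags("--benchmark_filter=", flags)  # ["-v"]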


def load_benchmark_results(fname, benchmark_filter):
    """
    Read benchmark output from a file and return the JSON object.

    Apply benchmark_filter, a regular expression, with nearly the same
    semantics as the --benchmark_filter argument. May be None.
    Note: the Python regular expression engine is used instead of the
    one used by the C++ code, which may produce different results
    in complex cases.

    REQUIRES: 'fname' names a file containing JSON benchmark output.
    """

    def benchmark_wanted(benchmark):
        if benchmark_filter is None:
            return True
        name = benchmark.get("run_name", None) or benchmark["name"]
        return re.search(benchmark_filter, name) is not None

    with open(fname, "r") as f:
        results = json.load(f)
        if "context" in results:
            if "json_schema_version" in results["context"]:
                json_schema_version = results["context"]["json_schema_version"]
                if json_schema_version != 1:
                    print(
                        "In %s, got unsupported JSON schema version: %i, expected 1"
                        % (fname, json_schema_version)
                    )
                    sys.exit(1)
        if "benchmarks" in results:
            results["benchmarks"] = list(
                filter(benchmark_wanted, results["benchmarks"])
            )
        return results
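
# Sketch of loading saved output (the file name and filter are hypothetical;
# the per-benchmark fields depend on what the benchmark actually emitted):
#
#     results = load_benchmark_results("baseline.json", r"BM_memcpy/.*")
#     for bm in results["benchmarks"]:
#         print(bm["name"])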


def sort_benchmark_results(result):
    """
    Return 'result' with its benchmarks sorted into a stable display order:
    grouped by family, then by per-family instance, with aggregates placed
    after the repetitions they summarize.
    """
    benchmarks = result["benchmarks"]

    # Python's sort is stable, so sorting repeatedly from the innermost key
    # to the outermost key yields a multi-key sort: ties on a later (outer)
    # key preserve the ordering established by the earlier (inner) passes.
    benchmarks = sorted(
        benchmarks,
        key=lambda benchmark: benchmark.get("repetition_index", -1),
    )
    benchmarks = sorted(
        benchmarks,
        key=lambda benchmark: 1
        if benchmark.get("run_type") == "aggregate"
        else 0,
    )
    benchmarks = sorted(
        benchmarks,
        key=lambda benchmark: benchmark.get("per_family_instance_index", -1),
    )
    benchmarks = sorted(
        benchmarks,
        key=lambda benchmark: benchmark.get("family_index", -1),
    )

    result["benchmarks"] = benchmarks
    return result


def run_benchmark(exe_name, benchmark_flags):
    """
    Run a benchmark specified by 'exe_name' with the specified
    'benchmark_flags'. The benchmark is run directly as a subprocess to
    preserve real-time console output.

    RETURNS: A JSON object representing the benchmark output
    """
    output_name = find_benchmark_flag("--benchmark_out=", benchmark_flags)
    is_temp_output = False
    if output_name is None:
        # No output file was requested, so write the results to a temporary
        # file that is deleted once they have been loaded.
        is_temp_output = True
        thandle, output_name = tempfile.mkstemp()
        os.close(thandle)
        benchmark_flags = list(benchmark_flags) + [
            "--benchmark_out=%s" % output_name
        ]

    cmd = [exe_name] + benchmark_flags
    print("RUNNING: %s" % " ".join(cmd))
    exit_code = subprocess.call(cmd)
    if exit_code != 0:
        print("TEST FAILED...")
        sys.exit(exit_code)
    json_res = load_benchmark_results(output_name, None)
    if is_temp_output:
        os.unlink(output_name)
    return json_res
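
# Hypothetical invocation: run a binary with extra flags and collect the JSON
# written via the injected --benchmark_out file.
#
#     results = run_benchmark("./bm_vector", ["--benchmark_repetitions=3"])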


def run_or_load_benchmark(filename, benchmark_flags):
    """
    Get the results for a specified benchmark. If 'filename' specifies
    an executable benchmark then the results are generated by running the
    benchmark. Otherwise 'filename' must name a valid JSON output file,
    which is loaded and the result returned.
    """
    ftype = check_input_file(filename)
    if ftype == IT_JSON:
        benchmark_filter = find_benchmark_flag(
            "--benchmark_filter=", benchmark_flags
        )
        return load_benchmark_results(filename, benchmark_filter)
    if ftype == IT_Executable:
        return run_benchmark(filename, benchmark_flags)
    raise ValueError("Unknown file type %s" % ftype)
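
# End-to-end sketch (inputs are hypothetical): a typical caller validates each
# argument and then obtains comparable JSON whether it names a benchmark
# binary or a previously saved result file.
#
#     for arg in ("./bm_old", "baseline.json"):
#         check_input_file(arg)
#         results = run_or_load_benchmark(arg, ["--benchmark_filter=BM_.*"])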