123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572 |
- import os
- import os.path
- import sys
- import logging
- import subprocess
- from multiprocessing import Pool
- from pathlib import Path
- import tempfile
- import shutil
- import re
- import csv
- import click
- import patch
- from collections import Counter
- from library.python.svn_version import svn_version
- from yql.essentials.tests.postgresql.common import get_out_files, Differ
# Program name shown by the ``--version`` option.
PROGRAM_NAME = "pg-make-test"

# Default command lines for the ``--runner`` and ``--splitter`` options.
# The CLI splits them on whitespace before handing them to ``subprocess``.
SPLITTER = "../pgrun/pgrun split-statements"
RUNNER = "../pgrun/pgrun"

# Conventional names for the init-scripts directory and its config file.
INIT_SCRIPTS_DIR = "initscripts"
INIT_SCRIPTS_CFG = "testinits.cfg"

# Default file name for the ``--report`` option (CSV).
REPORT_FILE = "pg_tests.csv"

# Module-wide logger; stays ``None`` until ``setup_logging()`` is called.
LOGGER = None
def get_logger(name, logfile, is_debug):
    """Return the logger called *name*, configured for this tool.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        logfile: Path (``str`` or ``os.PathLike``) of a UTF-8 log file to
            write to, or ``None`` to attach no handler.
        is_debug: When true the logger level is ``DEBUG``, otherwise ``INFO``.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG if is_debug else logging.INFO)

    if logfile is not None:
        # ``logging.getLogger`` hands back the same object for the same name,
        # so attaching a handler unconditionally would duplicate every record
        # when this function is called again for the same logger and file.
        target = os.path.abspath(os.fspath(logfile))
        already_attached = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == target
            for handler in logger.handlers
        )
        if not already_attached:
            logger.addHandler(logging.FileHandler(logfile, encoding="utf-8"))

    return logger
def setup_logging(logfile, is_debug):
    """Create the module-wide logger and publish it as the global ``LOGGER``.

    The logger is named after this file (``__file__``); *logfile* and
    *is_debug* are forwarded to ``get_logger`` unchanged.
    """
    global LOGGER
    module_logger = get_logger(__file__, logfile, is_debug)
    LOGGER = module_logger
class Configuration:
    """Plain container for the settings shared by the test-case builder.

    Every constructor argument is stored unchanged under an attribute of the
    same name. As built by ``cli``: ``runner`` and ``splitter`` are argument
    lists, ``skip_tests`` is a frozenset of test names, and ``logfile`` may be
    ``None``.
    """

    def __init__(
        self,
        srcdir,
        dstdir,
        udfs,
        patchdir,
        skip_tests,
        runner,
        splitter,
        report_path,
        parallel,
        logfile,
        is_debug,
    ):
        # Input / output locations.
        self.srcdir = srcdir
        self.dstdir = dstdir
        self.patchdir = patchdir
        self.report_path = report_path

        # External commands and their extra arguments.
        self.runner = runner
        self.splitter = splitter
        self.udfs = udfs

        # Run-mode switches.
        self.skip_tests = skip_tests
        self.parallel = parallel

        # Logging setup.
        self.logfile = logfile
        self.is_debug = is_debug
def save_strings(fname, lst):
    """Write the byte strings in *lst* to *fname*, back to back.

    The file is opened in binary mode and truncated first; nothing is added
    between items, so each item must carry its own line ending.
    """
    with open(fname, 'wb') as sink:
        sink.writelines(lst)
def argwhere1(predicate, collection, default):
    """Return the index of the first item in *collection* satisfying *predicate*.

    Args:
        predicate: Callable applied to each item; a truthy result is a match.
        collection: Iterable to scan from the start.
        default: Value returned when no item matches (or the iterable is empty).

    Returns:
        The zero-based position of the first match within *collection*,
        or *default* when there is none.
    """
    # Enumerate the collection itself, not the filtered items: numbering the
    # matches would report 0 for any match regardless of where it sits.
    return next((pos for pos, item in enumerate(collection) if predicate(item)), default)
class TestCaseBuilder:
    """Builds a reduced test case (``.sql`` + ``.out``) from a source SQL test.

    For every statement of the source test, the expected output (taken from an
    out-file) is compared with what the configured runner actually prints; only
    statements whose outputs agree are kept in the resulting test case.
    """

    # Matches a ``COPY ... FROM stdin`` (or ``stdout``) statement line. The data
    # rows that follow such a line in the SQL are not matched against the output.
    reCopyFromStdin = re.compile(b"COPY[^;]+FROM std(?:in|out)", re.I)

    def __init__(self, config):
        """Remember the shared ``Configuration`` object."""
        self.config = config

    def build(self, args):
        """Build one test case and report how many statements survived.

        Args:
            args: ``(sqlfile, init_scripts)`` pair, packed into one argument so
                the method can be mapped over a work list. ``sqlfile`` is the
                path of the source SQL test; ``init_scripts`` is a (possibly
                empty) list of SQL files fed to the runner before the test.

        Returns:
            ``(test name, statement count, matched statements, percentage)``.
            The matched count is that of the best out-file variant, i.e. the
            one that was written to the destination directory; it is 0 when
            no variant produced a match.

        Raises:
            AssertionError: if the out-file and the runner output split into
                a different number of statements.
            subprocess.CalledProcessError: if the splitter fails.
        """
        sqlfile, init_scripts = args

        # In parallel mode with a log file, every case also gets its own log
        # named "<case>-<logfile name>", placed next to the main log file.
        is_split_logging = self.config.logfile is not None and self.config.parallel
        if is_split_logging:
            logger = get_logger(
                sqlfile.stem,
                f"{self.config.logfile.parent}/{sqlfile.stem}-{self.config.logfile.name}",
                self.config.is_debug,
            )
        else:
            logger = LOGGER

        splitted_stmts = list(self.split_sql_file(sqlfile))
        stmts_count = len(splitted_stmts)

        if init_scripts:
            # Use the case logger (not the root ``logging`` module) so the
            # record lands in the same log as everything else for this case.
            logger.debug("Init scripts: %s", init_scripts)

        ressqlfile = self.config.dstdir / sqlfile.name
        resoutfile = ressqlfile.with_suffix('.out')
        reserrfile_base = resoutfile.with_suffix('.err')

        # Best result over all out-file variants seen so far.
        max_stmts_run = 0
        ressql = None
        resout = None

        for outfile_idx, outfile in enumerate(get_out_files(sqlfile)):
            test_name = Path(sqlfile).name
            LOGGER.info("Processing (%d) %s -> %s", os.getpid(), test_name, Path(outfile).name)
            if is_split_logging:
                logger.info("Processing (%d) %s -> %s", os.getpid(), test_name, Path(outfile).name)

            with open(outfile, 'rb') as fout:
                outdata = fout.readlines()

            # Multisets of statements seen on one side only (for diagnostics).
            only_out_stmts = Counter()
            only_pgrun_stmts = Counter()

            statements = list(self.split_out_file(splitted_stmts, outdata, logger))
            logger.debug("Matching sql statements to .out file lines")
            for (s_sql, s_out) in statements:
                stmt = '\n'.join(str(sql_line) for sql_line in s_sql)
                only_out_stmts[stmt] += 1
                logger.debug(
                    "<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n%s\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n%s\n============================",
                    stmt,
                    '\n'.join(str(out_line) for out_line in s_out),
                )

            # Each out-file variant is checked against a run in a fresh data directory.
            with tempfile.TemporaryDirectory() as tempdir:
                test_out_name = Path(tempdir) / "test.out"
                test_err_name = test_out_name.with_suffix(".err")

                runner_args = self.config.runner + ["--datadir", tempdir]
                for udf in self.config.udfs:
                    runner_args.append("--udf")
                    runner_args.append(udf)

                if init_scripts:
                    init_out_name = Path(tempdir) / "init.out"
                    init_err_name = init_out_name.with_suffix(".err")

                    for init_script in init_scripts:
                        logger.debug("Running init script %s '%s'", self.config.runner, init_script)
                        with open(init_script, 'rb') as f, open(init_out_name, 'wb') as fout, open(init_err_name, 'wb') as ferr:
                            pi = subprocess.run(runner_args, stdin=f, stdout=fout, stderr=ferr)

                        # A failing init script is reported but does not stop the build.
                        if pi.returncode != 0:
                            logger.warning("%s returned error code %d", self.config.runner, pi.returncode)

                logger.debug("Running test %s '%s' -> [%s]", self.config.runner, sqlfile, outfile)
                with open(sqlfile, 'rb') as f, open(test_out_name, 'wb') as fout, open(test_err_name, 'wb') as ferr:
                    pi = subprocess.run(runner_args, stdin=f, stdout=fout, stderr=ferr)

                if pi.returncode != 0:
                    logger.warning("%s returned error code %d", self.config.runner, pi.returncode)

                with open(test_out_name, 'rb') as fresult:
                    run_output = fresult.readlines()
                logger.debug("Run result:\n%s", str(b'\n'.join(run_output)))

                real_statements = list(self.split_out_file(splitted_stmts, run_output, logger))
                logger.debug("Matching sql statements to pgrun's output")
                for (s_sql, s_out) in real_statements:
                    stmt = '\n'.join(str(sql_line) for sql_line in s_sql)
                    logger.debug(
                        "<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n%s\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n%s\n============================",
                        stmt,
                        '\n'.join(str(out_line) for out_line in s_out),
                    )

                    # Cancel the statement against the out-file multiset, or
                    # record it as present in the runner output only.
                    if 0 < only_out_stmts[stmt]:
                        only_out_stmts[stmt] -= 1
                        if 0 == only_out_stmts[stmt]:
                            del only_out_stmts[stmt]
                    else:
                        only_pgrun_stmts[stmt] += 1

                # Keep the runner's stderr next to the result; variants after
                # the first get a numeric suffix (".err.1", ".err.2", ...).
                reserrfile = reserrfile_base if outfile_idx == 0 else reserrfile_base.with_suffix(reserrfile_base.suffix + ".{0}".format(outfile_idx))
                shutil.move(test_err_name, reserrfile)

            if only_pgrun_stmts:
                logger.info("Statements in pgrun output, but not in out file:\n%s",
                            "\n--------------------------------\n".join(stmt for stmt in only_pgrun_stmts))
            if only_out_stmts:
                logger.info("Statements in out file, but not in pgrun output:\n%s",
                            "\n--------------------------------\n".join(stmt for stmt in only_out_stmts))

            stmts_run = 0
            stmts = []
            outs = []
            # Raised explicitly (rather than via ``assert``) so the check is
            # not stripped under ``python -O``; the exception type is unchanged.
            if len(statements) != len(real_statements):
                raise AssertionError(
                    f"Incorrect statements split in {test_name}. Statements in out-file: {len(statements)}, statements in pgrun output: {len(real_statements)}"
                )
            for ((l_sql, out), (r_sql, res)) in zip(statements, real_statements):
                if l_sql != r_sql:
                    # The two splits went out of sync; nothing after this point is comparable.
                    logger.warning("out SQL <> pgrun SQL:\n <: %s\n >: %s", l_sql, r_sql)
                    break

                if len(Differ.diff(b''.join(out), b''.join(res))) == 0:
                    stmts.extend(l_sql)
                    outs.extend(out)
                    stmts_run += 1
                else:
                    logger.warning("out result differs from pgrun result:\n <<: %s\n >>: %s", out, res)

            if max_stmts_run < stmts_run:
                max_stmts_run = stmts_run
                ressql = stmts
                resout = outs

        if ressql is not None and resout is not None:
            LOGGER.info('Case built: %s', sqlfile.name)
            if is_split_logging:
                logger.info('Case built: %s', sqlfile.name)

            save_strings(ressqlfile, ressql)
            save_strings(resoutfile, resout)
        else:
            LOGGER.warning('Case is empty: %s', sqlfile.name)
            if is_split_logging:
                logger.warning('Case is empty: %s', sqlfile.name)

            # Drop stale results from a previous run.
            ressqlfile.unlink(missing_ok=True)
            resoutfile.unlink(missing_ok=True)

        # Report the variant that was actually saved (the best one). This is
        # also well-defined when there were no out-files at all, and the ratio
        # is 0 for a test without statements instead of a ZeroDivisionError.
        ratio = round(max_stmts_run * 100 / stmts_count, 2) if stmts_count else 0.0
        return Path(sqlfile).stem, stmts_count, max_stmts_run, ratio

    def split_sql_file(self, sqlfile):
        """Yield the statements of *sqlfile* as lists of byte lines.

        The file is piped through the configured splitter command. The first
        line of the splitter's output is taken as the delimiter line that
        separates statements in the rest of the output. Lines keep their
        line endings.

        Raises:
            subprocess.CalledProcessError: if the splitter exits with an error.
        """
        with open(sqlfile, "rb") as f:
            pi = subprocess.run(self.config.splitter, stdin=f, stdout=subprocess.PIPE, stderr=sys.stderr, check=True)

        lines = iter(pi.stdout.splitlines(keepends=True))
        # A bare ``next(lines)`` on empty output would raise StopIteration
        # inside this generator, which Python turns into RuntimeError (PEP 479).
        delimiter = next(lines, None)
        if delimiter is None:
            return

        cur_stmt = []
        for line in lines:
            if line == delimiter:
                yield cur_stmt
                cur_stmt = []
                continue
            cur_stmt.append(line)

        if cur_stmt:
            yield cur_stmt

    def split_out_file(self, stmts, outdata, logger):
        """Match SQL statements in *stmts* with their echo and output in *outdata*.

        The output is expected to echo every statement line followed by that
        statement's result. Lines are not echoed (and so not matched) while
        ``\\set ECHO none`` is in effect, nor for the data rows after a
        ``COPY ... FROM stdin`` line up to the ``\\.`` terminator.

        Args:
            stmts ([[bytes]]): List of SQL statements, each a list of lines.
                Must support indexing (the next statement is looked ahead).
            outdata ([bytes]): Lines of the out-file / runner output.
            logger: Logger receiving debug traces of the matching.

        Yields:
            ([bytes], [bytes]): A statement and the output lines attributed to
            it (its own echoed lines included). Once the output is exhausted,
            remaining statements are yielded with whatever was collected
            (normally an empty list). Statements that end while echo is off
            are not yielded.

        Raises:
            Exception: if an echoed statement line differs from the output line.
            AssertionError: if a COPY data section is not closed by ``\\.``.
        """
        cur_stmt_out = []
        out_iter = enumerate(outdata)
        echo_none = False
        in_copy_from = False
        no_more_stmts_expected = False

        try:
            line_no, out_line = next(out_iter)
        except StopIteration:
            # Empty output: every statement gets an empty result.
            no_more_stmts_expected = True

        in_line_no = 0

        for i, stmt in enumerate(stmts):
            if no_more_stmts_expected:
                yield stmt, cur_stmt_out
                cur_stmt_out = []
                continue

            try:
                for stmt_line in stmt:
                    in_line_no += 1

                    if echo_none:
                        # Echo is off: skip lines until a ``\set ECHO <not none>``
                        # turns it back on (that line itself is skipped too).
                        if stmt_line.startswith(b"\\set ECHO ") and not stmt_line.rstrip().endswith(b"none"):
                            echo_none = False
                        continue

                    if stmt_line.startswith(b"\\set ECHO none"):
                        echo_none = True

                    if in_copy_from:
                        # COPY data rows and the closing ``\.`` are skipped.
                        if stmt_line.startswith(b"\\."):
                            in_copy_from = False
                        continue

                    if self.reCopyFromStdin.match(stmt_line):
                        in_copy_from = True

                    logger.debug("Line %d: %s -> %d: %s", in_line_no, stmt_line, line_no, out_line)

                    if stmt_line != out_line:
                        raise Exception(f"Mismatch at {line_no}: '{stmt_line}' != '{out_line}'")

                    cur_stmt_out.append(out_line)
                    line_no, out_line = next(out_iter)

                # Raised explicitly so the check survives ``python -O``.
                if in_copy_from:
                    raise AssertionError(f"Missing copy from stdout table end marker \\. at line {in_line_no}")

                if echo_none:
                    continue

                try:
                    next_stmt = stmts[i + 1]
                except IndexError:
                    # Last statement: everything that is left belongs to it.
                    cur_stmt_out.append(out_line)
                    cur_stmt_out.extend(rest_line for _, rest_line in out_iter)
                    logger.debug("Last out:\n%s", str(b'\n'.join(cur_stmt_out)))
                    yield stmt, cur_stmt_out
                    return

                # Consume output lines until the echo of the next statement starts.
                while True:
                    while out_line != next_stmt[0]:
                        logger.debug("Out: %s -> %s", next_stmt[0], out_line)
                        cur_stmt_out.append(out_line)
                        line_no, out_line = next(out_iter)

                    # The first line matched; confirm with the following lines
                    # of the next statement, up to a COPY ... FROM stdin line
                    # (or the whole statement when it has none).
                    last_pos = argwhere1(self.reCopyFromStdin.match, next_stmt, default=len(next_stmt))
                    maybe_next_stmt = outdata[line_no : line_no + last_pos]

                    logger.debug("Left: %s\nRight: %s", next_stmt, maybe_next_stmt)
                    if next_stmt[:last_pos] == maybe_next_stmt:
                        break

                    # False alarm: the line only looked like the next statement.
                    cur_stmt_out.append(out_line)
                    line_no, out_line = next(out_iter)

                yield stmt, cur_stmt_out
                cur_stmt_out = []

            except StopIteration:
                # Output ran out in the middle of this statement.
                no_more_stmts_expected = True
                yield stmt, cur_stmt_out
                cur_stmt_out = []
def load_patches(patchdir):
    """Yield ``(name, patchset)`` for each ``*.patch`` file directly in *patchdir*.

    *name* is the patch file name without its final ``.patch`` suffix.
    Files for which ``patch.fromfile`` returns ``False`` are skipped.
    """
    for patch_path in patchdir.glob("*.patch"):
        patchset = patch.fromfile(patch_path)
        if patchset is False:
            continue
        yield patch_path.stem, patchset
# One config line: "<test name>: <script> [<script> ...]".
# Names consist of word characters and dots only.
reInitScriptsCfgLine = re.compile(r"^([\w.]+):\s*([\w.]+(?:\s+[\w.]+)*)$")


def load_init_scripts(initscriptscfg, initscriptsdir, tests_set):
    """Read which init scripts must run before which test case.

    Args:
        initscriptscfg: Path of the config file, or ``None`` when not given.
        initscriptsdir: Directory holding the ``*.sql`` init scripts, or
            ``None`` when not given.
        tests_set: Set of known test-case names; config lines for other
            tests are ignored.

    Returns:
        Dict mapping a test name to the list of its init-script paths, in the
        order listed in the config. Scripts that do not exist in
        *initscriptsdir* are left out; tests left with no scripts are omitted.
        The dict is empty when either location is missing.
    """
    init_scripts_map = {}

    # Both locations are optional on the command line; without them there is
    # simply nothing to load.
    if initscriptscfg is None or initscriptsdir is None:
        return init_scripts_map

    if not initscriptscfg.is_file():
        LOGGER.warning("Init scripts config file is not found: %s", initscriptscfg)
        return init_scripts_map

    if not initscriptsdir.is_dir():
        LOGGER.warning("Init scripts directory is not found: %s", initscriptsdir)
        return init_scripts_map

    scripts = frozenset(s.stem for s in initscriptsdir.glob("*.sql"))

    # Explicit encoding keeps parsing independent of the machine's locale.
    with open(initscriptscfg, 'r', encoding="utf-8") as cfg:
        for lineno, line in enumerate(cfg, 1):
            line = line.strip()
            if not line:
                continue

            m = reInitScriptsCfgLine.match(line)
            if m is None:
                LOGGER.warning("Bad line %d in init scripts config %s", lineno, initscriptscfg)
                continue

            test_name = m[1]
            if test_name not in tests_set:
                LOGGER.debug("Skipping init scripts for unknown test case %s", test_name)
                continue

            # Append ".sql" instead of using ``with_suffix``: script names may
            # contain dots, and ``with_suffix`` would replace the part after
            # the last dot ("v1.2" -> "v1.sql").
            deps = [initscriptsdir / f"{s}.sql" for s in m[2].split() if s in scripts]
            if not deps:
                LOGGER.debug("No init scripts are listed for test case %s", test_name)
                continue

            init_scripts_map[test_name] = deps

    return init_scripts_map
def patch_cases(cases, patches, patchdir):
    """Replace entries of *cases* with patched copies where patches exist.

    For a case whose SQL file name is a key of *patches*, the SQL file and its
    ``.out`` sibling are copied into *patchdir* and patched there. Only when
    both patches apply is the entry in *cases* replaced (in place) by the
    patched SQL path; in every other situation the original entry is kept.

    Args:
        cases: Mutable list of SQL test paths; updated in place.
        patches: Mapping from file name (e.g. ``"x.sql"``, ``"x.out"``) to a
            patch object offering ``apply(root=...)``.
        patchdir: Directory receiving the patched copies.
    """
    for idx, original_sql in enumerate(cases):
        sql_name = original_sql.name
        sql_patch = patches.get(sql_name)
        if sql_patch is None:
            continue

        patched_sql = patchdir / sql_name
        shutil.copyfile(original_sql, patched_sql)
        if not sql_patch.apply(root=patchdir):
            LOGGER.warning(
                "Failed to patch %s testcase. Original version %s will be used", patched_sql, original_sql
            )
            continue

        # The expected output has to be patched along with the SQL.
        original_out = original_sql.with_suffix('.out')
        out_name = original_out.name
        shutil.copyfile(original_out, patchdir / out_name)

        out_patch = patches.get(out_name)
        if out_patch is None:
            LOGGER.warning(
                "Out-file patch for %s testcase is not found. Original version %s will be used",
                patched_sql,
                original_sql,
            )
            continue

        if not out_patch.apply(root=patchdir):
            LOGGER.warning(
                "Failed to patch out-file for %s testcase. Original version %s will be used",
                patched_sql,
                original_sql,
            )
            continue

        cases[idx] = patched_sql
        LOGGER.info("Patched %s -> %s", original_sql, patched_sql)
@click.command()
@click.argument("cases", type=str, nargs=-1)
@click.option(
    "--srcdir",
    "-i",
    help="Directory with SQL suits to process",
    required=True,
    multiple=False,
    type=click.Path(exists=True, file_okay=False, resolve_path=True, path_type=Path),
)
@click.option(
    "--dstdir",
    "-o",
    help="Output directory",
    required=True,
    multiple=False,
    type=click.Path(exists=True, file_okay=False, resolve_path=True, writable=True, path_type=Path),
)
@click.option(
    "--patchdir",
    "-p",
    help="Directory with patches for SQL suits",
    required=False,
    multiple=False,
    type=click.Path(exists=True, file_okay=False, resolve_path=True, path_type=Path),
)
@click.option(
    "--udf",
    "-u",
    help="Load shared library with UDF by given path",
    required=False,
    multiple=True,
    type=click.Path(dir_okay=False, resolve_path=True, path_type=Path),
)
@click.option(
    "--initscriptscfg",
    help="Config file for tests' init scripts",
    required=False,
    multiple=False,
    type=click.Path(exists=True, dir_okay=False, resolve_path=True, path_type=Path)
)
@click.option(
    "--initscriptsdir",
    help="Directory with tests' init scripts",
    required=False,
    multiple=False,
    type=click.Path(exists=True, file_okay=False, resolve_path=True, path_type=Path)
)
@click.option("--skip", "-s", help="Comma-separated list of testsuits to skip", multiple=False, type=click.STRING)
@click.option("--runner", help="Test runner", default=RUNNER, required=False, multiple=False, type=click.STRING)
@click.option(
    "--splitter", help="SQL statements splitter", default=SPLITTER, required=False, multiple=False, type=click.STRING
)
@click.option(
    "--report",
    "-r",
    help="Report file name",
    default=REPORT_FILE,
    required=False,
    multiple=False,
    type=click.Path(dir_okay=False, resolve_path=True, writable=True, path_type=Path),
)
@click.option("--parallel/--no-parallel", help="Tests build mode", default=True, required=False)
@click.option(
    "--logfile",
    "-l",
    help="Log file",
    default=None,
    required=False,
    multiple=False,
    type=click.Path(dir_okay=False, resolve_path=True, writable=True, path_type=Path),
)
@click.option("--debug/--no-debug", help="Logs verbosity", default=False, required=False)
@click.version_option(version=svn_version(), prog_name=PROGRAM_NAME)
def cli(cases, srcdir, dstdir, patchdir, udf, initscriptscfg, initscriptsdir, skip, runner, splitter, report, parallel, logfile, debug):
    """Build test cases from SQL suites and write a CSV report."""
    # NOTE: click shows the docstring above in ``--help``; keep it short.
    setup_logging(logfile, debug)

    if udf:
        LOGGER.debug("UDFs: %s", udf)

    # Tests may be listed for skipping with or without the ".sql" extension.
    if skip is not None:
        skip_names = (s.strip() for s in skip.split(","))
        skip_tests = frozenset(name[:-4] if name.endswith(".sql") else name for name in skip_names)
    else:
        skip_tests = frozenset()

    config = Configuration(
        srcdir, dstdir, udf, patchdir, skip_tests, runner.split(), splitter.split(), report, parallel, logfile, debug
    )

    # Without explicit cases, process every *.sql in srcdir except the skipped
    # ones; explicit relative names are resolved against srcdir.
    if not cases:
        cases = [c for c in config.srcdir.glob("*.sql") if c.stem not in skip_tests]
    else:
        cases = [Path(c) if os.path.isabs(c) else config.srcdir / c for c in cases]

    # Both init-script options are optional; only load when both are given.
    if initscriptscfg is not None and initscriptsdir is not None:
        init_scripts = load_init_scripts(initscriptscfg, initscriptsdir, frozenset(c.stem for c in cases))
    else:
        init_scripts = {}
    LOGGER.debug("Init scripts: %s", init_scripts)

    if config.patchdir is not None:
        patches = dict(load_patches(config.patchdir))
        LOGGER.info("Patches: %s", ", ".join(patches))
    else:
        patches = {}

    # Patched copies live in a temporary directory, so the build has to finish
    # before that directory is removed.
    with tempfile.TemporaryDirectory() as tempdir:
        patch_cases(cases, patches, Path(tempdir))

        LOGGER.info("Test cases: %s", ", ".join(c.as_posix() for c in cases))

        builder = TestCaseBuilder(config)

        # ``build`` takes one ``(case, init_scripts)`` pair. Both modes use
        # the same work list, so sequential runs get their init scripts too
        # (passing a bare path here used to fail on unpacking).
        jobs = [(test_case, init_scripts.get(test_case.stem) or []) for test_case in cases]
        if config.parallel:
            with Pool() as pool:
                results = list(pool.imap_unordered(builder.build, jobs))
        else:
            results = [builder.build(job) for job in jobs]

    with open(config.report_path, "w", newline='') as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(["testcase", "statements", "successful", "ratio"])
        writer.writerows(sorted(results))
# Script entry point.
if __name__ == "__main__":
    try:
        cli()
    finally:
        # Runs whether ``cli()`` returns or raises: flush and close all
        # logging handlers so the log files are complete.
        logging.shutdown()