# coding: utf-8

# pytest plugin module: registers command line options and pytest hooks, sets up
# per-test logging and reports test events through a trace report generator.

import base64
import errno
import sys
import os
import logging
import fnmatch
import json
import time
import traceback
import collections
import signal
import inspect
import warnings

import faulthandler
import py
import pytest
import six

import _pytest
import _pytest._io
import _pytest.mark
import _pytest.outcomes
import _pytest.skipping

from _pytest.warning_types import PytestUnhandledCoroutineWarning

from yatest_lib import test_splitter
import yatest.common as yatest_common

from library.python.pytest.plugins.metrics import test_metrics

try:
    import resource
except ImportError:
    resource = None

try:
    import library.python.pytest.yatest_tools as tools
except ImportError:
    # fallback for pytest script mode
    import yatest_tools as tools

try:
    from library.python import filelock
except ImportError:
    filelock = None


import yatest_lib.tools

import yatest_lib.external as canon

import yatest_lib.ya

from library.python.pytest import context

console_logger = logging.getLogger("console")
yatest_logger = logging.getLogger("ya.test")


_pytest.main.EXIT_NOTESTSCOLLECTED = 0
SHUTDOWN_REQUESTED = False

# Set by pytest_configure(); the other hooks read the active config from here.
pytest_config = None


def configure_pdb_on_demand():
    """Install a SIGUSR1 handler that drops into ipdb, when the platform has SIGUSR1."""
    if hasattr(signal, "SIGUSR1"):
        def on_signal(*args):
            # Imported lazily: ipdb is only needed when the signal actually arrives.
            import ipdb
            ipdb.set_trace()

        signal.signal(signal.SIGUSR1, on_signal)


class CustomImporter(object):
    """Meta path entry that creates missing ``__init__.py`` files under the given roots.

    It never returns a loader itself: both finder methods return None.
    """

    def __init__(self, roots):
        self._roots = roots

    def find_spec(self, fullname, path, target=None):
        return None

    def find_module(self, fullname, package_path=None):
        """Create an empty ``__init__.py`` in every root directory matching ``fullname``."""
        for path in self._roots:
            full_path = self._get_module_path(path, fullname)
            init_path = os.path.join(full_path, "__init__.py")

            # isdir() is False for missing paths, so no separate exists() check is needed.
            if os.path.isdir(full_path) and not os.path.exists(init_path):
                open(init_path, "w").close()

        return None

    def _get_module_path(self, path, fullname):
        return os.path.join(path, *fullname.split('.'))


class YaTestLoggingFileHandler(logging.FileHandler):
    """Marker subclass so setup_logging() can recognise the handlers it installed."""
    pass


class _TokenFilterFormatter(logging.Formatter):
    """Formatter that replaces values of ``*TOKEN`` environment variables with ``[SECRET]``."""

    def __init__(self, fmt):
        super(_TokenFilterFormatter, self).__init__(fmt)
        secrets = {v for k, v in six.iteritems(os.environ) if k.endswith('TOKEN') and v}
        # Longest first: if one secret is a prefix of another, masking the shorter one
        # first would leave the tail of the longer one in the output.
        self._replacements = sorted(secrets, key=lambda s: (-len(s), s))

    def _filter(self, s):
        for r in self._replacements:
            s = s.replace(r, "[SECRET]")

        return s

    def format(self, record):
        return self._filter(super(_TokenFilterFormatter, self).format(record))


def setup_logging(log_path, level=logging.DEBUG, *other_logs):
    """Replace previously installed YaTestLoggingFileHandler handlers on the root logger.

    One handler is added for ``log_path`` and one for every path in ``other_logs``.
    """
    logs = [log_path] + list(other_logs)
    root_logger = logging.getLogger()
    # Walk backwards so that popping does not shift the indices still to be visited.
    for i in range(len(root_logger.handlers) - 1, -1, -1):
        if isinstance(root_logger.handlers[i], YaTestLoggingFileHandler):
            root_logger.handlers.pop(i).close()
    root_logger.setLevel(level)
    for log_file in logs:
        file_handler = YaTestLoggingFileHandler(log_file)
        log_format = '%(asctime)s - %(levelname)s - %(name)s - %(funcName)s: %(message)s'
        file_handler.setFormatter(_TokenFilterFormatter(log_format))
        file_handler.setLevel(level)
        root_logger.addHandler(file_handler)


def pytest_addoption(parser):
    """pytest hook: register the command line options used by this plugin."""
    parser.addoption("--build-root", action="store", dest="build_root", default="", help="path to the build root")
    parser.addoption("--dep-root", action="append", dest="dep_roots", default=[], help="path to the dep build roots")
    parser.addoption("--source-root", action="store", dest="source_root", default="", help="path to the source root")
    parser.addoption("--data-root", action="store", dest="data_root", default="", help="path to the arcadia_tests_data root")
    parser.addoption("--output-dir", action="store", dest="output_dir", default="", help="path to the test output dir")
    parser.addoption("--python-path", action="store", dest="python_path", default="", help="path the canonical python binary")
    parser.addoption("--valgrind-path", action="store", dest="valgrind_path", default="", help="path the canonical valgring binary")
    parser.addoption("--test-filter", action="append", dest="test_filter", default=None, help="test filter")
    parser.addoption("--test-file-filter", action="store", dest="test_file_filter", default=None, help="test file filter")
    parser.addoption("--test-param", action="append", dest="test_params", default=None, help="test parameters")
    parser.addoption("--test-log-level", action="store", dest="test_log_level", choices=["critical", "error", "warning", "info", "debug"], default="debug", help="test log level")
    parser.addoption("--mode", action="store", choices=[yatest_lib.ya.RunMode.List, yatest_lib.ya.RunMode.Run], dest="mode", default=yatest_lib.ya.RunMode.Run, help="testing mode")
    parser.addoption("--test-list-file", action="store", dest="test_list_file")
    parser.addoption("--modulo", default=1, type=int)
    parser.addoption("--modulo-index", default=0, type=int)
    parser.addoption("--partition-mode", default='SEQUENTIAL', help="Split tests according to partitoin mode")
    parser.addoption("--split-by-tests", action='store_true', help="Split test execution by tests instead of suites", default=False)
    parser.addoption("--project-path", action="store", default="", help="path to CMakeList where test is declared")
    parser.addoption("--build-type", action="store", default="", help="build type")
    parser.addoption("--flags", action="append", dest="flags", default=[], help="build flags (-D)")
    parser.addoption("--sanitize", action="store", default="", help="sanitize mode")
    parser.addoption("--test-stderr", action="store_true", default=False, help="test stderr")
    parser.addoption("--test-debug", action="store_true", default=False, help="test debug mode")
    parser.addoption("--root-dir", action="store", default=None)
    parser.addoption("--ya-trace", action="store", dest="ya_trace_path", default=None, help="path to ya trace report")
    parser.addoption("--ya-version", action="store", dest="ya_version", default=0, type=int, help="allows to be compatible with ya and the new changes in ya-dev")
    parser.addoption(
        "--test-suffix", action="store", dest="test_suffix", default=None, help="add suffix to every test name"
    )
    parser.addoption("--gdb-path", action="store", dest="gdb_path", default="", help="path the canonical gdb binary")
    parser.addoption("--collect-cores", action="store_true", dest="collect_cores", default=False, help="allows core dump file recovering during test")
    parser.addoption("--sanitizer-extra-checks", action="store_true", dest="sanitizer_extra_checks", default=False, help="enables extra checks for tests built with sanitizers")
    parser.addoption("--report-deselected", action="store_true", dest="report_deselected", default=False, help="report deselected tests to the trace file")
    parser.addoption("--pdb-on-sigusr1", action="store_true", default=False, help="setup pdb.set_trace on SIGUSR1")
    parser.addoption("--test-tool-bin", help="Path to test_tool")
    parser.addoption("--test-list-path", dest="test_list_path", action="store", help="path to test list", default="")


def from_ya_test():
    """Return True when the ``YA_TEST_RUNNER`` environment variable is present."""
    return "YA_TEST_RUNNER" in os.environ


@pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
    """pytest hook: store the config globally and prepare logging, sys.path and signal handlers."""
    global pytest_config
    pytest_config = config

    config.option.continue_on_collection_errors = True

    config.from_ya_test = from_ya_test()
    config.test_logs = collections.defaultdict(dict)
    test_metrics.metrics = {}
    config.suite_metrics = {}
    config.configure_timestamp = time.time()
    # Named ya_context so that the imported `context` module is not shadowed.
    ya_context = {
        "project_path": config.option.project_path,
        "test_stderr": config.option.test_stderr,
        "test_debug": config.option.test_debug,
        "build_type": config.option.build_type,
        "test_traceback": config.option.tbstyle,
        "flags": config.option.flags,
        "sanitize": config.option.sanitize,
    }

    if config.option.collectonly:
        config.option.mode = yatest_lib.ya.RunMode.List

    config.ya = yatest_lib.ya.Ya(
        config.option.mode,
        config.option.source_root,
        config.option.build_root,
        config.option.dep_roots,
        config.option.output_dir,
        config.option.test_params,
        ya_context,
        config.option.python_path,
        config.option.valgrind_path,
        config.option.gdb_path,
        config.option.data_root,
    )
    # From here on the option holds the numeric logging level rather than its name.
    config.option.test_log_level = {
        "critical": logging.CRITICAL,
        "error": logging.ERROR,
        "warning": logging.WARN,
        "info": logging.INFO,
        "debug": logging.DEBUG,
    }[config.option.test_log_level]

    if not config.option.collectonly:
        setup_logging(os.path.join(config.ya.output_dir, "run.log"), config.option.test_log_level)
    config.current_item_nodeid = None
    config.current_test_name = None
    config.test_cores_count = 0
    config.collect_cores = config.option.collect_cores
    config.sanitizer_extra_checks = config.option.sanitizer_extra_checks
    try:
        config.test_tool_bin = config.option.test_tool_bin
    except AttributeError:
        logging.info("test_tool_bin not specified")

    if config.sanitizer_extra_checks:
        # Drop the current sanitizer options and restore the *_ORIGINAL value when present.
        for envvar in ['LSAN_OPTIONS', 'ASAN_OPTIONS']:
            if envvar in os.environ:
                os.environ.pop(envvar)
            if envvar + '_ORIGINAL' in os.environ:
                os.environ[envvar] = os.environ[envvar + '_ORIGINAL']

    extra_sys_path = []
    # Arcadia paths from the test DEPENDS section of ya.make
    extra_sys_path.append(os.path.join(config.option.source_root, config.option.project_path))
    # Build root is required for correct import of protobufs, because imports are related to the root
    # (like import devtools.dummy_arcadia.protos.lib.my_proto_pb2)
    extra_sys_path.append(config.option.build_root)

    for path in config.option.dep_roots:
        if os.path.isabs(path):
            extra_sys_path.append(path)
        else:
            extra_sys_path.append(os.path.join(config.option.source_root, path))

    sys_path_set = set(sys.path)
    for path in extra_sys_path:
        if path not in sys_path_set:
            sys.path.append(path)
            sys_path_set.add(path)

    os.environ["PYTHONPATH"] = os.pathsep.join(sys.path)

    if not config.option.collectonly:
        if config.option.ya_trace_path:
            config.ya_trace_reporter = TraceReportGenerator(config.option.ya_trace_path)
        else:
            config.ya_trace_reporter = DryTraceReportGenerator(config.option.ya_trace_path)
    config.ya_version = config.option.ya_version

    sys.meta_path.append(CustomImporter([config.option.build_root] + [os.path.join(config.option.build_root, dep) for dep in config.option.dep_roots]))
    if config.option.pdb_on_sigusr1:
        configure_pdb_on_demand()

    yatest_common.runtime._set_ya_config(config=config)

    # Dump python backtrace in case of any errors
    faulthandler.enable()
    if hasattr(signal, "SIGQUIT"):
        # SIGQUIT is used by test_tool to teardown tests which overruns timeout
        faulthandler.register(signal.SIGQUIT, chain=True)

    if hasattr(signal, "SIGUSR2"):
        signal.signal(signal.SIGUSR2, _graceful_shutdown)

    # register custom markers
    config.addinivalue_line(
        "markers", "xfaildiff: Allows to mark test which is expected to have a diff with canonical data"
    )


# Flipped to True by the SIGUSR2 handler; checked from the runtest log hooks below.
session_should_exit = False


def _graceful_shutdown_on_log(should_exit):
    """Call pytest.exit() when ``should_exit`` is true."""
    if should_exit:
        pytest.exit("Graceful shutdown requested")


def pytest_runtest_logreport(report):
    _graceful_shutdown_on_log(session_should_exit)


def pytest_runtest_logstart(nodeid, location):
    _graceful_shutdown_on_log(session_should_exit)


def pytest_runtest_logfinish(nodeid, location):
    _graceful_shutdown_on_log(session_should_exit)


def _graceful_shutdown(*args):
    """Signal handler: request session exit, print the current stack and suspend capturing."""
    global session_should_exit
    session_should_exit = True
    try:
        import library.python.coverage
        library.python.coverage.stop_coverage_tracing()
    except ImportError:
        pass
    traceback.print_stack(file=sys.stderr)
    capman = pytest_config.pluginmanager.getplugin("capturemanager")
    capman.suspend(in_=True)
    _graceful_shutdown_on_log(not capman.is_globally_capturing())


def _get_rusage():
    """Return rusage of the current process, or a falsy value when ``resource`` is unavailable."""
    return resource and resource.getrusage(resource.RUSAGE_SELF)


def _collect_test_rusage(item):
    """Report the difference between the current rusage and ``item.rusage`` as metrics."""
    if resource and hasattr(item, "rusage"):
        finish_rusage = _get_rusage()
        ya_inst = pytest_config.ya

        def add_metric(attr_name, metric_name=None, modifier=None):
            if not metric_name:
                metric_name = attr_name
            if not modifier:
                def modifier(x):
                    return x
            if hasattr(item.rusage, attr_name):
                delta = getattr(finish_rusage, attr_name) - getattr(item.rusage, attr_name)
                ya_inst.set_metric_value(metric_name, modifier(delta))

        for args in [
            ("ru_maxrss", "ru_rss", lambda x: x*1024),  # to be the same as in util/system/rusage.cpp
            ("ru_utime",),
            ("ru_stime",),
            ("ru_ixrss", None, lambda x: x*1024),
            ("ru_idrss", None, lambda x: x*1024),
            ("ru_isrss", None, lambda x: x*1024),
            ("ru_majflt", "ru_major_pagefaults"),
            ("ru_minflt", "ru_minor_pagefaults"),
            ("ru_nswap",),
            ("ru_inblock",),
            ("ru_oublock",),
            ("ru_msgsnd",),
            ("ru_msgrcv",),
            ("ru_nsignals",),
            ("ru_nvcsw",),
            ("ru_nivcsw",),
        ]:
            add_metric(*args)


def _get_item_tags(item):
    """Collect mark names from ``item.keywords``.

    Names come from the ``pytestmark`` list and from keys whose value is a MarkDecorator.
    """
    tags = []
    for key, value in item.keywords.items():
        if key == 'pytestmark' and isinstance(value, list):
            for mark in value:
                tags.append(mark.name)
        elif isinstance(value, _pytest.mark.MarkDecorator):
            tags.append(key)

    return tags


def pytest_runtest_setup(item):
    """pytest hook: switch logging to the per-test log file and report the test as started."""
    item.rusage = _get_rusage()
    pytest_config.test_cores_count = 0
    pytest_config.current_item_nodeid = item.nodeid
    class_name, test_name = tools.split_node_id(item.nodeid)
    test_log_path = tools.get_test_log_file_path(pytest_config.ya.output_dir, class_name, test_name)
    setup_logging(
        os.path.join(pytest_config.ya.output_dir, "run.log"),
        pytest_config.option.test_log_level,
        test_log_path
    )
    pytest_config.test_logs[item.nodeid]['log'] = test_log_path
    pytest_config.test_logs[item.nodeid]['logsdir'] = pytest_config.ya.output_dir
    pytest_config.current_test_log_path = test_log_path
    pytest_config.current_test_name = "{}::{}".format(class_name, test_name)
    separator = "#" * 100
    yatest_logger.info(separator)
    yatest_logger.info(test_name)
    yatest_logger.info(separator)
    yatest_logger.info("Test setup")

    # The start event is emitted with the "crashed" status of CrashedTestItem.
    test_item = CrashedTestItem(item.nodeid, item.location[0], pytest_config.option.test_suffix)
    pytest_config.ya_trace_reporter.on_start_test_class(test_item)
    pytest_config.ya_trace_reporter.on_start_test_case(test_item)


def pytest_runtest_teardown(item, nextitem):
    yatest_logger.info("Test teardown")


def pytest_runtest_call(item):
    class_name, test_name = tools.split_node_id(item.nodeid)
    yatest_logger.info("Test call (class_name: %s, test_name: %s)", class_name, test_name)


def pytest_deselected(items):
    """pytest hook: report deselected items to the trace when ``--report-deselected`` is set."""
    config = pytest_config
    if config.option.report_deselected:
        for item in items:
            deselected_item = DeselectedTestItem(item.nodeid, item.location[0], config.option.test_suffix)
            config.ya_trace_reporter.on_start_test_class(deselected_item)
            config.ya_trace_reporter.on_finish_test_case(deselected_item)
            config.ya_trace_reporter.on_finish_test_class(deselected_item)


@pytest.hookimpl(trylast=True)
def pytest_collection_modifyitems(items, config):
    """pytest hook: filter and split collected items, then report or list the selected ones."""

    def filter_items(filters):
        # Keep items whose canonical id ends with, or fnmatch-es, any of the filters.
        filtered_items = []
        deselected_items = []
        for item in items:
            canonical_node_id = str(CustomTestItem(item.nodeid, item.location[0], pytest_config.option.test_suffix))
            escaped_node_id = tools.escape_for_fnmatch(canonical_node_id)
            matched = False
            for flt in filters:
                if "::" not in flt and "*" not in flt:
                    flt += "*"  # add support for filtering by module name
                if canonical_node_id.endswith(flt) or fnmatch.fnmatch(escaped_node_id, tools.escape_for_fnmatch(flt)):
                    matched = True
                    # One matching filter is enough.
                    break
            if matched:
                filtered_items.append(item)
            else:
                deselected_items.append(item)

        config.hook.pytest_deselected(items=deselected_items)
        items[:] = filtered_items

    def filter_by_full_name(filters):
        # Keep only items whose node id is listed verbatim in `filters`.
        filter_set = set(filters)
        filtered_items = []
        deselected_items = []
        for item in items:
            if item.nodeid in filter_set:
                filtered_items.append(item)
            else:
                deselected_items.append(item)

        config.hook.pytest_deselected(items=deselected_items)
        items[:] = filtered_items

    # XXX - check to be removed when tests for peerdirs don't run
    for item in items:
        if not item.nodeid:
            item._nodeid = os.path.basename(item.location[0])

    if os.path.exists(config.option.test_list_path):
        with open(config.option.test_list_path, 'r') as afile:
            chunks = json.load(afile)
        filters = chunks[config.option.modulo_index]
        filter_by_full_name(filters)
    else:
        if config.option.test_filter:
            filter_items(config.option.test_filter)
        partition_mode = config.option.partition_mode
        modulo = config.option.modulo
        if modulo > 1:
            items[:] = sorted(items, key=lambda item: item.nodeid)
            modulo_index = config.option.modulo_index
            split_by_tests = config.option.split_by_tests
            items_by_classes = {}
            res = []
            for item in items:
                # Node ids with two "::" separators are grouped by the part before the
                # last one, unless splitting by individual tests was requested.
                if item.nodeid.count("::") == 2 and not split_by_tests:
                    class_name = item.nodeid.rsplit("::", 1)[0]
                    if class_name not in items_by_classes:
                        items_by_classes[class_name] = []
                        res.append(items_by_classes[class_name])
                    items_by_classes[class_name].append(item)
                else:
                    res.append([item])
            chunk_items = test_splitter.get_splitted_tests(res, modulo, modulo_index, partition_mode, is_sorted=True)
            items[:] = []
            for item in chunk_items:
                items.extend(item)
            yatest_logger.info("Modulo %s tests are: %s", modulo_index, chunk_items)

    if config.option.mode == yatest_lib.ya.RunMode.Run:
        for item in items:
            test_item = NotLaunchedTestItem(item.nodeid, item.location[0], config.option.test_suffix)
            config.ya_trace_reporter.on_start_test_class(test_item)
            config.ya_trace_reporter.on_finish_test_case(test_item)
            config.ya_trace_reporter.on_finish_test_class(test_item)
    elif config.option.mode == yatest_lib.ya.RunMode.List:
        tests = []
        for item in items:
            item = CustomTestItem(item.nodeid, item.location[0], pytest_config.option.test_suffix, item.keywords)
            record = {
                "class": item.class_name,
                "test": item.test_name,
                "tags": _get_item_tags(item),
            }
            tests.append(record)
        if config.option.test_list_file:
            with open(config.option.test_list_file, 'w') as afile:
                json.dump(tests, afile)
        # TODO prettyboy remove after test_tool release - currently it's required for backward compatibility
        sys.stderr.write(json.dumps(tests))


def pytest_collectreport(report):
    """pytest hook: report collection failures to the trace reporter, or to stderr without one."""
    if not report.passed:
        if hasattr(pytest_config, 'ya_trace_reporter'):
            test_item = TestItem(report, None, None, pytest_config.option.test_suffix)
            pytest_config.ya_trace_reporter.on_error(test_item)
        else:
            sys.stderr.write(yatest_lib.tools.to_utf8(report.longrepr))


@pytest.hookimpl(tryfirst=True)
def pytest_pyfunc_call(pyfuncitem):
    """pytest hook: call the test function and keep its return value in ``pyfuncitem.retval``.

    Coroutine functions are skipped with a warning.
    """
    testfunction = pyfuncitem.obj
    iscoroutinefunction = getattr(inspect, "iscoroutinefunction", None)
    if iscoroutinefunction is not None and iscoroutinefunction(testfunction):
        msg = "Coroutine functions are not natively supported and have been skipped.\n"
        msg += "You need to install a suitable plugin for your async framework, for example:\n"
        msg += "  - pytest-asyncio\n"
        msg += "  - pytest-trio\n"
        msg += "  - pytest-tornasync"
        warnings.warn(PytestUnhandledCoroutineWarning(msg.format(pyfuncitem.nodeid)))
        _pytest.outcomes.skip(msg="coroutine function and no async plugin installed (see warnings)")
    funcargs = pyfuncitem.funcargs
    testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
    pyfuncitem.retval = testfunction(**testargs)
    return True


@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """pytest hook wrapper: forward the report of each test phase to the trace reporter."""

    def logreport(report, result, call, markers):
        test_item = TestItem(report, item.location[0], result, pytest_config.option.test_suffix, markers=markers)
        if not pytest_config.suite_metrics and context.Ctx.get("YA_PYTEST_START_TIMESTAMP"):
            pytest_config.suite_metrics["pytest_startup_duration"] = call.start - context.Ctx["YA_PYTEST_START_TIMESTAMP"]
            pytest_config.ya_trace_reporter.dump_suite_metrics()

        pytest_config.ya_trace_reporter.on_log_report(test_item)

        if report.outcome == "failed":
            yatest_logger.error(report.longrepr)

        if report.when == "call":
            _collect_test_rusage(item)
            pytest_config.ya_trace_reporter.on_finish_test_case(test_item)
        elif report.when == "setup":
            pytest_config.ya_trace_reporter.on_start_test_class(test_item)
            if report.outcome != "passed":
                pytest_config.ya_trace_reporter.on_start_test_case(test_item)
                pytest_config.ya_trace_reporter.on_finish_test_case(test_item)
            else:
                pytest_config.ya_trace_reporter.on_start_test_case(test_item)
        elif report.when == "teardown":
            if report.outcome == "failed":
                pytest_config.ya_trace_reporter.on_start_test_case(test_item)
                pytest_config.ya_trace_reporter.on_finish_test_case(test_item)
            else:
                pytest_config.ya_trace_reporter.on_finish_test_case(test_item, duration_only=True)
            pytest_config.ya_trace_reporter.on_finish_test_class(test_item)

    outcome = yield
    rep = outcome.get_result()
    result = None
    if hasattr(item, 'retval') and item.retval is not None:
        result = item.retval
        if not pytest_config.from_ya_test:
            ti = TestItem(rep, item.location[0], result, pytest_config.option.test_suffix)
            tr = pytest_config.pluginmanager.getplugin('terminalreporter')
            tr.write_line("{} - Validating canonical data is not supported when running standalone binary".format(ti), yellow=True, bold=True)
    logreport(rep, result, call, item.own_markers)


def pytest_make_parametrize_id(config, val, argname):
    """pytest hook: use the argument name as the id of lambda parameters."""
    # Avoid <, > symbols in canondata file names
    if inspect.isfunction(val) and val.__name__ == "<lambda>":
        return str(argname)
    return None


def get_formatted_error(report):
    """Return the colorized text of ``report.longrepr`` (tuple entries are concatenated)."""
    if isinstance(report.longrepr, tuple):
        text = ""
        for entry in report.longrepr:
            text += colorize(entry)
    else:
        text = colorize(report.longrepr)

    text = yatest_lib.tools.to_utf8(text)
    return text


def colorize(longrepr):
    """Render ``longrepr`` as colorized text, choosing the style by ``--tb``."""
    # use default pytest colorization
    if pytest_config.option.tbstyle != "short":
        io = py.io.TextIO()
        if six.PY2:
            writer = py.io.TerminalWriter(file=io)
        else:
            writer = _pytest._io.TerminalWriter(file=io)
        # enable colorization
        writer.hasmarkup = True

        if hasattr(longrepr, 'reprtraceback') and hasattr(longrepr.reprtraceback, 'toterminal'):
            longrepr.reprtraceback.toterminal(writer)
            return io.getvalue().strip()
        return yatest_lib.tools.to_utf8(longrepr)

    # Use arcadia style colorization
    text = yatest_lib.tools.to_utf8(longrepr)
    return tools.colorize_pytest_error(text)


class TestItem(object):
    """Test outcome derived from a pytest report: names, status, error text, duration, result."""

    def __init__(self, report, location, result, test_suffix, markers=None):
        self._result = result
        self.nodeid = report.nodeid
        self._class_name, self._test_name = tools.split_node_id(self.nodeid, test_suffix)
        self._error = ""
        self._status = None
        self._location = location
        # Missing or falsy durations are normalised to 0.
        self._duration = getattr(report, 'duration', 0) or 0
        self._keywords = getattr(report, "keywords", {})
        self._xfaildiff = any(m.name == 'xfaildiff' for m in (markers or []))
        self._process_report(report)

    def _process_report(self, report):
        """Fill in the error text and the status from ``report``."""
        if report.longrepr:
            self.set_error(report)
            if hasattr(report, 'when') and report.when != "call":
                self.set_error(report.when + " failed:\n" + self._error)

        report_teststatus = _pytest.skipping.pytest_report_teststatus(report)
        if report_teststatus is not None:
            report_teststatus = report_teststatus[0]

        if report_teststatus == 'xfailed':
            self._status = 'xfail'
            self.set_error(report.wasxfail or 'test was marked as xfail', 'imp')
        elif report_teststatus == 'xpassed':
            self._status = 'xpass'
            self.set_error("Test unexpectedly passed")
        elif report.skipped:
            self._status = 'skipped'
            self.set_error(yatest_lib.tools.to_utf8(report.longrepr[-1]))
        elif report.passed:
            if self._xfaildiff:
                self._status = 'xfaildiff'
            else:
                self._status = 'good'
        else:
            self._status = 'fail'

    @property
    def status(self):
        return self._status

    def set_status(self, status):
        self._status = status

    @property
    def test_name(self):
        return tools.normalize_name(self._test_name)

    @property
    def class_name(self):
        return tools.normalize_name(self._class_name)

    @property
    def error(self):
        return self._error

    @property
    def location(self):
        return self._location

    def set_error(self, entry, marker='bad'):
        """Set the error text from a report, or from a plain value prefixed with ``[[marker]]``."""
        assert entry != ""
        if isinstance(entry, _pytest.reports.BaseReport):
            self._error = get_formatted_error(entry)
        else:
            self._error = "[[{}]]{}".format(yatest_lib.tools.to_str(marker), yatest_lib.tools.to_str(entry))

    @property
    def duration(self):
        return self._duration

    @property
    def result(self):
        # Results of tests carrying the 'not_canonize' keyword are not exposed.
        if 'not_canonize' in self._keywords:
            return None
        return self._result

    @property
    def keywords(self):
        return self._keywords

    def __str__(self):
        return "{}::{}".format(self.class_name, self.test_name)


class CustomTestItem(TestItem):
    """TestItem built from a node id instead of a pytest report."""

    def __init__(self, nodeid, location, test_suffix, keywords=None):
        # TestItem.__init__ is intentionally not called: there is no report to process.
        self._result = None
        self.nodeid = nodeid
        self._location = location
        self._class_name, self._test_name = tools.split_node_id(nodeid, test_suffix)
        self._duration = 0
        self._error = ""
        # Defaults so that inherited accessors work on a bare CustomTestItem.
        self._status = None
        self._xfaildiff = False
        self._keywords = keywords if keywords is not None else {}


class NotLaunchedTestItem(CustomTestItem):
    def __init__(self, nodeid, location, test_suffix):
        super(NotLaunchedTestItem, self).__init__(nodeid, location, test_suffix)
        self._status = "not_launched"


class CrashedTestItem(CustomTestItem):
    def __init__(self, nodeid, location, test_suffix):
        super(CrashedTestItem, self).__init__(nodeid, location, test_suffix)
        self._status = "crashed"


class DeselectedTestItem(CustomTestItem):
    def __init__(self, nodeid, location, test_suffix):
        super(DeselectedTestItem, self).__init__(nodeid, location, test_suffix)
        self._status = "deselected"


class TraceReportGenerator(object):
    """Writes test events to the trace file as one JSON object per line."""

    def __init__(self, out_file_path):
        self._filename = out_file_path
        self._file = open(out_file_path, 'w')
        self._wreckage_filename = out_file_path + '.wreckage'
        self._test_messages = {}
        self._test_duration = {}
        # Some machinery to avoid data corruption due sloppy fork()
        self._current_test = (None, None)
        self._pid = os.getpid()
        self._check_intricate_respawn()

    def _check_intricate_respawn(self):
        """Create ``<trace>.pid`` exclusively; abort when it already exists under ya test.

        Raises:
            Exception: the pid file already exists and the process runs under ya test.
            OSError: the pid file could not be created for a reason other than EEXIST.
        """
        pid_file = self._filename + '.pid'
        try:
            # python2 doesn't support open(f, 'x')
            afile = os.fdopen(os.open(pid_file, os.O_WRONLY | os.O_EXCL | os.O_CREAT), 'w')
            afile.write(str(self._pid))
            afile.close()
            return
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

        # Looks like the test binary was respawned
        if from_ya_test():
            try:
                with open(pid_file) as afile:
                    prev_pid = afile.read()
            except Exception as e:
                prev_pid = '(failed to obtain previous pid: {})'.format(e)

            parts = [
                "Aborting test run: test machinery found that the test binary {} has already been run before.".format(sys.executable),
                "Looks like test has incorrect respawn/relaunch logic within test binary.",
                "Test should not try to restart itself - this is a poorly designed test case that leads to errors and could corrupt internal test machinery files.",
                "Debug info: previous pid:{} current:{}".format(prev_pid, self._pid),
            ]
            msg = '\n'.join(parts)
            yatest_logger.error(msg)

            if filelock:
                lock = filelock.FileLock(self._wreckage_filename + '.lock')
                lock.acquire()

            with open(self._wreckage_filename, 'a') as afile:
                # Redirect _dump_trace() to the wreckage file instead of the main trace.
                self._file = afile

                self._dump_trace('chunk_event', {"errors": [('fail', '[[bad]]' + msg)]})

            raise Exception(msg)
        else:
            # Test binary is launched without `ya make -t`'s testing machinery - don't rely on clean environment
            pass

    def on_start_test_class(self, test_item):
        pytest_config.ya.set_test_item_node_id(test_item.nodeid)
        class_name = test_item.class_name.decode('utf-8') if sys.version_info[0] < 3 else test_item.class_name
        self._current_test = (class_name, None)

    def on_finish_test_class(self, test_item):
        pytest_config.ya.set_test_item_node_id(test_item.nodeid)

    def on_start_test_case(self, test_item):
        """Emit a ``subtest-started`` event for ``test_item``."""
        class_name = yatest_lib.tools.to_utf8(test_item.class_name)
        subtest_name = yatest_lib.tools.to_utf8(test_item.test_name)
        message = {
            'class': class_name,
            'subtest': subtest_name,
        }
        # Enable when CI is ready, see YA-465
        if False and test_item.location:  # noqa PLR1727
            message['path'] = test_item.location
        if test_item.nodeid in pytest_config.test_logs:
            message['logs'] = pytest_config.test_logs[test_item.nodeid]
        pytest_config.ya.set_test_item_node_id(test_item.nodeid)
        self._current_test = (class_name, subtest_name)
        self.trace('subtest-started', message)

    def on_finish_test_case(self, test_item, duration_only=False):
        """Emit a ``subtest-finished`` event for ``test_item``.

        With ``duration_only`` and a previously emitted message for the same node id,
        that message is re-sent with only the ``time`` field refreshed.
        """
        if test_item.result is not None:
            try:
                result = canon.serialize(test_item.result)
            except Exception as e:
                yatest_logger.exception("Error while serializing test results")
                test_item.set_error("Invalid test result: {}".format(e))
                test_item.set_status("fail")
                result = None
        else:
            result = None

        if duration_only and test_item.nodeid in self._test_messages:  # add teardown time
            message = self._test_messages[test_item.nodeid]
        else:
            # Comments of earlier events for the same test are accumulated.
            comment = self._test_messages[test_item.nodeid]['comment'] if test_item.nodeid in self._test_messages else ''
            comment += self._get_comment(test_item)
            message = {
                'class': yatest_lib.tools.to_utf8(test_item.class_name),
                'subtest': yatest_lib.tools.to_utf8(test_item.test_name),
                'status': test_item.status,
                'comment': comment,
                'result': result,
                'metrics': test_metrics.get(test_item.nodeid),
                'is_diff_test': 'diff_test' in test_item.keywords,
                'tags': _get_item_tags(test_item),
            }
            # Enable when CI is ready, see YA-465
            if False and test_item.location:  # noqa PLR1727
                message['path'] = test_item.location
            if test_item.nodeid in pytest_config.test_logs:
                message['logs'] = pytest_config.test_logs[test_item.nodeid]

        message['time'] = self._test_duration.get(test_item.nodeid, test_item.duration)

        self.trace('subtest-finished', message)
        self._test_messages[test_item.nodeid] = message

    def dump_suite_metrics(self):
        message = {"metrics": pytest_config.suite_metrics}
        self.trace("chunk_event", message)

    def on_error(self, test_item):
        self.trace('chunk_event', {"errors": [(test_item.status, self._get_comment(test_item))]})

    def on_log_report(self, test_item):
        """Accumulate the duration of ``test_item`` per node id."""
        if test_item.nodeid in self._test_duration:
            self._test_duration[test_item.nodeid] += test_item._duration
        else:
            self._test_duration[test_item.nodeid] = test_item._duration

    @staticmethod
    def _get_comment(test_item, limit=8*1024):
        """Return the item's error text cut to ``limit`` plus ``[[rst]]``, or "" without an error."""
        msg = yatest_lib.tools.to_utf8(test_item.error)
        if not msg:
            return ""

        if len(msg) > limit:
            msg = msg[:limit - 3] + "..."

        return msg + "[[rst]]"

    def _dump_trace(self, name, value):
        """Write one event as a JSON line to the current file and flush it."""
        event = {
            'timestamp': time.time(),
            'value': value,
            'name': name
        }

        data = yatest_lib.tools.to_str(json.dumps(event, ensure_ascii=False))
        self._file.write(data + '\n')
        self._file.flush()

    def _check_sloppy_fork(self, name, value):
        """Detect tracing from a pid other than the one that created the generator.

        In that case a failure record is written to the wreckage file and the
        process is terminated with ``os._exit(38)``.
        """
        if self._pid == os.getpid():
            return

        yatest_logger.error("Skip tracing to avoid data corruption, name = %s, value = %s", name, value)

        try:
            # Lock wreckage tracefile to avoid race if multiple tests use fork sloppily
            if filelock:
                lock = filelock.FileLock(self._wreckage_filename + '.lock')
                lock.acquire()

            with open(self._wreckage_filename, 'a') as afile:
                self._file = afile

                parts = [
                    "It looks like you have leaked process - it could corrupt internal test machinery files.",
                    "Usually it happens when you casually use fork() without os._exit(),",
                    "which results in two pytest processes running at the same time.",
                    "Pid of the original pytest's process is {}, however current process has {} pid.".format(self._pid, os.getpid()),
                ]
                if self._current_test[1]:
                    parts.append("Most likely the problem is in '{}' test.".format(self._current_test))
                else:
                    parts.append("Most likely new process was created before any test was launched (during the import stage?).")

                if value.get('comment'):
                    comment = value.get('comment', '').strip()
                    # multiline comment
                    newline_required = '\n' if '\n' in comment else ''
                    parts.append("Debug info: name = '{}' comment:{}{}".format(name, newline_required, comment))
                else:
                    val_str = json.dumps(value, ensure_ascii=False).encode('utf-8')
                    # Decode so that Python 3 does not embed a b'...' repr into the message.
                    encoded = base64.b64encode(val_str).decode('ascii')
                    parts.append("Debug info: name = '{}' value = '{}'".format(name, encoded))

                msg = "[[bad]]{}".format('\n'.join(parts))
                class_name, subtest_name = self._current_test
                if subtest_name:
                    data = {
                        'class': class_name,
                        'subtest': subtest_name,
                        'status': 'fail',
                        'comment': msg,
                    }
                    # overwrite original status
                    self._dump_trace('subtest-finished', data)
                else:
                    self._dump_trace('chunk_event', {"errors": [('fail', msg)]})
        except Exception as e:
            yatest_logger.exception(e)
        finally:
            os._exit(38)

    def trace(self, name, value):
        self._check_sloppy_fork(name, value)
        self._dump_trace(name, value)


class DryTraceReportGenerator(TraceReportGenerator):
    """
    Generator does not write any information.
    """

    def __init__(self, *args, **kwargs):
        self._test_messages = {}
        self._test_duration = {}
        # Inherited handlers assign this attribute; initialise it for consistency.
        self._current_test = (None, None)

    def trace(self, name, value):
        pass