Browse Source

Move swag from devtools/ to library/python/testing/

alexv-smirnov 1 year ago
parent
commit
9ae2b50e80

+ 297 - 0
library/python/testing/swag/daemon.py

@@ -0,0 +1,297 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import os
+import sys
+import signal
+import tempfile
+import shutil
+
+try:
+    from . import gdb
+except ValueError:
+    import gdb
+
+from yatest.common import process, output_path, TimeoutError, cores
+
+MAX_IO_LEN = 1024 * 10
+GYGABYTES = 1 << 30
+
+logger = logging.getLogger(__name__)
+
+
+def run_daemon(command, check_exit_code=True, shell=False, timeout=5, cwd=None,
+               env=None, stdin=None, stdout=None, stderr=None, creationflags=0):
+    daemon = Daemon(command, check_exit_code, shell, timeout, cwd, env, stdin, stdout, stderr, creationflags)
+    daemon.run()
+    return daemon
+
+
+def get_free_space(path):
+    stats = os.statvfs(path)
+    return stats.f_bavail * stats.f_frsize
+
+
+class DaemonError(RuntimeError):
+    def __init__(self, message, stdout=None, stderr=None, exit_code=None):
+        lst = [
+            "Daemon failed with message: {message}.".format(message=message),
+        ]
+        if exit_code is not None:
+            lst.append(
+                "Process exit_code = {exit_code}.".format(exit_code=exit_code)
+            )
+        if stdout is not None:
+            lst.append(
+                "Stdout: {stdout}".format(stdout=stdout)
+            )
+        if stderr is not None:
+            lst.append(
+                "Stderr: {stderr}".format(stderr=stderr)
+            )
+
+        super(DaemonError, self).__init__('\n'.join(lst))
+
+
+class Daemon(object):
+    def __init__(self, command, check_exit_code=True, shell=False, timeout=5, cwd=None,
+                 env=None, stdin=None, stdout=None, stderr=None, creationflags=0):
+        if cwd is None:
+            cwd = tempfile.mkdtemp()
+        self.cwd = cwd
+
+        self.stdoutf = stdout or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stdout_", delete=False)
+        self.stderrf = stderr or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stderr_", delete=False)
+        self.stdinf = stdin or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stdin_", delete=False)
+
+        self.cmd = command
+        if sys.version_info.major > 2:
+            _basestring = str
+        else:
+            _basestring = basestring
+        if isinstance(command, _basestring):
+            self.cmd = [arg for arg in command.split() if arg]
+        self.daemon = None
+        self.name = os.path.basename(self.cmd[0])
+
+        self._shell = shell
+        self._env = env
+        self._creationflags = creationflags
+
+        self._check_exit_code = check_exit_code
+        self._timeout = timeout
+
+    def before_start(self):
+        pass
+
+    def after_start(self):
+        pass
+
+    def before_stop(self):
+        pass
+
+    def after_stop(self):
+        pass
+
+    def is_alive(self):
+        return self.daemon and self.daemon.running
+
+    def required_args(self):
+        return []
+
+    def check_run(self):
+        """This function checks that daemon is running. By default it
+        checks only the process status. But you can override it to
+        check your binary specific marks like 'port is busy' and
+        others."""
+        return self.is_alive()
+
+    def run(self):
+        if self.check_run():
+            logger.error("Can't run %s.\nProcess already started" % self.cmd)
+            raise DaemonError("daemon already started.")
+
+        try:
+            self.before_start()
+        except Exception:
+            logger.exception("Exception in user hook before_start")
+        self.daemon = process.execute(self.cmd[:1] + self.required_args() + self.cmd[1:],
+                                      False,
+                                      shell=self._shell,
+                                      cwd=self.cwd,
+                                      env=self._env,
+                                      stdin=self.stdinf,
+                                      stdout=self.stdoutf,
+                                      stderr=self.stderrf,
+                                      creationflags=self._creationflags,
+                                      wait=False)
+        stdout, stderr = self.__communicate()
+        timeout_reason_msg = "Failed to execute '{cmd}'.\n\tstdout: {out}\n\tstderr: {err}".format(
+            cmd=" ".join(self.cmd),
+            out=stdout,
+            err=stderr)
+        try:
+            process.wait_for(self.check_run, self._timeout, timeout_reason_msg, sleep_time=0.1)
+        except process.TimeoutError:
+            self.raise_on_death(timeout_reason_msg)
+
+        if not self.is_alive():
+            self.raise_on_death("WHY? %s %s" % (self.daemon, self.daemon.running))
+
+        try:
+            self.after_start()
+        except Exception as e:
+            msg = "Exception in user hook after_start. Exception: %s" % str(e)
+            logger.exception(msg)
+
+        return self
+
+    def raise_on_death(self, additional_text=""):
+        stdout = "[NO STDOUT]"
+        stderr = "[NO STDERR]"
+
+        if self.stdoutf and self.stdinf:
+            stdout, stderr = self.__communicate()
+        if self.daemon and getattr(self.daemon, "process"):
+            self.check_coredump()
+
+        raise DaemonError(
+            Daemon.__log_failed(
+                "process {} unexpectedly finished. \n\n {}".format(self.cmd, additional_text),
+                stdout,
+                stderr
+            )
+        )
+
+    def check_coredump(self):
+        try:
+            core_file = cores.recover_core_dump_file(self.cmd[0], self.cwd, self.daemon.process.pid)
+            if core_file:
+                logger.debug(core_file + " found, maybe this is our coredump file")
+                self.save_coredump(core_file)
+            else:
+                logger.debug("Core dump file was not found")
+        except Exception as e:
+            logger.warn("While checking coredump: " + str(e))
+
+    def save_coredump(self, core_file):
+        output_core_dir = output_path("cores")
+        shared_core_file = os.path.join(output_core_dir, os.path.basename(core_file))
+        if not os.path.isdir(output_core_dir):
+            os.mkdir(output_core_dir)
+
+        short_bt, _ = gdb.dump_traceback(executable=self.cmd[0], core_file=core_file,
+                                         output_file=shared_core_file + ".trace.txt")
+        if short_bt:
+            logger.error("Short backtrace = \n" + "=" * 80 + "\n" + short_bt + "\n" + "=" * 80)
+
+        space_left = float(get_free_space(output_core_dir))
+        if space_left > 5 * GYGABYTES:
+            shutil.copy2(
+                core_file,
+                shared_core_file
+            )
+            os.chmod(shared_core_file, 0o755)
+            logger.debug("Saved to " + output_core_dir)
+
+        else:
+            logger.error("Not enough space left on device (%s GB). Won't save %s file" % (float(space_left / GYGABYTES), core_file))
+
+    def stop(self, kill=False):
+        if not self.is_alive() and self.daemon.exit_code == 0:
+            return
+
+        if not self.is_alive():
+            stdout, stderr = self.__communicate()
+            self.check_coredump()
+            try:
+                self.after_stop()
+            except Exception:
+                logger.exception("Exception in user hook after_stop.")
+
+            raise DaemonError(
+                Daemon.__log_failed(
+                    "process {} unexpectedly finished with exit code {}.".format(self.cmd, self.daemon.exit_code),
+                    stdout,
+                    stderr
+                ),
+                exit_code=self.daemon.exit_code
+            )
+
+        try:
+            self.before_stop()
+        except Exception:
+            logger.exception("Exception in user hook before_stop.")
+
+        stderr, stdout = self.__communicate()
+        timeout_reason_msg = "Cannot stop {cmd}.\n\tstdout: {out}\n\tstderr: {err}".format(
+            cmd=" ".join(self.cmd),
+            out=stdout,
+            err=stderr)
+        if not kill:
+            self.daemon.process.send_signal(signal.SIGINT)
+            try:  # soft wait for. trying to kill with sigint
+                process.wait_for(lambda: not self.is_alive(), self._timeout, timeout_reason_msg, sleep_time=0.1)
+            except TimeoutError:
+                pass
+
+        is_killed = False
+        if self.is_alive():
+            self.daemon.process.send_signal(signal.SIGKILL)
+            is_killed = True
+
+        process.wait_for(lambda: not self.is_alive(), self._timeout, timeout_reason_msg, sleep_time=0.1)
+
+        try:
+            self.after_stop()
+        except Exception:
+            logger.exception("Exception in user hook after_stop")
+
+        if self.daemon.running:
+            stdout, stderr = self.__communicate()
+            msg = "cannot stop daemon {cmd}\n\tstdout: {out}\n\tstderr: {err}".format(
+                cmd=' '.join(self.cmd),
+                out=stdout,
+                err=stderr
+            )
+            logger.error(msg)
+            raise DaemonError(msg, stdout=stdout, stderr=stderr, exit_code=self.daemon.exit_code)
+
+        stdout, stderr = self.__communicate()
+        logger.debug(
+            "Process stopped: {cmd}.\n\tstdout:\n{out}\n\tstderr:\n{err}".format(
+                cmd=" ".join(self.cmd),
+                out=stdout,
+                err=stderr
+            )
+        )
+        if not is_killed:
+            self.check_coredump()
+            if self._check_exit_code and self.daemon.exit_code != 0:
+                stdout, stderr = self.__communicate()
+                raise DaemonError("Bad exit_code.", stdout=stdout, stderr=stderr, exit_code=self.daemon.exit_code)
+        else:
+            logger.warning("Exit code is not checked, cos binary was stopped by sigkill")
+
+    def _read_io(self, file_obj):
+        file_obj.flush()
+
+        cur_pos = file_obj.tell()
+        seek_pos_from_end = max(-cur_pos, -MAX_IO_LEN)
+        file_obj.seek(seek_pos_from_end, os.SEEK_END)
+        return file_obj.read()
+
+    def __communicate(self):
+        stderr = self._read_io(self.stderrf)
+        stdout = self._read_io(self.stdoutf)
+        return stdout, stderr
+
+    @staticmethod
+    def __log_failed(msg, stderr, stdout):
+        final_msg = '{msg}\nstdout: {out}\nstderr: {err}'.format(
+            msg=msg,
+            out=stdout,
+            err=stderr)
+        logger.error(msg)
+        return final_msg

+ 59 - 0
library/python/testing/swag/gdb.py

@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+
+import yatest.common
+
+logger = logging.getLogger(__name__)
+
+SHORT_DUMP_COMMAND = "{gdb} {executable} {core_file} --eval-command='backtrace full' --batch -q"
+LONG_DUMP_COMMAND = "{gdb} {executable} {core_file} --eval-command='thread apply all bt full' --batch -q"
+
+
+def get_gdb():
+    return yatest.common.gdb_path()
+
+
+def run_gdb_command(command, stdout_file, stderr_file):
+    logger.debug("Running gdb command %s" % command)
+
+    with open(stdout_file, "w") as out, open(stderr_file, "w") as err:
+        yatest.common.process.execute(
+            command,
+            check_exit_code=True,
+            wait=True,
+            shell=True,
+            stdout=out,
+            stderr=err
+        )
+
+
+def dump_traceback(executable, core_file, output_file):
+    """
+    Dumps traceback if its possible
+
+    :param executable: binary for gdb
+    :param core_file: core file for gdb
+    :param output_file: file to dump traceback to, also dump full traceback to <output_file + ".full">
+
+    :return: string tuple (short_backtrace, full_backtrace)
+    """
+    try:
+        gdb = get_gdb()
+        short_dump_command = SHORT_DUMP_COMMAND.format(gdb=gdb, executable=executable, core_file=core_file)
+        long_dump_command = LONG_DUMP_COMMAND.format(gdb=gdb, executable=executable, core_file=core_file)
+        run_gdb_command(short_dump_command, output_file, output_file + '.err')
+        output_file_full = output_file + ".full"
+        output_file_full_err = output_file_full + '.err'
+        run_gdb_command(long_dump_command, output_file_full, output_file_full_err)
+    except Exception:
+        logger.exception("Failed to print trace")
+        return '', ''
+
+    short_backtrace = ''
+    full_backtrace = ''
+    with open(output_file) as o, open(output_file_full) as e:
+        short_backtrace = o.read()
+        full_backtrace = e.read()
+    return short_backtrace, full_backtrace

+ 20 - 0
library/python/testing/swag/lib/ya.make

@@ -0,0 +1,20 @@
+PY23_LIBRARY()
+
+PEERDIR(
+    contrib/python/protobuf
+    library/python/testing/yatest_common
+)
+
+SRCDIR(library/python/testing/swag)
+
+PY_SRCS(
+    NAMESPACE library.python.testing.swag
+
+    daemon.py
+    gdb.py
+    pathutil.py
+    ports.py
+    proto_traversals.py
+)
+
+END()

+ 26 - 0
library/python/testing/swag/pathutil.py

@@ -0,0 +1,26 @@
+import os
+import tempfile
+
+
+def get_valid_filename(filename, dirname):
+    current_file, counter = filename, 0
+    while os.path.exists(os.path.join(dirname, current_file)):
+        current_file = "%s_%d" % (filename, counter)
+        counter += 1
+    valid_path = os.path.join(dirname, current_file)
+    os.mknod(valid_path)
+    return valid_path
+
+
+def get_valid_tmpdir(name, tmp_dir):
+    current_dir, counter = name, 0
+    while os.path.exists(os.path.join(tmp_dir, current_dir)):
+        current_dir = "%s_%d" % (name, counter)
+        counter += 1
+    os.mkdir(os.path.join(tmp_dir, current_dir))
+    return os.path.join(tmp_dir, current_dir)
+
+
+def get_base_tmpdir(name):
+    tmppath = tempfile.gettempdir()
+    return get_valid_tmpdir(name, tmppath)

+ 30 - 0
library/python/testing/swag/ports.py

@@ -0,0 +1,30 @@
+import random
+import socket
+
+ATTEMPTS = 25
+# range 10000-10199 is reserved for Skynet on Sandbox machines
+MIN_PORT = 10200
+MAX_PORT = 25000
+
+
+def is_port_open(host, port):
+    _socket = socket.socket(socket.AF_INET)
+    return _socket.connect_ex((host, port)) != 0
+
+
+def find_free_port(range_start=MIN_PORT, range_end=MAX_PORT, attempts=ATTEMPTS):
+    """
+    Finds free port
+
+    :param range_start: start of range
+    :param range_end: end of range
+    :param attempts: number of tries to find free port
+
+    :return: some open port in a given range
+    """
+    ports = [random.randint(range_start, range_end) for _ in range(attempts)]
+    while ports:
+        port = ports.pop()
+        if is_port_open('', port):
+            return port
+    raise RuntimeError('Could not find free port in range = ' + str((range_start, range_end)))

+ 74 - 0
library/python/testing/swag/proto_traversals.py

@@ -0,0 +1,74 @@
+import copy
+from google.protobuf.descriptor import FieldDescriptor as fdescriptor
+
+"""Recursive tree traversals for protobuf. Each message
+   is node, each field is leaf. Function walks through
+   proto and in each node do smth."""
+
+
+def search(proto, fname=None, ftype=None):
+    for desc, obj in proto.ListFields():
+        if desc.name == fname and (ftype is None or ftype == desc.type):
+            return (obj, desc, proto)
+        if desc.type == fdescriptor.TYPE_MESSAGE:
+            objs = obj if desc.label == fdescriptor.LABEL_REPEATED else [obj]
+            for one_obj in objs:
+                return search(one_obj, fname, ftype)
+    return None
+
+
+def search_and_process(proto, return_func=lambda params, child_values=None: params,
+                       recalc_params_func=lambda proto, obj, desc, params: params,
+                       params=None):
+    """Search and process each node. Recalc params on each step. Pass it down
+    the tree. On each leaf calcs return value from param, and pass it up. Nodes
+    calc return value with current param and childs return values.
+
+    Args:
+      * proto -- current node. to run through some proto, put its object here
+      * return_func -- function that return value. takes current (recalced for current
+      *                node) param and list of return values for current node children.
+      *                for leafs second parametr is None
+      * recalc_params_func -- function to recalc params in node. takes root proto,
+      *                       current object (or objects for repeated fields), current
+      *                       proto descriptor and param. return new param value
+      * params -- initial values for params"""
+    if proto is None:
+        return None
+
+    return_values = []
+    for desc, obj in proto.ListFields():
+        params = copy.deepcopy(params)
+        if desc.type == fdescriptor.TYPE_MESSAGE:
+            objs = obj if desc.label == fdescriptor.LABEL_REPEATED else [obj]
+            params = recalc_params_func(proto, obj, desc, params)
+            for one_obj in objs:
+                return_values.append(search_and_process(one_obj, return_func,
+                                                        recalc_params_func, params))
+        else:
+            return_values.append(return_func(recalc_params_func(proto, obj, desc, params), None))
+    return return_func(params, return_values)
+
+
+def search_and_process_descriptors(proto_desc,
+                                   return_func=lambda params, child_values=None: params,
+                                   recalc_params_func=lambda desc, params: params,
+                                   params=None):
+    """Same as search and process(except we run recalc_params in root_proto too),
+    but process each node from PROTOBUF DESCRIPTIO, instead of each node from
+    protobuf message."""
+    params = copy.deepcopy(params)
+    params = recalc_params_func(proto_desc, params)
+
+    if proto_desc is None:
+        return None
+    elif hasattr(proto_desc, "type") and proto_desc.type != fdescriptor.TYPE_MESSAGE:
+        return return_func(params, None)
+
+    return_values = []
+    for field_desc in proto_desc.fields:
+        desc = field_desc if field_desc.message_type is None else field_desc.message_type
+        return_values.append(search_and_process_descriptors(desc, return_func,
+                                                            recalc_params_func, params))
+
+    return return_func(params, return_values)

+ 1 - 1
ydb/library/yql/tests/sql/sql2yql/ya.make

@@ -32,7 +32,7 @@ ENDIF()
     )
     PEERDIR(
         ydb/library/yql/tests/common/test_framework
-        devtools/swag/lib
+        library/python/testing/swag/lib
     )