import codecs import os import pytest import re import cyson import yql.essentials.providers.common.proto.gateways_config_pb2 as gateways_config_pb2 from google.protobuf import text_format from yql_utils import execute, get_supported_providers, get_tables, get_files, get_http_files, \ get_pragmas, KSV_ATTR, is_xfail, get_param, YQLExecResult, yql_binary_path, do_custom_error_check from yqlrun import YQLRun from test_utils import get_parameters_json, replace_vars def get_gateways_config(http_files, yql_http_file_server, force_blocks=False, is_hybrid=False, allow_llvm=True): config = None if http_files or force_blocks or is_hybrid or not allow_llvm: config_message = gateways_config_pb2.TGatewaysConfig() if http_files: schema = config_message.Fs.CustomSchemes.add() schema.Pattern = 'http_test://(.*)' schema.TargetUrl = yql_http_file_server.compose_http_link('$1') if force_blocks: config_message.SqlCore.TranslationFlags.extend(['EmitAggApply', 'EmitTableSource']) flags = config_message.YqlCore.Flags.add() flags.Name = 'UseBlocks' if is_hybrid: activate_hybrid = config_message.Yt.DefaultSettings.add() activate_hybrid.Name = "HybridDqExecution" activate_hybrid.Value = "1" deactivate_dq = config_message.Dq.DefaultSettings.add() deactivate_dq.Name = "AnalyzeQuery" deactivate_dq.Value = "0" if not allow_llvm: flags = config_message.YqlCore.Flags.add() flags.Name = 'LLVM_OFF' config = text_format.MessageToString(config_message) return config def is_hybrid(provider): return provider == 'hybrid' def check_provider(provider, config): if provider == 'pure': return if provider not in get_supported_providers(config): pytest.skip('%s provider is not supported here' % provider) def get_sql_query(provider, suite, case, config, data_path=None, template='.sql'): pragmas = get_pragmas(config) if get_param('TARGET_PLATFORM'): if "yson" in case or "regexp" in case or "match" in case: pytest.skip('yson/match/regexp is not supported on non-default target platform') if get_param('TARGET_PLATFORM') and is_xfail(config): pytest.skip('xfail is not supported on non-default target platform') program_sql = os.path.join(data_path, suite, '%s%s' % (case, template)) with codecs.open(program_sql, encoding='utf-8') as program_file_descr: sql_query = program_file_descr.read() if get_param('TARGET_PLATFORM'): if "Yson::" in sql_query: pytest.skip('yson udf is not supported on non-default target platform') if (provider + 'file can not' in sql_query) or (is_hybrid(provider) and ('ytfile can not' in sql_query)): pytest.skip(provider + ' can not execute this') pragmas.append(sql_query) sql_query = ';\n'.join(pragmas) if provider != 'yt' and provider != 'pure' and 'Javascript' in sql_query: pytest.skip('ScriptUdf') assert 'UseBlocks' not in sql_query, 'UseBlocks should not be used directly, only via ForceBlocks' return sql_query def run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary=None, extra_args=[], force_blocks=False, allow_llvm=True, data_path=None, run_sql=True): check_provider(provider, config) sql_query = get_sql_query(provider, suite, case, config, data_path, template='.sql' if run_sql else '.yqls') sql_query = replace_vars(sql_query, "yqlrun_var") xfail = is_xfail(config) in_tables, out_tables = get_tables(suite, config, data_path, def_attr=KSV_ATTR) if provider != 'pure' else (None, None) files = get_files(suite, config, data_path) http_files = get_http_files(suite, config, data_path) http_files_urls = yql_http_file_server.register_files({}, http_files) if in_tables is not None: for table in in_tables: if cyson.loads(table.attr).get("type") == "document": content = table.content else: content = table.attr if provider != 'yt' and 'Javascript' in content: pytest.skip('ScriptUdf') parameters = get_parameters_json(suite, config, data_path) yqlrun = YQLRun( prov=provider, keep_temp=not re.search(r"yt\.ReleaseTempData", sql_query), binary=yqlrun_binary, gateway_config=get_gateways_config(http_files, yql_http_file_server, force_blocks=force_blocks, is_hybrid=is_hybrid(provider), allow_llvm=allow_llvm), extra_args=extra_args, udfs_dir=yql_binary_path('yql/essentials/tests/common/test_framework/udfs_deps') ) res, tables_res = execute( yqlrun, program=sql_query, input_tables=in_tables, output_tables=out_tables, files=files, urls=http_files_urls, check_error=not xfail, run_sql=run_sql, verbose=True, parameters=parameters) fixed_stderr = res.std_err if xfail: assert res.execution_result.exit_code != 0 do_custom_error_check(res, sql_query) fixed_stderr = None fixed_result = YQLExecResult(res.std_out, fixed_stderr, res.results, res.results_file, res.opt, res.opt_file, res.plan, res.plan_file, res.program, res.execution_result, None) return fixed_result, tables_res def run_file(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary=None, extra_args=[], force_blocks=False, allow_llvm=True, data_path=None, run_sql=True): if (suite, case, cfg) not in run_file.cache: run_file.cache[(suite, case, cfg)] = \ run_file_no_cache(provider, suite, case, cfg, config, yql_http_file_server, yqlrun_binary, extra_args, force_blocks=force_blocks, allow_llvm=allow_llvm, data_path=data_path, run_sql=run_sql) return run_file.cache[(suite, case, cfg)] run_file.cache = {}