yqlrun.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. import os
  2. import shutil
  3. import yatest.common
  4. import yql_utils
  5. import cyson as yson
  6. import yql.essentials.providers.common.proto.gateways_config_pb2 as gateways_config_pb2
  7. import yql.essentials.core.file_storage.proto.file_storage_pb2 as file_storage_pb2
  8. import six
  9. from google.protobuf import text_format
  10. ARCADIA_PREFIX = 'arcadia/'
  11. ARCADIA_TESTS_DATA_PREFIX = 'arcadia_tests_data/'
  12. VAR_CHAR_PREFIX = '$'
  13. FIX_DIR_PREFIXES = {
  14. 'SOURCE': yatest.common.source_path,
  15. 'BUILD': yatest.common.build_path,
  16. 'TEST_SOURCE': yatest.common.test_source_path,
  17. 'DATA': yatest.common.data_path,
  18. 'BINARY': yatest.common.binary_path,
  19. }
  20. class YQLRun(object):
  21. def __init__(self, udfs_dir=None, prov='yt', use_sql2yql=False, keep_temp=True, binary=None, gateway_config=None, fs_config=None, extra_args=[], cfg_dir=None, support_udfs=True):
  22. if binary is None:
  23. self.yqlrun_binary = yql_utils.yql_binary_path(os.getenv('YQL_YQLRUN_PATH') or 'yql/tools/yqlrun/yqlrun')
  24. else:
  25. self.yqlrun_binary = binary
  26. self.extra_args = extra_args
  27. try:
  28. self.sql2yql_binary = yql_utils.yql_binary_path(os.getenv('YQL_SQL2YQL_PATH') or 'yql/essentials/tools/sql2yql/sql2yql')
  29. except BaseException:
  30. self.sql2yql_binary = None
  31. try:
  32. self.udf_resolver_binary = yql_utils.yql_binary_path(os.getenv('YQL_UDFRESOLVER_PATH') or 'yql/essentials/tools/udf_resolver/udf_resolver')
  33. except Exception:
  34. self.udf_resolver_binary = None
  35. if support_udfs:
  36. if udfs_dir is None:
  37. self.udfs_path = yql_utils.get_udfs_path()
  38. else:
  39. self.udfs_path = udfs_dir
  40. else:
  41. self.udfs_path = None
  42. res_dir = yql_utils.get_yql_dir(prefix='yqlrun_')
  43. self.res_dir = res_dir
  44. self.tables = {}
  45. self.prov = prov
  46. self.use_sql2yql = use_sql2yql
  47. self.keep_temp = keep_temp
  48. self.gateway_config = gateways_config_pb2.TGatewaysConfig()
  49. if gateway_config is not None:
  50. text_format.Merge(gateway_config, self.gateway_config)
  51. yql_utils.merge_default_gateway_cfg(cfg_dir or 'yql/essentials/cfg/tests', self.gateway_config)
  52. self.fs_config = file_storage_pb2.TFileStorageConfig()
  53. with open(yql_utils.yql_source_path(os.path.join(cfg_dir or 'yql/essentials/cfg/tests', 'fs.conf'))) as f:
  54. text_format.Merge(f.read(), self.fs_config)
  55. if fs_config is not None:
  56. text_format.Merge(fs_config, self.fs_config)
  57. if yql_utils.get_param('USE_NATIVE_YT_TYPES'):
  58. attr = self.gateway_config.Yt.DefaultSettings.add()
  59. attr.Name = 'UseNativeYtTypes'
  60. attr.Value = 'true'
  61. if yql_utils.get_param('SQL_FLAGS'):
  62. flags = yql_utils.get_param('SQL_FLAGS').split(',')
  63. self.gateway_config.SqlCore.TranslationFlags.extend(flags)
  64. def yql_exec(self, program=None, program_file=None, files=None, urls=None,
  65. run_sql=False, verbose=False, check_error=True, tables=None, pretty_plan=True,
  66. wait=True, parameters={}, extra_env={}, require_udf_resolver=False, scan_udfs=True):
  67. del pretty_plan
  68. res_dir = self.res_dir
  69. def res_file_path(name):
  70. return os.path.join(res_dir, name)
  71. opt_file = res_file_path('opt.yql')
  72. results_file = res_file_path('results.txt')
  73. plan_file = res_file_path('plan.txt')
  74. err_file = res_file_path('err.txt')
  75. udfs_dir = self.udfs_path
  76. prov = self.prov
  77. program, program_file = yql_utils.prepare_program(program, program_file, res_dir,
  78. ext='sql' if run_sql else 'yql')
  79. syntax_version = yql_utils.get_syntax_version(program)
  80. ansi_lexer = yql_utils.ansi_lexer_enabled(program)
  81. if run_sql and self.use_sql2yql:
  82. orig_sql = program_file + '.orig_sql'
  83. shutil.copy2(program_file, orig_sql)
  84. cmd = [
  85. self.sql2yql_binary,
  86. orig_sql,
  87. '--yql',
  88. '--output=' + program_file,
  89. '--syntax-version=%d' % syntax_version
  90. ]
  91. if ansi_lexer:
  92. cmd.append('--ansi-lexer')
  93. env = {'YQL_DETERMINISTIC_MODE': '1'}
  94. env.update(extra_env)
  95. for var in [
  96. 'LLVM_PROFILE_FILE',
  97. 'GO_COVERAGE_PREFIX',
  98. 'PYTHON_COVERAGE_PREFIX',
  99. 'NLG_COVERAGE_FILENAME',
  100. 'YQL_EXPORT_PG_FUNCTIONS_DIR',
  101. 'YQL_ALLOW_ALL_PG_FUNCTIONS',
  102. ]:
  103. if var in os.environ:
  104. env[var] = os.environ[var]
  105. yatest.common.process.execute(cmd, cwd=res_dir, env=env)
  106. with open(program_file) as f:
  107. yql_program = f.read()
  108. with open(program_file, 'w') as f:
  109. f.write(yql_program)
  110. gateways_cfg_file = res_file_path('gateways.conf')
  111. with open(gateways_cfg_file, 'w') as f:
  112. f.write(str(self.gateway_config))
  113. fs_cfg_file = res_file_path('fs.conf')
  114. with open(fs_cfg_file, 'w') as f:
  115. f.write(str(self.fs_config))
  116. cmd = self.yqlrun_binary + ' '
  117. if yql_utils.get_param('TRACE_OPT'):
  118. cmd += '--trace-opt '
  119. cmd += '-L ' \
  120. '--program=%(program_file)s ' \
  121. '--expr-file=%(opt_file)s ' \
  122. '--result-file=%(results_file)s ' \
  123. '--plan-file=%(plan_file)s ' \
  124. '--err-file=%(err_file)s ' \
  125. '--gateways=%(prov)s ' \
  126. '--syntax-version=%(syntax_version)d ' \
  127. '--gateways-cfg=%(gateways_cfg_file)s ' \
  128. '--fs-cfg=%(fs_cfg_file)s ' % locals()
  129. if prov != 'pure':
  130. cmd += '--tmp-dir=%(res_dir)s ' % locals()
  131. if self.udfs_path is not None:
  132. cmd += '--udfs-dir=%(udfs_dir)s ' % locals()
  133. if ansi_lexer:
  134. cmd += '--ansi-lexer '
  135. if self.keep_temp and prov != 'pure':
  136. cmd += '--keep-temp '
  137. if self.extra_args:
  138. cmd += " ".join(self.extra_args) + " "
  139. cmd += '--mounts=' + yql_utils.get_mount_config_file() + ' '
  140. cmd += '--validate-result-format '
  141. if files:
  142. for f in files:
  143. if files[f].startswith(ARCADIA_PREFIX): # how does it work with folders? and does it?
  144. files[f] = yatest.common.source_path(files[f][len(ARCADIA_PREFIX):])
  145. continue
  146. if files[f].startswith(ARCADIA_TESTS_DATA_PREFIX):
  147. files[f] = yatest.common.data_path(files[f][len(ARCADIA_TESTS_DATA_PREFIX):])
  148. continue
  149. if files[f].startswith(VAR_CHAR_PREFIX):
  150. for prefix, func in six.iteritems(FIX_DIR_PREFIXES):
  151. if files[f].startswith(VAR_CHAR_PREFIX + prefix):
  152. real_path = func(files[f][len(prefix) + 2:]) # $ + prefix + /
  153. break
  154. else:
  155. raise Exception("unknown prefix in file path %s" % (files[f],))
  156. copy_dest = os.path.join(res_dir, f)
  157. if not os.path.exists(os.path.dirname(copy_dest)):
  158. os.makedirs(os.path.dirname(copy_dest))
  159. shutil.copy2(
  160. real_path,
  161. copy_dest,
  162. )
  163. files[f] = f
  164. continue
  165. if not files[f].startswith('/'): # why do we check files[f] instead of f here?
  166. path_to_copy = os.path.join(
  167. yatest.common.work_path(),
  168. files[f]
  169. )
  170. if '/' in files[f]:
  171. copy_dest = os.path.join(
  172. res_dir,
  173. os.path.dirname(files[f])
  174. )
  175. if not os.path.exists(copy_dest):
  176. os.makedirs(copy_dest)
  177. else:
  178. copy_dest = res_dir
  179. files[f] = os.path.basename(files[f])
  180. shutil.copy2(path_to_copy, copy_dest)
  181. else:
  182. shutil.copy2(files[f], res_dir)
  183. files[f] = os.path.basename(files[f])
  184. cmd += yql_utils.get_cmd_for_files('--file', files)
  185. if urls:
  186. cmd += yql_utils.get_cmd_for_files('--url', urls)
  187. optimize_only = False
  188. if tables:
  189. for table in tables:
  190. self.tables[table.full_name] = table
  191. if table.format != 'yson':
  192. optimize_only = True
  193. for name in self.tables:
  194. cmd += '--table=yt.%s@%s ' % (name, self.tables[name].yqlrun_file)
  195. if "--lineage" not in self.extra_args and "--peephole" not in self.extra_args:
  196. if optimize_only:
  197. cmd += '-O '
  198. else:
  199. cmd += '--run '
  200. if yql_utils.get_param('UDF_RESOLVER') or require_udf_resolver:
  201. assert self.udf_resolver_binary, "Missing udf_resolver binary"
  202. cmd += '--udf-resolver=' + self.udf_resolver_binary + ' '
  203. if scan_udfs:
  204. cmd += '--scan-udfs '
  205. if not yatest.common.context.sanitize:
  206. cmd += '--udf-resolver-filter-syscalls '
  207. if run_sql and not self.use_sql2yql:
  208. cmd += '--sql '
  209. if parameters:
  210. parameters_file = res_file_path('params.yson')
  211. with open(parameters_file, 'w') as f:
  212. f.write(six.ensure_str(yson.dumps(parameters)))
  213. cmd += '--params-file=%s ' % parameters_file
  214. if verbose:
  215. yql_utils.log('prov is ' + self.prov)
  216. env = {'YQL_DETERMINISTIC_MODE': '1'}
  217. env.update(extra_env)
  218. for var in [
  219. 'LLVM_PROFILE_FILE',
  220. 'GO_COVERAGE_PREFIX',
  221. 'PYTHON_COVERAGE_PREFIX',
  222. 'NLG_COVERAGE_FILENAME',
  223. 'YQL_EXPORT_PG_FUNCTIONS_DIR',
  224. 'YQL_ALLOW_ALL_PG_FUNCTIONS',
  225. ]:
  226. if var in os.environ:
  227. env[var] = os.environ[var]
  228. if yql_utils.get_param('STDERR'):
  229. debug_udfs_dir = os.path.join(os.path.abspath('.'), '..', '..', '..')
  230. env_setters = ";".join("{}={}".format(k, v) for k, v in six.iteritems(env))
  231. yql_utils.log('GDB launch command:')
  232. yql_utils.log('(cd "%s" && %s ya tool gdb --args %s)' % (res_dir, env_setters, cmd.replace(udfs_dir, debug_udfs_dir)))
  233. proc_result = yatest.common.process.execute(cmd.strip().split(), check_exit_code=False, cwd=res_dir, env=env)
  234. if proc_result.exit_code != 0 and check_error:
  235. with open(err_file, 'r') as f:
  236. err_file_text = f.read()
  237. assert 0, \
  238. 'Command\n%(command)s\n finished with exit code %(code)d, stderr:\n\n%(stderr)s\n\nerror file:\n%(err_file)s' % {
  239. 'command': cmd,
  240. 'code': proc_result.exit_code,
  241. 'stderr': proc_result.std_err,
  242. 'err_file': err_file_text
  243. }
  244. if os.path.exists(results_file) and os.stat(results_file).st_size == 0:
  245. os.unlink(results_file) # kikimr yql-exec compatibility
  246. results, log_results = yql_utils.read_res_file(results_file)
  247. plan, log_plan = yql_utils.read_res_file(plan_file)
  248. opt, log_opt = yql_utils.read_res_file(opt_file)
  249. err, log_err = yql_utils.read_res_file(err_file)
  250. if verbose:
  251. yql_utils.log('PROGRAM:')
  252. yql_utils.log(program)
  253. yql_utils.log('OPT:')
  254. yql_utils.log(log_opt)
  255. yql_utils.log('PLAN:')
  256. yql_utils.log(log_plan)
  257. yql_utils.log('RESULTS:')
  258. yql_utils.log(log_results)
  259. yql_utils.log('ERROR:')
  260. yql_utils.log(log_err)
  261. return yql_utils.YQLExecResult(
  262. proc_result.std_out,
  263. yql_utils.normalize_source_code_path(err.replace(res_dir, '<tmp_path>')),
  264. results,
  265. results_file,
  266. opt,
  267. opt_file,
  268. plan,
  269. plan_file,
  270. program,
  271. proc_result,
  272. None
  273. )
  274. def create_empty_tables(self, tables):
  275. pass
  276. def write_tables(self, tables):
  277. pass
  278. def get_tables(self, tables):
  279. res = {}
  280. for table in tables:
  281. # recreate table after yql program was executed
  282. res[table.full_name] = yql_utils.new_table(
  283. table.full_name,
  284. yqlrun_file=self.tables[table.full_name].yqlrun_file,
  285. res_dir=self.res_dir
  286. )
  287. yql_utils.log('YQLRun table ' + table.full_name)
  288. yql_utils.log(res[table.full_name].content)
  289. return res