yql_utils.py 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083
  1. from __future__ import print_function
  2. import hashlib
  3. import io
  4. import os
  5. import os.path
  6. import six
  7. import sys
  8. import re
  9. import tempfile
  10. import shutil
  11. from google.protobuf import text_format
  12. from collections import namedtuple, defaultdict, OrderedDict
  13. from functools import partial
  14. import codecs
  15. import decimal
  16. from threading import Lock
  17. import pytest
  18. import yatest.common
  19. import cyson
  20. import logging
  21. import getpass
# Module logger; log() writes here (at DEBUG) unless the YQL_STDERR parameter is set.
logger = logging.getLogger(__name__)
# YSON table attributes declaring a row spec with three String columns: key, subkey, value.
KSV_ATTR = '''{_yql_row_spec={
Type=[StructType;
[[key;[DataType;String]];
[subkey;[DataType;String]];
[value;[DataType;String]]]]}}'''
  28. def get_param(name, default=None):
  29. name = 'YQL_' + name.upper()
  30. return yatest.common.get_param(name, os.environ.get(name) or default)
def do_custom_query_check(res, sql_query):
    """Evaluate a ``/* custom check: <expr> */`` assertion embedded in ``sql_query``.

    :param res: execution result whose ``results`` attribute holds raw YSON results.
    :param sql_query: query text that may contain a custom check comment.
    :return: False when the query has no custom check, True when the check passed.
    :raises AssertionError: when the embedded expression evaluates to a falsy value.
    """
    custom_check = re.search(r"/\* custom check:(.*)\*/", sql_query)
    if not custom_check:
        return False
    custom_check = custom_check.group(1)
    yt_res_yson = res.results
    # Empty/missing results are treated as an empty YSON list.
    yt_res_yson = cyson.loads(yt_res_yson) if yt_res_yson else cyson.loads("[]")
    yt_res_yson = replace_vals(yt_res_yson)
    # NOTE: the expression is eval'd with this function's locals in scope, so checks can
    # refer to names such as `yt_res_yson`, `res` and `sql_query` - do not rename them.
    # NOTE(review): eval is only acceptable if query texts are trusted test data - confirm.
    assert eval(custom_check), 'Condition "%(custom_check)s" fails\nResult:\n %(yt_res_yson)s\n' % locals()
    return True
  41. def do_custom_error_check(res, sql_query):
  42. err_string = None
  43. custom_error = re.search(r"/\* custom error:(.*)\*/", sql_query)
  44. if custom_error:
  45. err_string = custom_error.group(1).strip()
  46. assert err_string, 'Expected custom error check in test.\nTest error: %s' % res.std_err
  47. log('Custom error: ' + err_string)
  48. assert err_string in res.std_err, '"' + err_string + '" is not found'
  49. def get_gateway_cfg_suffix():
  50. default_suffix = None
  51. return get_param('gateway_config_suffix', default_suffix) or ''
  52. def get_gateway_cfg_filename():
  53. suffix = get_gateway_cfg_suffix()
  54. if suffix == '':
  55. return 'gateways.conf'
  56. else:
  57. return 'gateways-' + suffix + '.conf'
  58. def merge_default_gateway_cfg(cfg_dir, gateway_config):
  59. with open(yql_source_path(os.path.join(cfg_dir, 'gateways.conf'))) as f:
  60. text_format.Merge(f.read(), gateway_config)
  61. suffix = get_gateway_cfg_suffix()
  62. if suffix:
  63. with open(yql_source_path(os.path.join(cfg_dir, 'gateways-' + suffix + '.conf'))) as f:
  64. text_format.Merge(f.read(), gateway_config)
  65. def find_file(path):
  66. arcadia_root = '.'
  67. while '.arcadia.root' not in os.listdir(arcadia_root):
  68. arcadia_root = os.path.join(arcadia_root, '..')
  69. res = os.path.abspath(os.path.join(arcadia_root, path))
  70. assert os.path.exists(res)
  71. return res
  72. output_path_cache = {}
  73. def yql_output_path(*args, **kwargs):
  74. if not get_param('LOCAL_BENCH_XX'):
  75. # abspath is needed, because output_path may be relative when test is run directly (without ya make).
  76. return os.path.abspath(yatest.common.output_path(*args, **kwargs))
  77. else:
  78. if args and args in output_path_cache:
  79. return output_path_cache[args]
  80. res = os.path.join(tempfile.mkdtemp(prefix='yql_tmp_'), *args)
  81. if args:
  82. output_path_cache[args] = res
  83. return res
  84. def yql_binary_path(*args, **kwargs):
  85. if not get_param('LOCAL_BENCH_XX'):
  86. return yatest.common.binary_path(*args, **kwargs)
  87. else:
  88. return find_file(args[0])
  89. def yql_source_path(*args, **kwargs):
  90. if not get_param('LOCAL_BENCH_XX'):
  91. return yatest.common.source_path(*args, **kwargs)
  92. else:
  93. return find_file(args[0])
  94. def yql_work_path():
  95. return os.path.abspath('.')
  96. YQLExecResult = namedtuple('YQLExecResult', (
  97. 'std_out',
  98. 'std_err',
  99. 'results',
  100. 'results_file',
  101. 'opt',
  102. 'opt_file',
  103. 'plan',
  104. 'plan_file',
  105. 'program',
  106. 'execution_result',
  107. 'statistics'
  108. ))
  109. Table = namedtuple('Table', (
  110. 'name',
  111. 'full_name',
  112. 'content',
  113. 'file',
  114. 'yqlrun_file',
  115. 'attr',
  116. 'format',
  117. 'exists'
  118. ))
def new_table(full_name, file_path=None, yqlrun_file=None, content=None, res_dir=None,
              attr=None, format_name='yson', def_attr=None, should_exist=False, src_file_alternative=None):
    """Materialize a test table into ``res_dir`` and describe it as a Table.

    :param full_name: '<cluster>.<name>' (e.g. 'plato.Input'); the part after the first
        dot becomes the table name.
    :param file_path: optional source file for the content (takes precedence over yqlrun_file).
    :param yqlrun_file: optional source file, also copied as the '.yqlrun.txt' variant.
    :param content: table content; when None it is read from the source files.
    :param res_dir: output dir; a fresh 'table_' dir is created when None.
    :param attr: attribute text; when None it is read from a '.attr' file or taken from def_attr.
    :param format_name: stored in the returned Table as ``format``.
    :param def_attr: fallback attributes.
    :param should_exist: assert that the table content was found.
    :param src_file_alternative: fallback source file used when the primary one is missing.
    :return: Table(name, full_name, content, file, yqlrun_file, attr, format, exists).
    """
    assert '.' in full_name, 'expected name like cedar.Input'
    name = '.'.join(full_name.split('.')[1:])
    if res_dir is None:
        res_dir = get_yql_dir('table_')
    exists = True
    if content is None:
        # try read content from files
        src_file = file_path or yqlrun_file
        if src_file is None:
            # nonexistent table, will be output for query
            content = ''
            exists = False
        else:
            if os.path.exists(src_file):
                with open(src_file, 'rb') as f:
                    content = f.read()
            elif src_file_alternative and os.path.exists(src_file_alternative):
                with open(src_file_alternative, 'rb') as f:
                    content = f.read()
                src_file = src_file_alternative
                # From here on the existing alternative plays the yqlrun_file role (it is
                # copied below); the old yqlrun_file is still probed for a '.attr' file.
                yqlrun_file, src_file_alternative = src_file_alternative, yqlrun_file
            else:
                content = ''
                exists = False
    # NOTE: file_path is rebound to the output location here, so the first '.attr' probe
    # below looks inside res_dir, not next to the caller's file_path.
    # NOTE(review): confirm that shadowing the caller's file_path is intended.
    file_path = os.path.join(res_dir, name + '.txt')
    new_yqlrun_file = os.path.join(res_dir, name + '.yqlrun.txt')
    if exists:
        # Written in binary mode: a caller-supplied `content` presumably has to be bytes
        # on Python 3 (content read from files above already is) - TODO confirm callers.
        with open(file_path, 'wb') as f:
            f.write(content)
        # copy or create yqlrun_file in proper dir
        if yqlrun_file is not None:
            shutil.copyfile(yqlrun_file, new_yqlrun_file)
        else:
            with open(new_yqlrun_file, 'wb') as f:
                f.write(content)
    else:
        assert not should_exist, locals()
    if attr is None:
        # try read from file
        attr_file = None
        if os.path.exists(file_path + '.attr'):
            attr_file = file_path + '.attr'
        elif yqlrun_file is not None and os.path.exists(yqlrun_file + '.attr'):
            attr_file = yqlrun_file + '.attr'
        elif src_file_alternative is not None and os.path.exists(src_file_alternative + '.attr'):
            attr_file = src_file_alternative + '.attr'
        if attr_file is not None:
            with open(attr_file) as f:
                attr = f.read()
    if attr is None:
        attr = def_attr
    if attr is not None:
        # probably we get it, now write attr file to proper place
        attr_file = new_yqlrun_file + '.attr'
        with open(attr_file, 'w') as f:
            f.write(attr)
    return Table(
        name,
        full_name,
        content,
        file_path,
        new_yqlrun_file,
        attr,
        format_name,
        exists
    )
  187. def ensure_dir_exists(dir):
  188. # handle race between isdir and mkdir
  189. if os.path.isdir(dir):
  190. return
  191. try:
  192. os.mkdir(dir)
  193. except OSError:
  194. if not os.path.isdir(dir):
  195. raise
  196. def get_yql_dir(prefix):
  197. yql_dir = yql_output_path('yql')
  198. ensure_dir_exists(yql_dir)
  199. res_dir = tempfile.mkdtemp(prefix=prefix, dir=yql_dir)
  200. os.chmod(res_dir, 0o755)
  201. return res_dir
  202. def get_cmd_for_files(arg, files):
  203. cmd = ' '.join(
  204. arg + ' ' + name + '@' + files[name]
  205. for name in files
  206. )
  207. cmd += ' '
  208. return cmd
  209. def read_res_file(file_path):
  210. if os.path.exists(file_path):
  211. with codecs.open(file_path, encoding="utf-8") as descr:
  212. res = descr.read().strip()
  213. if res == '':
  214. log_res = '<EMPTY>'
  215. else:
  216. log_res = res
  217. else:
  218. res = ''
  219. log_res = '<NOTHING>'
  220. return res, log_res
  221. def normalize_yson(y):
  222. from cyson import YsonBoolean, YsonEntity
  223. if isinstance(y, YsonBoolean) or isinstance(y, bool):
  224. return 'true' if y else 'false'
  225. if isinstance(y, YsonEntity) or y is None:
  226. return None
  227. if isinstance(y, list):
  228. return [normalize_yson(i) for i in y]
  229. if isinstance(y, dict):
  230. return {normalize_yson(k): normalize_yson(v) for k, v in six.iteritems(y)}
  231. if isinstance(y, bytes):
  232. return y
  233. if isinstance(y, six.text_type):
  234. return y.encode('utf-8')
  235. return str(y).encode('ascii')
  236. volatile_attrs = {'DataSize', 'ModifyTime', 'Id', 'Revision'}
  237. current_user = getpass.getuser()
  238. def _replace_vals_impl(y):
  239. if isinstance(y, list):
  240. return [_replace_vals_impl(i) for i in y]
  241. if isinstance(y, dict):
  242. return {_replace_vals_impl(k): _replace_vals_impl(v) for k, v in six.iteritems(y) if k not in volatile_attrs}
  243. if isinstance(y, bytes):
  244. s = y.replace(b'tmp/yql/' + current_user.encode('ascii') + b'/', b'tmp/')
  245. s = re.sub(b'tmp/[0-9a-f]+-[0-9a-f]+-[0-9a-f]+-[0-9a-f]+', b'tmp/<temp_table_guid>', s)
  246. return s
  247. if isinstance(y, str):
  248. s = y.replace('tmp/yql/' + current_user + '/', 'tmp/')
  249. s = re.sub('tmp/[0-9a-f]+-[0-9a-f]+-[0-9a-f]+-[0-9a-f]+', 'tmp/<temp_table_guid>', s)
  250. return s
  251. return y
  252. def replace_vals(y):
  253. y = normalize_yson(y)
  254. y = _replace_vals_impl(y)
  255. return y
  256. def patch_yson_vals(y, patcher):
  257. if isinstance(y, list):
  258. return [patch_yson_vals(i, patcher) for i in y]
  259. if isinstance(y, dict):
  260. return {patch_yson_vals(k, patcher): patch_yson_vals(v, patcher) for k, v in six.iteritems(y)}
  261. if isinstance(y, str):
  262. return patcher(y)
  263. return y
  264. floatRe = re.compile(r'^-?\d*\.\d+$')
  265. floatERe = re.compile(r'^-?(\d*\.)?\d+e[\+\-]?\d+$', re.IGNORECASE)
  266. specFloatRe = re.compile(r'^(-?inf|nan)$', re.IGNORECASE)
  267. def fix_double(x):
  268. if floatRe.match(x) and len(x.replace('.', '').replace('-', '')) > 10:
  269. # Emulate the same double precision as C++ code has
  270. decimal.getcontext().rounding = decimal.ROUND_HALF_DOWN
  271. decimal.getcontext().prec = 10
  272. return str(decimal.Decimal(0) + decimal.Decimal(x)).rstrip('0')
  273. if floatERe.match(x):
  274. # Emulate the same double precision as C++ code has
  275. decimal.getcontext().rounding = decimal.ROUND_HALF_DOWN
  276. decimal.getcontext().prec = 10
  277. return str(decimal.Decimal(0) + decimal.Decimal(x)).lower()
  278. if specFloatRe.match(x):
  279. return x.lower()
  280. return x
  281. def remove_volatile_ast_parts(ast):
  282. return re.sub(r"\(KiClusterConfig '\('\(.*\) '\"\d\" '\"\d\" '\"\d\"\)\)", "(KiClusterConfig)", ast)
  283. def prepare_program(program, program_file, yql_dir, ext='yql'):
  284. assert not (program is None and program_file is None), 'Needs program or program_file'
  285. if program is None:
  286. with codecs.open(program_file, encoding='utf-8') as program_file_descr:
  287. program = program_file_descr.read()
  288. program_file = os.path.join(yql_dir, 'program.' + ext)
  289. with codecs.open(program_file, 'w', encoding='utf-8') as program_file_descr:
  290. program_file_descr.write(program)
  291. return program, program_file
  292. def get_program_cfg(suite, case, data_path):
  293. ret = []
  294. config = os.path.join(data_path, suite if suite else '', case + '.cfg')
  295. if not os.path.exists(config):
  296. config = os.path.join(data_path, suite if suite else '', 'default.cfg')
  297. if os.path.exists(config):
  298. with open(config, 'r') as f:
  299. for line in f:
  300. if line.strip():
  301. ret.append(tuple(line.split()))
  302. else:
  303. in_filename = case + '.in'
  304. in_path = os.path.join(data_path, in_filename)
  305. default_filename = 'default.in'
  306. default_path = os.path.join(data_path, default_filename)
  307. for filepath in [in_path, in_filename, default_path, default_filename]:
  308. if os.path.exists(filepath):
  309. try:
  310. shutil.copy2(filepath, in_path)
  311. except shutil.Error:
  312. pass
  313. ret.append(('in', 'yamr.plato.Input', in_path))
  314. break
  315. if not is_os_supported(ret):
  316. pytest.skip('%s not supported here' % sys.platform)
  317. return ret
  318. def find_user_file(suite, path, data_path):
  319. source_path = os.path.join(data_path, suite, path)
  320. if os.path.exists(source_path):
  321. return source_path
  322. else:
  323. try:
  324. return yql_binary_path(path)
  325. except Exception:
  326. raise Exception('Can not find file ' + path)
  327. def get_input_tables(suite, cfg, data_path, def_attr=None):
  328. in_tables = []
  329. for item in cfg:
  330. if item[0] in ('in', 'out'):
  331. io, table_name, file_name = item
  332. if io == 'in':
  333. in_tables.append(new_table(
  334. full_name=table_name.replace('yamr.', '').replace('yt.', ''),
  335. yqlrun_file=os.path.join(data_path, suite if suite else '', file_name),
  336. src_file_alternative=os.path.join(yql_work_path(), suite if suite else '', file_name),
  337. def_attr=def_attr,
  338. should_exist=True
  339. ))
  340. return in_tables
  341. def get_tables(suite, cfg, data_path, def_attr=None):
  342. in_tables = []
  343. out_tables = []
  344. suite_dir = os.path.join(data_path, suite)
  345. res_dir = get_yql_dir('table_')
  346. for splitted in cfg:
  347. if splitted[0] == 'udf' and yatest.common.context.sanitize == 'undefined':
  348. pytest.skip("udf under ubsan")
  349. if len(splitted) == 4:
  350. type_name, table, file_name, format_name = splitted
  351. elif len(splitted) == 3:
  352. type_name, table, file_name = splitted
  353. format_name = 'yson'
  354. else:
  355. continue
  356. yqlrun_file = os.path.join(suite_dir, file_name)
  357. if type_name == 'in':
  358. in_tables.append(new_table(
  359. full_name='plato.' + table if '.' not in table else table,
  360. yqlrun_file=yqlrun_file,
  361. format_name=format_name,
  362. def_attr=def_attr,
  363. res_dir=res_dir
  364. ))
  365. if type_name == 'out':
  366. out_tables.append(new_table(
  367. full_name='plato.' + table if '.' not in table else table,
  368. yqlrun_file=yqlrun_file if os.path.exists(yqlrun_file) else None,
  369. res_dir=res_dir
  370. ))
  371. return in_tables, out_tables
  372. def get_supported_providers(cfg):
  373. providers = 'yt', 'kikimr', 'dq', 'hybrid'
  374. for item in cfg:
  375. if item[0] == 'providers':
  376. providers = [i.strip() for i in ''.join(item[1:]).split(',')]
  377. return providers
  378. def is_os_supported(cfg):
  379. for item in cfg:
  380. if item[0] == 'os':
  381. return any(sys.platform.startswith(_os) for _os in item[1].split(','))
  382. return True
  383. def is_xfail(cfg):
  384. for item in cfg:
  385. if item[0] == 'xfail':
  386. return True
  387. return False
  388. def is_skip_forceblocks(cfg):
  389. for item in cfg:
  390. if item[0] == 'skip_forceblocks':
  391. return True
  392. return False
  393. def is_canonize_peephole(cfg):
  394. for item in cfg:
  395. if item[0] == 'canonize_peephole':
  396. return True
  397. return False
  398. def is_peephole_use_blocks(cfg):
  399. for item in cfg:
  400. if item[0] == 'peephole_use_blocks':
  401. return True
  402. return False
  403. def is_canonize_lineage(cfg):
  404. for item in cfg:
  405. if item[0] == 'canonize_lineage':
  406. return True
  407. return False
  408. def is_canonize_yt(cfg):
  409. for item in cfg:
  410. if item[0] == 'canonize_yt':
  411. return True
  412. return False
  413. def is_with_final_result_issues(cfg):
  414. for item in cfg:
  415. if item[0] == 'with_final_result_issues':
  416. return True
  417. return False
  418. def skip_test_if_required(cfg):
  419. for item in cfg:
  420. if item[0] == 'skip_test':
  421. pytest.skip(item[1])
  422. def get_pragmas(cfg):
  423. pragmas = []
  424. for item in cfg:
  425. if item[0] == 'pragma':
  426. pragmas.append(' '.join(item))
  427. return pragmas
  428. def execute(
  429. klass=None,
  430. program=None,
  431. program_file=None,
  432. files=None,
  433. urls=None,
  434. run_sql=False,
  435. verbose=False,
  436. check_error=True,
  437. input_tables=None,
  438. output_tables=None,
  439. pretty_plan=True,
  440. parameters={},
  441. ):
  442. '''
  443. Executes YQL/SQL
  444. :param klass: KiKiMRForYQL if instance (default: YQLRun)
  445. :param program: string with YQL or SQL program
  446. :param program_file: file with YQL or SQL program (optional, if :param program: is None)
  447. :param files: dict like {'name': '/path'} with extra files
  448. :param urls: dict like {'name': url} with extra files urls
  449. :param run_sql: execute sql instead of yql
  450. :param verbose: log all results and diagnostics
  451. :param check_error: fail on non-zero exit code
  452. :param input_tables: list of Table (will be written if not exist)
  453. :param output_tables: list of Table (will be returned)
  454. :param pretty_plan: whether to use pretty printing for plan or not
  455. :param parameters: query parameters as dict like {name: json_value}
  456. :return: YQLExecResult
  457. '''
  458. if input_tables is None:
  459. input_tables = []
  460. else:
  461. assert isinstance(input_tables, list)
  462. if output_tables is None:
  463. output_tables = []
  464. klass.write_tables(input_tables + output_tables)
  465. res = klass.yql_exec(
  466. program=program,
  467. program_file=program_file,
  468. files=files,
  469. urls=urls,
  470. run_sql=run_sql,
  471. verbose=verbose,
  472. check_error=check_error,
  473. tables=(output_tables + input_tables),
  474. pretty_plan=pretty_plan,
  475. parameters=parameters
  476. )
  477. try:
  478. res_tables = klass.get_tables(output_tables)
  479. except Exception:
  480. if check_error:
  481. raise
  482. res_tables = {}
  483. return res, res_tables
  484. execute_sql = partial(execute, run_sql=True)
  485. def log(s):
  486. if get_param('STDERR'):
  487. print(s, file=sys.stderr)
  488. else:
  489. logger.debug(s)
  490. def tmpdir_module(request):
  491. return tempfile.mkdtemp(prefix='kikimr_test_')
  492. @pytest.fixture(name='tmpdir_module', scope='module')
  493. def tmpdir_module_fixture(request):
  494. return tmpdir_module(request)
  495. def escape_backslash(s):
  496. return s.replace('\\', '\\\\')
  497. def get_default_mount_point_config_content():
  498. return '''
  499. MountPoints {
  500. RootAlias: '/lib'
  501. MountPoint: '%s'
  502. Library: true
  503. }
  504. ''' % (
  505. escape_backslash(yql_source_path('yql/essentials/mount/lib'))
  506. )
  507. def get_mount_config_file(content=None):
  508. config = yql_output_path('mount.cfg')
  509. if not os.path.exists(config):
  510. with open(config, 'w') as f:
  511. f.write(content or get_default_mount_point_config_content())
  512. return config
  513. def run_command(program, cmd, tmpdir_module=None, stdin=None,
  514. check_exit_code=True, env=None, stdout=None):
  515. if tmpdir_module is None:
  516. tmpdir_module = tempfile.mkdtemp()
  517. stdin_stream = None
  518. if isinstance(stdin, six.string_types):
  519. with tempfile.NamedTemporaryFile(
  520. prefix='stdin_',
  521. dir=tmpdir_module,
  522. delete=False
  523. ) as stdin_file:
  524. stdin_file.write(stdin.encode() if isinstance(stdin, str) else stdin)
  525. stdin_stream = open(stdin_file.name)
  526. elif isinstance(stdin, io.IOBase):
  527. stdin_stream = stdin
  528. elif stdin is not None:
  529. assert 0, 'Strange stdin ' + repr(stdin)
  530. if isinstance(cmd, six.string_types):
  531. cmd = cmd.split()
  532. else:
  533. cmd = [str(c) for c in cmd]
  534. log(' '.join('\'%s\'' % c if ' ' in c else c for c in cmd))
  535. cmd = [program] + cmd
  536. stderr_stream = None
  537. stdout_stream = None
  538. if stdout:
  539. stdout_stream = stdout
  540. res = yatest.common.execute(
  541. cmd,
  542. cwd=tmpdir_module,
  543. stdin=stdin_stream,
  544. stdout=stdout_stream,
  545. stderr=stderr_stream,
  546. check_exit_code=check_exit_code,
  547. env=env,
  548. wait=True
  549. )
  550. if res.std_err:
  551. log(res.std_err)
  552. if res.std_out:
  553. log(res.std_out)
  554. return res
def yson_to_csv(yson_content, columns=None, with_header=True, strict=False):
    """Render a YSON list of row maps as ';'-separated lines.

    :param yson_content: YSON text holding a list of row maps.
    :param columns: column names to emit (they are sorted); defaults to the sorted union
        of all row keys.
    :param with_header: prepend a line with the column names.
    :param strict: return None as soon as a row's key set differs from the header list.
    :return: the lines joined with '\\n', or None on a strict-mode mismatch.
    """
    import cyson as yson
    if columns:
        headers = sorted(columns)
    else:
        headers = set()
        for item in yson.loads(yson_content):
            headers.update(six.iterkeys(item))
        headers = sorted(headers)
    csv_content = []
    if with_header:
        csv_content.append(';'.join(headers))
    for item in yson.loads(yson_content):
        if strict and sorted(six.iterkeys(item)) != headers:
            return None
        # Missing columns become empty cells; 'YsonEntity' is removed from the str() form.
        # NOTE(review): the 'string_escape' codec exists only on Python 2 (str.encode on
        # Python 3 raises LookupError for it), and bytes keys from cyson would also break
        # ';'.join - this function looks Python-2-only; confirm before using on Python 3.
        csv_content.append(';'.join([str(item[h]).replace('YsonEntity', '').encode('string_escape') if h in item else '' for h in headers]))
    return '\n'.join(csv_content)
  572. udfs_lock = Lock()
  573. def get_udfs_path(extra_paths=None):
  574. essentials_udfs_build_path = yatest.common.build_path('yql/essentials/udfs')
  575. udfs_build_path = yatest.common.build_path('yql/udfs')
  576. ydb_udfs_build_path = yatest.common.build_path('contrib/ydb/library/yql/udfs')
  577. contrib_ydb_udfs_build_path = yatest.common.build_path('contrib/ydb/library/yql/udfs')
  578. rthub_udfs_build_path = yatest.common.build_path('robot/rthub/yql/udfs')
  579. kwyt_udfs_build_path = yatest.common.build_path('robot/kwyt/yql/udfs')
  580. try:
  581. udfs_bin_path = yatest.common.binary_path('yql/udfs')
  582. except Exception:
  583. udfs_bin_path = None
  584. try:
  585. udfs_project_path = yql_binary_path('yql/library/test_framework/udfs_deps')
  586. except Exception:
  587. udfs_project_path = None
  588. try:
  589. ydb_udfs_project_path = yql_binary_path('yql/essentials/tests/common/test_framework/udfs_deps')
  590. except Exception:
  591. ydb_udfs_project_path = None
  592. merged_udfs_path = yql_output_path('yql_udfs')
  593. with udfs_lock:
  594. if not os.path.isdir(merged_udfs_path):
  595. os.mkdir(merged_udfs_path)
  596. udfs_paths = [
  597. udfs_project_path,
  598. ydb_udfs_project_path,
  599. udfs_bin_path,
  600. essentials_udfs_build_path,
  601. udfs_build_path,
  602. ydb_udfs_build_path,
  603. contrib_ydb_udfs_build_path,
  604. rthub_udfs_build_path,
  605. kwyt_udfs_build_path
  606. ]
  607. if extra_paths is not None:
  608. udfs_paths += extra_paths
  609. log('process search UDF in: %s, %s, %s, %s' % (udfs_project_path, ydb_udfs_project_path, udfs_bin_path, udfs_build_path))
  610. for _udfs_path in udfs_paths:
  611. if _udfs_path:
  612. for dirpath, dnames, fnames in os.walk(_udfs_path):
  613. for f in fnames:
  614. if f.endswith('.so'):
  615. f = os.path.join(dirpath, f)
  616. if not os.path.exists(f) and os.path.lexists(f): # seems like broken symlink
  617. try:
  618. os.unlink(f)
  619. except OSError:
  620. pass
  621. link_name = os.path.join(merged_udfs_path, os.path.basename(f))
  622. if not os.path.exists(link_name):
  623. os.symlink(f, link_name)
  624. log('Added UDF: ' + f)
  625. return merged_udfs_path
  626. def get_test_prefix():
  627. return 'yql_tmp_' + hashlib.md5(yatest.common.context.test_name).hexdigest()
def normalize_plan_ids(plan, no_detailed=False):
    """Renumber the operation ids of a plan so they do not depend on run-specific values.

    New ids 1, 2, ... go first to the nodes of plan["Basic"]["nodes"] whose "type" is
    "in", ordered by their "name", then to all remaining nodes in listed order. Any id
    missing from that mapping becomes None.

    :param plan: dict with a "Basic" section (and a "Detailed" one unless no_detailed).
    :param no_detailed: return only the normalized "Basic" section.
    :return: {"Basic": ...} or {"Basic": ..., "Detailed": ...} with ids remapped.
    """
    remapOps = {}
    # Input nodes first, ordered by name, so their ids are independent of plan order.
    # NOTE(review): mixing None and str names would make this sort fail on Python 3.
    for node in sorted(filter(lambda n: n["type"] == "in", plan["Basic"]["nodes"]), key=lambda n: n.get("name")):
        if node["id"] not in remapOps:
            remapOps[node["id"]] = len(remapOps) + 1
    # Then every other node, in the order it is listed.
    for node in plan["Basic"]["nodes"]:
        if node["id"] not in remapOps:
            remapOps[node["id"]] = len(remapOps) + 1

    def subst_basic(y):
        # Remap 'source'/'target'/'id' values at any depth of the Basic section, sort
        # "links" by (source, target) and "nodes" by id; other values are recursed into.
        if isinstance(y, list):
            return [subst_basic(i) for i in y]
        if isinstance(y, dict):
            res = {}
            for k, v in six.iteritems(y):
                if k in {'source', 'target', 'id'}:
                    res[k] = remapOps.get(v)
                elif k == "links":
                    res[k] = sorted(subst_basic(v), key=lambda x: (x["source"], x["target"]))
                elif k == "nodes":
                    res[k] = sorted(subst_basic(v), key=lambda x: x["id"])
                else:
                    res[k] = subst_basic(v)
            return res
        return y

    def subst_detailed(y):
        # Remap and sort "DependsOn", remap "OperationRoot"/"Id", copy "Providers"
        # verbatim, and recurse into everything else.
        if isinstance(y, list):
            return [subst_detailed(i) for i in y]
        if isinstance(y, dict):
            res = {}
            for k, v in six.iteritems(y):
                if k == "DependsOn":
                    res[k] = sorted([remapOps.get(i) for i in v])
                elif k == "Providers":
                    res[k] = v
                elif k in {'OperationRoot', 'Id'}:
                    res[k] = remapOps.get(v)
                else:
                    res[k] = subst_detailed(v)
            return res
        return y

    if no_detailed:
        return {"Basic": subst_basic(plan["Basic"])}
    return {"Basic": subst_basic(plan["Basic"]), "Detailed": subst_detailed(plan["Detailed"])}
  672. def normalized_plan_stats(plan):
  673. renameMap = {
  674. "MrLMap!": "YtMap!",
  675. "MrMapReduce!": "YtMapReduce!",
  676. "MrLReduce!": "YtMapReduce!",
  677. "MrOrderedReduce!": "YtReduce!",
  678. "MrSort!": "YtSort!",
  679. "MrCopy!": "YtCopy!",
  680. "YtMerge!": "YtCopy!",
  681. "MrFill!": "YtFill!",
  682. "MrDrop!": "YtDropTable!",
  683. "YtTouch!": None,
  684. "MrReadTable!": None,
  685. "YtReadTable!": None,
  686. "MrPublish!": "YtPublish!",
  687. "MrReadTableScheme!": "YtReadTableScheme!",
  688. }
  689. normalizedStat = defaultdict(int)
  690. for op, stat in six.iteritems(plan["Detailed"]["OperationStats"]):
  691. renamedOp = renameMap.get(op, op)
  692. if renamedOp is not None:
  693. normalizedStat[renamedOp] += stat
  694. return normalizedStat
  695. def normalize_table_yson(y):
  696. from cyson import YsonEntity
  697. if isinstance(y, list):
  698. return [normalize_table_yson(i) for i in y]
  699. if isinstance(y, dict):
  700. normDict = OrderedDict()
  701. for k, v in sorted(six.iteritems(y), key=lambda x: x[0], reverse=True):
  702. if k == "_other":
  703. normDict[normalize_table_yson(k)] = sorted(normalize_table_yson(v))
  704. elif v != "Void" and v is not None and not isinstance(v, YsonEntity):
  705. normDict[normalize_table_yson(k)] = normalize_table_yson(v)
  706. return normDict
  707. return y
  708. def dump_table_yson(res_yson, sort=True):
  709. rows = normalize_table_yson(cyson.loads(b'[' + res_yson + b']'))
  710. if sort:
  711. rows = sorted(rows, key=cyson.dumps)
  712. return cyson.dumps(rows, format="pretty")
  713. def normalize_source_code_path(s):
  714. # remove contrib/
  715. s = re.sub(r'\b(contrib/)(ydb/library/yql.*)', r'\2', s)
  716. # replace line number in source code with 'xxx'
  717. s = re.sub(r'\b(yql/[\w/]+(?:\.cpp|\.h)):(?:\d+)', r'\1:xxx', s)
  718. return re.sub(r'(/lib/yql/[\w/]+(?:\.yql|\.yqls|\.sql)):(?:\d+):(?:\d+)', r'\1:xxx:yyy', s)
  719. def do_get_files(suite, config, data_path, config_key):
  720. files = dict()
  721. suite_dir = os.path.join(data_path, suite)
  722. res_dir = None
  723. for line in config:
  724. if line[0] == config_key:
  725. _, name, path = line
  726. userpath = find_user_file(suite, path, data_path)
  727. relpath = os.path.relpath(userpath, suite_dir)
  728. if os.path.exists(os.path.join('cwd', relpath)):
  729. path = relpath
  730. else:
  731. path = userpath
  732. if not res_dir:
  733. res_dir = get_yql_dir('file_')
  734. new_path = os.path.join(res_dir, os.path.basename(path))
  735. shutil.copyfile(path, new_path)
  736. files[name] = new_path
  737. return files
  738. def get_files(suite, config, data_path):
  739. return do_get_files(suite, config, data_path, 'file')
  740. def get_http_files(suite, config, data_path):
  741. return do_get_files(suite, config, data_path, 'http_file')
  742. def get_yt_files(suite, config, data_path):
  743. return do_get_files(suite, config, data_path, 'yt_file')
  744. def get_syntax_version(program):
  745. syntax_version_param = get_param('SYNTAX_VERSION')
  746. default_syntax_version = 1
  747. if 'syntax version 0' in program:
  748. return 0
  749. elif 'syntax version 1' in program:
  750. return 1
  751. elif syntax_version_param:
  752. return int(syntax_version_param)
  753. else:
  754. return default_syntax_version
  755. def ansi_lexer_enabled(program):
  756. return 'ansi_lexer' in program
  757. def pytest_get_current_part(path):
  758. folder = os.path.dirname(path)
  759. folder_name = os.path.basename(folder)
  760. assert folder_name.startswith('part'), "Current folder is {}".format(folder_name)
  761. current = int(folder_name[len('part'):])
  762. parent = os.path.dirname(folder)
  763. maxpart = max([int(part[len('part'):]) if part.startswith('part') else -1 for part in os.listdir(parent)])
  764. assert maxpart > 0, "Cannot find parts in {}".format(parent)
  765. return (current, 1 + maxpart)
  766. def normalize_result(res, sort):
  767. res = cyson.loads(res) if res else cyson.loads(b"[]")
  768. res = replace_vals(res)
  769. for r in res:
  770. for data in r[b'Write']:
  771. is_list = (b'Type' in data) and (data[b'Type'][0] == b'ListType')
  772. if is_list and sort and b'Data' in data:
  773. data[b'Data'] = sorted(data[b'Data'])
  774. if b'Ref' in data:
  775. data[b'Ref'] = []
  776. data[b'Truncated'] = True
  777. if is_list and b'Data' in data and len(data[b'Data']) == 0:
  778. del data[b'Data']
  779. return res
  780. def is_sorted_table(table):
  781. assert table.attr is not None
  782. for column in cyson.loads(table.attr)[b'schema']:
  783. if b'sort_order' in column:
  784. return True
  785. return False
  786. def is_unordered_result(res):
  787. path = res.results_file
  788. assert os.path.exists(path)
  789. with open(path, 'rb') as f:
  790. res = f.read()
  791. res = cyson.loads(res)
  792. for r in res:
  793. for data in r[b'Write']:
  794. if b'Unordered' in data:
  795. return True
  796. return False
  797. def stable_write(writer, node):
  798. if hasattr(node, 'attributes'):
  799. writer.begin_attributes()
  800. for k in sorted(node.attributes.keys()):
  801. writer.key(k)
  802. stable_write(writer, node.attributes[k])
  803. writer.end_attributes()
  804. if isinstance(node, list):
  805. writer.begin_list()
  806. for r in node:
  807. stable_write(writer, r)
  808. writer.end_list()
  809. return
  810. if isinstance(node, dict):
  811. writer.begin_map()
  812. for k in sorted(node.keys()):
  813. writer.key(k)
  814. stable_write(writer, node[k])
  815. writer.end_map()
  816. return
  817. writer.write(node)
  818. def stable_result_file(res):
  819. path = res.results_file
  820. assert os.path.exists(path)
  821. with open(path, 'rb') as f:
  822. res = f.read()
  823. res = cyson.loads(res)
  824. res = replace_vals(res)
  825. for r in res:
  826. for data in r[b'Write']:
  827. if b'Unordered' in r and b'Data' in data:
  828. data[b'Data'] = sorted(data[b'Data'], key=cyson.dumps)
  829. with open(path, 'wb') as f:
  830. writer = cyson.Writer(stream=cyson.OutputStream.from_file(f), format='pretty', mode='node')
  831. writer.begin_stream()
  832. stable_write(writer, res)
  833. writer.end_stream()
  834. with open(path, 'rb') as f:
  835. return f.read()
  836. def stable_table_file(table):
  837. path = table.file
  838. assert os.path.exists(path)
  839. assert table.attr is not None
  840. is_sorted = False
  841. for column in cyson.loads(table.attr)[b'schema']:
  842. if b'sort_order' in column:
  843. is_sorted = True
  844. break
  845. if not is_sorted:
  846. with open(path, 'rb') as f:
  847. r = cyson.Reader(cyson.InputStream.from_file(f), mode='list_fragment')
  848. lst = sorted(list(r.list_fragments()), key=cyson.dumps)
  849. with open(path, 'wb') as f:
  850. writer = cyson.Writer(stream=cyson.OutputStream.from_file(f), format='pretty', mode='list_fragment')
  851. writer.begin_stream()
  852. for r in lst:
  853. stable_write(writer, r)
  854. writer.end_stream()
  855. with open(path, 'rb') as f:
  856. return f.read()
  857. class LoggingDowngrade(object):
  858. def __init__(self, loggers, level=logging.CRITICAL):
  859. self.loggers = [(name, logging.getLogger(name).getEffectiveLevel()) for name in loggers]
  860. self.level = level
  861. def __enter__(self):
  862. self.prev_levels = []
  863. for name, _ in self.loggers:
  864. log = logging.getLogger(name)
  865. log.setLevel(self.level)
  866. return self
  867. def __exit__(self, exc_type, exc_value, tb):
  868. for name, level in self.loggers:
  869. log = logging.getLogger(name)
  870. log.setLevel(level)
  871. return True