# -*- coding: utf-8 -*-
"""Tests for cyson list-fragment iteration: row_index/range_index control
records and key_switch based grouping."""

from __future__ import print_function, absolute_import, division

import itertools
from functools import partial

import pytest
import six

from cyson import (
    YsonEntity, InputStream, list_fragments, key_switched_list_fragments,
    Reader, UnicodeReader
)


def filter_control_records(records):
    return [
        _ for _ in records
        if not isinstance(_[2], YsonEntity)
    ]


def canonize(val, as_unicode):
    _canonize = partial(canonize, as_unicode=as_unicode)
    if isinstance(val, six.binary_type) and as_unicode:
        return val.decode('utf8')
    elif isinstance(val, six.text_type) and not as_unicode:
        return val.encode('utf8')
    elif isinstance(val, (list, tuple)):
        return [_canonize(elem) for elem in val]
    elif isinstance(val, dict):
        return {_canonize(k): _canonize(v) for k, v in val.items()}
    return val


@pytest.mark.parametrize(
    'reader, as_unicode',
    [
        [Reader, False],
        [UnicodeReader, True],
    ],
)
@pytest.mark.parametrize(
    'keep_control_records', [True, False]
)
def test_row_index(keep_control_records, reader, as_unicode):
    _ = partial(canonize, as_unicode=as_unicode)
    # two row batches, each preceded by a control record carrying row_index
    data = b"""
    <row_index=0>#;
    {a=1;b=2};
    {a=2;b=3};
    {a=3;b=4};
    <row_index=10000>#;
    {a=-1;b=-1};
    {a=-2;b=-2};
    """
    iter = list_fragments(
        stream=InputStream.from_string(data),
        Reader=reader,
        process_attributes=True,
        keep_control_records=keep_control_records,
    )
    records = [(iter.range_index, iter.row_index, __) for __ in iter]
    etalon = [
        (None, -1, YsonEntity(attributes={b'row_index': 0})),
        (None, 0, _({b'a': 1, b'b': 2})),
        (None, 1, _({b'a': 2, b'b': 3})),
        (None, 2, _({b'a': 3, b'b': 4})),
        (None, 9999, YsonEntity(attributes={b'row_index': 10000})),
        (None, 10000, _({b'a': -1, b'b': -1})),
        (None, 10001, _({b'a': -2, b'b': -2})),
    ]
    if not keep_control_records:
        etalon = filter_control_records(etalon)

    assert records == etalon


@pytest.mark.parametrize(
    'reader, as_unicode',
    [
        [Reader, False],
        [UnicodeReader, True],
    ]
)
@pytest.mark.parametrize(
    'keep_control_records', [True, False],
)
@pytest.mark.parametrize(
    'parameter_name', ['process_attributes', 'process_table_index']
)
def test_range_index(parameter_name, keep_control_records, reader, as_unicode):
    _ = partial(canonize, as_unicode=as_unicode)
    data = b"""
    <range_index=2;row_index=0>#;
    {a=1;b=2};
    {a=2;b=3};
    {a=3;b=4};
    <range_index=0;row_index=10000>#;
    {a=-1;b=-1};
    {a=-2;b=-2};
    """
    iter = list_fragments(
        stream=InputStream.from_string(data),
        Reader=reader,
        **{parameter_name: True, 'keep_control_records': keep_control_records}
    )
    records = [(iter.range_index, iter.row_index, __) for __ in iter]
    etalon = [
        (2, -1, YsonEntity(attributes={b'range_index': 2, b'row_index': 0})),
        (2, 0, _({b'a': 1, b'b': 2})),
        (2, 1, _({b'a': 2, b'b': 3})),
        (2, 2, _({b'a': 3, b'b': 4})),
        (0, 9999, YsonEntity(attributes={b'range_index': 0, b'row_index': 10000})),
        (0, 10000, _({b'a': -1, b'b': -1})),
        (0, 10001, _({b'a': -2, b'b': -2})),
    ]
    if not keep_control_records:
        etalon = filter_control_records(etalon)

    assert records == etalon


@pytest.mark.parametrize(
    'reader, as_unicode',
    [
        [Reader, False],
        [UnicodeReader, True],
    ]
)
def test_key_switch_first(reader, as_unicode):
    _ = partial(canonize, as_unicode=as_unicode)
    # key groups separated by key_switch control records
    data = b"""
    <key_switch=%true>#;
    {k=1;a=1;b=2};
    {k=1;a=2;b=3};
    {k=1;a=3;b=4};
    <key_switch=%true>#;
    {k=2;a=-1;b=-1};
    {k=2;a=-2;b=-2};
    """
    iter = key_switched_list_fragments(
        stream=InputStream.from_string(data),
        Reader=reader,
    )
    records = [list(__) for __ in iter]
    assert records == [
        [
            _({b'k': 1, b'a': 1, b'b': 2}),
            _({b'k': 1, b'a': 2, b'b': 3}),
            _({b'k': 1, b'a': 3, b'b': 4}),
        ],
        [
            _({b'k': 2, b'a': -1, b'b': -1}),
            _({b'k': 2, b'a': -2, b'b': -2}),
        ]
    ]


@pytest.mark.parametrize(
    'reader, as_unicode',
    [
        [Reader, False],
        [UnicodeReader, True],
    ]
)
def test_key_switch_nofirst(reader, as_unicode):
    _ = partial(canonize, as_unicode=as_unicode)
    data = b"""
    {k=1;a=1;b=2};
    {k=1;a=2;b=3};
    {k=1;a=3;b=4};
    <key_switch=%true>#;
    {k=2;a=-1;b=-1};
    {k=2;a=-2;b=-2};
    """
    iter = key_switched_list_fragments(
        stream=InputStream.from_string(data),
        Reader=reader
    )
    records = [list(__) for __ in iter]
    assert records == [
        [
            _({b'k': 1, b'a': 1, b'b': 2}),
            _({b'k': 1, b'a': 2, b'b': 3}),
            _({b'k': 1, b'a': 3, b'b': 4}),
        ],
        [
            _({b'k': 2, b'a': -1, b'b': -1}),
            _({b'k': 2, b'a': -2, b'b': -2}),
        ]
    ]


@pytest.mark.parametrize(
    'reader, as_unicode',
    [
        [Reader, False],
        [UnicodeReader, True],
    ]
)
def test_key_switch_exhaust_unused_records(reader, as_unicode):
    _ = partial(canonize, as_unicode=as_unicode)
    data = b"""
    {k=1;a=1;b=2};
    {k=1;a=2;b=3};
    {k=1;a=3;b=4};
    <key_switch=%true>#;
    {k=2;a=-1;b=-1};
    {k=2;a=-2;b=-2};
    """
    iter = key_switched_list_fragments(
        stream=InputStream.from_string(data),
        Reader=reader,
    )
    records = []
    for group in iter:
        # consume only the first two records of each group; the iterator must
        # skip the unread tail before starting the next group
        records.append(
            list(itertools.islice(group, 2))
        )
    assert records == [
        [
            _({b'k': 1, b'a': 1, b'b': 2}),
            _({b'k': 1, b'a': 2, b'b': 3}),
        ],
        [
            _({b'k': 2, b'a': -1, b'b': -1}),
            _({b'k': 2, b'a': -2, b'b': -2}),
        ]
    ]


@pytest.mark.parametrize('reader', [Reader, UnicodeReader])
def test_key_switch_empty(reader):
    assert list(
        key_switched_list_fragments(
            stream=InputStream.from_string(""),
            Reader=reader,
        )
    ) == []
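

# ---------------------------------------------------------------------------
# Illustrative sketch (not collected by pytest): one plausible way to consume
# key_switched_list_fragments outside of a test, mirroring the fixtures above.
# It uses only the APIs already exercised in this module; the helper name
# `_iter_key_groups` is hypothetical and exists purely for demonstration.
# ---------------------------------------------------------------------------
def _iter_key_groups(data, reader=Reader):
    """Yield each key group of a YSON list fragment as a plain list."""
    groups = key_switched_list_fragments(
        stream=InputStream.from_string(data),
        Reader=reader,
    )
    for group in groups:
        # each `group` is an iterator over the records sharing one key;
        # materialize it before advancing to the next group
        yield list(group)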