test_control_attributes.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. # -*- coding: utf-8 -*-
  2. from __future__ import print_function, absolute_import, division
  3. import itertools
  4. from functools import partial
  5. import pytest
  6. import six
  7. from cyson import (
  8. YsonEntity, InputStream,
  9. list_fragments, key_switched_list_fragments,
  10. Reader, UnicodeReader
  11. )
  12. def filter_control_records(list):
  13. return [
  14. _ for _ in list
  15. if not isinstance(_[2], YsonEntity)
  16. ]
  17. def canonize(val, as_unicode):
  18. _canonize = partial(canonize, as_unicode=as_unicode)
  19. if isinstance(val, six.binary_type) and as_unicode:
  20. return val.decode('utf8')
  21. elif isinstance(val, six.text_type) and not as_unicode:
  22. return val.encode('utf8')
  23. elif isinstance(val, (list, tuple)):
  24. return [_canonize(elem) for elem in val]
  25. elif isinstance(val, dict):
  26. return {_canonize(k): _canonize(v) for k, v in val.items()}
  27. return val
  28. @pytest.mark.parametrize(
  29. 'reader, as_unicode', [
  30. [Reader, False],
  31. [UnicodeReader, True],
  32. ],
  33. )
  34. @pytest.mark.parametrize(
  35. 'keep_control_records', [True, False]
  36. )
  37. def test_row_index(keep_control_records, reader, as_unicode):
  38. _ = partial(canonize, as_unicode=as_unicode)
  39. data = b"""
  40. <row_index=0>#;
  41. {a=1;b=2};
  42. {a=2;b=3};
  43. {a=3;b=4};
  44. <row_index=10000>#;
  45. {a=-1;b=-1};
  46. {a=-2;b=-2};
  47. """
  48. iter = list_fragments(
  49. stream=InputStream.from_string(data),
  50. Reader=reader,
  51. process_attributes=True,
  52. keep_control_records=keep_control_records,
  53. )
  54. records = [(iter.range_index, iter.row_index, __) for __ in iter]
  55. etalon = [
  56. (None, -1, YsonEntity(attributes={b'row_index': 0})),
  57. (None, 0, _({b'a': 1, b'b': 2})),
  58. (None, 1, _({b'a': 2, b'b': 3})),
  59. (None, 2, _({b'a': 3, b'b': 4})),
  60. (None, 9999, YsonEntity(attributes={b'row_index': 10000})),
  61. (None, 10000, _({b'a': -1, b'b': -1})),
  62. (None, 10001, _({b'a': -2, b'b': -2})),
  63. ]
  64. if not keep_control_records:
  65. etalon = filter_control_records(etalon)
  66. assert records == etalon
  67. @pytest.mark.parametrize(
  68. 'reader, as_unicode', [
  69. [Reader, False],
  70. [UnicodeReader, True],
  71. ]
  72. )
  73. @pytest.mark.parametrize(
  74. 'keep_control_records', [True, False],
  75. )
  76. @pytest.mark.parametrize(
  77. 'parameter_name',
  78. ['process_attributes', 'process_table_index']
  79. )
  80. def test_range_index(parameter_name, keep_control_records, reader, as_unicode):
  81. _ = partial(canonize, as_unicode=as_unicode)
  82. data = b"""
  83. <range_index=2; row_index=0>#;
  84. {a=1;b=2};
  85. {a=2;b=3};
  86. {a=3;b=4};
  87. <range_index=0; row_index=10000>#;
  88. {a=-1;b=-1};
  89. {a=-2;b=-2};
  90. """
  91. iter = list_fragments(
  92. stream=InputStream.from_string(data),
  93. Reader=reader,
  94. **{parameter_name: True, 'keep_control_records': keep_control_records}
  95. )
  96. records = [(iter.range_index, iter.row_index, __) for __ in iter]
  97. etalon = [
  98. (2, -1, YsonEntity(attributes={b'range_index': 2, b'row_index': 0})),
  99. (2, 0, _({b'a': 1, b'b': 2})),
  100. (2, 1, _({b'a': 2, b'b': 3})),
  101. (2, 2, _({b'a': 3, b'b': 4})),
  102. (0, 9999, YsonEntity(attributes={b'range_index': 0, b'row_index': 10000})),
  103. (0, 10000, _({b'a': -1, b'b': -1})),
  104. (0, 10001, _({b'a': -2, b'b': -2})),
  105. ]
  106. if not keep_control_records:
  107. etalon = filter_control_records(etalon)
  108. assert records == etalon
  109. @pytest.mark.parametrize(
  110. 'reader, as_unicode', [
  111. [Reader, False],
  112. [UnicodeReader, True],
  113. ]
  114. )
  115. def test_key_switch_first(reader, as_unicode):
  116. _ = partial(canonize, as_unicode=as_unicode)
  117. data = b"""
  118. <key_switch=True>#;
  119. {k=1;a=1;b=2};
  120. {k=1;a=2;b=3};
  121. {k=1;a=3;b=4};
  122. <key_switch=True>#;
  123. {k=2;a=-1;b=-1};
  124. {k=2;a=-2;b=-2};
  125. """
  126. iter = key_switched_list_fragments(
  127. stream=InputStream.from_string(data),
  128. Reader=reader,
  129. )
  130. records = [list(__) for __ in iter]
  131. assert records == [
  132. [
  133. _({b'k': 1, b'a': 1, b'b': 2}),
  134. _({b'k': 1, b'a': 2, b'b': 3}),
  135. _({b'k': 1, b'a': 3, b'b': 4}),
  136. ],
  137. [
  138. _({b'k': 2, b'a': -1, b'b': -1}),
  139. _({b'k': 2, b'a': -2, b'b': -2}),
  140. ]
  141. ]
  142. @pytest.mark.parametrize(
  143. 'reader, as_unicode', [
  144. [Reader, False],
  145. [UnicodeReader, True],
  146. ]
  147. )
  148. def test_key_switch_nofirst(reader, as_unicode):
  149. _ = partial(canonize, as_unicode=as_unicode)
  150. data = b"""
  151. {k=1;a=1;b=2};
  152. {k=1;a=2;b=3};
  153. {k=1;a=3;b=4};
  154. <key_switch=True>#;
  155. {k=2;a=-1;b=-1};
  156. {k=2;a=-2;b=-2};
  157. """
  158. iter = key_switched_list_fragments(
  159. stream=InputStream.from_string(data),
  160. Reader=reader
  161. )
  162. records = [list(__) for __ in iter]
  163. assert records == [
  164. [
  165. _({b'k': 1, b'a': 1, b'b': 2}),
  166. _({b'k': 1, b'a': 2, b'b': 3}),
  167. _({b'k': 1, b'a': 3, b'b': 4}),
  168. ],
  169. [
  170. _({b'k': 2, b'a': -1, b'b': -1}),
  171. _({b'k': 2, b'a': -2, b'b': -2}),
  172. ]
  173. ]
  174. @pytest.mark.parametrize(
  175. 'reader, as_unicode', [
  176. [Reader, False],
  177. [UnicodeReader, True],
  178. ]
  179. )
  180. def test_key_switch_exhaust_unused_records(reader, as_unicode):
  181. _ = partial(canonize, as_unicode=as_unicode)
  182. data = b"""
  183. {k=1;a=1;b=2};
  184. {k=1;a=2;b=3};
  185. {k=1;a=3;b=4};
  186. <key_switch=True>#;
  187. {k=2;a=-1;b=-1};
  188. {k=2;a=-2;b=-2};
  189. """
  190. iter = key_switched_list_fragments(
  191. stream=InputStream.from_string(data),
  192. Reader=reader,
  193. )
  194. records = []
  195. for group in iter:
  196. records.append(
  197. list(itertools.islice(group, 2))
  198. )
  199. assert records == [
  200. [
  201. _({b'k': 1, b'a': 1, b'b': 2}),
  202. _({b'k': 1, b'a': 2, b'b': 3}),
  203. ],
  204. [
  205. _({b'k': 2, b'a': -1, b'b': -1}),
  206. _({b'k': 2, b'a': -2, b'b': -2}),
  207. ]
  208. ]
  209. @pytest.mark.parametrize('reader', [Reader, UnicodeReader])
  210. def test_key_switch_empty(reader):
  211. assert list(
  212. key_switched_list_fragments(
  213. stream=InputStream.from_string(""),
  214. Reader=reader,
  215. )
  216. ) == []