# Source listing: test_reader_writer.py (6.7 KB)
  1. # -*- coding: utf-8 -*-
  2. from __future__ import print_function, absolute_import, division
  3. import io
  4. import math
  5. import pytest
  6. import six
  7. import sys
  8. from functools import partial
  9. from cyson import (
  10. dumps, loads, YsonInt64, YsonUInt64, UInt, Writer, OutputStream,
  11. UnicodeReader,
  12. )
  13. if six.PY2:
  14. NativeUInt = long # noqa: F821
  15. elif six.PY3:
  16. NativeUInt = UInt
  17. unicode = str
  18. long = int
  19. else:
  20. raise RuntimeError('Unsupported Python version')
  21. def canonize(value, as_unicode=False):
  22. _canonize = partial(canonize, as_unicode=as_unicode)
  23. if isinstance(value, (list, tuple)):
  24. return [_canonize(_) for _ in value]
  25. elif isinstance(value, dict):
  26. return {_canonize(k): _canonize(value[k]) for k in value}
  27. elif isinstance(value, unicode) and not as_unicode:
  28. return value.encode('utf8')
  29. elif isinstance(value, bytes) and as_unicode:
  30. return value.decode('utf8')
  31. return value
  32. def switch_string_type(string):
  33. if isinstance(string, bytes):
  34. return string.decode('utf8')
  35. elif isinstance(string, unicode):
  36. return string.encode('utf8')
  37. raise TypeError('expected unicode or bytes, got {!r}'.format(string))
  38. def coerce(obj, to, via=None):
  39. if via is None:
  40. via = to
  41. if isinstance(obj, to):
  42. return obj
  43. return via(obj)
  44. SKIP_PY3 = pytest.mark.skipif(six.PY3, reason='Makes no sense for Python3')
  45. if six.PY3 and sys.platform == 'win32':
  46. NUMPY_CASES = []
  47. else:
  48. import numpy as np
  49. NUMPY_CASES = [
  50. # numpy int
  51. np.int8(2 ** 7 - 1), np.int16(2 ** 15 - 1),
  52. np.int32(2 ** 31 - 1), np.int64(2 ** 63 - 1),
  53. # numpy uint
  54. np.uint8(2 ** 8 - 1), np.uint16(2 ** 16 - 1),
  55. np.uint32(2 ** 32 - 1), np.uint64(2 ** 64 - 1),
  56. # numpy float
  57. np.float16(100.0), np.float32(100.0), np.float64(100.0),
  58. ]
  59. CASES = [
  60. # NoneType
  61. None,
  62. # boolean
  63. True, False,
  64. # int
  65. 0, 1, -1, int(2 ** 63 - 1), int(-2 ** 63),
  66. # float
  67. 0.0, 100.0, -100.0, float('inf'), float('-inf'),
  68. # bytes
  69. b'', b'hello', u'Привет'.encode('utf8'),
  70. # unicode
  71. u'', u'hello', u'Привет',
  72. # list
  73. [], [0], [1, 'hello'], [17, 'q'] * 100, [b'bytes'],
  74. # tuple
  75. (), (0,), (1, 'hello'), (17, 'q') * 100, (b'bytes',),
  76. # dict
  77. {}, {'a': 'b'}, {'a': 17}, {'a': [1, 2, 3]}, {b'a': 1, u'b': b'a'}
  78. ] + NUMPY_CASES
  79. @pytest.mark.parametrize('format', ['binary', 'text', 'pretty'])
  80. @pytest.mark.parametrize('value', CASES)
  81. def test_roundtrip(value, format):
  82. encoded = dumps(value, format)
  83. decoded = loads(encoded)
  84. assert encoded == dumps(value, switch_string_type(format))
  85. assert decoded == canonize(value)
  86. # NOTE: roundtrip test doesn't work for NaN (NaN != NaN)
  87. @pytest.mark.parametrize('format', ['binary', 'text', 'pretty'])
  88. def test_nan(format):
  89. encoded = dumps(float('nan'), format)
  90. decoded = loads(encoded)
  91. assert encoded == dumps(float('nan'), switch_string_type(format))
  92. assert math.isnan(decoded)
  93. @SKIP_PY3
  94. @pytest.mark.parametrize('format', ['binary', 'text', 'pretty'])
  95. @pytest.mark.parametrize(
  96. 'value', [long(0), long(1), long(2 ** 63), long(2 ** 64 - 1)]
  97. )
  98. def test_long_roundtrip(value, format):
  99. encoded = dumps(value, format)
  100. decoded = loads(encoded)
  101. assert encoded == dumps(value, switch_string_type(format))
  102. assert decoded == value
  103. @pytest.mark.parametrize(
  104. 'value', [NativeUInt(0), NativeUInt(111), NativeUInt(2 ** 63), NativeUInt(2 ** 64 - 1)]
  105. )
  106. @pytest.mark.parametrize('format', ['binary', 'text', 'pretty'])
  107. def test_readwrite_uint64(value, format):
  108. dumped_uint64 = dumps(coerce(value, YsonUInt64), format=format)
  109. loaded_uint64 = loads(dumped_uint64)
  110. assert type(value) is NativeUInt
  111. assert type(loaded_uint64) is NativeUInt
  112. assert dumps(value, format=format) == dumped_uint64
  113. @pytest.mark.parametrize('value', [int(-2 ** 63), -111, 0, 111, int(2 ** 63 - 1)])
  114. @pytest.mark.parametrize('format', ['binary', 'text', 'pretty'])
  115. def test_readwrite_int64(value, format):
  116. dumped_int64 = dumps(YsonInt64(value), format=format)
  117. loaded_int64 = loads(dumped_int64)
  118. assert type(value) is int
  119. assert type(loaded_int64) is int
  120. assert dumps(value, format=format) == dumped_int64
  121. @SKIP_PY3
  122. def test_long_overflow():
  123. with pytest.raises(OverflowError):
  124. dumps(long(-1))
  125. with pytest.raises(OverflowError):
  126. dumps(long(2**64))
  127. @pytest.mark.parametrize('value', [2 ** 63, -2 ** 63 - 1])
  128. def test_int64_overflow(value):
  129. with pytest.raises(OverflowError):
  130. int64_value = YsonInt64(value)
  131. dumps(int64_value)
  132. if six.PY3:
  133. with pytest.raises(OverflowError):
  134. dumps(value)
  135. @pytest.mark.parametrize('value', [2 ** 64, 2 ** 100])
  136. def test_uint64_overflow(value):
  137. with pytest.raises(OverflowError):
  138. uint64_value = YsonUInt64(value)
  139. dumps(uint64_value)
  140. @pytest.mark.parametrize('format', ['binary', 'text', 'pretty'])
  141. def test_force_write_sequence(format):
  142. class Sequence(object):
  143. def __init__(self, seq):
  144. self._seq = seq
  145. def __getitem__(self, index):
  146. return self._seq[index]
  147. def __len__(self):
  148. return len(self._seq)
  149. sequence = [1, 1.1, None, b'xyz']
  150. sink = io.BytesIO()
  151. writer = Writer(OutputStream.from_file(sink), format=format)
  152. writer.begin_stream()
  153. writer.list(Sequence(sequence))
  154. writer.end_stream()
  155. assert sink.getvalue() == dumps(sequence, format)
  156. @pytest.mark.parametrize('format', ['binary', 'text', 'pretty'])
  157. def test_force_write_mapping(format):
  158. class Mapping(object):
  159. def __init__(self, mapping):
  160. self._mapping = mapping
  161. def __getitem__(self, key):
  162. return self._mapping[key]
  163. def keys(self):
  164. return self._mapping.keys()
  165. mapping = {b'a': 1, b'b': 1.1, b'c': None, b'd': b'some'}
  166. sink = io.BytesIO()
  167. writer = Writer(OutputStream.from_file(sink), format=format)
  168. writer.begin_stream()
  169. writer.map(Mapping(mapping))
  170. writer.end_stream()
  171. assert sink.getvalue() == dumps(mapping, format)
  172. @pytest.mark.parametrize('format', ['binary', 'text', 'pretty'])
  173. @pytest.mark.parametrize('value', CASES)
  174. def test_unicode_reader(value, format):
  175. expected = canonize(value, as_unicode=True)
  176. got = loads(dumps(value, format), UnicodeReader)
  177. assert expected == got
  178. def test_unicode_reader_raises_unicode_decode_error():
  179. not_decodable = b'\x80\x81'
  180. with pytest.raises(UnicodeDecodeError):
  181. loads(dumps(not_decodable, format='binary'), UnicodeReader)
  182. def test_unicode_reader_decodes_object_with_attributes():
  183. data = b'{"a" = "b"; "c" = <"foo" = "bar">"d"}'
  184. expected = {u"a": u"b", u"c": u"d"}
  185. assert loads(data, UnicodeReader) == expected