test_stream_1.py 18 KB


  1. import lz4.stream
  2. import pytest
  3. import sys
  4. import os
  5. if sys.version_info < (3, ):
  6. from struct import pack, unpack
  7. def _get_format(length, byteorder, signed):
  8. _order = {'l': '<', 'b': '>'}
  9. _fmt = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
  10. _sign = {True: lambda x: x.lower(), False: lambda x: x.upper()}
  11. return _sign[signed](_order[byteorder[0].lower()] + _fmt[length])
  12. def int_to_bytes(value, length=4, byteorder='little', signed=False):
  13. return bytearray(pack(_get_format(length, byteorder, signed), value))
  14. def int_from_bytes(bytes, byteorder='little', signed=False):
  15. return unpack(_get_format(len(bytes), byteorder, signed), bytes)[0]
  16. else:
  17. def int_to_bytes(value, length=4, byteorder='little', signed=False):
  18. return value.to_bytes(length, byteorder, signed=signed)
  19. def int_from_bytes(bytes, byteorder='little', signed=False):
  20. return int.from_bytes(bytes, byteorder, signed=signed)
  21. # This test requires allocating a big lump of memory. In order to
  22. # avoid a massive memory allocation during byte compilation, we have
  23. # to declare a variable for the size of the buffer we're going to
  24. # create outside the scope of the function below. See:
  25. # https://bugs.python.org/issue21074
  26. _4GB = 0x100000000 # 4GB
  27. def compress(x, c_kwargs, return_block_offset=False, check_block_type=False):
  28. o = [0, ]
  29. if c_kwargs.get('return_bytearray', False):
  30. c = bytearray()
  31. else:
  32. c = bytes()
  33. with lz4.stream.LZ4StreamCompressor(**c_kwargs) as proc:
  34. for start in range(0, len(x), c_kwargs['buffer_size']):
  35. chunk = x[start:start + c_kwargs['buffer_size']]
  36. block = proc.compress(chunk)
  37. c += block
  38. if return_block_offset:
  39. o.append(len(c))
  40. if check_block_type:
  41. assert isinstance(block, c.__class__)
  42. if return_block_offset:
  43. return c, o
  44. else:
  45. return c
  46. def decompress(x, d_kwargs, check_chunk_type=False):
  47. if d_kwargs.get('return_bytearray', False):
  48. d = bytearray()
  49. else:
  50. d = bytes()
  51. with lz4.stream.LZ4StreamDecompressor(**d_kwargs) as proc:
  52. start = 0
  53. while start < len(x):
  54. block = proc.get_block(x[start:])
  55. chunk = proc.decompress(block)
  56. d += chunk
  57. start += d_kwargs['store_comp_size'] + len(block)
  58. if check_chunk_type:
  59. assert isinstance(chunk, d.__class__)
  60. return d
  61. def test_invalid_config_c_1():
  62. c_kwargs = {}
  63. c_kwargs['strategy'] = "ring_buffer"
  64. c_kwargs['buffer_size'] = 1024
  65. with pytest.raises(NotImplementedError):
  66. lz4.stream.LZ4StreamCompressor(**c_kwargs)
  67. def test_invalid_config_d_1():
  68. d_kwargs = {}
  69. d_kwargs['strategy'] = "ring_buffer"
  70. d_kwargs['buffer_size'] = 1024
  71. with pytest.raises(NotImplementedError):
  72. lz4.stream.LZ4StreamDecompressor(**d_kwargs)
  73. def test_invalid_config_c_2():
  74. c_kwargs = {}
  75. c_kwargs['strategy'] = "foo"
  76. c_kwargs['buffer_size'] = 1024
  77. with pytest.raises(ValueError):
  78. lz4.stream.LZ4StreamCompressor(**c_kwargs)
  79. def test_invalid_config_d_2():
  80. d_kwargs = {}
  81. d_kwargs['strategy'] = "foo"
  82. d_kwargs['buffer_size'] = 1024
  83. with pytest.raises(ValueError):
  84. lz4.stream.LZ4StreamDecompressor(**d_kwargs)
  85. def test_invalid_config_c_3(store_comp_size):
  86. c_kwargs = {}
  87. c_kwargs['strategy'] = "double_buffer"
  88. c_kwargs['buffer_size'] = 1024
  89. c_kwargs['store_comp_size'] = store_comp_size['store_comp_size'] + 5
  90. with pytest.raises(ValueError):
  91. lz4.stream.LZ4StreamCompressor(**c_kwargs)
  92. def test_invalid_config_d_3(store_comp_size):
  93. d_kwargs = {}
  94. d_kwargs['strategy'] = "double_buffer"
  95. d_kwargs['buffer_size'] = 1024
  96. d_kwargs['store_comp_size'] = store_comp_size['store_comp_size'] + 5
  97. with pytest.raises(ValueError):
  98. lz4.stream.LZ4StreamDecompressor(**d_kwargs)
  99. def test_invalid_config_c_4(store_comp_size):
  100. c_kwargs = {}
  101. c_kwargs['strategy'] = "double_buffer"
  102. c_kwargs['buffer_size'] = 1 << (8 * store_comp_size['store_comp_size'])
  103. c_kwargs.update(store_comp_size)
  104. if store_comp_size['store_comp_size'] >= 4:
  105. # No need for skiping this test case, since arguments check is
  106. # expecting to raise an error.
  107. # Make sure the page size is larger than what the input bound will be,
  108. # but still fit in 4 bytes
  109. c_kwargs['buffer_size'] -= 1
  110. if c_kwargs['buffer_size'] > lz4.stream.LZ4_MAX_INPUT_SIZE:
  111. message = r"^Invalid buffer_size argument: \d+. Cannot define output buffer size. Must be lesser or equal to 2113929216$" # noqa
  112. err_class = ValueError
  113. else:
  114. message = r"^Inconsistent buffer_size/store_comp_size values. Maximal compressed length \(\d+\) cannot fit in a \d+ byte-long integer$" # noqa
  115. err_class = lz4.stream.LZ4StreamError
  116. with pytest.raises(err_class, match=message):
  117. lz4.stream.LZ4StreamCompressor(**c_kwargs)
  118. def test_invalid_config_d_4(store_comp_size):
  119. d_kwargs = {}
  120. d_kwargs['strategy'] = "double_buffer"
  121. d_kwargs['buffer_size'] = 1 << (8 * store_comp_size['store_comp_size'])
  122. d_kwargs.update(store_comp_size)
  123. if store_comp_size['store_comp_size'] >= 4:
  124. if sys.maxsize < 0xffffffff:
  125. pytest.skip('Py_ssize_t too small for this test')
  126. # Make sure the page size is larger than what the input bound will be,
  127. # but still fit in 4 bytes
  128. d_kwargs['buffer_size'] -= 1
  129. # No failure expected during instanciation/initialization
  130. lz4.stream.LZ4StreamDecompressor(**d_kwargs)
  131. def test_invalid_config_c_5():
  132. c_kwargs = {}
  133. c_kwargs['strategy'] = "double_buffer"
  134. c_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE
  135. if sys.maxsize < 0xffffffff:
  136. pytest.skip('Py_ssize_t too small for this test')
  137. # No failure expected
  138. lz4.stream.LZ4StreamCompressor(**c_kwargs)
  139. c_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE + 1
  140. with pytest.raises(ValueError):
  141. lz4.stream.LZ4StreamCompressor(**c_kwargs)
  142. # Make sure the page size is larger than what the input bound will be,
  143. # but still fit in 4 bytes
  144. c_kwargs['buffer_size'] = _4GB - 1 # 4GB - 1 (to fit in 4 bytes)
  145. with pytest.raises(ValueError):
  146. lz4.stream.LZ4StreamCompressor(**c_kwargs)
  147. def test_invalid_config_d_5():
  148. d_kwargs = {}
  149. d_kwargs['strategy'] = "double_buffer"
  150. # No failure expected during instanciation/initialization
  151. d_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE
  152. if sys.maxsize < 0xffffffff:
  153. pytest.skip('Py_ssize_t too small for this test')
  154. lz4.stream.LZ4StreamDecompressor(**d_kwargs)
  155. # No failure expected during instanciation/initialization
  156. d_kwargs['buffer_size'] = lz4.stream.LZ4_MAX_INPUT_SIZE + 1
  157. if sys.maxsize < 0xffffffff:
  158. pytest.skip('Py_ssize_t too small for this test')
  159. lz4.stream.LZ4StreamDecompressor(**d_kwargs)
  160. # No failure expected during instanciation/initialization
  161. d_kwargs['buffer_size'] = _4GB - 1 # 4GB - 1 (to fit in 4 bytes)
  162. if sys.maxsize < 0xffffffff:
  163. pytest.skip('Py_ssize_t too small for this test')
  164. lz4.stream.LZ4StreamDecompressor(**d_kwargs)
  165. def test_decompress_corrupted_input_1():
  166. c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  167. d_kwargs = {}
  168. d_kwargs.update(c_kwargs)
  169. data = compress(b'A' * 512, c_kwargs)
  170. decompress(data, d_kwargs)
  171. message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
  172. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  173. decompress(data[4:], d_kwargs)
  174. def test_decompress_corrupted_input_2():
  175. c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  176. d_kwargs = {}
  177. d_kwargs.update(c_kwargs)
  178. data = compress(b'A' * 512, c_kwargs)
  179. decompress(data, d_kwargs)
  180. message = r"^Decompression failed. error: \d+$"
  181. # Block size corruption in the first block
  182. # Block size longer than actual:
  183. data = int_to_bytes(int_from_bytes(data[:4], 'little') + 1, 4, 'little') + data[4:]
  184. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  185. decompress(data, d_kwargs)
  186. # Block size shorter than actual:
  187. data = int_to_bytes(int_from_bytes(data[:4], 'little') - 2, 4, 'little') + data[4:]
  188. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  189. decompress(data, d_kwargs)
  190. def test_decompress_corrupted_input_3():
  191. c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  192. d_kwargs = {}
  193. d_kwargs.update(c_kwargs)
  194. data = compress(b'A' * 512, c_kwargs)
  195. decompress(data, d_kwargs)
  196. message = r"^Decompression failed. error: \d+$"
  197. # Block size corruption in a block in the middle of the stream
  198. offset = 4 + int_from_bytes(data[:4], 'little')
  199. # Block size longer than actual:
  200. block_len = int_from_bytes(data[offset:offset + 4], 'little') + 1
  201. data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
  202. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  203. decompress(data, d_kwargs)
  204. # Block size shorter than actual:
  205. block_len = int_from_bytes(data[offset:offset + 4], 'little') - 2
  206. data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
  207. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  208. decompress(data, d_kwargs)
  209. def test_decompress_corrupted_input_4():
  210. c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  211. d_kwargs = {}
  212. d_kwargs.update(c_kwargs)
  213. data = compress(b'A' * 256, c_kwargs)
  214. decompress(data, d_kwargs)
  215. # Block size corruption in the last block of the stream
  216. offset = 4 + int_from_bytes(data[:4], 'little')
  217. # Block size longer than actual:
  218. block_len = int_from_bytes(data[offset:offset + 4], 'little') + 1
  219. data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
  220. message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
  221. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  222. decompress(data, d_kwargs)
  223. # Block size shorter than actual:
  224. block_len = int_from_bytes(data[offset:offset + 4], 'little') - 2
  225. data = data[:offset] + int_to_bytes(block_len, 4, 'little') + data[offset + 4:]
  226. message = r"^Decompression failed. error: \d+$"
  227. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  228. decompress(data, d_kwargs)
  229. def test_decompress_truncated():
  230. c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  231. d_kwargs = {}
  232. d_kwargs.update(c_kwargs)
  233. input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
  234. compressed, block_offsets = compress(input_data, c_kwargs, return_block_offset=True)
  235. last_block_offset = 0
  236. for n in range(len(compressed)):
  237. if n in block_offsets:
  238. # end of input matches end of block, so decompression must succeed
  239. last_block_offset = n
  240. decompress(compressed[:n], d_kwargs)
  241. else:
  242. # end of input does not match end of block, so decompression failure is expected
  243. if n - last_block_offset < c_kwargs['store_comp_size']:
  244. message = "^Invalid source, too small for holding any block$"
  245. else:
  246. message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
  247. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  248. decompress(compressed[:n], d_kwargs)
  249. # This next test is probably redundant given test_decompress_truncated above
  250. # since the trailing bytes will be considered as the truncated last block, but
  251. # we will keep them for now
  252. def test_decompress_with_trailer():
  253. c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  254. d_kwargs = {}
  255. d_kwargs.update(c_kwargs)
  256. data = b'A' * 64
  257. comp = compress(data, c_kwargs)
  258. message = "^Invalid source, too small for holding any block$"
  259. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  260. decompress(comp + b'A', d_kwargs)
  261. message = r"^Requested input size \(\d+\) larger than source size \(\d+\)$"
  262. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  263. decompress(comp + b'A' * 10, d_kwargs)
  264. for n in range(1, 10):
  265. if n < d_kwargs['store_comp_size']:
  266. message = "^Invalid source, too small for holding any block$"
  267. else:
  268. message = r"^Decompression failed. error: \d+$"
  269. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  270. decompress(comp + b'\x00' * n, d_kwargs)
  271. def test_unicode():
  272. if sys.version_info < (3,):
  273. return # skip
  274. c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  275. d_kwargs = {}
  276. d_kwargs.update(c_kwargs)
  277. DATA = b'x'
  278. with pytest.raises(TypeError):
  279. compress(DATA.decode('latin1'), c_kwargs)
  280. decompress(compress(DATA, c_kwargs).decode('latin1'), d_kwargs)
  281. # These next two are probably redundant given test_1 above but we'll keep them
  282. # for now
  283. def test_return_bytearray():
  284. if sys.version_info < (3,):
  285. return # skip
  286. c_kwargs_r = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  287. c_kwargs = {'return_bytearray': True}
  288. c_kwargs.update(c_kwargs_r)
  289. d_kwargs = {}
  290. d_kwargs.update(c_kwargs)
  291. data = os.urandom(128 * 1024) # Read 128kb
  292. compressed = compress(data, c_kwargs_r, check_block_type=True)
  293. b = compress(data, c_kwargs, check_block_type=True)
  294. assert isinstance(b, bytearray)
  295. assert bytes(b) == compressed
  296. b = decompress(compressed, d_kwargs, check_chunk_type=True)
  297. assert isinstance(b, bytearray)
  298. assert bytes(b) == data
  299. def test_memoryview():
  300. if sys.version_info < (2, 7):
  301. return # skip
  302. c_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  303. d_kwargs = {}
  304. d_kwargs.update(c_kwargs)
  305. data = os.urandom(128 * 1024) # Read 128kb
  306. compressed = compress(data, c_kwargs)
  307. assert compress(memoryview(data), c_kwargs) == compressed
  308. assert decompress(memoryview(compressed), d_kwargs) == data
  309. def test_with_dict_none():
  310. kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  311. input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
  312. for mode in ['default', 'high_compression']:
  313. c_kwargs = {'mode': mode, 'dictionary': None}
  314. c_kwargs.update(kwargs)
  315. d_kwargs = {}
  316. d_kwargs.update(kwargs)
  317. assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
  318. c_kwargs = {'mode': mode}
  319. c_kwargs.update(kwargs)
  320. d_kwargs = {'dictionary': None}
  321. d_kwargs.update(kwargs)
  322. assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
  323. c_kwargs = {'mode': mode, 'dictionary': b''}
  324. c_kwargs.update(kwargs)
  325. d_kwargs = {}
  326. d_kwargs.update(kwargs)
  327. assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
  328. c_kwargs = {'mode': mode}
  329. c_kwargs.update(kwargs)
  330. d_kwargs = {'dictionary': b''}
  331. d_kwargs.update(kwargs)
  332. assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
  333. c_kwargs = {'mode': mode, 'dictionary': ''}
  334. c_kwargs.update(kwargs)
  335. d_kwargs = {}
  336. d_kwargs.update(kwargs)
  337. assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
  338. c_kwargs = {'mode': mode}
  339. c_kwargs.update(kwargs)
  340. d_kwargs = {'dictionary': ''}
  341. d_kwargs.update(kwargs)
  342. assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
  343. def test_with_dict():
  344. kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  345. input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
  346. dict1 = input_data[10:30]
  347. dict2 = input_data[20:40]
  348. message = r"^Decompression failed. error: \d+$"
  349. for mode in ['default', 'high_compression']:
  350. c_kwargs = {'mode': mode, 'dictionary': dict1}
  351. c_kwargs.update(kwargs)
  352. compressed = compress(input_data, c_kwargs)
  353. d_kwargs = {}
  354. d_kwargs.update(kwargs)
  355. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  356. decompress(compressed, d_kwargs)
  357. d_kwargs = {'dictionary': dict1[:2]}
  358. d_kwargs.update(kwargs)
  359. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  360. decompress(compressed, d_kwargs)
  361. d_kwargs = {'dictionary': dict2}
  362. d_kwargs.update(kwargs)
  363. assert decompress(compressed, d_kwargs) != input_data
  364. d_kwargs = {'dictionary': dict1}
  365. d_kwargs.update(kwargs)
  366. assert decompress(compressed, d_kwargs) == input_data
  367. c_kwargs = {}
  368. c_kwargs.update(kwargs)
  369. d_kwargs = {'dictionary': dict1}
  370. d_kwargs.update(kwargs)
  371. assert decompress(compress(input_data, c_kwargs), d_kwargs) == input_data
  372. def test_known_decompress_1():
  373. d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  374. output = b''
  375. input = b'\x00\x00\x00\x00'
  376. message = "^Decompression failed. error: 1$"
  377. with pytest.raises(lz4.stream.LZ4StreamError, match=message):
  378. decompress(input, d_kwargs)
  379. input = b'\x01\x00\x00\x00\x00'
  380. assert decompress(input, d_kwargs) == output
  381. def test_known_decompress_2():
  382. d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  383. input = b'\x02\x00\x00\x00\x10 '
  384. output = b' '
  385. assert decompress(input, d_kwargs) == output
  386. def test_known_decompress_3():
  387. d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  388. # uncompressed data size smaller than buffer_size
  389. input = b'%\x00\x00\x00\xff\x0bLorem ipsum dolor sit amet\x1a\x006P amet'
  390. output = b'Lorem ipsum dolor sit amet' * 4
  391. assert decompress(input, d_kwargs) == output
  392. def test_known_decompress_4():
  393. d_kwargs = {'strategy': "double_buffer", 'buffer_size': 128, 'store_comp_size': 4}
  394. input = b'%\x00\x00\x00\xff\x0bLorem ipsum dolor sit amet\x1a\x00NPit am\n\x00\x00\x00\x0fh\x00hP sit \x05\x00\x00\x00@amet'
  395. output = b'Lorem ipsum dolor sit amet' * 10
  396. assert decompress(input, d_kwargs) == output