123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- import lz4.block
- import pytest
- import sys
- import os
- def test_decompress_ui32_overflow():
- data = lz4.block.compress(b'A' * 64)
- with pytest.raises(OverflowError):
- lz4.block.decompress(data[4:], uncompressed_size=((1 << 32) + 64))
- def test_decompress_without_leak():
- # Verify that hand-crafted packet does not leak uninitialized(?) memory.
- data = lz4.block.compress(b'A' * 64)
- message = r'^Decompressor wrote 64 bytes, but 79 bytes expected from header$'
- with pytest.raises(lz4.block.LZ4BlockError, match=message):
- lz4.block.decompress(b'\x4f' + data[1:])
- def test_decompress_with_small_buffer():
- data = lz4.block.compress(b'A' * 64, store_size=False)
- message = r'^Decompression failed: corrupt input or insufficient space in destination buffer. Error code: \d+$'
- with pytest.raises(lz4.block.LZ4BlockError, match=message):
- lz4.block.decompress(data[4:], uncompressed_size=64)
- with pytest.raises(lz4.block.LZ4BlockError, match=message):
- lz4.block.decompress(data, uncompressed_size=60)
- def test_decompress_truncated():
- input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
- compressed = lz4.block.compress(input_data)
- # for i in range(len(compressed)):
- # try:
- # lz4.block.decompress(compressed[:i])
- # except:
- # print(i, sys.exc_info()[0], sys.exc_info()[1])
- with pytest.raises(ValueError, match='Input source data size too small'):
- lz4.block.decompress(compressed[:0])
- for n in [0, 1]:
- with pytest.raises(ValueError, match='Input source data size too small'):
- lz4.block.decompress(compressed[:n])
- for n in [24, 25, -2, 27, 67, 85]:
- with pytest.raises(lz4.block.LZ4BlockError):
- lz4.block.decompress(compressed[:n])
- def test_decompress_with_trailer():
- data = b'A' * 64
- comp = lz4.block.compress(data)
- message = r'^Decompression failed: corrupt input or insufficient space in destination buffer. Error code: \d+$'
- with pytest.raises(lz4.block.LZ4BlockError, match=message):
- lz4.block.decompress(comp + b'A')
- with pytest.raises(lz4.block.LZ4BlockError, match=message):
- lz4.block.decompress(comp + comp)
- with pytest.raises(lz4.block.LZ4BlockError, match=message):
- lz4.block.decompress(comp + comp[4:])
- def test_unicode():
- if sys.version_info < (3,):
- return # skip
- DATA = b'x'
- with pytest.raises(TypeError):
- lz4.block.compress(DATA.decode('latin1'))
- lz4.block.decompress(lz4.block.compress(DATA).decode('latin1'))
- # These next two are probably redundant given test_1 above but we'll keep them
- # for now
- def test_return_bytearray():
- if sys.version_info < (3,):
- return # skip
- data = os.urandom(128 * 1024) # Read 128kb
- compressed = lz4.block.compress(data)
- b = lz4.block.compress(data, return_bytearray=True)
- assert isinstance(b, bytearray)
- assert bytes(b) == compressed
- b = lz4.block.decompress(compressed, return_bytearray=True)
- assert isinstance(b, bytearray)
- assert bytes(b) == data
- def test_memoryview():
- if sys.version_info < (2, 7):
- return # skip
- data = os.urandom(128 * 1024) # Read 128kb
- compressed = lz4.block.compress(data)
- assert lz4.block.compress(memoryview(data)) == compressed
- assert lz4.block.decompress(memoryview(compressed)) == data
- def test_with_dict_none():
- input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
- for mode in ['default', 'high_compression']:
- assert lz4.block.decompress(lz4.block.compress(
- input_data, mode=mode, dict=None)) == input_data
- assert lz4.block.decompress(lz4.block.compress(
- input_data, mode=mode), dict=None) == input_data
- assert lz4.block.decompress(lz4.block.compress(
- input_data, mode=mode, dict=b'')) == input_data
- assert lz4.block.decompress(lz4.block.compress(
- input_data, mode=mode), dict=b'') == input_data
- assert lz4.block.decompress(lz4.block.compress(
- input_data, mode=mode, dict='')) == input_data
- assert lz4.block.decompress(lz4.block.compress(
- input_data, mode=mode), dict='') == input_data
- def test_with_dict():
- input_data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
- dict1 = input_data[10:30]
- dict2 = input_data[20:40]
- message = r'^Decompression failed: corrupt input or insufficient space in destination buffer. Error code: \d+$'
- for mode in ['default', 'high_compression']:
- compressed = lz4.block.compress(input_data, mode=mode, dict=dict1)
- with pytest.raises(lz4.block.LZ4BlockError, match=message):
- lz4.block.decompress(compressed)
- with pytest.raises(lz4.block.LZ4BlockError, match=message):
- lz4.block.decompress(compressed, dict=dict1[:2])
- assert lz4.block.decompress(compressed, dict=dict2) != input_data
- assert lz4.block.decompress(compressed, dict=dict1) == input_data
- assert lz4.block.decompress(lz4.block.compress(
- input_data), dict=dict1) == input_data
- def test_known_decompress_1():
- input = b'\x00\x00\x00\x00\x00'
- output = b''
- assert lz4.block.decompress(input) == output
- def test_known_decompress_2():
- input = b'\x01\x00\x00\x00\x10 '
- output = b' '
- assert lz4.block.decompress(input) == output
- def test_known_decompress_3():
- input = b'h\x00\x00\x00\xff\x0bLorem ipsum dolor sit amet\x1a\x006P amet'
- output = b'Lorem ipsum dolor sit amet' * 4
- assert lz4.block.decompress(input) == output
- def test_known_decompress_4():
- input = b'\xb0\xb3\x00\x00\xff\x1fExcepteur sint occaecat cupidatat non proident.\x00' + (b'\xff' * 180) + b'\x1ePident'
- output = b'Excepteur sint occaecat cupidatat non proident' * 1000
- assert lz4.block.decompress(input) == output
|