123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- import lz4.stream
- import pytest
- import sys
if sys.version_info >= (3, ):
    # Python 3: ``int`` converts to and from bytes natively.
    def int_to_bytes(value, length=4, byteorder='little', signed=False):
        """Serialize ``value`` into ``length`` bytes using ``int.to_bytes``.

        ``byteorder`` and ``signed`` are forwarded unchanged.
        """
        return value.to_bytes(length, byteorder, signed=signed)

    def int_from_bytes(bytes, byteorder='little', signed=False):
        """Decode an integer from ``bytes`` using ``int.from_bytes``.

        ``byteorder`` and ``signed`` are forwarded unchanged.  The parameter
        name shadows the builtin but is kept for keyword-call compatibility.
        """
        return int.from_bytes(bytes, byteorder, signed=signed)
else:
    # Python 2: emulate the two conversions on top of the ``struct`` module.
    from struct import pack, unpack

    def _get_format(length, byteorder, signed):
        """Build the ``struct`` format string for an integer field.

        The byte-order character is chosen from the first letter of
        ``byteorder`` ('l' or 'b', case-insensitive) and the type code from
        ``length`` (1, 2, 4 or 8).  The result is lower-cased when ``signed``
        is true and upper-cased otherwise.  Unsupported values raise
        ``KeyError``.
        """
        apply_case = {True: str.lower, False: str.upper}[signed]
        order_char = {'l': '<', 'b': '>'}[byteorder[0].lower()]
        size_char = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}[length]
        return apply_case(order_char + size_char)

    def int_to_bytes(value, length=4, byteorder='little', signed=False):
        """Pack ``value`` into a ``bytearray`` of ``length`` bytes."""
        layout = _get_format(length, byteorder, signed)
        return bytearray(pack(layout, value))

    def int_from_bytes(bytes, byteorder='little', signed=False):
        """Unpack a single integer whose width is ``len(bytes)``."""
        layout = _get_format(len(bytes), byteorder, signed)
        return unpack(layout, bytes)[0]
- # Out-of-band block size record tests
def test_round_trip():
    """Compare in-band and out-of-band block size handling end to end.

    The same data is compressed chunk by chunk with two compressors that
    differ only in ``store_comp_size`` (4 versus 0).  Each in-band block must
    equal its size record followed by the matching out-of-band block.  Both
    streams are then decompressed block by block and must reproduce the
    original data.
    """
    payload = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
    in_band_opts = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 4}
    # Identical settings, except that no size record is stored in the stream.
    out_of_band_opts = dict(in_band_opts, store_comp_size=0)

    chunk_len = in_band_opts['buffer_size']
    record_len = in_band_opts['store_comp_size']

    in_band_packed = bytearray()
    out_of_band_packed = bytearray()
    block_lengths = []  # sizes of the out-of-band blocks, kept alongside the stream

    with lz4.stream.LZ4StreamCompressor(**in_band_opts) as in_band, \
            lz4.stream.LZ4StreamCompressor(**out_of_band_opts) as out_of_band:
        for begin in range(0, len(payload), chunk_len):
            piece = payload[begin:begin + chunk_len]
            framed = in_band.compress(piece)
            bare = out_of_band.compress(piece)

            # The in-band block is the out-of-band block plus its size record.
            assert len(framed) == len(bare) + record_len, \
                "Blocks size mismatch: {}/{}".format(len(framed), len(bare) + record_len)

            recorded = int_from_bytes(framed[:record_len])
            assert recorded == len(bare), \
                "Blocks size record mismatch: got {}, expected {}".format(recorded, len(bare))

            assert framed[record_len:] == bare, "Blocks data mismatch"

            in_band_packed += framed
            out_of_band_packed += bare
            block_lengths.append(len(bare))

    in_band_plain = bytearray()
    out_of_band_plain = bytearray()

    with lz4.stream.LZ4StreamDecompressor(**in_band_opts) as in_band, \
            lz4.stream.LZ4StreamDecompressor(**out_of_band_opts) as out_of_band:
        framed_pos = 0
        bare_pos = 0
        for bare_len in block_lengths:
            # Stop as soon as the in-band stream is exhausted.
            if framed_pos >= len(in_band_packed):
                break

            framed = in_band.get_block(in_band_packed[framed_pos:])
            bare = out_of_band_packed[bare_pos:bare_pos + bare_len]

            assert len(framed) == len(bare), \
                "Blocks size mismatch: {}/{}".format(len(framed), len(bare))
            assert framed == bare, "Blocks data mismatch"

            in_band_piece = in_band.decompress(framed)
            out_of_band_piece = out_of_band.decompress(bare)

            assert len(in_band_piece) == len(out_of_band_piece), \
                "Chunks size mismatch: {}/{}".format(len(in_band_piece), len(out_of_band_piece))
            assert in_band_piece == out_of_band_piece, "Chunks data mismatch"

            in_band_plain += in_band_piece
            out_of_band_plain += out_of_band_piece

            # Skip the size record as well as the block in the in-band stream.
            framed_pos += record_len + len(framed)
            bare_pos += bare_len

    assert len(in_band_plain) == len(out_of_band_plain), "Decompressed streams length mismatch"
    assert len(payload) == len(in_band_plain), "Decompressed streams length mismatch"
    assert len(payload) == len(out_of_band_plain), "Decompressed streams length mismatch"

    assert in_band_plain == out_of_band_plain, "Decompressed streams mismatch"
    assert payload == in_band_plain, "Decompressed streams mismatch"
    assert payload == out_of_band_plain, "Decompressed streams mismatch"
def test_invalid_usage():
    """Check that walking an out-of-band stream with ``get_block`` fails.

    The data is compressed with ``store_comp_size`` set to 0, so the stream
    carries no in-band size records.  Reading it back through ``get_block``
    on a decompressor with the same settings must raise ``LZ4StreamError``
    with the "out-of-band" message (presumably on the first ``get_block``
    call; confirm against the lz4.stream implementation).
    """
    data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
    kwargs = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 0}

    cstream = bytearray()
    with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
        for start in range(0, len(data), kwargs['buffer_size']):
            chunk = data[start:start + kwargs['buffer_size']]
            cstream += proc.compress(chunk)

    message = r"^LZ4 context is configured for storing block size out-of-band$"

    with pytest.raises(lz4.stream.LZ4StreamError, match=message):
        with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
            offset = 0
            while offset < len(cstream):
                block = proc.get_block(cstream[offset:])
                # The decompressed output is not inspected: only the error matters.
                proc.decompress(block)
                offset += kwargs['store_comp_size'] + len(block)
|