import lz4.frame as lz4frame import os import pytest from . helpers import ( get_frame_info_check, get_chunked, ) test_data = [ b'', (128 * (32 * os.urandom(32))), (256 * (32 * os.urandom(32))), (512 * (32 * os.urandom(32))), (1024 * (32 * os.urandom(32))), ] @pytest.fixture( params=test_data, ids=[ 'data' + str(i) for i in range(len(test_data)) ] ) def data(request): return request.param @pytest.fixture( params=[ (True), (False) ] ) def reset(request): return request.param @pytest.fixture( params=[ (1), (8) ] ) def chunks(request): return request.param def test_roundtrip_LZ4FrameCompressor( data, chunks, block_size, block_linked, reset, store_size, block_checksum, content_checksum): with lz4frame.LZ4FrameCompressor( block_size=block_size, block_linked=block_linked, content_checksum=content_checksum, block_checksum=block_checksum, ) as compressor: def do_compress(): if store_size is True: compressed = compressor.begin(source_size=len(data)) else: compressed = compressor.begin() for chunk in get_chunked(data, chunks): compressed += compressor.compress(chunk) compressed += compressor.flush() return compressed compressed = do_compress() if reset is True: compressor.reset() compressed = do_compress() get_frame_info_check( compressed, len(data), store_size, block_size, block_linked, content_checksum, block_checksum, ) decompressed, bytes_read = lz4frame.decompress( compressed, return_bytes_read=True) assert data == decompressed assert bytes_read == len(compressed) def test_roundtrip_LZ4FrameCompressor_LZ4FrameDecompressor( data, chunks, block_size, block_linked, reset, store_size, block_checksum, content_checksum): with lz4frame.LZ4FrameCompressor( block_size=block_size, block_linked=block_linked, content_checksum=content_checksum, block_checksum=block_checksum, ) as compressor: def do_compress(): if store_size is True: compressed = compressor.begin(source_size=len(data)) else: compressed = compressor.begin() for chunk in get_chunked(data, chunks): compressed += compressor.compress(chunk) compressed += compressor.flush() return compressed compressed = do_compress() if reset is True: compressor.reset() compressed = do_compress() get_frame_info_check( compressed, len(data), store_size, block_size, block_linked, content_checksum, block_checksum, ) with lz4frame.LZ4FrameDecompressor() as decompressor: decompressed = b'' for chunk in get_chunked(compressed, chunks): b = decompressor.decompress(chunk) decompressed += b assert data == decompressed