123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- import lz4.frame as lz4frame
- import os
- import pytest
- from . helpers import (
- get_frame_info_check,
- get_chunked,
- )
- test_data = [
- b'',
- (128 * (32 * os.urandom(32))),
- (256 * (32 * os.urandom(32))),
- (512 * (32 * os.urandom(32))),
- (1024 * (32 * os.urandom(32))),
- ]
- @pytest.fixture(
- params=test_data,
- ids=[
- 'data' + str(i) for i in range(len(test_data))
- ]
- )
- def data(request):
- return request.param
- @pytest.fixture(
- params=[
- (True),
- (False)
- ]
- )
- def reset(request):
- return request.param
- @pytest.fixture(
- params=[
- (1),
- (8)
- ]
- )
- def chunks(request):
- return request.param
- def test_roundtrip_LZ4FrameCompressor(
- data,
- chunks,
- block_size,
- block_linked,
- reset,
- store_size,
- block_checksum,
- content_checksum):
- with lz4frame.LZ4FrameCompressor(
- block_size=block_size,
- block_linked=block_linked,
- content_checksum=content_checksum,
- block_checksum=block_checksum,
- ) as compressor:
- def do_compress():
- if store_size is True:
- compressed = compressor.begin(source_size=len(data))
- else:
- compressed = compressor.begin()
- for chunk in get_chunked(data, chunks):
- compressed += compressor.compress(chunk)
- compressed += compressor.flush()
- return compressed
- compressed = do_compress()
- if reset is True:
- compressor.reset()
- compressed = do_compress()
- get_frame_info_check(
- compressed,
- len(data),
- store_size,
- block_size,
- block_linked,
- content_checksum,
- block_checksum,
- )
- decompressed, bytes_read = lz4frame.decompress(
- compressed, return_bytes_read=True)
- assert data == decompressed
- assert bytes_read == len(compressed)
- def test_roundtrip_LZ4FrameCompressor_LZ4FrameDecompressor(
- data,
- chunks,
- block_size,
- block_linked,
- reset,
- store_size,
- block_checksum,
- content_checksum):
- with lz4frame.LZ4FrameCompressor(
- block_size=block_size,
- block_linked=block_linked,
- content_checksum=content_checksum,
- block_checksum=block_checksum,
- ) as compressor:
- def do_compress():
- if store_size is True:
- compressed = compressor.begin(source_size=len(data))
- else:
- compressed = compressor.begin()
- for chunk in get_chunked(data, chunks):
- compressed += compressor.compress(chunk)
- compressed += compressor.flush()
- return compressed
- compressed = do_compress()
- if reset is True:
- compressor.reset()
- compressed = do_compress()
- get_frame_info_check(
- compressed,
- len(data),
- store_size,
- block_size,
- block_linked,
- content_checksum,
- block_checksum,
- )
- with lz4frame.LZ4FrameDecompressor() as decompressor:
- decompressed = b''
- for chunk in get_chunked(compressed, chunks):
- b = decompressor.decompress(chunk)
- decompressed += b
- assert data == decompressed
|