123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- import lz4.frame as lz4frame
- import pytest
- import os
# Payloads fed to the round-trip tests through the ``data`` fixture: a random
# 32-byte pattern repeated 256 times (8192 bytes in total).
test_data = [os.urandom(32) * 256]
@pytest.fixture(
    params=test_data,
    ids=['data' + str(index) for index, _ in enumerate(test_data)],
)
def data(request):
    """Yield each entry of ``test_data`` in turn as the ``data`` fixture.

    The ids are ``data0``, ``data1``, ... in the order the entries appear
    in ``test_data``.
    """
    return request.param
def test_roundtrip_multiframe_1(data):
    """Round-trip frames produced by the one-shot ``compress`` function.

    ``nframes`` compressed copies of ``data`` are concatenated, then
    ``lz4frame.decompress`` is called ``nframes`` times on that whole
    concatenation.  The joined output must equal ``data`` repeated
    ``nframes`` times.
    """
    nframes = 4

    compressed = b''.join(lz4frame.compress(data) for _ in range(nframes))
    decompressed = b''.join(
        lz4frame.decompress(compressed) for _ in range(nframes)
    )

    assert len(decompressed) == nframes * len(data)
    assert data * nframes == decompressed
def test_roundtrip_multiframe_2(data):
    """Round-trip frames written through a reused compression context.

    A single compression context emits ``nframes`` frames, each as a
    begin / chunk / flush sequence.  ``lz4frame.decompress`` is then called
    ``nframes`` times on the whole concatenation, and the joined output must
    equal ``data`` repeated ``nframes`` times.
    """
    nframes = 4

    cctx = lz4frame.create_compression_context()
    pieces = []
    for _ in range(nframes):
        pieces.append(lz4frame.compress_begin(cctx))
        pieces.append(lz4frame.compress_chunk(cctx, data))
        pieces.append(lz4frame.compress_flush(cctx))
    compressed = b''.join(pieces)

    decompressed = b''.join(
        lz4frame.decompress(compressed) for _ in range(nframes)
    )

    assert len(decompressed) == nframes * len(data)
    assert data * nframes == decompressed
def test_roundtrip_multiframe_3(data):
    """Round-trip frames using the low-level chunk APIs on both sides.

    A single compression context emits ``nframes`` frames, each as a
    begin / chunk / flush sequence.  One decompression context is then
    passed to ``decompress_chunk`` ``nframes`` times with the whole
    concatenation.  Each call must report end-of-frame and consume exactly
    one frame's worth of bytes (``len(compressed) // nframes``).
    """
    nframes = 4

    cctx = lz4frame.create_compression_context()
    pieces = []
    for _ in range(nframes):
        pieces.append(lz4frame.compress_begin(cctx))
        pieces.append(lz4frame.compress_chunk(cctx, data))
        pieces.append(lz4frame.compress_flush(cctx))
    compressed = b''.join(pieces)

    # Expected number of input bytes consumed by each decompress_chunk call.
    frame_size = len(compressed) // nframes

    dctx = lz4frame.create_decompression_context()
    outputs = []
    for _ in range(nframes):
        chunk, bytes_read, eof = lz4frame.decompress_chunk(dctx, compressed)
        outputs.append(chunk)
        assert eof is True
        assert bytes_read == frame_size
    decompressed = b''.join(outputs)

    assert len(decompressed) == nframes * len(data)
    assert data * nframes == decompressed
def test_roundtrip_multiframe_4(data):
    """Round-trip frames through the compressor/decompressor classes.

    An ``LZ4FrameCompressor`` emits ``nframes`` frames, each as a
    begin / compress / flush sequence.  An ``LZ4FrameDecompressor`` is first
    fed the whole concatenation and, for each later frame, whatever it left
    in ``unused_data``.  After every frame ``eof`` and ``needs_input`` must
    be ``True``.  ``unused_data`` must be ``None`` after the last frame and
    otherwise hold exactly the bytes of the frames not yet decoded.
    """
    nframes = 4

    pieces = []
    with lz4frame.LZ4FrameCompressor() as compressor:
        for _ in range(nframes):
            pieces.append(compressor.begin())
            pieces.append(compressor.compress(data))
            pieces.append(compressor.flush())
    compressed = b''.join(pieces)

    outputs = []
    with lz4frame.LZ4FrameDecompressor() as decompressor:
        for i in range(nframes):
            source = compressed if i == 0 else decompressor.unused_data
            outputs.append(decompressor.decompress(source))

            assert decompressor.eof is True
            assert decompressor.needs_input is True

            frames_left = nframes - i - 1
            if frames_left == 0:
                assert decompressor.unused_data is None
            else:
                # Cross-multiplied so the comparison stays in exact integer
                # arithmetic instead of comparing a length with the float
                # produced by true division.
                assert (len(decompressor.unused_data) * nframes
                        == len(compressed) * frames_left)
    decompressed = b''.join(outputs)

    assert len(decompressed) == nframes * len(data)
    assert data * nframes == decompressed
|