test_frame_7.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import lz4.frame as lz4frame
  2. import pytest
  3. import os
  4. test_data = [
  5. (os.urandom(32) * 256),
  6. ]
  7. @pytest.fixture(
  8. params=test_data,
  9. ids=[
  10. 'data' + str(i) for i in range(len(test_data))
  11. ]
  12. )
  13. def data(request):
  14. return request.param
  15. def test_roundtrip_multiframe_1(data):
  16. nframes = 4
  17. compressed = b''
  18. for _ in range(nframes):
  19. compressed += lz4frame.compress(data)
  20. decompressed = b''
  21. for _ in range(nframes):
  22. decompressed += lz4frame.decompress(compressed)
  23. assert len(decompressed) == nframes * len(data)
  24. assert data * nframes == decompressed
  25. def test_roundtrip_multiframe_2(data):
  26. nframes = 4
  27. compressed = b''
  28. ctx = lz4frame.create_compression_context()
  29. for _ in range(nframes):
  30. compressed += lz4frame.compress_begin(ctx)
  31. compressed += lz4frame.compress_chunk(ctx, data)
  32. compressed += lz4frame.compress_flush(ctx)
  33. decompressed = b''
  34. for _ in range(nframes):
  35. decompressed += lz4frame.decompress(compressed)
  36. assert len(decompressed) == nframes * len(data)
  37. assert data * nframes == decompressed
  38. def test_roundtrip_multiframe_3(data):
  39. nframes = 4
  40. compressed = b''
  41. ctx = lz4frame.create_compression_context()
  42. for _ in range(nframes):
  43. compressed += lz4frame.compress_begin(ctx)
  44. compressed += lz4frame.compress_chunk(ctx, data)
  45. compressed += lz4frame.compress_flush(ctx)
  46. decompressed = b''
  47. ctx = lz4frame.create_decompression_context()
  48. for _ in range(nframes):
  49. d, bytes_read, eof = lz4frame.decompress_chunk(ctx, compressed)
  50. decompressed += d
  51. assert eof is True
  52. assert bytes_read == len(compressed) // nframes
  53. assert len(decompressed) == nframes * len(data)
  54. assert data * nframes == decompressed
  55. def test_roundtrip_multiframe_4(data):
  56. nframes = 4
  57. compressed = b''
  58. with lz4frame.LZ4FrameCompressor() as compressor:
  59. for _ in range(nframes):
  60. compressed += compressor.begin()
  61. compressed += compressor.compress(data)
  62. compressed += compressor.flush()
  63. decompressed = b''
  64. with lz4frame.LZ4FrameDecompressor() as decompressor:
  65. for i in range(nframes):
  66. if i == 0:
  67. d = compressed
  68. else:
  69. d = decompressor.unused_data
  70. decompressed += decompressor.decompress(d)
  71. assert decompressor.eof is True
  72. assert decompressor.needs_input is True
  73. if i == nframes - 1:
  74. assert decompressor.unused_data is None
  75. else:
  76. assert len(decompressor.unused_data) == len(
  77. compressed) * (nframes - i - 1) / nframes
  78. assert len(decompressed) == nframes * len(data)
  79. assert data * nframes == decompressed