test_frame_4.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. import lz4.frame as lz4frame
  2. import os
  3. import pytest
  4. from . helpers import (
  5. get_frame_info_check,
  6. get_chunked,
  7. )
  8. test_data = [
  9. b'',
  10. (128 * (32 * os.urandom(32))),
  11. (256 * (32 * os.urandom(32))),
  12. (512 * (32 * os.urandom(32))),
  13. (1024 * (32 * os.urandom(32))),
  14. ]
  15. @pytest.fixture(
  16. params=test_data,
  17. ids=[
  18. 'data' + str(i) for i in range(len(test_data))
  19. ]
  20. )
  21. def data(request):
  22. return request.param
  23. @pytest.fixture(
  24. params=[
  25. (True),
  26. (False)
  27. ]
  28. )
  29. def reset(request):
  30. return request.param
  31. @pytest.fixture(
  32. params=[
  33. (1),
  34. (8)
  35. ]
  36. )
  37. def chunks(request):
  38. return request.param
  39. def test_roundtrip_LZ4FrameCompressor(
  40. data,
  41. chunks,
  42. block_size,
  43. block_linked,
  44. reset,
  45. store_size,
  46. block_checksum,
  47. content_checksum):
  48. with lz4frame.LZ4FrameCompressor(
  49. block_size=block_size,
  50. block_linked=block_linked,
  51. content_checksum=content_checksum,
  52. block_checksum=block_checksum,
  53. ) as compressor:
  54. def do_compress():
  55. if store_size is True:
  56. compressed = compressor.begin(source_size=len(data))
  57. else:
  58. compressed = compressor.begin()
  59. for chunk in get_chunked(data, chunks):
  60. compressed += compressor.compress(chunk)
  61. compressed += compressor.flush()
  62. return compressed
  63. compressed = do_compress()
  64. if reset is True:
  65. compressor.reset()
  66. compressed = do_compress()
  67. get_frame_info_check(
  68. compressed,
  69. len(data),
  70. store_size,
  71. block_size,
  72. block_linked,
  73. content_checksum,
  74. block_checksum,
  75. )
  76. decompressed, bytes_read = lz4frame.decompress(
  77. compressed, return_bytes_read=True)
  78. assert data == decompressed
  79. assert bytes_read == len(compressed)
  80. def test_roundtrip_LZ4FrameCompressor_LZ4FrameDecompressor(
  81. data,
  82. chunks,
  83. block_size,
  84. block_linked,
  85. reset,
  86. store_size,
  87. block_checksum,
  88. content_checksum):
  89. with lz4frame.LZ4FrameCompressor(
  90. block_size=block_size,
  91. block_linked=block_linked,
  92. content_checksum=content_checksum,
  93. block_checksum=block_checksum,
  94. ) as compressor:
  95. def do_compress():
  96. if store_size is True:
  97. compressed = compressor.begin(source_size=len(data))
  98. else:
  99. compressed = compressor.begin()
  100. for chunk in get_chunked(data, chunks):
  101. compressed += compressor.compress(chunk)
  102. compressed += compressor.flush()
  103. return compressed
  104. compressed = do_compress()
  105. if reset is True:
  106. compressor.reset()
  107. compressed = do_compress()
  108. get_frame_info_check(
  109. compressed,
  110. len(data),
  111. store_size,
  112. block_size,
  113. block_linked,
  114. content_checksum,
  115. block_checksum,
  116. )
  117. with lz4frame.LZ4FrameDecompressor() as decompressor:
  118. decompressed = b''
  119. for chunk in get_chunked(compressed, chunks):
  120. b = decompressor.decompress(chunk)
  121. decompressed += b
  122. assert data == decompressed