test_stream_0.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. import lz4.stream
  2. import sys
  3. import pytest
  4. if sys.version_info <= (3, 2):
  5. import struct
  6. def get_stored_size(buff, block_length_size):
  7. if sys.version_info > (2, 7):
  8. if isinstance(buff, memoryview):
  9. b = buff.tobytes()
  10. else:
  11. b = bytes(buff)
  12. else:
  13. b = bytes(buff)
  14. if len(b) < block_length_size:
  15. return None
  16. if sys.version_info > (3, 2):
  17. return int.from_bytes(b[:block_length_size], 'little')
  18. else:
  19. # This would not work on a memoryview object, hence buff.tobytes call
  20. # above
  21. fmt = {1: 'B', 2: 'H', 4: 'I', }
  22. return struct.unpack('<' + fmt[block_length_size], b[:block_length_size])[0]
  23. def roundtrip(x, c_kwargs, d_kwargs, dictionary):
  24. if dictionary:
  25. if isinstance(dictionary, tuple):
  26. dict_ = x[dictionary[0]:dictionary[1]]
  27. else:
  28. dict_ = dictionary
  29. c_kwargs['dictionary'] = dict_
  30. d_kwargs['dictionary'] = dict_
  31. c = bytes()
  32. with lz4.stream.LZ4StreamCompressor(**c_kwargs) as proc:
  33. for start in range(0, len(x), c_kwargs['buffer_size']):
  34. chunk = x[start:start + c_kwargs['buffer_size']]
  35. assert len(chunk) <= c_kwargs['buffer_size']
  36. block = proc.compress(chunk)
  37. if c_kwargs.get('return_bytearray'):
  38. assert isinstance(block, bytearray)
  39. if start == 0:
  40. c = block
  41. else:
  42. c += block
  43. assert get_stored_size(block, c_kwargs['store_comp_size']) == \
  44. (len(block) - c_kwargs['store_comp_size'])
  45. d = bytes()
  46. with lz4.stream.LZ4StreamDecompressor(**d_kwargs) as proc:
  47. start = 0
  48. while start < len(c):
  49. block = proc.get_block(c[start:])
  50. chunk = proc.decompress(block)
  51. if d_kwargs.get('return_bytearray'):
  52. assert isinstance(chunk, bytearray)
  53. if start == 0:
  54. d = chunk
  55. else:
  56. d += chunk
  57. start += d_kwargs['store_comp_size'] + len(block)
  58. return d
  59. def setup_kwargs(strategy, mode, buffer_size, store_comp_size,
  60. c_return_bytearray=None, d_return_bytearray=None):
  61. c_kwargs = {}
  62. if mode[0] is not None:
  63. c_kwargs['mode'] = mode[0]
  64. if mode[1] is not None:
  65. c_kwargs.update(mode[1])
  66. c_kwargs['strategy'] = strategy
  67. c_kwargs['buffer_size'] = buffer_size
  68. c_kwargs.update(store_comp_size)
  69. if c_return_bytearray:
  70. c_kwargs.update(c_return_bytearray)
  71. d_kwargs = {}
  72. if d_return_bytearray:
  73. d_kwargs.update(d_return_bytearray)
  74. d_kwargs['strategy'] = strategy
  75. d_kwargs['buffer_size'] = buffer_size
  76. d_kwargs.update(store_comp_size)
  77. return (c_kwargs, d_kwargs)
  78. # Test single threaded usage with all valid variations of input
  79. def test_1(data, strategy, mode, buffer_size, store_comp_size,
  80. c_return_bytearray, d_return_bytearray, dictionary):
  81. if buffer_size >= (1 << (8 * store_comp_size['store_comp_size'])):
  82. pytest.skip("Invalid case: buffer_size too large for the block length area")
  83. (c_kwargs, d_kwargs) = setup_kwargs(
  84. strategy, mode, buffer_size, store_comp_size, c_return_bytearray, d_return_bytearray)
  85. d = roundtrip(data, c_kwargs, d_kwargs, dictionary)
  86. assert d == data
  87. # Test multi threaded:
  88. # Not relevant in the lz4.stream case (the process is highly sequential,
  89. # and re-use/share the same context from one input chunk to the next one).
  90. def test_2(data, strategy, mode, buffer_size, store_comp_size, dictionary): # noqa
  91. pass