test_block_0.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import lz4.block
  2. from multiprocessing.pool import ThreadPool
  3. import sys
  4. from functools import partial
  5. if sys.version_info <= (3, 2):
  6. import struct
  7. def get_stored_size(buff):
  8. if sys.version_info > (2, 7):
  9. if isinstance(buff, memoryview):
  10. b = buff.tobytes()
  11. else:
  12. b = bytes(buff)
  13. else:
  14. b = bytes(buff)
  15. if len(b) < 4:
  16. return None
  17. if sys.version_info > (3, 2):
  18. return int.from_bytes(b[:4], 'little')
  19. else:
  20. # This would not work on a memoryview object, hence buff.tobytes call
  21. # above
  22. return struct.unpack('<I', b[:4])[0]
  23. def roundtrip(x, c_kwargs, d_kwargs, dictionary):
  24. if dictionary:
  25. if isinstance(dictionary, tuple):
  26. d = x[dictionary[0]:dictionary[1]]
  27. else:
  28. d = dictionary
  29. c_kwargs['dict'] = d
  30. d_kwargs['dict'] = d
  31. c = lz4.block.compress(x, **c_kwargs)
  32. if c_kwargs['store_size']:
  33. assert get_stored_size(c) == len(x)
  34. else:
  35. d_kwargs['uncompressed_size'] = len(x)
  36. return lz4.block.decompress(c, **d_kwargs)
  37. def setup_kwargs(mode, store_size, c_return_bytearray=None, d_return_bytearray=None):
  38. c_kwargs = {}
  39. if mode[0] is not None:
  40. c_kwargs['mode'] = mode[0]
  41. if mode[1] is not None:
  42. c_kwargs.update(mode[1])
  43. c_kwargs.update(store_size)
  44. if c_return_bytearray:
  45. c_kwargs.update(c_return_bytearray)
  46. d_kwargs = {}
  47. if d_return_bytearray:
  48. d_kwargs.update(d_return_bytearray)
  49. return (c_kwargs, d_kwargs)
  50. # Test single threaded usage with all valid variations of input
  51. def test_1(data, mode, store_size, c_return_bytearray, d_return_bytearray, dictionary):
  52. (c_kwargs, d_kwargs) = setup_kwargs(
  53. mode, store_size, c_return_bytearray, d_return_bytearray)
  54. d = roundtrip(data, c_kwargs, d_kwargs, dictionary)
  55. assert d == data
  56. if d_return_bytearray['return_bytearray']:
  57. assert isinstance(d, bytearray)
  58. # Test multi threaded usage with all valid variations of input
  59. def test_2(data, mode, store_size, dictionary):
  60. (c_kwargs, d_kwargs) = setup_kwargs(mode, store_size)
  61. data_in = [data for i in range(32)]
  62. pool = ThreadPool(2)
  63. rt = partial(roundtrip, c_kwargs=c_kwargs,
  64. d_kwargs=d_kwargs, dictionary=dictionary)
  65. data_out = pool.map(rt, data_in)
  66. pool.close()
  67. assert data_in == data_out