test_stream_3.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import lz4.stream
  2. import pytest
  3. import sys
  4. _1KB = 1024
  5. _1MB = _1KB * 1024
  6. _1GB = _1MB * 1024
  7. def compress(x, c_kwargs):
  8. c = []
  9. with lz4.stream.LZ4StreamCompressor(**c_kwargs) as proc:
  10. for start in range(0, len(x), c_kwargs['buffer_size']):
  11. chunk = x[start:start + c_kwargs['buffer_size']]
  12. block = proc.compress(chunk)
  13. c.append(block)
  14. if c_kwargs.get('return_bytearray', False):
  15. return bytearray().join(c)
  16. else:
  17. return bytes().join(c)
  18. def decompress(x, d_kwargs):
  19. d = []
  20. with lz4.stream.LZ4StreamDecompressor(**d_kwargs) as proc:
  21. start = 0
  22. while start < len(x):
  23. block = proc.get_block(x[start:])
  24. chunk = proc.decompress(block)
  25. d.append(chunk)
  26. start += d_kwargs['store_comp_size'] + len(block)
  27. if d_kwargs.get('return_bytearray', False):
  28. return bytearray().join(d)
  29. else:
  30. return bytes().join(d)
  31. test_buffer_size = sorted(
  32. [256,
  33. 1 * _1KB,
  34. 64 * _1KB,
  35. 1 * _1MB,
  36. 1 * _1GB,
  37. lz4.stream.LZ4_MAX_INPUT_SIZE]
  38. )
  39. @pytest.fixture(
  40. params=test_buffer_size,
  41. ids=[
  42. 'buffer_size' + str(i) for i in range(len(test_buffer_size))
  43. ]
  44. )
  45. def buffer_size(request):
  46. return request.param
  47. test_data = [
  48. (b'a' * _1MB),
  49. ]
  50. @pytest.fixture(
  51. params=test_data,
  52. ids=[
  53. 'data' + str(i) for i in range(len(test_data))
  54. ]
  55. )
  56. def data(request):
  57. return request.param
  58. def test_block_decompress_mem_usage(data, buffer_size):
  59. kwargs = {
  60. 'strategy': "double_buffer",
  61. 'buffer_size': buffer_size,
  62. 'store_comp_size': 4,
  63. }
  64. if sys.maxsize < 0xffffffff:
  65. pytest.skip('Py_ssize_t too small for this test')
  66. tracemalloc = pytest.importorskip('tracemalloc')
  67. # Trace memory usage on compression
  68. tracemalloc.start()
  69. prev_snapshot = None
  70. for i in range(1000):
  71. compressed = compress(data, kwargs)
  72. if i % 100 == 0:
  73. snapshot = tracemalloc.take_snapshot()
  74. if prev_snapshot:
  75. # Filter on lz4.stream module'a allocations
  76. stats = [x for x in snapshot.compare_to(prev_snapshot, 'lineno')
  77. if lz4.stream.__file__ in x.traceback._frames[0][0]]
  78. assert sum(map(lambda x: x.size_diff, stats)) < (1024 * 4)
  79. prev_snapshot = snapshot
  80. tracemalloc.stop()
  81. tracemalloc.start()
  82. prev_snapshot = None
  83. for i in range(1000):
  84. decompressed = decompress(compressed, kwargs) # noqa: F841
  85. if i % 100 == 0:
  86. snapshot = tracemalloc.take_snapshot()
  87. if prev_snapshot:
  88. # Filter on lz4.stream module'a allocations
  89. stats = [x for x in snapshot.compare_to(prev_snapshot, 'lineno')
  90. if lz4.stream.__file__ in x.traceback._frames[0][0]]
  91. assert sum(map(lambda x: x.size_diff, stats)) < (1024 * 4)
  92. prev_snapshot = snapshot
  93. tracemalloc.stop()