compression.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import zlib
  2. from abc import abstractmethod
  3. from typing import Union
  4. import lz4
  5. import lz4.frame
  6. import zstandard
  7. try:
  8. import brotli
  9. except ImportError:
  10. brotli = None
  11. available_compression = ['lz4', 'zstd']
  12. if brotli:
  13. available_compression.append('br')
  14. available_compression.extend(['gzip', 'deflate'])
  15. comp_map = {}
  16. class Compressor:
  17. def __init_subclass__(cls, tag: str, thread_safe: bool = True):
  18. comp_map[tag] = cls() if thread_safe else cls
  19. @abstractmethod
  20. def compress_block(self, block) -> Union[bytes, bytearray]:
  21. return block
  22. def flush(self):
  23. pass
  24. class GzipCompressor(Compressor, tag='gzip', thread_safe=False):
  25. def __init__(self, level: int = 6, wbits: int = 31):
  26. self.zlib_obj = zlib.compressobj(level=level, wbits=wbits)
  27. def compress_block(self, block):
  28. return self.zlib_obj.compress(block)
  29. def flush(self):
  30. return self.zlib_obj.flush()
  31. class Lz4Compressor(Compressor, tag='lz4', thread_safe=False):
  32. def __init__(self):
  33. self.comp = lz4.frame.LZ4FrameCompressor()
  34. def compress_block(self, block):
  35. output = self.comp.begin(len(block))
  36. output += self.comp.compress(block)
  37. return output + self.comp.flush()
  38. class ZstdCompressor(Compressor, tag='zstd'):
  39. def compress_block(self, block):
  40. return zstandard.compress(block)
  41. class BrotliCompressor(Compressor, tag='br'):
  42. def compress_block(self, block):
  43. return brotli.compress(block)
  44. null_compressor = Compressor()
  45. def get_compressor(compression: str) -> Compressor:
  46. if not compression:
  47. return null_compressor
  48. comp = comp_map[compression]
  49. try:
  50. return comp()
  51. except TypeError:
  52. return comp