1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import zlib
- from abc import abstractmethod
- from typing import Union
- import lz4
- import lz4.frame
- import zstandard
- try:
- import brotli
- except ImportError:
- brotli = None
- available_compression = ['lz4', 'zstd']
- if brotli:
- available_compression.append('br')
- available_compression.extend(['gzip', 'deflate'])
- comp_map = {}
- class Compressor:
- def __init_subclass__(cls, tag: str, thread_safe: bool = True):
- comp_map[tag] = cls() if thread_safe else cls
- @abstractmethod
- def compress_block(self, block) -> Union[bytes, bytearray]:
- return block
- def flush(self):
- pass
- class GzipCompressor(Compressor, tag='gzip', thread_safe=False):
- def __init__(self, level: int = 6, wbits: int = 31):
- self.zlib_obj = zlib.compressobj(level=level, wbits=wbits)
- def compress_block(self, block):
- return self.zlib_obj.compress(block)
- def flush(self):
- return self.zlib_obj.flush()
- class Lz4Compressor(Compressor, tag='lz4', thread_safe=False):
- def __init__(self):
- self.comp = lz4.frame.LZ4FrameCompressor()
- def compress_block(self, block):
- output = self.comp.begin(len(block))
- output += self.comp.compress(block)
- return output + self.comp.flush()
- class ZstdCompressor(Compressor, tag='zstd'):
- def compress_block(self, block):
- return zstandard.compress(block)
- class BrotliCompressor(Compressor, tag='br'):
- def compress_block(self, block):
- return brotli.compress(block)
- null_compressor = Compressor()
- def get_compressor(compression: str) -> Compressor:
- if not compression:
- return null_compressor
- comp = comp_map[compression]
- try:
- return comp()
- except TypeError:
- return comp
|