# library.python.compress
  1. from io import open
  2. import struct
  3. import json
  4. import os
  5. import logging
  6. import library.python.par_apply as lpp
  7. import library.python.codecs as lpc
  8. logger = logging.getLogger('compress')
  9. def list_all_codecs():
  10. return sorted(frozenset(lpc.list_all_codecs()))
  11. def find_codec(ext):
  12. def ext_compress(x):
  13. return lpc.dumps(ext, x)
  14. def ext_decompress(x):
  15. return lpc.loads(ext, x)
  16. ext_decompress(ext_compress(b''))
  17. return {'c': ext_compress, 'd': ext_decompress, 'n': ext}
  18. def codec_for(path):
  19. for ext in reversed(path.split('.')):
  20. try:
  21. return find_codec(ext)
  22. except Exception as e:
  23. logger.debug('in codec_for(): %s', e)
  24. raise Exception('unsupported file %s' % path)
  25. def compress(fr, to, codec=None, fopen=open, threads=1):
  26. if codec:
  27. codec = find_codec(codec)
  28. else:
  29. codec = codec_for(to)
  30. func = codec['c']
  31. def iter_blocks():
  32. with fopen(fr, 'rb') as f:
  33. while True:
  34. chunk = f.read(16 * 1024 * 1024)
  35. if chunk:
  36. yield chunk
  37. else:
  38. yield b''
  39. return
  40. def iter_results():
  41. info = {
  42. 'codec': codec['n'],
  43. }
  44. if fr:
  45. info['size'] = os.path.getsize(fr)
  46. yield json.dumps(info, sort_keys=True) + '\n'
  47. for c in lpp.par_apply(iter_blocks(), func, threads):
  48. yield c
  49. with fopen(to, 'wb') as f:
  50. for c in iter_results():
  51. logger.debug('complete %s', len(c))
  52. f.write(struct.pack('<I', len(c)))
  53. try:
  54. f.write(c)
  55. except TypeError:
  56. f.write(c.encode('utf-8'))
  57. def decompress(fr, to, codec=None, fopen=open, threads=1):
  58. def iter_chunks():
  59. with fopen(fr, 'rb') as f:
  60. cnt = 0
  61. while True:
  62. ll = f.read(4)
  63. if ll:
  64. ll = struct.unpack('<I', ll)[0]
  65. if ll:
  66. if ll > 100000000:
  67. raise Exception('broken stream')
  68. yield f.read(ll)
  69. cnt += ll
  70. else:
  71. if not cnt:
  72. raise Exception('empty stream')
  73. return
  74. it = iter_chunks()
  75. extra = []
  76. for chunk in it:
  77. hdr = {}
  78. try:
  79. hdr = json.loads(chunk)
  80. except Exception as e:
  81. logger.info('can not parse header, suspect old format: %s', e)
  82. extra.append(chunk)
  83. break
  84. def resolve_codec():
  85. if 'codec' in hdr:
  86. return find_codec(hdr['codec'])
  87. if codec:
  88. return find_codec(codec)
  89. return codec_for(fr)
  90. dc = resolve_codec()['d']
  91. def iter_all_chunks():
  92. for x in extra:
  93. yield x
  94. for x in it:
  95. yield x
  96. with fopen(to, 'wb') as f:
  97. for c in lpp.par_apply(iter_all_chunks(), dc, threads):
  98. if c:
  99. logger.debug('complete %s', len(c))
  100. f.write(c)
  101. else:
  102. break