import datetime as dt
import errno
import hashlib
import json
import logging
import os
import platform
import random
import shutil
import socket
import string
import sys
import tarfile
import urllib2

import retry


def make_user_agent():
    return 'fetch_from: {host}'.format(host=socket.gethostname())


def add_common_arguments(parser):
    parser.add_argument('--copy-to')  # used by jbuild in fetch_resource
    parser.add_argument('--rename-to')  # used by test_node in inject_mds_resource_to_graph
    parser.add_argument('--copy-to-dir')
    parser.add_argument('--untar-to')
    parser.add_argument(
        '--rename', action='append', default=[], metavar='FILE', help='rename FILE to the corresponding output'
    )
    parser.add_argument('--executable', action='store_true', help='make outputs executable')
    parser.add_argument('--log-path')
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        default=os.environ.get('YA_VERBOSE_FETCHER'),
        help='increase stderr verbosity',
    )
    parser.add_argument('outputs', nargs='*', default=[])


def ensure_dir(path):
    if not (path == '' or os.path.isdir(path)):
        os.makedirs(path)


# Reference code: library/python/fs/__init__.py
def hardlink_or_copy(src, dst):
    ensure_dir(os.path.dirname(dst))

    if os.name == 'nt':
        shutil.copy(src, dst)
    else:
        try:
            os.link(src, dst)
        except OSError as e:
            if e.errno == errno.EEXIST:
                return
            elif e.errno in (errno.EXDEV, errno.EMLINK, errno.EINVAL, errno.EACCES):
                sys.stderr.write(
                    "Can't make hardlink (errno={}) - fallback to copy: {} -> {}\n".format(e.errno, src, dst)
                )
                shutil.copy(src, dst)
            else:
                sys.stderr.write("src: {} dst: {}\n".format(src, dst))
                raise


def rename_or_copy_and_remove(src, dst):
    ensure_dir(os.path.dirname(dst))

    try:
        os.rename(src, dst)
    except OSError:
        shutil.copy(src, dst)
        os.remove(src)


class BadChecksumFetchError(Exception):
    pass


class IncompleteFetchError(Exception):
    pass


class ResourceUnpackingError(Exception):
    pass


class ResourceIsDirectoryError(Exception):
    pass


class OutputIsDirectoryError(Exception):
    pass


class OutputNotExistError(Exception):
    pass


def setup_logging(args, base_name):
    def makedirs(path):
        try:
            os.makedirs(path)
        except OSError:
            pass

    if args.log_path:
        log_file_name = args.log_path
    else:
        log_file_name = base_name + ".log"

    args.abs_log_path = os.path.abspath(log_file_name)
    makedirs(os.path.dirname(args.abs_log_path))
    logging.basicConfig(filename=args.abs_log_path, level=logging.DEBUG)
    if args.verbose:
        logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))


def is_temporary(e):
    def is_broken(e):
        return isinstance(e, urllib2.HTTPError) and e.code in (410, 404)

    if is_broken(e):
        return False

    if isinstance(e, (BadChecksumFetchError, IncompleteFetchError, urllib2.URLError, socket.error)):
        return True

    import error

    return error.is_temporary_error(e)


def uniq_string_generator(size=6, chars=string.ascii_lowercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def report_to_snowden(value):
    def inner():
        body = {
            'namespace': 'ygg',
            'key': 'fetch-from-sandbox',
            'value': json.dumps(value),
        }

        urllib2.urlopen(
            'https://back-snowden.qloud.yandex-team.ru/report/add',
            json.dumps(
                [
                    body,
                ]
            ),
            timeout=5,
        )

    try:
        inner()
    except Exception as e:
        logging.warning('report_to_snowden failed: %s', e)


def copy_stream(read, *writers, **kwargs):
    chunk_size = kwargs.get('size', 1024 * 1024)
    while True:
        data = read(chunk_size)
        if not data:
            break
        for write in writers:
            write(data)


def md5file(fname):
    res = hashlib.md5()
    with open(fname, 'rb') as f:
        copy_stream(f.read, res.update)
    return res.hexdigest()
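
# Illustrative sketch (not part of the module API): `copy_stream` fans one
# reader out to any number of writers, which is how this module hashes a
# download while writing it to disk. The file names below are placeholders.
#
#   sha1 = hashlib.sha1()
#   with open('src.bin', 'rb') as fin, open('dst.bin', 'wb') as fout:
#       copy_stream(fin.read, fout.write, sha1.update)
#   logging.debug('copied with sha1 %s', sha1.hexdigest())
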
def git_like_hash_with_size(filepath):
    """
    Calculate a git-like hash and the file size for path.
    """
    sha = hashlib.sha1()

    file_size = 0

    with open(filepath, 'rb') as f:
        while True:
            block = f.read(2**16)

            if not block:
                break

            file_size += len(block)
            sha.update(block)

    sha.update('\0')
    sha.update(str(file_size))

    return sha.hexdigest(), file_size


def size_printer(display_name, size):
    sz = [0]
    last_stamp = [dt.datetime.now()]

    def printer(chunk):
        sz[0] += len(chunk)
        now = dt.datetime.now()
        if last_stamp[0] + dt.timedelta(seconds=10) < now:
            if size:
                print >> sys.stderr, "##status##{} - [[imp]]{:.1f}%[[rst]]".format(
                    display_name, 100.0 * sz[0] / size if size else 0
                )
            last_stamp[0] = now

    return printer


def fetch_url(url, unpack, resource_file_name, expected_md5=None, expected_sha1=None, tries=10, writers=None):
    logging.info('Downloading from url %s name %s and expected md5 %s', url, resource_file_name, expected_md5)
    tmp_file_name = uniq_string_generator()

    request = urllib2.Request(url, headers={'User-Agent': make_user_agent()})
    req = retry.retry_func(lambda: urllib2.urlopen(request, timeout=30), tries=tries, delay=5, backoff=1.57079)
    logging.debug('Headers: %s', req.headers.headers)

    expected_file_size = int(req.headers.get('Content-Length', 0))

    real_md5 = hashlib.md5()
    real_sha1 = hashlib.sha1()

    with open(tmp_file_name, 'wb') as fp:
        copy_stream(
            req.read,
            fp.write,
            real_md5.update,
            real_sha1.update,
            size_printer(resource_file_name, expected_file_size),
            *([] if writers is None else writers)
        )

    real_md5 = real_md5.hexdigest()
    real_file_size = os.path.getsize(tmp_file_name)
    real_sha1.update('\0')
    real_sha1.update(str(real_file_size))
    real_sha1 = real_sha1.hexdigest()

    if unpack:
        tmp_dir = tmp_file_name + '.dir'
        os.makedirs(tmp_dir)
        with tarfile.open(tmp_file_name, mode="r|gz") as tar:
            tar.extractall(tmp_dir)
        tmp_file_name = os.path.join(tmp_dir, resource_file_name)
        if expected_md5:
            real_md5 = md5file(tmp_file_name)

    logging.info('File size %s (expected %s)', real_file_size, expected_file_size or "UNKNOWN")
    logging.info('File md5 %s (expected %s)', real_md5, expected_md5)
    logging.info('File sha1 %s (expected %s)', real_sha1, expected_sha1)

    if expected_md5 and real_md5 != expected_md5:
        report_to_snowden({'headers': req.headers.headers, 'expected_md5': expected_md5, 'real_md5': real_md5})

        raise BadChecksumFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_md5,
                expected_md5,
                url,
            )
        )

    if expected_sha1 and real_sha1 != expected_sha1:
        report_to_snowden({'headers': req.headers.headers, 'expected_sha1': expected_sha1, 'real_sha1': real_sha1})

        raise BadChecksumFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_sha1,
                expected_sha1,
                url,
            )
        )

    if expected_file_size and expected_file_size != real_file_size:
        report_to_snowden({'headers': req.headers.headers, 'file_size': real_file_size})

        raise IncompleteFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_file_size,
                expected_file_size,
                url,
            )
        )

    return tmp_file_name


def chmod(filename, mode):
    if platform.system().lower() == 'windows':
        # https://docs.microsoft.com/en-us/windows/win32/fileio/hard-links-and-junctions:
        # hard to reset read-only attribute for removal if there are multiple hardlinks
        return
    stat = os.stat(filename)
    if stat.st_mode & 0o777 != mode:
        try:
            os.chmod(filename, mode)
        except OSError:
            import pwd

            sys.stderr.write(
                "{} st_mode: {} pwuid: {}\n".format(filename, stat.st_mode, pwd.getpwuid(os.stat(filename).st_uid))
            )
            raise


def make_readonly(filename):
    chmod(filename, os.stat(filename).st_mode & 0o111 | 0o444)
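
# Illustrative sketch (the URL and checksum below are placeholders, not a real
# resource): `fetch_url` downloads to a randomly named temporary file in the
# current directory, verifies size/md5/sha1 when they are given, and returns
# the local path of the downloaded (and optionally unpacked) file.
#
#   tmp_path = fetch_url(
#       'https://example.com/some/resource.tar.gz',
#       unpack=False,
#       resource_file_name='resource.tar.gz',
#       expected_md5='d41d8cd98f00b204e9800998ecf8427e',  # placeholder value
#   )
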
def process(fetched_file, file_name, args, remove=True):
    assert len(args.rename) <= len(args.outputs), ('too few outputs to rename', args.rename, 'into', args.outputs)

    fetched_file_is_dir = os.path.isdir(fetched_file)
    if fetched_file_is_dir and not args.untar_to:
        raise ResourceIsDirectoryError('Resource may be a directory only with the untar_to option: ' + fetched_file)

    # Make all fetched files read-only
    if fetched_file_is_dir:
        for root, _, files in os.walk(fetched_file):
            for filename in files:
                make_readonly(os.path.join(root, filename))
    else:
        make_readonly(fetched_file)

    if args.copy_to:
        hardlink_or_copy(fetched_file, args.copy_to)
        if not args.outputs:
            args.outputs = [args.copy_to]

    if args.rename_to:
        args.rename.append(fetched_file)
        if not args.outputs:
            args.outputs = [args.rename_to]

    if args.copy_to_dir:
        hardlink_or_copy(fetched_file, os.path.join(args.copy_to_dir, file_name))

    if args.untar_to:
        ensure_dir(args.untar_to)
        inputs = set(map(os.path.normpath, args.rename + args.outputs[len(args.rename):]))
        if fetched_file_is_dir:
            for member in inputs:
                base, name = member.split('/', 1)
                src = os.path.normpath(os.path.join(fetched_file, name))
                dst = os.path.normpath(os.path.join(args.untar_to, member))
                hardlink_or_copy(src, dst)
        else:
            # Extract only requested files
            try:
                with tarfile.open(fetched_file, mode='r:*') as tar:
                    members = [
                        entry for entry in tar if os.path.normpath(os.path.join(args.untar_to, entry.name)) in inputs
                    ]
                    tar.extractall(args.untar_to, members=members)
            except tarfile.ReadError as e:
                logging.exception(e)
                raise ResourceUnpackingError('File {} cannot be untarred'.format(fetched_file))

        # Forbid changes to the loaded resource data
        for root, _, files in os.walk(args.untar_to):
            for filename in files:
                make_readonly(os.path.join(root, filename))

    for src, dst in zip(args.rename, args.outputs):
        if src == 'RESOURCE':
            src = fetched_file
        if os.path.abspath(src) == os.path.abspath(fetched_file):
            logging.info('Copying %s to %s', src, dst)
            hardlink_or_copy(src, dst)
        else:
            logging.info('Renaming %s to %s', src, dst)
            if os.path.exists(dst):
                raise ResourceUnpackingError("Target file already exists ({} -> {})".format(src, dst))
            if remove:
                rename_or_copy_and_remove(src, dst)
            else:
                hardlink_or_copy(src, dst)

    for path in args.outputs:
        if not os.path.exists(path):
            raise OutputNotExistError('Output does not exist: %s' % os.path.abspath(path))
        if not os.path.isfile(path):
            raise OutputIsDirectoryError('Output must be a file, not a directory: %s' % os.path.abspath(path))
        if args.executable:
            chmod(path, os.stat(path).st_mode | 0o111)
        if os.path.abspath(path) == os.path.abspath(fetched_file):
            remove = False

    if remove:
        if fetched_file_is_dir:
            shutil.rmtree(fetched_file)
        else:
            os.remove(fetched_file)
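
# Illustrative end-to-end sketch (an assumption about how a concrete fetcher
# script wires these helpers together; the script name, URL, and exit codes
# are placeholders):
#
#   import argparse
#
#   parser = argparse.ArgumentParser()
#   add_common_arguments(parser)
#   args = parser.parse_args()
#   setup_logging(args, 'fetch_example')
#   try:
#       fetched = fetch_url('https://example.com/resource', False, 'resource')
#       process(fetched, 'resource', args)
#   except Exception as e:
#       logging.exception(e)
#       sys.exit(2 if is_temporary(e) else 1)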