from __future__ import print_function

import datetime as dt
import errno
import hashlib
import json
import logging
import os
import platform
import random
import shutil
import socket
import string
import sys
import tarfile

try:
    # Python 2
    import urllib2 as urllib_request
    from urllib2 import HTTPError, URLError
except (ImportError, ModuleNotFoundError):
    # Python 3
    import urllib.request as urllib_request
    from urllib.error import HTTPError, URLError

import retry


def make_user_agent():
    return 'fetch_from: {host}'.format(host=socket.gethostname())


def add_common_arguments(parser):
    parser.add_argument('--copy-to')  # used by jbuild in fetch_resource
    parser.add_argument('--rename-to')  # used by test_node in inject_mds_resource_to_graph
    parser.add_argument('--copy-to-dir')
    parser.add_argument('--untar-to')
    parser.add_argument(
        '--rename', action='append', default=[], metavar='FILE', help='rename FILE to the corresponding output'
    )
    parser.add_argument('--executable', action='store_true', help='make outputs executable')
    parser.add_argument('--log-path')
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        default=os.environ.get('YA_VERBOSE_FETCHER'),
        help='increase stderr verbosity',
    )
    parser.add_argument('outputs', nargs='*', default=[])


def ensure_dir(path):
    if not (path == '' or os.path.isdir(path)):
        os.makedirs(path)


# Reference code: library/python/fs/__init__.py
def hardlink_or_copy(src, dst):
    ensure_dir(os.path.dirname(dst))

    if os.name == 'nt':
        shutil.copy(src, dst)
    else:
        try:
            os.link(src, dst)
        except OSError as e:
            if e.errno == errno.EEXIST:
                return
            elif e.errno in (errno.EXDEV, errno.EMLINK, errno.EINVAL, errno.EACCES):
                sys.stderr.write(
                    "Can't make hardlink (errno={}) - fallback to copy: {} -> {}\n".format(e.errno, src, dst)
                )
                shutil.copy(src, dst)
            else:
                sys.stderr.write("src: {} dst: {}\n".format(src, dst))
                raise


def rename_or_copy_and_remove(src, dst):
    ensure_dir(os.path.dirname(dst))

    try:
        os.rename(src, dst)
    except OSError:
        shutil.copy(src, dst)
        os.remove(src)


class BadChecksumFetchError(Exception):
    pass


class IncompleteFetchError(Exception):
    pass


class ResourceUnpackingError(Exception):
    pass


class ResourceIsDirectoryError(Exception):
    pass


class OutputIsDirectoryError(Exception):
    pass


class OutputNotExistError(Exception):
    pass


def setup_logging(args, base_name):
    def makedirs(path):
        try:
            os.makedirs(path)
        except OSError:
            pass

    if args.log_path:
        log_file_name = args.log_path
    else:
        log_file_name = base_name + ".log"

    args.abs_log_path = os.path.abspath(log_file_name)
    makedirs(os.path.dirname(args.abs_log_path))
    logging.basicConfig(filename=args.abs_log_path, level=logging.DEBUG)
    if args.verbose:
        logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))


def is_temporary(e):
    def is_broken(e):
        return isinstance(e, HTTPError) and e.code in (410, 404)

    if is_broken(e):
        return False

    if isinstance(e, (BadChecksumFetchError, IncompleteFetchError, URLError, socket.error)):
        return True

    import error

    return error.is_temporary_error(e)


def uniq_string_generator(size=6, chars=string.ascii_lowercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def report_to_snowden(value):
    def inner():
        body = {
            'namespace': 'ygg',
            'key': 'fetch-from-sandbox',
            'value': json.dumps(value),
        }

        urllib_request.urlopen(
            'https://back-snowden.qloud.yandex-team.ru/report/add',
            json.dumps([body]).encode('utf-8'),  # urlopen requires bytes for POST data on Python 3
            timeout=5,
        )

    try:
        inner()
    except Exception as e:
        logging.warning('report_to_snowden failed: %s', e)


def copy_stream(read, *writers, **kwargs):
    # 'size' is passed via **kwargs to stay Python 2 compatible (no keyword-only args)
    chunk_size = kwargs.get('size', 1024 * 1024)
    while True:
        data = read(chunk_size)
        if not data:
            break
        for write in writers:
            write(data)

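
# A minimal sketch (kept as a comment, not module code) of how copy_stream()
# fans one reader out to several writers; fetch_url() below uses exactly this
# shape to hash and persist a download in a single pass. The file names here
# are hypothetical:
#
#   digest = hashlib.md5()
#   with open('src.bin', 'rb') as fin, open('dst.bin', 'wb') as fout:
#       copy_stream(fin.read, fout.write, digest.update, size=64 * 1024)
#   print(digest.hexdigest())
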

def md5file(fname):
    res = hashlib.md5()
    with open(fname, 'rb') as f:
        copy_stream(f.read, res.update)
    return res.hexdigest()


def git_like_hash_with_size(filepath):
    """Calculate a git-like hash for path: sha1 over the content, a NUL byte and the decimal file size."""
    sha = hashlib.sha1()
    file_size = 0

    with open(filepath, 'rb') as f:
        while True:
            block = f.read(2**16)
            if not block:
                break
            file_size += len(block)
            sha.update(block)

    sha.update(b'\0')
    sha.update(str(file_size).encode('utf-8'))

    return sha.hexdigest(), file_size


def size_printer(display_name, size):
    sz = [0]
    last_stamp = [dt.datetime.now()]

    def printer(chunk):
        sz[0] += len(chunk)
        now = dt.datetime.now()
        # Report progress at most once every 10 seconds, and only when the
        # expected size is known
        if last_stamp[0] + dt.timedelta(seconds=10) < now:
            if size:
                print(
                    "##status##{} - [[imp]]{:.1f}%[[rst]]".format(display_name, 100.0 * sz[0] / size),
                    file=sys.stderr,
                )
            last_stamp[0] = now

    return printer


def fetch_url(url, unpack, resource_file_name, expected_md5=None, expected_sha1=None, tries=10, writers=None):
    logging.info('Downloading from url %s name %s and expected md5 %s', url, resource_file_name, expected_md5)
    tmp_file_name = uniq_string_generator()

    request = urllib_request.Request(url, headers={'User-Agent': make_user_agent()})
    req = retry.retry_func(lambda: urllib_request.urlopen(request, timeout=30), tries=tries, delay=5, backoff=1.57079)
    logging.debug('Headers: %s', req.headers)
    expected_file_size = int(req.headers.get('Content-Length', 0))
    real_md5 = hashlib.md5()
    real_sha1 = hashlib.sha1()

    with open(tmp_file_name, 'wb') as fp:
        copy_stream(
            req.read,
            fp.write,
            real_md5.update,
            real_sha1.update,
            size_printer(resource_file_name, expected_file_size),
            *([] if writers is None else writers)
        )

    real_md5 = real_md5.hexdigest()
    real_file_size = os.path.getsize(tmp_file_name)
    real_sha1.update(b'\0')
    real_sha1.update(str(real_file_size).encode('utf-8'))
    real_sha1 = real_sha1.hexdigest()

    if unpack:
        tmp_dir = tmp_file_name + '.dir'
        os.makedirs(tmp_dir)
        with tarfile.open(tmp_file_name, mode="r|gz") as tar:
            tar.extractall(tmp_dir)
        tmp_file_name = os.path.join(tmp_dir, resource_file_name)
        if expected_md5:
            real_md5 = md5file(tmp_file_name)

    logging.info('File size %s (expected %s)', real_file_size, expected_file_size or "UNKNOWN")
    logging.info('File md5 %s (expected %s)', real_md5, expected_md5)
    logging.info('File sha1 %s (expected %s)', real_sha1, expected_sha1)

    if expected_md5 and real_md5 != expected_md5:
        report_to_snowden({'headers': str(req.headers), 'expected_md5': expected_md5, 'real_md5': real_md5})
        raise BadChecksumFetchError('Downloaded {}, but expected {} for {}'.format(real_md5, expected_md5, url))

    if expected_sha1 and real_sha1 != expected_sha1:
        report_to_snowden({'headers': str(req.headers), 'expected_sha1': expected_sha1, 'real_sha1': real_sha1})
        raise BadChecksumFetchError('Downloaded {}, but expected {} for {}'.format(real_sha1, expected_sha1, url))

    if expected_file_size and expected_file_size != real_file_size:
        report_to_snowden({'headers': str(req.headers), 'file_size': real_file_size})
        raise IncompleteFetchError(
            'Downloaded {}, but expected {} for {}'.format(real_file_size, expected_file_size, url)
        )

    return tmp_file_name


def chmod(filename, mode):
    if platform.system().lower() == 'windows':
        # https://docs.microsoft.com/en-us/windows/win32/fileio/hard-links-and-junctions:
        # hard to reset read-only attribute for removal if there are multiple hardlinks
        return
    stat = os.stat(filename)
    if stat.st_mode & 0o777 != mode:
        try:
            os.chmod(filename, mode)
        except OSError:
            import pwd

            sys.stderr.write(
                "{} st_mode: {} pwuid: {}\n".format(filename, stat.st_mode, pwd.getpwuid(os.stat(filename).st_uid))
            )
            raise

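
# Usage sketch ('resource.bin' is a hypothetical local file): the sha1 that
# fetch_url() reports is content + b'\0' + the decimal file size, i.e. the
# same digest git_like_hash_with_size() computes. Note this is "git-like",
# not identical to `git hash-object`, which hashes a "blob <size>\0" header
# before the content:
#
#   sha1, size = git_like_hash_with_size('resource.bin')
#   logging.info('resource sha1=%s size=%d', sha1, size)
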

def make_readonly(filename):
    # Keep the execute bits, add read bits, drop write bits
    chmod(filename, os.stat(filename).st_mode & 0o111 | 0o444)


def process(fetched_file, file_name, args, remove=True):
    assert len(args.rename) <= len(args.outputs), ('too few outputs to rename', args.rename, 'into', args.outputs)

    fetched_file_is_dir = os.path.isdir(fetched_file)
    if fetched_file_is_dir and not args.untar_to:
        raise ResourceIsDirectoryError('Resource may be directory only with untar_to option: ' + fetched_file)

    # make all read only
    if fetched_file_is_dir:
        for root, _, files in os.walk(fetched_file):
            for filename in files:
                make_readonly(os.path.join(root, filename))
    else:
        make_readonly(fetched_file)

    if args.copy_to:
        hardlink_or_copy(fetched_file, args.copy_to)
        if not args.outputs:
            args.outputs = [args.copy_to]

    if args.rename_to:
        args.rename.append(fetched_file)
        if not args.outputs:
            args.outputs = [args.rename_to]

    if args.copy_to_dir:
        hardlink_or_copy(fetched_file, os.path.join(args.copy_to_dir, file_name))

    if args.untar_to:
        ensure_dir(args.untar_to)
        inputs = set(map(os.path.normpath, args.rename + args.outputs[len(args.rename):]))
        if fetched_file_is_dir:
            for member in inputs:
                base, name = member.split('/', 1)
                src = os.path.normpath(os.path.join(fetched_file, name))
                dst = os.path.normpath(os.path.join(args.untar_to, member))
                hardlink_or_copy(src, dst)
        else:
            # Extract only requested files
            try:
                with tarfile.open(fetched_file, mode='r:*') as tar:
                    members = [
                        entry for entry in tar if os.path.normpath(os.path.join(args.untar_to, entry.name)) in inputs
                    ]
                    tar.extractall(args.untar_to, members=members)
            except tarfile.ReadError as e:
                logging.exception(e)
                raise ResourceUnpackingError('File {} cannot be untarred'.format(fetched_file))

        # Forbid changes to the loaded resource data
        for root, _, files in os.walk(args.untar_to):
            for filename in files:
                make_readonly(os.path.join(root, filename))

    for src, dst in zip(args.rename, args.outputs):
        if src == 'RESOURCE':
            src = fetched_file
        if os.path.abspath(src) == os.path.abspath(fetched_file):
            logging.info('Copying %s to %s', src, dst)
            hardlink_or_copy(src, dst)
        else:
            logging.info('Renaming %s to %s', src, dst)
            if os.path.exists(dst):
                raise ResourceUnpackingError("Target file already exists ({} -> {})".format(src, dst))
            if remove:
                rename_or_copy_and_remove(src, dst)
            else:
                hardlink_or_copy(src, dst)

    for path in args.outputs:
        if not os.path.exists(path):
            raise OutputNotExistError('Output does not exist: %s' % os.path.abspath(path))
        if not os.path.isfile(path):
            raise OutputIsDirectoryError('Output must be a file, not a directory: %s' % os.path.abspath(path))
        if args.executable:
            chmod(path, os.stat(path).st_mode | 0o111)
        if os.path.abspath(path) == os.path.abspath(fetched_file):
            remove = False

    if remove:
        if fetched_file_is_dir:
            shutil.rmtree(fetched_file)
        else:
            os.remove(fetched_file)
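
# End-to-end sketch of how a fetcher script is expected to drive this module;
# the URL, output path and argument values below are hypothetical:
#
#   import argparse
#
#   parser = argparse.ArgumentParser()
#   add_common_arguments(parser)
#   args = parser.parse_args(['--copy-to', 'out/tool', 'out/tool'])
#   setup_logging(args, 'fetch_example')
#   fetched = fetch_url('https://example.com/tool.tar.gz', unpack=False,
#                       resource_file_name='tool.tar.gz', expected_md5=None)
#   process(fetched, 'tool.tar.gz', args)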