123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- from __future__ import print_function
- import itertools
- import json
- import logging
- import argparse
- import os
- import random
- import subprocess
- import sys
- import time
- import urllib2
- import uuid
- import fetch_from
- ORIGIN_SUFFIX = '?origin=fetch-from-sandbox'
- MDS_PREFIX = 'http://storage-int.mds.yandex.net/get-sandbox/'
- TEMPORARY_ERROR_CODES = (429, 500, 503, 504)
- def parse_args():
- parser = argparse.ArgumentParser()
- fetch_from.add_common_arguments(parser)
- parser.add_argument('--resource-id', type=int, required=True)
- parser.add_argument('--custom-fetcher')
- parser.add_argument('--resource-file')
- return parser.parse_args()
- class ResourceInfoError(Exception):
- pass
- class UnsupportedProtocolException(Exception):
- pass
- def _sky_path():
- return "/usr/local/bin/sky"
- def _is_skynet_avaliable():
- if not os.path.exists(_sky_path()):
- return False
- try:
- subprocess.check_output([_sky_path(), "--version"])
- return True
- except subprocess.CalledProcessError:
- return False
- except OSError:
- return False
- def download_by_skynet(resource_info, file_name):
- def sky_get(skynet_id, target_dir, timeout=None):
- cmd_args = [_sky_path(), 'get', "-N", "Backbone", "--user", "--wait", "--dir", target_dir, skynet_id]
- if timeout is not None:
- cmd_args += ["--timeout", str(timeout)]
- logging.info('Call skynet with args: %s', cmd_args)
- stdout = subprocess.check_output(cmd_args).strip()
- logging.debug('Skynet call with args %s is finished, result is %s', cmd_args, stdout)
- return stdout
- if not _is_skynet_avaliable():
- raise UnsupportedProtocolException("Skynet is not available")
- skynet_id = resource_info.get("skynet_id")
- if not skynet_id:
- raise ValueError("Resource does not have skynet_id")
- temp_dir = os.path.abspath(fetch_from.uniq_string_generator())
- os.mkdir(temp_dir)
- sky_get(skynet_id, temp_dir)
- return os.path.join(temp_dir, file_name)
- def _urlopen(url, data=None, headers=None):
- n = 10
- tout = 30
- started = time.time()
- reqid = uuid.uuid4()
- request = urllib2.Request(url, data=data, headers=headers or {})
- request.add_header('X-Request-Timeout', str(tout))
- request.add_header('X-Request-Id', str(reqid))
- request.add_header('User-Agent', 'fetch_from_sandbox.py')
- for i in xrange(n):
- retry_after = i
- try:
- request.add_header('X-Request-Duration', str(int(time.time() - started)))
- return urllib2.urlopen(request, timeout=tout).read()
- except urllib2.HTTPError as e:
- logging.warning('failed to fetch URL %s with HTTP code %d: %s', url, e.code, e)
- retry_after = int(e.headers.get('Retry-After', str(retry_after)))
- if e.code not in TEMPORARY_ERROR_CODES:
- raise
- except Exception as e:
- logging.warning('failed to fetch URL %s: %s', url, e)
- if i + 1 == n:
- raise e
- time.sleep(retry_after)
- def _query(url):
- return json.loads(_urlopen(url))
- _SANDBOX_BASE_URL = 'https://sandbox.yandex-team.ru/api/v1.0'
- def get_resource_info(resource_id, touch=False, no_links=False):
- url = ''.join((_SANDBOX_BASE_URL, '/resource/', str(resource_id)))
- headers = {}
- if touch:
- headers.update({'X-Touch-Resource': '1'})
- if no_links:
- headers.update({'X-No-Links': '1'})
- return _query(url)
- def get_resource_http_links(resource_id):
- url = ''.join((_SANDBOX_BASE_URL, '/resource/', str(resource_id), '/data/http'))
- return [r['url'] + ORIGIN_SUFFIX for r in _query(url)]
- def fetch_via_script(script, resource_id):
- return subprocess.check_output([script, str(resource_id)]).rstrip()
- def fetch(resource_id, custom_fetcher):
- try:
- resource_info = get_resource_info(resource_id, touch=True, no_links=True)
- except Exception as e:
- sys.stderr.write(
- "Failed to fetch resource {}: {}\n".format(resource_id, str(e))
- )
- raise
- if resource_info.get('state', 'DELETED') != 'READY':
- raise ResourceInfoError("Resource {} is not READY".format(resource_id))
- logging.info('Resource %s info %s', str(resource_id), json.dumps(resource_info))
- is_multifile = resource_info.get('multifile', False)
- resource_file_name = os.path.basename(resource_info["file_name"])
- expected_md5 = resource_info.get('md5')
- proxy_link = resource_info['http']['proxy'] + ORIGIN_SUFFIX
- if is_multifile:
- proxy_link += '&stream=tgz'
- mds_id = resource_info.get('attributes', {}).get('mds')
- mds_link = MDS_PREFIX + mds_id if mds_id else None
- def get_storage_links():
- storage_links = get_resource_http_links(resource_id)
- random.shuffle(storage_links)
- return storage_links
- skynet = _is_skynet_avaliable()
- if not skynet:
- logging.info("Skynet is not available, will try other protocols")
- def iter_tries():
- if skynet:
- yield lambda: download_by_skynet(resource_info, resource_file_name)
- if custom_fetcher:
- yield lambda: fetch_via_script(custom_fetcher, resource_id)
- # Don't try too hard here: we will get back to proxy later on
- yield lambda: fetch_from.fetch_url(proxy_link, False, resource_file_name, expected_md5, tries=2)
- for x in get_storage_links():
- # Don't spend too much time connecting single host
- yield lambda: fetch_from.fetch_url(x, False, resource_file_name, expected_md5, tries=1)
- if mds_link is not None:
- # Don't try too hard here: we will get back to MDS later on
- yield lambda: fetch_from.fetch_url(mds_link, True, resource_file_name, expected_md5, tries=2)
- yield lambda: fetch_from.fetch_url(proxy_link, False, resource_file_name, expected_md5)
- if mds_link is not None:
- yield lambda: fetch_from.fetch_url(mds_link, True, resource_file_name, expected_md5)
- if resource_info.get('attributes', {}).get('ttl') != 'inf':
- sys.stderr.write('WARNING: resource {} ttl is not "inf".\n'.format(resource_id))
- exc_info = None
- for i, action in enumerate(itertools.islice(iter_tries(), 0, 10)):
- try:
- fetched_file = action()
- break
- except UnsupportedProtocolException:
- pass
- except subprocess.CalledProcessError as e:
- logging.warning('failed to fetch resource %s with subprocess: %s', resource_id, e)
- time.sleep(i)
- except urllib2.HTTPError as e:
- logging.warning('failed to fetch resource %s with HTTP code %d: %s', resource_id, e.code, e)
- if e.code not in TEMPORARY_ERROR_CODES:
- exc_info = exc_info or sys.exc_info()
- time.sleep(i)
- except Exception as e:
- logging.exception(e)
- exc_info = exc_info or sys.exc_info()
- time.sleep(i)
- else:
- if exc_info:
- if sys.version_info[0] == 2:
- raise exc_info[0], exc_info[1], exc_info[2]
- else:
- raise exc_info[1].with_traceback(exc_info[2])
- else:
- raise Exception("No available protocol and/or server to fetch resource")
- return fetched_file, resource_info['file_name']
- def _get_resource_info_from_file(resource_file):
- if resource_file is None or not os.path.exists(resource_file):
- return None
- RESOURCE_INFO_JSON = "resource_info.json"
- RESOURCE_CONTENT_FILE_NAME = "resource"
- resource_dir, resource_file = os.path.split(resource_file)
- if resource_file != RESOURCE_CONTENT_FILE_NAME:
- return None
- resource_json = os.path.join(resource_dir, RESOURCE_INFO_JSON)
- if not os.path.isfile(resource_json):
- return None
- try:
- with open(resource_json, 'r') as j:
- resource_info = json.load(j)
- resource_info['file_name'] # check consistency
- return resource_info
- except:
- logging.debug('Invalid %s in %s', RESOURCE_INFO_JSON, resource_dir)
- return None
- def main(args):
- custom_fetcher = os.environ.get('YA_CUSTOM_FETCHER')
- resource_info = _get_resource_info_from_file(args.resource_file)
- if resource_info:
- fetched_file = args.resource_file
- file_name = resource_info['file_name']
- else:
- # This code should be merged to ya and removed.
- fetched_file, file_name = fetch(args.resource_id, custom_fetcher)
- fetch_from.process(fetched_file, file_name, args, remove=not custom_fetcher and not resource_info)
- if __name__ == '__main__':
- args = parse_args()
- fetch_from.setup_logging(args, os.path.basename(__file__))
- try:
- main(args)
- except Exception as e:
- logging.exception(e)
- print(open(args.abs_log_path).read(), file=sys.stderr)
- sys.stderr.flush()
- import error
- sys.exit(error.ExitCodes.INFRASTRUCTURE_ERROR if fetch_from.is_temporary(e) else 1)
|