|
@@ -15,7 +15,7 @@ import fetch_from
|
|
|
|
|
|
ORIGIN_SUFFIX = '?origin=fetch-from-sandbox'
|
|
|
MDS_PREFIX = 'http://storage-int.mds.yandex.net/get-sandbox/'
|
|
|
-TEMPORARY_ERROR_CODES = (429, 500, 503, 504)
|
|
|
+TEMPORARY_ERROR_CODES = (429, 500, 503, 504)
|
|
|
|
|
|
|
|
|
def parse_args():
|
|
@@ -23,7 +23,7 @@ def parse_args():
|
|
|
fetch_from.add_common_arguments(parser)
|
|
|
parser.add_argument('--resource-id', type=int, required=True)
|
|
|
parser.add_argument('--custom-fetcher')
|
|
|
- parser.add_argument('--resource-file')
|
|
|
+ parser.add_argument('--resource-file')
|
|
|
return parser.parse_args()
|
|
|
|
|
|
|
|
@@ -35,33 +35,33 @@ class UnsupportedProtocolException(Exception):
|
|
|
pass
|
|
|
|
|
|
|
|
|
-def _sky_path():
|
|
|
- return "/usr/local/bin/sky"
|
|
|
+def _sky_path():
|
|
|
+ return "/usr/local/bin/sky"
|
|
|
|
|
|
|
|
|
-def _is_skynet_avaliable():
|
|
|
- if not os.path.exists(_sky_path()):
|
|
|
- return False
|
|
|
- try:
|
|
|
- subprocess.check_output([_sky_path(), "--version"])
|
|
|
- return True
|
|
|
- except subprocess.CalledProcessError:
|
|
|
- return False
|
|
|
- except OSError:
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def download_by_skynet(resource_info, file_name):
|
|
|
+def _is_skynet_avaliable():
|
|
|
+ if not os.path.exists(_sky_path()):
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ subprocess.check_output([_sky_path(), "--version"])
|
|
|
+ return True
|
|
|
+ except subprocess.CalledProcessError:
|
|
|
+ return False
|
|
|
+ except OSError:
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+def download_by_skynet(resource_info, file_name):
|
|
|
def sky_get(skynet_id, target_dir, timeout=None):
|
|
|
cmd_args = [_sky_path(), 'get', "-N", "Backbone", "--user", "--wait", "--dir", target_dir, skynet_id]
|
|
|
if timeout is not None:
|
|
|
cmd_args += ["--timeout", str(timeout)]
|
|
|
- logging.info('Call skynet with args: %s', cmd_args)
|
|
|
+ logging.info('Call skynet with args: %s', cmd_args)
|
|
|
stdout = subprocess.check_output(cmd_args).strip()
|
|
|
logging.debug('Skynet call with args %s is finished, result is %s', cmd_args, stdout)
|
|
|
return stdout
|
|
|
|
|
|
- if not _is_skynet_avaliable():
|
|
|
+ if not _is_skynet_avaliable():
|
|
|
raise UnsupportedProtocolException("Skynet is not available")
|
|
|
|
|
|
skynet_id = resource_info.get("skynet_id")
|
|
@@ -94,7 +94,7 @@ def _urlopen(url, data=None, headers=None):
|
|
|
logging.warning('failed to fetch URL %s with HTTP code %d: %s', url, e.code, e)
|
|
|
retry_after = int(e.headers.get('Retry-After', str(retry_after)))
|
|
|
|
|
|
- if e.code not in TEMPORARY_ERROR_CODES:
|
|
|
+ if e.code not in TEMPORARY_ERROR_CODES:
|
|
|
raise
|
|
|
|
|
|
except Exception as e:
|
|
@@ -141,9 +141,9 @@ def fetch(resource_id, custom_fetcher):
|
|
|
)
|
|
|
raise
|
|
|
|
|
|
- if resource_info.get('state', 'DELETED') != 'READY':
|
|
|
- raise ResourceInfoError("Resource {} is not READY".format(resource_id))
|
|
|
-
|
|
|
+ if resource_info.get('state', 'DELETED') != 'READY':
|
|
|
+ raise ResourceInfoError("Resource {} is not READY".format(resource_id))
|
|
|
+
|
|
|
logging.info('Resource %s info %s', str(resource_id), json.dumps(resource_info))
|
|
|
|
|
|
resource_file_name = os.path.basename(resource_info["file_name"])
|
|
@@ -159,15 +159,15 @@ def fetch(resource_id, custom_fetcher):
|
|
|
random.shuffle(storage_links)
|
|
|
return storage_links
|
|
|
|
|
|
- skynet = _is_skynet_avaliable()
|
|
|
-
|
|
|
- if not skynet:
|
|
|
- logging.info("Skynet is not available, will try other protocols")
|
|
|
-
|
|
|
+ skynet = _is_skynet_avaliable()
|
|
|
+
|
|
|
+ if not skynet:
|
|
|
+ logging.info("Skynet is not available, will try other protocols")
|
|
|
+
|
|
|
def iter_tries():
|
|
|
- if skynet:
|
|
|
- yield lambda: download_by_skynet(resource_info, resource_file_name)
|
|
|
-
|
|
|
+ if skynet:
|
|
|
+ yield lambda: download_by_skynet(resource_info, resource_file_name)
|
|
|
+
|
|
|
if custom_fetcher:
|
|
|
yield lambda: fetch_via_script(custom_fetcher, resource_id)
|
|
|
|
|
@@ -191,78 +191,78 @@ def fetch(resource_id, custom_fetcher):
|
|
|
try:
|
|
|
fetched_file = action()
|
|
|
break
|
|
|
- except UnsupportedProtocolException:
|
|
|
- pass
|
|
|
- except subprocess.CalledProcessError as e:
|
|
|
+ except UnsupportedProtocolException:
|
|
|
+ pass
|
|
|
+ except subprocess.CalledProcessError as e:
|
|
|
logging.warning('failed to fetch resource %s with subprocess: %s', resource_id, e)
|
|
|
- time.sleep(i)
|
|
|
- except urllib2.HTTPError as e:
|
|
|
+ time.sleep(i)
|
|
|
+ except urllib2.HTTPError as e:
|
|
|
logging.warning('failed to fetch resource %s with HTTP code %d: %s', resource_id, e.code, e)
|
|
|
- if e.code not in TEMPORARY_ERROR_CODES:
|
|
|
- exc_info = exc_info or sys.exc_info()
|
|
|
- time.sleep(i)
|
|
|
+ if e.code not in TEMPORARY_ERROR_CODES:
|
|
|
+ exc_info = exc_info or sys.exc_info()
|
|
|
+ time.sleep(i)
|
|
|
except Exception as e:
|
|
|
logging.exception(e)
|
|
|
exc_info = exc_info or sys.exc_info()
|
|
|
time.sleep(i)
|
|
|
else:
|
|
|
- if exc_info:
|
|
|
- raise exc_info[0], exc_info[1], exc_info[2]
|
|
|
- else:
|
|
|
- raise Exception("No available protocol and/or server to fetch resource")
|
|
|
+ if exc_info:
|
|
|
+ raise exc_info[0], exc_info[1], exc_info[2]
|
|
|
+ else:
|
|
|
+ raise Exception("No available protocol and/or server to fetch resource")
|
|
|
|
|
|
return fetched_file, resource_info['file_name']
|
|
|
|
|
|
|
|
|
-def _get_resource_info_from_file(resource_file):
|
|
|
- if resource_file is None or not os.path.exists(resource_file):
|
|
|
- return None
|
|
|
-
|
|
|
- RESOURCE_INFO_JSON = "resource_info.json"
|
|
|
- RESOURCE_CONTENT_FILE_NAME = "resource"
|
|
|
-
|
|
|
- resource_dir, resource_file = os.path.split(resource_file)
|
|
|
- if resource_file != RESOURCE_CONTENT_FILE_NAME:
|
|
|
- return None
|
|
|
-
|
|
|
- resource_json = os.path.join(resource_dir, RESOURCE_INFO_JSON)
|
|
|
- if not os.path.isfile(resource_json):
|
|
|
- return None
|
|
|
-
|
|
|
- try:
|
|
|
- with open(resource_json, 'r') as j:
|
|
|
- resource_info = json.load(j)
|
|
|
- resource_info['file_name'] # check consistency
|
|
|
- return resource_info
|
|
|
- except:
|
|
|
- logging.debug('Invalid %s in %s', RESOURCE_INFO_JSON, resource_dir)
|
|
|
-
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
+def _get_resource_info_from_file(resource_file):
|
|
|
+ if resource_file is None or not os.path.exists(resource_file):
|
|
|
+ return None
|
|
|
+
|
|
|
+ RESOURCE_INFO_JSON = "resource_info.json"
|
|
|
+ RESOURCE_CONTENT_FILE_NAME = "resource"
|
|
|
+
|
|
|
+ resource_dir, resource_file = os.path.split(resource_file)
|
|
|
+ if resource_file != RESOURCE_CONTENT_FILE_NAME:
|
|
|
+ return None
|
|
|
+
|
|
|
+ resource_json = os.path.join(resource_dir, RESOURCE_INFO_JSON)
|
|
|
+ if not os.path.isfile(resource_json):
|
|
|
+ return None
|
|
|
+
|
|
|
+ try:
|
|
|
+ with open(resource_json, 'r') as j:
|
|
|
+ resource_info = json.load(j)
|
|
|
+ resource_info['file_name'] # check consistency
|
|
|
+ return resource_info
|
|
|
+ except:
|
|
|
+ logging.debug('Invalid %s in %s', RESOURCE_INFO_JSON, resource_dir)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
def main(args):
|
|
|
custom_fetcher = os.environ.get('YA_CUSTOM_FETCHER')
|
|
|
|
|
|
- resource_info = _get_resource_info_from_file(args.resource_file)
|
|
|
- if resource_info:
|
|
|
- fetched_file = args.resource_file
|
|
|
- file_name = resource_info['file_name']
|
|
|
- else:
|
|
|
- # This code should be merged to ya and removed.
|
|
|
- fetched_file, file_name = fetch(args.resource_id, custom_fetcher)
|
|
|
+ resource_info = _get_resource_info_from_file(args.resource_file)
|
|
|
+ if resource_info:
|
|
|
+ fetched_file = args.resource_file
|
|
|
+ file_name = resource_info['file_name']
|
|
|
+ else:
|
|
|
+ # This code should be merged to ya and removed.
|
|
|
+ fetched_file, file_name = fetch(args.resource_id, custom_fetcher)
|
|
|
|
|
|
- fetch_from.process(fetched_file, file_name, args, remove=not custom_fetcher and not resource_info)
|
|
|
+ fetch_from.process(fetched_file, file_name, args, remove=not custom_fetcher and not resource_info)
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
args = parse_args()
|
|
|
- fetch_from.setup_logging(args, os.path.basename(__file__))
|
|
|
+ fetch_from.setup_logging(args, os.path.basename(__file__))
|
|
|
|
|
|
try:
|
|
|
main(args)
|
|
|
except Exception as e:
|
|
|
logging.exception(e)
|
|
|
- print >>sys.stderr, open(args.abs_log_path).read()
|
|
|
+ print >>sys.stderr, open(args.abs_log_path).read()
|
|
|
sys.stderr.flush()
|
|
|
|
|
|
import error
|