fetch_from_sandbox.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. from __future__ import print_function
  2. import itertools
  3. import json
  4. import logging
  5. import argparse
  6. import os
  7. import random
  8. import subprocess
  9. import sys
  10. import time
  11. import urllib2
  12. import uuid
  13. # Explicitly enable local imports
  14. # Don't forget to add imported scripts to inputs of the calling command!
  15. sys.path.append(os.path.dirname(os.path.abspath(__file__)))
  16. import fetch_from
# Query string appended to the proxy link and to every storage link before downloading.
ORIGIN_SUFFIX = '?origin=fetch-from-sandbox'
# Prefix that a resource's 'mds' attribute is appended to in order to build its MDS link.
MDS_PREFIX = 'http://storage-int.mds.yandex.net/get-sandbox/'
# HTTP status codes treated as temporary: requests failing with them are retried.
TEMPORARY_ERROR_CODES = (429, 500, 503, 504)
  20. def parse_args():
  21. parser = argparse.ArgumentParser()
  22. fetch_from.add_common_arguments(parser)
  23. parser.add_argument('--resource-id', type=int, required=True)
  24. parser.add_argument('--custom-fetcher')
  25. parser.add_argument('--resource-file')
  26. return parser.parse_args()
  27. class ResourceInfoError(Exception):
  28. pass
  29. class UnsupportedProtocolException(Exception):
  30. pass
  31. def _sky_path():
  32. return "/usr/local/bin/sky"
  33. def _is_skynet_avaliable():
  34. if not os.path.exists(_sky_path()):
  35. return False
  36. try:
  37. subprocess.check_output([_sky_path(), "--version"])
  38. return True
  39. except subprocess.CalledProcessError:
  40. return False
  41. except OSError:
  42. return False
  43. def download_by_skynet(resource_info, file_name):
  44. def sky_get(skynet_id, target_dir, timeout=None):
  45. cmd_args = [_sky_path(), 'get', "-N", "Backbone", "--user", "--wait", "--dir", target_dir, skynet_id]
  46. if timeout is not None:
  47. cmd_args += ["--timeout", str(timeout)]
  48. logging.info('Call skynet with args: %s', cmd_args)
  49. stdout = subprocess.check_output(cmd_args).strip()
  50. logging.debug('Skynet call with args %s is finished, result is %s', cmd_args, stdout)
  51. return stdout
  52. if not _is_skynet_avaliable():
  53. raise UnsupportedProtocolException("Skynet is not available")
  54. skynet_id = resource_info.get("skynet_id")
  55. if not skynet_id:
  56. raise ValueError("Resource does not have skynet_id")
  57. temp_dir = os.path.abspath(fetch_from.uniq_string_generator())
  58. os.mkdir(temp_dir)
  59. sky_get(skynet_id, temp_dir)
  60. return os.path.join(temp_dir, file_name)
  61. def _urlopen(url, data=None, headers=None):
  62. n = 10
  63. tout = 30
  64. started = time.time()
  65. reqid = uuid.uuid4()
  66. request = urllib2.Request(url, data=data, headers=headers or {})
  67. request.add_header('X-Request-Timeout', str(tout))
  68. request.add_header('X-Request-Id', str(reqid))
  69. request.add_header('User-Agent', 'fetch_from_sandbox.py')
  70. for i in xrange(n):
  71. retry_after = i
  72. try:
  73. request.add_header('X-Request-Duration', str(int(time.time() - started)))
  74. return urllib2.urlopen(request, timeout=tout).read()
  75. except urllib2.HTTPError as e:
  76. logging.warning('failed to fetch URL %s with HTTP code %d: %s', url, e.code, e)
  77. retry_after = int(e.headers.get('Retry-After', str(retry_after)))
  78. if e.code not in TEMPORARY_ERROR_CODES:
  79. raise
  80. except Exception as e:
  81. logging.warning('failed to fetch URL %s: %s', url, e)
  82. if i + 1 == n:
  83. raise e
  84. time.sleep(retry_after)
  85. def _query(url):
  86. return json.loads(_urlopen(url))
# Base URL that the '/resource/<id>...' REST paths queried by this script are appended to.
_SANDBOX_BASE_URL = 'https://sandbox.yandex-team.ru/api/v1.0'
  88. def get_resource_info(resource_id, touch=False, no_links=False):
  89. url = ''.join((_SANDBOX_BASE_URL, '/resource/', str(resource_id)))
  90. headers = {}
  91. if touch:
  92. headers.update({'X-Touch-Resource': '1'})
  93. if no_links:
  94. headers.update({'X-No-Links': '1'})
  95. return _query(url)
  96. def get_resource_http_links(resource_id):
  97. url = ''.join((_SANDBOX_BASE_URL, '/resource/', str(resource_id), '/data/http'))
  98. return [r['url'] + ORIGIN_SUFFIX for r in _query(url)]
  99. def fetch_via_script(script, resource_id):
  100. return subprocess.check_output([script, str(resource_id)]).rstrip()
def fetch(resource_id, custom_fetcher):
    """Download a resource, trying several transports one after another.

    resource_id -- id of the resource to download.
    custom_fetcher -- optional external fetcher script; when set it is tried
        via fetch_via_script before the HTTP sources.

    Returns a tuple ``(fetched_file, file_name)`` where ``file_name`` is the
    'file_name' field of the resource description.

    Raises ResourceInfoError when the resource state is not READY. When every
    attempt fails, the first recorded error is re-raised (non-temporary HTTP
    errors and unexpected exceptions are recorded); if nothing was recorded a
    generic Exception is raised.
    """
    try:
        resource_info = get_resource_info(resource_id, touch=True, no_links=True)
    except Exception as e:
        sys.stderr.write(
            "Failed to fetch resource {}: {}\n".format(resource_id, str(e))
        )
        raise

    # A missing 'state' is treated the same as a deleted resource.
    if resource_info.get('state', 'DELETED') != 'READY':
        raise ResourceInfoError("Resource {} is not READY".format(resource_id))

    logging.info('Resource %s info %s', str(resource_id), json.dumps(resource_info))

    is_multifile = resource_info.get('multifile', False)
    resource_file_name = os.path.basename(resource_info["file_name"])
    expected_md5 = resource_info.get('md5')

    proxy_link = resource_info['http']['proxy'] + ORIGIN_SUFFIX
    if is_multifile:
        proxy_link += '&stream=tgz'

    mds_id = resource_info.get('attributes', {}).get('mds')
    mds_link = MDS_PREFIX + mds_id if mds_id else None

    def get_storage_links():
        # Shuffle the storage links so they are tried in random order.
        storage_links = get_resource_http_links(resource_id)
        random.shuffle(storage_links)
        return storage_links

    skynet = _is_skynet_avaliable()

    if not skynet:
        logging.info("Skynet is not available, will try other protocols")

    def iter_tries():
        # Lazily yield zero-argument callables, one per download attempt, in
        # priority order. Each callable is invoked by the loop below before the
        # generator is advanced, so the lambdas see the current loop variable.
        if skynet:
            yield lambda: download_by_skynet(resource_info, resource_file_name)

        if custom_fetcher:
            yield lambda: fetch_via_script(custom_fetcher, resource_id)

        # Don't try too hard here: we will get back to proxy later on
        yield lambda: fetch_from.fetch_url(proxy_link, False, resource_file_name, expected_md5, tries=2)
        # An exception raised by get_storage_links() surfaces while the
        # generator is being advanced, i.e. outside the try block of the loop below.
        # NOTE(review): the nesting of the MDS attempt inside this loop was
        # reconstructed from an unindented listing - confirm against the repository.
        for x in get_storage_links():
            # Don't spend too much time connecting single host
            yield lambda: fetch_from.fetch_url(x, False, resource_file_name, expected_md5, tries=1)
            if mds_link is not None:
                # Don't try too hard here: we will get back to MDS later on
                yield lambda: fetch_from.fetch_url(mds_link, True, resource_file_name, expected_md5, tries=2)
        yield lambda: fetch_from.fetch_url(proxy_link, False, resource_file_name, expected_md5)
        if mds_link is not None:
            yield lambda: fetch_from.fetch_url(mds_link, True, resource_file_name, expected_md5)

    if resource_info.get('attributes', {}).get('ttl') != 'inf':
        sys.stderr.write('WARNING: resource {} ttl is not "inf".\n'.format(resource_id))

    exc_info = None
    # At most 10 attempts are made; the sleep before the next attempt grows
    # with the attempt index.
    for i, action in enumerate(itertools.islice(iter_tries(), 0, 10)):
        try:
            fetched_file = action()
            break
        except UnsupportedProtocolException:
            # Move straight on to the next transport without sleeping.
            pass
        except subprocess.CalledProcessError as e:
            logging.warning('failed to fetch resource %s with subprocess: %s', resource_id, e)
            time.sleep(i)
        except urllib2.HTTPError as e:
            logging.warning('failed to fetch resource %s with HTTP code %d: %s', resource_id, e.code, e)
            # Only non-temporary HTTP errors are remembered for the final re-raise.
            if e.code not in TEMPORARY_ERROR_CODES:
                exc_info = exc_info or sys.exc_info()
            time.sleep(i)
        except Exception as e:
            logging.exception(e)
            # Keep the first recorded error, not the latest one.
            exc_info = exc_info or sys.exc_info()
            time.sleep(i)
    else:
        # Reached only when no attempt succeeded (the loop never hit 'break').
        if exc_info:
            if sys.version_info[0] == 2:
                # Python 2 three-argument raise: re-raises with the recorded traceback.
                raise exc_info[0], exc_info[1], exc_info[2]
            else:
                raise exc_info[1].with_traceback(exc_info[2])
        else:
            raise Exception("No available protocol and/or server to fetch resource")

    return fetched_file, resource_info['file_name']
  173. def _get_resource_info_from_file(resource_file):
  174. if resource_file is None or not os.path.exists(resource_file):
  175. return None
  176. RESOURCE_INFO_JSON = "resource_info.json"
  177. RESOURCE_CONTENT_FILE_NAME = "resource"
  178. resource_dir, resource_file = os.path.split(resource_file)
  179. if resource_file != RESOURCE_CONTENT_FILE_NAME:
  180. return None
  181. resource_json = os.path.join(resource_dir, RESOURCE_INFO_JSON)
  182. if not os.path.isfile(resource_json):
  183. return None
  184. try:
  185. with open(resource_json, 'r') as j:
  186. resource_info = json.load(j)
  187. resource_info['file_name'] # check consistency
  188. return resource_info
  189. except:
  190. logging.debug('Invalid %s in %s', RESOURCE_INFO_JSON, resource_dir)
  191. return None
  192. def main(args):
  193. custom_fetcher = os.environ.get('YA_CUSTOM_FETCHER')
  194. resource_info = _get_resource_info_from_file(args.resource_file)
  195. if resource_info:
  196. fetched_file = args.resource_file
  197. file_name = resource_info['file_name']
  198. else:
  199. # This code should be merged to ya and removed.
  200. fetched_file, file_name = fetch(args.resource_id, custom_fetcher)
  201. fetch_from.process(fetched_file, file_name, args, remove=not custom_fetcher and not resource_info)
  202. if __name__ == '__main__':
  203. args = parse_args()
  204. fetch_from.setup_logging(args, os.path.basename(__file__))
  205. try:
  206. main(args)
  207. except Exception as e:
  208. logging.exception(e)
  209. print(open(args.abs_log_path).read(), file=sys.stderr)
  210. sys.stderr.flush()
  211. import error
  212. sys.exit(error.ExitCodes.INFRASTRUCTURE_ERROR if fetch_from.is_temporary(e) else 1)