# fetch_from_sandbox.py
  1. from __future__ import print_function
  2. import itertools
  3. import json
  4. import logging
  5. import argparse
  6. import os
  7. import random
  8. import subprocess
  9. import sys
  10. import time
  11. import urllib2
  12. import uuid
  13. import fetch_from
# Query string appended to the proxy link and to every storage link so that
# requests issued by this script carry an origin marker.
ORIGIN_SUFFIX = '?origin=fetch-from-sandbox'
# URL prefix that is joined with a resource's 'mds' attribute to form its MDS link.
MDS_PREFIX = 'http://storage-int.mds.yandex.net/get-sandbox/'
# HTTP status codes that are treated as transient (worth retrying) in this file.
TEMPORARY_ERROR_CODES = (429, 500, 503, 504)
  17. def parse_args():
  18. parser = argparse.ArgumentParser()
  19. fetch_from.add_common_arguments(parser)
  20. parser.add_argument('--resource-id', type=int, required=True)
  21. parser.add_argument('--custom-fetcher')
  22. parser.add_argument('--resource-file')
  23. return parser.parse_args()
  24. class ResourceInfoError(Exception):
  25. pass
  26. class UnsupportedProtocolException(Exception):
  27. pass
  28. def _sky_path():
  29. return "/usr/local/bin/sky"
  30. def _is_skynet_avaliable():
  31. if not os.path.exists(_sky_path()):
  32. return False
  33. try:
  34. subprocess.check_output([_sky_path(), "--version"])
  35. return True
  36. except subprocess.CalledProcessError:
  37. return False
  38. except OSError:
  39. return False
  40. def download_by_skynet(resource_info, file_name):
  41. def sky_get(skynet_id, target_dir, timeout=None):
  42. cmd_args = [_sky_path(), 'get', "-N", "Backbone", "--user", "--wait", "--dir", target_dir, skynet_id]
  43. if timeout is not None:
  44. cmd_args += ["--timeout", str(timeout)]
  45. logging.info('Call skynet with args: %s', cmd_args)
  46. stdout = subprocess.check_output(cmd_args).strip()
  47. logging.debug('Skynet call with args %s is finished, result is %s', cmd_args, stdout)
  48. return stdout
  49. if not _is_skynet_avaliable():
  50. raise UnsupportedProtocolException("Skynet is not available")
  51. skynet_id = resource_info.get("skynet_id")
  52. if not skynet_id:
  53. raise ValueError("Resource does not have skynet_id")
  54. temp_dir = os.path.abspath(fetch_from.uniq_string_generator())
  55. os.mkdir(temp_dir)
  56. sky_get(skynet_id, temp_dir)
  57. return os.path.join(temp_dir, file_name)
  58. def _urlopen(url, data=None, headers=None):
  59. n = 10
  60. tout = 30
  61. started = time.time()
  62. reqid = uuid.uuid4()
  63. request = urllib2.Request(url, data=data, headers=headers or {})
  64. request.add_header('X-Request-Timeout', str(tout))
  65. request.add_header('X-Request-Id', str(reqid))
  66. request.add_header('User-Agent', 'fetch_from_sandbox.py')
  67. for i in xrange(n):
  68. retry_after = i
  69. try:
  70. request.add_header('X-Request-Duration', str(int(time.time() - started)))
  71. return urllib2.urlopen(request, timeout=tout).read()
  72. except urllib2.HTTPError as e:
  73. logging.warning('failed to fetch URL %s with HTTP code %d: %s', url, e.code, e)
  74. retry_after = int(e.headers.get('Retry-After', str(retry_after)))
  75. if e.code not in TEMPORARY_ERROR_CODES:
  76. raise
  77. except Exception as e:
  78. logging.warning('failed to fetch URL %s: %s', url, e)
  79. if i + 1 == n:
  80. raise e
  81. time.sleep(retry_after)
  82. def _query(url):
  83. return json.loads(_urlopen(url))
# Common prefix of the Sandbox REST API endpoints requested by this script.
_SANDBOX_BASE_URL = 'https://sandbox.yandex-team.ru/api/v1.0'
  85. def get_resource_info(resource_id, touch=False, no_links=False):
  86. url = ''.join((_SANDBOX_BASE_URL, '/resource/', str(resource_id)))
  87. headers = {}
  88. if touch:
  89. headers.update({'X-Touch-Resource': '1'})
  90. if no_links:
  91. headers.update({'X-No-Links': '1'})
  92. return _query(url)
  93. def get_resource_http_links(resource_id):
  94. url = ''.join((_SANDBOX_BASE_URL, '/resource/', str(resource_id), '/data/http'))
  95. return [r['url'] + ORIGIN_SUFFIX for r in _query(url)]
  96. def fetch_via_script(script, resource_id):
  97. return subprocess.check_output([script, str(resource_id)]).rstrip()
def fetch(resource_id, custom_fetcher):
    """Download a Sandbox resource, trying several transports one after another.

    Args:
        resource_id: Identifier of the resource to download.
        custom_fetcher: Optional path of an external script; when set it is
            tried via ``fetch_via_script`` before the HTTP sources.

    Returns:
        A ``(fetched_file, file_name)`` tuple: the value returned by the first
        successful attempt and the ``file_name`` field of the resource info.

    Raises:
        ResourceInfoError: the resource state is not ``READY``.
        Exception: when every attempt failed - the first recorded failure is
            re-raised, or a generic ``Exception`` if none was recorded.
    """
    try:
        resource_info = get_resource_info(resource_id, touch=True, no_links=True)
    except Exception as e:
        sys.stderr.write(
            "Failed to fetch resource {}: {}\n".format(resource_id, str(e))
        )
        raise

    # A missing 'state' field is treated like a deleted resource.
    if resource_info.get('state', 'DELETED') != 'READY':
        raise ResourceInfoError("Resource {} is not READY".format(resource_id))

    logging.info('Resource %s info %s', str(resource_id), json.dumps(resource_info))

    is_multifile = resource_info.get('multifile', False)
    resource_file_name = os.path.basename(resource_info["file_name"])
    expected_md5 = resource_info.get('md5')

    proxy_link = resource_info['http']['proxy'] + ORIGIN_SUFFIX
    if is_multifile:
        # ORIGIN_SUFFIX already started the query string, hence '&'.
        proxy_link += '&stream=tgz'

    mds_id = resource_info.get('attributes', {}).get('mds')
    mds_link = MDS_PREFIX + mds_id if mds_id else None

    def get_storage_links():
        # Shuffle so that the storage hosts are tried in random order.
        storage_links = get_resource_http_links(resource_id)
        random.shuffle(storage_links)
        return storage_links

    skynet = _is_skynet_avaliable()

    if not skynet:
        logging.info("Skynet is not available, will try other protocols")

    def iter_tries():
        # Lazily yields zero-argument callables, one per download attempt, in
        # priority order. Each callable is invoked by the loop below before the
        # generator is resumed, so the loop variable `x` still holds the
        # intended link when the lambda runs.
        if skynet:
            yield lambda: download_by_skynet(resource_info, resource_file_name)

        if custom_fetcher:
            yield lambda: fetch_via_script(custom_fetcher, resource_id)

        # Don't try too hard here: we will get back to proxy later on
        yield lambda: fetch_from.fetch_url(proxy_link, False, resource_file_name, expected_md5, tries=2)
        for x in get_storage_links():
            # Don't spend too much time connecting single host
            yield lambda: fetch_from.fetch_url(x, False, resource_file_name, expected_md5, tries=1)

            # NOTE(review): indentation was lost in this copy of the file; this
            # MDS attempt is assumed to sit inside the storage-link loop - confirm.
            if mds_link is not None:
                # Don't try too hard here: we will get back to MDS later on
                yield lambda: fetch_from.fetch_url(mds_link, True, resource_file_name, expected_md5, tries=2)
        yield lambda: fetch_from.fetch_url(proxy_link, False, resource_file_name, expected_md5)
        if mds_link is not None:
            yield lambda: fetch_from.fetch_url(mds_link, True, resource_file_name, expected_md5)

    if resource_info.get('attributes', {}).get('ttl') != 'inf':
        sys.stderr.write('WARNING: resource {} ttl is not "inf".\n'.format(resource_id))

    # First failure worth reporting; re-raised only if every attempt fails.
    exc_info = None
    # At most 10 attempts are taken from the generator; after a failure the
    # loop sleeps for `i` seconds, i.e. the delay grows with the attempt index.
    for i, action in enumerate(itertools.islice(iter_tries(), 0, 10)):
        try:
            fetched_file = action()
            break
        except UnsupportedProtocolException:
            # Transport not usable on this host: move on without sleeping.
            pass
        except subprocess.CalledProcessError as e:
            logging.warning('failed to fetch resource %s with subprocess: %s', resource_id, e)
            time.sleep(i)
        except urllib2.HTTPError as e:
            logging.warning('failed to fetch resource %s with HTTP code %d: %s', resource_id, e.code, e)
            # Only non-transient HTTP errors are remembered for re-raising.
            if e.code not in TEMPORARY_ERROR_CODES:
                exc_info = exc_info or sys.exc_info()
            time.sleep(i)
        except Exception as e:
            logging.exception(e)
            exc_info = exc_info or sys.exc_info()
            time.sleep(i)
    else:
        # Reached only when the loop finished without `break`, i.e. no attempt succeeded.
        if exc_info:
            if sys.version_info[0] == 2:
                # NOTE(review): the three-argument raise is Python 2 only syntax,
                # so this module cannot be compiled by Python 3 as written.
                raise exc_info[0], exc_info[1], exc_info[2]
            else:
                raise exc_info[1].with_traceback(exc_info[2])
        else:
            raise Exception("No available protocol and/or server to fetch resource")

    return fetched_file, resource_info['file_name']
  170. def _get_resource_info_from_file(resource_file):
  171. if resource_file is None or not os.path.exists(resource_file):
  172. return None
  173. RESOURCE_INFO_JSON = "resource_info.json"
  174. RESOURCE_CONTENT_FILE_NAME = "resource"
  175. resource_dir, resource_file = os.path.split(resource_file)
  176. if resource_file != RESOURCE_CONTENT_FILE_NAME:
  177. return None
  178. resource_json = os.path.join(resource_dir, RESOURCE_INFO_JSON)
  179. if not os.path.isfile(resource_json):
  180. return None
  181. try:
  182. with open(resource_json, 'r') as j:
  183. resource_info = json.load(j)
  184. resource_info['file_name'] # check consistency
  185. return resource_info
  186. except:
  187. logging.debug('Invalid %s in %s', RESOURCE_INFO_JSON, resource_dir)
  188. return None
  189. def main(args):
  190. custom_fetcher = os.environ.get('YA_CUSTOM_FETCHER')
  191. resource_info = _get_resource_info_from_file(args.resource_file)
  192. if resource_info:
  193. fetched_file = args.resource_file
  194. file_name = resource_info['file_name']
  195. else:
  196. # This code should be merged to ya and removed.
  197. fetched_file, file_name = fetch(args.resource_id, custom_fetcher)
  198. fetch_from.process(fetched_file, file_name, args, remove=not custom_fetcher and not resource_info)
  199. if __name__ == '__main__':
  200. args = parse_args()
  201. fetch_from.setup_logging(args, os.path.basename(__file__))
  202. try:
  203. main(args)
  204. except Exception as e:
  205. logging.exception(e)
  206. print(open(args.abs_log_path).read(), file=sys.stderr)
  207. sys.stderr.flush()
  208. import error
  209. sys.exit(error.ExitCodes.INFRASTRUCTURE_ERROR if fetch_from.is_temporary(e) else 1)