fetch_from_sandbox.py 8.6 KB


  1. import itertools
  2. import json
  3. import logging
  4. import argparse
  5. import os
  6. import random
  7. import subprocess
  8. import sys
  9. import time
  10. import urllib2
  11. import uuid
  12. import fetch_from
  13. ORIGIN_SUFFIX = '?origin=fetch-from-sandbox'
  14. MDS_PREFIX = 'http://storage-int.mds.yandex.net/get-sandbox/'
  15. TEMPORARY_ERROR_CODES = (429, 500, 503, 504)
  16. def parse_args():
  17. parser = argparse.ArgumentParser()
  18. fetch_from.add_common_arguments(parser)
  19. parser.add_argument('--resource-id', type=int, required=True)
  20. parser.add_argument('--custom-fetcher')
  21. parser.add_argument('--resource-file')
  22. return parser.parse_args()
  23. class ResourceInfoError(Exception):
  24. pass
  25. class UnsupportedProtocolException(Exception):
  26. pass
  27. def _sky_path():
  28. return "/usr/local/bin/sky"
  29. def _is_skynet_avaliable():
  30. if not os.path.exists(_sky_path()):
  31. return False
  32. try:
  33. subprocess.check_output([_sky_path(), "--version"])
  34. return True
  35. except subprocess.CalledProcessError:
  36. return False
  37. except OSError:
  38. return False
  39. def download_by_skynet(resource_info, file_name):
  40. def sky_get(skynet_id, target_dir, timeout=None):
  41. cmd_args = [_sky_path(), 'get', "-N", "Backbone", "--user", "--wait", "--dir", target_dir, skynet_id]
  42. if timeout is not None:
  43. cmd_args += ["--timeout", str(timeout)]
  44. logging.info('Call skynet with args: %s', cmd_args)
  45. stdout = subprocess.check_output(cmd_args).strip()
  46. logging.debug('Skynet call with args %s is finished, result is %s', cmd_args, stdout)
  47. return stdout
  48. if not _is_skynet_avaliable():
  49. raise UnsupportedProtocolException("Skynet is not available")
  50. skynet_id = resource_info.get("skynet_id")
  51. if not skynet_id:
  52. raise ValueError("Resource does not have skynet_id")
  53. temp_dir = os.path.abspath(fetch_from.uniq_string_generator())
  54. os.mkdir(temp_dir)
  55. sky_get(skynet_id, temp_dir)
  56. return os.path.join(temp_dir, file_name)
  57. def _urlopen(url, data=None, headers=None):
  58. n = 10
  59. tout = 30
  60. started = time.time()
  61. reqid = uuid.uuid4()
  62. request = urllib2.Request(url, data=data, headers=headers or {})
  63. request.add_header('X-Request-Timeout', str(tout))
  64. request.add_header('X-Request-Id', str(reqid))
  65. request.add_header('User-Agent', 'fetch_from_sandbox.py')
  66. for i in xrange(n):
  67. retry_after = i
  68. try:
  69. request.add_header('X-Request-Duration', str(int(time.time() - started)))
  70. return urllib2.urlopen(request, timeout=tout).read()
  71. except urllib2.HTTPError as e:
  72. logging.warning('failed to fetch URL %s with HTTP code %d: %s', url, e.code, e)
  73. retry_after = int(e.headers.get('Retry-After', str(retry_after)))
  74. if e.code not in TEMPORARY_ERROR_CODES:
  75. raise
  76. except Exception as e:
  77. logging.warning('failed to fetch URL %s: %s', url, e)
  78. if i + 1 == n:
  79. raise e
  80. time.sleep(retry_after)
  81. def _query(url):
  82. return json.loads(_urlopen(url))
  83. _SANDBOX_BASE_URL = 'https://sandbox.yandex-team.ru/api/v1.0'
  84. def get_resource_info(resource_id, touch=False, no_links=False):
  85. url = ''.join((_SANDBOX_BASE_URL, '/resource/', str(resource_id)))
  86. headers = {}
  87. if touch:
  88. headers.update({'X-Touch-Resource': '1'})
  89. if no_links:
  90. headers.update({'X-No-Links': '1'})
  91. return _query(url)
  92. def get_resource_http_links(resource_id):
  93. url = ''.join((_SANDBOX_BASE_URL, '/resource/', str(resource_id), '/data/http'))
  94. return [r['url'] + ORIGIN_SUFFIX for r in _query(url)]
  95. def fetch_via_script(script, resource_id):
  96. return subprocess.check_output([script, str(resource_id)]).rstrip()
  97. def fetch(resource_id, custom_fetcher):
  98. try:
  99. resource_info = get_resource_info(resource_id, touch=True, no_links=True)
  100. except Exception as e:
  101. sys.stderr.write(
  102. "Failed to fetch resource {}: {}\n".format(resource_id, str(e))
  103. )
  104. raise
  105. if resource_info.get('state', 'DELETED') != 'READY':
  106. raise ResourceInfoError("Resource {} is not READY".format(resource_id))
  107. logging.info('Resource %s info %s', str(resource_id), json.dumps(resource_info))
  108. resource_file_name = os.path.basename(resource_info["file_name"])
  109. expected_md5 = resource_info.get('md5')
  110. proxy_link = resource_info['http']['proxy'] + ORIGIN_SUFFIX
  111. mds_id = resource_info.get('attributes', {}).get('mds')
  112. mds_link = MDS_PREFIX + mds_id if mds_id else None
  113. def get_storage_links():
  114. storage_links = get_resource_http_links(resource_id)
  115. random.shuffle(storage_links)
  116. return storage_links
  117. skynet = _is_skynet_avaliable()
  118. if not skynet:
  119. logging.info("Skynet is not available, will try other protocols")
  120. def iter_tries():
  121. if skynet:
  122. yield lambda: download_by_skynet(resource_info, resource_file_name)
  123. if custom_fetcher:
  124. yield lambda: fetch_via_script(custom_fetcher, resource_id)
  125. # Don't try too hard here: we will get back to proxy later on
  126. yield lambda: fetch_from.fetch_url(proxy_link, False, resource_file_name, expected_md5, tries=2)
  127. for x in get_storage_links():
  128. # Don't spend too much time connecting single host
  129. yield lambda: fetch_from.fetch_url(x, False, resource_file_name, expected_md5, tries=1)
  130. if mds_link is not None:
  131. # Don't try too hard here: we will get back to MDS later on
  132. yield lambda: fetch_from.fetch_url(mds_link, True, resource_file_name, expected_md5, tries=2)
  133. yield lambda: fetch_from.fetch_url(proxy_link, False, resource_file_name, expected_md5)
  134. if mds_link is not None:
  135. yield lambda: fetch_from.fetch_url(mds_link, True, resource_file_name, expected_md5)
  136. if resource_info.get('attributes', {}).get('ttl') != 'inf':
  137. sys.stderr.write('WARNING: resource {} ttl is not "inf".\n'.format(resource_id))
  138. exc_info = None
  139. for i, action in enumerate(itertools.islice(iter_tries(), 0, 10)):
  140. try:
  141. fetched_file = action()
  142. break
  143. except UnsupportedProtocolException:
  144. pass
  145. except subprocess.CalledProcessError as e:
  146. logging.warning('failed to fetch resource %s with subprocess: %s', resource_id, e)
  147. time.sleep(i)
  148. except urllib2.HTTPError as e:
  149. logging.warning('failed to fetch resource %s with HTTP code %d: %s', resource_id, e.code, e)
  150. if e.code not in TEMPORARY_ERROR_CODES:
  151. exc_info = exc_info or sys.exc_info()
  152. time.sleep(i)
  153. except Exception as e:
  154. logging.exception(e)
  155. exc_info = exc_info or sys.exc_info()
  156. time.sleep(i)
  157. else:
  158. if exc_info:
  159. raise exc_info[0], exc_info[1], exc_info[2]
  160. else:
  161. raise Exception("No available protocol and/or server to fetch resource")
  162. return fetched_file, resource_info['file_name']
  163. def _get_resource_info_from_file(resource_file):
  164. if resource_file is None or not os.path.exists(resource_file):
  165. return None
  166. RESOURCE_INFO_JSON = "resource_info.json"
  167. RESOURCE_CONTENT_FILE_NAME = "resource"
  168. resource_dir, resource_file = os.path.split(resource_file)
  169. if resource_file != RESOURCE_CONTENT_FILE_NAME:
  170. return None
  171. resource_json = os.path.join(resource_dir, RESOURCE_INFO_JSON)
  172. if not os.path.isfile(resource_json):
  173. return None
  174. try:
  175. with open(resource_json, 'r') as j:
  176. resource_info = json.load(j)
  177. resource_info['file_name'] # check consistency
  178. return resource_info
  179. except:
  180. logging.debug('Invalid %s in %s', RESOURCE_INFO_JSON, resource_dir)
  181. return None
  182. def main(args):
  183. custom_fetcher = os.environ.get('YA_CUSTOM_FETCHER')
  184. resource_info = _get_resource_info_from_file(args.resource_file)
  185. if resource_info:
  186. fetched_file = args.resource_file
  187. file_name = resource_info['file_name']
  188. else:
  189. # This code should be merged to ya and removed.
  190. fetched_file, file_name = fetch(args.resource_id, custom_fetcher)
  191. fetch_from.process(fetched_file, file_name, args, remove=not custom_fetcher and not resource_info)
  192. if __name__ == '__main__':
  193. args = parse_args()
  194. fetch_from.setup_logging(args, os.path.basename(__file__))
  195. try:
  196. main(args)
  197. except Exception as e:
  198. logging.exception(e)
  199. print >>sys.stderr, open(args.abs_log_path).read()
  200. sys.stderr.flush()
  201. import error
  202. sys.exit(error.ExitCodes.INFRASTRUCTURE_ERROR if fetch_from.is_temporary(e) else 1)