# fetch_from.py

import datetime as dt
import errno
import hashlib
import json
import logging
import os
import platform
import random
import shutil
import socket
import string
import sys
import tarfile
import urllib2

import retry


def make_user_agent():
    return 'fetch_from: {host}'.format(host=socket.gethostname())


def add_common_arguments(parser):
    parser.add_argument('--copy-to')  # used by jbuild in fetch_resource
    parser.add_argument('--rename-to')  # used by test_node in inject_mds_resource_to_graph
    parser.add_argument('--copy-to-dir')
    parser.add_argument('--untar-to')
    parser.add_argument('--rename', action='append', default=[], metavar='FILE', help='rename FILE to the corresponding output')
    parser.add_argument('--executable', action='store_true', help='make outputs executable')
    parser.add_argument('--log-path')
    parser.add_argument('-v', '--verbose', action='store_true', default=os.environ.get('YA_VERBOSE_FETCHER'), help='increase stderr verbosity')
    parser.add_argument('outputs', nargs='*', default=[])


def ensure_dir(path):
    if not (path == '' or os.path.isdir(path)):
        os.makedirs(path)


# Reference code: library/python/fs/__init__.py
def hardlink_or_copy(src, dst):
    ensure_dir(os.path.dirname(dst))

    if os.name == 'nt':
        shutil.copy(src, dst)
    else:
        try:
            os.link(src, dst)
        except OSError as e:
            if e.errno == errno.EEXIST:
                return
            elif e.errno in (errno.EXDEV, errno.EMLINK, errno.EINVAL, errno.EACCES):
                sys.stderr.write("Can't make hardlink (errno={}) - fallback to copy: {} -> {}\n".format(e.errno, src, dst))
                shutil.copy(src, dst)
            else:
                raise
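
# Usage sketch (hypothetical paths): place one cached artifact at a second
# location; on Windows, and when hardlinking fails with EXDEV/EMLINK/EINVAL/
# EACCES, this silently degrades to a plain copy.
#
#   hardlink_or_copy('cache/abc123', 'out/resource.bin')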


def rename_or_copy_and_remove(src, dst):
    ensure_dir(os.path.dirname(dst))

    try:
        os.rename(src, dst)
    except OSError:
        shutil.copy(src, dst)
        os.remove(src)


class BadChecksumFetchError(Exception):
    pass


class IncompleteFetchError(Exception):
    pass


class ResourceUnpackingError(Exception):
    pass


class ResourceIsDirectoryError(Exception):
    pass


class OutputIsDirectoryError(Exception):
    pass


class OutputNotExistError(Exception):
    pass


def setup_logging(args, base_name):
    def makedirs(path):
        try:
            os.makedirs(path)
        except OSError:
            pass

    if args.log_path:
        log_file_name = args.log_path
    else:
        log_file_name = base_name + ".log"

    args.abs_log_path = os.path.abspath(log_file_name)
    makedirs(os.path.dirname(args.abs_log_path))
    logging.basicConfig(filename=args.abs_log_path, level=logging.DEBUG)

    if args.verbose:
        logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))


def is_temporary(e):
    def is_broken(e):
        return isinstance(e, urllib2.HTTPError) and e.code in (410, 404)

    if is_broken(e):
        return False

    if isinstance(e, (BadChecksumFetchError, IncompleteFetchError, urllib2.URLError, socket.error)):
        return True

    import error

    return error.is_temporary_error(e)
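
# Usage sketch: classify a failed attempt before retrying; a 404/410 is
# treated as permanent, while network and checksum errors are retried.
#
#   try:
#       do_fetch()  # hypothetical fetch attempt
#   except Exception as e:
#       if not is_temporary(e):
#           raise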


def uniq_string_generator(size=6, chars=string.ascii_lowercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def report_to_snowden(value):
    def inner():
        body = {
            'namespace': 'ygg',
            'key': 'fetch-from-sandbox',
            'value': json.dumps(value),
        }

        urllib2.urlopen(
            'https://back-snowden.qloud.yandex-team.ru/report/add',
            json.dumps([body, ]),
            timeout=5,
        )

    try:
        inner()
    except Exception as e:
        logging.warning('report_to_snowden failed: %s', e)


def copy_stream(read, *writers, **kwargs):
    chunk_size = kwargs.get('size', 1024 * 1024)
    while True:
        data = read(chunk_size)
        if not data:
            break
        for write in writers:
            write(data)
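
# Usage sketch (hypothetical paths): fan one read stream out to several
# sinks, e.g. hashing while writing a copy.
#
#   md5 = hashlib.md5()
#   with open('src.bin', 'rb') as src, open('dst.bin', 'wb') as dst:
#       copy_stream(src.read, dst.write, md5.update)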


def md5file(fname):
    res = hashlib.md5()
    with open(fname, 'rb') as f:
        copy_stream(f.read, res.update)
    return res.hexdigest()


def git_like_hash_with_size(filepath):
    """
    Calculate a git-like hash for the path, plus its file size
    """
    sha = hashlib.sha1()

    file_size = 0

    with open(filepath, 'rb') as f:
        while True:
            block = f.read(2 ** 16)
            if not block:
                break
            file_size += len(block)
            sha.update(block)

    sha.update('\0')
    sha.update(str(file_size))

    return sha.hexdigest(), file_size
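
# Note: "git-like", not identical to git. A real git blob hash prefixes a
# header, sha1('blob %d\0' % size + data), whereas here the '\0' and size are
# appended after the data. One-shot equivalent of the function above:
#
#   data = open(filepath, 'rb').read()
#   digest = hashlib.sha1(data + '\0' + str(len(data))).hexdigest()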


def size_printer(display_name, size):
    sz = [0]
    last_stamp = [dt.datetime.now()]

    def printer(chunk):
        sz[0] += len(chunk)
        now = dt.datetime.now()
        if last_stamp[0] + dt.timedelta(seconds=10) < now:
            if size:
                print >>sys.stderr, "##status##{} - [[imp]]{:.1f}%[[rst]]".format(display_name, 100.0 * sz[0] / size)
            last_stamp[0] = now

    return printer
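
# Usage sketch (hypothetical path): emit ##status## progress lines while
# hashing a local file; prints at most once every 10 seconds.
#
#   with open('big.bin', 'rb') as f:
#       copy_stream(f.read, size_printer('big.bin', os.path.getsize('big.bin')))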


def fetch_url(url, unpack, resource_file_name, expected_md5=None, expected_sha1=None, tries=10, writers=None):
    logging.info('Downloading from url %s name %s and expected md5 %s', url, resource_file_name, expected_md5)
    tmp_file_name = uniq_string_generator()

    request = urllib2.Request(url, headers={'User-Agent': make_user_agent()})
    req = retry.retry_func(lambda: urllib2.urlopen(request, timeout=30), tries=tries, delay=5, backoff=1.57079)
    logging.debug('Headers: %s', req.headers.headers)
    expected_file_size = int(req.headers.get('Content-Length', 0))
    real_md5 = hashlib.md5()
    real_sha1 = hashlib.sha1()

    with open(tmp_file_name, 'wb') as fp:
        copy_stream(
            req.read,
            fp.write,
            real_md5.update,
            real_sha1.update,
            size_printer(resource_file_name, expected_file_size),
            *([] if writers is None else writers)
        )

    real_md5 = real_md5.hexdigest()
    real_file_size = os.path.getsize(tmp_file_name)
    real_sha1.update('\0')
    real_sha1.update(str(real_file_size))
    real_sha1 = real_sha1.hexdigest()

    if unpack:
        tmp_dir = tmp_file_name + '.dir'
        os.makedirs(tmp_dir)
        with tarfile.open(tmp_file_name, mode="r|gz") as tar:
            tar.extractall(tmp_dir)
        tmp_file_name = os.path.join(tmp_dir, resource_file_name)
        real_md5 = md5file(tmp_file_name)

    logging.info('File size %s (expected %s)', real_file_size, expected_file_size or "UNKNOWN")
    logging.info('File md5 %s (expected %s)', real_md5, expected_md5)
    logging.info('File sha1 %s (expected %s)', real_sha1, expected_sha1)

    if expected_md5 and real_md5 != expected_md5:
        report_to_snowden(
            {
                'headers': req.headers.headers,
                'expected_md5': expected_md5,
                'real_md5': real_md5,
            }
        )

        raise BadChecksumFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_md5,
                expected_md5,
                url,
            )
        )

    if expected_sha1 and real_sha1 != expected_sha1:
        report_to_snowden(
            {
                'headers': req.headers.headers,
                'expected_sha1': expected_sha1,
                'real_sha1': real_sha1,
            }
        )

        raise BadChecksumFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_sha1,
                expected_sha1,
                url,
            )
        )

    if expected_file_size and expected_file_size != real_file_size:
        report_to_snowden({'headers': req.headers.headers, 'file_size': real_file_size})

        raise IncompleteFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_file_size,
                expected_file_size,
                url,
            )
        )

    return tmp_file_name
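
# Usage sketch (hypothetical URL and digest): download with retries and
# verify the md5; the returned path is a randomly named temporary file.
#
#   path = fetch_url('https://example.com/res.tar.gz', False, 'res.tar.gz',
#                    expected_md5='d41d8cd98f00b204e9800998ecf8427e')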


def chmod(filename, mode):
    if platform.system().lower() == 'windows':
        # https://docs.microsoft.com/en-us/windows/win32/fileio/hard-links-and-junctions:
        # hard to reset read-only attribute for removal if there are multiple hardlinks
        return
    stat = os.stat(filename)
    if stat.st_mode & 0o777 != mode:
        try:
            os.chmod(filename, mode)
        except OSError:
            import pwd

            sys.stderr.write("{} st_mode: {} pwuid: {}\n".format(filename, stat.st_mode, pwd.getpwuid(os.stat(filename).st_uid)))
            raise


def process(fetched_file, file_name, args, remove=True):
    assert len(args.rename) <= len(args.outputs), (
        'too few outputs to rename', args.rename, 'into', args.outputs)

    # Forbid changes to the loaded resource
    chmod(fetched_file, 0o444)

    if not os.path.isfile(fetched_file):
        raise ResourceIsDirectoryError('Resource must be a file, not a directory: %s' % fetched_file)

    if args.copy_to:
        hardlink_or_copy(fetched_file, args.copy_to)
        if not args.outputs:
            args.outputs = [args.copy_to]

    if args.rename_to:
        args.rename.append(fetched_file)
        if not args.outputs:
            args.outputs = [args.rename_to]

    if args.copy_to_dir:
        hardlink_or_copy(fetched_file, os.path.join(args.copy_to_dir, file_name))

    if args.untar_to:
        ensure_dir(args.untar_to)
        # Extract only requested files
        try:
            with tarfile.open(fetched_file, mode='r:*') as tar:
                inputs = set(map(os.path.normpath, args.rename + args.outputs[len(args.rename):]))
                members = [entry for entry in tar if os.path.normpath(os.path.join(args.untar_to, entry.name)) in inputs]
                tar.extractall(args.untar_to, members=members)
            # Forbid changes to the loaded resource data
            for root, _, files in os.walk(args.untar_to):
                for filename in files:
                    chmod(os.path.join(root, filename), 0o444)
        except tarfile.ReadError as e:
            logging.exception(e)
            raise ResourceUnpackingError('File {} cannot be untarred'.format(fetched_file))

    for src, dst in zip(args.rename, args.outputs):
        if src == 'RESOURCE':
            src = fetched_file
        if os.path.abspath(src) == os.path.abspath(fetched_file):
            logging.info('Copying %s to %s', src, dst)
            hardlink_or_copy(src, dst)
        else:
            logging.info('Renaming %s to %s', src, dst)
            if os.path.exists(dst):
                raise ResourceUnpackingError("Target file already exists ({} -> {})".format(src, dst))
            if remove:
                rename_or_copy_and_remove(src, dst)
            else:
                hardlink_or_copy(src, dst)

    for path in args.outputs:
        if not os.path.exists(path):
            raise OutputNotExistError('Output does not exist: %s' % os.path.abspath(path))
        if not os.path.isfile(path):
            raise OutputIsDirectoryError('Output must be a file, not a directory: %s' % os.path.abspath(path))
        if args.executable:
            chmod(path, os.stat(path).st_mode | 0o111)
        if os.path.abspath(path) == os.path.abspath(fetched_file):
            remove = False

    if remove:
        os.remove(fetched_file)
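
# Usage sketch (hypothetical values): minimal driver tying the helpers
# together; argument names follow add_common_arguments().
#
#   import argparse
#   parser = argparse.ArgumentParser()
#   add_common_arguments(parser)
#   args = parser.parse_args(['--copy-to', 'out/res.bin'])
#   setup_logging(args, 'fetch_from')
#   fetched = fetch_url('https://example.com/res.bin', False, 'res.bin')
#   process(fetched, 'res.bin', args)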