from __future__ import print_function

import datetime as dt
import errno
import hashlib
import json
import logging
import os
import platform
import random
import shutil
import socket
import string
import sys
import tarfile

try:
    # Python 2
    import urllib2 as urllib_request
    from urllib2 import HTTPError, URLError
except (ImportError, ModuleNotFoundError):
    # Python 3
    import urllib.request as urllib_request
    from urllib.error import HTTPError, URLError

import retry
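
# NB: `retry` above and `error` (imported lazily in is_temporary) are assumed
# to be sibling build-script modules that ship alongside this file, not PyPI
# packages.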


def make_user_agent():
    return 'fetch_from: {host}'.format(host=socket.gethostname())


def add_common_arguments(parser):
    parser.add_argument('--copy-to')  # used by jbuild in fetch_resource
    parser.add_argument('--rename-to')  # used by test_node in inject_mds_resource_to_graph
    parser.add_argument('--copy-to-dir')
    parser.add_argument('--untar-to')
    parser.add_argument(
        '--rename', action='append', default=[], metavar='FILE', help='rename FILE to the corresponding output'
    )
    parser.add_argument('--executable', action='store_true', help='make outputs executable')
    parser.add_argument('--log-path')
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        default=os.environ.get('YA_VERBOSE_FETCHER'),
        help='increase stderr verbosity',
    )
    parser.add_argument('outputs', nargs='*', default=[])


def ensure_dir(path):
    if not (path == '' or os.path.isdir(path)):
        os.makedirs(path)


# Reference code: library/python/fs/__init__.py
def hardlink_or_copy(src, dst):
    ensure_dir(os.path.dirname(dst))

    if os.name == 'nt':
        shutil.copy(src, dst)
    else:
        try:
            os.link(src, dst)
        except OSError as e:
            if e.errno == errno.EEXIST:
                return
            elif e.errno in (errno.EXDEV, errno.EMLINK, errno.EINVAL, errno.EACCES):
                sys.stderr.write(
                    "Can't make hardlink (errno={}) - fallback to copy: {} -> {}\n".format(e.errno, src, dst)
                )
                shutil.copy(src, dst)
            else:
                sys.stderr.write("src: {} dst: {}\n".format(src, dst))
                raise


def rename_or_copy_and_remove(src, dst):
    ensure_dir(os.path.dirname(dst))

    try:
        os.rename(src, dst)
    except OSError:
        shutil.copy(src, dst)
        os.remove(src)


class BadChecksumFetchError(Exception):
    pass


class IncompleteFetchError(Exception):
    pass


class ResourceUnpackingError(Exception):
    pass


class ResourceIsDirectoryError(Exception):
    pass


class OutputIsDirectoryError(Exception):
    pass


class OutputNotExistError(Exception):
    pass


def setup_logging(args, base_name):
    def makedirs(path):
        try:
            os.makedirs(path)
        except OSError:
            pass

    if args.log_path:
        log_file_name = args.log_path
    else:
        log_file_name = base_name + ".log"

    args.abs_log_path = os.path.abspath(log_file_name)
    makedirs(os.path.dirname(args.abs_log_path))
    logging.basicConfig(filename=args.abs_log_path, level=logging.DEBUG)
    if args.verbose:
        logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))


def is_temporary(e):
    def is_broken(e):
        return isinstance(e, HTTPError) and e.code in (410, 404)

    if is_broken(e):
        return False

    if isinstance(e, (BadChecksumFetchError, IncompleteFetchError, URLError, socket.error)):
        return True

    import error

    return error.is_temporary_error(e)


def uniq_string_generator(size=6, chars=string.ascii_lowercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def report_to_snowden(value):
    def inner():
        body = {
            'namespace': 'ygg',
            'key': 'fetch-from-sandbox',
            'value': json.dumps(value),
        }

        urllib_request.urlopen(
            'https://back-snowden.qloud.yandex-team.ru/report/add',
            json.dumps(
                [
                    body,
                ]
            ).encode('utf-8'),  # urlopen() requires bytes data on Python 3
            timeout=5,
        )

    try:
        inner()
    except Exception as e:
        # Reporting is best-effort: never let telemetry break the fetch itself.
        logging.warning('report_to_snowden failed: %s', e)


def copy_stream(read, *writers, **kwargs):
    chunk_size = kwargs.get('size', 1024 * 1024)
    while True:
        data = read(chunk_size)
        if not data:
            break
        for write in writers:
            write(data)


def md5file(fname):
    res = hashlib.md5()
    with open(fname, 'rb') as f:
        copy_stream(f.read, res.update)
    return res.hexdigest()


def git_like_hash_with_size(filepath):
    """
    Calculate git like hash for path
    """
    sha = hashlib.sha1()
    file_size = 0

    with open(filepath, 'rb') as f:
        while True:
            block = f.read(2**16)
            if not block:
                break
            file_size += len(block)
            sha.update(block)

    sha.update(b'\0')
    sha.update(str(file_size).encode('utf-8'))

    return sha.hexdigest(), file_size
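

# NB: despite the name, this is not git's object hash (git hashes
# "blob <size>\0<content>"); the digest here is sha1(<content> + b'\0' + str(<size>)).
# fetch_url() below computes the same digest incrementally while streaming.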


def size_printer(display_name, size):
    sz = [0]
    last_stamp = [dt.datetime.now()]

    def printer(chunk):
        sz[0] += len(chunk)
        now = dt.datetime.now()
        if last_stamp[0] + dt.timedelta(seconds=10) < now:
            if size:
                print(
                    "##status##{} - [[imp]]{:.1f}%[[rst]]".format(display_name, 100.0 * sz[0] / size),
                    file=sys.stderr,
                )
            last_stamp[0] = now

    return printer
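

# The "##status##" / "[[imp]]...[[rst]]" tokens written to stderr are assumed
# to be the ya build tool's progress and text-markup protocol; the printer
# above reports at most once every 10 seconds.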


def fetch_url(url, unpack, resource_file_name, expected_md5=None, expected_sha1=None, tries=10, writers=None):
    logging.info('Downloading from url %s name %s and expected md5 %s', url, resource_file_name, expected_md5)
    tmp_file_name = uniq_string_generator()

    request = urllib_request.Request(url, headers={'User-Agent': make_user_agent()})
    req = retry.retry_func(lambda: urllib_request.urlopen(request, timeout=30), tries=tries, delay=5, backoff=1.57079)
    logging.debug('Headers: %s', req.headers)
    expected_file_size = int(req.headers.get('Content-Length', 0))
    real_md5 = hashlib.md5()
    real_sha1 = hashlib.sha1()

    with open(tmp_file_name, 'wb') as fp:
        copy_stream(
            req.read,
            fp.write,
            real_md5.update,
            real_sha1.update,
            size_printer(resource_file_name, expected_file_size),
            *([] if writers is None else writers)
        )

    real_md5 = real_md5.hexdigest()
    real_file_size = os.path.getsize(tmp_file_name)
    real_sha1.update(b'\0')
    real_sha1.update(str(real_file_size).encode('utf-8'))
    real_sha1 = real_sha1.hexdigest()

    if unpack:
        tmp_dir = tmp_file_name + '.dir'
        os.makedirs(tmp_dir)
        with tarfile.open(tmp_file_name, mode="r|gz") as tar:
            tar.extractall(tmp_dir)
        tmp_file_name = os.path.join(tmp_dir, resource_file_name)
        if expected_md5:
            real_md5 = md5file(tmp_file_name)

    logging.info('File size %s (expected %s)', real_file_size, expected_file_size or "UNKNOWN")
    logging.info('File md5 %s (expected %s)', real_md5, expected_md5)
    logging.info('File sha1 %s (expected %s)', real_sha1, expected_sha1)

    if expected_md5 and real_md5 != expected_md5:
        report_to_snowden({'headers': req.headers.headers, 'expected_md5': expected_md5, 'real_md5': real_md5})

        raise BadChecksumFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_md5,
                expected_md5,
                url,
            )
        )

    if expected_sha1 and real_sha1 != expected_sha1:
        report_to_snowden({'headers': req.headers.headers, 'expected_sha1': expected_sha1, 'real_sha1': real_sha1})

        raise BadChecksumFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_sha1,
                expected_sha1,
                url,
            )
        )

    if expected_file_size and expected_file_size != real_file_size:
        report_to_snowden({'headers': req.headers.headers, 'file_size': real_file_size})

        raise IncompleteFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_file_size,
                expected_file_size,
                url,
            )
        )

    return tmp_file_name
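

# Hypothetical call, for illustration only (URL and md5 are made up):
#
#   path = fetch_url('https://example.com/pkg.tar.gz', unpack=True,
#                    resource_file_name='resource.bin',
#                    expected_md5='d41d8cd98f00b204e9800998ecf8427e')
#
# Two caveats worth knowing: the temporary download name comes from
# uniq_string_generator(), so the file lands in the current working directory,
# and `req.headers.headers` in the mismatch paths above exists on Python 2
# only (Python 3 would raise AttributeError there).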


def chmod(filename, mode):
    if platform.system().lower() == 'windows':
        # https://docs.microsoft.com/en-us/windows/win32/fileio/hard-links-and-junctions:
        # hard to reset read-only attribute for removal if there are multiple hardlinks
        return
    stat = os.stat(filename)
    if stat.st_mode & 0o777 != mode:
        try:
            os.chmod(filename, mode)
        except OSError:
            import pwd

            sys.stderr.write(
                "{} st_mode: {} pwuid: {}\n".format(filename, stat.st_mode, pwd.getpwuid(os.stat(filename).st_uid))
            )
            raise


def make_readonly(filename):
    chmod(filename, os.stat(filename).st_mode & 0o111 | 0o444)
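

# make_readonly() keeps any execute bits (0o111), forces the read bits (0o444)
# and drops all write bits, so fetched resources cannot be modified in place.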


def process(fetched_file, file_name, args, remove=True):
    assert len(args.rename) <= len(args.outputs), ('too few outputs to rename', args.rename, 'into', args.outputs)

    fetched_file_is_dir = os.path.isdir(fetched_file)
    if fetched_file_is_dir and not args.untar_to:
        raise ResourceIsDirectoryError('Resource may be directory only with untar_to option: ' + fetched_file)

    # make all read only
    if fetched_file_is_dir:
        for root, _, files in os.walk(fetched_file):
            for filename in files:
                make_readonly(os.path.join(root, filename))
    else:
        make_readonly(fetched_file)

    if args.copy_to:
        hardlink_or_copy(fetched_file, args.copy_to)
        if not args.outputs:
            args.outputs = [args.copy_to]

    if args.rename_to:
        args.rename.append(fetched_file)
        if not args.outputs:
            args.outputs = [args.rename_to]

    if args.copy_to_dir:
        hardlink_or_copy(fetched_file, os.path.join(args.copy_to_dir, file_name))

    if args.untar_to:
        ensure_dir(args.untar_to)
        inputs = set(map(os.path.normpath, args.rename + args.outputs[len(args.rename):]))
        if fetched_file_is_dir:
            for member in inputs:
                base, name = member.split('/', 1)
                src = os.path.normpath(os.path.join(fetched_file, name))
                dst = os.path.normpath(os.path.join(args.untar_to, member))
                hardlink_or_copy(src, dst)
        else:
            # Extract only requested files
            try:
                with tarfile.open(fetched_file, mode='r:*') as tar:
                    members = [
                        entry for entry in tar if os.path.normpath(os.path.join(args.untar_to, entry.name)) in inputs
                    ]
                    tar.extractall(args.untar_to, members=members)
            except tarfile.ReadError as e:
                logging.exception(e)
                raise ResourceUnpackingError('File {} cannot be untared'.format(fetched_file))

        # Forbid changes to the loaded resource data
        for root, _, files in os.walk(args.untar_to):
            for filename in files:
                make_readonly(os.path.join(root, filename))

    for src, dst in zip(args.rename, args.outputs):
        if src == 'RESOURCE':
            src = fetched_file
        if os.path.abspath(src) == os.path.abspath(fetched_file):
            logging.info('Copying %s to %s', src, dst)
            hardlink_or_copy(src, dst)
        else:
            logging.info('Renaming %s to %s', src, dst)
            if os.path.exists(dst):
                raise ResourceUnpackingError("Target file already exists ({} -> {})".format(src, dst))
            if remove:
                rename_or_copy_and_remove(src, dst)
            else:
                hardlink_or_copy(src, dst)

    for path in args.outputs:
        if not os.path.exists(path):
            raise OutputNotExistError('Output does not exist: %s' % os.path.abspath(path))
        if not os.path.isfile(path):
            raise OutputIsDirectoryError('Output must be a file, not a directory: %s' % os.path.abspath(path))
        if args.executable:
            chmod(path, os.stat(path).st_mode | 0o111)
        if os.path.abspath(path) == os.path.abspath(fetched_file):
            remove = False

    if remove:
        if fetched_file_is_dir:
            shutil.rmtree(fetched_file)
        else:
            os.remove(fetched_file)
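

# Minimal end-to-end sketch (hypothetical, for illustration; the real entry
# points are assumed to be wrapper scripts, e.g. a fetch_from_sandbox-style
# caller, that build on these helpers):
#
#   import argparse
#
#   parser = argparse.ArgumentParser()
#   add_common_arguments(parser)
#   args = parser.parse_args(['--copy-to', 'out/resource.bin'])
#   setup_logging(args, 'fetch_from')
#   fetched = fetch_url('https://example.com/resource.bin', False, 'resource.bin')
#   process(fetched, 'resource.bin', args)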