fetch_from.py 13 KB


  1. from __future__ import print_function
  2. import datetime as dt
  3. import errno
  4. import hashlib
  5. import json
  6. import logging
  7. import os
  8. import platform
  9. import random
  10. import shutil
  11. import socket
  12. import string
  13. import sys
  14. import tarfile
  15. try:
  16. # Python 2
  17. import urllib2 as urllib_request
  18. from urllib2 import HTTPError, URLError
  19. except (ImportError, ModuleNotFoundError):
  20. # Python 3
  21. import urllib.request as urllib_request
  22. from urllib.error import HTTPError, URLError
  23. # Explicitly enable local imports
  24. # Don't forget to add imported scripts to inputs of the calling command!
  25. sys.path.append(os.path.dirname(os.path.abspath(__file__)))
  26. import retry
  27. def make_user_agent():
  28. return 'fetch_from: {host}'.format(host=socket.gethostname())
  29. def add_common_arguments(parser):
  30. parser.add_argument('--copy-to') # used by jbuild in fetch_resource
  31. parser.add_argument('--rename-to') # used by test_node in inject_mds_resource_to_graph
  32. parser.add_argument('--copy-to-dir')
  33. parser.add_argument('--untar-to')
  34. parser.add_argument(
  35. '--rename', action='append', default=[], metavar='FILE', help='rename FILE to the corresponding output'
  36. )
  37. parser.add_argument('--executable', action='store_true', help='make outputs executable')
  38. parser.add_argument('--log-path')
  39. parser.add_argument(
  40. '-v',
  41. '--verbose',
  42. action='store_true',
  43. default=os.environ.get('YA_VERBOSE_FETCHER'),
  44. help='increase stderr verbosity',
  45. )
  46. parser.add_argument('outputs', nargs='*', default=[])
  47. def ensure_dir(path):
  48. if not (path == '' or os.path.isdir(path)):
  49. os.makedirs(path)
  50. # Reference code: library/python/fs/__init__.py
  51. def hardlink_or_copy(src, dst):
  52. ensure_dir(os.path.dirname(dst))
  53. if os.name == 'nt':
  54. shutil.copy(src, dst)
  55. else:
  56. try:
  57. os.link(src, dst)
  58. except OSError as e:
  59. if e.errno == errno.EEXIST:
  60. return
  61. elif e.errno in (errno.EXDEV, errno.EMLINK, errno.EINVAL, errno.EACCES):
  62. sys.stderr.write(
  63. "Can't make hardlink (errno={}) - fallback to copy: {} -> {}\n".format(e.errno, src, dst)
  64. )
  65. shutil.copy(src, dst)
  66. else:
  67. sys.stderr.write("src: {} dst: {}\n".format(src, dst))
  68. raise
  69. def rename_or_copy_and_remove(src, dst):
  70. ensure_dir(os.path.dirname(dst))
  71. try:
  72. os.rename(src, dst)
  73. except OSError:
  74. shutil.copy(src, dst)
  75. os.remove(src)
  76. class BadChecksumFetchError(Exception):
  77. pass
  78. class IncompleteFetchError(Exception):
  79. pass
  80. class ResourceUnpackingError(Exception):
  81. pass
  82. class ResourceIsDirectoryError(Exception):
  83. pass
  84. class OutputIsDirectoryError(Exception):
  85. pass
  86. class OutputNotExistError(Exception):
  87. pass
  88. def setup_logging(args, base_name):
  89. def makedirs(path):
  90. try:
  91. os.makedirs(path)
  92. except OSError:
  93. pass
  94. if args.log_path:
  95. log_file_name = args.log_path
  96. else:
  97. log_file_name = base_name + ".log"
  98. args.abs_log_path = os.path.abspath(log_file_name)
  99. makedirs(os.path.dirname(args.abs_log_path))
  100. logging.basicConfig(filename=args.abs_log_path, level=logging.DEBUG)
  101. if args.verbose:
  102. logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))
  103. def is_temporary(e):
  104. def is_broken(e):
  105. return isinstance(e, HTTPError) and e.code in (410, 404)
  106. if is_broken(e):
  107. return False
  108. if isinstance(e, (BadChecksumFetchError, IncompleteFetchError, URLError, socket.error)):
  109. return True
  110. import error
  111. return error.is_temporary_error(e)
  112. def uniq_string_generator(size=6, chars=string.ascii_lowercase + string.digits):
  113. return ''.join(random.choice(chars) for _ in range(size))
  114. def report_to_snowden(value):
  115. def inner():
  116. body = {
  117. 'namespace': 'ygg',
  118. 'key': 'fetch-from-sandbox',
  119. 'value': json.dumps(value),
  120. }
  121. urllib_request.urlopen(
  122. 'https://back-snowden.qloud.yandex-team.ru/report/add',
  123. json.dumps(
  124. [
  125. body,
  126. ]
  127. ),
  128. timeout=5,
  129. )
  130. try:
  131. inner()
  132. except Exception as e:
  133. logging.warning('report_to_snowden failed: %s', e)
  134. def copy_stream(read, *writers, **kwargs):
  135. chunk_size = kwargs.get('size', 1024 * 1024)
  136. while True:
  137. data = read(chunk_size)
  138. if not data:
  139. break
  140. for write in writers:
  141. write(data)
  142. def md5file(fname):
  143. res = hashlib.md5()
  144. with open(fname, 'rb') as f:
  145. copy_stream(f.read, res.update)
  146. return res.hexdigest()
  147. def git_like_hash_with_size(filepath):
  148. """
  149. Calculate git like hash for path
  150. """
  151. sha = hashlib.sha1()
  152. file_size = 0
  153. with open(filepath, 'rb') as f:
  154. while True:
  155. block = f.read(2**16)
  156. if not block:
  157. break
  158. file_size += len(block)
  159. sha.update(block)
  160. sha.update(b'\0')
  161. sha.update(str(file_size).encode('utf-8'))
  162. return sha.hexdigest(), file_size
  163. def size_printer(display_name, size):
  164. sz = [0]
  165. last_stamp = [dt.datetime.now()]
  166. def printer(chunk):
  167. sz[0] += len(chunk)
  168. now = dt.datetime.now()
  169. if last_stamp[0] + dt.timedelta(seconds=10) < now:
  170. if size:
  171. print("##status##{} - [[imp]]{:.1f}%[[rst]]".format(
  172. display_name, 100.0 * sz[0] / size if size else 0
  173. ), file=sys.stderr)
  174. last_stamp[0] = now
  175. return printer
  176. def fetch_url(url, unpack, resource_file_name, expected_md5=None, expected_sha1=None, tries=10, writers=None):
  177. logging.info('Downloading from url %s name %s and expected md5 %s', url, resource_file_name, expected_md5)
  178. tmp_file_name = uniq_string_generator()
  179. request = urllib_request.Request(url, headers={'User-Agent': make_user_agent()})
  180. req = retry.retry_func(lambda: urllib_request.urlopen(request, timeout=30), tries=tries, delay=5, backoff=1.57079)
  181. logging.debug('Headers: %s', req.headers)
  182. expected_file_size = int(req.headers.get('Content-Length', 0))
  183. real_md5 = hashlib.md5()
  184. real_sha1 = hashlib.sha1()
  185. with open(tmp_file_name, 'wb') as fp:
  186. copy_stream(
  187. req.read,
  188. fp.write,
  189. real_md5.update,
  190. real_sha1.update,
  191. size_printer(resource_file_name, expected_file_size),
  192. *([] if writers is None else writers)
  193. )
  194. real_md5 = real_md5.hexdigest()
  195. real_file_size = os.path.getsize(tmp_file_name)
  196. real_sha1.update(b'\0')
  197. real_sha1.update(str(real_file_size).encode('utf-8'))
  198. real_sha1 = real_sha1.hexdigest()
  199. if unpack:
  200. tmp_dir = tmp_file_name + '.dir'
  201. os.makedirs(tmp_dir)
  202. with tarfile.open(tmp_file_name, mode="r|gz") as tar:
  203. tar.extractall(tmp_dir)
  204. tmp_file_name = os.path.join(tmp_dir, resource_file_name)
  205. if expected_md5:
  206. real_md5 = md5file(tmp_file_name)
  207. logging.info('File size %s (expected %s)', real_file_size, expected_file_size or "UNKNOWN")
  208. logging.info('File md5 %s (expected %s)', real_md5, expected_md5)
  209. logging.info('File sha1 %s (expected %s)', real_sha1, expected_sha1)
  210. if expected_md5 and real_md5 != expected_md5:
  211. report_to_snowden({'headers': req.headers.headers, 'expected_md5': expected_md5, 'real_md5': real_md5})
  212. raise BadChecksumFetchError(
  213. 'Downloaded {}, but expected {} for {}'.format(
  214. real_md5,
  215. expected_md5,
  216. url,
  217. )
  218. )
  219. if expected_sha1 and real_sha1 != expected_sha1:
  220. report_to_snowden({'headers': req.headers.headers, 'expected_sha1': expected_sha1, 'real_sha1': real_sha1})
  221. raise BadChecksumFetchError(
  222. 'Downloaded {}, but expected {} for {}'.format(
  223. real_sha1,
  224. expected_sha1,
  225. url,
  226. )
  227. )
  228. if expected_file_size and expected_file_size != real_file_size:
  229. report_to_snowden({'headers': req.headers.headers, 'file_size': real_file_size})
  230. raise IncompleteFetchError(
  231. 'Downloaded {}, but expected {} for {}'.format(
  232. real_file_size,
  233. expected_file_size,
  234. url,
  235. )
  236. )
  237. return tmp_file_name
  238. def chmod(filename, mode):
  239. if platform.system().lower() == 'windows':
  240. # https://docs.microsoft.com/en-us/windows/win32/fileio/hard-links-and-junctions:
  241. # hard to reset read-only attribute for removal if there are multiple hardlinks
  242. return
  243. stat = os.stat(filename)
  244. if stat.st_mode & 0o777 != mode:
  245. try:
  246. os.chmod(filename, mode)
  247. except OSError:
  248. import pwd
  249. sys.stderr.write(
  250. "{} st_mode: {} pwuid: {}\n".format(filename, stat.st_mode, pwd.getpwuid(os.stat(filename).st_uid))
  251. )
  252. raise
  253. def make_readonly(filename):
  254. chmod(filename, os.stat(filename).st_mode & 0o111 | 0o444)
  255. def process(fetched_file, file_name, args, remove=True):
  256. assert len(args.rename) <= len(args.outputs), ('too few outputs to rename', args.rename, 'into', args.outputs)
  257. fetched_file_is_dir = os.path.isdir(fetched_file)
  258. if fetched_file_is_dir and not args.untar_to:
  259. raise ResourceIsDirectoryError('Resource may be directory only with untar_to option: ' + fetched_file)
  260. # make all read only
  261. if fetched_file_is_dir:
  262. for root, _, files in os.walk(fetched_file):
  263. for filename in files:
  264. make_readonly(os.path.join(root, filename))
  265. else:
  266. make_readonly(fetched_file)
  267. if args.copy_to:
  268. hardlink_or_copy(fetched_file, args.copy_to)
  269. if not args.outputs:
  270. args.outputs = [args.copy_to]
  271. if args.rename_to:
  272. args.rename.append(fetched_file)
  273. if not args.outputs:
  274. args.outputs = [args.rename_to]
  275. if args.copy_to_dir:
  276. hardlink_or_copy(fetched_file, os.path.join(args.copy_to_dir, file_name))
  277. if args.untar_to:
  278. ensure_dir(args.untar_to)
  279. inputs = set(map(os.path.normpath, args.rename + args.outputs[len(args.rename) :]))
  280. if fetched_file_is_dir:
  281. for member in inputs:
  282. base, name = member.split('/', 1)
  283. src = os.path.normpath(os.path.join(fetched_file, name))
  284. dst = os.path.normpath(os.path.join(args.untar_to, member))
  285. hardlink_or_copy(src, dst)
  286. else:
  287. # Extract only requested files
  288. try:
  289. with tarfile.open(fetched_file, mode='r:*') as tar:
  290. members = [
  291. entry for entry in tar if os.path.normpath(os.path.join(args.untar_to, entry.name)) in inputs
  292. ]
  293. tar.extractall(args.untar_to, members=members)
  294. except tarfile.ReadError as e:
  295. logging.exception(e)
  296. raise ResourceUnpackingError('File {} cannot be untared'.format(fetched_file))
  297. # Forbid changes to the loaded resource data
  298. for root, _, files in os.walk(args.untar_to):
  299. for filename in files:
  300. make_readonly(os.path.join(root, filename))
  301. for src, dst in zip(args.rename, args.outputs):
  302. if src == 'RESOURCE':
  303. src = fetched_file
  304. if os.path.abspath(src) == os.path.abspath(fetched_file):
  305. logging.info('Copying %s to %s', src, dst)
  306. hardlink_or_copy(src, dst)
  307. else:
  308. logging.info('Renaming %s to %s', src, dst)
  309. if os.path.exists(dst):
  310. raise ResourceUnpackingError("Target file already exists ({} -> {})".format(src, dst))
  311. if not os.path.exists(src):
  312. raise ResourceUnpackingError("Source file does not exist ({} in {})".format(src, os.getcwd()))
  313. if remove:
  314. rename_or_copy_and_remove(src, dst)
  315. else:
  316. hardlink_or_copy(src, dst)
  317. for path in args.outputs:
  318. if not os.path.exists(path):
  319. raise OutputNotExistError('Output does not exist: %s' % os.path.abspath(path))
  320. if not os.path.isfile(path):
  321. raise OutputIsDirectoryError('Output must be a file, not a directory: %s' % os.path.abspath(path))
  322. if args.executable:
  323. chmod(path, os.stat(path).st_mode | 0o111)
  324. if os.path.abspath(path) == os.path.abspath(fetched_file):
  325. remove = False
  326. if remove:
  327. if fetched_file_is_dir:
  328. shutil.rmtree(fetched_file)
  329. else:
  330. os.remove(fetched_file)