# fetch_from.py (12 KB)
  1. import datetime as dt
  2. import errno
  3. import hashlib
  4. import json
  5. import logging
  6. import os
  7. import platform
  8. import random
  9. import shutil
  10. import socket
  11. import string
  12. import sys
  13. import tarfile
  14. import urllib2
  15. import retry
  16. def make_user_agent():
  17. return 'fetch_from: {host}'.format(host=socket.gethostname())
  18. def add_common_arguments(parser):
  19. parser.add_argument('--copy-to') # used by jbuild in fetch_resource
  20. parser.add_argument('--rename-to') # used by test_node in inject_mds_resource_to_graph
  21. parser.add_argument('--copy-to-dir')
  22. parser.add_argument('--untar-to')
  23. parser.add_argument(
  24. '--rename', action='append', default=[], metavar='FILE', help='rename FILE to the corresponding output'
  25. )
  26. parser.add_argument('--executable', action='store_true', help='make outputs executable')
  27. parser.add_argument('--log-path')
  28. parser.add_argument(
  29. '-v',
  30. '--verbose',
  31. action='store_true',
  32. default=os.environ.get('YA_VERBOSE_FETCHER'),
  33. help='increase stderr verbosity',
  34. )
  35. parser.add_argument('outputs', nargs='*', default=[])
  36. def ensure_dir(path):
  37. if not (path == '' or os.path.isdir(path)):
  38. os.makedirs(path)
  39. # Reference code: library/python/fs/__init__.py
  40. def hardlink_or_copy(src, dst):
  41. ensure_dir(os.path.dirname(dst))
  42. if os.name == 'nt':
  43. shutil.copy(src, dst)
  44. else:
  45. try:
  46. os.link(src, dst)
  47. except OSError as e:
  48. if e.errno == errno.EEXIST:
  49. return
  50. elif e.errno in (errno.EXDEV, errno.EMLINK, errno.EINVAL, errno.EACCES):
  51. sys.stderr.write(
  52. "Can't make hardlink (errno={}) - fallback to copy: {} -> {}\n".format(e.errno, src, dst)
  53. )
  54. shutil.copy(src, dst)
  55. else:
  56. sys.stderr.write("src: {} dst: {}\n".format(src, dst))
  57. raise
  58. def rename_or_copy_and_remove(src, dst):
  59. ensure_dir(os.path.dirname(dst))
  60. try:
  61. os.rename(src, dst)
  62. except OSError:
  63. shutil.copy(src, dst)
  64. os.remove(src)
  65. class BadChecksumFetchError(Exception):
  66. pass
  67. class IncompleteFetchError(Exception):
  68. pass
  69. class ResourceUnpackingError(Exception):
  70. pass
  71. class ResourceIsDirectoryError(Exception):
  72. pass
  73. class OutputIsDirectoryError(Exception):
  74. pass
  75. class OutputNotExistError(Exception):
  76. pass
  77. def setup_logging(args, base_name):
  78. def makedirs(path):
  79. try:
  80. os.makedirs(path)
  81. except OSError:
  82. pass
  83. if args.log_path:
  84. log_file_name = args.log_path
  85. else:
  86. log_file_name = base_name + ".log"
  87. args.abs_log_path = os.path.abspath(log_file_name)
  88. makedirs(os.path.dirname(args.abs_log_path))
  89. logging.basicConfig(filename=args.abs_log_path, level=logging.DEBUG)
  90. if args.verbose:
  91. logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))
  92. def is_temporary(e):
  93. def is_broken(e):
  94. return isinstance(e, urllib2.HTTPError) and e.code in (410, 404)
  95. if is_broken(e):
  96. return False
  97. if isinstance(e, (BadChecksumFetchError, IncompleteFetchError, urllib2.URLError, socket.error)):
  98. return True
  99. import error
  100. return error.is_temporary_error(e)
  101. def uniq_string_generator(size=6, chars=string.ascii_lowercase + string.digits):
  102. return ''.join(random.choice(chars) for _ in range(size))
  103. def report_to_snowden(value):
  104. def inner():
  105. body = {
  106. 'namespace': 'ygg',
  107. 'key': 'fetch-from-sandbox',
  108. 'value': json.dumps(value),
  109. }
  110. urllib2.urlopen(
  111. 'https://back-snowden.qloud.yandex-team.ru/report/add',
  112. json.dumps(
  113. [
  114. body,
  115. ]
  116. ),
  117. timeout=5,
  118. )
  119. try:
  120. inner()
  121. except Exception as e:
  122. logging.warning('report_to_snowden failed: %s', e)
  123. def copy_stream(read, *writers, **kwargs):
  124. chunk_size = kwargs.get('size', 1024 * 1024)
  125. while True:
  126. data = read(chunk_size)
  127. if not data:
  128. break
  129. for write in writers:
  130. write(data)
  131. def md5file(fname):
  132. res = hashlib.md5()
  133. with open(fname, 'rb') as f:
  134. copy_stream(f.read, res.update)
  135. return res.hexdigest()
  136. def git_like_hash_with_size(filepath):
  137. """
  138. Calculate git like hash for path
  139. """
  140. sha = hashlib.sha1()
  141. file_size = 0
  142. with open(filepath, 'rb') as f:
  143. while True:
  144. block = f.read(2**16)
  145. if not block:
  146. break
  147. file_size += len(block)
  148. sha.update(block)
  149. sha.update('\0')
  150. sha.update(str(file_size))
  151. return sha.hexdigest(), file_size
  152. def size_printer(display_name, size):
  153. sz = [0]
  154. last_stamp = [dt.datetime.now()]
  155. def printer(chunk):
  156. sz[0] += len(chunk)
  157. now = dt.datetime.now()
  158. if last_stamp[0] + dt.timedelta(seconds=10) < now:
  159. if size:
  160. print >> sys.stderr, "##status##{} - [[imp]]{:.1f}%[[rst]]".format(
  161. display_name, 100.0 * sz[0] / size if size else 0
  162. )
  163. last_stamp[0] = now
  164. return printer
def fetch_url(url, unpack, resource_file_name, expected_md5=None, expected_sha1=None, tries=10, writers=None):
    """Download *url* into a randomly named temporary file and verify it.

    url: address to download.
    unpack: when truthy, the download is extracted as a gzip-compressed tar
        into ``<tmp name>.dir`` and the returned path points at
        *resource_file_name* inside that directory.
    resource_file_name: name used in log/status output and, with *unpack*,
        the entry to return from the extracted directory.
    expected_md5: optional hex MD5 to verify against.
    expected_sha1: optional hex SHA-1 to verify against; computed over the
        downloaded bytes followed by a NUL byte and the decimal file size.
    tries: passed to ``retry.retry_func`` for opening the URL.
    writers: optional iterable of extra callables that receive every chunk.

    Returns the path of the downloaded file (or of the extracted resource).

    Raises BadChecksumFetchError on an MD5 or SHA-1 mismatch and
    IncompleteFetchError when the size differs from a non-zero
    Content-Length. Each failure is reported via report_to_snowden first.
    """
    logging.info('Downloading from url %s name %s and expected md5 %s', url, resource_file_name, expected_md5)
    # Relative name, so the temporary file is created in the current working directory.
    tmp_file_name = uniq_string_generator()

    request = urllib2.Request(url, headers={'User-Agent': make_user_agent()})
    # presumably retry_func re-invokes the lambda on failure with the given delay/backoff; verify against the retry module
    req = retry.retry_func(lambda: urllib2.urlopen(request, timeout=30), tries=tries, delay=5, backoff=1.57079)
    logging.debug('Headers: %s', req.headers.headers)
    # 0 means "no Content-Length header"; the size check below is then skipped.
    expected_file_size = int(req.headers.get('Content-Length', 0))
    real_md5 = hashlib.md5()
    real_sha1 = hashlib.sha1()

    with open(tmp_file_name, 'wb') as fp:
        # Single pass: write to disk, update both digests and report progress.
        copy_stream(
            req.read,
            fp.write,
            real_md5.update,
            real_sha1.update,
            size_printer(resource_file_name, expected_file_size),
            *([] if writers is None else writers)
        )

    # From here on real_md5 / real_sha1 hold hex strings instead of hash objects.
    real_md5 = real_md5.hexdigest()
    real_file_size = os.path.getsize(tmp_file_name)
    # Same scheme as git_like_hash_with_size: content + NUL + decimal size.
    real_sha1.update('\0')
    real_sha1.update(str(real_file_size))
    real_sha1 = real_sha1.hexdigest()

    if unpack:
        tmp_dir = tmp_file_name + '.dir'
        os.makedirs(tmp_dir)
        with tarfile.open(tmp_file_name, mode="r|gz") as tar:
            tar.extractall(tmp_dir)
        tmp_file_name = os.path.join(tmp_dir, resource_file_name)
        # NOTE(review): nesting reconstructed from a listing that lost its indentation;
        # the MD5 recomputation is assumed to belong to the unpack branch - confirm.
        if expected_md5:
            # With unpack, the MD5 is checked against the extracted resource, not the archive.
            real_md5 = md5file(tmp_file_name)

    logging.info('File size %s (expected %s)', real_file_size, expected_file_size or "UNKNOWN")
    logging.info('File md5 %s (expected %s)', real_md5, expected_md5)
    logging.info('File sha1 %s (expected %s)', real_sha1, expected_sha1)

    if expected_md5 and real_md5 != expected_md5:
        report_to_snowden({'headers': req.headers.headers, 'expected_md5': expected_md5, 'real_md5': real_md5})

        raise BadChecksumFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_md5,
                expected_md5,
                url,
            )
        )

    if expected_sha1 and real_sha1 != expected_sha1:
        report_to_snowden({'headers': req.headers.headers, 'expected_sha1': expected_sha1, 'real_sha1': real_sha1})

        raise BadChecksumFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_sha1,
                expected_sha1,
                url,
            )
        )

    # real_file_size is always the size of the downloaded file, even with unpack.
    if expected_file_size and expected_file_size != real_file_size:
        report_to_snowden({'headers': req.headers.headers, 'file_size': real_file_size})

        raise IncompleteFetchError(
            'Downloaded {}, but expected {} for {}'.format(
                real_file_size,
                expected_file_size,
                url,
            )
        )

    return tmp_file_name
  227. def chmod(filename, mode):
  228. if platform.system().lower() == 'windows':
  229. # https://docs.microsoft.com/en-us/windows/win32/fileio/hard-links-and-junctions:
  230. # hard to reset read-only attribute for removal if there are multiple hardlinks
  231. return
  232. stat = os.stat(filename)
  233. if stat.st_mode & 0o777 != mode:
  234. try:
  235. os.chmod(filename, mode)
  236. except OSError:
  237. import pwd
  238. sys.stderr.write(
  239. "{} st_mode: {} pwuid: {}\n".format(filename, stat.st_mode, pwd.getpwuid(os.stat(filename).st_uid))
  240. )
  241. raise
  242. def make_readonly(filename):
  243. chmod(filename, os.stat(filename).st_mode & 0o111 | 0o444)
def process(fetched_file, file_name, args, remove=True):
    """Place a fetched resource at the locations requested in *args* and validate the outputs.

    fetched_file: path of the fetched resource; a directory is accepted only
        together with ``args.untar_to``.
    file_name: name given to the copy placed into ``args.copy_to_dir``.
    args: namespace with copy_to, rename_to, copy_to_dir, untar_to, rename,
        outputs and executable (see add_common_arguments). It is mutated:
        ``args.outputs`` may be filled in and ``args.rename`` may be extended.
    remove: when true, renamed sources are moved instead of hard-linked and
        *fetched_file* is deleted at the end unless it is one of the outputs.

    Raises AssertionError when there are more renames than outputs,
    ResourceIsDirectoryError, ResourceUnpackingError, OutputNotExistError
    and OutputIsDirectoryError.
    """
    assert len(args.rename) <= len(args.outputs), ('too few outputs to rename', args.rename, 'into', args.outputs)

    fetched_file_is_dir = os.path.isdir(fetched_file)
    if fetched_file_is_dir and not args.untar_to:
        raise ResourceIsDirectoryError('Resource may be directory only with untar_to option: ' + fetched_file)

    # make all read only
    if fetched_file_is_dir:
        for root, _, files in os.walk(fetched_file):
            for filename in files:
                make_readonly(os.path.join(root, filename))
    else:
        make_readonly(fetched_file)

    if args.copy_to:
        hardlink_or_copy(fetched_file, args.copy_to)
        # With no explicit outputs, the copy target becomes the only output.
        if not args.outputs:
            args.outputs = [args.copy_to]

    if args.rename_to:
        # Queue the fetched file itself for the rename loop below.
        args.rename.append(fetched_file)
        if not args.outputs:
            args.outputs = [args.rename_to]

    if args.copy_to_dir:
        hardlink_or_copy(fetched_file, os.path.join(args.copy_to_dir, file_name))

    if args.untar_to:
        ensure_dir(args.untar_to)
        # Requested paths: every rename source plus the outputs that have no rename source.
        inputs = set(map(os.path.normpath, args.rename + args.outputs[len(args.rename) :]))
        if fetched_file_is_dir:
            for member in inputs:
                # Drops the first path component; a member without '/' raises ValueError here.
                base, name = member.split('/', 1)
                src = os.path.normpath(os.path.join(fetched_file, name))
                dst = os.path.normpath(os.path.join(args.untar_to, member))
                hardlink_or_copy(src, dst)
        else:
            # Extract only requested files
            try:
                with tarfile.open(fetched_file, mode='r:*') as tar:
                    # An entry is requested when untar_to/<entry name> is one of the inputs.
                    members = [
                        entry for entry in tar if os.path.normpath(os.path.join(args.untar_to, entry.name)) in inputs
                    ]
                    tar.extractall(args.untar_to, members=members)
            except tarfile.ReadError as e:
                logging.exception(e)
                raise ResourceUnpackingError('File {} cannot be untared'.format(fetched_file))

        # NOTE(review): nesting reconstructed from a listing that lost its indentation;
        # this loop is assumed to run only when untar_to is set - confirm.
        # Forbid changes to the loaded resource data
        for root, _, files in os.walk(args.untar_to):
            for filename in files:
                make_readonly(os.path.join(root, filename))

    # Rename sources are paired with outputs by position.
    for src, dst in zip(args.rename, args.outputs):
        # 'RESOURCE' is an alias for the fetched file itself.
        if src == 'RESOURCE':
            src = fetched_file
        if os.path.abspath(src) == os.path.abspath(fetched_file):
            # The fetched file is hard-linked/copied here; its removal is decided at the end.
            logging.info('Copying %s to %s', src, dst)
            hardlink_or_copy(src, dst)
        else:
            logging.info('Renaming %s to %s', src, dst)
            if os.path.exists(dst):
                raise ResourceUnpackingError("Target file already exists ({} -> {})".format(src, dst))
            if remove:
                rename_or_copy_and_remove(src, dst)
            else:
                hardlink_or_copy(src, dst)

    for path in args.outputs:
        if not os.path.exists(path):
            raise OutputNotExistError('Output does not exist: %s' % os.path.abspath(path))
        if not os.path.isfile(path):
            raise OutputIsDirectoryError('Output must be a file, not a directory: %s' % os.path.abspath(path))
        if args.executable:
            chmod(path, os.stat(path).st_mode | 0o111)
        # Never delete the fetched file when it is itself a declared output.
        if os.path.abspath(path) == os.path.abspath(fetched_file):
            remove = False

    if remove:
        if fetched_file_is_dir:
            shutil.rmtree(fetched_file)
        else:
            os.remove(fetched_file)