util.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. # coding: utf-8
  2. """
  3. some helper functions that might be generally useful
  4. """
  5. import datetime
  6. from functools import partial
  7. import re
  8. from typing import Any, Dict, Optional, List, Text, Callable, Union # NOQA
  9. from .compat import StreamTextType # NOQA
  10. class LazyEval:
  11. """
  12. Lightweight wrapper around lazily evaluated func(*args, **kwargs).
  13. func is only evaluated when any attribute of its return value is accessed.
  14. Every attribute access is passed through to the wrapped value.
  15. (This only excludes special cases like method-wrappers, e.g., __hash__.)
  16. The sole additional attribute is the lazy_self function which holds the
  17. return value (or, prior to evaluation, func and arguments), in its closure.
  18. """
  19. def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
  20. def lazy_self() -> Any:
  21. return_value = func(*args, **kwargs)
  22. object.__setattr__(self, 'lazy_self', lambda: return_value)
  23. return return_value
  24. object.__setattr__(self, 'lazy_self', lazy_self)
  25. def __getattribute__(self, name: str) -> Any:
  26. lazy_self = object.__getattribute__(self, 'lazy_self')
  27. if name == 'lazy_self':
  28. return lazy_self
  29. return getattr(lazy_self(), name)
  30. def __setattr__(self, name: str, value: Any) -> None:
  31. setattr(self.lazy_self(), name, value)
  32. RegExp = partial(LazyEval, re.compile)
  33. timestamp_regexp = RegExp(
  34. """^(?P<year>[0-9][0-9][0-9][0-9])
  35. -(?P<month>[0-9][0-9]?)
  36. -(?P<day>[0-9][0-9]?)
  37. (?:((?P<t>[Tt])|[ \\t]+) # explictly not retaining extra spaces
  38. (?P<hour>[0-9][0-9]?)
  39. :(?P<minute>[0-9][0-9])
  40. :(?P<second>[0-9][0-9])
  41. (?:\\.(?P<fraction>[0-9]*))?
  42. (?:[ \\t]*(?P<tz>Z|(?P<tz_sign>[-+])(?P<tz_hour>[0-9][0-9]?)
  43. (?::(?P<tz_minute>[0-9][0-9]))?))?)?$""",
  44. re.X,
  45. )
  46. def create_timestamp(
  47. year: Any,
  48. month: Any,
  49. day: Any,
  50. t: Any,
  51. hour: Any,
  52. minute: Any,
  53. second: Any,
  54. fraction: Any,
  55. tz: Any,
  56. tz_sign: Any,
  57. tz_hour: Any,
  58. tz_minute: Any,
  59. ) -> Union[datetime.datetime, datetime.date]:
  60. # create a timestamp from match against timestamp_regexp
  61. MAX_FRAC = 999999
  62. year = int(year)
  63. month = int(month)
  64. day = int(day)
  65. if not hour:
  66. return datetime.date(year, month, day)
  67. hour = int(hour)
  68. minute = int(minute)
  69. second = int(second)
  70. frac = 0
  71. if fraction:
  72. frac_s = fraction[:6]
  73. while len(frac_s) < 6:
  74. frac_s += '0'
  75. frac = int(frac_s)
  76. if len(fraction) > 6 and int(fraction[6]) > 4:
  77. frac += 1
  78. if frac > MAX_FRAC:
  79. fraction = 0
  80. else:
  81. fraction = frac
  82. else:
  83. fraction = 0
  84. delta = None
  85. if tz_sign:
  86. tz_hour = int(tz_hour)
  87. tz_minute = int(tz_minute) if tz_minute else 0
  88. delta = datetime.timedelta(
  89. hours=tz_hour, minutes=tz_minute, seconds=1 if frac > MAX_FRAC else 0,
  90. )
  91. if tz_sign == '-':
  92. delta = -delta
  93. elif frac > MAX_FRAC:
  94. delta = -datetime.timedelta(seconds=1)
  95. # should do something else instead (or hook this up to the preceding if statement
  96. # in reverse
  97. # if delta is None:
  98. # return datetime.datetime(year, month, day, hour, minute, second, fraction)
  99. # return datetime.datetime(year, month, day, hour, minute, second, fraction,
  100. # datetime.timezone.utc)
  101. # the above is not good enough though, should provide tzinfo. In Python3 that is easily
  102. # doable drop that kind of support for Python2 as it has not native tzinfo
  103. data = datetime.datetime(year, month, day, hour, minute, second, fraction)
  104. if delta:
  105. data -= delta
  106. return data
  107. # originally as comment
  108. # https://github.com/pre-commit/pre-commit/pull/211#issuecomment-186466605
  109. # if you use this in your code, I suggest adding a test in your test suite
  110. # that check this routines output against a known piece of your YAML
  111. # before upgrades to this code break your round-tripped YAML
  112. def load_yaml_guess_indent(stream: StreamTextType, **kw: Any) -> Any:
  113. """guess the indent and block sequence indent of yaml stream/string
  114. returns round_trip_loaded stream, indent level, block sequence indent
  115. - block sequence indent is the number of spaces before a dash relative to previous indent
  116. - if there are no block sequences, indent is taken from nested mappings, block sequence
  117. indent is unset (None) in that case
  118. """
  119. from .main import YAML
  120. # load a YAML document, guess the indentation, if you use TABs you are on your own
  121. def leading_spaces(line: Any) -> int:
  122. idx = 0
  123. while idx < len(line) and line[idx] == ' ':
  124. idx += 1
  125. return idx
  126. if isinstance(stream, str):
  127. yaml_str: Any = stream
  128. elif isinstance(stream, bytes):
  129. # most likely, but the Reader checks BOM for this
  130. yaml_str = stream.decode('utf-8')
  131. else:
  132. yaml_str = stream.read()
  133. map_indent = None
  134. indent = None # default if not found for some reason
  135. block_seq_indent = None
  136. prev_line_key_only = None
  137. key_indent = 0
  138. for line in yaml_str.splitlines():
  139. rline = line.rstrip()
  140. lline = rline.lstrip()
  141. if lline.startswith('- '):
  142. l_s = leading_spaces(line)
  143. block_seq_indent = l_s - key_indent
  144. idx = l_s + 1
  145. while line[idx] == ' ': # this will end as we rstripped
  146. idx += 1
  147. if line[idx] == '#': # comment after -
  148. continue
  149. indent = idx - key_indent
  150. break
  151. if map_indent is None and prev_line_key_only is not None and rline:
  152. idx = 0
  153. while line[idx] in ' -':
  154. idx += 1
  155. if idx > prev_line_key_only:
  156. map_indent = idx - prev_line_key_only
  157. if rline.endswith(':'):
  158. key_indent = leading_spaces(line)
  159. idx = 0
  160. while line[idx] == ' ': # this will end on ':'
  161. idx += 1
  162. prev_line_key_only = idx
  163. continue
  164. prev_line_key_only = None
  165. if indent is None and map_indent is not None:
  166. indent = map_indent
  167. yaml = YAML()
  168. return yaml.load(yaml_str, **kw), indent, block_seq_indent
  169. def configobj_walker(cfg: Any) -> Any:
  170. """
  171. walks over a ConfigObj (INI file with comments) generating
  172. corresponding YAML output (including comments
  173. """
  174. from configobj import ConfigObj # type: ignore
  175. assert isinstance(cfg, ConfigObj)
  176. for c in cfg.initial_comment:
  177. if c.strip():
  178. yield c
  179. for s in _walk_section(cfg):
  180. if s.strip():
  181. yield s
  182. for c in cfg.final_comment:
  183. if c.strip():
  184. yield c
  185. def _walk_section(s: Any, level: int = 0) -> Any:
  186. from configobj import Section
  187. assert isinstance(s, Section)
  188. indent = ' ' * level
  189. for name in s.scalars:
  190. for c in s.comments[name]:
  191. yield indent + c.strip()
  192. x = s[name]
  193. if '\n' in x:
  194. i = indent + ' '
  195. x = '|\n' + i + x.strip().replace('\n', '\n' + i)
  196. elif ':' in x:
  197. x = "'" + x.replace("'", "''") + "'"
  198. line = f'{indent}{name}: {x}'
  199. c = s.inline_comments[name]
  200. if c:
  201. line += ' ' + c
  202. yield line
  203. for name in s.sections:
  204. for c in s.comments[name]:
  205. yield indent + c.strip()
  206. line = f'{indent}{name}:'
  207. c = s.inline_comments[name]
  208. if c:
  209. line += ' ' + c
  210. yield line
  211. for val in _walk_section(s[name], level=level + 1):
  212. yield val
  213. # def config_obj_2_rt_yaml(cfg):
  214. # from .comments import CommentedMap, CommentedSeq
  215. # from configobj import ConfigObj
  216. # assert isinstance(cfg, ConfigObj)
  217. # #for c in cfg.initial_comment:
  218. # # if c.strip():
  219. # # pass
  220. # cm = CommentedMap()
  221. # for name in s.sections:
  222. # cm[name] = d = CommentedMap()
  223. #
  224. #
  225. # #for c in cfg.final_comment:
  226. # # if c.strip():
  227. # # yield c
  228. # return cm