_format.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. # -*- test-case-name: twisted.logger.test.test_format -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Tools for formatting logging events.
  6. """
  7. from __future__ import annotations
  8. from datetime import datetime as DateTime
  9. from typing import Any, Callable, Iterator, Mapping, Optional, Union, cast
  10. from constantly import NamedConstant
  11. from twisted.python._tzhelper import FixedOffsetTimeZone
  12. from twisted.python.failure import Failure
  13. from twisted.python.reflect import safe_repr
  14. from ._flatten import aFormatter, flatFormat
  15. from ._interfaces import LogEvent
  16. timeFormatRFC3339 = "%Y-%m-%dT%H:%M:%S%z"
  17. def formatEvent(event: LogEvent) -> str:
  18. """
  19. Formats an event as text, using the format in C{event["log_format"]}.
  20. This implementation should never raise an exception; if the formatting
  21. cannot be done, the returned string will describe the event generically so
  22. that a useful message is emitted regardless.
  23. @param event: A logging event.
  24. @return: A formatted string.
  25. """
  26. return eventAsText(
  27. event,
  28. includeTraceback=False,
  29. includeTimestamp=False,
  30. includeSystem=False,
  31. )
  32. def formatUnformattableEvent(event: LogEvent, error: BaseException) -> str:
  33. """
  34. Formats an event as text that describes the event generically and a
  35. formatting error.
  36. @param event: A logging event.
  37. @param error: The formatting error.
  38. @return: A formatted string.
  39. """
  40. try:
  41. return "Unable to format event {event!r}: {error}".format(
  42. event=event, error=error
  43. )
  44. except BaseException:
  45. # Yikes, something really nasty happened.
  46. #
  47. # Try to recover as much formattable data as possible; hopefully at
  48. # least the namespace is sane, which will help you find the offending
  49. # logger.
  50. failure = Failure()
  51. text = ", ".join(
  52. " = ".join((safe_repr(key), safe_repr(value)))
  53. for key, value in event.items()
  54. )
  55. return (
  56. "MESSAGE LOST: unformattable object logged: {error}\n"
  57. "Recoverable data: {text}\n"
  58. "Exception during formatting:\n{failure}".format(
  59. error=safe_repr(error), failure=failure, text=text
  60. )
  61. )
  62. def formatTime(
  63. when: Optional[float],
  64. timeFormat: Optional[str] = timeFormatRFC3339,
  65. default: str = "-",
  66. ) -> str:
  67. """
  68. Format a timestamp as text.
  69. Example::
  70. >>> from time import time
  71. >>> from twisted.logger import formatTime
  72. >>>
  73. >>> t = time()
  74. >>> formatTime(t)
  75. u'2013-10-22T14:19:11-0700'
  76. >>> formatTime(t, timeFormat="%Y/%W") # Year and week number
  77. u'2013/42'
  78. >>>
  79. @param when: A timestamp.
  80. @param timeFormat: A time format.
  81. @param default: Text to return if C{when} or C{timeFormat} is L{None}.
  82. @return: A formatted time.
  83. """
  84. if timeFormat is None or when is None:
  85. return default
  86. else:
  87. tz = FixedOffsetTimeZone.fromLocalTimeStamp(when)
  88. datetime = DateTime.fromtimestamp(when, tz)
  89. return str(datetime.strftime(timeFormat))
  90. def formatEventAsClassicLogText(
  91. event: LogEvent, formatTime: Callable[[Optional[float]], str] = formatTime
  92. ) -> Optional[str]:
  93. """
  94. Format an event as a line of human-readable text for, e.g. traditional log
  95. file output.
  96. The output format is C{"{timeStamp} [{system}] {event}\\n"}, where:
  97. - C{timeStamp} is computed by calling the given C{formatTime} callable
  98. on the event's C{"log_time"} value
  99. - C{system} is the event's C{"log_system"} value, if set, otherwise,
  100. the C{"log_namespace"} and C{"log_level"}, joined by a C{"#"}. Each
  101. defaults to C{"-"} is not set.
  102. - C{event} is the event, as formatted by L{formatEvent}.
  103. Example::
  104. >>> from time import time
  105. >>> from twisted.logger import formatEventAsClassicLogText
  106. >>> from twisted.logger import LogLevel
  107. >>>
  108. >>> formatEventAsClassicLogText(dict()) # No format, returns None
  109. >>> formatEventAsClassicLogText(dict(log_format="Hello!"))
  110. u'- [-#-] Hello!\\n'
  111. >>> formatEventAsClassicLogText(dict(
  112. ... log_format="Hello!",
  113. ... log_time=time(),
  114. ... log_namespace="my_namespace",
  115. ... log_level=LogLevel.info,
  116. ... ))
  117. u'2013-10-22T17:30:02-0700 [my_namespace#info] Hello!\\n'
  118. >>> formatEventAsClassicLogText(dict(
  119. ... log_format="Hello!",
  120. ... log_time=time(),
  121. ... log_system="my_system",
  122. ... ))
  123. u'2013-11-11T17:22:06-0800 [my_system] Hello!\\n'
  124. >>>
  125. @param event: an event.
  126. @param formatTime: A time formatter
  127. @return: A formatted event, or L{None} if no output is appropriate.
  128. """
  129. eventText = eventAsText(event, formatTime=formatTime)
  130. if not eventText:
  131. return None
  132. eventText = eventText.replace("\n", "\n\t")
  133. return eventText + "\n"
  134. def keycall(key: str, getter: Callable[[str], Any]) -> PotentialCallWrapper:
  135. """
  136. Check to see if C{key} ends with parentheses ("C{()}"); if not, wrap up the
  137. result of C{get} in a L{PotentialCallWrapper}. Otherwise, call the result
  138. of C{get} first, before wrapping it up.
  139. @param key: The last dotted segment of a formatting key, as parsed by
  140. L{Formatter.vformat}, which may end in C{()}.
  141. @param getter: A function which takes a string and returns some other
  142. object, to be formatted and stringified for a log.
  143. @return: A L{PotentialCallWrapper} that will wrap up the result to allow
  144. for subsequent usages of parens to defer execution to log-format time.
  145. """
  146. callit = key.endswith("()")
  147. realKey = key[:-2] if callit else key
  148. value = getter(realKey)
  149. if callit:
  150. value = value()
  151. return PotentialCallWrapper(value)
  152. class PotentialCallWrapper(object):
  153. """
  154. Object wrapper that wraps C{getattr()} so as to process call-parentheses
  155. C{"()"} after a dotted attribute access.
  156. """
  157. def __init__(self, wrapped: object) -> None:
  158. self._wrapped = wrapped
  159. def __getattr__(self, name: str) -> object:
  160. return keycall(name, self._wrapped.__getattribute__)
  161. def __getitem__(self, name: str) -> object:
  162. # The sub-object may not be indexable, but if it isn't, that's the
  163. # caller's problem.
  164. value = self._wrapped[name] # type:ignore[index]
  165. return PotentialCallWrapper(value)
  166. def __format__(self, format_spec: str) -> str:
  167. return self._wrapped.__format__(format_spec)
  168. def __repr__(self) -> str:
  169. return self._wrapped.__repr__()
  170. def __str__(self) -> str:
  171. return self._wrapped.__str__()
  172. class CallMapping(Mapping[str, Any]):
  173. """
  174. Read-only mapping that turns a C{()}-suffix in key names into an invocation
  175. of the key rather than a lookup of the key.
  176. Implementation support for L{formatWithCall}.
  177. """
  178. def __init__(self, submapping: Mapping[str, Any]) -> None:
  179. """
  180. @param submapping: Another read-only mapping which will be used to look
  181. up items.
  182. """
  183. self._submapping = submapping
  184. def __iter__(self) -> Iterator[Any]:
  185. return iter(self._submapping)
  186. def __len__(self) -> int:
  187. return len(self._submapping)
  188. def __getitem__(self, key: str) -> Any:
  189. """
  190. Look up an item in the submapping for this L{CallMapping}, calling it
  191. if C{key} ends with C{"()"}.
  192. """
  193. return keycall(key, self._submapping.__getitem__)
  194. def formatWithCall(formatString: str, mapping: Mapping[str, Any]) -> str:
  195. """
  196. Format a string like L{str.format}, but:
  197. - taking only a name mapping; no positional arguments
  198. - with the additional syntax that an empty set of parentheses
  199. correspond to a formatting item that should be called, and its result
  200. C{str}'d, rather than calling C{str} on the element directly as
  201. normal.
  202. For example::
  203. >>> formatWithCall("{string}, {function()}.",
  204. ... dict(string="just a string",
  205. ... function=lambda: "a function"))
  206. 'just a string, a function.'
  207. @param formatString: A PEP-3101 format string.
  208. @param mapping: A L{dict}-like object to format.
  209. @return: The string with formatted values interpolated.
  210. """
  211. return str(aFormatter.vformat(formatString, (), CallMapping(mapping)))
  212. def _formatEvent(event: LogEvent) -> str:
  213. """
  214. Formats an event as a string, using the format in C{event["log_format"]}.
  215. This implementation should never raise an exception; if the formatting
  216. cannot be done, the returned string will describe the event generically so
  217. that a useful message is emitted regardless.
  218. @param event: A logging event.
  219. @return: A formatted string.
  220. """
  221. try:
  222. if "log_flattened" in event:
  223. return flatFormat(event)
  224. format = cast(Optional[Union[str, bytes]], event.get("log_format", None))
  225. if format is None:
  226. return ""
  227. # Make sure format is text.
  228. if isinstance(format, str):
  229. pass
  230. elif isinstance(format, bytes):
  231. format = format.decode("utf-8")
  232. else:
  233. raise TypeError(f"Log format must be str, not {format!r}")
  234. return formatWithCall(format, event)
  235. except BaseException as e:
  236. return formatUnformattableEvent(event, e)
  237. def _formatTraceback(failure: Failure) -> str:
  238. """
  239. Format a failure traceback, assuming UTF-8 and using a replacement
  240. strategy for errors. Every effort is made to provide a usable
  241. traceback, but should not that not be possible, a message and the
  242. captured exception are logged.
  243. @param failure: The failure to retrieve a traceback from.
  244. @return: The formatted traceback.
  245. """
  246. try:
  247. traceback = failure.getTraceback()
  248. except BaseException as e:
  249. traceback = "(UNABLE TO OBTAIN TRACEBACK FROM EVENT):" + str(e)
  250. return traceback
  251. def _formatSystem(event: LogEvent) -> str:
  252. """
  253. Format the system specified in the event in the "log_system" key if set,
  254. otherwise the C{"log_namespace"} and C{"log_level"}, joined by a C{"#"}.
  255. Each defaults to C{"-"} is not set. If formatting fails completely,
  256. "UNFORMATTABLE" is returned.
  257. @param event: The event containing the system specification.
  258. @return: A formatted string representing the "log_system" key.
  259. """
  260. system = cast(Optional[str], event.get("log_system", None))
  261. if system is None:
  262. level = cast(Optional[NamedConstant], event.get("log_level", None))
  263. if level is None:
  264. levelName = "-"
  265. else:
  266. levelName = level.name
  267. system = "{namespace}#{level}".format(
  268. namespace=cast(str, event.get("log_namespace", "-")),
  269. level=levelName,
  270. )
  271. else:
  272. try:
  273. system = str(system)
  274. except Exception:
  275. system = "UNFORMATTABLE"
  276. return system
  277. def eventAsText(
  278. event: LogEvent,
  279. includeTraceback: bool = True,
  280. includeTimestamp: bool = True,
  281. includeSystem: bool = True,
  282. formatTime: Callable[[float], str] = formatTime,
  283. ) -> str:
  284. r"""
  285. Format an event as text. Optionally, attach timestamp, traceback, and
  286. system information.
  287. The full output format is:
  288. C{"{timeStamp} [{system}] {event}\n{traceback}\n"} where:
  289. - C{timeStamp} is the event's C{"log_time"} value formatted with
  290. the provided C{formatTime} callable.
  291. - C{system} is the event's C{"log_system"} value, if set, otherwise,
  292. the C{"log_namespace"} and C{"log_level"}, joined by a C{"#"}. Each
  293. defaults to C{"-"} is not set.
  294. - C{event} is the event, as formatted by L{formatEvent}.
  295. - C{traceback} is the traceback if the event contains a
  296. C{"log_failure"} key. In the event the original traceback cannot
  297. be formatted, a message indicating the failure will be substituted.
  298. If the event cannot be formatted, and no traceback exists, an empty string
  299. is returned, even if includeSystem or includeTimestamp are true.
  300. @param event: A logging event.
  301. @param includeTraceback: If true and a C{"log_failure"} key exists, append
  302. a traceback.
  303. @param includeTimestamp: If true include a formatted timestamp before the
  304. event.
  305. @param includeSystem: If true, include the event's C{"log_system"} value.
  306. @param formatTime: A time formatter
  307. @return: A formatted string with specified options.
  308. @since: Twisted 18.9.0
  309. """
  310. eventText = _formatEvent(event)
  311. if includeTraceback and "log_failure" in event:
  312. f = event["log_failure"]
  313. traceback = _formatTraceback(f)
  314. eventText = "\n".join((eventText, traceback))
  315. if not eventText:
  316. return eventText
  317. timeStamp = ""
  318. if includeTimestamp:
  319. timeStamp = "".join([formatTime(cast(float, event.get("log_time", None))), " "])
  320. system = ""
  321. if includeSystem:
  322. system = "".join(["[", _formatSystem(event), "]", " "])
  323. return "{timeStamp}{system}{eventText}".format(
  324. timeStamp=timeStamp,
  325. system=system,
  326. eventText=eventText,
  327. )