123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 |
- # -*- test-case-name: twisted.logger.test.test_format -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tools for formatting logging events.
- """
- from __future__ import annotations
- from datetime import datetime as DateTime
- from typing import Any, Callable, Iterator, Mapping, Optional, Union, cast
- from constantly import NamedConstant
- from twisted.python._tzhelper import FixedOffsetTimeZone
- from twisted.python.failure import Failure
- from twisted.python.reflect import safe_repr
- from ._flatten import aFormatter, flatFormat
- from ._interfaces import LogEvent
# Default C{strftime} layout for timestamps: an RFC 3339-style date and time
# followed by the numeric UTC offset produced by C{%z}.
timeFormatRFC3339 = "%Y-%m-%dT%H:%M:%S%z"
def formatEvent(event: LogEvent) -> str:
    """
    Render the message of a logging event as text.

    The text is produced from the format found in C{event["log_format"]}.
    No timestamp, system or traceback is attached; this is L{eventAsText}
    with all three of those options switched off.

    This implementation should never raise an exception; if the formatting
    cannot be done, the returned string describes the event generically so
    that a useful message is emitted regardless.

    @param event: A logging event.

    @return: A formatted string.
    """
    text = eventAsText(
        event, includeSystem=False, includeTimestamp=False, includeTraceback=False
    )
    return text
def formatUnformattableEvent(event: LogEvent, error: BaseException) -> str:
    """
    Describe, in generic terms, an event whose formatting failed.

    The usual result names the event (through its C{repr}) and the error that
    prevented formatting.  If producing even that raises, a "MESSAGE LOST"
    report is returned instead; it lists the L{safe_repr} of every key and
    value in the event along with the failure raised by the first attempt.

    @param event: A logging event.
    @param error: The formatting error.

    @return: A formatted string.
    """
    try:
        summary = "Unable to format event {event!r}: {error}"
        return summary.format(event=event, error=error)
    except BaseException:
        # Yikes, something really nasty happened: either the event or the
        # error could not be turned into text.
        #
        # Salvage whatever can still be represented; hopefully at least the
        # namespace is sane, which helps track down the offending logger.
        secondaryFailure = Failure()

        recovered = [
            f"{safe_repr(key)} = {safe_repr(value)}" for key, value in event.items()
        ]

        report = (
            "MESSAGE LOST: unformattable object logged: {error}\n"
            "Recoverable data: {text}\n"
            "Exception during formatting:\n{failure}"
        )
        return report.format(
            error=safe_repr(error),
            failure=secondaryFailure,
            text=", ".join(recovered),
        )
def formatTime(
    when: Optional[float],
    timeFormat: Optional[str] = timeFormatRFC3339,
    default: str = "-",
) -> str:
    """
    Render a timestamp as text.

    Example::

        >>> from time import time
        >>> from twisted.logger import formatTime
        >>>
        >>> t = time()
        >>> formatTime(t)
        '2013-10-22T14:19:11-0700'
        >>> formatTime(t, timeFormat="%Y/%W")  # Year and week number
        '2013/42'
        >>>

    @param when: A timestamp, as accepted by C{datetime.fromtimestamp}.
    @param timeFormat: A time format, in C{strftime} syntax.
    @param default: Text to return if C{when} or C{timeFormat} is L{None}.

    @return: A formatted time.
    """
    if when is None or timeFormat is None:
        return default

    # The zone is derived from the timestamp itself, so the resulting
    # datetime is timezone-aware and C{%z} in the format has an offset to
    # print.
    zone = FixedOffsetTimeZone.fromLocalTimeStamp(when)
    moment = DateTime.fromtimestamp(when, zone)
    return str(moment.strftime(timeFormat))
def formatEventAsClassicLogText(
    event: LogEvent, formatTime: Callable[[Optional[float]], str] = formatTime
) -> Optional[str]:
    """
    Render an event as one entry of human-readable text, suitable for e.g.
    traditional log file output.

    The output format is C{"{timeStamp} [{system}] {event}\\n"}, where:

        - C{timeStamp} is computed by calling the given C{formatTime} callable
          on the event's C{"log_time"} value

        - C{system} is the event's C{"log_system"} value, if set, otherwise,
          the C{"log_namespace"} and C{"log_level"}, joined by a C{"#"}.  Each
          defaults to C{"-"} if not set.

        - C{event} is the event, as formatted by L{formatEvent}.

    Newlines inside the entry are followed by a tab, so that continuation
    lines are indented.

    Example::

        >>> from time import time
        >>> from twisted.logger import formatEventAsClassicLogText
        >>> from twisted.logger import LogLevel
        >>>
        >>> formatEventAsClassicLogText(dict())  # No format, returns None
        >>> formatEventAsClassicLogText(dict(log_format="Hello!"))
        '- [-#-] Hello!\\n'
        >>> formatEventAsClassicLogText(dict(
        ...     log_format="Hello!",
        ...     log_time=time(),
        ...     log_namespace="my_namespace",
        ...     log_level=LogLevel.info,
        ... ))
        '2013-10-22T17:30:02-0700 [my_namespace#info] Hello!\\n'
        >>> formatEventAsClassicLogText(dict(
        ...     log_format="Hello!",
        ...     log_time=time(),
        ...     log_system="my_system",
        ... ))
        '2013-11-11T17:22:06-0800 [my_system] Hello!\\n'
        >>>

    @param event: an event.
    @param formatTime: A time formatter

    @return: A formatted event, or L{None} if no output is appropriate.
    """
    entry = eventAsText(event, formatTime=formatTime)
    if entry:
        # Indent every continuation line with a tab, then terminate the entry.
        return entry.replace("\n", "\n\t") + "\n"
    return None
def keycall(key: str, getter: Callable[[str], Any]) -> PotentialCallWrapper:
    """
    Look C{key} up through C{getter} and wrap the result in a
    L{PotentialCallWrapper}.

    If C{key} ends with parentheses ("C{()}"), the parentheses are removed
    before the lookup and the object that was looked up is called; the value
    it returns is what gets wrapped.  Otherwise the looked-up object is
    wrapped as it is.

    @param key: The last dotted segment of a formatting key, as parsed by
        L{Formatter.vformat}, which may end in C{()}.
    @param getter: A function which takes a string and returns some other
        object, to be formatted and stringified for a log.

    @return: A L{PotentialCallWrapper} that will wrap up the result to allow
        for subsequent usages of parens to defer execution to log-format time.
    """
    if key.endswith("()"):
        # Drop the trailing "()" for the lookup, then invoke what was found.
        found = getter(key[:-2])()
    else:
        found = getter(key)
    return PotentialCallWrapper(found)
class PotentialCallWrapper:
    """
    Object wrapper that wraps C{getattr()} so as to process call-parentheses
    C{"()"} after a dotted attribute access.

    Formatting, C{repr} and C{str} of the wrapper are delegated to the wrapped
    object through the corresponding built-in functions, so that special
    methods are looked up on the wrapped object's type.  This keeps the
    wrapper usable when the wrapped object is itself a class, for which
    calling e.g. C{wrapped.__repr__()} directly would pick up an unbound
    method and raise L{TypeError}.
    """

    def __init__(self, wrapped: object) -> None:
        """
        @param wrapped: The object whose attributes, items and textual
            representations are exposed through this wrapper.
        """
        self._wrapped = wrapped

    def __getattr__(self, name: str) -> object:
        """
        Resolve C{name} on the wrapped object via L{keycall}, so that a
        trailing C{"()"} results in a call.

        @param name: An attribute name, optionally suffixed with C{"()"}.

        @return: The attribute (or the result of calling it), wrapped.
        """
        # Use getattr() rather than the wrapped object's bound
        # __getattribute__: getattr() honours a __getattr__ fallback defined
        # by the wrapped object and also works when the wrapped object is a
        # class, matching what str.format does for dotted field names.
        return keycall(name, lambda attribute: getattr(self._wrapped, attribute))

    def __getitem__(self, name: str) -> object:
        """
        Index the wrapped object and wrap the element that was found.

        @param name: The key or index to look up.

        @return: The element, wrapped.
        """
        # The sub-object may not be indexable, but if it isn't, that's the
        # caller's problem.
        value = self._wrapped[name]  # type:ignore[index]
        return PotentialCallWrapper(value)

    def __format__(self, format_spec: str) -> str:
        """
        Format the wrapped object according to C{format_spec}.
        """
        return format(self._wrapped, format_spec)

    def __repr__(self) -> str:
        """
        Return the C{repr} of the wrapped object.
        """
        return repr(self._wrapped)

    def __str__(self) -> str:
        """
        Return the C{str} of the wrapped object.
        """
        return str(self._wrapped)
class CallMapping(Mapping[str, Any]):
    """
    Read-only view over another mapping in which a key ending in C{()} means
    "look the key up without the parentheses, then call what was found"
    instead of a plain lookup.

    Implementation support for L{formatWithCall}.
    """

    def __init__(self, submapping: Mapping[str, Any]) -> None:
        """
        @param submapping: Another read-only mapping which will be used to look
            up items.
        """
        self._submapping = submapping

    def __getitem__(self, key: str) -> Any:
        """
        Look up an item in the submapping for this L{CallMapping}, calling it
        if C{key} ends with C{"()"}.
        """
        lookup = self._submapping.__getitem__
        return keycall(key, lookup)

    def __len__(self) -> int:
        """
        Return the number of items in the submapping.
        """
        return len(self._submapping)

    def __iter__(self) -> Iterator[Any]:
        """
        Iterate over the keys of the submapping.
        """
        return iter(self._submapping)
def formatWithCall(formatString: str, mapping: Mapping[str, Any]) -> str:
    """
    Format a string like L{str.format}, but:

        - taking only a name mapping; no positional arguments

        - with the additional syntax that an empty set of parentheses
          correspond to a formatting item that should be called, and its result
          C{str}'d, rather than calling C{str} on the element directly as
          normal.

    For example::

        >>> formatWithCall("{string}, {function()}.",
        ...                dict(string="just a string",
        ...                     function=lambda: "a function"))
        'just a string, a function.'

    @param formatString: A PEP-3101 format string.
    @param mapping: A L{dict}-like object to format.

    @return: The string with formatted values interpolated.
    """
    # Wrapping the mapping is what gives "()"-suffixed names their call
    # semantics; no positional arguments are ever supplied.
    callAwareMapping = CallMapping(mapping)
    rendered = aFormatter.vformat(formatString, (), callAwareMapping)
    return str(rendered)
- def _formatEvent(event: LogEvent) -> str:
- """
- Formats an event as a string, using the format in C{event["log_format"]}.
- This implementation should never raise an exception; if the formatting
- cannot be done, the returned string will describe the event generically so
- that a useful message is emitted regardless.
- @param event: A logging event.
- @return: A formatted string.
- """
- try:
- if "log_flattened" in event:
- return flatFormat(event)
- format = cast(Optional[Union[str, bytes]], event.get("log_format", None))
- if format is None:
- return ""
- # Make sure format is text.
- if isinstance(format, str):
- pass
- elif isinstance(format, bytes):
- format = format.decode("utf-8")
- else:
- raise TypeError(f"Log format must be str, not {format!r}")
- return formatWithCall(format, event)
- except BaseException as e:
- return formatUnformattableEvent(event, e)
- def _formatTraceback(failure: Failure) -> str:
- """
- Format a failure traceback, assuming UTF-8 and using a replacement
- strategy for errors. Every effort is made to provide a usable
- traceback, but should not that not be possible, a message and the
- captured exception are logged.
- @param failure: The failure to retrieve a traceback from.
- @return: The formatted traceback.
- """
- try:
- traceback = failure.getTraceback()
- except BaseException as e:
- traceback = "(UNABLE TO OBTAIN TRACEBACK FROM EVENT):" + str(e)
- return traceback
- def _formatSystem(event: LogEvent) -> str:
- """
- Format the system specified in the event in the "log_system" key if set,
- otherwise the C{"log_namespace"} and C{"log_level"}, joined by a C{"#"}.
- Each defaults to C{"-"} is not set. If formatting fails completely,
- "UNFORMATTABLE" is returned.
- @param event: The event containing the system specification.
- @return: A formatted string representing the "log_system" key.
- """
- system = cast(Optional[str], event.get("log_system", None))
- if system is None:
- level = cast(Optional[NamedConstant], event.get("log_level", None))
- if level is None:
- levelName = "-"
- else:
- levelName = level.name
- system = "{namespace}#{level}".format(
- namespace=cast(str, event.get("log_namespace", "-")),
- level=levelName,
- )
- else:
- try:
- system = str(system)
- except Exception:
- system = "UNFORMATTABLE"
- return system
def eventAsText(
    event: LogEvent,
    includeTraceback: bool = True,
    includeTimestamp: bool = True,
    includeSystem: bool = True,
    formatTime: Callable[[float], str] = formatTime,
) -> str:
    r"""
    Render an event as text, optionally attaching a timestamp, the system
    and a traceback.

    The full output format is:
    C{"{timeStamp} [{system}] {event}\n{traceback}\n"} where:

        - C{timeStamp} is the event's C{"log_time"} value formatted with
          the provided C{formatTime} callable.

        - C{system} is the event's C{"log_system"} value, if set, otherwise,
          the C{"log_namespace"} and C{"log_level"}, joined by a C{"#"}.  Each
          defaults to C{"-"} if not set.

        - C{event} is the event, as formatted by L{formatEvent}.

        - C{traceback} is the traceback if the event contains a
          C{"log_failure"} key.  In the event the original traceback cannot
          be formatted, a message indicating the failure will be substituted.

    If the event cannot be formatted, and no traceback exists, an empty string
    is returned, even if includeSystem or includeTimestamp are true.

    @param event: A logging event.
    @param includeTraceback: If true and a C{"log_failure"} key exists, append
        a traceback.
    @param includeTimestamp: If true include a formatted timestamp before the
        event.
    @param includeSystem: If true, include the event's C{"log_system"} value.
    @param formatTime: A time formatter

    @return: A formatted string with specified options.

    @since: Twisted 18.9.0
    """
    text = _formatEvent(event)

    if includeTraceback and "log_failure" in event:
        text = "\n".join([text, _formatTraceback(event["log_failure"])])

    # Nothing to say: do not emit a bare timestamp/system prefix.
    if not text:
        return text

    stamp = (
        "".join((formatTime(cast(float, event.get("log_time", None))), " "))
        if includeTimestamp
        else ""
    )
    origin = "".join(("[", _formatSystem(event), "] ")) if includeSystem else ""

    return "".join((stamp, origin, text))
|