# -*- test-case-name: twisted.logger.test.test_json -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tools for saving and loading log events in a structured format.
"""

from json import dumps, loads
from typing import IO, Any, AnyStr, Dict, Iterable, Optional, Union, cast
from uuid import UUID

from constantly import NamedConstant

from twisted.python.failure import Failure

from ._file import FileLogObserver
from ._flatten import flattenEvent
from ._interfaces import LogEvent
from ._levels import LogLevel
from ._logger import Logger

# Module-scope logger, used below to report records that cannot be decoded.
log = Logger()

# Type alias for a JSON-serializable dictionary with string keys.
JSONDict = Dict[str, Any]
def failureAsJSON(failure: Failure) -> JSONDict:
    """
    Convert a failure to a JSON-serializable data structure.

    @param failure: A failure to serialize.

    @return: a mapping of strings to ... stuff, mostly reminiscent of
        L{Failure.__getstate__}
    """
    # Start from the failure's pickling state, then replace the exception
    # class itself with a JSON-friendly description of where it came from.
    state = dict(failure.__getstate__())
    state["type"] = {
        "__module__": failure.type.__module__,
        "__name__": failure.type.__name__,
    }
    return state
def failureFromJSON(failureDict: JSONDict) -> Failure:
    """
    Load a L{Failure} from a dictionary deserialized from JSON.

    @param failureDict: a JSON-deserialized object like one previously returned
        by L{failureAsJSON}.

    @return: L{Failure}
    """
    # Copy the input so the caller's dictionary is not mutated: previously the
    # "type" entry was overwritten in place and the dict itself was aliased as
    # the failure's __dict__.
    state = dict(failureDict)
    typeInfo = state["type"]
    # Synthesize a placeholder class carrying the original module/name
    # attributes; the real exception class may not be importable here.
    state["type"] = type(typeInfo["__name__"], (), typeInfo)

    f = Failure.__new__(Failure)
    f.__dict__ = state
    return f
# Serialization registry: each entry is a tuple of
# (predicate, class UUID, saver, loader) describing how to round-trip one
# special type through JSON.  The UUID tags the serialized form so that the
# matching loader can be found again on the way back in.
classInfo = [
    (
        # Matches only genuine LogLevel constants (name must resolve back to
        # the identical NamedConstant object).
        lambda level: (
            isinstance(level, NamedConstant)
            and getattr(LogLevel, level.name, None) is level
        ),
        UUID("02E59486-F24D-46AD-8224-3ACDF2A5732A"),
        # Save only the level's name; load by looking the constant up by name.
        lambda level: dict(name=level.name),
        lambda level: getattr(LogLevel, level["name"], None),
    ),
    (
        lambda o: isinstance(o, Failure),
        UUID("E76887E2-20ED-49BF-A8F8-BA25CC586F2D"),
        failureAsJSON,
        failureFromJSON,
    ),
]


# Index the loaders by class UUID for quick dispatch while decoding.
uuidToLoader = {uuid: loader for (predicate, uuid, saver, loader) in classInfo}
def objectLoadHook(aDict: JSONDict) -> object:
    """
    Dictionary-to-object-translation hook for certain value types used within
    the logging system.

    @see: the C{object_hook} parameter to L{json.load}

    @param aDict: A dictionary loaded from a JSON object.

    @return: C{aDict} itself, or the object represented by C{aDict}
    """
    if "__class_uuid__" not in aDict:
        # Ordinary dictionary; pass it through unchanged.
        return aDict
    # Tagged dictionary: dispatch to the loader registered for its UUID.
    loader = uuidToLoader[UUID(aDict["__class_uuid__"])]
    return loader(aDict)
def objectSaveHook(pythonObject: object) -> JSONDict:
    """
    Object-to-serializable hook for certain value types used within the logging
    system.

    @see: the C{default} parameter to L{json.dump}

    @param pythonObject: Any object.

    @return: If the object is one of the special types the logging system
        supports, a specially-formatted dictionary; otherwise, a marker
        dictionary indicating that it could not be serialized.
    """
    # Find the first registered type whose predicate matches, serialize the
    # object with its saver, and tag the result with that type's UUID so the
    # matching loader can be found when decoding.
    for predicate, uuid, saver, _loader in classInfo:
        if not predicate(pythonObject):
            continue
        serialized = saver(pythonObject)
        serialized["__class_uuid__"] = str(uuid)
        return serialized
    # Nothing matched; emit a marker rather than failing serialization.
    return {"unpersistable": True}
def eventAsJSON(event: LogEvent) -> str:
    """
    Encode an event as JSON, flattening it if necessary to preserve as much
    structure as possible.

    Not all structure from the log event will be preserved when it is
    serialized.

    @param event: A log event dictionary.

    @return: A string of the serialized JSON; note that this will contain no
        newline characters, and may thus safely be stored in a line-delimited
        file.
    """

    def serializeUnencodable(unencodable: object) -> Union[JSONDict, str]:
        """
        Serialize an object not otherwise serializable by L{dumps}.

        @param unencodable: An unencodable object.

        @return: C{unencodable}, serialized
        """
        if not isinstance(unencodable, bytes):
            return objectSaveHook(unencodable)
        # bytes: charmap maps each byte to the same code point, losslessly.
        return unencodable.decode("charmap")

    flattenEvent(event)
    return dumps(event, default=serializeUnencodable, skipkeys=True)
def eventFromJSON(eventText: str) -> JSONDict:
    """
    Decode a log event from JSON.

    @param eventText: The output of a previous call to L{eventAsJSON}

    @return: A reconstructed version of the log event.
    """
    # The object hook reconstitutes specially-tagged values (log levels,
    # failures) as it decodes.
    loaded = loads(eventText, object_hook=objectLoadHook)
    return cast(JSONDict, loaded)
def jsonFileLogObserver(
    outFile: IO[Any], recordSeparator: str = "\x1e"
) -> FileLogObserver:
    """
    Create a L{FileLogObserver} that emits JSON-serialized events to a
    specified (writable) file-like object.

    Events are written in the following form::

        RS + JSON + NL

    C{JSON} is the serialized event, which is JSON text.  C{NL} is a newline
    (C{"\\n"}).  C{RS} is a record separator.  By default, this is a single
    RS character (C{"\\x1e"}), which makes the default output conform to the
    IETF draft document "draft-ietf-json-text-sequence-13".

    @param outFile: A file-like object.  Ideally one should be passed which
        accepts L{str} data.  Otherwise, UTF-8 L{bytes} will be used.

    @param recordSeparator: The record separator to use.

    @return: A file log observer.
    """

    def formatEvent(event: LogEvent) -> str:
        # RS + JSON + NL framing, as described in the docstring.
        return f"{recordSeparator}{eventAsJSON(event)}\n"

    return FileLogObserver(outFile, formatEvent)
def eventsFromJSONLogFile(
    inFile: IO[Any],
    recordSeparator: Optional[str] = None,
    bufferSize: int = 4096,
) -> Iterable[LogEvent]:
    """
    Load events from a file previously saved with L{jsonFileLogObserver}.
    Event records that are truncated or otherwise unreadable are ignored.

    @param inFile: A (readable) file-like object.  Data read from C{inFile}
        should be L{str} or UTF-8 L{bytes}.

    @param recordSeparator: The expected record separator.
        If L{None}, attempt to automatically detect the record separator from
        one of C{"\\x1e"} or C{""}.

    @param bufferSize: The size of the read buffer used while reading from
        C{inFile}.

    @return: Log events as read from C{inFile}.
    """

    def asBytes(s: AnyStr) -> bytes:
        # Normalize str input to UTF-8 bytes so all buffering below works in
        # bytes regardless of the file's mode.
        if isinstance(s, bytes):
            return s
        else:
            return s.encode("utf-8")

    def eventFromBytearray(record: bytearray) -> Optional[LogEvent]:
        # Decode one raw record; on bad data, log the problem and return None
        # so the caller skips the record instead of aborting the stream.
        try:
            text = bytes(record).decode("utf-8")
        except UnicodeDecodeError:
            log.error(
                "Unable to decode UTF-8 for JSON record: {record!r}",
                record=bytes(record),
            )
            return None

        try:
            return eventFromJSON(text)
        except ValueError:
            log.error("Unable to read JSON record: {record!r}", record=bytes(record))
            return None

    if recordSeparator is None:
        # Auto-detect: peek at the first byte of the stream.
        first = asBytes(inFile.read(1))

        if first == b"\x1e":
            # This looks json-text-sequence compliant.
            recordSeparatorBytes = first
        else:
            # Default to simpler newline-separated stream, which does not use
            # a record separator.
            recordSeparatorBytes = b""
    else:
        recordSeparatorBytes = asBytes(recordSeparator)
        # No peeked byte to carry into the buffer in this case.
        first = b""

    if recordSeparatorBytes == b"":
        recordSeparatorBytes = b"\n"  # Split on newlines below
        eventFromRecord = eventFromBytearray
    else:

        def eventFromRecord(record: bytearray) -> Optional[LogEvent]:
            # With an explicit record separator, a complete record must end
            # with a newline; anything else was truncated mid-write.
            if record[-1] == ord("\n"):
                return eventFromBytearray(record)
            else:
                log.error(
                    "Unable to read truncated JSON record: {record!r}",
                    record=bytes(record),
                )
            return None

    # Accumulates raw data across reads; starts with any auto-detect peek.
    buffer = bytearray(first)

    while True:
        newData = inFile.read(bufferSize)

        if not newData:
            # End of file: whatever remains in the buffer is the last record.
            if len(buffer) > 0:
                event = eventFromRecord(buffer)
                if event is not None:
                    yield event
            break

        buffer += asBytes(newData)
        records = buffer.split(recordSeparatorBytes)

        # Emit every complete record; the final split piece may be a partial
        # record, so it stays in the buffer for the next read.
        for record in records[:-1]:
            if len(record) > 0:
                event = eventFromRecord(record)
                if event is not None:
                    yield event

        buffer = records[-1]
|