# -*- test-case-name: twisted.logger.test.test_flatten -*- # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Code related to "flattening" events; that is, extracting a description of all relevant fields from the format string and persisting them for later examination. """ from string import Formatter from collections import defaultdict from twisted.python.compat import unicode aFormatter = Formatter() class KeyFlattener(object): """ A L{KeyFlattener} computes keys for the things within curly braces in PEP-3101-style format strings as parsed by L{string.Formatter.parse}. """ def __init__(self): """ Initialize a L{KeyFlattener}. """ self.keys = defaultdict(lambda: 0) def flatKey(self, fieldName, formatSpec, conversion): """ Compute a string key for a given field/format/conversion. @param fieldName: A format field name. @type fieldName: L{str} @param formatSpec: A format spec. @type formatSpec: L{str} @param conversion: A format field conversion type. @type conversion: L{str} @return: A key specific to the given field, format and conversion, as well as the occurrence of that combination within this L{KeyFlattener}'s lifetime. @rtype: L{str} """ result = ( "{fieldName}!{conversion}:{formatSpec}" .format( fieldName=fieldName, formatSpec=(formatSpec or ""), conversion=(conversion or ""), ) ) self.keys[result] += 1 n = self.keys[result] if n != 1: result += "/" + str(self.keys[result]) return result def flattenEvent(event): """ Flatten the given event by pre-associating format fields with specific objects and callable results in a L{dict} put into the C{"log_flattened"} key in the event. @param event: A logging event. @type event: L{dict} """ if event.get("log_format", None) is None: return if "log_flattened" in event: fields = event["log_flattened"] else: fields = {} keyFlattener = KeyFlattener() for (literalText, fieldName, formatSpec, conversion) in ( aFormatter.parse(event["log_format"]) ): if fieldName is None: continue if conversion != "r": conversion = "s" flattenedKey = keyFlattener.flatKey(fieldName, formatSpec, conversion) structuredKey = keyFlattener.flatKey(fieldName, formatSpec, "") if flattenedKey in fields: # We've already seen and handled this key continue if fieldName.endswith(u"()"): fieldName = fieldName[:-2] callit = True else: callit = False field = aFormatter.get_field(fieldName, (), event) fieldValue = field[0] if conversion == "r": conversionFunction = repr else: # Above: if conversion is not "r", it's "s" conversionFunction = unicode if callit: fieldValue = fieldValue() flattenedValue = conversionFunction(fieldValue) fields[flattenedKey] = flattenedValue fields[structuredKey] = fieldValue if fields: event["log_flattened"] = fields def extractField(field, event): """ Extract a given format field from the given event. @param field: A string describing a format field or log key. This is the text that would normally fall between a pair of curly braces in a format string: for example, C{"key[2].attribute"}. If a conversion is specified (the thing after the C{"!"} character in a format field) then the result will always be L{unicode}. @type field: L{str} (native string) @param event: A log event. @type event: L{dict} @return: A value extracted from the field. @rtype: L{object} @raise KeyError: if the field is not found in the given event. """ keyFlattener = KeyFlattener() [[literalText, fieldName, formatSpec, conversion]] = aFormatter.parse( "{" + field + "}" ) key = keyFlattener.flatKey(fieldName, formatSpec, conversion) if "log_flattened" not in event: flattenEvent(event) return event["log_flattened"][key] def flatFormat(event): """ Format an event which has been flattened with L{flattenEvent}. @param event: A logging event. @type event: L{dict} @return: A formatted string. @rtype: L{unicode} """ fieldValues = event["log_flattened"] s = [] keyFlattener = KeyFlattener() formatFields = aFormatter.parse(event["log_format"]) for literalText, fieldName, formatSpec, conversion in formatFields: s.append(literalText) if fieldName is not None: key = keyFlattener.flatKey( fieldName, formatSpec, conversion or "s") s.append(unicode(fieldValues[key])) return u"".join(s)