_field_common.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. from pyrsistent._checked_types import (
  2. CheckedPMap,
  3. CheckedPSet,
  4. CheckedPVector,
  5. CheckedType,
  6. InvariantException,
  7. _restore_pickle,
  8. get_type,
  9. maybe_parse_user_type,
  10. maybe_parse_many_user_types,
  11. )
  12. from pyrsistent._checked_types import optional as optional_type
  13. from pyrsistent._checked_types import wrap_invariant
  14. import inspect
  15. def set_fields(dct, bases, name):
  16. dct[name] = dict(sum([list(b.__dict__.get(name, {}).items()) for b in bases], []))
  17. for k, v in list(dct.items()):
  18. if isinstance(v, _PField):
  19. dct[name][k] = v
  20. del dct[k]
  21. def check_global_invariants(subject, invariants):
  22. error_codes = tuple(error_code for is_ok, error_code in
  23. (invariant(subject) for invariant in invariants) if not is_ok)
  24. if error_codes:
  25. raise InvariantException(error_codes, (), 'Global invariant failed')
  26. def serialize(serializer, format, value):
  27. if isinstance(value, CheckedType) and serializer is PFIELD_NO_SERIALIZER:
  28. return value.serialize(format)
  29. return serializer(format, value)
  30. def check_type(destination_cls, field, name, value):
  31. if field.type and not any(isinstance(value, get_type(t)) for t in field.type):
  32. actual_type = type(value)
  33. message = "Invalid type for field {0}.{1}, was {2}".format(destination_cls.__name__, name, actual_type.__name__)
  34. raise PTypeError(destination_cls, name, field.type, actual_type, message)
  35. def is_type_cls(type_cls, field_type):
  36. if type(field_type) is set:
  37. return True
  38. types = tuple(field_type)
  39. if len(types) == 0:
  40. return False
  41. return issubclass(get_type(types[0]), type_cls)
  42. def is_field_ignore_extra_complaint(type_cls, field, ignore_extra):
  43. # ignore_extra param has default False value, for speed purpose no need to propagate False
  44. if not ignore_extra:
  45. return False
  46. if not is_type_cls(type_cls, field.type):
  47. return False
  48. return 'ignore_extra' in inspect.signature(field.factory).parameters
  49. class _PField(object):
  50. __slots__ = ('type', 'invariant', 'initial', 'mandatory', '_factory', 'serializer')
  51. def __init__(self, type, invariant, initial, mandatory, factory, serializer):
  52. self.type = type
  53. self.invariant = invariant
  54. self.initial = initial
  55. self.mandatory = mandatory
  56. self._factory = factory
  57. self.serializer = serializer
  58. @property
  59. def factory(self):
  60. # If no factory is specified and the type is another CheckedType use the factory method of that CheckedType
  61. if self._factory is PFIELD_NO_FACTORY and len(self.type) == 1:
  62. typ = get_type(tuple(self.type)[0])
  63. if issubclass(typ, CheckedType):
  64. return typ.create
  65. return self._factory
  66. PFIELD_NO_TYPE = ()
  67. PFIELD_NO_INVARIANT = lambda _: (True, None)
  68. PFIELD_NO_FACTORY = lambda x: x
  69. PFIELD_NO_INITIAL = object()
  70. PFIELD_NO_SERIALIZER = lambda _, value: value
  71. def field(type=PFIELD_NO_TYPE, invariant=PFIELD_NO_INVARIANT, initial=PFIELD_NO_INITIAL,
  72. mandatory=False, factory=PFIELD_NO_FACTORY, serializer=PFIELD_NO_SERIALIZER):
  73. """
  74. Field specification factory for :py:class:`PRecord`.
  75. :param type: a type or iterable with types that are allowed for this field
  76. :param invariant: a function specifying an invariant that must hold for the field
  77. :param initial: value of field if not specified when instantiating the record
  78. :param mandatory: boolean specifying if the field is mandatory or not
  79. :param factory: function called when field is set.
  80. :param serializer: function that returns a serialized version of the field
  81. """
  82. # NB: We have to check this predicate separately from the predicates in
  83. # `maybe_parse_user_type` et al. because this one is related to supporting
  84. # the argspec for `field`, while those are related to supporting the valid
  85. # ways to specify types.
  86. # Multiple types must be passed in one of the following containers. Note
  87. # that a type that is a subclass of one of these containers, like a
  88. # `collections.namedtuple`, will work as expected, since we check
  89. # `isinstance` and not `issubclass`.
  90. if isinstance(type, (list, set, tuple)):
  91. types = set(maybe_parse_many_user_types(type))
  92. else:
  93. types = set(maybe_parse_user_type(type))
  94. invariant_function = wrap_invariant(invariant) if invariant != PFIELD_NO_INVARIANT and callable(invariant) else invariant
  95. field = _PField(type=types, invariant=invariant_function, initial=initial,
  96. mandatory=mandatory, factory=factory, serializer=serializer)
  97. _check_field_parameters(field)
  98. return field
  99. def _check_field_parameters(field):
  100. for t in field.type:
  101. if not isinstance(t, type) and not isinstance(t, str):
  102. raise TypeError('Type parameter expected, not {0}'.format(type(t)))
  103. if field.initial is not PFIELD_NO_INITIAL and \
  104. not callable(field.initial) and \
  105. field.type and not any(isinstance(field.initial, t) for t in field.type):
  106. raise TypeError('Initial has invalid type {0}'.format(type(field.initial)))
  107. if not callable(field.invariant):
  108. raise TypeError('Invariant must be callable')
  109. if not callable(field.factory):
  110. raise TypeError('Factory must be callable')
  111. if not callable(field.serializer):
  112. raise TypeError('Serializer must be callable')
  113. class PTypeError(TypeError):
  114. """
  115. Raised when trying to assign a value with a type that doesn't match the declared type.
  116. Attributes:
  117. source_class -- The class of the record
  118. field -- Field name
  119. expected_types -- Types allowed for the field
  120. actual_type -- The non matching type
  121. """
  122. def __init__(self, source_class, field, expected_types, actual_type, *args, **kwargs):
  123. super(PTypeError, self).__init__(*args, **kwargs)
  124. self.source_class = source_class
  125. self.field = field
  126. self.expected_types = expected_types
  127. self.actual_type = actual_type
  128. SEQ_FIELD_TYPE_SUFFIXES = {
  129. CheckedPVector: "PVector",
  130. CheckedPSet: "PSet",
  131. }
  132. # Global dictionary to hold auto-generated field types: used for unpickling
  133. _seq_field_types = {}
  134. def _restore_seq_field_pickle(checked_class, item_type, data):
  135. """Unpickling function for auto-generated PVec/PSet field types."""
  136. type_ = _seq_field_types[checked_class, item_type]
  137. return _restore_pickle(type_, data)
  138. def _types_to_names(types):
  139. """Convert a tuple of types to a human-readable string."""
  140. return "".join(get_type(typ).__name__.capitalize() for typ in types)
  141. def _make_seq_field_type(checked_class, item_type, item_invariant):
  142. """Create a subclass of the given checked class with the given item type."""
  143. type_ = _seq_field_types.get((checked_class, item_type))
  144. if type_ is not None:
  145. return type_
  146. class TheType(checked_class):
  147. __type__ = item_type
  148. __invariant__ = item_invariant
  149. def __reduce__(self):
  150. return (_restore_seq_field_pickle,
  151. (checked_class, item_type, list(self)))
  152. suffix = SEQ_FIELD_TYPE_SUFFIXES[checked_class]
  153. TheType.__name__ = _types_to_names(TheType._checked_types) + suffix
  154. _seq_field_types[checked_class, item_type] = TheType
  155. return TheType
  156. def _sequence_field(checked_class, item_type, optional, initial,
  157. invariant=PFIELD_NO_INVARIANT,
  158. item_invariant=PFIELD_NO_INVARIANT):
  159. """
  160. Create checked field for either ``PSet`` or ``PVector``.
  161. :param checked_class: ``CheckedPSet`` or ``CheckedPVector``.
  162. :param item_type: The required type for the items in the set.
  163. :param optional: If true, ``None`` can be used as a value for
  164. this field.
  165. :param initial: Initial value to pass to factory.
  166. :return: A ``field`` containing a checked class.
  167. """
  168. TheType = _make_seq_field_type(checked_class, item_type, item_invariant)
  169. if optional:
  170. def factory(argument, _factory_fields=None, ignore_extra=False):
  171. if argument is None:
  172. return None
  173. else:
  174. return TheType.create(argument, _factory_fields=_factory_fields, ignore_extra=ignore_extra)
  175. else:
  176. factory = TheType.create
  177. return field(type=optional_type(TheType) if optional else TheType,
  178. factory=factory, mandatory=True,
  179. invariant=invariant,
  180. initial=factory(initial))
  181. def pset_field(item_type, optional=False, initial=(),
  182. invariant=PFIELD_NO_INVARIANT,
  183. item_invariant=PFIELD_NO_INVARIANT):
  184. """
  185. Create checked ``PSet`` field.
  186. :param item_type: The required type for the items in the set.
  187. :param optional: If true, ``None`` can be used as a value for
  188. this field.
  189. :param initial: Initial value to pass to factory if no value is given
  190. for the field.
  191. :return: A ``field`` containing a ``CheckedPSet`` of the given type.
  192. """
  193. return _sequence_field(CheckedPSet, item_type, optional, initial,
  194. invariant=invariant,
  195. item_invariant=item_invariant)
  196. def pvector_field(item_type, optional=False, initial=(),
  197. invariant=PFIELD_NO_INVARIANT,
  198. item_invariant=PFIELD_NO_INVARIANT):
  199. """
  200. Create checked ``PVector`` field.
  201. :param item_type: The required type for the items in the vector.
  202. :param optional: If true, ``None`` can be used as a value for
  203. this field.
  204. :param initial: Initial value to pass to factory if no value is given
  205. for the field.
  206. :return: A ``field`` containing a ``CheckedPVector`` of the given type.
  207. """
  208. return _sequence_field(CheckedPVector, item_type, optional, initial,
  209. invariant=invariant,
  210. item_invariant=item_invariant)
  211. _valid = lambda item: (True, "")
  212. # Global dictionary to hold auto-generated field types: used for unpickling
  213. _pmap_field_types = {}
  214. def _restore_pmap_field_pickle(key_type, value_type, data):
  215. """Unpickling function for auto-generated PMap field types."""
  216. type_ = _pmap_field_types[key_type, value_type]
  217. return _restore_pickle(type_, data)
  218. def _make_pmap_field_type(key_type, value_type):
  219. """Create a subclass of CheckedPMap with the given key and value types."""
  220. type_ = _pmap_field_types.get((key_type, value_type))
  221. if type_ is not None:
  222. return type_
  223. class TheMap(CheckedPMap):
  224. __key_type__ = key_type
  225. __value_type__ = value_type
  226. def __reduce__(self):
  227. return (_restore_pmap_field_pickle,
  228. (self.__key_type__, self.__value_type__, dict(self)))
  229. TheMap.__name__ = "{0}To{1}PMap".format(
  230. _types_to_names(TheMap._checked_key_types),
  231. _types_to_names(TheMap._checked_value_types))
  232. _pmap_field_types[key_type, value_type] = TheMap
  233. return TheMap
  234. def pmap_field(key_type, value_type, optional=False, invariant=PFIELD_NO_INVARIANT):
  235. """
  236. Create a checked ``PMap`` field.
  237. :param key: The required type for the keys of the map.
  238. :param value: The required type for the values of the map.
  239. :param optional: If true, ``None`` can be used as a value for
  240. this field.
  241. :param invariant: Pass-through to ``field``.
  242. :return: A ``field`` containing a ``CheckedPMap``.
  243. """
  244. TheMap = _make_pmap_field_type(key_type, value_type)
  245. if optional:
  246. def factory(argument):
  247. if argument is None:
  248. return None
  249. else:
  250. return TheMap.create(argument)
  251. else:
  252. factory = TheMap.create
  253. return field(mandatory=True, initial=TheMap(),
  254. type=optional_type(TheMap) if optional else TheMap,
  255. factory=factory, invariant=invariant)