|
- import pickle
- import datetime
- import pytest
- import uuid
- from pyrsistent import (
- PRecord, field, InvariantException, ny, pset, PSet, CheckedPVector,
- PTypeError, pset_field, pvector_field, pmap_field, pmap, PMap,
- pvector, PVector, v, m)
class ARecord(PRecord):
    """Record fixture with a numeric ``x`` field and an unconstrained ``y`` field."""
    x = field(type=(int, float))  # accepts int or float values
    y = field()  # declared with no type or other options
class Hierarchy(PRecord):
    """Record fixture that nests ``ARecord`` values, singly and inside a vector."""
    point1 = field(ARecord)
    point2 = field(ARecord)
    points = pvector_field(ARecord)  # vector field whose item type is ARecord
class RecordContainingContainers(PRecord):
    """Record fixture with one typed map, vector and set field, all holding ``str``."""
    # NOTE: the field names ``map`` and ``set`` reuse builtin names; they are
    # referenced by keyword in the pickling test of this module.
    map = pmap_field(str, str)
    vec = pvector_field(str)
    set = pset_field(str)
class UniqueThing(PRecord):
    """Record fixture whose ``id`` field uses ``uuid.UUID`` as both type and factory."""
    id = field(type=uuid.UUID, factory=uuid.UUID)
class Something(object):
    """Empty class used as an item type in the typed-container field tests."""
    pass
class Another(object):
    """Second empty class used as an item type in the typed-container field tests."""
    pass
def test_create_ignore_extra_true():
    """``create`` with ``ignore_extra=True`` accepts unknown keys at both nesting levels."""
    payload = {
        'point1': {'x': 1, 'y': 'foo', 'extra_field_0': 'extra_data_0'},
        'point2': {'x': 1, 'y': 'foo', 'extra_field_1': 'extra_data_1'},
        'extra_field_2': 'extra_data_2',
    }
    hierarchy = Hierarchy.create(payload, ignore_extra=True)
    assert hierarchy
def test_create_ignore_extra_true_sequence_hierarchy():
    """Unknown keys are also ignored inside the items of a ``pvector_field``."""
    extra_points = [
        {'x': 1, 'y': 'foo', 'extra_field_2': 'extra_data_2'},
        {'x': 1, 'y': 'foo', 'extra_field_3': 'extra_data_3'},
    ]
    payload = {
        'point1': {'x': 1, 'y': 'foo', 'extra_field_0': 'extra_data_0'},
        'point2': {'x': 1, 'y': 'foo', 'extra_field_1': 'extra_data_1'},
        'points': extra_points,
        'extra_field____': 'extra_data_2',
    }
    hierarchy = Hierarchy.create(payload, ignore_extra=True)
    assert hierarchy
def test_ignore_extra_for_pvector_field():
    """``ignore_extra`` reaches vector items for both optional and non-optional fields."""
    class HierarchyA(PRecord):
        points = pvector_field(ARecord, optional=False)

    class HierarchyB(PRecord):
        points = pvector_field(ARecord, optional=True)

    point_object = {'x': 1, 'y': 'foo', 'extra_field': 69}

    # Same input for both variants; only the ``optional`` flag differs.
    for record_cls in (HierarchyA, HierarchyB):
        created = record_cls.create({'points': [point_object]}, ignore_extra=True)
        assert created
def test_create():
    """Keyword construction stores the values and yields an ``ARecord``."""
    record = ARecord(x=1, y='foo')
    assert isinstance(record, ARecord)
    assert (record.x, record.y) == (1, 'foo')
def test_create_ignore_extra():
    """An undeclared key ``z`` is dropped when ``ignore_extra=True`` is passed."""
    source = {'x': 1, 'y': 'foo', 'z': None}
    record = ARecord.create(source, ignore_extra=True)
    assert isinstance(record, ARecord)
    assert (record.x, record.y) == (1, 'foo')
def test_create_ignore_extra_false():
    """Without ``ignore_extra`` an undeclared key makes ``create`` raise ``AttributeError``."""
    source = {'x': 1, 'y': 'foo', 'z': None}
    with pytest.raises(AttributeError):
        ARecord.create(source)
def test_correct_assignment():
    """``set`` returns updated records and keeps the record type."""
    base = ARecord(x=1, y='foo')
    with_new_x = base.set('x', 2.0)
    with_new_y = with_new_x.set('y', 'bar')

    assert with_new_x == {'x': 2.0, 'y': 'foo'}
    assert with_new_y == {'x': 2.0, 'y': 'bar'}
    assert isinstance(with_new_y, ARecord)
def test_direct_assignment_not_possible():
    """Plain attribute assignment on a record raises ``AttributeError``."""
    record = ARecord()
    with pytest.raises(AttributeError):
        record.x = 1
def test_cannot_assign_undeclared_fields():
    """``set`` on a field the record does not declare raises ``AttributeError``."""
    record = ARecord()
    with pytest.raises(AttributeError):
        record.set('z', 5)
def test_cannot_assign_wrong_type_to_fields():
    """Setting ``x`` to a string raises ``PTypeError`` carrying the mismatch details."""
    with pytest.raises(PTypeError) as excinfo:
        ARecord().set('x', 'foo')

    error = excinfo.value
    assert error.source_class == ARecord
    assert error.field == 'x'
    assert error.expected_types == {int, float}
    assert error.actual_type is str
def test_cannot_construct_with_undeclared_fields():
    """Passing an undeclared keyword to the constructor raises ``AttributeError``."""
    undeclared = {'z': 5}
    with pytest.raises(AttributeError):
        ARecord(**undeclared)
def test_cannot_construct_with_fields_of_wrong_type():
    """Constructing with a string for the numeric ``x`` field raises ``TypeError``."""
    wrongly_typed = {'x': 'foo'}
    with pytest.raises(TypeError):
        ARecord(**wrongly_typed)
def test_support_record_inheritance():
    """A subclass adds its own field to those declared on ``ARecord``."""
    class BRecord(ARecord):
        z = field()

    child = BRecord(x=1, y='foo', z='bar')

    assert child == {'x': 1, 'y': 'foo', 'z': 'bar'}
    for expected_cls in (BRecord, ARecord):
        assert isinstance(child, expected_cls)
def test_single_type_spec():
    """A field type may be given as a single type rather than a tuple."""
    class A(PRecord):
        x = field(type=int)

    record = A(x=1)
    assert record.x == 1

    # A value of another type is still rejected.
    with pytest.raises(TypeError):
        record.set('x', 'foo')
def test_remove():
    """``remove`` drops a field and still returns an ``ARecord``."""
    full = ARecord(x=1, y='foo')
    trimmed = full.remove('y')
    assert trimmed == {'x': 1}
    assert isinstance(trimmed, ARecord)
def test_remove_non_existing_member():
    """Removing a key that is not present raises ``KeyError``."""
    record = ARecord(x=1, y='foo')
    missing_key = 'z'
    with pytest.raises(KeyError):
        record.remove(missing_key)
def test_field_invariant_must_hold():
    """A failed field invariant and a missing mandatory field are reported together."""
    class BRecord(PRecord):
        x = field(invariant=lambda x: (x > 1, 'x too small'))
        y = field(mandatory=True)

    with pytest.raises(InvariantException) as excinfo:
        BRecord(x=1)

    error = excinfo.value
    assert error.invariant_errors == ('x too small',)
    assert error.missing_fields == ('BRecord.y',)
def test_global_invariant_must_hold():
    """The record-level ``__invariant__`` is checked when a record is constructed."""
    class BRecord(PRecord):
        __invariant__ = lambda rec: (rec.x <= rec.y, 'y smaller than x')
        x = field()
        y = field()

    # Values that satisfy the invariant construct without error.
    BRecord(x=1, y=2)

    with pytest.raises(InvariantException) as excinfo:
        BRecord(x=2, y=1)

    error = excinfo.value
    assert error.invariant_errors == ('y smaller than x',)
    assert error.missing_fields == ()
def test_set_multiple_fields():
    """``set`` accepts several fields at once as keyword arguments."""
    before = ARecord(x=1, y='foo')
    after = before.set(x=2, y='bar')
    assert after == {'x': 2, 'y': 'bar'}
def test_initial_value():
    """Fields declared with ``initial`` hold that value when nothing is passed in."""
    class BRecord(PRecord):
        x = field(initial=1)
        y = field(initial=2)

    record = BRecord()
    assert (record.x, record.y) == (1, 2)
def test_enum_field():
    """An ``Enum`` subclass can be used as a field type."""
    try:
        import enum
    except ImportError:
        return  # Enum not supported in this environment

    class ExampleEnum(enum.Enum):
        x = 1
        y = 2

    class RecordContainingEnum(PRecord):
        enum_field = field(type=ExampleEnum)

    record = RecordContainingEnum(enum_field=ExampleEnum.x)
    assert record.enum_field == ExampleEnum.x
def test_type_specification_must_be_a_type():
    """Declaring a field with a non-type ``type`` argument raises ``TypeError``."""
    not_a_type = 1
    with pytest.raises(TypeError):
        class BRecord(PRecord):
            x = field(type=not_a_type)
def test_initial_must_be_of_correct_type():
    """An ``initial`` value that does not match the field type raises ``TypeError``."""
    mismatched_initial = 'foo'
    with pytest.raises(TypeError):
        class BRecord(PRecord):
            x = field(type=int, initial=mismatched_initial)
def test_invariant_must_be_callable():
    """A non-callable ``invariant`` argument raises ``TypeError`` at declaration."""
    not_callable = 'foo'
    with pytest.raises(TypeError):
        class BRecord(PRecord):
            x = field(invariant=not_callable)  # type: ignore
def test_global_invariants_are_inherited():
    """A subclass with its own ``__invariant__`` still enforces the parent's."""
    class BRecord(PRecord):
        __invariant__ = lambda rec: (rec.x % rec.y == 0, 'modulo')
        x = field()
        y = field()

    class CRecord(BRecord):
        __invariant__ = lambda rec: (rec.x > rec.y, 'size')

    # 5 > 3 satisfies 'size' but 5 % 3 != 0 violates the inherited 'modulo'.
    with pytest.raises(InvariantException) as excinfo:
        CRecord(x=5, y=3)

    assert excinfo.value.invariant_errors == ('modulo',)
def test_global_invariants_must_be_callable():
    """A non-callable ``__invariant__`` raises ``TypeError`` when the class is defined."""
    not_callable = 1
    with pytest.raises(TypeError):
        class CRecord(PRecord):
            __invariant__ = not_callable
def test_repr():
    """``repr`` shows the class name and the fields, in either field order."""
    text = repr(ARecord(x=1, y=2))
    assert text in ('ARecord(x=1, y=2)', 'ARecord(y=2, x=1)')
def test_factory():
    """The field ``factory`` converts the input value before it is stored."""
    class BRecord(PRecord):
        x = field(type=int, factory=int)

    record = BRecord(x=2.5)
    assert record == {'x': 2}
def test_factory_must_be_callable():
    """A non-callable ``factory`` argument raises ``TypeError`` at declaration."""
    not_callable = 1
    with pytest.raises(TypeError):
        class BRecord(PRecord):
            x = field(type=int, factory=not_callable)  # type: ignore
def test_nested_record_construction():
    """``create`` builds nested records from nested dicts and applies field factories."""
    class BRecord(PRecord):
        x = field(int, factory=int)

    class CRecord(PRecord):
        a = field()
        b = field(type=BRecord)

    raw = {'a': 'foo', 'b': {'x': '5'}}
    outer = CRecord.create(raw)

    assert isinstance(outer, CRecord)
    assert isinstance(outer.b, BRecord)
    # The string '5' has been converted by the ``int`` factory.
    assert outer == {'a': 'foo', 'b': {'x': 5}}
def test_pickling():
    """A record survives a pickle round trip at the highest protocol."""
    original = ARecord(x=2.0, y='bar')
    blob = pickle.dumps(original, pickle.HIGHEST_PROTOCOL)
    restored = pickle.loads(blob)
    assert original == restored
    assert isinstance(restored, ARecord)
def test_supports_pickling_with_typed_container_fields():
    """A record with typed map, set and vector fields survives a pickle round trip."""
    contents = {
        'map': {'foo': 'bar'},
        'set': ['hello', 'there'],
        'vec': ['a', 'b'],
    }
    original = RecordContainingContainers(**contents)
    restored = pickle.loads(pickle.dumps(original))
    assert original == restored
def test_all_invariant_errors_reported():
    """Invariant failures and missing fields from nested records are all collected."""
    class BRecord(PRecord):
        x = field(factory=int, invariant=lambda x: (x >= 0, 'x negative'))
        y = field(mandatory=True)

    class CRecord(PRecord):
        a = field(invariant=lambda x: (x != 0, 'a zero'))
        b = field(type=BRecord)

    with pytest.raises(InvariantException) as excinfo:
        CRecord.create({'a': 0, 'b': {'x': -5}})

    error = excinfo.value
    # Order of the collected errors is not asserted, only their content.
    assert set(error.invariant_errors) == {'x negative', 'a zero'}
    assert error.missing_fields == ('BRecord.y',)
def test_precord_factory_method_is_idempotent():
    """``create`` returns the very same object when given an instance of the class."""
    class BRecord(PRecord):
        x = field()
        y = field()

    existing = BRecord(x=1, y=2)
    recreated = BRecord.create(existing)
    assert recreated is existing
def test_serialize():
    """``serialize`` passes its format argument to the field serializer."""
    def parse_date(d):
        return datetime.datetime.strptime(d, "%d%m%Y").date()

    def dump_date(format, d):
        if format == 'ISO':
            return d.strftime('%Y-%m-%d')
        return d.strftime('%d%m%Y')

    class BRecord(PRecord):
        d = field(type=datetime.date, factory=parse_date, serializer=dump_date)

    assert BRecord(d='14012015').serialize('ISO') == {'d': '2015-01-14'}
    assert BRecord(d='14012015').serialize('other') == {'d': '14012015'}
def test_nested_serialize():
    """The format is forwarded to nested records and the result is a plain ``dict``."""
    class BRecord(PRecord):
        # The serializer echoes the format so forwarding can be observed.
        d = field(serializer=lambda format, d: format)

    class CRecord(PRecord):
        b = field()

    outer = CRecord(b=BRecord(d='foo'))
    serialized = outer.serialize('bar')

    assert isinstance(serialized, dict)
    assert serialized == {'b': {'d': 'bar'}}
def test_serializer_must_be_callable():
    """A non-callable ``serializer`` argument raises ``TypeError`` at declaration."""
    not_callable = 1
    with pytest.raises(TypeError):
        class CRecord(PRecord):
            x = field(serializer=not_callable)  # type: ignore
def test_transform_without_update_returns_same_precord():
    """An identity ``transform`` over every key returns the same record object."""
    record = ARecord(x=2.0, y='bar')
    unchanged = record.transform([ny], lambda value: value)
    assert unchanged is record
class Application(PRecord):
    """Record fixture with two string fields, used as the item type of ``ApplicationVector``."""
    name = field(type=str)
    image = field(type=str)
class ApplicationVector(CheckedPVector):
    """Checked vector fixture whose declared item type is ``Application``."""
    __type__ = Application
class Node(PRecord):
    """Record fixture holding an ``ApplicationVector`` in its ``applications`` field."""
    applications = field(type=ApplicationVector)
def test_nested_create_serialize():
    """Direct construction, ``create`` from dicts and a serialize round trip all agree."""
    apps = [
        Application(name='myapp', image='myimage'),
        Application(name='b', image='c'),
    ]
    built = Node(applications=apps)

    raw = {
        'applications': [
            {'name': 'myapp', 'image': 'myimage'},
            {'name': 'b', 'image': 'c'},
        ],
    }
    created = Node.create(raw)
    assert built == created

    round_tripped = Node.create(built.serialize())
    assert round_tripped == built
def test_pset_field_initial_value():
    """
    A ``pset_field`` that is not given a value starts out empty.
    """
    class Record(PRecord):
        value = pset_field(int)

    defaulted = Record()
    explicit_empty = Record(value=[])
    assert defaulted == explicit_empty
def test_pset_field_custom_initial():
    """
    ``pset_field`` uses the ``initial`` argument as its default content.
    """
    class Record(PRecord):
        value = pset_field(int, initial=(1, 2))

    defaulted = Record()
    explicit = Record(value=[1, 2])
    assert defaulted == explicit
def test_pset_field_factory():
    """
    A list passed to a ``pset_field`` is stored as a ``PSet``.
    """
    class Record(PRecord):
        value = pset_field(int)

    stored = Record(value=[1, 2]).value
    assert isinstance(stored, PSet)
def test_pset_field_checked_set():
    """
    The set held by a ``pset_field`` rejects items of the wrong type.
    """
    class Record(PRecord):
        value = pset_field(int)

    checked_set = Record(value=[1, 2]).value
    with pytest.raises(TypeError):
        checked_set.add("hello")  # type: ignore
def test_pset_field_checked_vector_multiple_types():
    """
    A ``pset_field`` declared with several types holds a set that accepts
    each of them and rejects anything else.
    """
    class Record(PRecord):
        value = pset_field((int, str))

    checked_set = Record(value=[1, 2, "hello"]).value
    with pytest.raises(TypeError):
        checked_set.add(object())
def test_pset_field_type():
    """
    Setting a ``pset_field`` to ``None`` raises ``TypeError``.
    """
    class Record(PRecord):
        value = pset_field(int)

    empty = Record()
    with pytest.raises(TypeError):
        empty.set("value", None)
def test_pset_field_mandatory():
    """
    Removing a ``pset_field`` from a record raises ``InvariantException``.
    """
    class Record(PRecord):
        value = pset_field(int)

    populated = Record(value=[1])
    with pytest.raises(InvariantException):
        populated.remove("value")
def test_pset_field_default_non_optional():
    """
    With no ``optional`` argument, a ``pset_field`` rejects ``None``
    with ``TypeError``.
    """
    class Record(PRecord):
        value = pset_field(int)

    no_value = {"value": None}
    with pytest.raises(TypeError):
        Record(**no_value)
def test_pset_field_explicit_non_optional():
    """
    With ``optional=False``, a ``pset_field`` rejects ``None`` with
    ``TypeError``.
    """
    class Record(PRecord):
        value = pset_field(int, optional=False)

    no_value = {"value": None}
    with pytest.raises(TypeError):
        Record(**no_value)
def test_pset_field_optional():
    """
    With ``optional=True``, a ``pset_field`` accepts either a set-like
    value or ``None``.
    """
    class Record(PRecord):
        value = pset_field(int, optional=True)

    with_items = Record(value=[1, 2]).value
    without_items = Record(value=None).value
    assert with_items == pset([1, 2])
    assert without_items is None
def test_pset_field_name():
    """
    The generated set class is named after the item type of the field.
    """
    class Record(PRecord):
        value = pset_field(Something)
        value2 = pset_field(int)

    record = Record()
    assert type(record.value).__name__ == "SomethingPSet"
    assert type(record.value2).__name__ == "IntPSet"
def test_pset_multiple_types_field_name():
    """
    With several item types, the generated set class name joins the
    type names in order.
    """
    class Record(PRecord):
        value = pset_field((Something, int))

    generated_name = type(Record().value).__name__
    assert generated_name == "SomethingIntPSet"
def test_pset_field_name_string_type():
    """
    The generated set class name is also derived from an item type given
    as a dotted-name string.
    """
    class Record(PRecord):
        value = pset_field("record_test.Something")

    generated_name = type(Record().value).__name__
    assert generated_name == "SomethingPSet"
def test_pset_multiple_string_types_field_name():
    """
    With several item types given as dotted-name strings, the generated
    set class name joins the type names in order.
    """
    class Record(PRecord):
        value = pset_field(("record_test.Something", "record_test.Another"))

    generated_name = type(Record().value).__name__
    assert generated_name == "SomethingAnotherPSet"
def test_pvector_field_initial_value():
    """
    A ``pvector_field`` that is not given a value starts out empty.
    """
    class Record(PRecord):
        value = pvector_field(int)

    defaulted = Record()
    explicit_empty = Record(value=[])
    assert defaulted == explicit_empty
def test_pvector_field_custom_initial():
    """
    ``pvector_field`` uses the ``initial`` argument as its default content.
    """
    class Record(PRecord):
        value = pvector_field(int, initial=(1, 2))

    defaulted = Record()
    explicit = Record(value=[1, 2])
    assert defaulted == explicit
def test_pvector_field_factory():
    """
    A list passed to a ``pvector_field`` is stored as a ``PVector``.
    """
    class Record(PRecord):
        value = pvector_field(int)

    stored = Record(value=[1, 2]).value
    assert isinstance(stored, PVector)
def test_pvector_field_checked_vector():
    """
    The vector held by a ``pvector_field`` rejects items of the wrong type.
    """
    class Record(PRecord):
        value = pvector_field(int)

    checked_vector = Record(value=[1, 2]).value
    with pytest.raises(TypeError):
        checked_vector.append("hello")  # type: ignore
def test_pvector_field_checked_vector_multiple_types():
    """
    A ``pvector_field`` declared with several types holds a vector that
    accepts each of them and rejects anything else.
    """
    class Record(PRecord):
        value = pvector_field((int, str))

    checked_vector = Record(value=[1, 2, "hello"]).value
    with pytest.raises(TypeError):
        checked_vector.append(object())
def test_pvector_field_type():
    """
    Setting a ``pvector_field`` to ``None`` raises ``TypeError``.
    """
    class Record(PRecord):
        value = pvector_field(int)

    empty = Record()
    with pytest.raises(TypeError):
        empty.set("value", None)
def test_pvector_field_mandatory():
    """
    Removing a ``pvector_field`` from a record raises ``InvariantException``.
    """
    class Record(PRecord):
        value = pvector_field(int)

    populated = Record(value=[1])
    with pytest.raises(InvariantException):
        populated.remove("value")
def test_pvector_field_default_non_optional():
    """
    With no ``optional`` argument, a ``pvector_field`` rejects ``None``
    with ``TypeError``.
    """
    class Record(PRecord):
        value = pvector_field(int)

    no_value = {"value": None}
    with pytest.raises(TypeError):
        Record(**no_value)
def test_pvector_field_explicit_non_optional():
    """
    With ``optional=False``, a ``pvector_field`` rejects ``None`` with
    ``TypeError``.
    """
    class Record(PRecord):
        value = pvector_field(int, optional=False)

    no_value = {"value": None}
    with pytest.raises(TypeError):
        Record(**no_value)
def test_pvector_field_optional():
    """
    With ``optional=True``, a ``pvector_field`` accepts either a sequence
    or ``None``.
    """
    class Record(PRecord):
        value = pvector_field(int, optional=True)

    with_items = Record(value=[1, 2]).value
    without_items = Record(value=None).value
    assert with_items == pvector([1, 2])
    assert without_items is None
def test_pvector_field_name():
    """
    The generated vector class is named after the item type of the field.
    """
    class Record(PRecord):
        value = pvector_field(Something)
        value2 = pvector_field(int)

    record = Record()
    assert type(record.value).__name__ == "SomethingPVector"
    assert type(record.value2).__name__ == "IntPVector"
def test_pvector_multiple_types_field_name():
    """
    With several item types, the generated vector class name joins the
    type names in order.
    """
    class Record(PRecord):
        value = pvector_field((Something, int))

    generated_name = type(Record().value).__name__
    assert generated_name == "SomethingIntPVector"
def test_pvector_field_name_string_type():
    """
    The generated vector class name is also derived from an item type
    given as a dotted-name string.
    """
    class Record(PRecord):
        value = pvector_field("record_test.Something")

    generated_name = type(Record().value).__name__
    assert generated_name == "SomethingPVector"
def test_pvector_multiple_string_types_field_name():
    """
    With several item types given as dotted-name strings, the generated
    vector class name joins the type names in order.
    """
    class Record(PRecord):
        value = pvector_field(("record_test.Something", "record_test.Another"))

    generated_name = type(Record().value).__name__
    assert generated_name == "SomethingAnotherPVector"
def test_pvector_field_create_from_nested_serialized_data():
    """
    A record with a ``pvector_field`` of nested records can be rebuilt from
    its own serialized form.
    """
    class Foo(PRecord):
        foo = field(type=str)

    class Bar(PRecord):
        bar = pvector_field(Foo)

    data = Bar(bar=v(Foo(foo="foo")))
    # The comparison used to be a bare expression, so a mismatch could never
    # fail the test; it must be asserted.
    assert Bar.create(data.serialize()) == data
def test_pmap_field_initial_value():
    """
    A ``pmap_field`` that is not given a value starts out empty.
    """
    class Record(PRecord):
        value = pmap_field(int, int)

    defaulted = Record()
    explicit_empty = Record(value={})
    assert defaulted == explicit_empty
def test_pmap_field_factory():
    """
    A dict passed to a ``pmap_field`` is stored as a ``PMap``.
    """
    class Record(PRecord):
        value = pmap_field(int, int)

    stored = Record(value={1: 1234}).value
    assert isinstance(stored, PMap)
def test_pmap_field_checked_map_key():
    """
    The map held by a ``pmap_field`` rejects keys of the wrong type.
    """
    class Record(PRecord):
        value = pmap_field(int, type(None))

    checked_map = Record(value={1: None}).value
    with pytest.raises(TypeError):
        checked_map.set("hello", None)  # type: ignore
def test_pmap_field_checked_map_value():
    """
    The map held by a ``pmap_field`` rejects values of the wrong type.
    """
    class Record(PRecord):
        value = pmap_field(int, type(None))

    checked_map = Record(value={1: None}).value
    with pytest.raises(TypeError):
        checked_map.set(2, 4)  # type: ignore
def test_pmap_field_checked_map_key_multiple_types():
    """
    A ``pmap_field`` declared with several key types accepts each of them
    and rejects any other key type.
    """
    class Record(PRecord):
        value = pmap_field((int, str), type(None))

    checked_map = Record(value={1: None, "hello": None}).value
    with pytest.raises(TypeError):
        checked_map.set(object(), None)
def test_pmap_field_checked_map_value_multiple_types():
    """
    A ``pmap_field`` declared with several value types accepts each of them
    and rejects any other value type.
    """
    class Record(PRecord):
        value = pmap_field(int, (str, type(None)))

    checked_map = Record(value={1: None, 3: "hello"}).value
    with pytest.raises(TypeError):
        checked_map.set(2, 4)
def test_pmap_field_mandatory():
    """
    Removing a ``pmap_field`` from a record raises ``InvariantException``.
    """
    class Record(PRecord):
        value = pmap_field(int, int)

    defaulted = Record()
    with pytest.raises(InvariantException):
        defaulted.remove("value")
def test_pmap_field_default_non_optional():
    """
    With no ``optional`` argument, a ``pmap_field`` does not accept
    ``None``.
    """
    class Record(PRecord):
        value = pmap_field(int, int)

    no_value = {"value": None}
    # Ought to be TypeError, but pyrsistent doesn't quite allow that:
    with pytest.raises(AttributeError):
        Record(**no_value)
def test_pmap_field_explicit_non_optional():
    """
    With ``optional=False``, a ``pmap_field`` does not accept ``None``.
    """
    class Record(PRecord):
        value = pmap_field(int, int, optional=False)

    no_value = {"value": None}
    # Ought to be TypeError, but pyrsistent doesn't quite allow that:
    with pytest.raises(AttributeError):
        Record(**no_value)
def test_pmap_field_optional():
    """
    With ``optional=True``, a ``pmap_field`` accepts either a mapping or
    ``None``.
    """
    class Record(PRecord):
        value = pmap_field(int, int, optional=True)

    with_items = Record(value={1: 2}).value
    without_items = Record(value=None).value
    assert with_items == pmap({1: 2})
    assert without_items is None
def test_pmap_field_name():
    """
    The generated map class is named after the key and value types.
    """
    class Record(PRecord):
        value = pmap_field(Something, Another)
        value2 = pmap_field(int, float)

    record = Record()
    assert type(record.value).__name__ == "SomethingToAnotherPMap"
    assert type(record.value2).__name__ == "IntToFloatPMap"
def test_pmap_field_name_multiple_types():
    """
    The generated map class name covers every supported type when the key
    or the value side is declared with several types.
    """
    class Record(PRecord):
        value = pmap_field((Something, Another), int)
        value2 = pmap_field(str, (int, float))

    record = Record()
    assert type(record.value).__name__ == "SomethingAnotherToIntPMap"
    assert type(record.value2).__name__ == "StrToIntFloatPMap"
def test_pmap_field_name_string_type():
    """
    The generated map class name is also derived from key and value types
    given as dotted-name strings.
    """
    class Record(PRecord):
        value = pmap_field("record_test.Something", "record_test.Another")

    generated_name = type(Record().value).__name__
    assert generated_name == "SomethingToAnotherPMap"
def test_pmap_field_name_multiple_string_types():
    """
    The generated map class name covers every supported type when several
    types are given as dotted-name strings on the key or value side.
    """
    class Record(PRecord):
        value = pmap_field(("record_test.Something", "record_test.Another"), int)
        value2 = pmap_field(str, ("record_test.Something", "record_test.Another"))

    record = Record()
    assert type(record.value).__name__ == "SomethingAnotherToIntPMap"
    assert type(record.value2).__name__ == "StrToSomethingAnotherPMap"
def test_pmap_field_invariant():
    """
    An ``invariant`` given to ``pmap_field`` is enforced on the field value.
    """
    class Record(PRecord):
        value = pmap_field(
            int, int,
            invariant=lambda mapping: (len(mapping) == 1, "Exactly one item required."),
        )

    # Too few and too many items both violate the invariant.
    for rejected in ({}, {1: 2, 3: 4}):
        with pytest.raises(InvariantException):
            Record(value=rejected)

    accepted = Record(value={1: 2})
    assert accepted.value == {1: 2}
def test_pmap_field_create_from_nested_serialized_data():
    """
    A record with a ``pmap_field`` of nested records can be rebuilt from
    its own serialized form.
    """
    class Foo(PRecord):
        foo = field(type=str)

    class Bar(PRecord):
        bar = pmap_field(str, Foo)

    data = Bar(bar=m(foo_key=Foo(foo="foo")))
    # The comparison used to be a bare expression, so a mismatch could never
    # fail the test; it must be asserted.
    assert Bar.create(data.serialize()) == data
def test_supports_weakref():
    """Creating a weak reference to a record does not raise."""
    import weakref

    record = ARecord(x=1, y=2)
    weakref.ref(record)
def test_supports_lazy_initial_value_for_field():
    """A callable ``initial`` is evaluated to produce the field's default value."""
    class MyRecord(PRecord):
        a = field(int, initial=lambda: 2)

    defaulted = MyRecord()
    explicit = MyRecord(a=2)
    assert defaulted == explicit
def test_pickle_with_one_way_factory():
    """
    A record whose factory cannot take its own output still survives a
    pickle round trip, i.e. the factory isn't called when restoring.
    """
    thing = UniqueThing(id='25544626-86da-4bce-b6b6-9186c0804d64')
    restored = pickle.loads(pickle.dumps(thing))
    assert thing == restored
|