[Zope3-checkins] CVS: Zope3/src/zope/app/schema/tests -
fields.zcml:1.1 test_field.py:1.1
test_interfaceutility.py:1.1 test_schemautility.py:1.1
test_schemautilitypersistence.py:1.1 test_wrapper.py:1.1
Stephan Richter
srichter at cosmos.phy.tufts.edu
Tue Mar 9 19:58:00 EST 2004
Update of /cvs-repository/Zope3/src/zope/app/schema/tests
In directory cvs.zope.org:/tmp/cvs-serv28963/src/zope/app/schema/tests
Added Files:
fields.zcml test_field.py test_interfaceutility.py
test_schemautility.py test_schemautilitypersistence.py
test_wrapper.py
Log Message:
Moved mutable schemas out of zope.app.utilities to zope.app.schema. Also,
seperated it from the mutable schema content definition/instance code, which
now lives in zope.app.schemacontent.
=== Added File Zope3/src/zope/app/schema/tests/fields.zcml ===
<configure
xmlns="http://namespaces.zope.org/zope"
i18n_domain="zope">
<include package="zope.app.component" file="meta.zcml" />
<include package="zope.app.publisher" file="meta.zcml" />
<include package="zope.app.browser.form" file="meta.zcml" />
<include package="zope.app.security" file="meta.zcml"/>
<include package="zope.app.security" file="configure.zcml"/>
<include package="zope.app.browser" file="usages.zcml" />
<include package="zope.app.browser" file="menus.zcml" />
<include package="zope.app.schema" file="configure.zcml" />
</configure>
=== Added File Zope3/src/zope/app/schema/tests/test_field.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Field Tests
$Id: test_field.py,v 1.1 2004/03/10 00:57:57 srichter Exp $
"""
import unittest
from persistent.tests.persistenttestbase import DM
from zope.configuration import xmlconfig
from zope.schema import Text, getFieldsInOrder
from zope.security.checker import ProxyFactory
from zope.security.management import system_user, newSecurityManager
from zope.app.tests import setup
from zope.app.schema.wrapper import Struct
from zope.security.checker import getChecker, _defaultChecker
import zope.app.schema.tests
class FieldPersistence(unittest.TestCase):
def test_field_change(self):
f = Text(title=u'alpha')
f = Struct(f)
f._p_oid = '\0\0\0\0\0\0hi'
dm = DM()
f._p_jar = dm
self.assertEqual(f._p_changed, 0)
f.title = u'bar'
self.assertEqual(f._p_changed, 1)
self.assertEqual(dm.called, 1)
class FieldPermissions(unittest.TestCase):
def setUp(self):
setup.placefulSetUp()
self.context = xmlconfig.file("fields.zcml", zope.app.schema.tests)
newSecurityManager(system_user)
def test_wrapped_field_checker(self):
f1 = Text(title=u'alpha')
f1 = ProxyFactory(f1)
f2 = Text(title=u'alpha')
f2 = Struct(f2)
f2 = ProxyFactory(f2)
self.assertEquals(getChecker(f1), getChecker(f2))
self.failIf(getChecker(f1) is _defaultChecker)
self.failIf(getChecker(f2) is _defaultChecker)
def tearDown(self):
setup.placefulTearDown()
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(FieldPersistence),
unittest.makeSuite(FieldPermissions),
))
if __name__ == '__main__':
unittest.main()
=== Added File Zope3/src/zope/app/schema/tests/test_interfaceutility.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Utility service tests
XXX longer description goes here.
$Id: test_interfaceutility.py,v 1.1 2004/03/10 00:57:57 srichter Exp $
"""
import unittest
from zope.app.tests import setup
from zope.app.services.tests import placefulsetup
from zope.app.services import utility
from zope.app.services.servicenames import Utilities
from zope.component.utility import utilityService as globalUtilityService
from zope.app.component.interface import getInterface, searchInterface
from zope.interface import Interface, implements
from zope.app.container.contained import Contained
from zope.component import getService
from zope.component.exceptions import ComponentLookupError
from zope.app.traversing import traverse
from zope.app.interfaces.services.registration import IRegistrationStack
from zope.app.interfaces.services.registration import UnregisteredStatus
from zope.app.interfaces.services.registration import RegisteredStatus
from zope.app.interfaces.services.registration import ActiveStatus
from zope.app.interfaces.services.utility import ILocalUtility
from zope.app.interfaces.services.registration import IRegistered
from zope.app.interfaces.dependable import IDependable
from zope.app.tests import setup
from zope.interface.interface import InterfaceClass
from zope.interface.interfaces import IInterface
from zope.interface import Interface
class IBaz(Interface): pass
class Baz:
# We implement IRegistered and IDependable directly to
# depend as little as possible on other infrastructure.
implements(IBaz, ILocalUtility, IRegistered, IDependable)
def __init__(self, name):
self.name = name
self._usages = []
self._dependents = []
def foo(self):
return 'foo ' + self.name
def addUsage(self, location):
"See zope.app.interfaces.services.registration.IRegistered"
if location not in self._usages:
self._usages.append(location)
def removeUsage(self, location):
"See zope.app.interfaces.services.registration.IRegistered"
self._usages.remove(location)
def usages(self):
"See zope.app.interfaces.services.registration.IRegistered"
return self._usages
def addDependent(self, location):
"See zope.app.interfaces.dependable.IDependable"
if location not in self._dependents:
self._dependents.append(location)
def removeDependent(self, location):
"See zope.app.interfaces.dependable.IDependable"
self._dependents.remove(location)
def dependents(self):
"See zope.app.interfaces.dependable.IDependable"
return self._dependents
class Foo(InterfaceClass, Baz, Contained):
def __init__(self, name):
InterfaceClass.__init__(self, name, (Interface,))
Baz.__init__(self, name)
class Bar(Foo): pass
class TestInterfaceUtility(placefulsetup.PlacefulSetup, unittest.TestCase):
def setUp(self):
sm = placefulsetup.PlacefulSetup.setUp(self, site=True)
setup.addService(sm, Utilities,
utility.LocalUtilityService())
def test_getLocalInterface_delegates_to_globalUtility(self):
globalUtilityService.provideUtility(IInterface, Bar("blob"),
name="blob")
globalUtilityService.provideUtility(IBaz, Baz("global baz"))
globalUtilityService.provideUtility(IInterface, Foo("global bob"),
name="bob")
self.assertEqual(getInterface(None, "bob").__class__, Foo)
self.assertEqual(getInterface(None, "blob").__class__, Bar)
def test_localInterfaceitems_filters_accordingly(self):
bar = Bar("global")
baz = Baz("global baz")
foo = Foo("global bob")
globalUtilityService.provideUtility(IInterface, foo,
name="bob")
globalUtilityService.provideUtility(IInterface, bar)
globalUtilityService.provideUtility(IBaz, baz)
ifaces = searchInterface(None)
self.assert_(len(ifaces), 2)
for pair in [(foo), (bar)]:
self.assert_(pair in ifaces)
iface_utilities = globalUtilityService.getUtilitiesFor(IInterface)
ifaces = [iface for (name, iface) in iface_utilities]
self.assert_(len(ifaces), 2)
for pair in [(foo), (bar)]:
self.assert_(pair in ifaces)
for pair in [(foo), (bar)]:
self.assert_(pair in ifaces)
def test_localInterfaceitems_filters_only_interfaces(self):
bar = Bar("global")
baz = Baz("global baz")
foo = Foo("global bob")
globalUtilityService.provideUtility(IInterface, foo,
name="bob")
globalUtilityService.provideUtility(ILocalUtility, bar)
globalUtilityService.provideUtility(IBaz, baz)
iface_utilities = globalUtilityService.getUtilitiesFor(IInterface)
ifaces = [iface for (name, iface) in iface_utilities]
self.assertEqual(ifaces, [(foo)])
iface_utilities = globalUtilityService.getUtilitiesFor(ILocalUtility)
ifaces = [iface for (name, iface) in iface_utilities]
self.assertEqual(ifaces, [(bar)])
iface_utilities = globalUtilityService.getUtilitiesFor(IBaz)
ifaces = [iface for (name, iface) in iface_utilities]
self.assertEqual(ifaces, [(baz)])
def test_getLocalInterface_raisesComponentLookupError(self):
globalUtilityService.provideUtility(IInterface, Foo("global"))
globalUtilityService.provideUtility(IBaz, Baz("global baz"))
globalUtilityService.provideUtility(IInterface, Foo("global bob"),
name="bob")
self.assertRaises(ComponentLookupError,
getInterface, None, "bobesponja")
def test_globalsearchInterface_delegates_to_globalUtility(self):
foo = Foo("global bob")
bar = Bar("global")
baz = Baz("global baz")
globalUtilityService.provideUtility(IInterface, bar)
globalUtilityService.provideUtility(IBaz, baz)
globalUtilityService.provideUtility(IInterface, foo,
name="bob")
self.assertEqual(searchInterface(None, search_string="bob"),
[foo])
def test_localsearchInterface_delegates_to_globalUtility(self):
foo = Foo("global bob")
bar = Bar("global")
baz = Baz("global baz")
globalUtilityService.provideUtility(IInterface, bar)
globalUtilityService.provideUtility(IBaz, baz)
globalUtilityService.provideUtility(IInterface, foo,
name="bob")
self.assertEqual(searchInterface(None, search_string="bob"),
[foo])
def test_queryUtility_delegates_to_global(self):
globalUtilityService.provideUtility(IInterface, Foo("global"))
globalUtilityService.provideUtility(IInterface, Foo("global bob"),
name="bob")
utility_service = getService(self.rootFolder, Utilities)
self.assert_(utility_service != globalUtilityService)
self.assertEqual(utility_service.queryUtility(IInterface).foo(),
"foo global")
self.assertEqual(
utility_service.queryUtility(IInterface, name="bob").foo(),
"foo global bob")
def test_getUtility_delegates_to_global(self):
globalUtilityService.provideUtility(IInterface, Foo("global"))
globalUtilityService.provideUtility(IInterface, Foo("global bob"),
name="bob")
utility_service = getService(self.rootFolder, Utilities)
self.assert_(utility_service != globalUtilityService)
self.assertEqual(utility_service.getUtility(IInterface).foo(),
"foo global")
self.assertEqual(
utility_service.getUtility(IInterface, name="bob").foo(),
"foo global bob")
def test_registrationsFor_methods(self):
utilities = getService(self.rootFolder, Utilities)
default = traverse(self.rootFolder, "++etc++site/default")
default['foo'] = Foo("local")
path = "/++etc++site/default/foo"
for name in ('', 'bob'):
registration = utility.UtilityRegistration(name, IInterface, path)
self.assertEqual(utilities.queryRegistrationsFor(registration),
None)
registery = utilities.createRegistrationsFor(registration)
self.assert_(IRegistrationStack.providedBy(registery))
self.assertEqual(utilities.queryRegistrationsFor(registration),
registery)
def test_local_utilities(self):
globalUtilityService.provideUtility(IInterface, Foo("global"))
globalUtilityService.provideUtility(IInterface, Foo("global bob"),
name="bob")
utilities = getService(self.rootFolder, Utilities)
default = traverse(self.rootFolder, "++etc++site/default")
default['foo'] = Foo("local")
path = "/++etc++site/default/foo"
cm = default.getRegistrationManager()
for name in ('', 'bob'):
registration = utility.UtilityRegistration(name, IInterface, path)
cname = cm.addRegistration(registration)
registration = traverse(cm, cname)
gout = name and "foo global "+name or "foo global"
self.assertEqual(utilities.getUtility(IInterface, name=name).foo(),
gout)
registration.status = ActiveStatus
self.assertEqual(utilities.getUtility(IInterface, name=name).foo(),
"foo local")
registration.status = RegisteredStatus
self.assertEqual(utilities.getUtility(IInterface, name=name).foo(),
gout)
def test_getRegisteredMatching(self):
self.test_local_utilities()
utilities = getService(self.rootFolder, Utilities)
r = list(utilities.getRegisteredMatching())
r.sort()
path = "/++etc++site/default/foo"
cr1 = utilities.queryRegistrationsFor(
utility.UtilityRegistration("", IInterface, path))
cr2 = utilities.queryRegistrationsFor(
utility.UtilityRegistration("bob", IInterface, path))
self.assertEqual(r, [(IInterface, "", cr1), (IInterface, "bob", cr2)])
self.assertEqual(r[0][2].__parent__, utilities)
self.assertEqual(r[1][2].__parent__, utilities)
# Now tescvt that an empty registry doesn't show up
for cd in cr1.info(): # Remove everything from cr1
cd['registration'].status = UnregisteredStatus
self.assertEqual(bool(cr1), False)
r = list(utilities.getRegisteredMatching())
self.assertEqual(r, [(IInterface, "bob", cr2)])
def test_suite():
return unittest.makeSuite(TestInterfaceUtility)
if __name__ == '__main__':
unittest.main()
=== Added File Zope3/src/zope/app/schema/tests/test_schemautility.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: test_schemautility.py,v 1.1 2004/03/10 00:57:57 srichter Exp $
"""
from unittest import TestCase, makeSuite, TestSuite
from zope.configuration import xmlconfig
from zope.schema import Text, getFieldNamesInOrder, getFieldsInOrder
from zope.security.management import system_user, newSecurityManager
from zope.security.checker import getChecker, _defaultChecker, ProxyFactory
from zope.app.schema.schema import SchemaUtility
from zope.app.tests import setup
from zope.app import zapi
import zope.app.schema.tests
class SchemaUtilityTests(TestCase):
def _createSchemaUtility(self):
return SchemaUtility()
def _additionalSetup(self):
self.s = self._createSchemaUtility()
self.s.setName('IFoo')
self.alpha = Text(title=u"alpha")
def setUp(self):
setup.placefulSetUp()
self._additionalSetup()
def test_addField(self):
s = self.s
s.addField(u'alpha', self.alpha)
self.assertEquals(
[u'alpha',],
getFieldNamesInOrder(s))
def test_addFieldInsertsAtEnd(self):
s = self.s
s.addField(u'alpha', self.alpha)
beta = Text(title=u"Beta")
s.addField(u'beta', beta)
self.assertEquals(
[u'alpha', u'beta'],
getFieldNamesInOrder(s))
def test_removeField(self):
s = self.s
s.addField(u'alpha', self.alpha)
s.removeField(u'alpha')
self.assertEquals(
[],
getFieldNamesInOrder(s))
def test_addFieldCollision(self):
s = self.s
s.addField(u'alpha', self.alpha)
self.assertRaises(KeyError, s.addField, 'alpha', self.alpha)
def test_removeFieldNotPresent(self):
self.assertRaises(KeyError, self.s.removeField, 'alpha')
def test_renameField(self):
s = self.s
s.addField(u'alpha', self.alpha)
s.renameField(u'alpha', 'beta')
self.assertEquals(
[u'beta'],
getFieldNamesInOrder(s))
def test_renameFieldCollision(self):
s = self.s
s.addField(u'alpha', self.alpha)
s.addField(u'beta', Text(title=u"Beta"))
self.assertRaises(KeyError, s.renameField, 'alpha', 'beta')
def test_renameFieldNotPresent(self):
self.assertRaises(KeyError, self.s.renameField, 'alpha', 'beta')
def test_insertField(self):
s = self.s
s.addField(u'alpha', self.alpha)
beta = Text(title=u"Beta")
s.insertField(u'beta', beta, 0)
self.assertEquals(
[u'beta', u'alpha'],
getFieldNamesInOrder(s))
def test_insertFieldCollision(self):
s = self.s
s.addField(u'alpha', self.alpha)
beta = Text(title=u"Beta")
self.assertRaises(KeyError, s.insertField, 'alpha', beta, 0)
def test_insertFieldCornerCases(self):
s = self.s
gamma = Text(title=u"Gamma")
# it's still possible to insert at beginning
s.insertField(u'gamma', gamma, 0)
self.assertEquals(
[u'gamma'],
getFieldNamesInOrder(s))
# should be allowed to insert field at the end
s.insertField(u'alpha', self.alpha, 1)
self.assertEquals(
[u'gamma', u'alpha'],
getFieldNamesInOrder(s))
# should be allowed to insert field at the beginning still
delta = Text(title=u"Delta")
s.insertField(u'delta', delta, 0)
self.assertEquals(
[u'delta', u'gamma', u'alpha'],
getFieldNamesInOrder(s))
def test_insertFieldBeyondEnd(self):
s = self.s
s.addField(u'alpha', self.alpha)
beta = Text(title=u"Beta")
self.assertRaises(IndexError, s.insertField,
'beta', beta, 100)
def test_insertFieldBeforeBeginning(self):
s = self.s
s.addField(u'alpha', self.alpha)
beta = Text(title=u"Beta")
self.assertRaises(IndexError, s.insertField,
'beta', beta, -1)
def test_moveField(self):
s = self.s
s.addField(u'alpha', self.alpha)
beta = Text(title=u'Beta')
s.addField(u'beta', beta)
gamma = Text(title=u'Gamma')
s.addField(u'gamma', gamma)
s.moveField(u'beta', 3)
self.assertEquals(
[u'alpha', u'gamma', u'beta'],
getFieldNamesInOrder(s))
s.moveField(u'beta', 2)
self.assertEquals(
[u'alpha', u'gamma', u'beta'],
getFieldNamesInOrder(s))
s.moveField(u'beta', 1)
self.assertEquals(
[u'alpha', u'beta', u'gamma'],
getFieldNamesInOrder(s))
s.moveField(u'beta', 0)
self.assertEquals(
[u'beta', u'alpha', u'gamma'],
getFieldNamesInOrder(s))
def test_moveFieldBeyondEnd(self):
s = self.s
s.addField(u'alpha', self.alpha)
beta = Text(title=u"Beta")
s.addField(u'beta', beta)
self.assertRaises(IndexError, s.moveField,
'beta', 100)
def test_moveFieldBeforeBeginning(self):
s = self.s
s.addField(u'alpha', self.alpha)
beta = Text(title=u"Beta")
s.addField(u'beta', beta)
self.assertRaises(IndexError, s.moveField,
'beta', -1)
def test_traverseToField(self):
context = xmlconfig.file("fields.zcml", zope.app.schema.tests)
s = self.s
s.addField(u'alpha', self.alpha)
s = ProxyFactory(s)
newSecurityManager(system_user)
f1 = ProxyFactory(s[u'alpha'])
order = f1.order
f1 = zapi.traverse(s, 'alpha')
self.assertEquals(f1.order, self.alpha.order)
title = zapi.traverse(f1, 'title')
self.assertEquals(title, self.alpha.title)
fields = getFieldsInOrder(s)
for k, v in fields:
self.failUnless(v.title is not None)
def tearDown(self):
setup.placefulTearDown()
def test_suite():
return TestSuite((
makeSuite(SchemaUtilityTests),
))
=== Added File Zope3/src/zope/app/schema/tests/test_schemautilitypersistence.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for Schema Utility Persistence
$Id: test_schemautilitypersistence.py,v 1.1 2004/03/10 00:57:57 srichter Exp $
"""
import unittest
from persistent.tests.persistenttestbase import PersistentTest, DM
from zope.app.schema.wrapper import Struct
from zope.app.schema.schema import SchemaUtility
from zope.schema import Text, getFieldsInOrder
from zope.app.tests import setup
class PSchema(SchemaUtility):
def __init__(self):
super(PSchema, self).__init__()
self.x = 0
self.setName('PSchema')
def inc(self):
self.x += 1
class PersistentSchemaUtilityTest(PersistentTest):
klass = PSchema
def setUp(self):
PersistentTest.setUp(self)
setup.placefulSetUp(self)
def testState(self):
pass
# def testChangeField(self):
# f = Text(title=u'alpha')
# p = self.klass()
# p._p_oid = '\0\0\0\0\0\0hi'
# dm = DM()
# p._p_jar = dm
# self.assertEqual(p._p_changed, 0)
# self.assertEqual(dm.called, 0)
# p.addField('alpha', f)
# self.assertEqual(p._p_changed, 1)
# self.assertEqual(dm.called, 1)
# p._p_changed = 0
# self.assertEqual(p._p_changed, 0)
# self.assertEqual(dm.called, 1)
# field = p['alpha']
# field.title = u'Beta'
# self.assertEqual(f._p_changed, 1)
# self.assertEqual(p._p_changed, 1)
# self.assertEqual(dm.called, 2)
# def testAddField(self):
# f = Text(title=u'alpha')
# p = self.klass()
# p._p_oid = '\0\0\0\0\0\0hi'
# dm = DM()
# p._p_jar = dm
# self.assertEqual(p._p_changed, 0)
# self.assertEqual(dm.called, 0)
# p.addField('alpha', f)
# self.assertEqual(p._p_changed, 1)
# self.assertEqual(dm.called, 1)
# def testRemoveField(self):
# f = Text(title=u'alpha')
# p = self.klass()
# p._p_oid = '\0\0\0\0\0\0hi'
# dm = DM()
# p._p_jar = dm
# self.assertEqual(p._p_changed, 0)
# self.assertEqual(dm.called, 0)
# p.addField('alpha', f)
# self.assertEqual(p._p_changed, 1)
# self.assertEqual(dm.called, 1)
# p._p_changed = 0
# self.assertEqual(p._p_changed, 0)
# self.assertEqual(dm.called, 1)
# p.removeField('alpha')
# self.assertEqual(p._p_changed, 1)
# self.assertEqual(dm.called, 2)
def tearDown(self):
PersistentTest.tearDown(self)
setup.placefulTearDown()
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(PersistentSchemaUtilityTest))
return suite
if __name__ == '__main__':
unittest.main()
=== Added File Zope3/src/zope/app/schema/tests/test_wrapper.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import unittest
from persistent import Persistent, GHOST, UPTODATE
from persistent.tests.persistenttestbase import DM as BaseDM, BrokenDM
from zope.interface import Interface, directlyProvides, directlyProvidedBy
from zope.app.schema.wrapper import Struct
from zope.app.container.contained import ContainedProxy, getProxiedObject
class IDummy(Interface): pass
class Dummy(object):
def __init__(self, x=0):
self.x = x
DUMMY = Dummy(42)
class DM(BaseDM):
def setstate(self, ob):
ob.__setstate__({'__proxied__': DUMMY})
def makeInstance(self):
d = Dummy()
p = Struct(d)
return p
class Test(unittest.TestCase):
klass = None # override in subclass
def testSaved(self):
p = self.klass()
p._p_oid = '\0\0\0\0\0\0hi'
dm = DM()
p._p_jar = dm
self.assertEqual(p._p_changed, 0)
self.assertEqual(dm.called, 0)
p.x += 1
self.assertEqual(p._p_changed, 1)
self.assertEqual(dm.called, 1)
p.x += 1
self.assertEqual(p._p_changed, 1)
self.assertEqual(dm.called, 1)
p._p_deactivate()
self.assertEqual(p._p_changed, 1)
self.assertEqual(dm.called, 1)
p._p_deactivate()
self.assertEqual(p._p_changed, 1)
self.assertEqual(dm.called, 1)
p._p_invalidate()
self.assertEqual(p._p_changed, None)
self.assertEqual(dm.called, 1)
p.x += 1
self.assertEqual(p.x, 43)
self.assertEqual(p._p_changed, 1)
self.assertEqual(dm.called, 2)
p._p_changed = 0
self.assertEqual(p._p_changed, 0)
self.assertEqual(dm.called, 2)
self.assertEqual(p.x, 43)
p.x += 1
self.assertEqual(p._p_changed, 1)
self.assertEqual(dm.called, 3)
def testUnsaved(self):
p = self.klass()
self.assertEqual(p.x, 0)
self.assertEqual(p._p_changed, 0)
self.assertEqual(p._p_jar, None)
self.assertEqual(p._p_oid, None)
p.x += 1
p.x += 1
self.assertEqual(p.x, 2)
self.assertEqual(p._p_changed, 0)
p._p_deactivate()
self.assertEqual(p._p_changed, 0)
p._p_changed = 1
self.assertEqual(p._p_changed, 0)
p._p_deactivate()
self.assertEqual(p._p_changed, 0)
p._p_invalidate()
self.assertEqual(p._p_changed, None)
if self.has_dict:
self.failUnless(p.__dict__)
self.assertEqual(p.x, 2)
def testState(self):
p = self.klass()
d = Dummy()
self.failUnless(p.__getstate__().has_key('__proxied__'))
self.assertEqual(p._p_changed, 0)
p.__setstate__({'__proxied__':d})
self.assertEqual(p._p_changed, 0)
if self.has_dict:
p._v_foo = 2
self.failUnless(p.__getstate__(), {'__proxied__': d})
self.assertEqual(p._p_changed, 0)
def testSetStateSerial(self):
p = self.klass()
p._p_serial = 'abcdefgh'
p.__setstate__(p.__getstate__())
self.assertEqual(p._p_serial, 'abcdefgh')
def testDirectChanged(self):
p = self.klass()
p._p_oid = 1
dm = DM()
p._p_jar = dm
self.assertEqual(p._p_changed, 0)
self.assertEqual(dm.called, 0)
p._p_changed = 1
self.assertEqual(dm.called, 1)
def testGhostChanged(self):
# An object is a ghost, and it's _p_changed it set to True.
# This assignment should have no effect.
p = self.klass()
p._p_oid = 1
dm = DM()
p._p_jar = dm
p._p_deactivate()
self.assertEqual(p._p_state, GHOST)
p._p_changed = True
self.assertEqual(p._p_state, GHOST)
def testRegistrationFailure(self):
p = self.klass()
p._p_oid = 1
dm = BrokenDM()
p._p_jar = dm
self.assertEqual(p._p_changed, 0)
self.assertEqual(dm.called, 0)
try:
p._p_changed = 1
except NotImplementedError:
pass
else:
raise AssertionError("Exception not propagated")
self.assertEqual(dm.called, 1)
self.assertEqual(p._p_changed, 0)
def testLoadFailure(self):
p = self.klass()
p._p_oid = 1
dm = BrokenDM()
p._p_jar = dm
p._p_deactivate() # make it a ghost
try:
p._p_activate()
except NotImplementedError:
pass
else:
raise AssertionError("Exception not propagated")
self.assertEqual(p._p_changed, None)
def testActivate(self):
p = self.klass()
dm = DM()
p._p_oid = 1
p._p_jar = dm
p._p_changed = 0
p._p_deactivate()
# XXX does this really test the activate method?
p._p_activate()
self.assertEqual(p._p_state, UPTODATE)
self.assertEqual(p.x, 42)
def testDeactivate(self):
p = self.klass()
dm = DM()
p._p_oid = 1
p._p_deactivate() # this deactive has no effect
self.assertEqual(p._p_state, UPTODATE)
p._p_jar = dm
p._p_changed = 0
p._p_deactivate()
self.assertEqual(p._p_state, GHOST)
p._p_activate()
self.assertEqual(p._p_state, UPTODATE)
self.assertEqual(p.x, 42)
# XXX to do this right and expose both IPersistent and the
# underlying object's interfaces, we'd need to use a specialized
# descriptor. This would create to great a dependency on
# zope.interface.
# def testInterface(self):
# from persistent.interfaces import IPersistent
# self.assert_(IPersistent.implementedBy(Persistent),
# "%s does not implement IPersistent" % Persistent)
# p = Persistent()
# self.assert_(IPersistent.providedBy(p),
# "%s does not implement IPersistent" % p)
# self.assert_(IPersistent.implementedBy(Struct),
# "%s does not implement IPersistent" % Struct)
# p = self.klass()
# self.assert_(IPersistent.providedBy(p),
# "%s does not implement IPersistent" % p)
def testDataManagerAndAttributes(self):
# Test to cover an odd bug where the instance __dict__ was
# set at the same location as the data manager in the C type.
p = self.klass()
p.x += 1
p.x += 1
self.assert_('__proxied__' in p.__dict__)
self.assert_(p._p_jar is None)
def testMultipleInheritance(self):
# make sure it is possible to inherit from two different
# subclasses of persistent.
class A(Persistent):
pass
class B(Persistent):
pass
class C(A, B):
pass
class D(object):
pass
class E(D, B):
pass
def testMultipleMeta(self):
# make sure it's possible to define persistent classes
# with a base whose metaclass is different
class alternateMeta(type):
pass
class alternate(object):
__metaclass__ = alternateMeta
class mixedMeta(alternateMeta, type):
pass
class mixed(alternate, Persistent):
__metaclass__ = mixedMeta
def testSlots(self):
# Verify that Persistent classes behave the same way
# as pure Python objects where '__slots__' and '__dict__'
# are concerned.
class noDict(object):
__slots__ = ['foo']
class shouldHaveDict(noDict):
pass
class p_noDict(Persistent):
__slots__ = ['foo']
class p_shouldHaveDict(p_noDict):
pass
self.assertEqual(noDict.__dictoffset__, 0)
self.assertEqual(p_noDict.__dictoffset__, 0)
self.assert_(shouldHaveDict.__dictoffset__ <> 0)
self.assert_(p_shouldHaveDict.__dictoffset__ <> 0)
def testBasicTypeStructure(self):
# test that a persistent class has a sane C type structure
# use P (defined below) as simplest example
self.assertEqual(Persistent.__dictoffset__, 0)
self.assertEqual(Persistent.__weakrefoffset__, 0)
self.assert_(Persistent.__basicsize__ > object.__basicsize__)
self.assert_(Struct.__dictoffset__)
self.assert_(Struct.__weakrefoffset__)
self.assert_(Struct.__dictoffset__ < Struct.__weakrefoffset__)
self.assert_(Struct.__basicsize__ > Persistent.__basicsize__)
def testDeactivateErrors(self):
p = self.klass()
p._p_oid = '\0\0\0\0\0\0hi'
dm = DM()
p._p_jar = dm
def typeerr(*args, **kwargs):
self.assertRaises(TypeError, p, *args, **kwargs)
typeerr(1)
typeerr(1, 2)
typeerr(spam=1)
typeerr(spam=1, force=1)
p._p_changed = True
class Err(object):
def __nonzero__(self):
raise RuntimeError
typeerr(force=Err())
class PersistentTest(Test):
klass = makeInstance
has_dict = 1
def testPicklable(self):
import pickle
p = self.klass()
p.x += 1
p2 = pickle.loads(pickle.dumps(p))
self.assertEqual(p2.__class__, p.__class__);
self.failUnless(p2.__dict__.has_key('__proxied__'))
self.assertEqual(p2.__dict__['__proxied__'].__dict__,
p.__dict__['__proxied__'].__dict__)
def testContainedPicklable(self):
import pickle
p = self.klass()
p = ContainedProxy(p)
p.x += 1
p2 = pickle.loads(pickle.dumps(p))
pa = getProxiedObject(p)
pb = getProxiedObject(p2)
self.assertEqual(pb.__class__, pa.__class__);
self.failUnless(pb.__dict__.has_key('__proxied__'))
self.assertEqual(pb.__dict__['__proxied__'].__dict__,
pa.__dict__['__proxied__'].__dict__)
def testDirectlyProvides(self):
p = self.klass()
self.failIf(IDummy.providedBy(p))
directlyProvides(p, directlyProvidedBy(p), IDummy)
self.failUnless(IDummy.providedBy(p))
def test_suite():
return unittest.makeSuite(PersistentTest)
More information about the Zope3-Checkins
mailing list