[Zope-CVS] CVS: Products/AdaptableStorage/tests - .cvsignore:1.1 SerialTestBase.py:1.1 __init__.py:1.1 testASStorage.py:1.1 testAll.py:1.1 testInterfaceImpl.py:1.1 testSerialization.py:1.1 testZope2FS.py:1.1
Shane Hathaway
shane@zope.com
Wed, 27 Nov 2002 13:37:09 -0500
Update of /cvs-repository/Products/AdaptableStorage/tests
In directory cvs.zope.org:/tmp/cvs-serv12157/tests
Added Files:
.cvsignore SerialTestBase.py __init__.py testASStorage.py
testAll.py testInterfaceImpl.py testSerialization.py
testZope2FS.py
Log Message:
Moved the latest AdaptableStorage work out of the private repository.
It took a long time, but I moved it as soon as all the unit tests
passed and I felt that all the interface names and conventions were
good enough.
Documentation is still minimal, but now I think the system is finally
straight enough in my head to write down. :-) If you want a sneak
peek, the interfaces have some docstrings that give a "tree" view,
while the OpenOffice diagram presents something of a "forest" view.
Also note that I'm trying a new coding convention. The "public"
module in each package defines exactly which objects should be
exported from the package. This solves a few problems with imports
such as doubling of names and shadowing of modules. Overall, the
"public" module makes it easier to tell which classes are supposed to
be used by other packages, and makes it easier for other packages to
use the public classes. See what you think.
=== Added File Products/AdaptableStorage/tests/.cvsignore ===
*.pyc
=== Added File Products/AdaptableStorage/tests/SerialTestBase.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Serialization test setup/teardown
$Id: SerialTestBase.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""
import ZODB
from Persistence import PersistentMapping
from Products.AdaptableStorage.serial import public as sfw
from Products.AdaptableStorage.serial_std.public import \
StringDataAttribute, MappingGateway, FixedPersistentMapping, RollCall
class SimpleItemsAspect:
    """Aspect serializer that turns a mapping's items into records.

    Each record is a (name, value) pair, matching the two columns
    declared in the class-level schema.
    """

    __implements__ = sfw.IAspectSerializer

    # Two-column record layout shared by all instances.
    schema = sfw.RecordSchema()
    schema.addColumn('name')
    schema.addColumn('value')

    def getSchema(self):
        """Return the record schema describing the serialized state."""
        return self.schema

    def serialize(self, object, event):
        """Return the sorted items of `object` as the aspect state.

        Every (name, value) pair is reported to `event` through
        notifySerialized(), and `event` is told to ignore the 'data'
        and '_container' attributes.
        """
        records = object.items()
        # Sort in place so the serialized state has a stable order.
        records.sort()
        for record in records:
            name, value = record
            event.notifySerialized(name, value)
        for attr_name in ('data', '_container'):
            event.ignoreAttribute(attr_name)
        return records

    def deserialize(self, object, event, state):
        """Re-initialize `object` from the (name, value) records in `state`."""
        contents = {}
        for record in state:
            name, value = record
            contents[name] = value
        # Calling __init__ again replaces whatever the mapping held before.
        object.__init__(contents)
class SerialTestBase:
    """Mix-in that builds a small domain mapper for serialization tests.

    setUp() registers two object mappers on ``self.dm``:

    - 'test': serializes PersistentMapping items and the ``strdata``
      attribute, each backed by its own MappingGateway
      (``self.items_gw`` and ``self.props_gw``).
    - 'root': a fixed mapping whose single entry 'TestRoot' refers to
      ('test', ''); it uses an ObjectGateway with no sub-gateways.
    """

    def setUp(self):
        """Create the domain mapper and its 'test' and 'root' mappers."""
        self.dm = sfw.DomainMapper(None)
        class_info = (('Persistence', 'PersistentMapping'), None)

        # Serializer for the 'test' mapper: items, string data, roll call.
        test_serializer = sfw.ObjectSerializer(class_info)
        items_aspect = SimpleItemsAspect()
        test_serializer.addAspect('items', items_aspect)
        props_aspect = StringDataAttribute('strdata')
        test_serializer.addAspect('properties', props_aspect)
        test_serializer.addAspect('roll_call', RollCall())

        # Gateway for the 'test' mapper: one mapping gateway per stored
        # aspect, each built from that aspect's schema.
        test_gateway = sfw.ObjectGateway()
        self.items_gw = MappingGateway(items_aspect.getSchema())
        test_gateway.addGateway('items', self.items_gw)
        self.props_gw = MappingGateway(props_aspect.getSchema())
        test_gateway.addGateway('properties', self.props_gw)

        self.om1 = sfw.ObjectMapper(test_serializer, test_gateway, self.dm)
        self.dm.addMapper('test', self.om1)

        # The 'root' mapper exposes a fixed mapping that points at the
        # 'test' mapper's object.
        root_serializer = sfw.ObjectSerializer(class_info)
        root_serializer.addAspect(
            'fixed_items',
            FixedPersistentMapping({'TestRoot': ('test', '')}))
        root_serializer.addAspect('roll_call', RollCall())

        self.om2 = sfw.ObjectMapper(
            root_serializer, sfw.ObjectGateway(), self.dm)
        self.dm.addMapper('root', self.om2)

    def tearDown(self):
        """Nothing to release; present so subclasses can chain to it."""
        pass
=== Added File Products/AdaptableStorage/tests/__init__.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""AdaptableStorage tests package
$Id: __init__.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""
=== Added File Products/AdaptableStorage/tests/testASStorage.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Storage tests (with data stored in simple mappings)
$Id: testASStorage.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""
import unittest
import ZODB
from Persistence import PersistentMapping
from Products.AdaptableStorage.zodb.ASDB import ASDB
from Products.AdaptableStorage.zodb.ASStorage import ASStorage
from Products.AdaptableStorage.zodb.StaticResource import StaticResource
from SerialTestBase import SerialTestBase
class ASStorageTests (SerialTestBase, unittest.TestCase):
    """Store and load objects through ASStorage/ASDB.

    The mappers come from SerialTestBase.setUp(); this class wraps them
    in a StaticResource, an ASStorage and an ASDB.
    """

    def setUp(self):
        """Build the mappers, then the storage and database on top of them."""
        SerialTestBase.setUp(self)
        resource = StaticResource(self.dm)
        storage = ASStorage(resource)
        self.storage = storage
        db = ASDB(storage, resource)
        self.db = db

    def tearDown(self):
        """Abort any pending transaction, close the database, chain up.

        A failed assertion can leave uncommitted changes in the current
        transaction; aborting here keeps them from leaking into the next
        test.  The nested try/finally blocks make sure every later
        cleanup step still runs when an earlier one raises.
        """
        try:
            try:
                get_transaction().abort()
            finally:
                self.db.close()
        finally:
            SerialTestBase.tearDown(self)

    def testStoreAndLoad(self):
        """Store an object, then read and modify it via other connections."""
        ob = PersistentMapping()
        ob.strdata = '345'
        ob['a'] = 'b'
        ob['c'] = 'd'

        conn1 = self.db.open()
        conn2 = None
        conn3 = None
        try:

            # Load the root and create a new object
            root = conn1['root:']
            get_transaction().begin()
            root['TestRoot'] = ob
            get_transaction().commit()
            ob1 = conn1['test:']
            self.assertEqual(ob1.strdata, ob.strdata)
            self.assertEqual(ob1.items(), ob.items())

            # Verify a new object was stored and make a change
            get_transaction().begin()
            conn2 = self.db.open()
            ob2 = conn2['test:']
            self.assertEqual(ob2.strdata, ob.strdata)
            self.assertEqual(ob2.items(), ob.items())
            ob2.strdata = '678'
            get_transaction().commit()

            # Verify the change was stored and make another change
            conn3 = self.db.open()
            ob3 = conn3['test:']
            self.assertEqual(ob3.strdata, '678')
            self.assertEqual(ob3.items(), ob.items())
            ob3.strdata = '901'
            get_transaction().commit()
            conn3.close()
            # Reset before reopening so the finally clause does not close
            # the same connection twice if open() fails.
            conn3 = None
            conn3 = self.db.open()
            ob3 = conn3['test:']
            self.assertEqual(ob3.strdata, '901')

            # Verify we didn't accidentally change the original object
            self.assertEqual(ob.strdata, '345')

        finally:
            conn1.close()
            if conn2 is not None:
                conn2.close()
            if conn3 is not None:
                conn3.close()
# Allow this test module to be run directly as a script.
if __name__ == '__main__':
    unittest.main()
=== Added File Products/AdaptableStorage/tests/testAll.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run all unit tests
$Id: testAll.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""
import unittest
from testSerialization import SerializationTests
from testInterfaceImpl import InterfaceImplTests
from testASStorage import ASStorageTests
from testZope2FS import Zope2FSTests
# When run as a script, unittest.main() collects the TestCase classes
# found in this module's namespace, i.e. the classes imported above.
if __name__ == '__main__':
    unittest.main()
=== Added File Products/AdaptableStorage/tests/testInterfaceImpl.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Interface implementation tests
$Id: testInterfaceImpl.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""
import unittest
from types import ListType, TupleType
from Interface import Interface
from Interface.Verify import verifyClass
class InterfaceImplTests(unittest.TestCase):
    """Check that objects exported by each "public" module honor the
    interfaces they declare through ``__implements__``.
    """

    def _testObjectImpl(self, c):
        """Verify `c` against everything declared in ``c.__implements__``."""
        self._verifyDeclaration(c.__implements__, c)

    def _verifyDeclaration(self, impl, c):
        """Verify `c` against `impl`, descending into nested sequences.

        `impl` may be a single interface or a list/tuple of declarations.
        Sequences are walked recursively, so a declaration that embeds
        another sequence (for example a base class's ``__implements__``)
        is flattened rather than handed to verifyClass() as a tuple.
        """
        if isinstance(impl, ListType) or isinstance(impl, TupleType):
            for item in impl:
                self._verifyDeclaration(item, c)
        else:
            self._verify(impl, c)

    def _testAllInModule(self, m):
        """Verify every object in module `m` that declares interfaces."""
        for value in m.__dict__.values():
            # NOTE(review): the isImplementedBy() check presumably skips
            # objects that are themselves interfaces -- confirm.
            if (hasattr(value, '__implements__') and
                not Interface.isImplementedBy(value)):
                self._testObjectImpl(value)

    def _verify(self, iface, c):
        """Verify `c` against `iface` and, recursively, all of its bases."""
        verifyClass(iface, c)
        for base in iface.getBases():
            self._verify(base, c)

    def testSerialPublicImplementations(self):
        """Check the implementations exported by serial.public."""
        from Products.AdaptableStorage.serial import public
        self._testAllInModule(public)

    def testSerialStdImplementations(self):
        """Check the implementations exported by serial_std.public."""
        from Products.AdaptableStorage.serial_std import public
        self._testAllInModule(public)

    def testSerialOFSImplementations(self):
        """Check the implementations exported by serial_ofs.public."""
        from Products.AdaptableStorage.serial_ofs import public
        self._testAllInModule(public)

    def testGatewayFSImplementations(self):
        """Check the implementations exported by gateway_fs.public."""
        from Products.AdaptableStorage.gateway_fs import public
        self._testAllInModule(public)
# Allow this test module to be run directly as a script.
if __name__ == '__main__':
    unittest.main()
=== Added File Products/AdaptableStorage/tests/testSerialization.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Serialization tests
$Id: testSerialization.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""
import unittest
import ZODB
from Persistence import PersistentMapping
from Products.AdaptableStorage.serial import public as sfw
from SerialTestBase import SerialTestBase
class SerializationTests(SerialTestBase, unittest.TestCase):
    """Exercise the 'test' mapper's serializer and gateway directly."""

    def getKeyedObjectSystem(self):
        """Return the keyed object system handed to the serializer."""
        # XXX works for now
        return None

    def testSerializeAndDeserialize(self):
        """State serialized from one mapping rebuilds an equal mapping."""
        original = PersistentMapping()
        original.strdata = '345'
        original['a'] = 'b'
        original['c'] = 'd'
        kos = self.getKeyedObjectSystem()
        mapper = self.dm.getMapper('test')

        serialize = mapper.getSerializer().serialize
        full_state, refs = serialize(mapper, '', original, kos)

        copy = PersistentMapping()
        deserialize = mapper.getSerializer().deserialize
        deserialize(mapper, '', copy, kos, full_state)

        self.assertEqual(original.strdata, copy.strdata)
        self.assertEqual(original.items(), copy.items())

    def testStoreAndLoad(self):
        """State survives a round trip through the mapper's gateway."""
        original = PersistentMapping()
        original.strdata = '345'
        original['a'] = 'b'
        original['c'] = 'd'
        kos = self.getKeyedObjectSystem()
        mapper = self.dm.getMapper('test')

        serialize = mapper.getSerializer().serialize
        full_state, refs = serialize(mapper, '', original, kos)

        # Push the state through the gateway and read it back.
        store = mapper.getGateway().store
        store(mapper, '', full_state)
        load = mapper.getGateway().load
        full_state, serial = load(mapper, '')

        copy = PersistentMapping()
        deserialize = mapper.getSerializer().deserialize
        deserialize(mapper, '', copy, kos, full_state)

        self.assertEqual(original.strdata, copy.strdata)
        self.assertEqual(original.items(), copy.items())

    def testCatchExtraAttribute(self):
        """Serializing an object with an extra attribute must fail."""
        original = PersistentMapping()
        original.strdata = '345'
        # 'extra' is the attribute the serializer is expected to reject.
        original.extra = '678'
        original['a'] = 'b'
        original['c'] = 'd'
        kos = self.getKeyedObjectSystem()
        mapper = self.dm.getMapper('test')

        serialize = mapper.getSerializer().serialize
        self.assertRaises(
            sfw.SerializationError, serialize, mapper, '', original, kos)
# Allow this test module to be run directly as a script.
if __name__ == '__main__':
    unittest.main()
=== Added File Products/AdaptableStorage/tests/testZope2FS.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test of storing folders on the filesystem via ZODB
$Id: testZope2FS.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""
import os
from shutil import rmtree
import unittest
from tempfile import mktemp
import ZODB
from Persistence import PersistentMapping
from Products.AdaptableStorage.zodb.ASDB import ASDB
from Products.AdaptableStorage.zodb.ASStorage import ASStorage
from Products.AdaptableStorage.zodb.StaticResource import StaticResource
from Products.AdaptableStorage.Zope2FS import createDomainMapper
class Zope2FSTests (unittest.TestCase):
def setUp(self):
path = mktemp()
os.mkdir(path)
self.path = path
dm = createDomainMapper(path)
self.dm = dm
resource = StaticResource(dm)
storage = ASStorage(resource)
self.storage = storage
db = ASDB(storage, resource)
self.db = db
def tearDown(self):
self.db.close()
rmtree(self.path)
def testLoad(self):
conn = self.db.open()
try:
app = conn.root()['Application']
app.getId()
finally:
conn.close()
def testStore(self):
conn = self.db.open()
try:
from OFS.Folder import Folder
app = conn.root()['Application']
f = Folder()
f.id = 'Holidays'
app._setObject(f.id, f, set_owner=0)
get_transaction().commit()
f2 = Folder()
f2.id = 'Christmas'
f._setObject(f2.id, f2, set_owner=0)
get_transaction().commit()
conn2 = self.db.open()
try:
app = conn2.root()['Application']
self.assert_(hasattr(app, 'Holidays'))
self.assert_(hasattr(app.Holidays, 'Christmas'))
finally:
conn2.close()
finally:
conn.close()
# Allow this test module to be run directly as a script.
if __name__ == '__main__':
    unittest.main()