[Zope-CVS] CVS: Products/Ape/lib/apelib/tests -
testzodbtables.py:1.2 serialtestbase.py:1.4 testall.py:1.7
testio.py:1.5 testscanner.py:1.3 testserialization.py:1.6
testsqlimpl.py:1.2 teststorage.py:1.6 testzope2fs.py:1.5
testzope2sql.py:1.7 zope2testbase.py:1.6
Shane Hathaway
shane at zope.com
Mon Feb 2 10:07:53 EST 2004
Update of /cvs-repository/Products/Ape/lib/apelib/tests
In directory cvs.zope.org:/tmp/cvs-serv26672/lib/apelib/tests
Modified Files:
serialtestbase.py testall.py testio.py testscanner.py
testserialization.py testsqlimpl.py teststorage.py
testzope2fs.py testzope2sql.py zope2testbase.py
Added Files:
testzodbtables.py
Log Message:
Moved ape-0_8-branch to the HEAD.
>From CHANGES.txt:
- Major restructuring to reduce the number of concepts in
Ape. Keychains and keys have been replaced with simple string OIDs.
There is now a flat namespace of mappers instead of a tree. Only
one classifier and one OID generator are used in any object
database.
- The ZODB root object is now stored on the filesystem.
=== Products/Ape/lib/apelib/tests/testzodbtables.py 1.1 => 1.2 ===
--- /dev/null Mon Feb 2 10:07:53 2004
+++ Products/Ape/lib/apelib/tests/testzodbtables.py Mon Feb 2 10:07:22 2004
@@ -0,0 +1,254 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""zodbtables tests.
+
+$Id$
+"""
+
+import unittest
+from time import time
+
+from apelib.zodb3 import zodbtables
+
+
+TEST_DATA = [
+ {'name': 'Jose',
+ 'sex': 'm',
+ 'address': '101 Example St.',
+ 'phone': '123-4567',
+ },
+ {'name': 'Maria',
+ 'sex': 'f',
+ 'address': '102 Example St.',
+ },
+ {'name': 'Carlos',
+ 'sex': 'm',
+ 'phone': '987-6543',
+ },
+ {'name': 'Tiago',
+ 'sex': 'm',
+ 'phone': '123-4567',
+ },
+ {'name': 'Ana',
+ 'sex': 'f',
+ 'phone': '123-4567',
+ },
+ ]
+
+
+class ZODBTableTests(unittest.TestCase):
+
+ table_schema = zodbtables.TableSchema()
+ table_schema.addColumn('name', primary=1, indexed=1)
+ table_schema.addColumn('sex', indexed=1)
+ table_schema.addColumn('address')
+ table_schema.addColumn('phone', indexed=1)
+
+ def setUp(self):
+ self.table = table = zodbtables.Table(self.table_schema)
+ for data in TEST_DATA:
+ table.insert(data)
+
+ def tearDown(self):
+ get_transaction().abort()
+
+ def testSelectByName(self):
+ # Searches by primary key
+ records = self.table.select({'name': 'Jose'})
+ self.assertEqual(len(records), 1)
+ self.assertEqual(records[0]['address'], '101 Example St.')
+
+ def testSelectByUnknownName(self):
+ # Searches by primary key
+ records = self.table.select({'name': 'Joao'})
+ self.assertEqual(len(records), 0)
+
+ def testSelectByPhone(self):
+ # Searches by index
+ records = self.table.select({'phone': '987-6543'})
+ self.assertEqual(len(records), 1)
+ self.assertEqual(records[0]['name'], 'Carlos')
+
+ def testSelectByAddress(self):
+ # Searches one-by-one
+ records = self.table.select({'address': '102 Example St.'})
+ self.assertEqual(len(records), 1)
+ self.assertEqual(records[0]['name'], 'Maria')
+
+ def testSelectMales(self):
+ records = self.table.select({'sex': 'm'})
+ self.assertEqual(len(records), 3)
+
+ def testSelectFemales(self):
+ records = self.table.select({'sex': 'f'})
+ self.assertEqual(len(records), 2)
+
+ def testSelectByNameAndSex(self):
+ records = self.table.select({'name': 'Jose', 'sex': 'm'})
+ self.assertEqual(len(records), 1)
+
+ def testSelectByNameAndIncorrectSex(self):
+ records = self.table.select({'name': 'Jose', 'sex': 'f'})
+ self.assertEqual(len(records), 0)
+
+ def testSelectBySexAndPhone(self):
+ # Intersects two indexes
+ records = self.table.select({'phone': '123-4567', 'sex': 'm'})
+ self.assertEqual(len(records), 2)
+
+ def testSelectAll(self):
+ records = self.table.select({})
+ self.assertEqual(len(records), 5)
+
+ def testInsertMinimal(self):
+ self.table.insert({'name': 'Edson'})
+
+ def testInsertDuplicate(self):
+ self.assertRaises(zodbtables.DuplicateError,
+ self.table.insert, {'name':'Carlos'})
+
+ def testInsertWithoutPrimaryKey(self):
+ self.assertRaises(ValueError, self.table.insert, {})
+
+ def testUpdateNewAddress(self):
+ # Test adding a value in a non-indexed column
+ self.table.update({'name': 'Carlos'}, {'address': '99 Sohcahtoa Ct.'})
+ records = self.table.select({'address': '99 Sohcahtoa Ct.'})
+ self.assertEqual(len(records), 1)
+ self.assertEqual(records[0]['name'], 'Carlos')
+
+ def testUpdateChangeAddress(self):
+ # Test changing a value in a non-indexed column
+ self.table.update({'name': 'Jose'}, {'address': '99 Sohcahtoa Ct.'})
+ records = self.table.select({'address': '99 Sohcahtoa Ct.'})
+ self.assertEqual(len(records), 1)
+ self.assertEqual(records[0]['name'], 'Jose')
+
+ def testUpdateFemaleAddresses(self):
+ # Test changing and adding simultaneously in a non-indexed column
+ self.table.update({'sex': 'f'}, {'address': '99 Sohcahtoa Ct.'})
+ records = self.table.select({'address': '99 Sohcahtoa Ct.'})
+ self.assertEqual(len(records), 2)
+
+
+ def testUpdateChangePhone(self):
+ # Test changing a value in an indexed column
+ records = self.table.select({'phone': '123-4567'})
+ self.assertEqual(len(records), 3) # Precondition
+
+ self.table.update({'name': 'Jose'}, {'phone': '111-5555'})
+ records = self.table.select({'phone': '123-4567'})
+ self.assertEqual(len(records), 2)
+ records = self.table.select({'phone': '111-5555'})
+ self.assertEqual(len(records), 1)
+ self.assertEqual(records[0]['name'], 'Jose')
+
+
+ def testUpdateChangeName(self):
+ # Test changing a value in a primary key column
+ records = self.table.select({'name': 'Jose'})
+ self.assertEqual(len(records), 1) # Precondition
+
+ self.table.update({'name': 'Jose'}, {'name': 'Marco'})
+ records = self.table.select({'name': 'Jose'})
+ self.assertEqual(len(records), 0)
+ records = self.table.select({'name': 'Marco'})
+ self.assertEqual(len(records), 1)
+
+
+ def testUpdateNameConflict(self):
+ self.assertRaises(zodbtables.DuplicateError, self.table.update,
+ {'name':'Jose'}, {'name': 'Carlos'})
+
+
+ def testDeleteNothing(self):
+ old_count = len(self.table.select({}))
+ self.assertEqual(self.table.delete({'name': 'Edson'}), 0)
+ new_count = len(self.table.select({}))
+ self.assert_(old_count == new_count)
+
+
+ def testDeleteAll(self):
+ count = len(self.table.select({}))
+ self.assert_(count > 0)
+ self.assertEqual(self.table.delete({}), count)
+ new_count = len(self.table.select({}))
+ self.assert_(new_count == 0)
+
+
+ def testDeleteOne(self):
+ # Test deletion of one row
+ records = self.table.select({'name': 'Jose'})
+ self.assertEqual(len(records), 1) # Precondition
+ records = self.table.select({'phone': '123-4567'})
+ self.assertEqual(len(records), 3) # Precondition
+
+ count = self.table.delete({'name': 'Jose'})
+ self.assertEqual(count, 1)
+ records = self.table.select({'name': 'Jose'})
+ self.assertEqual(len(records), 0)
+ records = self.table.select({'phone': '123-4567'})
+ self.assertEqual(len(records), 2)
+
+
+ def testDeleteByPhone(self):
+ records = self.table.select({'phone': '123-4567'})
+ self.assertEqual(len(records), 3) # Precondition
+
+ count = self.table.delete({'phone': '123-4567'})
+ self.assertEqual(count, 3)
+ records = self.table.select({'phone': '123-4567'})
+ self.assertEqual(len(records), 0)
+ records = self.table.select({'name': 'Jose'})
+ self.assertEqual(len(records), 0)
+
+ # Make sure it didn't delete other data
+ records = self.table.select({'name': 'Maria'})
+ self.assertEqual(len(records), 1)
+
+ def testSelectPartialPrimaryKey(self):
+ # Select by only one part of a primary key
+ schema = zodbtables.TableSchema()
+ schema.addColumn('name', primary=1)
+ schema.addColumn('id', primary=1)
+ table = zodbtables.Table(schema)
+ table.insert({'name': 'joe', 'id': 1})
+ table.insert({'name': 'john', 'id': 2})
+ records = table.select({'name': 'joe'})
+ self.assertEqual(len(records), 1)
+
+
+class ZODBTableTestsWithoutPrimaryKey(ZODBTableTests):
+ # Same tests but with no primary key. The absence of a primary
+ # key affects many branches of the code.
+ table_schema = zodbtables.TableSchema()
+ table_schema.addColumn('name', indexed=1)
+ table_schema.addColumn('sex', indexed=1)
+ table_schema.addColumn('address')
+ table_schema.addColumn('phone', indexed=1)
+
+ # Disabled tests
+ def testInsertWithoutPrimaryKey(self):
+ pass
+
+ def testInsertDuplicate(self):
+ pass
+
+ def testUpdateNameConflict(self):
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main()
+
=== Products/Ape/lib/apelib/tests/serialtestbase.py 1.3 => 1.4 ===
--- Products/Ape/lib/apelib/tests/serialtestbase.py:1.3 Mon May 26 14:16:02 2003
+++ Products/Ape/lib/apelib/tests/serialtestbase.py Mon Feb 2 10:07:22 2004
@@ -20,111 +20,50 @@
from Persistence import PersistentMapping
from cPickle import dumps, loads
-from apelib.core \
- import classifiers, gateways, keygen, mapper, schemas, serializers
-from apelib.core.interfaces import ISerializer
-
-from apelib.zodb3.serializers \
- import FixedPersistentMapping, RollCall, RemainingState
-
-
-class SimpleItemsSerializer:
-
- __implements__ = ISerializer
-
- schema = schemas.RowSequenceSchema()
- schema.addField('name')
- schema.addField('pickle')
-
- def getSchema(self):
- return self.schema
-
- def serialize(self, object, event):
- res = []
- for k, v in object.items():
- res.append((k, dumps(v)))
- event.notifySerialized(k, v, 0)
- res.sort()
- event.ignoreAttribute('data')
- event.ignoreAttribute('_container')
- return res
-
- def deserialize(self, object, event, state):
- d = {}
- for k, v in state:
- o = loads(v)
- d[k] = o
- event.notifyDeserialized(k, o)
- object.__init__(d)
+from apelib.core import classifiers, gateways
+from apelib.core import mapper, oidgen, schemas, serializers
+from apelib.zodb3.serializers import StringToPersistentPM, StringToPicklePM
+from apelib.zodb3.serializers import RemainingState, RollCall
-class SerialTestBase:
- def setUp(self):
- self.conns = {}
+class TestObject(PersistentMapping):
+ strdata = ""
+
+
+def addMapper(conf, klass, mapper_name):
+ """Adds a simple mapper to the configuration.
+ """
+ serializer = serializers.CompositeSerializer(
+ klass.__module__, klass.__name__)
+ gateway = gateways.RAMGateway(serializer.schema)
+ m = mapper.Mapper(serializer, gateway)
+ class_name = '%s.%s' % (klass.__module__, klass.__name__)
+ conf.mappers[mapper_name] = m
+ conf.classifier.register("class", class_name, mapper_name)
+ return m
- cfr = classifiers.FixedClassifier()
- cfr.register('test', 'test_mapper')
- cfr.register('test2', 'test_mapper_2')
-
- ser2 = serializers.CompositeSerializer(
- 'Persistence', 'PersistentMapping')
- fixed_items_serializer = FixedPersistentMapping()
- fixed_items_serializer.add('TestRoot', ('test',), ('test_mapper',))
- fixed_items_serializer.add('TestRoot2', ('test2',), ('test_mapper_2',))
- ser2.addSerializer('fixed_items', fixed_items_serializer)
- ser2.addSerializer('roll_call', RollCall())
-
- root_mapper = mapper.Mapper(
- None, ser2, gateways.CompositeGateway(), cfr)
- self.root_mapper = root_mapper
- # Since all of the keychains used by this mapper are fixed, we
- # don't want any keychains to be generated.
- root_mapper.setKeychainGenerator(keygen.NullKeychainGenerator())
-
- # Create "test_mapper", which allows a "strdata" attribute.
-
- ser1 = serializers.CompositeSerializer(
- 'Persistence', 'PersistentMapping')
- items_serializer = SimpleItemsSerializer()
- ser1.addSerializer('items', items_serializer)
- props_serializer = serializers.StringDataAttribute('strdata')
- ser1.addSerializer('properties', props_serializer)
- ser1.addSerializer('roll_call', RollCall())
-
- gw1 = gateways.CompositeGateway()
- items_gw = gateways.MappingGateway(items_serializer.getSchema())
- self.items_gw = items_gw
- gw1.addGateway('items', items_gw)
- props_gw = gateways.MappingGateway(props_serializer.getSchema())
- self.props_gw = props_gw
- gw1.addGateway('properties', props_gw)
-
- om1 = mapper.Mapper(root_mapper, ser1, gw1)
- self.om1 = om1
- root_mapper.addSubMapper('test_mapper', om1)
-
- # Create "test_mapper_2", which stores a remainder.
-
- ser = serializers.CompositeSerializer(
- 'Persistence', 'PersistentMapping')
- items_serializer = SimpleItemsSerializer()
- ser.addSerializer('items', items_serializer)
- remainder_serializer = RemainingState()
- ser.addSerializer('remainder', remainder_serializer)
-
- gw = gateways.CompositeGateway()
- items_gw = gateways.MappingGateway(items_serializer.getSchema())
- gw.addGateway('items', items_gw)
- remainder_gw = gateways.MappingGateway(
- remainder_serializer.getSchema())
- gw.addGateway('remainder', remainder_gw)
- om = mapper.Mapper(root_mapper, ser, gw)
- root_mapper.addSubMapper('test_mapper_2', om)
+class SerialTestBase:
- root_mapper.checkConfiguration()
+ def setUp(self):
+ schema = schemas.FieldSchema("classification", "classification")
+ cfr = classifiers.SimpleClassifier(gateways.RAMGateway(schema))
+ oid_gen = oidgen.SerialOIDGenerator()
+ self.conf = mapper.MapperConfiguration({}, cfr, oid_gen)
+
+ m = addMapper(self.conf, PersistentMapping, "pm")
+ m.serializer.add("items", StringToPersistentPM())
+ m.serializer.add("rollcall", RollCall())
+ m.gateway.schema = m.serializer.schema
+
+ m = addMapper(self.conf, TestObject, "tm")
+ m.serializer.add("items", StringToPicklePM())
+ m.serializer.add("remainder", RemainingState())
+ m.gateway.schema = m.serializer.schema
+ self.conf.check()
+ self.conns = {}
def tearDown(self):
pass
=== Products/Ape/lib/apelib/tests/testall.py 1.6 => 1.7 ===
--- Products/Ape/lib/apelib/tests/testall.py:1.6 Thu Aug 14 16:17:41 2003
+++ Products/Ape/lib/apelib/tests/testall.py Mon Feb 2 10:07:22 2004
@@ -38,17 +38,20 @@
from testzope2fs import Zope2FSTests, Zope2FSUnderscoreTests
from testparams import ParamsTests
from testsqlimpl import ApelibSQLImplTests
-from apelib.config.tests.test_minitables import MiniTableTests
+from testzodbtables import ZODBTableTests, ZODBTableTestsWithoutPrimaryKey
from testscanner import ScanControlTests, ScannerTests
+from testzope2sql import PsycopgTests, MySQLTests
import testzope2sql
+
sql_suite = testzope2sql.test_suite()
def test_suite():
suite = unittest.TestSuite()
for klass in (
SerializationTests,
- MiniTableTests,
+ ZODBTableTests,
+ ZODBTableTestsWithoutPrimaryKey,
ApelibImplTests,
ApeStorageTests,
ApeIOTests,
=== Products/Ape/lib/apelib/tests/testio.py 1.4 => 1.5 ===
--- Products/Ape/lib/apelib/tests/testio.py:1.4 Thu Aug 14 16:20:05 2003
+++ Products/Ape/lib/apelib/tests/testio.py Mon Feb 2 10:07:22 2004
@@ -22,22 +22,20 @@
from Persistence import PersistentMapping
from apelib.core import io
-from apelib.core.interfaces import IKeyedObjectSystem
-from serialtestbase import SerialTestBase
+from apelib.core.interfaces import IObjectDatabase
+from serialtestbase import SerialTestBase, TestObject
-class TestKeyedObjectSystem:
- __implements__ = IKeyedObjectSystem
+class TestObjectDatabase:
+ __implements__ = IObjectDatabase
- def getObject(self, keychain, hints=None):
+ def getObject(self, oid, hints=None):
raise NotImplementedError
- loadStub = getObject
-
- def identifyObject(self, obj):
+ def identify(self, obj):
raise NotImplementedError
- def newKey(self):
+ def new_oid(self):
raise NotImplementedError
def getClass(self, module, name):
@@ -48,46 +46,46 @@
class ApeIOTests(SerialTestBase, unittest.TestCase):
- def getKeyedObjectSystem(self):
- return TestKeyedObjectSystem()
+ def getObjectDatabase(self):
+ return TestObjectDatabase()
def testImpl(self):
# Test of test :-)
from Interface.Verify import verifyClass
- verifyClass(IKeyedObjectSystem, TestKeyedObjectSystem)
+ verifyClass(IObjectDatabase, TestObjectDatabase)
def testSerializeAndDeserialize(self):
- ob = PersistentMapping()
+ ob = TestObject()
ob.strdata = '345'
ob['a'] = 'b'
ob['c'] = 'd'
- keychain = ('test',)
- kos = self.getKeyedObjectSystem()
- obsys = io.ObjectSystemIO(self.root_mapper, kos)
- event, classified_state = obsys.serialize(keychain, ob)
+ oid = 'test'
+ obj_db = self.getObjectDatabase()
+ obsys = io.ObjectSystemIO(self.conf, obj_db)
+ event, classified_state = obsys.serialize(oid, ob)
ob2 = obsys.newObject(classified_state)
- obsys.deserialize(keychain, ob2, classified_state)
+ obsys.deserialize(oid, ob2, classified_state)
self.assertEqual(ob.strdata, ob2.strdata)
self.assertEqual(ob.data, ob2.data)
def testStoreAndLoad(self):
# Tests both serialization and storage
- ob = PersistentMapping()
+ ob = TestObject()
ob.strdata = '345'
ob['a'] = 'b'
ob['c'] = 'd'
- keychain = ('test',)
- kos = self.getKeyedObjectSystem()
- obsys = io.ObjectSystemIO(self.root_mapper, kos)
- gwsys = io.GatewayIO(self.root_mapper, self.conns)
- event, classified_state = obsys.serialize(keychain, ob)
- gwsys.store(keychain, classified_state)
+ oid = 'test'
+ obj_db = self.getObjectDatabase()
+ obsys = io.ObjectSystemIO(self.conf, obj_db)
+ gwsys = io.GatewayIO(self.conf, self.conns)
+ event, classified_state = obsys.serialize(oid, ob)
+ gwsys.store(oid, classified_state, True)
- event, cs, hash_value = gwsys.load(keychain)
+ event, cs, hash_value = gwsys.load(oid)
ob2 = obsys.newObject(cs)
- obsys.deserialize(keychain, ob2, cs)
+ obsys.deserialize(oid, ob2, cs)
self.assertEqual(ob.strdata, ob2.strdata)
self.assertEqual(ob.data, ob2.data)
@@ -95,22 +93,22 @@
def testExportImport(self):
root = PersistentMapping()
- test1 = PersistentMapping()
+ test1 = TestObject()
test1.strdata = '345'
test1['a'] = 'b'
test1['c'] = 'd'
root['TestRoot'] = test1
- test2 = PersistentMapping()
+ test2 = TestObject()
test2.leftover = 'oops'
test2['true'] = 'undecided'
root['TestRoot2'] = test2
- keychain = ()
- exporter = io.ExportImport(self.root_mapper, self.conns)
- exporter.exportObject(root, keychain)
+ oid = ''
+ exporter = io.ExportImport(self.conf, self.conns)
+ exporter.exportObject(root, oid)
- importer = io.ExportImport(self.root_mapper, self.conns)
- roota = importer.importObject(keychain)
+ importer = io.ExportImport(self.conf, self.conns)
+ roota = importer.importObject(oid)
self.assert_(root is not roota)
self.assert_(root['TestRoot'] is not roota['TestRoot'])
self.assert_(root['TestRoot2'] is not roota['TestRoot2'])
=== Products/Ape/lib/apelib/tests/testscanner.py 1.2 => 1.3 ===
--- Products/Ape/lib/apelib/tests/testscanner.py:1.2 Wed Jul 30 17:33:08 2003
+++ Products/Ape/lib/apelib/tests/testscanner.py Mon Feb 2 10:07:22 2004
@@ -19,19 +19,20 @@
import unittest
from time import time
-from apelib.zodb3.scanner import ScanControl
+from apelib.zodb3.scanner import PoolScanControl, Scanner
class FakeRepository:
- def freshen(self, d):
+ def poll(self, d):
res = {}
for source in d.keys():
repo, location = source
if repo is not self:
raise AssertionError, "repo must be self"
if str(location) != location:
- raise AssertionError, "location is expected to be a string"
+ raise AssertionError(
+ "location %s is not a string" % repr(location))
# Always report a change
res[source] = 1001
return res
@@ -41,14 +42,18 @@
repo = FakeRepository()
- def getSources(self, oid):
- return {(self.repo, oid): 10}
+ def getPollSources(self, oid):
+ return {(self.repo, str(oid)): 10}
class ScanControlTests(unittest.TestCase):
def setUp(self):
- ctl = self.ctl = ScanControl()
+ storage = self.storage = FakeStorage()
+ scanner = self.scanner = Scanner()
+ storage.scanner = scanner
+ scanner.storage = storage
+ ctl = self.ctl = PoolScanControl(storage)
self.conn1 = ctl.newConnection()
self.conn2 = ctl.newConnection()
@@ -84,32 +89,29 @@
class ScannerTests(unittest.TestCase):
def setUp(self):
- ctl = self.ctl = ScanControl()
+ storage = self.storage = FakeStorage()
+ scanner = self.scanner = Scanner()
+ storage.scanner = scanner
+ scanner.storage = storage
+ ctl = self.ctl = PoolScanControl(storage)
self.conn1 = ctl.newConnection()
self.conn2 = ctl.newConnection()
- self.scanner = ctl.scanner
self.repo = FakeRepository()
def testAddSource(self):
new_sources = {(self.repo, '5'): 0}
- self.scanner.setSources(5, new_sources)
+ self.scanner.afterLoad(5, new_sources)
self.assertEqual(len(self.scanner.future), 1)
self.assertEqual(self.scanner.future[5][0], new_sources)
- def testChangeSources(self):
+ def testNoUpdatesWhenNotInvalidating(self):
+ # Don't change current except in scan(), where invalidation
+ # messages are possible.
self.conn1.setOIDs([5])
- old_sources = {(self.repo, '5'): 0}
- self.scanner.setSources(5, old_sources)
- self.assertEqual(len(self.scanner.future), 0)
- self.assertEqual(len(self.scanner.current), 1)
- self.assertEqual(self.scanner.current[5], old_sources)
-
- new_sources = {(self.repo, '6'): 0, (self.repo, '7'): 0, }
- self.scanner.setSources(5, new_sources)
- self.assertEqual(len(self.scanner.future), 0)
- self.assertEqual(len(self.scanner.current), 1)
- self.assertEqual(self.scanner.current[5], new_sources)
+ sources = {(self.repo, '5'): 0}
+ self.scanner.afterLoad(5, sources)
+ self.assertNotEqual(self.scanner.current[5], sources)
def testRemoveOID(self):
self.conn1.setOIDs([5])
@@ -120,7 +122,7 @@
def testScan(self):
self.conn1.setOIDs([5])
new_sources = {(self.repo, '6'): 0, (self.repo, '7'): 0, }
- self.scanner.setSources(5, new_sources)
+ self.scanner.afterLoad(5, new_sources)
to_invalidate = self.scanner.scan()
self.assertEqual(len(to_invalidate), 1)
@@ -134,17 +136,15 @@
def testFindNewSources(self):
# Verify the scanner calls storage.getSources() and saves the result.
- storage = FakeStorage()
- self.scanner.setStorage(storage)
self.conn1.setOIDs([5])
- expect_sources = storage.getSources(5)
+ expect_sources = self.storage.getPollSources(5)
self.assertEqual(self.scanner.current[5], expect_sources)
def testUseCachedSources(self):
# Verify the scanner uses previously cached sources when available.
repo = FakeRepository()
sources = {(repo, '999'): -1}
- self.scanner.setSources(5, sources)
+ self.scanner.afterLoad(5, sources)
self.conn1.setOIDs([5])
self.assertEqual(self.scanner.current[5], sources)
@@ -152,10 +152,10 @@
# Verify the scanner sees sources according to transactions.
repo = FakeRepository()
sources = {(repo, '999'): -1}
- self.scanner.setSources(5, sources)
+ self.scanner.afterLoad(5, sources)
self.conn1.setOIDs([5])
sources_2 = {(repo, '999'): -2}
- self.scanner.setUncommittedSources(123, 5, sources_2)
+ self.scanner.afterStore(5, 123, sources_2)
# Uncommitted data should have no effect on sources
self.assertEqual(self.scanner.current[5], sources)
self.scanner.afterCommit(123)
@@ -170,7 +170,7 @@
# Verify the scanner ignores sources on transaction abort.
repo = FakeRepository()
sources = {(repo, '999'): -2}
- self.scanner.setUncommittedSources(123, 5, sources)
+ self.scanner.afterStore(5, 123, sources)
self.assertEqual(self.scanner.uncommitted[123][5], sources)
self.scanner.afterAbort(123)
self.assert_(not self.scanner.uncommitted.has_key(123))
=== Products/Ape/lib/apelib/tests/testserialization.py 1.5 => 1.6 ===
--- Products/Ape/lib/apelib/tests/testserialization.py:1.5 Thu Aug 14 16:20:05 2003
+++ Products/Ape/lib/apelib/tests/testserialization.py Mon Feb 2 10:07:22 2004
@@ -23,93 +23,101 @@
from apelib.core.events \
import LoadEvent, StoreEvent, SerializationEvent, DeserializationEvent
-from apelib.core.exceptions import SerializationError
-from serialtestbase import SerialTestBase
+from apelib.core.interfaces import SerializationError
+from serialtestbase import SerialTestBase, TestObject
-class SimpleInstance:
-
+class SimpleClass:
+ """Represents second-class persistent objects.
+ """
def __init__(self, data):
self.data = data
+class MockObjectDatabase:
+ """Implements only enough to satisfy testCatchExtraAttribute
+ """
+ def identify(self, obj):
+ return None
+
+
class SerializationTests(SerialTestBase, unittest.TestCase):
+ """Tests of basic events, serializers, and gateways.
- def getKeyedObjectSystem(self):
- # XXX works for now
- return None
+ No connections or object databases are provided.
+ """
def testSerializeAndDeserialize(self):
- ob = PersistentMapping()
- ob.strdata = '345'
+ ob = TestObject()
ob['a'] = 'b'
ob['c'] = 'd'
- kos = self.getKeyedObjectSystem()
- mapper = self.root_mapper.getSubMapper('test_mapper')
- event = SerializationEvent(kos, mapper, ('',), ob)
- full_state = mapper.getSerializer().serialize(ob, event)
- ob2 = PersistentMapping()
- event = DeserializationEvent(kos, mapper, ('',), ob2)
- mapper.getSerializer().deserialize(ob2, event, full_state)
+ obj_db = None
+ m = self.conf.mappers["tm"]
+ event = SerializationEvent(self.conf, m, '', obj_db, ob)
+ full_state = m.serializer.serialize(event)
+ ob2 = TestObject()
+ event = DeserializationEvent(self.conf, m, '', obj_db, ob2)
+ m.serializer.deserialize(event, full_state)
self.assertEqual(ob.strdata, ob2.strdata)
self.assertEqual(ob.data, ob2.data)
def testStoreAndLoad(self):
- ob = PersistentMapping()
+ ob = TestObject()
ob.strdata = '345'
ob['a'] = 'b'
ob['c'] = 'd'
- kos = self.getKeyedObjectSystem()
- mapper = self.root_mapper.getSubMapper('test_mapper')
- event = SerializationEvent(kos, mapper, ('',), ob)
- full_state = mapper.getSerializer().serialize(ob, event)
- event = StoreEvent(mapper, ('',), self.conns, None)
- mapper.getGateway().store(event, full_state)
-
- event = LoadEvent(mapper, ('',), self.conns, None)
- full_state, serial = mapper.getGateway().load(event)
- ob2 = PersistentMapping()
- event = DeserializationEvent(kos, mapper, ('',), ob2)
- mapper.getSerializer().deserialize(ob2, event, full_state)
+ obj_db = None
+ m = self.conf.mappers["tm"]
+ event = SerializationEvent(self.conf, m, '', obj_db, ob)
+ full_state = m.serializer.serialize(event)
+ event = StoreEvent(self.conf, m, '', self.conns, None, True)
+ m.gateway.store(event, full_state)
+
+ event = LoadEvent(self.conf, m, '', self.conns, None)
+ full_state, serial = m.gateway.load(event)
+ ob2 = TestObject()
+ event = DeserializationEvent(self.conf, m, '', obj_db, ob2)
+ m.serializer.deserialize(event, full_state)
self.assertEqual(ob.strdata, ob2.strdata)
self.assertEqual(ob.data, ob2.data)
def testCatchExtraAttribute(self):
+ # The mapper for PersistentMappings doesn't allow an
+ # extra attribute.
ob = PersistentMapping()
- ob.strdata = '345'
ob.extra = '678'
ob['a'] = 'b'
ob['c'] = 'd'
- kos = self.getKeyedObjectSystem()
- mapper = self.root_mapper.getSubMapper('test_mapper')
- event = SerializationEvent(kos, mapper, ('',), ob)
- self.assertRaises(SerializationError,
- mapper.getSerializer().serialize, ob, event)
+ obj_db = MockObjectDatabase()
+ m = self.conf.mappers["pm"]
+ event = SerializationEvent(self.conf, m, '', obj_db, ob)
+ self.assertRaises(SerializationError, m.serializer.serialize, event)
def testSharedAttribute(self):
# Test of an attribute shared between a normal serializer and
# a remainder serializer.
- ob = PersistentMapping()
- data = SimpleInstance('This is a shared piece of data')
+ ob = TestObject()
+ data = SimpleClass('This is a shared piece of data')
ob.extra = data
ob['a'] = data
- kos = self.getKeyedObjectSystem()
- mapper = self.root_mapper.getSubMapper('test_mapper_2')
- event = SerializationEvent(kos, mapper, ('',), ob)
- full_state = mapper.getSerializer().serialize(ob, event)
- event = StoreEvent(mapper, ('',), self.conns, None)
- mapper.getGateway().store(event, full_state)
+ obj_db = None
+ m = self.conf.mappers["tm"]
+ event = SerializationEvent(self.conf, m, '', obj_db, ob)
+ full_state = m.serializer.serialize(event)
+ event = StoreEvent(self.conf, m, '', self.conns, None, True)
+ m.gateway.store(event, full_state)
# Now load the state into a different object
- event = LoadEvent(mapper, ('',), self.conns, None)
- full_state, serial = mapper.getGateway().load(event)
- ob2 = PersistentMapping()
- event = DeserializationEvent(kos, mapper, ('',), ob2)
- mapper.getSerializer().deserialize(ob2, event, full_state)
+ event = LoadEvent(self.conf, m, '', self.conns, None)
+ full_state, serial = m.gateway.load(event)
+ ob2 = TestObject()
+ event = DeserializationEvent(self.conf, m, '', obj_db, ob2)
+ m.serializer.deserialize(event, full_state)
self.assertEqual(ob.extra.data, ob2.extra.data)
self.assertEqual(ob.keys(), ob2.keys())
- # Check that both see the *same* object
+ # Check that both ways to access the SimpleClass instance
+ # result in the same object.
self.assert_(ob2['a'] is ob2.extra, (ob2['a'], ob2.extra))
self.assert_(ob2['a'] is not data) # Verify it didn't cheat somehow
=== Products/Ape/lib/apelib/tests/testsqlimpl.py 1.1 => 1.2 ===
--- Products/Ape/lib/apelib/tests/testsqlimpl.py:1.1 Wed Apr 9 23:09:57 2003
+++ Products/Ape/lib/apelib/tests/testsqlimpl.py Mon Feb 2 10:07:22 2004
@@ -25,6 +25,7 @@
def testSQLImplementations(self):
import apelib.sql
+ import apelib.sql.oidgen
self._testAllInPackage(apelib.sql)
if __name__ == '__main__':
=== Products/Ape/lib/apelib/tests/teststorage.py 1.5 => 1.6 ===
--- Products/Ape/lib/apelib/tests/teststorage.py:1.5 Wed Jul 30 17:33:08 2003
+++ Products/Ape/lib/apelib/tests/teststorage.py Mon Feb 2 10:07:22 2004
@@ -26,7 +26,8 @@
from apelib.zodb3.storage import ApeStorage
from apelib.zodb3.resource import StaticResource
from apelib.zodb3.utils import copyOf
-from serialtestbase import SerialTestBase
+from apelib.core.interfaces import OIDConflictError
+from serialtestbase import SerialTestBase, TestObject
def run_in_thread(f):
@@ -48,7 +49,7 @@
def setUp(self):
SerialTestBase.setUp(self)
- resource = StaticResource(self.root_mapper)
+ resource = StaticResource(self.conf)
storage = ApeStorage(resource, self.conns)
self.storage = storage
db = ApeDB(storage, resource)
@@ -60,13 +61,11 @@
SerialTestBase.tearDown(self)
def testStoreAndLoad(self):
- ob = PersistentMapping()
+ ob = TestObject()
ob.strdata = '345'
ob['a'] = 'b'
ob['c'] = 'd'
- dummy = PersistentMapping()
-
conn1 = self.db.open()
conn2 = None
conn3 = None
@@ -76,16 +75,15 @@
root = conn1.root()
get_transaction().begin()
root['TestRoot'] = ob
- root['TestRoot2'] = dummy
get_transaction().commit()
- ob1 = conn1.getObject(('test',))
+ ob1 = conn1.root()['TestRoot']
self.assertEqual(ob1.strdata, ob.strdata)
self.assertEqual(ob1.items(), ob.items())
# Verify a new object was stored and make a change
get_transaction().begin()
conn2 = self.db.open()
- ob2 = conn2.getObject(('test',))
+ ob2 = conn2.root()['TestRoot']
self.assertEqual(ob2.strdata, ob.strdata)
self.assertEqual(ob2.items(), ob.items())
ob2.strdata = '678'
@@ -93,7 +91,7 @@
# Verify the change was stored and make another change
conn3 = self.db.open()
- ob3 = conn3.getObject(('test',))
+ ob3 = conn3.root()['TestRoot']
self.assertEqual(ob3.strdata, '678')
self.assertEqual(ob3.items(), ob.items())
ob3.strdata = '901'
@@ -101,7 +99,7 @@
conn3.close()
conn3 = None
conn3 = self.db.open()
- ob3 = conn3.getObject(('test',))
+ ob3 = conn3.root()['TestRoot']
self.assertEqual(ob3.strdata, '901')
# Verify we didn't accidentally change the original object
@@ -116,14 +114,11 @@
def testUnmanaged(self):
- ob = PersistentMapping()
+ ob = TestObject()
ob['a'] = 'b'
ob.stowaway = PersistentMapping()
ob.stowaway['c'] = 'd'
- dummy = PersistentMapping()
- dummy.strdata = 'foo'
-
conn1 = self.db.open()
conn2 = None
conn3 = None
@@ -132,10 +127,9 @@
# Load the root and create a new object
root = conn1.root()
get_transaction().begin()
- root['TestRoot'] = dummy
root['TestRoot2'] = ob
get_transaction().commit()
- ob1 = conn1.getObject(('test2',))
+ ob1 = conn1.root()['TestRoot2']
self.assert_(ob1 is ob)
self.assertEqual(ob1.items(), [('a', 'b')])
self.assertEqual(ob1.stowaway.items(), [('c', 'd')])
@@ -143,7 +137,7 @@
# Verify a new object was stored
get_transaction().begin()
conn2 = self.db.open()
- ob2 = conn2.getObject(('test2',))
+ ob2 = conn2.root()['TestRoot2']
self.assertEqual(ob2.items(), [('a', 'b')])
self.assertEqual(ob2.stowaway.items(), [('c', 'd')])
@@ -155,7 +149,7 @@
# Verify the change was stored and make a change to the
# managed persistent object.
conn3 = self.db.open()
- ob3 = conn3.getObject(('test2',))
+ ob3 = conn3.root()['TestRoot2']
self.assertEqual(ob3.items(), [('a', 'b')])
self.assertEqual(ob3.stowaway.items(), [('c', 'e')])
ob3['a'] = 'z'
@@ -163,7 +157,7 @@
conn3.close()
conn3 = None
conn3 = self.db.open()
- ob3 = conn3.getObject(('test2',))
+ ob3 = conn3.root()['TestRoot2']
self.assertEqual(ob3['a'], 'z')
self.assertEqual(ob3.stowaway.items(), [('c', 'e')])
@@ -184,52 +178,47 @@
def testStoreAndLoadBinary(self):
- ob = PersistentMapping()
+ ob = TestObject()
# strdata contains binary characters
ob.strdata = ''.join([chr(n) for n in range(256)]) * 2
- dummy = PersistentMapping()
-
conn1 = self.db.open()
try:
root = conn1.root()
get_transaction().begin()
root['TestRoot'] = ob
- root['TestRoot2'] = dummy
get_transaction().commit()
- ob1 = conn1.getObject(('test',))
+ ob1 = conn1.root()['TestRoot']
self.assertEqual(ob1.strdata, ob.strdata)
self.assertEqual(ob1.items(), ob.items())
finally:
conn1.close()
- def _writeBasicObjects(self, conn):
- ob = PersistentMapping()
+ def _writeBasicObject(self, conn):
+ ob = TestObject()
ob.strdata = 'abc'
- dummy = PersistentMapping()
root = conn.root()
get_transaction().begin()
root['TestRoot'] = ob
- root['TestRoot2'] = dummy
get_transaction().commit()
- return ob, dummy
+ return ob
def _changeTestRoot(self):
- conn2 = self.db.open()
+ conn = self.db.open()
try:
- ob2 = conn2.root()['TestRoot']
- ob2.strdata = 'ghi'
+ ob = conn.root()['TestRoot']
+ ob.strdata = 'ghi'
get_transaction().commit()
finally:
- conn2.close()
+ conn.close()
def testConflictDetection(self):
conn1 = self.db.open()
try:
- ob1, dummy = self._writeBasicObjects(conn1)
+ ob1 = self._writeBasicObject(conn1)
ob1.strdata = 'def'
run_in_thread(self._changeTestRoot)
# Verify that "def" doesn't get written, since it
@@ -245,11 +234,10 @@
# Verify a new object won't overwrite existing objects by accident
conn1 = self.db.open()
try:
- ob1, dummy = self._writeBasicObjects(conn1)
+ ob1 = self._writeBasicObject(conn1)
ob1.strdata = 'def'
conn1.setSerial(ob1, '\0' * 8) # Pretend that it's new
- self.assertRaises(ZODB.POSException.ConflictError,
- get_transaction().commit)
+ self.assertRaises(OIDConflictError, get_transaction().commit)
finally:
conn1.close()
@@ -257,17 +245,13 @@
def testRemainderCyclicReferenceRestoration(self):
# test whether the remainder pickler properly stores cyclic references
# back to the object itself.
- ob1 = PersistentMapping()
+ ob1 = TestObject()
ob1.myself = ob1
- dummy = PersistentMapping()
- dummy.strdata = ''
-
conn1 = self.db.open()
try:
root = conn1.root()
get_transaction().begin()
- root['TestRoot'] = dummy
root['TestRoot2'] = ob1
get_transaction().commit()
@@ -322,9 +306,8 @@
# use _p_serial for hashes.
conn1 = self.db.open()
try:
- ob1, dummy = self._writeBasicObjects(conn1)
+ ob1 = self._writeBasicObject(conn1)
self.assertEqual(ob1._p_serial, "\0" * 8)
- self.assertEqual(dummy._p_serial, "\0" * 8)
finally:
conn1.close()
@@ -333,9 +316,9 @@
# Verifies the behavior of getSerial().
conn1 = self.db.open()
try:
- new_ob = PersistentMapping()
+ new_ob = TestObject()
self.assertEqual(conn1.getSerial(new_ob), '\0' * 8)
- ob1, dummy = self._writeBasicObjects(conn1)
+ ob1 = self._writeBasicObject(conn1)
self.assertNotEqual(conn1.getSerial(ob1), '\0' * 8)
finally:
conn1.close()
@@ -345,8 +328,8 @@
# Verifies the behavior of getSerial() and setSerial().
conn1 = self.db.open()
try:
- ob1, dummy = self._writeBasicObjects(conn1)
- self.assertNotEqual(conn1.getSerial(ob1), '\0' * 8)
+ ob = self._writeBasicObject(conn1)
+ self.assertNotEqual(conn1.getSerial(ob), '\0' * 8)
# Replace the object and verify it gets a new serial.
ob1 = PersistentMapping()
ob1.strdata = 'cba'
@@ -361,8 +344,8 @@
# Verify that setSerial() cleans up.
conn1 = self.db.open()
try:
- conn1.SERIAL_CLEANUP_THRESHOLD = 10
- for n in range(conn1.SERIAL_CLEANUP_THRESHOLD + 1):
+ conn1.serial_cleanup_threshold = 10
+ for n in range(conn1.serial_cleanup_threshold + 1):
new_ob = PersistentMapping()
new_ob._p_oid = 'fake_oid_' + str(n)
old_size = len(conn1._serials or ())
@@ -377,12 +360,13 @@
conn1.close()
- def testGetSources(self):
- sources = self.storage.getSources('\0' * 8)
- self.assertEqual(sources, {})
+ def testGetPollSources(self):
+ root_oid = self.conf.oid_gen.root_oid
+ sources = self.storage.getPollSources(root_oid)
+ self.assert_(not sources)
# The test passed, but check for a false positive.
- oid = self.storage._oid_encoder.encode(('nonexistent-oid',))
- self.assertRaises(KeyError, self.storage.getSources, oid)
+ oid = 'nonexistent-oid'
+ self.assertRaises(KeyError, self.storage.getPollSources, oid)
if __name__ == '__main__':
=== Products/Ape/lib/apelib/tests/testzope2fs.py 1.4 => 1.5 ===
--- Products/Ape/lib/apelib/tests/testzope2fs.py:1.4 Wed Jul 9 11:40:08 2003
+++ Products/Ape/lib/apelib/tests/testzope2fs.py Mon Feb 2 10:07:22 2004
@@ -23,16 +23,17 @@
from tempfile import mktemp
from cStringIO import StringIO
-from ZODB.POSException import ConflictError
+from OFS.Application import Application
from OFS.Image import File, manage_addImage, manage_addFile
from Products.PythonScripts.PythonScript import PythonScript
from Products.PageTemplates.ZopePageTemplate import ZopePageTemplate
+from apelib.core.interfaces import OIDConflictError
from apelib.zodb3.db import ApeDB
from apelib.zodb3.storage import ApeStorage
from apelib.zodb3.resource import StaticResource
-from apelib.zope2.mapper import createMapper
-from apelib.fs.exceptions import FSWriteError
+from apelib.zope2.mapper import loadConf
+from apelib.fs.interfaces import FSWriteError
from apelib.fs.connection import FSConnection
from zope2testbase import Zope2TestBase, Folder
@@ -44,7 +45,7 @@
tmpdir = mktemp()
-root_mapper = createMapper('filesystem')
+conf = None
class Zope2FSTests (unittest.TestCase, Zope2TestBase):
@@ -54,19 +55,23 @@
return {'fs': conn}
def setUp(self):
+ global conf
+ if conf is None:
+ conf = loadConf('filesystem')
if not os.path.exists(tmpdir):
os.mkdir(tmpdir)
self.path = tmpdir
- self.root_mapper = root_mapper
+ self.conf = conf
conns = self._createConnections(tmpdir)
assert len(conns) == 1
self.conn = conns['fs']
- resource = StaticResource(self.root_mapper)
+ resource = StaticResource(self.conf)
storage = ApeStorage(resource, conns)
self.storage = storage
db = ApeDB(storage, resource)
self.db = db
get_transaction().begin()
+ self.conn.afs.clearCache()
def tearDown(self):
get_transaction().abort()
@@ -94,16 +99,14 @@
get_transaction().commit()
for folder in (f, f2, f3):
- keychain = conn.db()._oid_encoder.decode(folder._p_oid)
- key = keychain[-1]
- text = self.conn.readSection(key, 'classification')
+ text = self.conn.readAnnotation(folder._p_oid, 'classification')
self.assert_(text.find('class_name=OFS.Folder.Folder') >= 0)
finally:
conn.close()
def testMismatchedIdDetection(self):
- # FSAutoID should detect when the keychain and ID don't match.
+ # FSAutoID should detect when the OID and ID don't match.
# Normally, the only time they don't match is when an object
# has been moved.
conn = self.db.open()
@@ -153,7 +156,7 @@
app._setObject(template.id, template, set_owner=0)
get_transaction().commit()
- dir = self.conn._expandPath('/')
+ dir = self.conn.getPath('/')
names = os.listdir(dir)
self.assert_('template.html' in names, names)
self.assert_('template' not in names, names)
@@ -185,7 +188,7 @@
self.assert_(not hasattr(f, 'script%d.py' % n))
# white box test: verify the scripts were actually stored
# with .py extensions.
- dir = self.conn._expandPath('/folder')
+ dir = self.conn.getPath('/folder')
names = os.listdir(dir)
for n in range(3):
self.assert_(('script%d.py' % n) in names, names)
@@ -219,7 +222,7 @@
self.assert_(not hasattr(f, 'script%d' % n))
# white box test: verify the scripts were actually stored
# with .py extensions.
- dir = self.conn._expandPath('/folder')
+ dir = self.conn.getPath('/folder')
names = os.listdir(dir)
for n in range(3):
self.assert_(('script%d.py' % n) in names, names)
@@ -245,7 +248,7 @@
f._setObject(script.id, script, set_owner=0)
get_transaction().commit()
- dir = self.conn._expandPath('/folder')
+ dir = self.conn.getPath('/folder')
names = os.listdir(dir)
self.assert_(('script0.py') in names, names)
self.assert_(('script0') not in names, names)
@@ -254,7 +257,7 @@
script = PythonScript('script0.py')
script.write('##title=test script\nreturn "Hello, world!"')
f._setObject(script.id, script, set_owner=0)
- self.assertRaises(ConflictError, get_transaction().commit)
+ self.assertRaises(OIDConflictError, get_transaction().commit)
finally:
conn.close()
@@ -279,7 +282,7 @@
f._setObject(script.id, script, set_owner=0)
get_transaction().commit()
- dir = self.conn._expandPath('/folder')
+ dir = self.conn.getPath('/folder')
names = os.listdir(dir)
self.assert_(('script0.py') in names, names)
self.assert_(('script0') in names, names)
@@ -353,7 +356,7 @@
f._setObject(script.id, script, set_owner=0)
get_transaction().commit()
- dir = self.conn._expandPath('/folder')
+ dir = self.conn.getPath('/folder')
names = os.listdir(dir)
self.assert_(('script0.py') in names, names)
self.assert_(('script0.dtml') in names, names)
@@ -424,7 +427,7 @@
finally:
conn2.close()
- dir = self.conn._expandPath('/')
+ dir = self.conn.getPath('/')
names = os.listdir(dir)
self.assert_(('image.png') in names, names)
self.assert_(('image') not in names, names)
@@ -447,7 +450,7 @@
content_type='application/octet-stream')
get_transaction().commit()
- dir = self.conn._expandPath('/')
+ dir = self.conn.getPath('/')
names = os.listdir(dir)
self.assert_(('hello.txt') in names, names)
self.assert_(('world.dat') in names, names)
@@ -461,7 +464,7 @@
# Verify Zope chooses the right object type for
# a new object.
# White box test.
- dir = self.conn._expandPath('/')
+ dir = self.conn.getPath('/')
f = open(os.path.join(dir, 'test.py'), 'wt')
f.write('return "Ok!"')
f.close()
@@ -479,12 +482,12 @@
# Verify that even though the extension gets stripped off
# in Zope, Zope still sees the object as it should.
# White box test.
- dir = self.conn._expandPath('/')
+ dir = self.conn.getPath('/')
f = open(os.path.join(dir, 'test.py'), 'wt')
f.write('return "Ok!"')
f.close()
-
- f = open(os.path.join(dir, self.conn.metadata_prefix
+
+ f = open(os.path.join(dir, self.conn.afs.annotation_prefix
+ 'properties'), 'wt')
f.write('[object_names]\ntest\n')
f.close()
@@ -502,7 +505,7 @@
# Verify Zope uses a File object for unrecognized files on
# the filesystem. White box test.
data = 'data goes here'
- dir = self.conn._expandPath('/')
+ dir = self.conn.getPath('/')
f = open(os.path.join(dir, 'test'), 'wt')
f.write(data)
f.close()
@@ -519,7 +522,7 @@
def testDefaultPropertySchema(self):
# Verify Zope uses the default property schema when no properties
# are set.
- dir = self.conn._expandPath('/')
+ dir = self.conn.getPath('/')
os.mkdir(os.path.join(dir, 'test'))
conn = self.db.open()
try:
@@ -555,8 +558,8 @@
conn2.close()
# Verify the stowaway is in the remainder file.
- dir = self.conn._expandPath('/')
- f = open(os.path.join(dir, self.conn.metadata_prefix
+ dir = self.conn.getPath('/')
+ f = open(os.path.join(dir, self.conn.afs.annotation_prefix
+ 'remainder'), 'rb')
data = f.read()
f.close()
@@ -585,7 +588,7 @@
def testGuessFileContentType(self):
# Verify that file content type guessing happens.
data = '<html><body>Cool stuff</body></html>'
- dir = self.conn._expandPath('/')
+ dir = self.conn.getPath('/')
f = open(os.path.join(dir, 'testobj'), 'wt')
f.write(data)
f.close()
@@ -599,11 +602,37 @@
conn.close()
+ def testWriteToRoot(self):
+ # Verify it's possible to write to the _root object as well as
+ # the Application object without either one stomping on each
+ # other's data. This is a functional test of
+ # apelib.fs.structure.RootDirectoryItems.
+ conn = self.db.open()
+ conn2 = None
+ try:
+ root = conn.root()
+ app = root['Application']
+ root['foo'] = Folder()
+ root['foo'].id = 'foo'
+ app.bar = Folder('bar')
+ app.bar.id = 'bar'
+ get_transaction().commit()
+
+ conn2 = self.db.open()
+ root = conn2.root()
+ app = root['Application']
+ self.assert_(root.has_key('foo'))
+ self.assert_(hasattr(app, 'bar'))
+ finally:
+ conn.close()
+ if conn2 is not None:
+ conn2.close()
+
class Zope2FSUnderscoreTests (Zope2FSTests):
def _createConnections(self, path):
- conn = FSConnection(path, metadata_prefix='_')
+ conn = FSConnection(path, annotation_prefix='_')
return {'fs': conn}
=== Products/Ape/lib/apelib/tests/testzope2sql.py 1.6 => 1.7 ===
--- Products/Ape/lib/apelib/tests/testzope2sql.py:1.6 Wed Jul 9 11:40:08 2003
+++ Products/Ape/lib/apelib/tests/testzope2sql.py Mon Feb 2 10:07:22 2004
@@ -23,11 +23,11 @@
from apelib.zodb3.db import ApeDB
from apelib.zodb3.storage import ApeStorage
from apelib.zodb3.resource import StaticResource
-from apelib.zope2.mapper import createMapper
+from apelib.zope2.mapper import loadConf
from zope2testbase import Zope2TestBase
-root_mapper = createMapper('sql')
+conf = None
class Zope2SQLTests (Zope2TestBase):
@@ -40,13 +40,24 @@
self.dbapi_kwparams, prefix='test_temp')
def setUp(self):
+ global conf
+ if conf is None:
+ conf = loadConf('sql')
conn = self.getConnector()
- self.root_mapper = root_mapper
- resource = StaticResource(self.root_mapper)
+ self.conf = conf
+ resource = StaticResource(self.conf)
self.conns = {'db': conn}
storage = ApeStorage(resource, self.conns, clear_all=1)
self.storage = storage
self.db = ApeDB(storage, resource)
+ c = self.db.open()
+ try:
+ if not c.root().has_key('Application'):
+ from OFS.Application import Application
+ c.root()['Application'] = Application()
+ get_transaction().commit()
+ finally:
+ c.close()
def clear(self):
self.storage.initDatabases(clear_all=1)
=== Products/Ape/lib/apelib/tests/zope2testbase.py 1.5 => 1.6 ===
--- Products/Ape/lib/apelib/tests/zope2testbase.py:1.5 Thu Aug 14 16:21:39 2003
+++ Products/Ape/lib/apelib/tests/zope2testbase.py Mon Feb 2 10:07:22 2004
@@ -11,7 +11,7 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""Test of storing folders on the filesystem via ZODB
+"""Test of storing various kinds of objects
$Id$
"""
@@ -34,6 +34,7 @@
from Products.PageTemplates.ZopePageTemplate import ZopePageTemplate
from apelib.zope2.setup.patches import applySetObPatch
+from apelib.core.interfaces import OIDConflictError
class TestFolder(Folder):
@@ -218,7 +219,7 @@
self.assertEqual(v, 3.14)
elif k == 'stuff':
got += 1
- self.assertEqual(v, ['a', 'bc', 'd'])
+ self.assertEqual(tuple(v), ('a', 'bc', 'd'))
self.assertEqual(got, 3)
finally:
conn2.close()
@@ -252,7 +253,7 @@
for k, v in app.Holidays.propertyItems():
if k == 'greek':
got += 1
- self.assertEqual(v, ['alpha', 'omega'])
+ self.assertEqual(tuple(v), ('alpha', 'omega'))
if k == 'hebrew':
got += 1
self.assertEqual(v, 'alpha')
@@ -323,6 +324,8 @@
conn = self.db.open()
try:
app = conn.root()['Application']
+ if hasattr(app, 'acl_users'):
+ app._delObject('acl_users')
f = UserFolder()
f.id = 'acl_users'
app._setObject(f.id, f, set_owner=0)
@@ -368,8 +371,7 @@
app = conn.root()['Application']
app.some_attr = 'stuff'
conn.setSerial(app, '\0' * 8) # Pretend that it's new
- self.assertRaises(POSException.ConflictError,
- get_transaction().commit)
+ self.assertRaises(OIDConflictError, get_transaction().commit)
finally:
conn.close()
More information about the Zope-CVS
mailing list