[Zodb-checkins] CVS: Zope3/lib/python/ZODB/tests - RecoveryStorage.py:1.1 ReadOnlyStorage.py:1.6 StorageTestBase.py:1.19 testFileStorage.py:1.20
Jeremy Hylton
jeremy@zope.com
Mon, 25 Nov 2002 14:54:52 -0500
Update of /cvs-repository/Zope3/lib/python/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv7770/ZODB/tests
Modified Files:
ReadOnlyStorage.py StorageTestBase.py testFileStorage.py
Added Files:
RecoveryStorage.py
Log Message:
First step in ZODB3 / ZODB4 integration.
Merge FileStorage and BaseStorage changes and all the other dependent
stuff.
Make copyTransactionsFrom() always use restore().
Replace all uses of U64 with u64.
Use newTimeStamp() from ZODB.TimeStamp.
Remove __len__() and getSize().
Add getExtensionMethods().
=== Added File Zope3/lib/python/ZODB/tests/RecoveryStorage.py ===
"""More recovery and iterator tests."""
from ZODB.ZTransaction import Transaction
from ZODB.tests.IteratorStorage import IteratorDeepCompare
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle
class RecoveryStorage(IteratorDeepCompare):
# Requires a setUp() that creates a self._dst destination storage
def checkSimpleRecovery(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=11)
revid = self._dostore(oid, revid=revid, data=12)
revid = self._dostore(oid, revid=revid, data=13)
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
def checkRecoveryAcrossVersions(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=21)
revid = self._dostore(oid, revid=revid, data=22)
revid = self._dostore(oid, revid=revid, data=23, version='one')
revid = self._dostore(oid, revid=revid, data=34, version='one')
# Now commit the version
t = Transaction()
self._storage.tpc_begin(t)
self._storage.commitVersion('one', '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
def checkRecoverAbortVersion(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=21, version="one")
revid = self._dostore(oid, revid=revid, data=23, version='one')
revid = self._dostore(oid, revid=revid, data=34, version='one')
# Now abort the version and the creation
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.abortVersion('one', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(oids, [oid])
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
# Also make sure the the last transaction has a data record
# with None for its data attribute, because we've undone the
# object.
for s in self._storage, self._dst:
iter = s.iterator()
for trans in iter:
pass # iterate until we get the last one
data = trans[0]
self.assertRaises(IndexError, lambda i, t=trans: t[i], 1)
self.assertEqual(data.oid, oid)
self.assertEqual(data.data, None)
def checkRecoverUndoInVersion(self):
oid = self._storage.new_oid()
version = "aVersion"
revid_a = self._dostore(oid, data=MinPO(91))
revid_b = self._dostore(oid, revid=revid_a, version=version,
data=MinPO(92))
revid_c = self._dostore(oid, revid=revid_b, version=version,
data=MinPO(93))
self._undo(self._storage.undoInfo()[0]['id'], oid)
self._commitVersion(version, '')
self._undo(self._storage.undoInfo()[0]['id'], oid)
# now copy the records to a new storage
self._dst.copyTransactionsFrom(self._storage)
self._abortVersion(version)
self.assert_(self._storage.versionEmpty(version))
self._undo(self._storage.undoInfo()[0]['id'], oid)
self.assert_(not self._storage.versionEmpty(version))
# check the data is what we expect it to be
data, revid = self._storage.load(oid, version)
self.assertEqual(zodb_unpickle(data), MinPO(92))
data, revid = self._storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(91))
# and swap the storages
tmp = self._storage
self._storage = self._dst
self._abortVersion(version)
self.assert_(self._storage.versionEmpty(version))
self._undo(self._storage.undoInfo()[0]['id'], oid)
self.assert_(not self._storage.versionEmpty(version))
# check the data is what we expect it to be
data, revid = self._storage.load(oid, version)
self.assertEqual(zodb_unpickle(data), MinPO(92))
data, revid = self._storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(91))
# Now remove _dst and copy all the transactions a second time.
# This time we will be able to confirm via compare().
self._storage = tmp
self._dst.close()
self._dst = self.new_dest()
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
=== Zope3/lib/python/ZODB/tests/ReadOnlyStorage.py 1.5 => 1.6 ===
--- Zope3/lib/python/ZODB/tests/ReadOnlyStorage.py:1.5 Wed Jul 24 19:12:46 2002
+++ Zope3/lib/python/ZODB/tests/ReadOnlyStorage.py Mon Nov 25 14:54:50 2002
@@ -2,14 +2,14 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
-#
+#
##############################################################################
from ZODB.POSException import ReadOnlyError
from ZODB.ZTransaction import Transaction
@@ -42,31 +42,17 @@
def checkWriteMethods(self):
self._make_readonly()
+ t = Transaction()
self.assertRaises(ReadOnlyError, self._storage.new_oid)
+ self.assertRaises(ReadOnlyError, self._storage.tpc_begin, t)
- t = Transaction()
- self._storage.tpc_begin(t)
self.assertRaises(ReadOnlyError, self._storage.abortVersion,
'', t)
- self._storage.tpc_abort(t)
-
- t = Transaction()
- self._storage.tpc_begin(t)
self.assertRaises(ReadOnlyError, self._storage.commitVersion,
'', '', t)
- self._storage.tpc_abort(t)
-
- t = Transaction()
- self._storage.tpc_begin(t)
self.assertRaises(ReadOnlyError, self._storage.store,
'\000' * 8, None, '', '', t)
- self._storage.tpc_abort(t)
if self._storage.supportsTransactionalUndo():
- t = Transaction()
- self._storage.tpc_begin(t)
self.assertRaises(ReadOnlyError, self._storage.transactionalUndo,
'\000' * 8, t)
- self._storage.tpc_abort(t)
-
-
=== Zope3/lib/python/ZODB/tests/StorageTestBase.py 1.18 => 1.19 ===
--- Zope3/lib/python/ZODB/tests/StorageTestBase.py:1.18 Thu Sep 19 15:14:06 2002
+++ Zope3/lib/python/ZODB/tests/StorageTestBase.py Mon Nov 25 14:54:50 2002
@@ -2,14 +2,14 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
-#
+#
##############################################################################
"""Provide a mixin base class for storage tests.
@@ -19,6 +19,8 @@
single object revision.
"""
+import errno
+import os
import pickle
import string
import sys
@@ -39,9 +41,8 @@
f = StringIO()
p = Pickler(f, 1)
klass = obj.__class__
- mod = getattr(klass, '__module__', None)
+ mod = getattr(klass, "__module__", None)
state = obj.__getstate__()
-
# XXX
p.dump((mod, klass.__name__, None))
p.dump(state)
@@ -83,19 +84,28 @@
A helper for function _handle_all_serials().
"""
- args = (oid,) + args
- return apply(handle_all_serials, args)[oid]
+ return handle_all_serials(oid, *args)[oid]
def import_helper(name):
mod = __import__(name)
return sys.modules[name]
-
+def removefs(base):
+ """Remove all files created by FileStorage with path base."""
+ for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
+ path = base + ext
+ try:
+ os.remove(path)
+ except os.error, err:
+ if err[0] != errno.ENOENT:
+ raise
+
+
class StorageTestBase(unittest.TestCase):
# XXX It would be simpler if concrete tests didn't need to extend
# setUp() and tearDown().
-
+
def setUp(self):
# You need to override this with a setUp that creates self._storage
self._storage = None
@@ -112,12 +122,12 @@
def _dostore(self, oid=None, revid=None, data=None, version=None,
already_pickled=0, user=None, description=None):
"""Do a complete storage transaction. The defaults are:
-
+
- oid=None, ask the storage for a new oid
- revid=None, use a revid of ZERO
- data=None, pickle up some arbitrary data (the integer 7)
- version=None, use the empty string version
-
+
Returns the object's new revision id.
"""
if oid is None:
@@ -150,7 +160,39 @@
self._storage.tpc_abort(t)
raise
return revid
-
+
def _dostoreNP(self, oid=None, revid=None, data=None, version=None,
user=None, description=None):
return self._dostore(oid, revid, data, version, already_pickled=1)
+ # The following methods depend on optional storage features.
+
+ def _undo(self, tid, oid):
+ # Undo a tid that affects a single object (oid).
+ # XXX This is very specialized
+ t = Transaction()
+ t.note("undo")
+ self._storage.tpc_begin(t)
+ oids = self._storage.transactionalUndo(tid, t)
+ self._storage.tpc_vote(t)
+ self._storage.tpc_finish(t)
+ self.assertEqual(len(oids), 1)
+ self.assertEqual(oids[0], oid)
+ return self._storage.lastTransaction()
+
+ def _commitVersion(self, src, dst):
+ t = Transaction()
+ t.note("commit %r to %r" % (src, dst))
+ self._storage.tpc_begin(t)
+ oids = self._storage.commitVersion(src, dst, t)
+ self._storage.tpc_vote(t)
+ self._storage.tpc_finish(t)
+ return oids
+
+ def _abortVersion(self, ver):
+ t = Transaction()
+ t.note("abort %r" % ver)
+ self._storage.tpc_begin(t)
+ oids = self._storage.abortVersion(ver, t)
+ self._storage.tpc_vote(t)
+ self._storage.tpc_finish(t)
+ return oids
=== Zope3/lib/python/ZODB/tests/testFileStorage.py 1.19 => 1.20 ===
--- Zope3/lib/python/ZODB/tests/testFileStorage.py:1.19 Wed Jul 24 19:12:46 2002
+++ Zope3/lib/python/ZODB/tests/testFileStorage.py Mon Nov 25 14:54:50 2002
@@ -2,28 +2,28 @@
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
-#
+#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
-#
+#
##############################################################################
-from __future__ import nested_scopes
-
import ZODB.FileStorage
import sys, os, unittest
import errno
from ZODB.ZTransaction import Transaction
+from ZODB import POSException
from ZODB.tests import StorageTestBase, BasicStorage, \
TransactionalUndoStorage, VersionStorage, \
TransactionalUndoVersionStorage, PackableStorage, \
Synchronization, ConflictResolution, HistoryStorage, \
IteratorStorage, Corruption, RevisionStorage, PersistentStorage, \
- MTStorage, ReadOnlyStorage
+ MTStorage, ReadOnlyStorage, RecoveryStorage
+from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle
class FileStorageTests(
StorageTestBase.StorageTestBase,
@@ -44,92 +44,108 @@
):
def open(self, **kwargs):
- if kwargs:
- self._storage = apply(ZODB.FileStorage.FileStorage,
- ('FileStorageTests.fs',), kwargs)
- else:
- self._storage = ZODB.FileStorage.FileStorage(
- 'FileStorageTests.fs', **kwargs)
+ self._storage = ZODB.FileStorage.FileStorage('FileStorageTests.fs',
+ **kwargs)
def setUp(self):
self.open(create=1)
def tearDown(self):
self._storage.close()
- for ext in '', '.old', '.tmp', '.lock', '.index':
- path = 'FileStorageTests.fs' + ext
- if os.path.exists(path):
- os.remove(path)
+ StorageTestBase.removefs("FileStorageTests.fs")
+
+ def checkLongMetadata(self):
+ s = "X" * 75000
+ try:
+ self._dostore(user=s)
+ except POSException.StorageError:
+ pass
+ else:
+ self.fail("expect long user field to raise error")
+ try:
+ self._dostore(description=s)
+ except POSException.StorageError:
+ pass
+ else:
+ self.fail("expect long user field to raise error")
class FileStorageRecoveryTest(
StorageTestBase.StorageTestBase,
- IteratorStorage.IteratorDeepCompare,
+ RecoveryStorage.RecoveryStorage,
):
def setUp(self):
+ StorageTestBase.removefs("Source.fs")
+ StorageTestBase.removefs("Dest.fs")
self._storage = ZODB.FileStorage.FileStorage('Source.fs')
self._dst = ZODB.FileStorage.FileStorage('Dest.fs')
def tearDown(self):
self._storage.close()
self._dst.close()
- for ext in '', '.old', '.tmp', '.lock', '.index':
- for fs in 'Source', 'Dest':
- path = fs + '.fs' + ext
- try:
- os.remove(path)
- except OSError, e:
- if e.errno <> errno.ENOENT: raise
+ StorageTestBase.removefs("Source.fs")
+ StorageTestBase.removefs("Dest.fs")
- def checkSimpleRecovery(self):
- oid = self._storage.new_oid()
- revid = self._dostore(oid, data=11)
- revid = self._dostore(oid, revid=revid, data=12)
- revid = self._dostore(oid, revid=revid, data=13)
- self._dst.copyTransactionsFrom(self._storage)
- self.compare(self._storage, self._dst)
+ def new_dest(self):
+ StorageTestBase.removefs('Dest.fs')
+ return ZODB.FileStorage.FileStorage('Dest.fs')
- def checkRecoveryAcrossVersions(self):
+ def checkRecoverUndoInVersion(self):
oid = self._storage.new_oid()
- revid = self._dostore(oid, data=21)
- revid = self._dostore(oid, revid=revid, data=22)
- revid = self._dostore(oid, revid=revid, data=23, version='one')
- revid = self._dostore(oid, revid=revid, data=34, version='one')
- # Now commit the version
- t = Transaction()
- self._storage.tpc_begin(t)
- self._storage.commitVersion('one', '', t)
- self._storage.tpc_vote(t)
- self._storage.tpc_finish(t)
+ version = "aVersion"
+ revid_a = self._dostore(oid, data=MinPO(91))
+ revid_b = self._dostore(oid, revid=revid_a, version=version,
+ data=MinPO(92))
+ revid_c = self._dostore(oid, revid=revid_b, version=version,
+ data=MinPO(93))
+ self._undo(self._storage.undoInfo()[0]['id'], oid)
+ self._commitVersion(version, '')
+ self._undo(self._storage.undoInfo()[0]['id'], oid)
+
+ # now copy the records to a new storage
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
- def checkRecoverAbortVersion(self):
- oid = self._storage.new_oid()
- revid = self._dostore(oid, data=21, version="one")
- revid = self._dostore(oid, revid=revid, data=23, version='one')
- revid = self._dostore(oid, revid=revid, data=34, version='one')
- # Now abort the version and the creation
- t = Transaction()
- self._storage.tpc_begin(t)
- oids = self._storage.abortVersion('one', t)
- self._storage.tpc_vote(t)
- self._storage.tpc_finish(t)
- self.assertEqual(oids, [oid])
+ # The last two transactions were applied directly rather than
+ # copied. So we can't use compare() to verify that they new
+ # transactions are applied correctly. (The new transactions
+ # will have different timestamps for each storage.)
+
+ self._abortVersion(version)
+ self.assert_(self._storage.versionEmpty(version))
+ self._undo(self._storage.undoInfo()[0]['id'], oid)
+ self.assert_(not self._storage.versionEmpty(version))
+
+ # check the data is what we expect it to be
+ data, revid = self._storage.load(oid, version)
+ self.assertEqual(zodb_unpickle(data), MinPO(92))
+ data, revid = self._storage.load(oid, '')
+ self.assertEqual(zodb_unpickle(data), MinPO(91))
+
+ # and swap the storages
+ tmp = self._storage
+ self._storage = self._dst
+ self._abortVersion(version)
+ self.assert_(self._storage.versionEmpty(version))
+ self._undo(self._storage.undoInfo()[0]['id'], oid)
+ self.assert_(not self._storage.versionEmpty(version))
+
+ # check the data is what we expect it to be
+ data, revid = self._storage.load(oid, version)
+ self.assertEqual(zodb_unpickle(data), MinPO(92))
+ data, revid = self._storage.load(oid, '')
+ self.assertEqual(zodb_unpickle(data), MinPO(91))
+
+ # swap them back
+ self._storage = tmp
+
+ # Now remove _dst and copy all the transactions a second time.
+ # This time we will be able to confirm via compare().
+ self._dst.close()
+ StorageTestBase.removefs("Dest.fs")
+ self._dst = ZODB.FileStorage.FileStorage('Dest.fs')
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
- # Also make sure the the last transaction has a data record
- # with None for its data attribute, because we've undone the
- # object.
- for s in self._storage, self._dst:
- iter = s.iterator()
- for trans in iter:
- pass # iterate until we get the last one
- data = trans[0]
- self.assertRaises(IndexError, lambda i:trans[i], 1)
- self.assertEqual(data.oid, oid)
- self.assertEqual(data.data, None)
-
def test_suite():
suite = unittest.makeSuite(FileStorageTests, 'check')
@@ -144,15 +160,3 @@
runner = unittest.TextTestRunner()
runner.run(alltests)
-def debug():
- test_suite().debug()
-
-def pdebug():
- import pdb
- pdb.run('debug()')
-
-if __name__=='__main__':
- if len(sys.argv) > 1:
- globals()[sys.argv[1]]()
- else:
- main()