[Zope3-checkins] CVS: Zope3/src/zope/app/advanced/acquisition/tests
- AcquisitionTestBase.py:1.1 BasicAcquisition.py:1.1
HistoryAcquisition.py:1.1 IteratorAcquisition.py:1.1
LocalAcquisition.py:1.1 MTAcquisition.py:1.1
PackableAcquisition.py:1.1 PersistentAcquisition.py:1.1
ReadOnlyAcquisition.py:1.1 RecoveryAcquisition.py:1.1
RevisionAcquisition.py:1.1 TransactionalUndoAcquisition.py:1.1
TransactionalUndoVersionAcquisition.py:1.1
VersionAcquisition.py:1.1 testDemoAcquisition.py:1.1
testFileAcquisition.py:1.1 testMappingAcquisition.py:1.1
Sidnei da Silva
sidnei at awkly.org
Thu Apr 1 13:29:04 EST 2004
Update of /cvs-repository/Zope3/src/zope/app/advanced/acquisition/tests
In directory cvs.zope.org:/tmp/cvs-serv23661
Added Files:
AcquisitionTestBase.py BasicAcquisition.py
HistoryAcquisition.py IteratorAcquisition.py
LocalAcquisition.py MTAcquisition.py PackableAcquisition.py
PersistentAcquisition.py ReadOnlyAcquisition.py
RecoveryAcquisition.py RevisionAcquisition.py
TransactionalUndoAcquisition.py
TransactionalUndoVersionAcquisition.py VersionAcquisition.py
testDemoAcquisition.py testFileAcquisition.py
testMappingAcquisition.py
Log Message:
Ops, missed some files.
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/AcquisitionTestBase.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Provide a mixin base class for storage tests.
The StorageTestBase class provides basic setUp() and tearDown()
semantics (which you can override), and it also provides a helper
method _dostore() which performs a complete store transaction for a
single object revision.
"""
import errno
import os
import pickle
import string
import sys
import time
import types
import unittest
from cPickle import Pickler, Unpickler
from cStringIO import StringIO
from ZODB.Transaction import Transaction
from ZODB.utils import u64
from ZODB.tests.MinPO import MinPO
ZERO = '\0'*8
def snooze():
# In Windows, it's possible that two successive time.time() calls return
# the same value. Tim guarantees that time never runs backwards. You
# usually want to call this before you pack a storage, or must make other
# guarantees about increasing timestamps.
now = time.time()
while now == time.time():
time.sleep(0.1)
def zodb_pickle(obj):
"""Create a pickle in the format expected by ZODB."""
f = StringIO()
p = Pickler(f, 1)
p.persistent_id = lambda obj: getattr(obj, '_p_oid', None)
klass = obj.__class__
assert not hasattr(obj, '__getinitargs__'), "not ready for constructors"
args = None
mod = getattr(klass, '__module__', None)
if mod is not None:
klass = mod, klass.__name__
state = obj.__getstate__()
p.dump((klass, args))
p.dump(state)
return f.getvalue(1)
def persistent_load(pid):
# helper for zodb_unpickle
return "ref to %s.%s oid=%s" % (pid[1][0], pid[1][1], u64(pid[0]))
def zodb_unpickle(data):
"""Unpickle an object stored using the format expected by ZODB."""
f = StringIO(data)
u = Unpickler(f)
u.persistent_load = persistent_load
klass_info = u.load()
if isinstance(klass_info, types.TupleType):
if isinstance(klass_info[0], types.TupleType):
modname, klassname = klass_info[0]
args = klass_info[1]
else:
modname, klassname = klass_info
args = None
if modname == "__main__":
ns = globals()
else:
mod = import_helper(modname)
ns = mod.__dict__
try:
klass = ns[klassname]
except KeyError:
sys.stderr.write("can't find %s in %s" % (klassname,
repr(ns)))
inst = klass()
else:
raise ValueError, "expected class info: %s" % repr(klass_info)
state = u.load()
inst.__setstate__(state)
return inst
def handle_all_serials(oid, *args):
"""Return dict of oid to serialno from store() and tpc_vote().
Raises an exception if one of the calls raised an exception.
The storage interface got complicated when ZEO was introduced.
Any individual store() call can return None or a sequence of
2-tuples where the 2-tuple is either oid, serialno or an
exception to be raised by the client.
The original interface just returned the serialno for the
object.
"""
d = {}
for arg in args:
if isinstance(arg, types.StringType):
d[oid] = arg
elif arg is None:
pass
else:
for oid, serial in arg:
if not isinstance(serial, types.StringType):
raise serial # error from ZEO server
d[oid] = serial
return d
def handle_serials(oid, *args):
"""Return the serialno for oid based on multiple return values.
A helper for function _handle_all_serials().
"""
return handle_all_serials(oid, *args)[oid]
def import_helper(name):
mod = __import__(name)
return sys.modules[name]
def removefs(base):
"""Remove all files created by FileStorage with path base."""
for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
path = base + ext
try:
os.remove(path)
except os.error, err:
if err[0] != errno.ENOENT:
raise
class StorageTestBase(unittest.TestCase):
# XXX It would be simpler if concrete tests didn't need to extend
# setUp() and tearDown().
def setUp(self):
# You need to override this with a setUp that creates self._storage
self._storage = None
def _close(self):
# You should override this if closing your storage requires additional
# shutdown operations.
if self._storage is not None:
self._storage.close()
def tearDown(self):
self._close()
def _dostore(self, oid=None, revid=None, data=None, version=None,
already_pickled=0, user=None, description=None):
"""Do a complete storage transaction. The defaults are:
- oid=None, ask the storage for a new oid
- revid=None, use a revid of ZERO
- data=None, pickle up some arbitrary data (the integer 7)
- version=None, use the empty string version
Returns the object's new revision id.
"""
if oid is None:
oid = self._storage.new_oid()
if revid is None:
revid = ZERO
if data is None:
data = MinPO(7)
if type(data) == types.IntType:
data = MinPO(data)
if not already_pickled:
data = zodb_pickle(data)
if version is None:
version = ''
# Begin the transaction
t = Transaction()
if user is not None:
t.user = user
if description is not None:
t.description = description
try:
self._storage.tpc_begin(t)
# Store an object
r1 = self._storage.store(oid, revid, data, version, t)
# Finish the transaction
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
return revid
def _dostoreNP(self, oid=None, revid=None, data=None, version=None,
user=None, description=None):
return self._dostore(oid, revid, data, version, 1, user, description)
# The following methods depend on optional storage features.
def _undo(self, tid, oid=None):
# Undo a tid that affects a single object (oid).
# XXX This is very specialized
t = Transaction()
t.note("undo")
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
if oid is not None:
self.assertEqual(len(oids), 1)
self.assertEqual(oids[0], oid)
return self._storage.lastTransaction()
def _commitVersion(self, src, dst):
t = Transaction()
t.note("commit %r to %r" % (src, dst))
self._storage.tpc_begin(t)
oids = self._storage.commitVersion(src, dst, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
return oids
def _abortVersion(self, ver):
t = Transaction()
t.note("abort %r" % ver)
self._storage.tpc_begin(t)
oids = self._storage.abortVersion(ver, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
return oids
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/BasicAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run the basic tests for a storage as described in the official storage API
The most complete and most out-of-date description of the interface is:
http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storage_Interface_Info.html
All storages should be able to pass these tests.
"""
from ZODB.Transaction import Transaction
from ZODB import POSException
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase \
import zodb_unpickle, zodb_pickle, handle_serials
ZERO = '\0'*8
class BasicStorage:
def checkBasics(self):
t = Transaction()
self._storage.tpc_begin(t)
# This should simply return
self._storage.tpc_begin(t)
# Aborting is easy
self._storage.tpc_abort(t)
# Test a few expected exceptions when we're doing operations giving a
# different Transaction object than the one we've begun on.
self._storage.tpc_begin(t)
self.assertRaises(
POSException.StorageTransactionError,
self._storage.store,
0, 0, 0, 0, Transaction())
try:
self._storage.abortVersion('dummy', Transaction())
except (POSException.StorageTransactionError,
POSException.VersionCommitError):
pass # test passed ;)
else:
assert 0, "Should have failed, invalid transaction."
try:
self._storage.commitVersion('dummy', 'dummer', Transaction())
except (POSException.StorageTransactionError,
POSException.VersionCommitError):
pass # test passed ;)
else:
assert 0, "Should have failed, invalid transaction."
self.assertRaises(
POSException.StorageTransactionError,
self._storage.store,
0, 1, 2, 3, Transaction())
self._storage.tpc_abort(t)
def checkSerialIsNoneForInitialRevision(self):
eq = self.assertEqual
oid = self._storage.new_oid()
txn = Transaction()
self._storage.tpc_begin(txn)
# Use None for serial. Don't use _dostore() here because that coerces
# serial=None to serial=ZERO.
r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
'', txn)
r2 = self._storage.tpc_vote(txn)
self._storage.tpc_finish(txn)
newrevid = handle_serials(oid, r1, r2)
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
eq(value, MinPO(11))
eq(revid, newrevid)
def checkNonVersionStore(self, oid=None, revid=None, version=None):
revid = ZERO
newrevid = self._dostore(revid=revid)
# Finish the transaction.
self.assertNotEqual(newrevid, revid)
def checkNonVersionStoreAndLoad(self):
eq = self.assertEqual
oid = self._storage.new_oid()
self._dostore(oid=oid, data=MinPO(7))
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
eq(value, MinPO(7))
# Now do a bunch of updates to an object
for i in range(13, 22):
revid = self._dostore(oid, revid=revid, data=MinPO(i))
# Now get the latest revision of the object
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(21))
def checkNonVersionModifiedInVersion(self):
oid = self._storage.new_oid()
self._dostore(oid=oid)
self.assertEqual(self._storage.modifiedInVersion(oid), '')
def checkConflicts(self):
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
self.assertRaises(POSException.ConflictError,
self._dostore,
oid, revid=revid1, data=MinPO(13))
def checkWriteAfterAbort(self):
oid = self._storage.new_oid()
t = Transaction()
self._storage.tpc_begin(t)
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
# Now abort this transaction
self._storage.tpc_abort(t)
# Now start all over again
oid = self._storage.new_oid()
self._dostore(oid=oid, data=MinPO(6))
def checkAbortAfterVote(self):
oid1 = self._storage.new_oid()
revid1 = self._dostore(oid=oid1, data=MinPO(-2))
oid = self._storage.new_oid()
t = Transaction()
self._storage.tpc_begin(t)
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
# Now abort this transaction
self._storage.tpc_vote(t)
self._storage.tpc_abort(t)
# Now start all over again
oid = self._storage.new_oid()
revid = self._dostore(oid=oid, data=MinPO(6))
for oid, revid in [(oid1, revid1), (oid, revid)]:
data, _revid = self._storage.load(oid, '')
self.assertEqual(revid, _revid)
def checkStoreTwoObjects(self):
noteq = self.assertNotEqual
p31, p32, p51, p52 = map(MinPO, (31, 32, 51, 52))
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
noteq(oid1, oid2)
revid1 = self._dostore(oid1, data=p31)
revid2 = self._dostore(oid2, data=p51)
noteq(revid1, revid2)
revid3 = self._dostore(oid1, revid=revid1, data=p32)
revid4 = self._dostore(oid2, revid=revid2, data=p52)
noteq(revid3, revid4)
def checkGetSerial(self):
if not hasattr(self._storage, 'getSerial'):
return
eq = self.assertEqual
p41, p42 = map(MinPO, (41, 42))
oid = self._storage.new_oid()
self.assertRaises(KeyError, self._storage.getSerial, oid)
# Now store a revision
revid1 = self._dostore(oid, data=p41)
eq(revid1, self._storage.getSerial(oid))
# And another one
revid2 = self._dostore(oid, revid=revid1, data=p42)
eq(revid2, self._storage.getSerial(oid))
def checkTwoArgBegin(self):
# XXX how standard is three-argument tpc_begin()?
t = Transaction()
tid = '\0\0\0\0\0psu'
self._storage.tpc_begin(t, tid)
oid = self._storage.new_oid()
data = zodb_pickle(MinPO(8))
self._storage.store(oid, None, data, '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
def checkLen(self):
# len(storage) reports the number of objects.
# check it is zero when empty
self.assertEqual(len(self._storage),0)
# check it is correct when the storage contains two object.
# len may also be zero, for storages that do not keep track
# of this number
self._dostore(data=MinPO(22))
self._dostore(data=MinPO(23))
self.assert_(len(self._storage) in [0,2])
def checkGetSize(self):
self._dostore(data=MinPO(25))
size = self._storage.getSize()
# The storage API doesn't make any claims about what size
# means except that it ought to be printable.
str(size)
def checkNote(self):
oid = self._storage.new_oid()
t = Transaction()
self._storage.tpc_begin(t)
t.note('this is a test')
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
def checkGetExtensionMethods(self):
m = self._storage.getExtensionMethods()
self.assertEqual(type(m),type({}))
for k,v in m.items():
self.assertEqual(v,None)
self.assert_(callable(getattr(self._storage,k)))
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/HistoryAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run the history() related tests for a storage.
Any storage that supports the history() method should be able to pass
all these tests.
"""
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
class HistoryStorage:
def checkSimpleHistory(self):
eq = self.assertEqual
# Store a couple of non-version revisions of the object
oid = self._storage.new_oid()
self.assertRaises(KeyError,self._storage.history,oid)
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now get various snapshots of the object's history
h = self._storage.history(oid, size=1)
eq(len(h), 1)
d = h[0]
eq(d['serial'], revid3)
eq(d['version'], '')
# Try to get 2 historical revisions
h = self._storage.history(oid, size=2)
eq(len(h), 2)
d = h[0]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid2)
eq(d['version'], '')
# Try to get all 3 historical revisions
h = self._storage.history(oid, size=3)
eq(len(h), 3)
d = h[0]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid2)
eq(d['version'], '')
d = h[2]
eq(d['serial'], revid1)
eq(d['version'], '')
# There should be no more than 3 revisions
h = self._storage.history(oid, size=4)
eq(len(h), 3)
d = h[0]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid2)
eq(d['version'], '')
d = h[2]
eq(d['serial'], revid1)
eq(d['version'], '')
def checkVersionHistory(self):
if not self._storage.supportsVersions():
return
eq = self.assertEqual
# Store a couple of non-version revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now store some new revisions in a version
version = 'test-version'
revid4 = self._dostore(oid, revid=revid3, data=MinPO(14),
version=version)
revid5 = self._dostore(oid, revid=revid4, data=MinPO(15),
version=version)
revid6 = self._dostore(oid, revid=revid5, data=MinPO(16),
version=version)
# Now, try to get the six historical revisions (first three are in
# 'test-version', followed by the non-version revisions).
h = self._storage.history(oid, version, 100)
eq(len(h), 6)
d = h[0]
eq(d['serial'], revid6)
eq(d['version'], version)
d = h[1]
eq(d['serial'], revid5)
eq(d['version'], version)
d = h[2]
eq(d['serial'], revid4)
eq(d['version'], version)
d = h[3]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[4]
eq(d['serial'], revid2)
eq(d['version'], '')
d = h[5]
eq(d['serial'], revid1)
eq(d['version'], '')
def checkHistoryAfterVersionCommit(self):
if not self._storage.supportsVersions():
return
eq = self.assertEqual
# Store a couple of non-version revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now store some new revisions in a version
version = 'test-version'
revid4 = self._dostore(oid, revid=revid3, data=MinPO(14),
version=version)
revid5 = self._dostore(oid, revid=revid4, data=MinPO(15),
version=version)
revid6 = self._dostore(oid, revid=revid5, data=MinPO(16),
version=version)
# Now commit the version
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.commitVersion(version, '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# After consultation with Jim, we agreed that the semantics of
# revision id's after a version commit is that the committed object
# gets a new serial number (a.k.a. revision id). Note that
# FileStorage is broken here; the serial number in the post-commit
# non-version revision will be the same as the serial number of the
# previous in-version revision.
#
# BAW: Using load() is the only way to get the serial number of the
# current revision of the object. But at least this works for both
# broken and working storages.
ign, revid7 = self._storage.load(oid, '')
# Now, try to get the six historical revisions (first three are in
# 'test-version', followed by the non-version revisions).
h = self._storage.history(oid, version, 100)
eq(len(h), 7)
d = h[0]
eq(d['serial'], revid7)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid6)
eq(d['version'], version)
d = h[2]
eq(d['serial'], revid5)
eq(d['version'], version)
d = h[3]
eq(d['serial'], revid4)
eq(d['version'], version)
d = h[4]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[5]
eq(d['serial'], revid2)
eq(d['version'], '')
d = h[6]
eq(d['serial'], revid1)
eq(d['version'], '')
def checkHistoryAfterVersionAbort(self):
if not self._storage.supportsVersions():
return
eq = self.assertEqual
# Store a couple of non-version revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now store some new revisions in a version
version = 'test-version'
revid4 = self._dostore(oid, revid=revid3, data=MinPO(14),
version=version)
revid5 = self._dostore(oid, revid=revid4, data=MinPO(15),
version=version)
revid6 = self._dostore(oid, revid=revid5, data=MinPO(16),
version=version)
# Now commit the version
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.abortVersion(version, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# After consultation with Jim, we agreed that the semantics of
# revision id's after a version commit is that the committed object
# gets a new serial number (a.k.a. revision id). Note that
# FileStorage is broken here; the serial number in the post-commit
# non-version revision will be the same as the serial number of the
# previous in-version revision.
#
# BAW: Using load() is the only way to get the serial number of the
# current revision of the object. But at least this works for both
# broken and working storages.
ign, revid7 = self._storage.load(oid, '')
# Now, try to get the six historical revisions (first three are in
# 'test-version', followed by the non-version revisions).
h = self._storage.history(oid, version, 100)
eq(len(h), 7)
d = h[0]
eq(d['serial'], revid7)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid6)
eq(d['version'], version)
d = h[2]
eq(d['serial'], revid5)
eq(d['version'], version)
d = h[3]
eq(d['serial'], revid4)
eq(d['version'], version)
d = h[4]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[5]
eq(d['serial'], revid2)
eq(d['version'], '')
d = h[6]
eq(d['serial'], revid1)
eq(d['version'], '')
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/IteratorAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run tests against the iterator() interface for storages.
Any storage that supports the iterator() method should be able to pass
all these tests.
"""
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
from ZODB.utils import U64, p64
from ZODB.Transaction import Transaction
class IteratorCompare:
def iter_verify(self, txniter, revids, val0):
eq = self.assertEqual
oid = self._oid
val = val0
for reciter, revid in zip(txniter, revids + [None]):
eq(reciter.tid, revid)
for rec in reciter:
eq(rec.oid, oid)
eq(rec.serial, revid)
eq(rec.version, '')
eq(zodb_unpickle(rec.data), MinPO(val))
val = val + 1
eq(val, val0 + len(revids))
txniter.close()
class IteratorStorage(IteratorCompare):
def checkSimpleIteration(self):
# Store a bunch of revisions of a single object
self._oid = oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now iterate over all the transactions and compare carefully
txniter = self._storage.iterator()
self.iter_verify(txniter, [revid1, revid2, revid3], 11)
def checkClose(self):
self._oid = oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
txniter = self._storage.iterator()
txniter.close()
self.assertRaises(IOError, txniter.__getitem__, 0)
def checkVersionIterator(self):
if not self._storage.supportsVersions():
return
self._dostore()
self._dostore(version='abort')
self._dostore()
self._dostore(version='abort')
t = Transaction()
self._storage.tpc_begin(t)
self._storage.abortVersion('abort', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self._dostore(version='commit')
self._dostore()
self._dostore(version='commit')
t = Transaction()
self._storage.tpc_begin(t)
self._storage.commitVersion('commit', '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
txniter = self._storage.iterator()
for trans in txniter:
for data in trans:
pass
def checkUndoZombieNonVersion(self):
if not hasattr(self._storage, 'supportsTransactionalUndo'):
return
if not self._storage.supportsTransactionalUndo():
return
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(94))
# Get the undo information
info = self._storage.undoInfo()
tid = info[0]['id']
# Undo the creation of the object, rendering it a zombie
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# Now attempt to iterator over the storage
iter = self._storage.iterator()
for txn in iter:
for rec in txn:
pass
# The last transaction performed an undo of the transaction that
# created object oid. (As Barry points out, the object is now in the
# George Bailey state.) Assert that the final data record contains
# None in the data attribute.
self.assertEqual(rec.oid, oid)
self.assertEqual(rec.data, None)
def checkTransactionExtensionFromIterator(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(1))
iter = self._storage.iterator()
count = 0
for txn in iter:
self.assertEqual(txn._extension, {})
count +=1
self.assertEqual(count, 1)
def checkIterationIntraTransaction(self):
# XXX try this test with logging enabled. If you see something like
#
# ZODB FS FS21 warn: FileStorageTests.fs truncated, possibly due to
# damaged records at 4
#
# Then the code in FileIterator.next() hasn't yet been fixed.
oid = self._storage.new_oid()
t = Transaction()
data = zodb_pickle(MinPO(0))
try:
self._storage.tpc_begin(t)
self._storage.store(oid, '\0'*8, data, '', t)
self._storage.tpc_vote(t)
# Don't do tpc_finish yet
it = self._storage.iterator()
for x in it:
pass
finally:
self._storage.tpc_finish(t)
class ExtendedIteratorStorage(IteratorCompare):
def checkExtendedIteration(self):
# Store a bunch of revisions of a single object
self._oid = oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
revid4 = self._dostore(oid, revid=revid3, data=MinPO(14))
# Note that the end points are included
# Iterate over all of the transactions with explicit start/stop
txniter = self._storage.iterator(revid1, revid4)
self.iter_verify(txniter, [revid1, revid2, revid3, revid4], 11)
# Iterate over some of the transactions with explicit start
txniter = self._storage.iterator(revid3)
self.iter_verify(txniter, [revid3, revid4], 13)
# Iterate over some of the transactions with explicit stop
txniter = self._storage.iterator(None, revid2)
self.iter_verify(txniter, [revid1, revid2], 11)
# Iterate over some of the transactions with explicit start+stop
txniter = self._storage.iterator(revid2, revid3)
self.iter_verify(txniter, [revid2, revid3], 12)
# Specify an upper bound somewhere in between values
revid3a = p64((U64(revid3) + U64(revid4)) / 2)
txniter = self._storage.iterator(revid2, revid3a)
self.iter_verify(txniter, [revid2, revid3], 12)
# Specify a lower bound somewhere in between values.
# revid2 == revid1+1 is very likely on Windows. Adding 1 before
# dividing ensures that "the midpoint" we compute is strictly larger
# than revid1.
revid1a = p64((U64(revid1) + 1 + U64(revid2)) / 2)
assert revid1 < revid1a
txniter = self._storage.iterator(revid1a, revid3a)
self.iter_verify(txniter, [revid2, revid3], 12)
# Specify an empty range
txniter = self._storage.iterator(revid3, revid2)
self.iter_verify(txniter, [], 13)
# Specify a singleton range
txniter = self._storage.iterator(revid3, revid3)
self.iter_verify(txniter, [revid3], 13)
class IteratorDeepCompare:
def compare(self, storage1, storage2):
eq = self.assertEqual
iter1 = storage1.iterator()
iter2 = storage2.iterator()
for txn1, txn2 in zip(iter1, iter2):
eq(txn1.tid, txn2.tid)
eq(txn1.status, txn2.status)
eq(txn1.user, txn2.user)
eq(txn1.description, txn2.description)
eq(txn1._extension, txn2._extension)
for rec1, rec2 in zip(txn1, txn2):
eq(rec1.oid, rec2.oid)
eq(rec1.serial, rec2.serial)
eq(rec1.version, rec2.version)
eq(rec1.data, rec2.data)
# Make sure there are no more records left in rec1 and rec2,
# meaning they were the same length.
self.assertRaises(IndexError, txn1.next)
self.assertRaises(IndexError, txn2.next)
# Make sure ther are no more records left in txn1 and txn2, meaning
# they were the same length
self.assertRaises(IndexError, iter1.next)
self.assertRaises(IndexError, iter2.next)
iter1.close()
iter2.close()
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/LocalAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
class LocalStorage:
"""A single test that only make sense for local storages.
A local storage is one that doens't use ZEO. The __len__()
implementation for ZEO is inexact.
"""
def checkLen(self):
eq = self.assertEqual
# The length of the database ought to grow by one each time
eq(len(self._storage), 0)
self._dostore()
eq(len(self._storage), 1)
self._dostore()
eq(len(self._storage), 2)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/MTAcquisition.py ===
import random
import sys
import threading
import time
import ZODB
from PersistentMapping import PersistentMapping
from ZODB.tests.StorageTestBase \
import StorageTestBase, zodb_pickle, zodb_unpickle, handle_serials
from ZODB.tests.MinPO import MinPO
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError
SHORT_DELAY = 0.01
def sort(l):
"Sort a list in place and return it."
l.sort()
return l
class TestThread(threading.Thread):
"""Base class for defining threads that run from unittest.
If the thread exits with an uncaught exception, catch it and
re-raise it when the thread is joined. The re-raise will cause
the test to fail.
The subclass should define a runtest() method instead of a run()
method.
"""
def __init__(self, test):
threading.Thread.__init__(self)
self.test = test
self._fail = None
self._exc_info = None
def run(self):
try:
self.runtest()
except:
self._exc_info = sys.exc_info()
def fail(self, msg=""):
self._test.fail(msg)
def join(self, timeout=None):
threading.Thread.join(self, timeout)
if self._exc_info:
raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
class ZODBClientThread(TestThread):
__super_init = TestThread.__init__
def __init__(self, db, test, commits=10, delay=SHORT_DELAY):
self.__super_init(test)
self.setDaemon(1)
self.db = db
self.test = test
self.commits = commits
self.delay = delay
def runtest(self):
conn = self.db.open()
root = conn.root()
d = self.get_thread_dict(root)
if d is None:
self.test.fail()
else:
for i in range(self.commits):
self.commit(d, i)
self.test.assertEqual(sort(d.keys()), range(self.commits))
def commit(self, d, num):
d[num] = time.time()
time.sleep(self.delay)
get_transaction().commit()
time.sleep(self.delay)
def get_thread_dict(self, root):
name = self.getName()
# arbitrarily limit to 10 re-tries
for i in range(10):
try:
m = PersistentMapping()
root[name] = m
get_transaction().commit()
break
except ConflictError, err:
get_transaction().abort()
root._p_jar.sync()
for i in range(10):
try:
return root.get(name)
except ConflictError:
get_transaction().abort()
class StorageClientThread(TestThread):
__super_init = TestThread.__init__
def __init__(self, storage, test, commits=10, delay=SHORT_DELAY):
self.__super_init(test)
self.storage = storage
self.test = test
self.commits = commits
self.delay = delay
self.oids = {}
def runtest(self):
for i in range(self.commits):
self.dostore(i)
self.check()
def check(self):
for oid, revid in self.oids.items():
data, serial = self.storage.load(oid, '')
self.test.assertEqual(serial, revid)
obj = zodb_unpickle(data)
self.test.assertEqual(obj.value[0], self.getName())
def pause(self):
time.sleep(self.delay)
def oid(self):
oid = self.storage.new_oid()
self.oids[oid] = None
return oid
def dostore(self, i):
data = zodb_pickle(MinPO((self.getName(), i)))
t = Transaction()
oid = self.oid()
self.pause()
self.storage.tpc_begin(t)
self.pause()
# Always create a new object, signified by None for revid
r1 = self.storage.store(oid, None, data, '', t)
self.pause()
r2 = self.storage.tpc_vote(t)
self.pause()
self.storage.tpc_finish(t)
self.pause()
revid = handle_serials(oid, r1, r2)
self.oids[oid] = revid
class ExtStorageClientThread(StorageClientThread):
def runtest(self):
# pick some other storage ops to execute
ops = [getattr(self, meth) for meth in dir(ExtStorageClientThread)
if meth.startswith('do_')]
assert ops, "Didn't find an storage ops in %s" % self.storage
# do a store to guarantee there's at least one oid in self.oids
self.dostore(0)
for i in range(self.commits - 1):
meth = random.choice(ops)
meth()
self.dostore(i)
self.check()
def pick_oid(self):
return random.choice(self.oids.keys())
def do_load(self):
oid = self.pick_oid()
self.storage.load(oid, '')
def do_loadSerial(self):
oid = self.pick_oid()
self.storage.loadSerial(oid, self.oids[oid])
def do_modifiedInVersion(self):
oid = self.pick_oid()
self.storage.modifiedInVersion(oid)
def do_undoLog(self):
self.storage.undoLog(0, -20)
def do_iterator(self):
try:
iter = self.storage.iterator()
except AttributeError:
# XXX It's hard to detect that a ZEO ClientStorage
# doesn't have this method, but does have all the others.
return
for obj in iter:
pass
class MTStorage:
"Test a storage with multiple client threads executing concurrently."
def _checkNThreads(self, n, constructor, *args):
threads = [constructor(*args) for i in range(n)]
for t in threads:
t.start()
for t in threads:
t.join(60)
for t in threads:
self.failIf(t.isAlive(), "thread failed to finish in 60 seconds")
def check2ZODBThreads(self):
db = ZODB.DB(self._storage)
self._checkNThreads(2, ZODBClientThread, db, self)
db.close()
def check7ZODBThreads(self):
db = ZODB.DB(self._storage)
self._checkNThreads(7, ZODBClientThread, db, self)
db.close()
def check2StorageThreads(self):
self._checkNThreads(2, StorageClientThread, self._storage, self)
def check7StorageThreads(self):
self._checkNThreads(7, StorageClientThread, self._storage, self)
def check4ExtStorageThread(self):
self._checkNThreads(4, ExtStorageClientThread, self._storage, self)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/PackableAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run some tests relevant for storages that support pack()."""
try:
import cPickle
pickle = cPickle
#import cPickle as pickle
except ImportError:
import pickle
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import threading
import time
from ZODB import DB
from Persistence import Persistent
from ZODB.referencesf import referencesf
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import snooze
from ZODB.POSException import ConflictError, StorageError
from ZODB.PersistentMapping import PersistentMapping
ZERO = '\0'*8
# This class is for the root object. It must not contain a getoid() method
# (really, attribute). The persistent pickling machinery -- in the dumps()
# function below -- will pickle Root objects as normal, but any attributes
# which reference persistent Object instances will get pickled as persistent
# ids, not as the object's state. This makes the referencesf stuff work,
# because it pickle sniffs for persistent ids (so we have to get those
# persistent ids into the root object's pickle).
class Root:
pass
# This is the persistent Object class. Because it has a getoid() method, the
# persistent pickling machinery -- in the dumps() function below -- will
# pickle the oid string instead of the object's actual state. Yee haw, this
# stuff is deep. ;)
class Object:
def __init__(self, oid):
self._oid = oid
def getoid(self):
return self._oid
class C(Persistent):
pass
# Here's where all the magic occurs. Sadly, the pickle module is a bit
# underdocumented, but here's what happens: by setting the persistent_id
# attribute to getpersid() on the pickler, that function gets called for every
# object being pickled. By returning None when the object has no getoid
# attribute, it signals pickle to serialize the object as normal. That's how
# the Root instance gets pickled correctly. But, if the object has a getoid
# attribute, then by returning that method's value, we tell pickle to
# serialize the persistent id of the object instead of the object's state.
# That sets the pickle up for proper sniffing by the referencesf machinery.
# Fun, huh?
def dumps(obj):
def getpersid(obj):
if hasattr(obj, 'getoid'):
return obj.getoid()
return None
s = StringIO()
p = pickle.Pickler(s)
p.persistent_id = getpersid
p.dump(obj)
return s.getvalue()
class PackableStorageBase:
# We keep a cache of object ids to instances so that the unpickler can
# easily return any persistent object.
_cache = {}
def _newobj(self):
# This is a convenience method to create a new persistent Object
# instance. It asks the storage for a new object id, creates the
# instance with the given oid, populates the cache and returns the
# object.
oid = self._storage.new_oid()
obj = Object(oid)
self._cache[obj.getoid()] = obj
return obj
def _makeloader(self):
# This is the other side of the persistent pickling magic. We need a
# custom unpickler to mirror our custom pickler above. By setting the
# persistent_load function of the unpickler to self._cache.get(),
# whenever a persistent id is unpickled, it will actually return the
# Object instance out of the cache. As far as returning a function
# with an argument bound to an instance attribute method, we do it
# this way because it makes the code in the tests more succinct.
#
# BUT! Be careful in your use of loads() vs. pickle.loads(). loads()
# should only be used on the Root object's pickle since it's the only
# special one. All the Object instances should use pickle.loads().
def loads(str, persfunc=self._cache.get):
fp = StringIO(str)
u = pickle.Unpickler(fp)
u.persistent_load = persfunc
return u.load()
return loads
class PackableStorage(PackableStorageBase):
def _initroot(self):
try:
self._storage.load(ZERO, '')
except KeyError:
import PersistentMapping
from ZODB.Transaction import Transaction
file = StringIO()
p = cPickle.Pickler(file, 1)
p.dump((PersistentMapping.PersistentMapping, None))
p.dump({'_container': {}})
t=Transaction()
t.description='initial database creation'
self._storage.tpc_begin(t)
self._storage.store(ZERO, None, file.getvalue(), '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
def checkPackEmptyStorage(self):
self._storage.pack(time.time(), referencesf)
def checkPackTomorrow(self):
self._initroot()
self._storage.pack(time.time() + 10000, referencesf)
def checkPackYesterday(self):
self._initroot()
self._storage.pack(time.time() - 10000, referencesf)
def checkPackAllRevisions(self):
self._initroot()
eq = self.assertEqual
raises = self.assertRaises
# Create a `persistent' object
obj = self._newobj()
oid = obj.getoid()
obj.value = 1
# Commit three different revisions
revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
obj.value = 2
revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
obj.value = 3
revid3 = self._dostoreNP(oid, revid=revid2, data=pickle.dumps(obj))
# Now make sure all three revisions can be extracted
data = self._storage.loadSerial(oid, revid1)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 1)
data = self._storage.loadSerial(oid, revid2)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 2)
data = self._storage.loadSerial(oid, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 3)
# Now pack all transactions; need to sleep a second to make
# sure that the pack time is greater than the last commit time.
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
self._storage.pack(packtime, referencesf)
# All revisions of the object should be gone, since there is no
# reference from the root object to this object.
raises(KeyError, self._storage.loadSerial, oid, revid1)
raises(KeyError, self._storage.loadSerial, oid, revid2)
raises(KeyError, self._storage.loadSerial, oid, revid3)
def checkPackJustOldRevisions(self):
eq = self.assertEqual
raises = self.assertRaises
loads = self._makeloader()
# Create a root object. This can't be an instance of Object,
# otherwise the pickling machinery will serialize it as a persistent
# id and not as an object that contains references (persistent ids) to
# other objects.
root = Root()
# Create a persistent object, with some initial state
obj = self._newobj()
oid = obj.getoid()
# Link the root object to the persistent object, in order to keep the
# persistent object alive. Store the root object.
root.obj = obj
root.value = 0
revid0 = self._dostoreNP(ZERO, data=dumps(root))
# Make sure the root can be retrieved
data, revid = self._storage.load(ZERO, '')
eq(revid, revid0)
eq(loads(data).value, 0)
# Commit three different revisions of the other object
obj.value = 1
revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
obj.value = 2
revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
obj.value = 3
revid3 = self._dostoreNP(oid, revid=revid2, data=pickle.dumps(obj))
# Now make sure all three revisions can be extracted
data = self._storage.loadSerial(oid, revid1)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 1)
data = self._storage.loadSerial(oid, revid2)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 2)
data = self._storage.loadSerial(oid, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 3)
# Now pack just revisions 1 and 2. The object's current revision
# should stay alive because it's pointed to by the root.
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
self._storage.pack(packtime, referencesf)
# Make sure the revisions are gone, but that object zero and revision
# 3 are still there and correct
data, revid = self._storage.load(ZERO, '')
eq(revid, revid0)
eq(loads(data).value, 0)
raises(KeyError, self._storage.loadSerial, oid, revid1)
raises(KeyError, self._storage.loadSerial, oid, revid2)
data = self._storage.loadSerial(oid, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 3)
data, revid = self._storage.load(oid, '')
eq(revid, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 3)
def checkPackOnlyOneObject(self):
eq = self.assertEqual
raises = self.assertRaises
loads = self._makeloader()
# Create a root object. This can't be an instance of Object,
# otherwise the pickling machinery will serialize it as a persistent
# id and not as an object that contains references (persistent ids) to
# other objects.
root = Root()
# Create a persistent object, with some initial state
obj1 = self._newobj()
oid1 = obj1.getoid()
# Create another persistent object, with some initial state. Make
# sure it's oid is greater than the first object's oid.
obj2 = self._newobj()
oid2 = obj2.getoid()
self.failUnless(oid2 > oid1)
# Link the root object to the persistent objects, in order to keep
# them alive. Store the root object.
root.obj1 = obj1
root.obj2 = obj2
root.value = 0
revid0 = self._dostoreNP(ZERO, data=dumps(root))
# Make sure the root can be retrieved
data, revid = self._storage.load(ZERO, '')
eq(revid, revid0)
eq(loads(data).value, 0)
# Commit three different revisions of the first object
obj1.value = 1
revid1 = self._dostoreNP(oid1, data=pickle.dumps(obj1))
obj1.value = 2
revid2 = self._dostoreNP(oid1, revid=revid1, data=pickle.dumps(obj1))
obj1.value = 3
revid3 = self._dostoreNP(oid1, revid=revid2, data=pickle.dumps(obj1))
# Now make sure all three revisions can be extracted
data = self._storage.loadSerial(oid1, revid1)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid1)
eq(pobj.value, 1)
data = self._storage.loadSerial(oid1, revid2)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid1)
eq(pobj.value, 2)
data = self._storage.loadSerial(oid1, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid1)
eq(pobj.value, 3)
# Now commit a revision of the second object
obj2.value = 11
revid4 = self._dostoreNP(oid2, data=pickle.dumps(obj2))
# And make sure the revision can be extracted
data = self._storage.loadSerial(oid2, revid4)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid2)
eq(pobj.value, 11)
# Now pack just revisions 1 and 2 of object1. Object1's current
# revision should stay alive because it's pointed to by the root, as
# should Object2's current revision.
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
self._storage.pack(packtime, referencesf)
# Make sure the revisions are gone, but that object zero, object2, and
# revision 3 of object1 are still there and correct.
data, revid = self._storage.load(ZERO, '')
eq(revid, revid0)
eq(loads(data).value, 0)
raises(KeyError, self._storage.loadSerial, oid1, revid1)
raises(KeyError, self._storage.loadSerial, oid1, revid2)
data = self._storage.loadSerial(oid1, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid1)
eq(pobj.value, 3)
data, revid = self._storage.load(oid1, '')
eq(revid, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid1)
eq(pobj.value, 3)
data, revid = self._storage.load(oid2, '')
eq(revid, revid4)
eq(loads(data).value, 11)
data = self._storage.loadSerial(oid2, revid4)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid2)
eq(pobj.value, 11)
def checkPackUnlinkedFromRoot(self):
eq = self.assertEqual
db = DB(self._storage)
conn = db.open()
root = conn.root()
txn = get_transaction()
txn.note('root')
txn.commit()
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
obj = C()
obj.value = 7
root['obj'] = obj
txn = get_transaction()
txn.note('root -> o1')
txn.commit()
del root['obj']
txn = get_transaction()
txn.note('root -x-> o1')
txn.commit()
self._storage.pack(packtime, referencesf)
log = self._storage.undoLog()
tid = log[0]['id']
db.undo(tid)
txn = get_transaction()
txn.note('undo root -x-> o1')
txn.commit()
conn.sync()
eq(root['obj'].value, 7)
def _PackWhileWriting(self, pack_now=0):
# A storage should allow some reading and writing during
# a pack. This test attempts to exercise locking code
# in the storage to test that it is safe. It generates
# a lot of revisions, so that pack takes a long time.
db = DB(self._storage)
conn = db.open()
root = conn.root()
for i in range(10):
root[i] = MinPO(i)
get_transaction().commit()
snooze()
packt = time.time()
for j in range(10):
for i in range(10):
root[i].value = MinPO(i)
get_transaction().commit()
threads = [ClientThread(db) for i in range(4)]
for t in threads:
t.start()
if pack_now:
db.pack(time.time())
else:
db.pack(packt)
for t in threads:
t.join(30)
for t in threads:
t.join(1)
self.assert_(not t.isAlive())
# Iterate over the storage to make sure it's sane, but not every
# storage supports iterators.
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for data in txn:
pass
iter.close()
def checkPackWhileWriting(self):
self._PackWhileWriting(pack_now=0)
def checkPackNowWhileWriting(self):
self._PackWhileWriting(pack_now=1)
def checkRedundantPack(self):
# It is an error to perform a pack with a packtime earlier
# than a previous packtime. The storage can't do a full
# traversal as of the packtime, because the previous pack may
# have removed revisions necessary for a full traversal.
# It should be simple to test that a storage error is raised,
# but this test case goes to the trouble of constructing a
# scenario that would lose data if the earlier packtime was
# honored.
self._initroot()
db = DB(self._storage)
conn = db.open()
root = conn.root()
root["d"] = d = PersistentMapping()
get_transaction().commit()
snooze()
obj = d["obj"] = C()
obj.value = 1
get_transaction().commit()
snooze()
packt1 = time.time()
lost_oid = obj._p_oid
obj = d["anotherobj"] = C()
obj.value = 2
get_transaction().commit()
snooze()
packt2 = time.time()
db.pack(packt2)
# BDBStorage allows the second pack, but doesn't lose data.
try:
db.pack(packt1)
except StorageError:
pass
# This object would be removed by the second pack, even though
# it is reachable.
self._storage.load(lost_oid, "")
def checkPackUndoLog(self):
self._initroot()
eq = self.assertEqual
raises = self.assertRaises
# Create a `persistent' object
obj = self._newobj()
oid = obj.getoid()
obj.value = 1
# Commit two different revisions
revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
obj.value = 2
snooze()
packtime = time.time()
snooze()
revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
# Now pack the first transaction
self.assertEqual(3,len(self._storage.undoLog()))
self._storage.pack(packtime, referencesf)
# The undo log contains only the most resent transaction
self.assertEqual(1,len(self._storage.undoLog()))
def dont_checkPackUndoLogUndoable(self):
# A disabled test. I wanted to test that the content of the
# undo log was consistent, but every storage appears to
# include something slightly different. If the result of this
# method is only used to fill a GUI then this difference
# doesnt matter. Perhaps re-enable this test once we agree
# what should be asserted.
self._initroot()
# Create two `persistent' object
obj1 = self._newobj()
oid1 = obj1.getoid()
obj1.value = 1
obj2 = self._newobj()
oid2 = obj2.getoid()
obj2.value = 2
# Commit the first revision of each of them
revid11 = self._dostoreNP(oid1, data=pickle.dumps(obj1),
description="1-1")
revid22 = self._dostoreNP(oid2, data=pickle.dumps(obj2),
description="2-2")
# remember the time. everything above here will be packed away
snooze()
packtime = time.time()
snooze()
# Commit two revisions of the first object
obj1.value = 3
revid13 = self._dostoreNP(oid1, revid=revid11,
data=pickle.dumps(obj1), description="1-3")
obj1.value = 4
revid14 = self._dostoreNP(oid1, revid=revid13,
data=pickle.dumps(obj1), description="1-4")
# Commit one revision of the second object
obj2.value = 5
revid25 = self._dostoreNP(oid2, revid=revid22,
data=pickle.dumps(obj2), description="2-5")
# Now pack
self.assertEqual(6,len(self._storage.undoLog()))
print '\ninitial undoLog was'
for r in self._storage.undoLog(): print r
self._storage.pack(packtime, referencesf)
# The undo log contains only two undoable transaction.
print '\nafter packing undoLog was'
for r in self._storage.undoLog(): print r
# what can we assert about that?
class ClientThread(threading.Thread):
def __init__(self, db):
threading.Thread.__init__(self)
self.root = db.open().root()
def run(self):
for j in range(50):
try:
self.root[j % 10].value = MinPO(j)
get_transaction().commit()
except ConflictError:
get_transaction().abort()
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/PersistentAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test that a storage's values persist across open and close."""
class PersistentStorage:
def checkUpdatesPersist(self):
oids = []
def new_oid_wrapper(l=oids, new_oid=self._storage.new_oid):
oid = new_oid()
l.append(oid)
return oid
self._storage.new_oid = new_oid_wrapper
self._dostore()
oid = self._storage.new_oid()
revid = self._dostore(oid)
if self._storage.supportsVersions():
self._dostore(oid, revid, data=8, version='b')
oid = self._storage.new_oid()
revid = self._dostore(oid, data=1)
revid = self._dostore(oid, revid, data=2)
self._dostore(oid, revid, data=3)
# keep copies of all the objects
objects = []
for oid in oids:
p, s = self._storage.load(oid, '')
objects.append((oid, '', p, s))
ver = self._storage.modifiedInVersion(oid)
if ver:
p, s = self._storage.load(oid, ver)
objects.append((oid, ver, p, s))
self._storage.close()
self.open()
# keep copies of all the objects
for oid, ver, p, s in objects:
_p, _s = self._storage.load(oid, ver)
self.assertEquals(p, _p)
self.assertEquals(s, _s)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/ReadOnlyAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from ZODB.POSException import ReadOnlyError
from ZODB.Transaction import Transaction
class ReadOnlyStorage:
def _create_data(self):
# test a read-only storage that already has some data
self.oids = {}
for i in range(10):
oid = self._storage.new_oid()
revid = self._dostore(oid)
self.oids[oid] = revid
def _make_readonly(self):
self._storage.close()
self.open(read_only=1)
self.assert_(self._storage.isReadOnly())
def checkReadMethods(self):
self._create_data()
self._make_readonly()
# XXX not going to bother checking all read methods
for oid in self.oids.keys():
data, revid = self._storage.load(oid, '')
self.assertEqual(revid, self.oids[oid])
self.assert_(not self._storage.modifiedInVersion(oid))
_data = self._storage.loadSerial(oid, revid)
self.assertEqual(data, _data)
def checkWriteMethods(self):
self._make_readonly()
self.assertRaises(ReadOnlyError, self._storage.new_oid)
t = Transaction()
self.assertRaises(ReadOnlyError, self._storage.tpc_begin, t)
if self._storage.supportsVersions():
self.assertRaises(ReadOnlyError, self._storage.abortVersion,
'', t)
self.assertRaises(ReadOnlyError, self._storage.commitVersion,
'', '', t)
self.assertRaises(ReadOnlyError, self._storage.store,
'\000' * 8, None, '', '', t)
if self._storage.supportsTransactionalUndo():
self.assertRaises(ReadOnlyError, self._storage.transactionalUndo,
'\000' * 8, t)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/RecoveryAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""More recovery and iterator tests."""
from ZODB.Transaction import Transaction
from ZODB.tests.IteratorStorage import IteratorDeepCompare
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
from ZODB import DB
from ZODB.referencesf import referencesf
import time
class RecoveryStorage(IteratorDeepCompare):
# Requires a setUp() that creates a self._dst destination storage
def checkSimpleRecovery(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=11)
revid = self._dostore(oid, revid=revid, data=12)
revid = self._dostore(oid, revid=revid, data=13)
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
def checkRecoveryAcrossVersions(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=21)
revid = self._dostore(oid, revid=revid, data=22)
revid = self._dostore(oid, revid=revid, data=23, version='one')
revid = self._dostore(oid, revid=revid, data=34, version='one')
# Now commit the version
t = Transaction()
self._storage.tpc_begin(t)
self._storage.commitVersion('one', '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
def checkRecoverAbortVersion(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=21, version="one")
revid = self._dostore(oid, revid=revid, data=23, version='one')
revid = self._dostore(oid, revid=revid, data=34, version='one')
# Now abort the version and the creation
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.abortVersion('one', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(oids, [oid])
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
# Also make sure the the last transaction has a data record
# with None for its data attribute, because we've undone the
# object.
for s in self._storage, self._dst:
iter = s.iterator()
for trans in iter:
pass # iterate until we get the last one
data = trans[0]
self.assertRaises(IndexError, lambda i, t=trans: t[i], 1)
self.assertEqual(data.oid, oid)
self.assertEqual(data.data, None)
def checkRecoverUndoInVersion(self):
oid = self._storage.new_oid()
version = "aVersion"
revid_a = self._dostore(oid, data=MinPO(91))
revid_b = self._dostore(oid, revid=revid_a, version=version,
data=MinPO(92))
revid_c = self._dostore(oid, revid=revid_b, version=version,
data=MinPO(93))
self._undo(self._storage.undoInfo()[0]['id'], oid)
self._commitVersion(version, '')
self._undo(self._storage.undoInfo()[0]['id'], oid)
# now copy the records to a new storage
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
# The last two transactions were applied directly rather than
# copied. So we can't use compare() to verify that they new
# transactions are applied correctly. (The new transactions
# will have different timestamps for each storage.)
self._abortVersion(version)
self.assert_(self._storage.versionEmpty(version))
self._undo(self._storage.undoInfo()[0]['id'], oid)
self.assert_(not self._storage.versionEmpty(version))
# check the data is what we expect it to be
data, revid = self._storage.load(oid, version)
self.assertEqual(zodb_unpickle(data), MinPO(92))
data, revid = self._storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(91))
# and swap the storages
tmp = self._storage
self._storage = self._dst
self._abortVersion(version)
self.assert_(self._storage.versionEmpty(version))
self._undo(self._storage.undoInfo()[0]['id'], oid)
self.assert_(not self._storage.versionEmpty(version))
# check the data is what we expect it to be
data, revid = self._storage.load(oid, version)
self.assertEqual(zodb_unpickle(data), MinPO(92))
data, revid = self._storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(91))
# swap them back
self._storage = tmp
# Now remove _dst and copy all the transactions a second time.
# This time we will be able to confirm via compare().
self._dst.close()
self._dst.cleanup()
self._dst = self.new_dest()
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
def checkRestoreAcrossPack(self):
db = DB(self._storage)
c = db.open()
r = c.root()
obj = r["obj1"] = MinPO(1)
get_transaction().commit()
obj = r["obj2"] = MinPO(1)
get_transaction().commit()
self._dst.copyTransactionsFrom(self._storage)
self._dst.pack(time.time(), referencesf)
self._undo(self._storage.undoInfo()[0]['id'])
# copy the final transaction manually. even though there
# was a pack, the restore() ought to succeed.
it = self._storage.iterator()
final = list(it)[-1]
self._dst.tpc_begin(final, final.tid, final.status)
for r in final:
self._dst.restore(r.oid, r.serial, r.data, r.version, r.data_txn,
final)
it.close()
self._dst.tpc_vote(final)
self._dst.tpc_finish(final)
def checkPackWithGCOnDestinationAfterRestore(self):
raises = self.assertRaises
db = DB(self._storage)
conn = db.open()
root = conn.root()
root.obj = obj1 = MinPO(1)
txn = get_transaction()
txn.note('root -> obj')
txn.commit()
root.obj.obj = obj2 = MinPO(2)
txn = get_transaction()
txn.note('root -> obj -> obj')
txn.commit()
del root.obj
txn = get_transaction()
txn.note('root -X->')
txn.commit()
# Now copy the transactions to the destination
self._dst.copyTransactionsFrom(self._storage)
# Now pack the destination.
snooze()
self._dst.pack(time.time(), referencesf)
# And check to see that the root object exists, but not the other
# objects.
data, serial = self._dst.load(root._p_oid, '')
raises(KeyError, self._dst.load, obj1._p_oid, '')
raises(KeyError, self._dst.load, obj2._p_oid, '')
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/RevisionAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Check loadSerial() on storages that support historical revisions."""
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
ZERO = '\0'*8
class RevisionStorage:
def checkLoadSerial(self):
oid = self._storage.new_oid()
revid = ZERO
revisions = {}
for i in range(31, 38):
revid = self._dostore(oid, revid=revid, data=MinPO(i))
revisions[revid] = MinPO(i)
# Now make sure all the revisions have the correct value
for revid, value in revisions.items():
data = self._storage.loadSerial(oid, revid)
self.assertEqual(zodb_unpickle(data), value)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/TransactionalUndoAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Check transactionalUndo().
Any storage that supports transactionalUndo() must pass these tests.
"""
from __future__ import nested_scopes
import time
import types
from ZODB import POSException
from ZODB.Transaction import Transaction
from ZODB.referencesf import referencesf
from ZODB.utils import u64, p64
from ZODB import DB
from Persistence import Persistent
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
ZERO = '\0'*8
class C(Persistent):
pass
def snooze():
# In Windows, it's possible that two successive time.time() calls return
# the same value. Tim guarantees that time never runs backwards. You
# usually want to call this before you pack a storage, or must make other
# guarantees about increasing timestamps.
now = time.time()
while now == time.time():
time.sleep(0.1)
def listeq(L1, L2):
"""Return True if L1.sort() == L2.sort()"""
c1 = L1[:]
c2 = L2[:]
c1.sort()
c2.sort()
return c1 == c2
class TransactionalUndoStorage:
def _transaction_begin(self):
self.__serials = {}
def _transaction_store(self, oid, rev, data, vers, trans):
r = self._storage.store(oid, rev, data, vers, trans)
if r:
if type(r) == types.StringType:
self.__serials[oid] = r
else:
for oid, serial in r:
self.__serials[oid] = serial
def _transaction_vote(self, trans):
r = self._storage.tpc_vote(trans)
if r:
for oid, serial in r:
self.__serials[oid] = serial
def _transaction_newserial(self, oid):
return self.__serials[oid]
def _multi_obj_transaction(self, objs):
newrevs = {}
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
for oid, rev, data in objs:
self._transaction_store(oid, rev, data, '', t)
newrevs[oid] = None
self._transaction_vote(t)
self._storage.tpc_finish(t)
for oid in newrevs.keys():
newrevs[oid] = self._transaction_newserial(oid)
return newrevs
def _iterate(self):
"""Iterate over the storage in its final state."""
# This is testing that the iterator() code works correctly.
# The hasattr() guards against ZEO, which doesn't support iterator.
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for rec in txn:
pass
def checkSimpleTransactionalUndo(self):
eq = self.assertEqual
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(23))
revid = self._dostore(oid, revid=revid, data=MinPO(24))
revid = self._dostore(oid, revid=revid, data=MinPO(25))
info = self._storage.undoInfo()
tid = info[0]['id']
# Now start an undo transaction
t = Transaction()
t.note('undo1')
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(24))
# Do another one
info = self._storage.undoInfo()
tid = info[2]['id']
t = Transaction()
t.note('undo2')
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(23))
# Try to undo the first record
info = self._storage.undoInfo()
tid = info[4]['id']
t = Transaction()
t.note('undo3')
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
# This should fail since we've undone the object's creation
self.assertRaises(KeyError,
self._storage.load, oid, '')
# And now let's try to redo the object's creation
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(23))
self._iterate()
def checkCreationUndoneGetSerial(self):
# create an object
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(23))
# undo its creation
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
t.note('undo1')
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# Check that calling getSerial on an uncreated object raises a KeyError
# The current version of FileStorage fails this test
self.assertRaises(KeyError, self._storage.getSerial, oid)
def checkUndoCreationBranch1(self):
eq = self.assertEqual
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12))
# Undo the last transaction
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(11))
# Now from here, we can either redo the last undo, or undo the object
# creation. Let's undo the object creation.
info = self._storage.undoInfo()
tid = info[2]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
self.assertRaises(KeyError, self._storage.load, oid, '')
self._iterate()
def checkUndoCreationBranch2(self):
eq = self.assertEqual
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12))
# Undo the last transaction
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(11))
# Now from here, we can either redo the last undo, or undo the object
# creation. Let's redo the last undo
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(12))
self._iterate()
def checkTwoObjectUndo(self):
eq = self.assertEqual
# Convenience
p31, p32, p51, p52 = map(zodb_pickle,
map(MinPO, (31, 32, 51, 52)))
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
revid1 = revid2 = ZERO
# Store two objects in the same transaction
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p31, '', t)
self._transaction_store(oid2, revid2, p51, '', t)
# Finish the transaction
self._transaction_vote(t)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(t)
eq(revid1, revid2)
# Update those same two objects
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p32, '', t)
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(t)
eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(32))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(52))
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 2)
self.failUnless(oid1 in oids)
self.failUnless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(31))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(51))
self._iterate()
def checkTwoObjectUndoAtOnce(self):
# Convenience
eq = self.assertEqual
unless = self.failUnless
p30, p31, p32, p50, p51, p52 = map(zodb_pickle,
map(MinPO,
(30, 31, 32, 50, 51, 52)))
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
revid1 = revid2 = ZERO
# Store two objects in the same transaction
d = self._multi_obj_transaction([(oid1, revid1, p30),
(oid2, revid2, p50),
])
eq(d[oid1], d[oid2])
# Update those same two objects
d = self._multi_obj_transaction([(oid1, d[oid1], p31),
(oid2, d[oid2], p51),
])
eq(d[oid1], d[oid2])
# Update those same two objects
d = self._multi_obj_transaction([(oid1, d[oid1], p32),
(oid2, d[oid2], p52),
])
eq(d[oid1], d[oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(32))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(52))
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[0]['id']
tid1 = info[1]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
oids1 = self._storage.transactionalUndo(tid1, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# We get the finalization stuff called an extra time:
## self._storage.tpc_vote(t)
## self._storage.tpc_finish(t)
eq(len(oids), 2)
eq(len(oids1), 2)
unless(oid1 in oids)
unless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(30))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(50))
# Now try to undo the one we just did to undo, whew
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 2)
unless(oid1 in oids)
unless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(32))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(52))
self._iterate()
def checkTwoObjectUndoAgain(self):
eq = self.assertEqual
p31, p32, p33, p51, p52, p53 = map(
zodb_pickle,
map(MinPO, (31, 32, 33, 51, 52, 53)))
# Like the above, but the first revision of the objects are stored in
# different transactions.
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
revid1 = self._dostore(oid1, data=p31, already_pickled=1)
revid2 = self._dostore(oid2, data=p51, already_pickled=1)
# Update those same two objects
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p32, '', t)
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
self._storage.tpc_finish(t)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 2)
self.failUnless(oid1 in oids)
self.failUnless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(31))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(51))
# Like the above, but this time, the second transaction contains only
# one object.
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p33, '', t)
self._transaction_store(oid2, revid2, p53, '', t)
# Finish the transaction
self._transaction_vote(t)
self._storage.tpc_finish(t)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
# Update in different transactions
revid1 = self._dostore(oid1, revid=revid1, data=MinPO(34))
revid2 = self._dostore(oid2, revid=revid2, data=MinPO(54))
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
self.failUnless(oid1 in oids)
self.failUnless(not oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(33))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(54))
self._iterate()
def checkNotUndoable(self):
eq = self.assertEqual
# Set things up so we've got a transaction that can't be undone
oid = self._storage.new_oid()
revid_a = self._dostore(oid, data=MinPO(51))
revid_b = self._dostore(oid, revid=revid_a, data=MinPO(52))
revid_c = self._dostore(oid, revid=revid_b, data=MinPO(53))
# Start the undo
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
self._storage.tpc_begin(t)
self.assertRaises(POSException.UndoError,
self._storage.transactionalUndo,
tid, t)
self._storage.tpc_abort(t)
# Now have more fun: object1 and object2 are in the same transaction,
# which we'll try to undo to, but one of them has since modified in
# different transaction, so the undo should fail.
oid1 = oid
revid1 = revid_c
oid2 = self._storage.new_oid()
revid2 = ZERO
p81, p82, p91, p92 = map(zodb_pickle,
map(MinPO, (81, 82, 91, 92)))
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p81, '', t)
self._transaction_store(oid2, revid2, p91, '', t)
self._transaction_vote(t)
self._storage.tpc_finish(t)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
# Make sure the objects have the expected values
data, revid_11 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(81))
data, revid_22 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(91))
eq(revid_11, revid1)
eq(revid_22, revid2)
# Now modify oid2
revid2 = self._dostore(oid2, revid=revid2, data=MinPO(92))
self.assertNotEqual(revid1, revid2)
self.assertNotEqual(revid2, revid_22)
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
self._storage.tpc_begin(t)
self.assertRaises(POSException.UndoError,
self._storage.transactionalUndo,
tid, t)
self._storage.tpc_abort(t)
self._iterate()
def checkTransactionalUndoAfterPack(self):
eq = self.assertEqual
# Add a few object revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(51))
packtime = time.time()
snooze() # time.time() now distinct from packtime
revid2 = self._dostore(oid, revid=revid1, data=MinPO(52))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(53))
# Now get the undo log
info = self._storage.undoInfo()
eq(len(info), 3)
tid = info[0]['id']
# Now pack just the initial revision of the object. We need the
# second revision otherwise we won't be able to undo the third
# revision!
self._storage.pack(packtime, referencesf)
# Make some basic assertions about the undo information now
info2 = self._storage.undoInfo()
eq(len(info2), 2)
# And now attempt to undo the last transaction
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
# The object must now be at the second state
eq(zodb_unpickle(data), MinPO(52))
self._iterate()
def checkTransactionalUndoAfterPackWithObjectUnlinkFromRoot(self):
eq = self.assertEqual
db = DB(self._storage)
conn = db.open()
root = conn.root()
o1 = C()
o2 = C()
root['obj'] = o1
o1.obj = o2
txn = get_transaction()
txn.note('o1 -> o2')
txn.commit()
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
o3 = C()
o2.obj = o3
txn = get_transaction()
txn.note('o1 -> o2 -> o3')
txn.commit()
o1.obj = o3
txn = get_transaction()
txn.note('o1 -> o3')
txn.commit()
log = self._storage.undoLog()
eq(len(log), 4)
for entry in zip(log, ('o1 -> o3', 'o1 -> o2 -> o3',
'o1 -> o2', 'initial database creation')):
eq(entry[0]['description'], entry[1])
self._storage.pack(packtime, referencesf)
log = self._storage.undoLog()
for entry in zip(log, ('o1 -> o3', 'o1 -> o2 -> o3')):
eq(entry[0]['description'], entry[1])
tid = log[0]['id']
db.undo(tid)
txn = get_transaction()
txn.note('undo')
txn.commit()
# undo does a txn-undo, but doesn't invalidate
conn.sync()
log = self._storage.undoLog()
for entry in zip(log, ('undo', 'o1 -> o3', 'o1 -> o2 -> o3')):
eq(entry[0]['description'], entry[1])
eq(o1.obj, o2)
eq(o1.obj.obj, o3)
self._iterate()
def checkPackAfterUndoDeletion(self):
db = DB(self._storage)
cn = db.open()
root = cn.root()
pack_times = []
def set_pack_time():
pack_times.append(time.time())
snooze()
root["key0"] = MinPO(0)
root["key1"] = MinPO(1)
root["key2"] = MinPO(2)
txn = get_transaction()
txn.note("create 3 keys")
txn.commit()
set_pack_time()
del root["key1"]
txn = get_transaction()
txn.note("delete 1 key")
txn.commit()
set_pack_time()
root._p_deactivate()
cn.sync()
self.assert_(listeq(root.keys(), ["key0", "key2"]))
L = db.undoInfo()
db.undo(L[0]["id"])
txn = get_transaction()
txn.note("undo deletion")
txn.commit()
set_pack_time()
root._p_deactivate()
cn.sync()
self.assert_(listeq(root.keys(), ["key0", "key1", "key2"]))
for t in pack_times:
self._storage.pack(t, referencesf)
root._p_deactivate()
cn.sync()
self.assert_(listeq(root.keys(), ["key0", "key1", "key2"]))
for i in range(3):
obj = root["key%d" % i]
self.assertEqual(obj.value, i)
root.items()
self._inter_pack_pause()
def checkPackAfterUndoManyTimes(self):
db = DB(self._storage)
cn = db.open()
rt = cn.root()
rt["test"] = MinPO(1)
get_transaction().commit()
rt["test2"] = MinPO(2)
get_transaction().commit()
rt["test"] = MinPO(3)
txn = get_transaction()
txn.note("root of undo")
txn.commit()
packtimes = []
for i in range(10):
L = db.undoInfo()
db.undo(L[0]["id"])
txn = get_transaction()
txn.note("undo %d" % i)
txn.commit()
rt._p_deactivate()
cn.sync()
self.assertEqual(rt["test"].value, i % 2 and 3 or 1)
self.assertEqual(rt["test2"].value, 2)
packtimes.append(time.time())
snooze()
for t in packtimes:
self._storage.pack(t, referencesf)
cn.sync()
cn._cache.clear()
# The last undo set the value to 3 and pack should
# never change that.
self.assertEqual(rt["test"].value, 3)
self.assertEqual(rt["test2"].value, 2)
self._inter_pack_pause()
def _inter_pack_pause(self):
# DirectoryStorage needs a pause between packs,
# most other storages dont.
pass
def checkTransactionalUndoIterator(self):
# check that data_txn set in iterator makes sense
if not hasattr(self._storage, "iterator"):
return
s = self._storage
BATCHES = 4
OBJECTS = 4
orig = []
for i in range(BATCHES):
t = Transaction()
tid = p64(i + 1)
s.tpc_begin(t, tid)
for j in range(OBJECTS):
oid = s.new_oid()
obj = MinPO(i * OBJECTS + j)
revid = s.store(oid, None, zodb_pickle(obj), '', t)
orig.append((tid, oid, revid))
s.tpc_vote(t)
s.tpc_finish(t)
i = 0
for tid, oid, revid in orig:
self._dostore(oid, revid=revid, data=MinPO(revid),
description="update %s" % i)
# Undo the OBJECTS transactions that modified objects created
# in the ith original transaction.
def undo(i):
info = s.undoInfo()
t = Transaction()
s.tpc_begin(t)
base = i * OBJECTS + i
for j in range(OBJECTS):
tid = info[base + j]['id']
s.transactionalUndo(tid, t)
s.tpc_vote(t)
s.tpc_finish(t)
for i in range(BATCHES):
undo(i)
# There are now (2 + OBJECTS) * BATCHES transactions:
# BATCHES original transactions, followed by
# OBJECTS * BATCHES modifications, followed by
# BATCHES undos
iter = s.iterator()
offset = 0
eq = self.assertEqual
for i in range(BATCHES):
txn = iter[offset]
offset += 1
tid = p64(i + 1)
eq(txn.tid, tid)
L1 = [(rec.oid, rec.serial, rec.data_txn) for rec in txn]
L2 = [(oid, revid, None) for _tid, oid, revid in orig
if _tid == tid]
eq(L1, L2)
for i in range(BATCHES * OBJECTS):
txn = iter[offset]
offset += 1
eq(len([rec for rec in txn if rec.data_txn is None]), 1)
for i in range(BATCHES):
txn = iter[offset]
offset += 1
# The undos are performed in reverse order.
otid = p64(BATCHES - i)
L1 = [(rec.oid, rec.data_txn) for rec in txn]
L2 = [(oid, otid) for _tid, oid, revid in orig
if _tid == otid]
L1.sort()
L2.sort()
eq(L1, L2)
self.assertRaises(IndexError, iter.__getitem__, offset)
def checkUndoLogMetadata(self):
# test that the metadata is correct in the undo log
t = get_transaction()
t.note('t1')
t.setExtendedInfo('k2','this is transaction metadata')
t.setUser('u3',path='p3')
db = DB(self._storage)
conn = db.open()
root = conn.root()
o1 = C()
root['obj'] = o1
txn = get_transaction()
txn.commit()
l = self._storage.undoLog()
self.assertEqual(len(l),2)
d = l[0]
self.assertEqual(d['description'],'t1')
self.assertEqual(d['k2'],'this is transaction metadata')
self.assertEqual(d['user_name'],'p3 u3')
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/TransactionalUndoVersionAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import nested_scopes
# Check interactions between transactionalUndo() and versions. Any storage
# that supports both transactionalUndo() and versions must pass these tests.
import time
from ZODB import POSException
from ZODB.referencesf import referencesf
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
class TransactionalUndoVersionStorage:
def _x_dostore(self, *args, **kwargs):
# ugh: backwards compatibilty for ZEO 1.0 which runs these
# tests but has a _dostore() method that does not support the
# description kwarg.
try:
return self._dostore(*args, **kwargs)
except TypeError:
# assume that the type error means we've got a _dostore()
# without the description kwarg
try:
del kwargs['description']
except KeyError:
pass # not expected
return self._dostore(*args, **kwargs)
def _undo(self, tid, oid):
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(len(oids), 1)
self.assertEqual(oids[0], oid)
def checkUndoInVersion(self):
eq = self.assertEqual
unless = self.failUnless
def check_objects(nonversiondata, versiondata):
data, revid = self._storage.load(oid, version)
self.assertEqual(zodb_unpickle(data), MinPO(versiondata))
data, revid = self._storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(nonversiondata))
oid = self._storage.new_oid()
version = 'one'
revid_a = self._dostore(oid, data=MinPO(91))
revid_b = self._dostore(oid, revid=revid_a, data=MinPO(92),
version=version)
revid_c = self._dostore(oid, revid=revid_b, data=MinPO(93),
version=version)
info = self._storage.undoInfo()
self._undo(info[0]['id'], oid)
data, revid = self._storage.load(oid, '')
eq(revid, revid_a)
eq(zodb_unpickle(data), MinPO(91))
data, revid = self._storage.load(oid, version)
unless(revid > revid_b and revid > revid_c)
eq(zodb_unpickle(data), MinPO(92))
# Now commit the version...
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.commitVersion(version, '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
check_objects(92, 92)
# ...and undo the commit
info = self._storage.undoInfo()
self._undo(info[0]['id'], oid)
check_objects(91, 92)
oids = self._abortVersion(version)
assert len(oids) == 1
assert oids[0] == oid
check_objects(91, 91)
# Now undo the abort
info=self._storage.undoInfo()
self._undo(info[0]['id'], oid)
check_objects(91, 92)
def checkUndoCommitVersion(self):
def load_value(oid, version=''):
data, revid = self._storage.load(oid, version)
return zodb_unpickle(data).value
# create a bunch of packable transactions
oid = self._storage.new_oid()
revid = '\000' * 8
for i in range(4):
revid = self._x_dostore(oid, revid, description='packable%d' % i)
pt = time.time()
time.sleep(1)
oid1 = self._storage.new_oid()
version = 'version'
revid1 = self._x_dostore(oid1, data=MinPO(0), description='create1')
revid2 = self._x_dostore(oid1, data=MinPO(1), revid=revid1,
version=version, description='version1')
revid3 = self._x_dostore(oid1, data=MinPO(2), revid=revid2,
version=version, description='version2')
self._x_dostore(description='create2')
t = Transaction()
t.description = 'commit version'
self._storage.tpc_begin(t)
self._storage.commitVersion(version, '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
info = self._storage.undoInfo()
t_id = info[0]['id']
self.assertEqual(load_value(oid1), 2)
self.assertEqual(load_value(oid1, version), 2)
self._storage.pack(pt, referencesf)
t = Transaction()
t.description = 'undo commit version'
self._storage.tpc_begin(t)
self._storage.transactionalUndo(t_id, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(load_value(oid1), 0)
self.assertEqual(load_value(oid1, version), 2)
def checkUndoAbortVersion(self):
def load_value(oid, version=''):
data, revid = self._storage.load(oid, version)
return zodb_unpickle(data).value
# create a bunch of packable transactions
oid = self._storage.new_oid()
revid = '\000' * 8
for i in range(3):
revid = self._x_dostore(oid, revid, description='packable%d' % i)
pt = time.time()
time.sleep(1)
oid1 = self._storage.new_oid()
version = 'version'
revid1 = self._x_dostore(oid1, data=MinPO(0), description='create1')
revid2 = self._x_dostore(oid1, data=MinPO(1), revid=revid1,
version=version, description='version1')
revid3 = self._x_dostore(oid1, data=MinPO(2), revid=revid2,
version=version, description='version2')
self._x_dostore(description='create2')
t = Transaction()
t.description = 'abort version'
self._storage.tpc_begin(t)
self._storage.abortVersion(version, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
info = self._storage.undoInfo()
t_id = info[0]['id']
self.assertEqual(load_value(oid1), 0)
# after abort, we should see non-version data
self.assertEqual(load_value(oid1, version), 0)
t = Transaction()
t.description = 'undo abort version'
self._storage.tpc_begin(t)
self._storage.transactionalUndo(t_id, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(load_value(oid1), 0)
# t undo will re-create the version
self.assertEqual(load_value(oid1, version), 2)
info = self._storage.undoInfo()
t_id = info[0]['id']
self._storage.pack(pt, referencesf)
t = Transaction()
t.description = 'undo undo'
self._storage.tpc_begin(t)
self._storage.transactionalUndo(t_id, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# undo of undo will put as back where we started
self.assertEqual(load_value(oid1), 0)
# after abort, we should see non-version data
self.assertEqual(load_value(oid1, version), 0)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/VersionAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run the version related tests for a storage.
Any storage that supports versions should be able to pass all these tests.
"""
# XXX we should clean this code up to get rid of the #JF# comments.
# They were introduced when Jim reviewed the original version of the
# code. Barry and Jeremy didn't understand versions then.
import time
from ZODB import POSException
from ZODB.referencesf import referencesf
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle, snooze
from ZODB import DB
class VersionStorage:
def checkCommitVersionSerialno(self):
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(12))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(13),
version="version")
oids = self._commitVersion("version", "")
self.assertEqual([oid], oids)
data, revid3 = self._storage.load(oid, "")
# use repr() to avoid getting binary data in a traceback on error
self.assertNotEqual(`revid1`, `revid3`)
self.assertNotEqual(`revid2`, `revid3`)
def checkAbortVersionSerialno(self):
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(12))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(13),
version="version")
oids = self._abortVersion("version")
self.assertEqual([oid], oids)
data, revid3 = self._storage.load(oid, "")
# use repr() to avoid getting binary data in a traceback on error
self.assertEqual(`revid1`, `revid3`)
self.assertNotEqual(`revid2`, `revid3`)
def checkVersionedStoreAndLoad(self):
eq = self.assertEqual
# Store a couple of non-version revisions of the object
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12))
# And now store some new revisions in a version
version = 'test-version'
revid = self._dostore(oid, revid=revid, data=MinPO(13),
version=version)
revid = self._dostore(oid, revid=revid, data=MinPO(14),
version=version)
revid = self._dostore(oid, revid=revid, data=MinPO(15),
version=version)
# Now read back the object in both the non-version and version and
# make sure the values jive.
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(12))
data, vrevid = self._storage.load(oid, version)
eq(zodb_unpickle(data), MinPO(15))
if hasattr(self._storage, 'getSerial'):
s = self._storage.getSerial(oid)
eq(s, max(revid, vrevid))
def checkVersionedLoadErrors(self):
oid = self._storage.new_oid()
version = 'test-version'
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12),
version=version)
# Try to load a bogus oid
self.assertRaises(KeyError,
self._storage.load,
self._storage.new_oid(), '')
# Try to load a bogus version string
#JF# Nope, fall back to non-version
#JF# self.assertRaises(KeyError,
#JF# self._storage.load,
#JF# oid, 'bogus')
data, revid = self._storage.load(oid, 'bogus')
self.assertEqual(zodb_unpickle(data), MinPO(11))
def checkVersionLock(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
version = 'test-version'
revid = self._dostore(oid, revid=revid, data=MinPO(12),
version=version)
self.assertRaises(POSException.VersionLockError,
self._dostore,
oid, revid=revid, data=MinPO(14),
version='another-version')
def checkVersionEmpty(self):
# Before we store anything, these versions ought to be empty
version = 'test-version'
#JF# The empty string is not a valid version. I think that this should
#JF# be an error. Let's punt for now.
#JF# assert self._storage.versionEmpty('')
self.failUnless(self._storage.versionEmpty(version))
# Now store some objects
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12))
revid = self._dostore(oid, revid=revid, data=MinPO(13),
version=version)
revid = self._dostore(oid, revid=revid, data=MinPO(14),
version=version)
# The blank version should not be empty
#JF# The empty string is not a valid version. I think that this should
#JF# be an error. Let's punt for now.
#JF# assert not self._storage.versionEmpty('')
# Neither should 'test-version'
self.failUnless(not self._storage.versionEmpty(version))
# But this non-existant version should be empty
self.failUnless(self._storage.versionEmpty('bogus'))
def checkVersions(self):
unless = self.failUnless
# Store some objects in the non-version
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
oid3 = self._storage.new_oid()
revid1 = self._dostore(oid1, data=MinPO(11))
revid2 = self._dostore(oid2, data=MinPO(12))
revid3 = self._dostore(oid3, data=MinPO(13))
# Now create some new versions
revid1 = self._dostore(oid1, revid=revid1, data=MinPO(14),
version='one')
revid2 = self._dostore(oid2, revid=revid2, data=MinPO(15),
version='two')
revid3 = self._dostore(oid3, revid=revid3, data=MinPO(16),
version='three')
# Ask for the versions
versions = self._storage.versions()
unless('one' in versions)
unless('two' in versions)
unless('three' in versions)
# Now flex the `max' argument
versions = self._storage.versions(1)
self.assertEqual(len(versions), 1)
unless('one' in versions or 'two' in versions or 'three' in versions)
def _setup_version(self, version='test-version'):
# Store some revisions in the non-version
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(49))
revid = self._dostore(oid, revid=revid, data=MinPO(50))
nvrevid = revid = self._dostore(oid, revid=revid, data=MinPO(51))
# Now do some stores in a version
revid = self._dostore(oid, revid=revid, data=MinPO(52),
version=version)
revid = self._dostore(oid, revid=revid, data=MinPO(53),
version=version)
revid = self._dostore(oid, revid=revid, data=MinPO(54),
version=version)
return oid, version
def checkAbortVersion(self):
eq = self.assertEqual
oid, version = self._setup_version()
# XXX Not sure I can write a test for getSerial() in the
# presence of aborted versions, because FileStorage and
# Berkeley storage give a different answer. I think Berkeley
# is right and FS is wrong.
oids = self._abortVersion(version)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(51))
def checkAbortVersionErrors(self):
eq = self.assertEqual
oid, version = self._setup_version()
# Now abort a bogus version
t = Transaction()
self._storage.tpc_begin(t)
#JF# The spec is silent on what happens if you abort or commit
#JF# a non-existent version. FileStorage consideres this a noop.
#JF# We can change the spec, but until we do ....
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.abortVersion,
#JF# 'bogus', t)
# And try to abort the empty version
if (hasattr(self._storage, 'supportsTransactionalUndo')
and self._storage.supportsTransactionalUndo()):
# XXX FileStorage used to be broken on this one
self.assertRaises(POSException.VersionError,
self._storage.abortVersion,
'', t)
# But now we really try to abort the version
oids = self._storage.abortVersion(version, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(51))
def checkCommitVersionErrors(self):
if not (hasattr(self._storage, 'supportsTransactionalUndo')
and self._storage.supportsTransactionalUndo()):
# XXX FileStorage used to be broken on this one
return
eq = self.assertEqual
oid1, version1 = self._setup_version('one')
data, revid1 = self._storage.load(oid1, version1)
eq(zodb_unpickle(data), MinPO(54))
t = Transaction()
self._storage.tpc_begin(t)
try:
self.assertRaises(POSException.VersionCommitError,
self._storage.commitVersion,
'one', 'one', t)
finally:
self._storage.tpc_abort(t)
def checkNewSerialOnCommitVersionToVersion(self):
eq = self.assertEqual
oid, version = self._setup_version()
data, vserial = self._storage.load(oid, version)
data, nserial = self._storage.load(oid, '')
version2 = 'test version 2'
self._commitVersion(version, version2)
data, serial = self._storage.load(oid, version2)
self.failUnless(serial != vserial and serial != nserial,
"New serial, %r, should be different from the old "
"version, %r, and non-version, %r, serials."
% (serial, vserial, nserial))
def checkModifyAfterAbortVersion(self):
eq = self.assertEqual
oid, version = self._setup_version()
self._abortVersion(version)
data, revid = self._storage.load(oid, '')
# And modify it a few times
revid = self._dostore(oid, revid=revid, data=MinPO(52))
revid = self._dostore(oid, revid=revid, data=MinPO(53))
revid = self._dostore(oid, revid=revid, data=MinPO(54))
data, newrevid = self._storage.load(oid, '')
eq(newrevid, revid)
eq(zodb_unpickle(data), MinPO(54))
def checkCommitToNonVersion(self):
eq = self.assertEqual
oid, version = self._setup_version()
data, revid = self._storage.load(oid, version)
eq(zodb_unpickle(data), MinPO(54))
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(51))
self._commitVersion(version, '')
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(54))
def checkCommitToOtherVersion(self):
eq = self.assertEqual
oid1, version1 = self._setup_version('one')
data, revid1 = self._storage.load(oid1, version1)
eq(zodb_unpickle(data), MinPO(54))
oid2, version2 = self._setup_version('two')
data, revid2 = self._storage.load(oid2, version2)
eq(zodb_unpickle(data), MinPO(54))
# make sure we see the non-version data when appropriate
data, revid2 = self._storage.load(oid1, version2)
eq(zodb_unpickle(data), MinPO(51))
data, revid2 = self._storage.load(oid2, version1)
eq(zodb_unpickle(data), MinPO(51))
data, revid2 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(51))
# Okay, now let's commit object1 to version2
oids = self._commitVersion(version1, version2)
eq(len(oids), 1)
eq(oids[0], oid1)
data, revid = self._storage.load(oid1, version2)
eq(zodb_unpickle(data), MinPO(54))
data, revid = self._storage.load(oid2, version2)
eq(zodb_unpickle(data), MinPO(54))
# an object can only exist in one version, so a load from
# version1 should now give the non-version data
data, revid2 = self._storage.load(oid1, version1)
eq(zodb_unpickle(data), MinPO(51))
# as should a version that has never been used
data, revid2 = self._storage.load(oid1, 'bela lugosi')
eq(zodb_unpickle(data), MinPO(51))
def checkAbortOneVersionCommitTheOther(self):
eq = self.assertEqual
oid1, version1 = self._setup_version('one')
data, revid1 = self._storage.load(oid1, version1)
eq(zodb_unpickle(data), MinPO(54))
oid2, version2 = self._setup_version('two')
data, revid2 = self._storage.load(oid2, version2)
eq(zodb_unpickle(data), MinPO(54))
# Let's make sure we can't get object1 in version2
data, revid2 = self._storage.load(oid1, version2)
eq(zodb_unpickle(data), MinPO(51))
oids = self._abortVersion(version1)
eq(len(oids), 1)
eq(oids[0], oid1)
data, revid = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(51))
#JF# Ditto
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid1, version1)
data, revid = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(51))
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid1, version2)
data, revid = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(51))
data, revid = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(51))
data, revid = self._storage.load(oid2, version2)
eq(zodb_unpickle(data), MinPO(54))
# Okay, now let's commit version2 back to the trunk
oids = self._commitVersion(version2, '')
eq(len(oids), 1)
eq(oids[0], oid2)
data, revid = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(51))
# But the trunk should be up to date now
data, revid = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(54))
data, revid = self._storage.load(oid2, version2)
eq(zodb_unpickle(data), MinPO(54))
#JF# To do a test like you want, you have to add the data in a version
oid = self._storage.new_oid()
revid = self._dostore(oid, revid=revid, data=MinPO(54), version='one')
self.assertRaises(KeyError,
self._storage.load, oid, '')
self.assertRaises(KeyError,
self._storage.load, oid, 'two')
def checkCreateObjectInVersionWithAbort(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=21, version="one")
revid = self._dostore(oid, revid=revid, data=23, version='one')
revid = self._dostore(oid, revid=revid, data=34, version='one')
# Now abort the version and the creation
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.abortVersion('one', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(oids, [oid])
def checkPackVersions(self):
db = DB(self._storage)
cn = db.open(version="testversion")
root = cn.root()
obj = root["obj"] = MinPO("obj")
root["obj2"] = MinPO("obj2")
txn = get_transaction()
txn.note("create 2 objs in version")
txn.commit()
obj.value = "77"
txn = get_transaction()
txn.note("modify obj in version")
txn.commit()
# undo the modification to generate a mix of backpointers
# and versions for pack to chase
info = db.undoInfo()
db.undo(info[0]["id"])
txn = get_transaction()
txn.note("undo modification")
txn.commit()
snooze()
self._storage.pack(time.time(), referencesf)
db.commitVersion("testversion")
txn = get_transaction()
txn.note("commit version")
txn.commit()
cn = db.open()
root = cn.root()
root["obj"] = "no version"
txn = get_transaction()
txn.note("modify obj")
txn.commit()
self._storage.pack(time.time(), referencesf)
def checkPackVersionsInPast(self):
db = DB(self._storage)
cn = db.open(version="testversion")
root = cn.root()
obj = root["obj"] = MinPO("obj")
root["obj2"] = MinPO("obj2")
txn = get_transaction()
txn.note("create 2 objs in version")
txn.commit()
obj.value = "77"
txn = get_transaction()
txn.note("modify obj in version")
txn.commit()
t0 = time.time()
snooze()
# undo the modification to generate a mix of backpointers
# and versions for pack to chase
info = db.undoInfo()
db.undo(info[0]["id"])
txn = get_transaction()
txn.note("undo modification")
txn.commit()
self._storage.pack(t0, referencesf)
db.commitVersion("testversion")
txn = get_transaction()
txn.note("commit version")
txn.commit()
cn = db.open()
root = cn.root()
root["obj"] = "no version"
txn = get_transaction()
txn.note("modify obj")
txn.commit()
self._storage.pack(time.time(), referencesf)
def checkPackVersionReachable(self):
db = DB(self._storage)
cn = db.open()
root = cn.root()
names = "a", "b", "c"
for name in names:
root[name] = MinPO(name)
get_transaction().commit()
for name in names:
cn2 = db.open(version=name)
rt2 = cn2.root()
obj = rt2[name]
obj.value = MinPO("version")
get_transaction().commit()
cn2.close()
root["d"] = MinPO("d")
get_transaction().commit()
snooze()
self._storage.pack(time.time(), referencesf)
cn.sync()
cn._cache.clear()
# make sure all the non-version data is there
for name, obj in root.items():
self.assertEqual(name, obj.value)
# make sure all the version-data is there,
# and create a new revision in the version
for name in names:
cn2 = db.open(version=name)
rt2 = cn2.root()
obj = rt2[name].value
self.assertEqual(obj.value, "version")
obj.value = "still version"
get_transaction().commit()
cn2.close()
db.abortVersion("b")
txn = get_transaction()
txn.note("abort version b")
txn.commit()
t = time.time()
snooze()
L = db.undoInfo()
db.undo(L[0]["id"])
txn = get_transaction()
txn.note("undo abort")
txn.commit()
self._storage.pack(t, referencesf)
cn2 = db.open(version="b")
rt2 = cn2.root()
self.assertEqual(rt2["b"].value.value, "still version")
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testDemoAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import ZODB.DemoStorage
import os, unittest
from ZODB.tests import StorageTestBase, BasicStorage, \
VersionStorage, Synchronization
class DemoStorageTests(StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
VersionStorage.VersionStorage,
Synchronization.SynchronizedStorage,
):
def setUp(self):
self._storage = ZODB.DemoStorage.DemoStorage()
def tearDown(self):
self._storage.close()
def checkOversizeNote(self):
# This base class test checks for the common case where a storage
# doesnt support huge transaction metadata. This storage doesnt
# have this limit, so we inhibit this test here.
pass
def test_suite():
suite = unittest.makeSuite(DemoStorageTests, 'check')
return suite
if __name__ == "__main__":
loader = unittest.TestLoader()
loader.testMethodPrefix = "check"
unittest.main(testLoader=loader)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testFileAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import nested_scopes
import ZODB.FileStorage
import sys, os, unittest
import errno
import filecmp
import StringIO
from ZODB.Transaction import Transaction
from ZODB import POSException
from ZODB.fsrecover import recover
from ZODB.tests import StorageTestBase, BasicStorage, \
TransactionalUndoStorage, VersionStorage, \
TransactionalUndoVersionStorage, PackableStorage, \
Synchronization, ConflictResolution, HistoryStorage, \
IteratorStorage, Corruption, RevisionStorage, PersistentStorage, \
MTStorage, ReadOnlyStorage, RecoveryStorage
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle
class FileStorageTests(
StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
TransactionalUndoStorage.TransactionalUndoStorage,
RevisionStorage.RevisionStorage,
VersionStorage.VersionStorage,
TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage,
HistoryStorage.HistoryStorage,
IteratorStorage.IteratorStorage,
IteratorStorage.ExtendedIteratorStorage,
PersistentStorage.PersistentStorage,
MTStorage.MTStorage,
ReadOnlyStorage.ReadOnlyStorage
):
def open(self, **kwargs):
self._storage = ZODB.FileStorage.FileStorage('FileStorageTests.fs',
**kwargs)
def setUp(self):
self.open(create=1)
def tearDown(self):
self._storage.close()
StorageTestBase.removefs("FileStorageTests.fs")
def checkLongMetadata(self):
s = "X" * 75000
try:
self._dostore(user=s)
except POSException.StorageError:
pass
else:
self.fail("expect long user field to raise error")
try:
self._dostore(description=s)
except POSException.StorageError:
pass
else:
self.fail("expect long user field to raise error")
def check_use_fsIndex(self):
from ZODB.fsIndex import fsIndex
self.assertEqual(self._storage._index.__class__, fsIndex)
# XXX We could really use some tests for sanity checking
def check_conversion_to_fsIndex_not_if_readonly(self):
self.tearDown()
class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self):
return {}, {}, {}, {}, {}, {}, {}
from ZODB.fsIndex import fsIndex
# Hack FileStorage to create dictionary indexes
self._storage = OldFileStorage('FileStorageTests.fs')
self.assertEqual(type(self._storage._index), type({}))
for i in range(10):
self._dostore()
# Should save the index
self._storage.close()
self._storage = ZODB.FileStorage.FileStorage(
'FileStorageTests.fs', read_only=1)
self.assertEqual(type(self._storage._index), type({}))
def check_conversion_to_fsIndex(self):
self.tearDown()
class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self):
return {}, {}, {}, {}, {}, {}, {}
from ZODB.fsIndex import fsIndex
# Hack FileStorage to create dictionary indexes
self._storage = OldFileStorage('FileStorageTests.fs')
self.assertEqual(type(self._storage._index), type({}))
for i in range(10):
self._dostore()
oldindex = self._storage._index.copy()
# Should save the index
self._storage.close()
self._storage = ZODB.FileStorage.FileStorage('FileStorageTests.fs')
self.assertEqual(self._storage._index.__class__, fsIndex)
self.failUnless(self._storage._used_index)
index = {}
for k, v in self._storage._index.items():
index[k] = v
self.assertEqual(index, oldindex)
def check_save_after_load_with_no_index(self):
for i in range(10):
self._dostore()
self._storage.close()
os.remove('FileStorageTests.fs.index')
self.open()
self.assertEqual(self._storage._saved, 1)
# This would make the unit tests too slow
# check_save_after_load_that_worked_hard(self)
def check_periodic_save_index(self):
# Check the basic algorithm
oldsaved = self._storage._saved
self._storage._records_before_save = 10
for i in range(4):
self._dostore()
self.assertEqual(self._storage._saved, oldsaved)
self._dostore()
self.assertEqual(self._storage._saved, oldsaved+1)
# Now make sure the parameter changes as we get bigger
for i in range(20):
self._dostore()
self.failUnless(self._storage._records_before_save > 20)
# There are a bunch of tests that the current pack() implementation
# does not past. We need to fix pack(), but don't want tests to
# fail until then.
def checkPackVersionsInPast(self):
pass
def checkPackAfterUndoDeletion(self):
pass
class FileStorageRecoveryTest(
StorageTestBase.StorageTestBase,
RecoveryStorage.RecoveryStorage,
):
def setUp(self):
StorageTestBase.removefs("Source.fs")
StorageTestBase.removefs("Dest.fs")
self._storage = ZODB.FileStorage.FileStorage('Source.fs')
self._dst = ZODB.FileStorage.FileStorage('Dest.fs')
def tearDown(self):
self._storage.close()
self._dst.close()
StorageTestBase.removefs("Source.fs")
StorageTestBase.removefs("Dest.fs")
def new_dest(self):
StorageTestBase.removefs('Dest.fs')
return ZODB.FileStorage.FileStorage('Dest.fs')
def test_suite():
suite = unittest.makeSuite(FileStorageTests, 'check')
suite2 = unittest.makeSuite(Corruption.FileStorageCorruptTests, 'check')
suite3 = unittest.makeSuite(FileStorageRecoveryTest, 'check')
suite.addTest(suite2)
suite.addTest(suite3)
return suite
if __name__=='__main__':
unittest.main()
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testMappingAcquisition.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import ZODB.MappingStorage
import os, unittest
from ZODB.tests import StorageTestBase, BasicStorage, Synchronization
class MappingStorageTests(StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
Synchronization.SynchronizedStorage,
):
def setUp(self):
self._storage = ZODB.MappingStorage.MappingStorage()
def tearDown(self):
self._storage.close()
def checkOversizeNote(self):
# This base class test checks for the common case where a storage
# doesnt support huge transaction metadata. This storage doesnt
# have this limit, so we inhibit this test here.
pass
def test_suite():
suite = unittest.makeSuite(MappingStorageTests, 'check')
return suite
if __name__ == "__main__":
loader = unittest.TestLoader()
loader.testMethodPrefix = "check"
unittest.main(testLoader=loader)
More information about the Zope3-Checkins
mailing list