[Zope3-checkins] CVS: Zope3/src/zope/app/advanced/acquisition/tests
- BasicStorage.py:1.1 ConflictResolution.py:1.1
Corruption.py:1.1 HistoryStorage.py:1.1
IteratorStorage.py:1.1 LocalStorage.py:1.1 MTStorage.py:1.1
MinPO.py:1.1 PackableStorage.py:1.1 PersistentStorage.py:1.1
ReadOnlyStorage.py:1.1 RecoveryStorage.py:1.1
RevisionStorage.py:1.1 StorageTestBase.py:1.1
Synchronization.py:1.1 TransactionalUndoStorage.py:1.1
TransactionalUndoVersionStorage.py:1.1 VersionStorage.py:1.1
__init__.py:1.1 dangle.py:1.1 speed.py:1.1
testActivityMonitor.py:1.1 testCache.py:1.1 testConfig.py:1.1
testDB.py:1.1 testDemoStorage.py:1.1 testFileStorage.py:1.1
testMappingStorage.py:1.1 testPersistentList.py:1.1
testPersistentMapping.py:1.1 testRecover.py:1.1
testTimeStamp.py:1.1 testTransaction.py:1.1 testUtils.py:1.1
testZODB.py:1.1 testfsIndex.py:1.1
Sidnei da Silva
sidnei at awkly.org
Thu Apr 1 13:26:46 EST 2004
Update of /cvs-repository/Zope3/src/zope/app/advanced/acquisition/tests
In directory cvs.zope.org:/tmp/cvs-serv23146/acquisition/tests
Added Files:
BasicStorage.py ConflictResolution.py Corruption.py
HistoryStorage.py IteratorStorage.py LocalStorage.py
MTStorage.py MinPO.py PackableStorage.py PersistentStorage.py
ReadOnlyStorage.py RecoveryStorage.py RevisionStorage.py
StorageTestBase.py Synchronization.py
TransactionalUndoStorage.py TransactionalUndoVersionStorage.py
VersionStorage.py __init__.py dangle.py speed.py
testActivityMonitor.py testCache.py testConfig.py testDB.py
testDemoStorage.py testFileStorage.py testMappingStorage.py
testPersistentList.py testPersistentMapping.py testRecover.py
testTimeStamp.py testTransaction.py testUtils.py testZODB.py
testfsIndex.py
Log Message:
Make a query helper that uses Implicit acquisition, as well as making the acquisition package be importable. With tests.
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/BasicStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run the basic tests for a storage as described in the official storage API
The most complete and most out-of-date description of the interface is:
http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storage_Interface_Info.html
All storages should be able to pass these tests.
"""
from ZODB.Transaction import Transaction
from ZODB import POSException
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase \
import zodb_unpickle, zodb_pickle, handle_serials
ZERO = '\0'*8
class BasicStorage:
def checkBasics(self):
t = Transaction()
self._storage.tpc_begin(t)
# This should simply return
self._storage.tpc_begin(t)
# Aborting is easy
self._storage.tpc_abort(t)
# Test a few expected exceptions when we're doing operations giving a
# different Transaction object than the one we've begun on.
self._storage.tpc_begin(t)
self.assertRaises(
POSException.StorageTransactionError,
self._storage.store,
0, 0, 0, 0, Transaction())
try:
self._storage.abortVersion('dummy', Transaction())
except (POSException.StorageTransactionError,
POSException.VersionCommitError):
pass # test passed ;)
else:
assert 0, "Should have failed, invalid transaction."
try:
self._storage.commitVersion('dummy', 'dummer', Transaction())
except (POSException.StorageTransactionError,
POSException.VersionCommitError):
pass # test passed ;)
else:
assert 0, "Should have failed, invalid transaction."
self.assertRaises(
POSException.StorageTransactionError,
self._storage.store,
0, 1, 2, 3, Transaction())
self._storage.tpc_abort(t)
def checkSerialIsNoneForInitialRevision(self):
eq = self.assertEqual
oid = self._storage.new_oid()
txn = Transaction()
self._storage.tpc_begin(txn)
# Use None for serial. Don't use _dostore() here because that coerces
# serial=None to serial=ZERO.
r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
'', txn)
r2 = self._storage.tpc_vote(txn)
self._storage.tpc_finish(txn)
newrevid = handle_serials(oid, r1, r2)
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
eq(value, MinPO(11))
eq(revid, newrevid)
def checkNonVersionStore(self, oid=None, revid=None, version=None):
revid = ZERO
newrevid = self._dostore(revid=revid)
# Finish the transaction.
self.assertNotEqual(newrevid, revid)
def checkNonVersionStoreAndLoad(self):
eq = self.assertEqual
oid = self._storage.new_oid()
self._dostore(oid=oid, data=MinPO(7))
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
eq(value, MinPO(7))
# Now do a bunch of updates to an object
for i in range(13, 22):
revid = self._dostore(oid, revid=revid, data=MinPO(i))
# Now get the latest revision of the object
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(21))
def checkNonVersionModifiedInVersion(self):
oid = self._storage.new_oid()
self._dostore(oid=oid)
self.assertEqual(self._storage.modifiedInVersion(oid), '')
def checkConflicts(self):
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
self.assertRaises(POSException.ConflictError,
self._dostore,
oid, revid=revid1, data=MinPO(13))
def checkWriteAfterAbort(self):
oid = self._storage.new_oid()
t = Transaction()
self._storage.tpc_begin(t)
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
# Now abort this transaction
self._storage.tpc_abort(t)
# Now start all over again
oid = self._storage.new_oid()
self._dostore(oid=oid, data=MinPO(6))
def checkAbortAfterVote(self):
oid1 = self._storage.new_oid()
revid1 = self._dostore(oid=oid1, data=MinPO(-2))
oid = self._storage.new_oid()
t = Transaction()
self._storage.tpc_begin(t)
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
# Now abort this transaction
self._storage.tpc_vote(t)
self._storage.tpc_abort(t)
# Now start all over again
oid = self._storage.new_oid()
revid = self._dostore(oid=oid, data=MinPO(6))
for oid, revid in [(oid1, revid1), (oid, revid)]:
data, _revid = self._storage.load(oid, '')
self.assertEqual(revid, _revid)
def checkStoreTwoObjects(self):
noteq = self.assertNotEqual
p31, p32, p51, p52 = map(MinPO, (31, 32, 51, 52))
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
noteq(oid1, oid2)
revid1 = self._dostore(oid1, data=p31)
revid2 = self._dostore(oid2, data=p51)
noteq(revid1, revid2)
revid3 = self._dostore(oid1, revid=revid1, data=p32)
revid4 = self._dostore(oid2, revid=revid2, data=p52)
noteq(revid3, revid4)
def checkGetSerial(self):
if not hasattr(self._storage, 'getSerial'):
return
eq = self.assertEqual
p41, p42 = map(MinPO, (41, 42))
oid = self._storage.new_oid()
self.assertRaises(KeyError, self._storage.getSerial, oid)
# Now store a revision
revid1 = self._dostore(oid, data=p41)
eq(revid1, self._storage.getSerial(oid))
# And another one
revid2 = self._dostore(oid, revid=revid1, data=p42)
eq(revid2, self._storage.getSerial(oid))
def checkTwoArgBegin(self):
# XXX how standard is three-argument tpc_begin()?
t = Transaction()
tid = '\0\0\0\0\0psu'
self._storage.tpc_begin(t, tid)
oid = self._storage.new_oid()
data = zodb_pickle(MinPO(8))
self._storage.store(oid, None, data, '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
def checkLen(self):
# len(storage) reports the number of objects.
# check it is zero when empty
self.assertEqual(len(self._storage),0)
# check it is correct when the storage contains two object.
# len may also be zero, for storages that do not keep track
# of this number
self._dostore(data=MinPO(22))
self._dostore(data=MinPO(23))
self.assert_(len(self._storage) in [0,2])
def checkGetSize(self):
self._dostore(data=MinPO(25))
size = self._storage.getSize()
# The storage API doesn't make any claims about what size
# means except that it ought to be printable.
str(size)
def checkNote(self):
oid = self._storage.new_oid()
t = Transaction()
self._storage.tpc_begin(t)
t.note('this is a test')
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
def checkGetExtensionMethods(self):
m = self._storage.getExtensionMethods()
self.assertEqual(type(m),type({}))
for k,v in m.items():
self.assertEqual(v,None)
self.assert_(callable(getattr(self._storage,k)))
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/ConflictResolution.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for application-level conflict resolution."""
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError, UndoError
from Persistence import Persistent
from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
import sys
import types
from cPickle import Pickler, Unpickler
from cStringIO import StringIO
class PCounter(Persistent):
_value = 0
def __repr__(self):
return "<PCounter %d>" % self._value
def inc(self):
self._value = self._value + 1
def _p_resolveConflict(self, oldState, savedState, newState):
savedDiff = savedState['_value'] - oldState['_value']
newDiff = newState['_value'] - oldState['_value']
oldState['_value'] = oldState['_value'] + savedDiff + newDiff
return oldState
# XXX What if _p_resolveConflict _thinks_ it resolved the
# conflict, but did something wrong?
class PCounter2(PCounter):
def _p_resolveConflict(self, oldState, savedState, newState):
raise ConflictError
class PCounter3(PCounter):
def _p_resolveConflict(self, oldState, savedState, newState):
raise AttributeError, "no attribute (testing conflict resolution)"
class PCounter4(PCounter):
def _p_resolveConflict(self, oldState, savedState):
raise RuntimeError, "Can't get here; not enough args"
class ConflictResolvingStorage:
def checkResolve(self):
obj = PCounter()
obj.inc()
oid = self._storage.new_oid()
revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))
obj.inc()
obj.inc()
# The effect of committing two transactions with the same
# pickle is to commit two different transactions relative to
# revid1 that add two to _value.
revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
revid3 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
data, serialno = self._storage.load(oid, '')
inst = zodb_unpickle(data)
self.assertEqual(inst._value, 5)
def checkUnresolvable(self):
obj = PCounter2()
obj.inc()
oid = self._storage.new_oid()
revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))
obj.inc()
obj.inc()
# The effect of committing two transactions with the same
# pickle is to commit two different transactions relative to
# revid1 that add two to _value.
revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
self.assertRaises(ConflictError,
self._dostoreNP,
oid, revid=revid1, data=zodb_pickle(obj))
def checkZClassesArentResolved(self):
from ZODB.ConflictResolution import bad_class
dummy_class_tuple = ('*foobar', ())
assert bad_class(dummy_class_tuple) == 1
def checkBuggyResolve1(self):
obj = PCounter3()
obj.inc()
oid = self._storage.new_oid()
revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))
obj.inc()
obj.inc()
# The effect of committing two transactions with the same
# pickle is to commit two different transactions relative to
# revid1 that add two to _value.
revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
self.assertRaises(ConflictError,
self._dostoreNP,
oid, revid=revid1, data=zodb_pickle(obj))
def checkBuggyResolve2(self):
obj = PCounter4()
obj.inc()
oid = self._storage.new_oid()
revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))
obj.inc()
obj.inc()
# The effect of committing two transactions with the same
# pickle is to commit two different transactions relative to
# revid1 that add two to _value.
revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
self.assertRaises(ConflictError,
self._dostoreNP,
oid, revid=revid1, data=zodb_pickle(obj))
class ConflictResolvingTransUndoStorage:
def checkUndoConflictResolution(self):
# This test is based on checkNotUndoable in the
# TransactionalUndoStorage test suite. Except here, conflict
# resolution should allow us to undo the transaction anyway.
obj = PCounter()
obj.inc()
oid = self._storage.new_oid()
revid_a = self._dostore(oid, data=obj)
obj.inc()
revid_b = self._dostore(oid, revid=revid_a, data=obj)
obj.inc()
revid_c = self._dostore(oid, revid=revid_b, data=obj)
# Start the undo
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
self._storage.tpc_begin(t)
self._storage.transactionalUndo(tid, t)
self._storage.tpc_finish(t)
def checkUndoUnresolvable(self):
# This test is based on checkNotUndoable in the
# TransactionalUndoStorage test suite. Except here, conflict
# resolution should allow us to undo the transaction anyway.
obj = PCounter2()
obj.inc()
oid = self._storage.new_oid()
revid_a = self._dostore(oid, data=obj)
obj.inc()
revid_b = self._dostore(oid, revid=revid_a, data=obj)
obj.inc()
revid_c = self._dostore(oid, revid=revid_b, data=obj)
# Start the undo
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
self._storage.tpc_begin(t)
self.assertRaises(UndoError, self._storage.transactionalUndo,
tid, t)
self._storage.tpc_abort(t)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/Corruption.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Do some minimal tests of data corruption"""
import os
import random
import stat
import tempfile
import unittest
import ZODB, ZODB.FileStorage
from StorageTestBase import StorageTestBase, removefs
class FileStorageCorruptTests(StorageTestBase):
def setUp(self):
self.path = tempfile.mktemp()
self._storage = ZODB.FileStorage.FileStorage(self.path, create=1)
def tearDown(self):
self._storage.close()
removefs(self.path)
def _do_stores(self):
oids = []
for i in range(5):
oid = self._storage.new_oid()
revid = self._dostore(oid)
oids.append((oid, revid))
return oids
def _check_stores(self, oids):
for oid, revid in oids:
data, s_revid = self._storage.load(oid, '')
self.assertEqual(s_revid, revid)
def checkTruncatedIndex(self):
oids = self._do_stores()
self._close()
# truncation the index file
path = self.path + '.index'
self.failUnless(os.path.exists(path))
f = open(path, 'r+')
f.seek(0, 2)
size = f.tell()
f.seek(size / 2)
f.truncate()
f.close()
self._storage = ZODB.FileStorage.FileStorage(self.path)
self._check_stores(oids)
def checkCorruptedIndex(self):
oids = self._do_stores()
self._close()
# truncation the index file
path = self.path + '.index'
self.failUnless(os.path.exists(path))
size = os.stat(path)[stat.ST_SIZE]
f = open(path, 'r+')
while f.tell() < size:
f.seek(random.randrange(1, size / 10), 1)
f.write('\000')
f.close()
self._storage = ZODB.FileStorage.FileStorage(self.path)
self._check_stores(oids)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/HistoryStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run the history() related tests for a storage.
Any storage that supports the history() method should be able to pass
all these tests.
"""
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
class HistoryStorage:
def checkSimpleHistory(self):
eq = self.assertEqual
# Store a couple of non-version revisions of the object
oid = self._storage.new_oid()
self.assertRaises(KeyError,self._storage.history,oid)
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now get various snapshots of the object's history
h = self._storage.history(oid, size=1)
eq(len(h), 1)
d = h[0]
eq(d['serial'], revid3)
eq(d['version'], '')
# Try to get 2 historical revisions
h = self._storage.history(oid, size=2)
eq(len(h), 2)
d = h[0]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid2)
eq(d['version'], '')
# Try to get all 3 historical revisions
h = self._storage.history(oid, size=3)
eq(len(h), 3)
d = h[0]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid2)
eq(d['version'], '')
d = h[2]
eq(d['serial'], revid1)
eq(d['version'], '')
# There should be no more than 3 revisions
h = self._storage.history(oid, size=4)
eq(len(h), 3)
d = h[0]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid2)
eq(d['version'], '')
d = h[2]
eq(d['serial'], revid1)
eq(d['version'], '')
def checkVersionHistory(self):
if not self._storage.supportsVersions():
return
eq = self.assertEqual
# Store a couple of non-version revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now store some new revisions in a version
version = 'test-version'
revid4 = self._dostore(oid, revid=revid3, data=MinPO(14),
version=version)
revid5 = self._dostore(oid, revid=revid4, data=MinPO(15),
version=version)
revid6 = self._dostore(oid, revid=revid5, data=MinPO(16),
version=version)
# Now, try to get the six historical revisions (first three are in
# 'test-version', followed by the non-version revisions).
h = self._storage.history(oid, version, 100)
eq(len(h), 6)
d = h[0]
eq(d['serial'], revid6)
eq(d['version'], version)
d = h[1]
eq(d['serial'], revid5)
eq(d['version'], version)
d = h[2]
eq(d['serial'], revid4)
eq(d['version'], version)
d = h[3]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[4]
eq(d['serial'], revid2)
eq(d['version'], '')
d = h[5]
eq(d['serial'], revid1)
eq(d['version'], '')
def checkHistoryAfterVersionCommit(self):
if not self._storage.supportsVersions():
return
eq = self.assertEqual
# Store a couple of non-version revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now store some new revisions in a version
version = 'test-version'
revid4 = self._dostore(oid, revid=revid3, data=MinPO(14),
version=version)
revid5 = self._dostore(oid, revid=revid4, data=MinPO(15),
version=version)
revid6 = self._dostore(oid, revid=revid5, data=MinPO(16),
version=version)
# Now commit the version
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.commitVersion(version, '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# After consultation with Jim, we agreed that the semantics of
# revision id's after a version commit is that the committed object
# gets a new serial number (a.k.a. revision id). Note that
# FileStorage is broken here; the serial number in the post-commit
# non-version revision will be the same as the serial number of the
# previous in-version revision.
#
# BAW: Using load() is the only way to get the serial number of the
# current revision of the object. But at least this works for both
# broken and working storages.
ign, revid7 = self._storage.load(oid, '')
# Now, try to get the six historical revisions (first three are in
# 'test-version', followed by the non-version revisions).
h = self._storage.history(oid, version, 100)
eq(len(h), 7)
d = h[0]
eq(d['serial'], revid7)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid6)
eq(d['version'], version)
d = h[2]
eq(d['serial'], revid5)
eq(d['version'], version)
d = h[3]
eq(d['serial'], revid4)
eq(d['version'], version)
d = h[4]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[5]
eq(d['serial'], revid2)
eq(d['version'], '')
d = h[6]
eq(d['serial'], revid1)
eq(d['version'], '')
def checkHistoryAfterVersionAbort(self):
if not self._storage.supportsVersions():
return
eq = self.assertEqual
# Store a couple of non-version revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now store some new revisions in a version
version = 'test-version'
revid4 = self._dostore(oid, revid=revid3, data=MinPO(14),
version=version)
revid5 = self._dostore(oid, revid=revid4, data=MinPO(15),
version=version)
revid6 = self._dostore(oid, revid=revid5, data=MinPO(16),
version=version)
# Now commit the version
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.abortVersion(version, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# After consultation with Jim, we agreed that the semantics of
# revision id's after a version commit is that the committed object
# gets a new serial number (a.k.a. revision id). Note that
# FileStorage is broken here; the serial number in the post-commit
# non-version revision will be the same as the serial number of the
# previous in-version revision.
#
# BAW: Using load() is the only way to get the serial number of the
# current revision of the object. But at least this works for both
# broken and working storages.
ign, revid7 = self._storage.load(oid, '')
# Now, try to get the six historical revisions (first three are in
# 'test-version', followed by the non-version revisions).
h = self._storage.history(oid, version, 100)
eq(len(h), 7)
d = h[0]
eq(d['serial'], revid7)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid6)
eq(d['version'], version)
d = h[2]
eq(d['serial'], revid5)
eq(d['version'], version)
d = h[3]
eq(d['serial'], revid4)
eq(d['version'], version)
d = h[4]
eq(d['serial'], revid3)
eq(d['version'], '')
d = h[5]
eq(d['serial'], revid2)
eq(d['version'], '')
d = h[6]
eq(d['serial'], revid1)
eq(d['version'], '')
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/IteratorStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run tests against the iterator() interface for storages.
Any storage that supports the iterator() method should be able to pass
all these tests.
"""
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
from ZODB.utils import U64, p64
from ZODB.Transaction import Transaction
class IteratorCompare:
def iter_verify(self, txniter, revids, val0):
eq = self.assertEqual
oid = self._oid
val = val0
for reciter, revid in zip(txniter, revids + [None]):
eq(reciter.tid, revid)
for rec in reciter:
eq(rec.oid, oid)
eq(rec.serial, revid)
eq(rec.version, '')
eq(zodb_unpickle(rec.data), MinPO(val))
val = val + 1
eq(val, val0 + len(revids))
txniter.close()
class IteratorStorage(IteratorCompare):
def checkSimpleIteration(self):
# Store a bunch of revisions of a single object
self._oid = oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now iterate over all the transactions and compare carefully
txniter = self._storage.iterator()
self.iter_verify(txniter, [revid1, revid2, revid3], 11)
def checkClose(self):
self._oid = oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
txniter = self._storage.iterator()
txniter.close()
self.assertRaises(IOError, txniter.__getitem__, 0)
def checkVersionIterator(self):
if not self._storage.supportsVersions():
return
self._dostore()
self._dostore(version='abort')
self._dostore()
self._dostore(version='abort')
t = Transaction()
self._storage.tpc_begin(t)
self._storage.abortVersion('abort', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self._dostore(version='commit')
self._dostore()
self._dostore(version='commit')
t = Transaction()
self._storage.tpc_begin(t)
self._storage.commitVersion('commit', '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
txniter = self._storage.iterator()
for trans in txniter:
for data in trans:
pass
def checkUndoZombieNonVersion(self):
if not hasattr(self._storage, 'supportsTransactionalUndo'):
return
if not self._storage.supportsTransactionalUndo():
return
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(94))
# Get the undo information
info = self._storage.undoInfo()
tid = info[0]['id']
# Undo the creation of the object, rendering it a zombie
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# Now attempt to iterator over the storage
iter = self._storage.iterator()
for txn in iter:
for rec in txn:
pass
# The last transaction performed an undo of the transaction that
# created object oid. (As Barry points out, the object is now in the
# George Bailey state.) Assert that the final data record contains
# None in the data attribute.
self.assertEqual(rec.oid, oid)
self.assertEqual(rec.data, None)
def checkTransactionExtensionFromIterator(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(1))
iter = self._storage.iterator()
count = 0
for txn in iter:
self.assertEqual(txn._extension, {})
count +=1
self.assertEqual(count, 1)
def checkIterationIntraTransaction(self):
# XXX try this test with logging enabled. If you see something like
#
# ZODB FS FS21 warn: FileStorageTests.fs truncated, possibly due to
# damaged records at 4
#
# Then the code in FileIterator.next() hasn't yet been fixed.
oid = self._storage.new_oid()
t = Transaction()
data = zodb_pickle(MinPO(0))
try:
self._storage.tpc_begin(t)
self._storage.store(oid, '\0'*8, data, '', t)
self._storage.tpc_vote(t)
# Don't do tpc_finish yet
it = self._storage.iterator()
for x in it:
pass
finally:
self._storage.tpc_finish(t)
class ExtendedIteratorStorage(IteratorCompare):
def checkExtendedIteration(self):
# Store a bunch of revisions of a single object
self._oid = oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
revid4 = self._dostore(oid, revid=revid3, data=MinPO(14))
# Note that the end points are included
# Iterate over all of the transactions with explicit start/stop
txniter = self._storage.iterator(revid1, revid4)
self.iter_verify(txniter, [revid1, revid2, revid3, revid4], 11)
# Iterate over some of the transactions with explicit start
txniter = self._storage.iterator(revid3)
self.iter_verify(txniter, [revid3, revid4], 13)
# Iterate over some of the transactions with explicit stop
txniter = self._storage.iterator(None, revid2)
self.iter_verify(txniter, [revid1, revid2], 11)
# Iterate over some of the transactions with explicit start+stop
txniter = self._storage.iterator(revid2, revid3)
self.iter_verify(txniter, [revid2, revid3], 12)
# Specify an upper bound somewhere in between values
revid3a = p64((U64(revid3) + U64(revid4)) / 2)
txniter = self._storage.iterator(revid2, revid3a)
self.iter_verify(txniter, [revid2, revid3], 12)
# Specify a lower bound somewhere in between values.
# revid2 == revid1+1 is very likely on Windows. Adding 1 before
# dividing ensures that "the midpoint" we compute is strictly larger
# than revid1.
revid1a = p64((U64(revid1) + 1 + U64(revid2)) / 2)
assert revid1 < revid1a
txniter = self._storage.iterator(revid1a, revid3a)
self.iter_verify(txniter, [revid2, revid3], 12)
# Specify an empty range
txniter = self._storage.iterator(revid3, revid2)
self.iter_verify(txniter, [], 13)
# Specify a singleton range
txniter = self._storage.iterator(revid3, revid3)
self.iter_verify(txniter, [revid3], 13)
class IteratorDeepCompare:
def compare(self, storage1, storage2):
eq = self.assertEqual
iter1 = storage1.iterator()
iter2 = storage2.iterator()
for txn1, txn2 in zip(iter1, iter2):
eq(txn1.tid, txn2.tid)
eq(txn1.status, txn2.status)
eq(txn1.user, txn2.user)
eq(txn1.description, txn2.description)
eq(txn1._extension, txn2._extension)
for rec1, rec2 in zip(txn1, txn2):
eq(rec1.oid, rec2.oid)
eq(rec1.serial, rec2.serial)
eq(rec1.version, rec2.version)
eq(rec1.data, rec2.data)
# Make sure there are no more records left in rec1 and rec2,
# meaning they were the same length.
self.assertRaises(IndexError, txn1.next)
self.assertRaises(IndexError, txn2.next)
# Make sure ther are no more records left in txn1 and txn2, meaning
# they were the same length
self.assertRaises(IndexError, iter1.next)
self.assertRaises(IndexError, iter2.next)
iter1.close()
iter2.close()
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/LocalStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
class LocalStorage:
"""A single test that only make sense for local storages.
A local storage is one that doens't use ZEO. The __len__()
implementation for ZEO is inexact.
"""
def checkLen(self):
eq = self.assertEqual
# The length of the database ought to grow by one each time
eq(len(self._storage), 0)
self._dostore()
eq(len(self._storage), 1)
self._dostore()
eq(len(self._storage), 2)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/MTStorage.py ===
import random
import sys
import threading
import time
import ZODB
from PersistentMapping import PersistentMapping
from ZODB.tests.StorageTestBase \
import StorageTestBase, zodb_pickle, zodb_unpickle, handle_serials
from ZODB.tests.MinPO import MinPO
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError
SHORT_DELAY = 0.01
def sort(l):
"Sort a list in place and return it."
l.sort()
return l
class TestThread(threading.Thread):
"""Base class for defining threads that run from unittest.
If the thread exits with an uncaught exception, catch it and
re-raise it when the thread is joined. The re-raise will cause
the test to fail.
The subclass should define a runtest() method instead of a run()
method.
"""
def __init__(self, test):
threading.Thread.__init__(self)
self.test = test
self._fail = None
self._exc_info = None
def run(self):
try:
self.runtest()
except:
self._exc_info = sys.exc_info()
def fail(self, msg=""):
self._test.fail(msg)
def join(self, timeout=None):
threading.Thread.join(self, timeout)
if self._exc_info:
raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
class ZODBClientThread(TestThread):
__super_init = TestThread.__init__
def __init__(self, db, test, commits=10, delay=SHORT_DELAY):
self.__super_init(test)
self.setDaemon(1)
self.db = db
self.test = test
self.commits = commits
self.delay = delay
def runtest(self):
conn = self.db.open()
root = conn.root()
d = self.get_thread_dict(root)
if d is None:
self.test.fail()
else:
for i in range(self.commits):
self.commit(d, i)
self.test.assertEqual(sort(d.keys()), range(self.commits))
def commit(self, d, num):
d[num] = time.time()
time.sleep(self.delay)
get_transaction().commit()
time.sleep(self.delay)
def get_thread_dict(self, root):
name = self.getName()
# arbitrarily limit to 10 re-tries
for i in range(10):
try:
m = PersistentMapping()
root[name] = m
get_transaction().commit()
break
except ConflictError, err:
get_transaction().abort()
root._p_jar.sync()
for i in range(10):
try:
return root.get(name)
except ConflictError:
get_transaction().abort()
class StorageClientThread(TestThread):
__super_init = TestThread.__init__
def __init__(self, storage, test, commits=10, delay=SHORT_DELAY):
self.__super_init(test)
self.storage = storage
self.test = test
self.commits = commits
self.delay = delay
self.oids = {}
def runtest(self):
for i in range(self.commits):
self.dostore(i)
self.check()
def check(self):
for oid, revid in self.oids.items():
data, serial = self.storage.load(oid, '')
self.test.assertEqual(serial, revid)
obj = zodb_unpickle(data)
self.test.assertEqual(obj.value[0], self.getName())
def pause(self):
time.sleep(self.delay)
def oid(self):
oid = self.storage.new_oid()
self.oids[oid] = None
return oid
def dostore(self, i):
data = zodb_pickle(MinPO((self.getName(), i)))
t = Transaction()
oid = self.oid()
self.pause()
self.storage.tpc_begin(t)
self.pause()
# Always create a new object, signified by None for revid
r1 = self.storage.store(oid, None, data, '', t)
self.pause()
r2 = self.storage.tpc_vote(t)
self.pause()
self.storage.tpc_finish(t)
self.pause()
revid = handle_serials(oid, r1, r2)
self.oids[oid] = revid
class ExtStorageClientThread(StorageClientThread):
def runtest(self):
# pick some other storage ops to execute
ops = [getattr(self, meth) for meth in dir(ExtStorageClientThread)
if meth.startswith('do_')]
assert ops, "Didn't find an storage ops in %s" % self.storage
# do a store to guarantee there's at least one oid in self.oids
self.dostore(0)
for i in range(self.commits - 1):
meth = random.choice(ops)
meth()
self.dostore(i)
self.check()
def pick_oid(self):
return random.choice(self.oids.keys())
def do_load(self):
oid = self.pick_oid()
self.storage.load(oid, '')
def do_loadSerial(self):
oid = self.pick_oid()
self.storage.loadSerial(oid, self.oids[oid])
def do_modifiedInVersion(self):
oid = self.pick_oid()
self.storage.modifiedInVersion(oid)
def do_undoLog(self):
self.storage.undoLog(0, -20)
def do_iterator(self):
try:
iter = self.storage.iterator()
except AttributeError:
# XXX It's hard to detect that a ZEO ClientStorage
# doesn't have this method, but does have all the others.
return
for obj in iter:
pass
class MTStorage:
"Test a storage with multiple client threads executing concurrently."
def _checkNThreads(self, n, constructor, *args):
threads = [constructor(*args) for i in range(n)]
for t in threads:
t.start()
for t in threads:
t.join(60)
for t in threads:
self.failIf(t.isAlive(), "thread failed to finish in 60 seconds")
def check2ZODBThreads(self):
db = ZODB.DB(self._storage)
self._checkNThreads(2, ZODBClientThread, db, self)
db.close()
def check7ZODBThreads(self):
db = ZODB.DB(self._storage)
self._checkNThreads(7, ZODBClientThread, db, self)
db.close()
def check2StorageThreads(self):
self._checkNThreads(2, StorageClientThread, self._storage, self)
def check7StorageThreads(self):
self._checkNThreads(7, StorageClientThread, self._storage, self)
def check4ExtStorageThread(self):
self._checkNThreads(4, ExtStorageClientThread, self._storage, self)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/MinPO.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""A minimal persistent object to use for tests"""
from Persistence import Persistent
class MinPO(Persistent):
def __init__(self, value=None):
self.value = value
def __cmp__(self, aMinPO):
return cmp(self.value, aMinPO.value)
def __repr__(self):
return "MinPO(%s)" % self.value
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/PackableStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run some tests relevant for storages that support pack()."""
try:
import cPickle
pickle = cPickle
#import cPickle as pickle
except ImportError:
import pickle
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import threading
import time
from ZODB import DB
from Persistence import Persistent
from ZODB.referencesf import referencesf
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import snooze
from ZODB.POSException import ConflictError, StorageError
from ZODB.PersistentMapping import PersistentMapping
ZERO = '\0'*8
# This class is for the root object. It must not contain a getoid() method
# (really, attribute). The persistent pickling machinery -- in the dumps()
# function below -- will pickle Root objects as normal, but any attributes
# which reference persistent Object instances will get pickled as persistent
# ids, not as the object's state. This makes the referencesf stuff work,
# because it pickle sniffs for persistent ids (so we have to get those
# persistent ids into the root object's pickle).
class Root:
pass
# This is the persistent Object class. Because it has a getoid() method, the
# persistent pickling machinery -- in the dumps() function below -- will
# pickle the oid string instead of the object's actual state. Yee haw, this
# stuff is deep. ;)
class Object:
def __init__(self, oid):
self._oid = oid
def getoid(self):
return self._oid
class C(Persistent):
pass
# Here's where all the magic occurs. Sadly, the pickle module is a bit
# underdocumented, but here's what happens: by setting the persistent_id
# attribute to getpersid() on the pickler, that function gets called for every
# object being pickled. By returning None when the object has no getoid
# attribute, it signals pickle to serialize the object as normal. That's how
# the Root instance gets pickled correctly. But, if the object has a getoid
# attribute, then by returning that method's value, we tell pickle to
# serialize the persistent id of the object instead of the object's state.
# That sets the pickle up for proper sniffing by the referencesf machinery.
# Fun, huh?
def dumps(obj):
def getpersid(obj):
if hasattr(obj, 'getoid'):
return obj.getoid()
return None
s = StringIO()
p = pickle.Pickler(s)
p.persistent_id = getpersid
p.dump(obj)
return s.getvalue()
class PackableStorageBase:
# We keep a cache of object ids to instances so that the unpickler can
# easily return any persistent object.
_cache = {}
def _newobj(self):
# This is a convenience method to create a new persistent Object
# instance. It asks the storage for a new object id, creates the
# instance with the given oid, populates the cache and returns the
# object.
oid = self._storage.new_oid()
obj = Object(oid)
self._cache[obj.getoid()] = obj
return obj
def _makeloader(self):
# This is the other side of the persistent pickling magic. We need a
# custom unpickler to mirror our custom pickler above. By setting the
# persistent_load function of the unpickler to self._cache.get(),
# whenever a persistent id is unpickled, it will actually return the
# Object instance out of the cache. As far as returning a function
# with an argument bound to an instance attribute method, we do it
# this way because it makes the code in the tests more succinct.
#
# BUT! Be careful in your use of loads() vs. pickle.loads(). loads()
# should only be used on the Root object's pickle since it's the only
# special one. All the Object instances should use pickle.loads().
def loads(str, persfunc=self._cache.get):
fp = StringIO(str)
u = pickle.Unpickler(fp)
u.persistent_load = persfunc
return u.load()
return loads
class PackableStorage(PackableStorageBase):
def _initroot(self):
try:
self._storage.load(ZERO, '')
except KeyError:
import PersistentMapping
from ZODB.Transaction import Transaction
file = StringIO()
p = cPickle.Pickler(file, 1)
p.dump((PersistentMapping.PersistentMapping, None))
p.dump({'_container': {}})
t=Transaction()
t.description='initial database creation'
self._storage.tpc_begin(t)
self._storage.store(ZERO, None, file.getvalue(), '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
def checkPackEmptyStorage(self):
self._storage.pack(time.time(), referencesf)
def checkPackTomorrow(self):
self._initroot()
self._storage.pack(time.time() + 10000, referencesf)
def checkPackYesterday(self):
self._initroot()
self._storage.pack(time.time() - 10000, referencesf)
def checkPackAllRevisions(self):
self._initroot()
eq = self.assertEqual
raises = self.assertRaises
# Create a `persistent' object
obj = self._newobj()
oid = obj.getoid()
obj.value = 1
# Commit three different revisions
revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
obj.value = 2
revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
obj.value = 3
revid3 = self._dostoreNP(oid, revid=revid2, data=pickle.dumps(obj))
# Now make sure all three revisions can be extracted
data = self._storage.loadSerial(oid, revid1)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 1)
data = self._storage.loadSerial(oid, revid2)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 2)
data = self._storage.loadSerial(oid, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 3)
# Now pack all transactions; need to sleep a second to make
# sure that the pack time is greater than the last commit time.
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
self._storage.pack(packtime, referencesf)
# All revisions of the object should be gone, since there is no
# reference from the root object to this object.
raises(KeyError, self._storage.loadSerial, oid, revid1)
raises(KeyError, self._storage.loadSerial, oid, revid2)
raises(KeyError, self._storage.loadSerial, oid, revid3)
def checkPackJustOldRevisions(self):
eq = self.assertEqual
raises = self.assertRaises
loads = self._makeloader()
# Create a root object. This can't be an instance of Object,
# otherwise the pickling machinery will serialize it as a persistent
# id and not as an object that contains references (persistent ids) to
# other objects.
root = Root()
# Create a persistent object, with some initial state
obj = self._newobj()
oid = obj.getoid()
# Link the root object to the persistent object, in order to keep the
# persistent object alive. Store the root object.
root.obj = obj
root.value = 0
revid0 = self._dostoreNP(ZERO, data=dumps(root))
# Make sure the root can be retrieved
data, revid = self._storage.load(ZERO, '')
eq(revid, revid0)
eq(loads(data).value, 0)
# Commit three different revisions of the other object
obj.value = 1
revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
obj.value = 2
revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
obj.value = 3
revid3 = self._dostoreNP(oid, revid=revid2, data=pickle.dumps(obj))
# Now make sure all three revisions can be extracted
data = self._storage.loadSerial(oid, revid1)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 1)
data = self._storage.loadSerial(oid, revid2)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 2)
data = self._storage.loadSerial(oid, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 3)
# Now pack just revisions 1 and 2. The object's current revision
# should stay alive because it's pointed to by the root.
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
self._storage.pack(packtime, referencesf)
# Make sure the revisions are gone, but that object zero and revision
# 3 are still there and correct
data, revid = self._storage.load(ZERO, '')
eq(revid, revid0)
eq(loads(data).value, 0)
raises(KeyError, self._storage.loadSerial, oid, revid1)
raises(KeyError, self._storage.loadSerial, oid, revid2)
data = self._storage.loadSerial(oid, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 3)
data, revid = self._storage.load(oid, '')
eq(revid, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid)
eq(pobj.value, 3)
def checkPackOnlyOneObject(self):
eq = self.assertEqual
raises = self.assertRaises
loads = self._makeloader()
# Create a root object. This can't be an instance of Object,
# otherwise the pickling machinery will serialize it as a persistent
# id and not as an object that contains references (persistent ids) to
# other objects.
root = Root()
# Create a persistent object, with some initial state
obj1 = self._newobj()
oid1 = obj1.getoid()
# Create another persistent object, with some initial state. Make
# sure it's oid is greater than the first object's oid.
obj2 = self._newobj()
oid2 = obj2.getoid()
self.failUnless(oid2 > oid1)
# Link the root object to the persistent objects, in order to keep
# them alive. Store the root object.
root.obj1 = obj1
root.obj2 = obj2
root.value = 0
revid0 = self._dostoreNP(ZERO, data=dumps(root))
# Make sure the root can be retrieved
data, revid = self._storage.load(ZERO, '')
eq(revid, revid0)
eq(loads(data).value, 0)
# Commit three different revisions of the first object
obj1.value = 1
revid1 = self._dostoreNP(oid1, data=pickle.dumps(obj1))
obj1.value = 2
revid2 = self._dostoreNP(oid1, revid=revid1, data=pickle.dumps(obj1))
obj1.value = 3
revid3 = self._dostoreNP(oid1, revid=revid2, data=pickle.dumps(obj1))
# Now make sure all three revisions can be extracted
data = self._storage.loadSerial(oid1, revid1)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid1)
eq(pobj.value, 1)
data = self._storage.loadSerial(oid1, revid2)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid1)
eq(pobj.value, 2)
data = self._storage.loadSerial(oid1, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid1)
eq(pobj.value, 3)
# Now commit a revision of the second object
obj2.value = 11
revid4 = self._dostoreNP(oid2, data=pickle.dumps(obj2))
# And make sure the revision can be extracted
data = self._storage.loadSerial(oid2, revid4)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid2)
eq(pobj.value, 11)
# Now pack just revisions 1 and 2 of object1. Object1's current
# revision should stay alive because it's pointed to by the root, as
# should Object2's current revision.
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
self._storage.pack(packtime, referencesf)
# Make sure the revisions are gone, but that object zero, object2, and
# revision 3 of object1 are still there and correct.
data, revid = self._storage.load(ZERO, '')
eq(revid, revid0)
eq(loads(data).value, 0)
raises(KeyError, self._storage.loadSerial, oid1, revid1)
raises(KeyError, self._storage.loadSerial, oid1, revid2)
data = self._storage.loadSerial(oid1, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid1)
eq(pobj.value, 3)
data, revid = self._storage.load(oid1, '')
eq(revid, revid3)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid1)
eq(pobj.value, 3)
data, revid = self._storage.load(oid2, '')
eq(revid, revid4)
eq(loads(data).value, 11)
data = self._storage.loadSerial(oid2, revid4)
pobj = pickle.loads(data)
eq(pobj.getoid(), oid2)
eq(pobj.value, 11)
def checkPackUnlinkedFromRoot(self):
eq = self.assertEqual
db = DB(self._storage)
conn = db.open()
root = conn.root()
txn = get_transaction()
txn.note('root')
txn.commit()
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
obj = C()
obj.value = 7
root['obj'] = obj
txn = get_transaction()
txn.note('root -> o1')
txn.commit()
del root['obj']
txn = get_transaction()
txn.note('root -x-> o1')
txn.commit()
self._storage.pack(packtime, referencesf)
log = self._storage.undoLog()
tid = log[0]['id']
db.undo(tid)
txn = get_transaction()
txn.note('undo root -x-> o1')
txn.commit()
conn.sync()
eq(root['obj'].value, 7)
def _PackWhileWriting(self, pack_now=0):
# A storage should allow some reading and writing during
# a pack. This test attempts to exercise locking code
# in the storage to test that it is safe. It generates
# a lot of revisions, so that pack takes a long time.
db = DB(self._storage)
conn = db.open()
root = conn.root()
for i in range(10):
root[i] = MinPO(i)
get_transaction().commit()
snooze()
packt = time.time()
for j in range(10):
for i in range(10):
root[i].value = MinPO(i)
get_transaction().commit()
threads = [ClientThread(db) for i in range(4)]
for t in threads:
t.start()
if pack_now:
db.pack(time.time())
else:
db.pack(packt)
for t in threads:
t.join(30)
for t in threads:
t.join(1)
self.assert_(not t.isAlive())
# Iterate over the storage to make sure it's sane, but not every
# storage supports iterators.
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for data in txn:
pass
iter.close()
def checkPackWhileWriting(self):
self._PackWhileWriting(pack_now=0)
def checkPackNowWhileWriting(self):
self._PackWhileWriting(pack_now=1)
def checkRedundantPack(self):
# It is an error to perform a pack with a packtime earlier
# than a previous packtime. The storage can't do a full
# traversal as of the packtime, because the previous pack may
# have removed revisions necessary for a full traversal.
# It should be simple to test that a storage error is raised,
# but this test case goes to the trouble of constructing a
# scenario that would lose data if the earlier packtime was
# honored.
self._initroot()
db = DB(self._storage)
conn = db.open()
root = conn.root()
root["d"] = d = PersistentMapping()
get_transaction().commit()
snooze()
obj = d["obj"] = C()
obj.value = 1
get_transaction().commit()
snooze()
packt1 = time.time()
lost_oid = obj._p_oid
obj = d["anotherobj"] = C()
obj.value = 2
get_transaction().commit()
snooze()
packt2 = time.time()
db.pack(packt2)
# BDBStorage allows the second pack, but doesn't lose data.
try:
db.pack(packt1)
except StorageError:
pass
# This object would be removed by the second pack, even though
# it is reachable.
self._storage.load(lost_oid, "")
def checkPackUndoLog(self):
self._initroot()
eq = self.assertEqual
raises = self.assertRaises
# Create a `persistent' object
obj = self._newobj()
oid = obj.getoid()
obj.value = 1
# Commit two different revisions
revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
obj.value = 2
snooze()
packtime = time.time()
snooze()
revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
# Now pack the first transaction
self.assertEqual(3,len(self._storage.undoLog()))
self._storage.pack(packtime, referencesf)
# The undo log contains only the most resent transaction
self.assertEqual(1,len(self._storage.undoLog()))
def dont_checkPackUndoLogUndoable(self):
# A disabled test. I wanted to test that the content of the
# undo log was consistent, but every storage appears to
# include something slightly different. If the result of this
# method is only used to fill a GUI then this difference
# doesnt matter. Perhaps re-enable this test once we agree
# what should be asserted.
self._initroot()
# Create two `persistent' object
obj1 = self._newobj()
oid1 = obj1.getoid()
obj1.value = 1
obj2 = self._newobj()
oid2 = obj2.getoid()
obj2.value = 2
# Commit the first revision of each of them
revid11 = self._dostoreNP(oid1, data=pickle.dumps(obj1),
description="1-1")
revid22 = self._dostoreNP(oid2, data=pickle.dumps(obj2),
description="2-2")
# remember the time. everything above here will be packed away
snooze()
packtime = time.time()
snooze()
# Commit two revisions of the first object
obj1.value = 3
revid13 = self._dostoreNP(oid1, revid=revid11,
data=pickle.dumps(obj1), description="1-3")
obj1.value = 4
revid14 = self._dostoreNP(oid1, revid=revid13,
data=pickle.dumps(obj1), description="1-4")
# Commit one revision of the second object
obj2.value = 5
revid25 = self._dostoreNP(oid2, revid=revid22,
data=pickle.dumps(obj2), description="2-5")
# Now pack
self.assertEqual(6,len(self._storage.undoLog()))
print '\ninitial undoLog was'
for r in self._storage.undoLog(): print r
self._storage.pack(packtime, referencesf)
# The undo log contains only two undoable transaction.
print '\nafter packing undoLog was'
for r in self._storage.undoLog(): print r
# what can we assert about that?
class ClientThread(threading.Thread):
def __init__(self, db):
threading.Thread.__init__(self)
self.root = db.open().root()
def run(self):
for j in range(50):
try:
self.root[j % 10].value = MinPO(j)
get_transaction().commit()
except ConflictError:
get_transaction().abort()
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/PersistentStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test that a storage's values persist across open and close."""
class PersistentStorage:
def checkUpdatesPersist(self):
oids = []
def new_oid_wrapper(l=oids, new_oid=self._storage.new_oid):
oid = new_oid()
l.append(oid)
return oid
self._storage.new_oid = new_oid_wrapper
self._dostore()
oid = self._storage.new_oid()
revid = self._dostore(oid)
if self._storage.supportsVersions():
self._dostore(oid, revid, data=8, version='b')
oid = self._storage.new_oid()
revid = self._dostore(oid, data=1)
revid = self._dostore(oid, revid, data=2)
self._dostore(oid, revid, data=3)
# keep copies of all the objects
objects = []
for oid in oids:
p, s = self._storage.load(oid, '')
objects.append((oid, '', p, s))
ver = self._storage.modifiedInVersion(oid)
if ver:
p, s = self._storage.load(oid, ver)
objects.append((oid, ver, p, s))
self._storage.close()
self.open()
# keep copies of all the objects
for oid, ver, p, s in objects:
_p, _s = self._storage.load(oid, ver)
self.assertEquals(p, _p)
self.assertEquals(s, _s)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/ReadOnlyStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from ZODB.POSException import ReadOnlyError
from ZODB.Transaction import Transaction
class ReadOnlyStorage:
def _create_data(self):
# test a read-only storage that already has some data
self.oids = {}
for i in range(10):
oid = self._storage.new_oid()
revid = self._dostore(oid)
self.oids[oid] = revid
def _make_readonly(self):
self._storage.close()
self.open(read_only=1)
self.assert_(self._storage.isReadOnly())
def checkReadMethods(self):
self._create_data()
self._make_readonly()
# XXX not going to bother checking all read methods
for oid in self.oids.keys():
data, revid = self._storage.load(oid, '')
self.assertEqual(revid, self.oids[oid])
self.assert_(not self._storage.modifiedInVersion(oid))
_data = self._storage.loadSerial(oid, revid)
self.assertEqual(data, _data)
def checkWriteMethods(self):
self._make_readonly()
self.assertRaises(ReadOnlyError, self._storage.new_oid)
t = Transaction()
self.assertRaises(ReadOnlyError, self._storage.tpc_begin, t)
if self._storage.supportsVersions():
self.assertRaises(ReadOnlyError, self._storage.abortVersion,
'', t)
self.assertRaises(ReadOnlyError, self._storage.commitVersion,
'', '', t)
self.assertRaises(ReadOnlyError, self._storage.store,
'\000' * 8, None, '', '', t)
if self._storage.supportsTransactionalUndo():
self.assertRaises(ReadOnlyError, self._storage.transactionalUndo,
'\000' * 8, t)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/RecoveryStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""More recovery and iterator tests."""
from ZODB.Transaction import Transaction
from ZODB.tests.IteratorStorage import IteratorDeepCompare
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
from ZODB import DB
from ZODB.referencesf import referencesf
import time
class RecoveryStorage(IteratorDeepCompare):
# Requires a setUp() that creates a self._dst destination storage
def checkSimpleRecovery(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=11)
revid = self._dostore(oid, revid=revid, data=12)
revid = self._dostore(oid, revid=revid, data=13)
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
def checkRecoveryAcrossVersions(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=21)
revid = self._dostore(oid, revid=revid, data=22)
revid = self._dostore(oid, revid=revid, data=23, version='one')
revid = self._dostore(oid, revid=revid, data=34, version='one')
# Now commit the version
t = Transaction()
self._storage.tpc_begin(t)
self._storage.commitVersion('one', '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
def checkRecoverAbortVersion(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=21, version="one")
revid = self._dostore(oid, revid=revid, data=23, version='one')
revid = self._dostore(oid, revid=revid, data=34, version='one')
# Now abort the version and the creation
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.abortVersion('one', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(oids, [oid])
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
# Also make sure the the last transaction has a data record
# with None for its data attribute, because we've undone the
# object.
for s in self._storage, self._dst:
iter = s.iterator()
for trans in iter:
pass # iterate until we get the last one
data = trans[0]
self.assertRaises(IndexError, lambda i, t=trans: t[i], 1)
self.assertEqual(data.oid, oid)
self.assertEqual(data.data, None)
def checkRecoverUndoInVersion(self):
oid = self._storage.new_oid()
version = "aVersion"
revid_a = self._dostore(oid, data=MinPO(91))
revid_b = self._dostore(oid, revid=revid_a, version=version,
data=MinPO(92))
revid_c = self._dostore(oid, revid=revid_b, version=version,
data=MinPO(93))
self._undo(self._storage.undoInfo()[0]['id'], oid)
self._commitVersion(version, '')
self._undo(self._storage.undoInfo()[0]['id'], oid)
# now copy the records to a new storage
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
# The last two transactions were applied directly rather than
# copied. So we can't use compare() to verify that they new
# transactions are applied correctly. (The new transactions
# will have different timestamps for each storage.)
self._abortVersion(version)
self.assert_(self._storage.versionEmpty(version))
self._undo(self._storage.undoInfo()[0]['id'], oid)
self.assert_(not self._storage.versionEmpty(version))
# check the data is what we expect it to be
data, revid = self._storage.load(oid, version)
self.assertEqual(zodb_unpickle(data), MinPO(92))
data, revid = self._storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(91))
# and swap the storages
tmp = self._storage
self._storage = self._dst
self._abortVersion(version)
self.assert_(self._storage.versionEmpty(version))
self._undo(self._storage.undoInfo()[0]['id'], oid)
self.assert_(not self._storage.versionEmpty(version))
# check the data is what we expect it to be
data, revid = self._storage.load(oid, version)
self.assertEqual(zodb_unpickle(data), MinPO(92))
data, revid = self._storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(91))
# swap them back
self._storage = tmp
# Now remove _dst and copy all the transactions a second time.
# This time we will be able to confirm via compare().
self._dst.close()
self._dst.cleanup()
self._dst = self.new_dest()
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
def checkRestoreAcrossPack(self):
db = DB(self._storage)
c = db.open()
r = c.root()
obj = r["obj1"] = MinPO(1)
get_transaction().commit()
obj = r["obj2"] = MinPO(1)
get_transaction().commit()
self._dst.copyTransactionsFrom(self._storage)
self._dst.pack(time.time(), referencesf)
self._undo(self._storage.undoInfo()[0]['id'])
# copy the final transaction manually. even though there
# was a pack, the restore() ought to succeed.
it = self._storage.iterator()
final = list(it)[-1]
self._dst.tpc_begin(final, final.tid, final.status)
for r in final:
self._dst.restore(r.oid, r.serial, r.data, r.version, r.data_txn,
final)
it.close()
self._dst.tpc_vote(final)
self._dst.tpc_finish(final)
def checkPackWithGCOnDestinationAfterRestore(self):
raises = self.assertRaises
db = DB(self._storage)
conn = db.open()
root = conn.root()
root.obj = obj1 = MinPO(1)
txn = get_transaction()
txn.note('root -> obj')
txn.commit()
root.obj.obj = obj2 = MinPO(2)
txn = get_transaction()
txn.note('root -> obj -> obj')
txn.commit()
del root.obj
txn = get_transaction()
txn.note('root -X->')
txn.commit()
# Now copy the transactions to the destination
self._dst.copyTransactionsFrom(self._storage)
# Now pack the destination.
snooze()
self._dst.pack(time.time(), referencesf)
# And check to see that the root object exists, but not the other
# objects.
data, serial = self._dst.load(root._p_oid, '')
raises(KeyError, self._dst.load, obj1._p_oid, '')
raises(KeyError, self._dst.load, obj2._p_oid, '')
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/RevisionStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Check loadSerial() on storages that support historical revisions."""
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
ZERO = '\0'*8
class RevisionStorage:
def checkLoadSerial(self):
oid = self._storage.new_oid()
revid = ZERO
revisions = {}
for i in range(31, 38):
revid = self._dostore(oid, revid=revid, data=MinPO(i))
revisions[revid] = MinPO(i)
# Now make sure all the revisions have the correct value
for revid, value in revisions.items():
data = self._storage.loadSerial(oid, revid)
self.assertEqual(zodb_unpickle(data), value)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/StorageTestBase.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Provide a mixin base class for storage tests.
The StorageTestBase class provides basic setUp() and tearDown()
semantics (which you can override), and it also provides a helper
method _dostore() which performs a complete store transaction for a
single object revision.
"""
import errno
import os
import pickle
import string
import sys
import time
import types
import unittest
from cPickle import Pickler, Unpickler
from cStringIO import StringIO
from ZODB.Transaction import Transaction
from ZODB.utils import u64
from ZODB.tests.MinPO import MinPO
ZERO = '\0'*8
def snooze():
# In Windows, it's possible that two successive time.time() calls return
# the same value. Tim guarantees that time never runs backwards. You
# usually want to call this before you pack a storage, or must make other
# guarantees about increasing timestamps.
now = time.time()
while now == time.time():
time.sleep(0.1)
def zodb_pickle(obj):
"""Create a pickle in the format expected by ZODB."""
f = StringIO()
p = Pickler(f, 1)
p.persistent_id = lambda obj: getattr(obj, '_p_oid', None)
klass = obj.__class__
assert not hasattr(obj, '__getinitargs__'), "not ready for constructors"
args = None
mod = getattr(klass, '__module__', None)
if mod is not None:
klass = mod, klass.__name__
state = obj.__getstate__()
p.dump((klass, args))
p.dump(state)
return f.getvalue(1)
def persistent_load(pid):
# helper for zodb_unpickle
return "ref to %s.%s oid=%s" % (pid[1][0], pid[1][1], u64(pid[0]))
def zodb_unpickle(data):
"""Unpickle an object stored using the format expected by ZODB."""
f = StringIO(data)
u = Unpickler(f)
u.persistent_load = persistent_load
klass_info = u.load()
if isinstance(klass_info, types.TupleType):
if isinstance(klass_info[0], types.TupleType):
modname, klassname = klass_info[0]
args = klass_info[1]
else:
modname, klassname = klass_info
args = None
if modname == "__main__":
ns = globals()
else:
mod = import_helper(modname)
ns = mod.__dict__
try:
klass = ns[klassname]
except KeyError:
sys.stderr.write("can't find %s in %s" % (klassname,
repr(ns)))
inst = klass()
else:
raise ValueError, "expected class info: %s" % repr(klass_info)
state = u.load()
inst.__setstate__(state)
return inst
def handle_all_serials(oid, *args):
"""Return dict of oid to serialno from store() and tpc_vote().
Raises an exception if one of the calls raised an exception.
The storage interface got complicated when ZEO was introduced.
Any individual store() call can return None or a sequence of
2-tuples where the 2-tuple is either oid, serialno or an
exception to be raised by the client.
The original interface just returned the serialno for the
object.
"""
d = {}
for arg in args:
if isinstance(arg, types.StringType):
d[oid] = arg
elif arg is None:
pass
else:
for oid, serial in arg:
if not isinstance(serial, types.StringType):
raise serial # error from ZEO server
d[oid] = serial
return d
def handle_serials(oid, *args):
"""Return the serialno for oid based on multiple return values.
A helper for function _handle_all_serials().
"""
return handle_all_serials(oid, *args)[oid]
def import_helper(name):
mod = __import__(name)
return sys.modules[name]
def removefs(base):
"""Remove all files created by FileStorage with path base."""
for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
path = base + ext
try:
os.remove(path)
except os.error, err:
if err[0] != errno.ENOENT:
raise
class StorageTestBase(unittest.TestCase):
# XXX It would be simpler if concrete tests didn't need to extend
# setUp() and tearDown().
def setUp(self):
# You need to override this with a setUp that creates self._storage
self._storage = None
def _close(self):
# You should override this if closing your storage requires additional
# shutdown operations.
if self._storage is not None:
self._storage.close()
def tearDown(self):
self._close()
def _dostore(self, oid=None, revid=None, data=None, version=None,
already_pickled=0, user=None, description=None):
"""Do a complete storage transaction. The defaults are:
- oid=None, ask the storage for a new oid
- revid=None, use a revid of ZERO
- data=None, pickle up some arbitrary data (the integer 7)
- version=None, use the empty string version
Returns the object's new revision id.
"""
if oid is None:
oid = self._storage.new_oid()
if revid is None:
revid = ZERO
if data is None:
data = MinPO(7)
if type(data) == types.IntType:
data = MinPO(data)
if not already_pickled:
data = zodb_pickle(data)
if version is None:
version = ''
# Begin the transaction
t = Transaction()
if user is not None:
t.user = user
if description is not None:
t.description = description
try:
self._storage.tpc_begin(t)
# Store an object
r1 = self._storage.store(oid, revid, data, version, t)
# Finish the transaction
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
return revid
def _dostoreNP(self, oid=None, revid=None, data=None, version=None,
user=None, description=None):
return self._dostore(oid, revid, data, version, 1, user, description)
# The following methods depend on optional storage features.
def _undo(self, tid, oid=None):
# Undo a tid that affects a single object (oid).
# XXX This is very specialized
t = Transaction()
t.note("undo")
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
if oid is not None:
self.assertEqual(len(oids), 1)
self.assertEqual(oids[0], oid)
return self._storage.lastTransaction()
def _commitVersion(self, src, dst):
t = Transaction()
t.note("commit %r to %r" % (src, dst))
self._storage.tpc_begin(t)
oids = self._storage.commitVersion(src, dst, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
return oids
def _abortVersion(self, ver):
t = Transaction()
t.note("abort %r" % ver)
self._storage.tpc_begin(t)
oids = self._storage.abortVersion(ver, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
return oids
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/Synchronization.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the storage's implemenetation of the storage synchronization spec.
The Synchronization spec
http://www.zope.org/Documentation/Developer/Models/ZODB/
ZODB_Architecture_Storage_Interface_State_Synchronization_Diag.html
It specifies two states committing and non-committing. A storage
starts in the non-committing state. tpc_begin() transfers to the
committting state; tpc_abort() and tpc_finish() transfer back to
non-committing.
Several other methods are only allowed in one state or another. Many
methods allowed only in the committing state require that they apply
to the currently committing transaction.
The spec is silent on a variety of methods that don't appear to modify
the state, e.g. load(), undoLog(), pack(). It's unclear whether there
is a separate set of synchronization rules that apply to these methods
or if the synchronization is implementation dependent, i.e. only what
is need to guarantee a corrected implementation.
The synchronization spec is also silent on whether there is any
contract implied with the caller. If the storage can assume that a
single client is single-threaded and that it will not call, e.g., store()
until after it calls tpc_begin(), the implementation can be
substantially simplified.
New and/or unspecified methods:
tpc_vote(): handled like tpc_abort
transactionalUndo(): handled like undo() (which is how?)
Methods that have nothing to do with committing/non-committing:
load(), loadSerial(), getName(), getSize(), __len__(), history(),
undoLog(), modifiedInVersion(), versionEmpty(), versions(), pack().
Specific questions:
The spec & docs say that undo() takes three arguments, the second
being a transaction. If the specified arg isn't the current
transaction, the undo() should raise StorageTransactionError. This
isn't implemented anywhere. It looks like undo can be called at
anytime.
FileStorage does not allow undo() during a pack. How should this be
tested? Is it a general restriction?
"""
from ZODB.Transaction import Transaction
from ZODB.POSException import StorageTransactionError
VERSION = "testversion"
OID = "\000" * 8
SERIALNO = "\000" * 8
TID = "\000" * 8
class SynchronizedStorage:
## def verifyCommitting(self, callable, *args):
## self.assertRaises(StorageTransactionError, callable *args)
def verifyNotCommitting(self, callable, *args):
self.assertRaises(StorageTransactionError, callable, *args)
def verifyWrongTrans(self, callable, *args):
t = Transaction()
self._storage.tpc_begin(t)
self.assertRaises(StorageTransactionError, callable, *args)
self._storage.tpc_abort(t)
def checkAbortVersionNotCommitting(self):
self.verifyNotCommitting(self._storage.abortVersion,
VERSION, Transaction())
def checkAbortVersionWrongTrans(self):
self.verifyWrongTrans(self._storage.abortVersion,
VERSION, Transaction())
def checkCommitVersionNotCommitting(self):
self.verifyNotCommitting(self._storage.commitVersion,
VERSION, "", Transaction())
def checkCommitVersionWrongTrans(self):
self.verifyWrongTrans(self._storage.commitVersion,
VERSION, "", Transaction())
def checkStoreNotCommitting(self):
self.verifyNotCommitting(self._storage.store,
OID, SERIALNO, "", "", Transaction())
def checkStoreWrongTrans(self):
self.verifyWrongTrans(self._storage.store,
OID, SERIALNO, "", "", Transaction())
## def checkNewOidNotCommitting(self):
## self.verifyNotCommitting(self._storage.new_oid)
## def checkNewOidWrongTrans(self):
## self.verifyWrongTrans(self._storage.new_oid)
def checkAbortNotCommitting(self):
self._storage.tpc_abort(Transaction())
def checkAbortWrongTrans(self):
t = Transaction()
self._storage.tpc_begin(t)
self._storage.tpc_abort(Transaction())
self._storage.tpc_abort(t)
def checkFinishNotCommitting(self):
t = Transaction()
self._storage.tpc_finish(t)
self._storage.tpc_abort(t)
def checkFinishWrongTrans(self):
t = Transaction()
self._storage.tpc_begin(t)
self._storage.tpc_finish(Transaction())
self._storage.tpc_abort(t)
def checkBeginCommitting(self):
t = Transaction()
self._storage.tpc_begin(t)
self._storage.tpc_begin(t)
self._storage.tpc_abort(t)
# XXX how to check undo?
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/TransactionalUndoStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Check transactionalUndo().
Any storage that supports transactionalUndo() must pass these tests.
"""
from __future__ import nested_scopes
import time
import types
from ZODB import POSException
from ZODB.Transaction import Transaction
from ZODB.referencesf import referencesf
from ZODB.utils import u64, p64
from ZODB import DB
from Persistence import Persistent
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
ZERO = '\0'*8
class C(Persistent):
pass
def snooze():
# In Windows, it's possible that two successive time.time() calls return
# the same value. Tim guarantees that time never runs backwards. You
# usually want to call this before you pack a storage, or must make other
# guarantees about increasing timestamps.
now = time.time()
while now == time.time():
time.sleep(0.1)
def listeq(L1, L2):
"""Return True if L1.sort() == L2.sort()"""
c1 = L1[:]
c2 = L2[:]
c1.sort()
c2.sort()
return c1 == c2
class TransactionalUndoStorage:
def _transaction_begin(self):
self.__serials = {}
def _transaction_store(self, oid, rev, data, vers, trans):
r = self._storage.store(oid, rev, data, vers, trans)
if r:
if type(r) == types.StringType:
self.__serials[oid] = r
else:
for oid, serial in r:
self.__serials[oid] = serial
def _transaction_vote(self, trans):
r = self._storage.tpc_vote(trans)
if r:
for oid, serial in r:
self.__serials[oid] = serial
def _transaction_newserial(self, oid):
return self.__serials[oid]
def _multi_obj_transaction(self, objs):
newrevs = {}
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
for oid, rev, data in objs:
self._transaction_store(oid, rev, data, '', t)
newrevs[oid] = None
self._transaction_vote(t)
self._storage.tpc_finish(t)
for oid in newrevs.keys():
newrevs[oid] = self._transaction_newserial(oid)
return newrevs
def _iterate(self):
"""Iterate over the storage in its final state."""
# This is testing that the iterator() code works correctly.
# The hasattr() guards against ZEO, which doesn't support iterator.
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for rec in txn:
pass
def checkSimpleTransactionalUndo(self):
eq = self.assertEqual
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(23))
revid = self._dostore(oid, revid=revid, data=MinPO(24))
revid = self._dostore(oid, revid=revid, data=MinPO(25))
info = self._storage.undoInfo()
tid = info[0]['id']
# Now start an undo transaction
t = Transaction()
t.note('undo1')
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(24))
# Do another one
info = self._storage.undoInfo()
tid = info[2]['id']
t = Transaction()
t.note('undo2')
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(23))
# Try to undo the first record
info = self._storage.undoInfo()
tid = info[4]['id']
t = Transaction()
t.note('undo3')
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
# This should fail since we've undone the object's creation
self.assertRaises(KeyError,
self._storage.load, oid, '')
# And now let's try to redo the object's creation
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(23))
self._iterate()
def checkCreationUndoneGetSerial(self):
# create an object
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(23))
# undo its creation
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
t.note('undo1')
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# Check that calling getSerial on an uncreated object raises a KeyError
# The current version of FileStorage fails this test
self.assertRaises(KeyError, self._storage.getSerial, oid)
def checkUndoCreationBranch1(self):
eq = self.assertEqual
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12))
# Undo the last transaction
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(11))
# Now from here, we can either redo the last undo, or undo the object
# creation. Let's undo the object creation.
info = self._storage.undoInfo()
tid = info[2]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
self.assertRaises(KeyError, self._storage.load, oid, '')
self._iterate()
def checkUndoCreationBranch2(self):
eq = self.assertEqual
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12))
# Undo the last transaction
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(11))
# Now from here, we can either redo the last undo, or undo the object
# creation. Let's redo the last undo
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(12))
self._iterate()
def checkTwoObjectUndo(self):
eq = self.assertEqual
# Convenience
p31, p32, p51, p52 = map(zodb_pickle,
map(MinPO, (31, 32, 51, 52)))
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
revid1 = revid2 = ZERO
# Store two objects in the same transaction
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p31, '', t)
self._transaction_store(oid2, revid2, p51, '', t)
# Finish the transaction
self._transaction_vote(t)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(t)
eq(revid1, revid2)
# Update those same two objects
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p32, '', t)
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(t)
eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(32))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(52))
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 2)
self.failUnless(oid1 in oids)
self.failUnless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(31))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(51))
self._iterate()
def checkTwoObjectUndoAtOnce(self):
# Convenience
eq = self.assertEqual
unless = self.failUnless
p30, p31, p32, p50, p51, p52 = map(zodb_pickle,
map(MinPO,
(30, 31, 32, 50, 51, 52)))
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
revid1 = revid2 = ZERO
# Store two objects in the same transaction
d = self._multi_obj_transaction([(oid1, revid1, p30),
(oid2, revid2, p50),
])
eq(d[oid1], d[oid2])
# Update those same two objects
d = self._multi_obj_transaction([(oid1, d[oid1], p31),
(oid2, d[oid2], p51),
])
eq(d[oid1], d[oid2])
# Update those same two objects
d = self._multi_obj_transaction([(oid1, d[oid1], p32),
(oid2, d[oid2], p52),
])
eq(d[oid1], d[oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(32))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(52))
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[0]['id']
tid1 = info[1]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
oids1 = self._storage.transactionalUndo(tid1, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# We get the finalization stuff called an extra time:
## self._storage.tpc_vote(t)
## self._storage.tpc_finish(t)
eq(len(oids), 2)
eq(len(oids1), 2)
unless(oid1 in oids)
unless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(30))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(50))
# Now try to undo the one we just did to undo, whew
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 2)
unless(oid1 in oids)
unless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(32))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(52))
self._iterate()
def checkTwoObjectUndoAgain(self):
eq = self.assertEqual
p31, p32, p33, p51, p52, p53 = map(
zodb_pickle,
map(MinPO, (31, 32, 33, 51, 52, 53)))
# Like the above, but the first revision of the objects are stored in
# different transactions.
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
revid1 = self._dostore(oid1, data=p31, already_pickled=1)
revid2 = self._dostore(oid2, data=p51, already_pickled=1)
# Update those same two objects
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p32, '', t)
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
self._storage.tpc_finish(t)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 2)
self.failUnless(oid1 in oids)
self.failUnless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(31))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(51))
# Like the above, but this time, the second transaction contains only
# one object.
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p33, '', t)
self._transaction_store(oid2, revid2, p53, '', t)
# Finish the transaction
self._transaction_vote(t)
self._storage.tpc_finish(t)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
# Update in different transactions
revid1 = self._dostore(oid1, revid=revid1, data=MinPO(34))
revid2 = self._dostore(oid2, revid=revid2, data=MinPO(54))
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
self.failUnless(oid1 in oids)
self.failUnless(not oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(33))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(54))
self._iterate()
def checkNotUndoable(self):
eq = self.assertEqual
# Set things up so we've got a transaction that can't be undone
oid = self._storage.new_oid()
revid_a = self._dostore(oid, data=MinPO(51))
revid_b = self._dostore(oid, revid=revid_a, data=MinPO(52))
revid_c = self._dostore(oid, revid=revid_b, data=MinPO(53))
# Start the undo
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
self._storage.tpc_begin(t)
self.assertRaises(POSException.UndoError,
self._storage.transactionalUndo,
tid, t)
self._storage.tpc_abort(t)
# Now have more fun: object1 and object2 are in the same transaction,
# which we'll try to undo to, but one of them has since modified in
# different transaction, so the undo should fail.
oid1 = oid
revid1 = revid_c
oid2 = self._storage.new_oid()
revid2 = ZERO
p81, p82, p91, p92 = map(zodb_pickle,
map(MinPO, (81, 82, 91, 92)))
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p81, '', t)
self._transaction_store(oid2, revid2, p91, '', t)
self._transaction_vote(t)
self._storage.tpc_finish(t)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
# Make sure the objects have the expected values
data, revid_11 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(81))
data, revid_22 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(91))
eq(revid_11, revid1)
eq(revid_22, revid2)
# Now modify oid2
revid2 = self._dostore(oid2, revid=revid2, data=MinPO(92))
self.assertNotEqual(revid1, revid2)
self.assertNotEqual(revid2, revid_22)
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
self._storage.tpc_begin(t)
self.assertRaises(POSException.UndoError,
self._storage.transactionalUndo,
tid, t)
self._storage.tpc_abort(t)
self._iterate()
def checkTransactionalUndoAfterPack(self):
eq = self.assertEqual
# Add a few object revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(51))
packtime = time.time()
snooze() # time.time() now distinct from packtime
revid2 = self._dostore(oid, revid=revid1, data=MinPO(52))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(53))
# Now get the undo log
info = self._storage.undoInfo()
eq(len(info), 3)
tid = info[0]['id']
# Now pack just the initial revision of the object. We need the
# second revision otherwise we won't be able to undo the third
# revision!
self._storage.pack(packtime, referencesf)
# Make some basic assertions about the undo information now
info2 = self._storage.undoInfo()
eq(len(info2), 2)
# And now attempt to undo the last transaction
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
# The object must now be at the second state
eq(zodb_unpickle(data), MinPO(52))
self._iterate()
def checkTransactionalUndoAfterPackWithObjectUnlinkFromRoot(self):
eq = self.assertEqual
db = DB(self._storage)
conn = db.open()
root = conn.root()
o1 = C()
o2 = C()
root['obj'] = o1
o1.obj = o2
txn = get_transaction()
txn.note('o1 -> o2')
txn.commit()
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
o3 = C()
o2.obj = o3
txn = get_transaction()
txn.note('o1 -> o2 -> o3')
txn.commit()
o1.obj = o3
txn = get_transaction()
txn.note('o1 -> o3')
txn.commit()
log = self._storage.undoLog()
eq(len(log), 4)
for entry in zip(log, ('o1 -> o3', 'o1 -> o2 -> o3',
'o1 -> o2', 'initial database creation')):
eq(entry[0]['description'], entry[1])
self._storage.pack(packtime, referencesf)
log = self._storage.undoLog()
for entry in zip(log, ('o1 -> o3', 'o1 -> o2 -> o3')):
eq(entry[0]['description'], entry[1])
tid = log[0]['id']
db.undo(tid)
txn = get_transaction()
txn.note('undo')
txn.commit()
# undo does a txn-undo, but doesn't invalidate
conn.sync()
log = self._storage.undoLog()
for entry in zip(log, ('undo', 'o1 -> o3', 'o1 -> o2 -> o3')):
eq(entry[0]['description'], entry[1])
eq(o1.obj, o2)
eq(o1.obj.obj, o3)
self._iterate()
def checkPackAfterUndoDeletion(self):
db = DB(self._storage)
cn = db.open()
root = cn.root()
pack_times = []
def set_pack_time():
pack_times.append(time.time())
snooze()
root["key0"] = MinPO(0)
root["key1"] = MinPO(1)
root["key2"] = MinPO(2)
txn = get_transaction()
txn.note("create 3 keys")
txn.commit()
set_pack_time()
del root["key1"]
txn = get_transaction()
txn.note("delete 1 key")
txn.commit()
set_pack_time()
root._p_deactivate()
cn.sync()
self.assert_(listeq(root.keys(), ["key0", "key2"]))
L = db.undoInfo()
db.undo(L[0]["id"])
txn = get_transaction()
txn.note("undo deletion")
txn.commit()
set_pack_time()
root._p_deactivate()
cn.sync()
self.assert_(listeq(root.keys(), ["key0", "key1", "key2"]))
for t in pack_times:
self._storage.pack(t, referencesf)
root._p_deactivate()
cn.sync()
self.assert_(listeq(root.keys(), ["key0", "key1", "key2"]))
for i in range(3):
obj = root["key%d" % i]
self.assertEqual(obj.value, i)
root.items()
self._inter_pack_pause()
def checkPackAfterUndoManyTimes(self):
db = DB(self._storage)
cn = db.open()
rt = cn.root()
rt["test"] = MinPO(1)
get_transaction().commit()
rt["test2"] = MinPO(2)
get_transaction().commit()
rt["test"] = MinPO(3)
txn = get_transaction()
txn.note("root of undo")
txn.commit()
packtimes = []
for i in range(10):
L = db.undoInfo()
db.undo(L[0]["id"])
txn = get_transaction()
txn.note("undo %d" % i)
txn.commit()
rt._p_deactivate()
cn.sync()
self.assertEqual(rt["test"].value, i % 2 and 3 or 1)
self.assertEqual(rt["test2"].value, 2)
packtimes.append(time.time())
snooze()
for t in packtimes:
self._storage.pack(t, referencesf)
cn.sync()
cn._cache.clear()
# The last undo set the value to 3 and pack should
# never change that.
self.assertEqual(rt["test"].value, 3)
self.assertEqual(rt["test2"].value, 2)
self._inter_pack_pause()
def _inter_pack_pause(self):
# DirectoryStorage needs a pause between packs,
# most other storages dont.
pass
def checkTransactionalUndoIterator(self):
# check that data_txn set in iterator makes sense
if not hasattr(self._storage, "iterator"):
return
s = self._storage
BATCHES = 4
OBJECTS = 4
orig = []
for i in range(BATCHES):
t = Transaction()
tid = p64(i + 1)
s.tpc_begin(t, tid)
for j in range(OBJECTS):
oid = s.new_oid()
obj = MinPO(i * OBJECTS + j)
revid = s.store(oid, None, zodb_pickle(obj), '', t)
orig.append((tid, oid, revid))
s.tpc_vote(t)
s.tpc_finish(t)
i = 0
for tid, oid, revid in orig:
self._dostore(oid, revid=revid, data=MinPO(revid),
description="update %s" % i)
# Undo the OBJECTS transactions that modified objects created
# in the ith original transaction.
def undo(i):
info = s.undoInfo()
t = Transaction()
s.tpc_begin(t)
base = i * OBJECTS + i
for j in range(OBJECTS):
tid = info[base + j]['id']
s.transactionalUndo(tid, t)
s.tpc_vote(t)
s.tpc_finish(t)
for i in range(BATCHES):
undo(i)
# There are now (2 + OBJECTS) * BATCHES transactions:
# BATCHES original transactions, followed by
# OBJECTS * BATCHES modifications, followed by
# BATCHES undos
iter = s.iterator()
offset = 0
eq = self.assertEqual
for i in range(BATCHES):
txn = iter[offset]
offset += 1
tid = p64(i + 1)
eq(txn.tid, tid)
L1 = [(rec.oid, rec.serial, rec.data_txn) for rec in txn]
L2 = [(oid, revid, None) for _tid, oid, revid in orig
if _tid == tid]
eq(L1, L2)
for i in range(BATCHES * OBJECTS):
txn = iter[offset]
offset += 1
eq(len([rec for rec in txn if rec.data_txn is None]), 1)
for i in range(BATCHES):
txn = iter[offset]
offset += 1
# The undos are performed in reverse order.
otid = p64(BATCHES - i)
L1 = [(rec.oid, rec.data_txn) for rec in txn]
L2 = [(oid, otid) for _tid, oid, revid in orig
if _tid == otid]
L1.sort()
L2.sort()
eq(L1, L2)
self.assertRaises(IndexError, iter.__getitem__, offset)
def checkUndoLogMetadata(self):
# test that the metadata is correct in the undo log
t = get_transaction()
t.note('t1')
t.setExtendedInfo('k2','this is transaction metadata')
t.setUser('u3',path='p3')
db = DB(self._storage)
conn = db.open()
root = conn.root()
o1 = C()
root['obj'] = o1
txn = get_transaction()
txn.commit()
l = self._storage.undoLog()
self.assertEqual(len(l),2)
d = l[0]
self.assertEqual(d['description'],'t1')
self.assertEqual(d['k2'],'this is transaction metadata')
self.assertEqual(d['user_name'],'p3 u3')
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/TransactionalUndoVersionStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import nested_scopes
# Check interactions between transactionalUndo() and versions. Any storage
# that supports both transactionalUndo() and versions must pass these tests.
import time
from ZODB import POSException
from ZODB.referencesf import referencesf
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
class TransactionalUndoVersionStorage:
def _x_dostore(self, *args, **kwargs):
# ugh: backwards compatibilty for ZEO 1.0 which runs these
# tests but has a _dostore() method that does not support the
# description kwarg.
try:
return self._dostore(*args, **kwargs)
except TypeError:
# assume that the type error means we've got a _dostore()
# without the description kwarg
try:
del kwargs['description']
except KeyError:
pass # not expected
return self._dostore(*args, **kwargs)
def _undo(self, tid, oid):
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(len(oids), 1)
self.assertEqual(oids[0], oid)
def checkUndoInVersion(self):
eq = self.assertEqual
unless = self.failUnless
def check_objects(nonversiondata, versiondata):
data, revid = self._storage.load(oid, version)
self.assertEqual(zodb_unpickle(data), MinPO(versiondata))
data, revid = self._storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(nonversiondata))
oid = self._storage.new_oid()
version = 'one'
revid_a = self._dostore(oid, data=MinPO(91))
revid_b = self._dostore(oid, revid=revid_a, data=MinPO(92),
version=version)
revid_c = self._dostore(oid, revid=revid_b, data=MinPO(93),
version=version)
info = self._storage.undoInfo()
self._undo(info[0]['id'], oid)
data, revid = self._storage.load(oid, '')
eq(revid, revid_a)
eq(zodb_unpickle(data), MinPO(91))
data, revid = self._storage.load(oid, version)
unless(revid > revid_b and revid > revid_c)
eq(zodb_unpickle(data), MinPO(92))
# Now commit the version...
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.commitVersion(version, '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
check_objects(92, 92)
# ...and undo the commit
info = self._storage.undoInfo()
self._undo(info[0]['id'], oid)
check_objects(91, 92)
oids = self._abortVersion(version)
assert len(oids) == 1
assert oids[0] == oid
check_objects(91, 91)
# Now undo the abort
info=self._storage.undoInfo()
self._undo(info[0]['id'], oid)
check_objects(91, 92)
def checkUndoCommitVersion(self):
def load_value(oid, version=''):
data, revid = self._storage.load(oid, version)
return zodb_unpickle(data).value
# create a bunch of packable transactions
oid = self._storage.new_oid()
revid = '\000' * 8
for i in range(4):
revid = self._x_dostore(oid, revid, description='packable%d' % i)
pt = time.time()
time.sleep(1)
oid1 = self._storage.new_oid()
version = 'version'
revid1 = self._x_dostore(oid1, data=MinPO(0), description='create1')
revid2 = self._x_dostore(oid1, data=MinPO(1), revid=revid1,
version=version, description='version1')
revid3 = self._x_dostore(oid1, data=MinPO(2), revid=revid2,
version=version, description='version2')
self._x_dostore(description='create2')
t = Transaction()
t.description = 'commit version'
self._storage.tpc_begin(t)
self._storage.commitVersion(version, '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
info = self._storage.undoInfo()
t_id = info[0]['id']
self.assertEqual(load_value(oid1), 2)
self.assertEqual(load_value(oid1, version), 2)
self._storage.pack(pt, referencesf)
t = Transaction()
t.description = 'undo commit version'
self._storage.tpc_begin(t)
self._storage.transactionalUndo(t_id, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(load_value(oid1), 0)
self.assertEqual(load_value(oid1, version), 2)
def checkUndoAbortVersion(self):
def load_value(oid, version=''):
data, revid = self._storage.load(oid, version)
return zodb_unpickle(data).value
# create a bunch of packable transactions
oid = self._storage.new_oid()
revid = '\000' * 8
for i in range(3):
revid = self._x_dostore(oid, revid, description='packable%d' % i)
pt = time.time()
time.sleep(1)
oid1 = self._storage.new_oid()
version = 'version'
revid1 = self._x_dostore(oid1, data=MinPO(0), description='create1')
revid2 = self._x_dostore(oid1, data=MinPO(1), revid=revid1,
version=version, description='version1')
revid3 = self._x_dostore(oid1, data=MinPO(2), revid=revid2,
version=version, description='version2')
self._x_dostore(description='create2')
t = Transaction()
t.description = 'abort version'
self._storage.tpc_begin(t)
self._storage.abortVersion(version, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
info = self._storage.undoInfo()
t_id = info[0]['id']
self.assertEqual(load_value(oid1), 0)
# after abort, we should see non-version data
self.assertEqual(load_value(oid1, version), 0)
t = Transaction()
t.description = 'undo abort version'
self._storage.tpc_begin(t)
self._storage.transactionalUndo(t_id, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(load_value(oid1), 0)
# t undo will re-create the version
self.assertEqual(load_value(oid1, version), 2)
info = self._storage.undoInfo()
t_id = info[0]['id']
self._storage.pack(pt, referencesf)
t = Transaction()
t.description = 'undo undo'
self._storage.tpc_begin(t)
self._storage.transactionalUndo(t_id, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# undo of undo will put as back where we started
self.assertEqual(load_value(oid1), 0)
# after abort, we should see non-version data
self.assertEqual(load_value(oid1, version), 0)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/VersionStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run the version related tests for a storage.
Any storage that supports versions should be able to pass all these tests.
"""
# XXX we should clean this code up to get rid of the #JF# comments.
# They were introduced when Jim reviewed the original version of the
# code. Barry and Jeremy didn't understand versions then.
import time
from ZODB import POSException
from ZODB.referencesf import referencesf
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle, snooze
from ZODB import DB
class VersionStorage:
def checkCommitVersionSerialno(self):
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(12))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(13),
version="version")
oids = self._commitVersion("version", "")
self.assertEqual([oid], oids)
data, revid3 = self._storage.load(oid, "")
# use repr() to avoid getting binary data in a traceback on error
self.assertNotEqual(`revid1`, `revid3`)
self.assertNotEqual(`revid2`, `revid3`)
def checkAbortVersionSerialno(self):
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(12))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(13),
version="version")
oids = self._abortVersion("version")
self.assertEqual([oid], oids)
data, revid3 = self._storage.load(oid, "")
# use repr() to avoid getting binary data in a traceback on error
self.assertEqual(`revid1`, `revid3`)
self.assertNotEqual(`revid2`, `revid3`)
def checkVersionedStoreAndLoad(self):
eq = self.assertEqual
# Store a couple of non-version revisions of the object
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12))
# And now store some new revisions in a version
version = 'test-version'
revid = self._dostore(oid, revid=revid, data=MinPO(13),
version=version)
revid = self._dostore(oid, revid=revid, data=MinPO(14),
version=version)
revid = self._dostore(oid, revid=revid, data=MinPO(15),
version=version)
# Now read back the object in both the non-version and version and
# make sure the values jive.
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(12))
data, vrevid = self._storage.load(oid, version)
eq(zodb_unpickle(data), MinPO(15))
if hasattr(self._storage, 'getSerial'):
s = self._storage.getSerial(oid)
eq(s, max(revid, vrevid))
def checkVersionedLoadErrors(self):
oid = self._storage.new_oid()
version = 'test-version'
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12),
version=version)
# Try to load a bogus oid
self.assertRaises(KeyError,
self._storage.load,
self._storage.new_oid(), '')
# Try to load a bogus version string
#JF# Nope, fall back to non-version
#JF# self.assertRaises(KeyError,
#JF# self._storage.load,
#JF# oid, 'bogus')
data, revid = self._storage.load(oid, 'bogus')
self.assertEqual(zodb_unpickle(data), MinPO(11))
def checkVersionLock(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
version = 'test-version'
revid = self._dostore(oid, revid=revid, data=MinPO(12),
version=version)
self.assertRaises(POSException.VersionLockError,
self._dostore,
oid, revid=revid, data=MinPO(14),
version='another-version')
def checkVersionEmpty(self):
# Before we store anything, these versions ought to be empty
version = 'test-version'
#JF# The empty string is not a valid version. I think that this should
#JF# be an error. Let's punt for now.
#JF# assert self._storage.versionEmpty('')
self.failUnless(self._storage.versionEmpty(version))
# Now store some objects
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12))
revid = self._dostore(oid, revid=revid, data=MinPO(13),
version=version)
revid = self._dostore(oid, revid=revid, data=MinPO(14),
version=version)
# The blank version should not be empty
#JF# The empty string is not a valid version. I think that this should
#JF# be an error. Let's punt for now.
#JF# assert not self._storage.versionEmpty('')
# Neither should 'test-version'
self.failUnless(not self._storage.versionEmpty(version))
# But this non-existant version should be empty
self.failUnless(self._storage.versionEmpty('bogus'))
def checkVersions(self):
unless = self.failUnless
# Store some objects in the non-version
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
oid3 = self._storage.new_oid()
revid1 = self._dostore(oid1, data=MinPO(11))
revid2 = self._dostore(oid2, data=MinPO(12))
revid3 = self._dostore(oid3, data=MinPO(13))
# Now create some new versions
revid1 = self._dostore(oid1, revid=revid1, data=MinPO(14),
version='one')
revid2 = self._dostore(oid2, revid=revid2, data=MinPO(15),
version='two')
revid3 = self._dostore(oid3, revid=revid3, data=MinPO(16),
version='three')
# Ask for the versions
versions = self._storage.versions()
unless('one' in versions)
unless('two' in versions)
unless('three' in versions)
# Now flex the `max' argument
versions = self._storage.versions(1)
self.assertEqual(len(versions), 1)
unless('one' in versions or 'two' in versions or 'three' in versions)
def _setup_version(self, version='test-version'):
# Store some revisions in the non-version
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(49))
revid = self._dostore(oid, revid=revid, data=MinPO(50))
nvrevid = revid = self._dostore(oid, revid=revid, data=MinPO(51))
# Now do some stores in a version
revid = self._dostore(oid, revid=revid, data=MinPO(52),
version=version)
revid = self._dostore(oid, revid=revid, data=MinPO(53),
version=version)
revid = self._dostore(oid, revid=revid, data=MinPO(54),
version=version)
return oid, version
def checkAbortVersion(self):
eq = self.assertEqual
oid, version = self._setup_version()
# XXX Not sure I can write a test for getSerial() in the
# presence of aborted versions, because FileStorage and
# Berkeley storage give a different answer. I think Berkeley
# is right and FS is wrong.
oids = self._abortVersion(version)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(51))
def checkAbortVersionErrors(self):
eq = self.assertEqual
oid, version = self._setup_version()
# Now abort a bogus version
t = Transaction()
self._storage.tpc_begin(t)
#JF# The spec is silent on what happens if you abort or commit
#JF# a non-existent version. FileStorage consideres this a noop.
#JF# We can change the spec, but until we do ....
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.abortVersion,
#JF# 'bogus', t)
# And try to abort the empty version
if (hasattr(self._storage, 'supportsTransactionalUndo')
and self._storage.supportsTransactionalUndo()):
# XXX FileStorage used to be broken on this one
self.assertRaises(POSException.VersionError,
self._storage.abortVersion,
'', t)
# But now we really try to abort the version
oids = self._storage.abortVersion(version, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
eq(len(oids), 1)
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(51))
def checkCommitVersionErrors(self):
if not (hasattr(self._storage, 'supportsTransactionalUndo')
and self._storage.supportsTransactionalUndo()):
# XXX FileStorage used to be broken on this one
return
eq = self.assertEqual
oid1, version1 = self._setup_version('one')
data, revid1 = self._storage.load(oid1, version1)
eq(zodb_unpickle(data), MinPO(54))
t = Transaction()
self._storage.tpc_begin(t)
try:
self.assertRaises(POSException.VersionCommitError,
self._storage.commitVersion,
'one', 'one', t)
finally:
self._storage.tpc_abort(t)
def checkNewSerialOnCommitVersionToVersion(self):
eq = self.assertEqual
oid, version = self._setup_version()
data, vserial = self._storage.load(oid, version)
data, nserial = self._storage.load(oid, '')
version2 = 'test version 2'
self._commitVersion(version, version2)
data, serial = self._storage.load(oid, version2)
self.failUnless(serial != vserial and serial != nserial,
"New serial, %r, should be different from the old "
"version, %r, and non-version, %r, serials."
% (serial, vserial, nserial))
def checkModifyAfterAbortVersion(self):
eq = self.assertEqual
oid, version = self._setup_version()
self._abortVersion(version)
data, revid = self._storage.load(oid, '')
# And modify it a few times
revid = self._dostore(oid, revid=revid, data=MinPO(52))
revid = self._dostore(oid, revid=revid, data=MinPO(53))
revid = self._dostore(oid, revid=revid, data=MinPO(54))
data, newrevid = self._storage.load(oid, '')
eq(newrevid, revid)
eq(zodb_unpickle(data), MinPO(54))
def checkCommitToNonVersion(self):
eq = self.assertEqual
oid, version = self._setup_version()
data, revid = self._storage.load(oid, version)
eq(zodb_unpickle(data), MinPO(54))
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(51))
self._commitVersion(version, '')
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(54))
def checkCommitToOtherVersion(self):
eq = self.assertEqual
oid1, version1 = self._setup_version('one')
data, revid1 = self._storage.load(oid1, version1)
eq(zodb_unpickle(data), MinPO(54))
oid2, version2 = self._setup_version('two')
data, revid2 = self._storage.load(oid2, version2)
eq(zodb_unpickle(data), MinPO(54))
# make sure we see the non-version data when appropriate
data, revid2 = self._storage.load(oid1, version2)
eq(zodb_unpickle(data), MinPO(51))
data, revid2 = self._storage.load(oid2, version1)
eq(zodb_unpickle(data), MinPO(51))
data, revid2 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(51))
# Okay, now let's commit object1 to version2
oids = self._commitVersion(version1, version2)
eq(len(oids), 1)
eq(oids[0], oid1)
data, revid = self._storage.load(oid1, version2)
eq(zodb_unpickle(data), MinPO(54))
data, revid = self._storage.load(oid2, version2)
eq(zodb_unpickle(data), MinPO(54))
# an object can only exist in one version, so a load from
# version1 should now give the non-version data
data, revid2 = self._storage.load(oid1, version1)
eq(zodb_unpickle(data), MinPO(51))
# as should a version that has never been used
data, revid2 = self._storage.load(oid1, 'bela lugosi')
eq(zodb_unpickle(data), MinPO(51))
def checkAbortOneVersionCommitTheOther(self):
eq = self.assertEqual
oid1, version1 = self._setup_version('one')
data, revid1 = self._storage.load(oid1, version1)
eq(zodb_unpickle(data), MinPO(54))
oid2, version2 = self._setup_version('two')
data, revid2 = self._storage.load(oid2, version2)
eq(zodb_unpickle(data), MinPO(54))
# Let's make sure we can't get object1 in version2
data, revid2 = self._storage.load(oid1, version2)
eq(zodb_unpickle(data), MinPO(51))
oids = self._abortVersion(version1)
eq(len(oids), 1)
eq(oids[0], oid1)
data, revid = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(51))
#JF# Ditto
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid1, version1)
data, revid = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(51))
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid1, version2)
data, revid = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(51))
data, revid = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(51))
data, revid = self._storage.load(oid2, version2)
eq(zodb_unpickle(data), MinPO(54))
# Okay, now let's commit version2 back to the trunk
oids = self._commitVersion(version2, '')
eq(len(oids), 1)
eq(oids[0], oid2)
data, revid = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(51))
# But the trunk should be up to date now
data, revid = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(54))
data, revid = self._storage.load(oid2, version2)
eq(zodb_unpickle(data), MinPO(54))
#JF# To do a test like you want, you have to add the data in a version
oid = self._storage.new_oid()
revid = self._dostore(oid, revid=revid, data=MinPO(54), version='one')
self.assertRaises(KeyError,
self._storage.load, oid, '')
self.assertRaises(KeyError,
self._storage.load, oid, 'two')
def checkCreateObjectInVersionWithAbort(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=21, version="one")
revid = self._dostore(oid, revid=revid, data=23, version='one')
revid = self._dostore(oid, revid=revid, data=34, version='one')
# Now abort the version and the creation
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.abortVersion('one', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(oids, [oid])
def checkPackVersions(self):
db = DB(self._storage)
cn = db.open(version="testversion")
root = cn.root()
obj = root["obj"] = MinPO("obj")
root["obj2"] = MinPO("obj2")
txn = get_transaction()
txn.note("create 2 objs in version")
txn.commit()
obj.value = "77"
txn = get_transaction()
txn.note("modify obj in version")
txn.commit()
# undo the modification to generate a mix of backpointers
# and versions for pack to chase
info = db.undoInfo()
db.undo(info[0]["id"])
txn = get_transaction()
txn.note("undo modification")
txn.commit()
snooze()
self._storage.pack(time.time(), referencesf)
db.commitVersion("testversion")
txn = get_transaction()
txn.note("commit version")
txn.commit()
cn = db.open()
root = cn.root()
root["obj"] = "no version"
txn = get_transaction()
txn.note("modify obj")
txn.commit()
self._storage.pack(time.time(), referencesf)
def checkPackVersionsInPast(self):
db = DB(self._storage)
cn = db.open(version="testversion")
root = cn.root()
obj = root["obj"] = MinPO("obj")
root["obj2"] = MinPO("obj2")
txn = get_transaction()
txn.note("create 2 objs in version")
txn.commit()
obj.value = "77"
txn = get_transaction()
txn.note("modify obj in version")
txn.commit()
t0 = time.time()
snooze()
# undo the modification to generate a mix of backpointers
# and versions for pack to chase
info = db.undoInfo()
db.undo(info[0]["id"])
txn = get_transaction()
txn.note("undo modification")
txn.commit()
self._storage.pack(t0, referencesf)
db.commitVersion("testversion")
txn = get_transaction()
txn.note("commit version")
txn.commit()
cn = db.open()
root = cn.root()
root["obj"] = "no version"
txn = get_transaction()
txn.note("modify obj")
txn.commit()
self._storage.pack(time.time(), referencesf)
def checkPackVersionReachable(self):
db = DB(self._storage)
cn = db.open()
root = cn.root()
names = "a", "b", "c"
for name in names:
root[name] = MinPO(name)
get_transaction().commit()
for name in names:
cn2 = db.open(version=name)
rt2 = cn2.root()
obj = rt2[name]
obj.value = MinPO("version")
get_transaction().commit()
cn2.close()
root["d"] = MinPO("d")
get_transaction().commit()
snooze()
self._storage.pack(time.time(), referencesf)
cn.sync()
cn._cache.clear()
# make sure all the non-version data is there
for name, obj in root.items():
self.assertEqual(name, obj.value)
# make sure all the version-data is there,
# and create a new revision in the version
for name in names:
cn2 = db.open(version=name)
rt2 = cn2.root()
obj = rt2[name].value
self.assertEqual(obj.value, "version")
obj.value = "still version"
get_transaction().commit()
cn2.close()
db.abortVersion("b")
txn = get_transaction()
txn.note("abort version b")
txn.commit()
t = time.time()
snooze()
L = db.undoInfo()
db.undo(L[0]["id"])
txn = get_transaction()
txn.note("undo abort")
txn.commit()
self._storage.pack(t, referencesf)
cn2 = db.open(version="b")
rt2 = cn2.root()
self.assertEqual(rt2["b"].value.value, "still version")
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/__init__.py ===
# Having this makes debugging better.
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/dangle.py ===
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Functional test to produce a dangling reference."""
import time
from ZODB.FileStorage import FileStorage
from ZODB import DB
from Persistence import Persistent
class P(Persistent):
pass
def create_dangling_ref(db):
rt = db.open().root()
rt[1] = o1 = P()
get_transaction().note("create o1")
get_transaction().commit()
rt[2] = o2 = P()
get_transaction().note("create o2")
get_transaction().commit()
c = o1.child = P()
get_transaction().note("set child on o1")
get_transaction().commit()
o1.child = P()
get_transaction().note("replace child on o1")
get_transaction().commit()
time.sleep(2)
# The pack should remove the reference to c, because it is no
# longer referenced from o1. But the object still exists and has
# an oid, so a new commit of it won't create a new object.
db.pack()
print repr(c._p_oid)
o2.child = c
get_transaction().note("set child on o2")
get_transaction().commit()
def main():
fs = FileStorage("dangle.fs")
db = DB(fs)
create_dangling_ref(db)
db.close()
if __name__ == "__main__":
main()
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/speed.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
usage="""Test speed of a ZODB storage
Options:
-d file The data file to use as input.
The default is this script.
-n n The number of repititions
-s module A module that defines a 'Storage'
attribute, which is an open storage.
If not specified, a FileStorage will ne
used.
-z Test compressing data
-D Run in debug mode
-L Test loads as well as stores by minimizing
the cache after eachrun
-M Output means only
"""
import sys, os, getopt, string, time
sys.path.insert(0, os.getcwd())
import ZODB, ZODB.FileStorage
import Persistence
class P(Persistence.Persistent): pass
def main(args):
opts, args = getopt.getopt(args, 'zd:n:Ds:LM')
z=s=None
data=sys.argv[0]
nrep=5
minimize=0
detailed=1
for o, v in opts:
if o=='-n': nrep=string.atoi(v)
elif o=='-d': data=v
elif o=='-s': s=v
elif o=='-z':
global zlib
import zlib
z=compress
elif o=='-L':
minimize=1
elif o=='-M':
detailed=0
elif o=='-D':
global debug
os.environ['STUPID_LOG_FILE']=''
os.environ['STUPID_LOG_SEVERITY']='-999'
if s:
s=__import__(s, globals(), globals(), ('__doc__',))
s=s.Storage
else:
s=ZODB.FileStorage.FileStorage('zeo_speed.fs', create=1)
data=open(data).read()
db=ZODB.DB(s,
# disable cache deactivation
cache_size=4000,
cache_deactivate_after=6000,)
results={1:0, 10:0, 100:0, 1000:0}
for j in range(nrep):
for r in 1, 10, 100, 1000:
t=time.time()
jar=db.open()
get_transaction().begin()
rt=jar.root()
key='s%s' % r
if rt.has_key(key): p=rt[key]
else: rt[key]=p=P()
for i in range(r):
if z is not None: d=z(data)
else: d=data
v=getattr(p, str(i), P())
v.d=d
setattr(p,str(i),v)
get_transaction().commit()
jar.close()
t=time.time()-t
if detailed:
sys.stderr.write("%s\t%s\t%.4f\n" % (j, r, t))
sys.stdout.flush()
results[r]=results[r]+t
rt=d=p=v=None # release all references
if minimize:
time.sleep(3)
jar.cacheMinimize(3)
if detailed: print '-'*24
for r in 1, 10, 100, 1000:
t=results[r]/nrep
sys.stderr.write("mean:\t%s\t%.4f\t%.4f (s/o)\n" % (r, t, t/r))
db.close()
def compress(s):
c=zlib.compressobj()
o=c.compress(s)
return o+c.flush()
if __name__=='__main__': main(sys.argv[1:])
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testActivityMonitor.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of the default activity monitor.
See ZODB/ActivityMonitor.py
$Id: testActivityMonitor.py,v 1.1 2004/04/01 18:26:43 sidnei Exp $
"""
import unittest
import time
from ZODB.ActivityMonitor import ActivityMonitor
class FakeConnection:
loads = 0
stores = 0
def _transferred(self, loads, stores):
self.loads = self.loads + loads
self.stores = self.stores + stores
def getTransferCounts(self, clear=0):
res = self.loads, self.stores
if clear:
self.loads = self.stores = 0
return res
class Tests(unittest.TestCase):
def testAddLogEntries(self):
am = ActivityMonitor(history_length=3600)
self.assertEqual(len(am.log), 0)
c = FakeConnection()
c._transferred(1, 2)
am.closedConnection(c)
c._transferred(3, 7)
am.closedConnection(c)
self.assertEqual(len(am.log), 2)
def testTrim(self):
am = ActivityMonitor(history_length=0.1)
c = FakeConnection()
c._transferred(1, 2)
am.closedConnection(c)
time.sleep(0.2)
c._transferred(3, 7)
am.closedConnection(c)
self.assert_(len(am.log) <= 1)
def testSetHistoryLength(self):
am = ActivityMonitor(history_length=3600)
c = FakeConnection()
c._transferred(1, 2)
am.closedConnection(c)
time.sleep(0.2)
c._transferred(3, 7)
am.closedConnection(c)
self.assertEqual(len(am.log), 2)
am.setHistoryLength(0.1)
self.assertEqual(am.getHistoryLength(), 0.1)
self.assert_(len(am.log) <= 1)
def testActivityAnalysis(self):
am = ActivityMonitor(history_length=3600)
c = FakeConnection()
c._transferred(1, 2)
am.closedConnection(c)
c._transferred(3, 7)
am.closedConnection(c)
res = am.getActivityAnalysis(start=0, end=0, divisions=10)
lastend = 0
for n in range(9):
div = res[n]
self.assertEqual(div['stores'], 0)
self.assertEqual(div['loads'], 0)
self.assert_(div['start'] > 0)
self.assert_(div['start'] >= lastend)
self.assert_(div['start'] < div['end'])
lastend = div['end']
div = res[9]
self.assertEqual(div['stores'], 9)
self.assertEqual(div['loads'], 4)
self.assert_(div['start'] > 0)
self.assert_(div['start'] >= lastend)
self.assert_(div['start'] < div['end'])
def test_suite():
return unittest.makeSuite(Tests)
if __name__=='__main__':
unittest.main(defaultTest='test_suite')
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testCache.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""A few simple tests of the public cache API.
Each DB Connection has a separate PickleCache. The Cache serves two
purposes. It acts like a memo for unpickling. It also keeps recent
objects in memory under the assumption that they may be used again.
"""
from __future__ import nested_scopes
import time
import types
import unittest
import ZODB
import ZODB.MappingStorage
from ZODB.cPickleCache import PickleCache
from ZODB.POSException import ConflictError
from ZODB.PersistentMapping import PersistentMapping
from ZODB.tests.MinPO import MinPO
from ZODB.utils import p64
from Persistence import Persistent
class CacheTestBase(unittest.TestCase):
def setUp(self):
store = ZODB.MappingStorage.MappingStorage()
self.db = ZODB.DB(store,
cache_size = self.CACHE_SIZE)
self.conns = []
def tearDown(self):
for conn in self.conns:
conn.close()
self.db.close()
CACHE_SIZE = 20
def noodle_new_connection(self):
"""Do some reads and writes on a new connection."""
c = self.db.open()
self.conns.append(c)
self.noodle_connection(c)
def noodle_connection(self, c):
r = c.root()
i = len(self.conns)
d = r.get(i)
if d is None:
d = r[i] = PersistentMapping()
get_transaction().commit()
for i in range(15):
o = d.get(i)
if o is None:
o = d[i] = MinPO(i)
o.value += 1
get_transaction().commit()
class DBMethods(CacheTestBase):
__super_setUp = CacheTestBase.setUp
def setUp(self):
self.__super_setUp()
for i in range(4):
self.noodle_new_connection()
def checkCacheDetail(self):
for name, count in self.db.cacheDetail():
self.assert_(isinstance(name, types.StringType))
self.assert_(isinstance(count, types.IntType))
def checkCacheExtremeDetail(self):
expected = ['conn_no', 'id', 'oid', 'rc', 'klass', 'state']
for dict in self.db.cacheExtremeDetail():
for k, v in dict.items():
self.assert_(k in expected)
# XXX not really sure how to do a black box test of the cache.
# should the full sweep and minimize calls always remove things?
# The sleep(3) call is based on the implementation of the cache.
# It's measures time in units of three seconds, so something used
# within the last three seconds looks like something that is
# currently being used. Three seconds old is the youngest
# something can be and still be collected.
def checkFullSweep(self):
old_size = self.db.cacheSize()
time.sleep(3)
self.db.cacheFullSweep(0)
new_size = self.db.cacheSize()
self.assert_(new_size < old_size, "%s < %s" % (old_size, new_size))
def checkMinimize(self):
old_size = self.db.cacheSize()
time.sleep(3)
self.db.cacheMinimize(0)
new_size = self.db.cacheSize()
self.assert_(new_size < old_size, "%s < %s" % (old_size, new_size))
# XXX don't have an explicit test for incrgc, because the
# connection and database call it internally
# XXX same for the get and invalidate methods
def checkLRUitems(self):
# get a cache
c = self.conns[0]._cache
c.lru_items()
def checkClassItems(self):
c = self.conns[0]._cache
c.klass_items()
class LRUCacheTests(CacheTestBase):
def checkLRU(self):
# verify the LRU behavior of the cache
dataset_size = 5
CACHE_SIZE = dataset_size*2+1
# a cache big enough to hold the objects added in two
# transactions, plus the root object
self.db.setCacheSize(CACHE_SIZE)
c = self.db.open()
r = c.root()
l = {}
# the root is the only thing in the cache, because all the
# other objects are new
self.assertEqual(len(c._cache), 1)
# run several transactions
for t in range(5):
for i in range(dataset_size):
l[(t,i)] = r[i] = MinPO(i)
get_transaction().commit()
# commit() will register the objects, placing them in the
# cache. at the end of commit, the cache will be reduced
# down to CACHE_SIZE items
if len(l)>CACHE_SIZE:
self.assertEqual(c._cache.ringlen(), CACHE_SIZE)
for i in range(dataset_size):
# Check objects added in the first two transactions.
# They must all be ghostified.
self.assertEqual(l[(0,i)]._p_changed, None)
self.assertEqual(l[(1,i)]._p_changed, None)
# Check objects added in the last two transactions.
# They must all still exist in memory, but have
# had their changes flushed
self.assertEqual(l[(3,i)]._p_changed, 0)
self.assertEqual(l[(4,i)]._p_changed, 0)
# Of the objects added in the middle transaction, most
# will have been ghostified. There is one cache slot
# that may be occupied by either one of those objects or
# the root, depending on precise order of access. We do
# not bother to check this
def checkSize(self):
self.assertEqual(self.db.cacheSize(), 0)
self.assertEqual(self.db.cacheDetailSize(), [])
CACHE_SIZE = 10
self.db.setCacheSize(CACHE_SIZE)
CONNS = 3
for i in range(CONNS):
self.noodle_new_connection()
self.assertEquals(self.db.cacheSize(), CACHE_SIZE * CONNS)
details = self.db.cacheDetailSize()
self.assertEquals(len(details), CONNS)
for d in details:
self.assertEquals(d['ngsize'], CACHE_SIZE)
self.assertEquals(d['size'], CACHE_SIZE)
def checkDetail(self):
CACHE_SIZE = 10
self.db.setCacheSize(CACHE_SIZE)
CONNS = 3
for i in range(CONNS):
self.noodle_new_connection()
for klass, count in self.db.cacheDetail():
if klass.endswith('MinPO'):
self.assertEqual(count, CONNS * CACHE_SIZE)
if klass.endswith('PersistentMapping'):
# one root per connection
self.assertEqual(count, CONNS)
for details in self.db.cacheExtremeDetail():
# one dict per object. keys:
if details['klass'].endswith('PersistentMapping'):
self.assertEqual(details['state'], None)
else:
self.assert_(details['klass'].endswith('MinPO'))
self.assertEqual(details['state'], 0)
class StubDataManager:
def setklassstate(self, object):
pass
class StubObject(Persistent):
pass
class CacheErrors(unittest.TestCase):
def setUp(self):
self.jar = StubDataManager()
self.cache = PickleCache(self.jar)
def checkGetBogusKey(self):
self.assertRaises(KeyError, self.cache.get, p64(0))
try:
self.cache[12]
except KeyError:
pass
else:
self.fail("expected KeyError")
try:
self.cache[12] = 12
except TypeError:
pass
else:
self.fail("expected TyepError")
try:
del self.cache[12]
except TypeError:
pass
else:
self.fail("expected TypeError")
def checkBogusObject(self):
def add(key, obj):
self.cache[key] = obj
key = p64(2)
# value isn't persistent
self.assertRaises(TypeError, add, key, 12)
o = StubObject()
# o._p_oid == None
self.assertRaises(TypeError, add, key, o)
o._p_oid = p64(3)
self.assertRaises(ValueError, add, key, o)
o._p_oid = key
# o._p_jar == None
self.assertRaises(Exception, add, key, o)
o._p_jar = self.jar
self.cache[key] = o
# make sure it can be added multiple times
self.cache[key] = o
# same object, different keys
self.assertRaises(ValueError, add, p64(0), o)
def checkTwoCaches(self):
jar2 = StubDataManager()
cache2 = PickleCache(jar2)
o = StubObject()
key = o._p_oid = p64(1)
o._p_jar = jar2
cache2[key] = o
try:
self.cache[key] = o
except ValueError:
pass
else:
self.fail("expected ValueError because object already in cache")
def checkReadOnlyAttrsWhenCached(self):
o = StubObject()
key = o._p_oid = p64(1)
o._p_jar = self.jar
self.cache[key] = o
try:
o._p_oid = p64(2)
except ValueError:
pass
else:
self.fail("expect that you can't change oid of cached object")
try:
del o._p_jar
except ValueError:
pass
else:
self.fail("expect that you can't delete jar of cached object")
def test_suite():
s = unittest.makeSuite(DBMethods, 'check')
s.addTest(unittest.makeSuite(LRUCacheTests, 'check'))
s.addTest(unittest.makeSuite(CacheErrors, 'check'))
return s
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testConfig.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import os
import errno
import shutil
import tempfile
import unittest
import ZODB.config
import ZODB.tests
from ZODB.POSException import ReadOnlyError
from ZEO.ClientStorage import ClientDisconnected
class ConfigTestBase(unittest.TestCase):
def _opendb(self, s):
return ZODB.config.databaseFromString(s)
def _test(self, s):
db = self._opendb(s)
# Do something with the database to make sure it works
cn = db.open()
rt = cn.root()
rt["test"] = 1
get_transaction().commit()
db.close()
class ZODBConfigTest(ConfigTestBase):
def test_map_config1(self):
self._test(
"""
<zodb>
<mappingstorage/>
</zodb>
""")
def test_map_config2(self):
self._test(
"""
<zodb>
<mappingstorage/>
cache-size 1000
</zodb>
""")
def test_file_config1(self):
import ZODB.FileStorage
path = tempfile.mktemp()
self._test(
"""
<zodb>
<filestorage>
path %s
</filestorage>
</zodb>
""" % path)
ZODB.FileStorage.cleanup(path)
def test_file_config2(self):
import ZODB.FileStorage
path = tempfile.mktemp()
cfg = """
<zodb>
<filestorage>
path %s
create false
read-only true
</filestorage>
</zodb>
""" % path
self.assertRaises(ReadOnlyError, self._test, cfg)
ZODB.FileStorage.cleanup(path)
def test_zeo_config(self):
# We're looking for a port that doesn't exist so a connection attempt
# will fail. Instead of elaborate logic to loop over a port
# calculation, we'll just pick a simple "random", likely to not-exist
# port number and add an elaborate comment explaining this instead.
# Go ahead, grep for 9.
cfg = """
<zodb>
<zeoclient>
server localhost:56897
wait false
</zeoclient>
</zodb>
"""
self.assertRaises(ClientDisconnected, self._test, cfg)
def test_demo_config(self):
cfg = """
<zodb unused-name>
<demostorage>
name foo
<mappingstorage/>
</demostorage>
</zodb>
"""
self._test(cfg)
class BDBConfigTest(ConfigTestBase):
def setUp(self):
self._path = tempfile.mktemp()
try:
os.mkdir(self._path)
except OSError, e:
if e.errno <> errno.EEXIST:
raise
def tearDown(self):
shutil.rmtree(self._path)
def test_bdbfull_simple(self):
cfg = """
<zodb>
<fullstorage>
envdir %s
</fullstorage>
</zodb>
""" % self._path
self._test(cfg)
def test_bdbminimal_simple(self):
cfg = """
<zodb>
<minimalstorage>
envdir %s
</minimalstorage>
</zodb>
""" % self._path
self._test(cfg)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ZODBConfigTest))
# Only run the Berkeley tests if they are available
import BDBStorage
if BDBStorage.is_available:
suite.addTest(unittest.makeSuite(BDBConfigTest))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testDB.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import os
import time
import unittest
import ZODB
import ZODB.FileStorage
from ZODB.tests.MinPO import MinPO
class DBTests(unittest.TestCase):
def setUp(self):
self.__path = os.path.abspath('test.fs')
store = ZODB.FileStorage.FileStorage(self.__path)
self.db = ZODB.DB(store)
def tearDown(self):
self.db.close()
for s in ('', '.index', '.lock', '.tmp'):
if os.path.exists(self.__path+s):
os.remove(self.__path+s)
def dowork(self, version=''):
c = self.db.open(version)
r = c.root()
o = r[time.time()] = MinPO(0)
get_transaction().commit()
for i in range(25):
o.value = MinPO(i)
get_transaction().commit()
o = o.value
c.close()
# make sure the basic methods are callable
def testSets(self):
# test set methods that have non-trivial implementations
self.db.setCacheDeactivateAfter(12) # deprecated
self.db.setCacheSize(15)
self.db.setVersionCacheDeactivateAfter(12) # deprecated
self.db.setVersionCacheSize(15)
def test_removeVersionPool(self):
# Test that we can remove a version pool
# This is white box because we check some internal data structures
self.dowork()
self.dowork('v2')
c1 = self.db.open('v1')
c1.close() # return to pool
c12 = self.db.open('v1')
c12.close() # return to pool
self.assert_(c1 is c12) # should be same
pools, pooll = self.db._pools
self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2)
c12 = self.db.open('v1')
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3)
def _test_for_leak(self):
self.dowork()
self.dowork('v2')
while 1:
c1 = self.db.open('v1')
self.db.removeVersionPool('v1')
c1.close() # return to pool
def test_removeVersionPool_while_connection_open(self):
# Test that we can remove a version pool
# This is white box because we check some internal data structures
self.dowork()
self.dowork('v2')
c1 = self.db.open('v1')
c1.close() # return to pool
c12 = self.db.open('v1')
self.assert_(c1 is c12) # should be same
pools, pooll = self.db._pools
self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2)
c12.close() # should leave pools alone
self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2)
c12 = self.db.open('v1')
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3)
def test_suite():
return unittest.makeSuite(DBTests)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testDemoStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import ZODB.DemoStorage
import os, unittest
from ZODB.tests import StorageTestBase, BasicStorage, \
VersionStorage, Synchronization
class DemoStorageTests(StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
VersionStorage.VersionStorage,
Synchronization.SynchronizedStorage,
):
def setUp(self):
self._storage = ZODB.DemoStorage.DemoStorage()
def tearDown(self):
self._storage.close()
def checkOversizeNote(self):
# This base class test checks for the common case where a storage
# doesnt support huge transaction metadata. This storage doesnt
# have this limit, so we inhibit this test here.
pass
def test_suite():
suite = unittest.makeSuite(DemoStorageTests, 'check')
return suite
if __name__ == "__main__":
loader = unittest.TestLoader()
loader.testMethodPrefix = "check"
unittest.main(testLoader=loader)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testFileStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import nested_scopes
import ZODB.FileStorage
import sys, os, unittest
import errno
import filecmp
import StringIO
from ZODB.Transaction import Transaction
from ZODB import POSException
from ZODB.fsrecover import recover
from ZODB.tests import StorageTestBase, BasicStorage, \
TransactionalUndoStorage, VersionStorage, \
TransactionalUndoVersionStorage, PackableStorage, \
Synchronization, ConflictResolution, HistoryStorage, \
IteratorStorage, Corruption, RevisionStorage, PersistentStorage, \
MTStorage, ReadOnlyStorage, RecoveryStorage
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle
class FileStorageTests(
StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
TransactionalUndoStorage.TransactionalUndoStorage,
RevisionStorage.RevisionStorage,
VersionStorage.VersionStorage,
TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage,
HistoryStorage.HistoryStorage,
IteratorStorage.IteratorStorage,
IteratorStorage.ExtendedIteratorStorage,
PersistentStorage.PersistentStorage,
MTStorage.MTStorage,
ReadOnlyStorage.ReadOnlyStorage
):
def open(self, **kwargs):
self._storage = ZODB.FileStorage.FileStorage('FileStorageTests.fs',
**kwargs)
def setUp(self):
self.open(create=1)
def tearDown(self):
self._storage.close()
StorageTestBase.removefs("FileStorageTests.fs")
def checkLongMetadata(self):
s = "X" * 75000
try:
self._dostore(user=s)
except POSException.StorageError:
pass
else:
self.fail("expect long user field to raise error")
try:
self._dostore(description=s)
except POSException.StorageError:
pass
else:
self.fail("expect long user field to raise error")
def check_use_fsIndex(self):
from ZODB.fsIndex import fsIndex
self.assertEqual(self._storage._index.__class__, fsIndex)
# XXX We could really use some tests for sanity checking
def check_conversion_to_fsIndex_not_if_readonly(self):
self.tearDown()
class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self):
return {}, {}, {}, {}, {}, {}, {}
from ZODB.fsIndex import fsIndex
# Hack FileStorage to create dictionary indexes
self._storage = OldFileStorage('FileStorageTests.fs')
self.assertEqual(type(self._storage._index), type({}))
for i in range(10):
self._dostore()
# Should save the index
self._storage.close()
self._storage = ZODB.FileStorage.FileStorage(
'FileStorageTests.fs', read_only=1)
self.assertEqual(type(self._storage._index), type({}))
def check_conversion_to_fsIndex(self):
self.tearDown()
class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self):
return {}, {}, {}, {}, {}, {}, {}
from ZODB.fsIndex import fsIndex
# Hack FileStorage to create dictionary indexes
self._storage = OldFileStorage('FileStorageTests.fs')
self.assertEqual(type(self._storage._index), type({}))
for i in range(10):
self._dostore()
oldindex = self._storage._index.copy()
# Should save the index
self._storage.close()
self._storage = ZODB.FileStorage.FileStorage('FileStorageTests.fs')
self.assertEqual(self._storage._index.__class__, fsIndex)
self.failUnless(self._storage._used_index)
index = {}
for k, v in self._storage._index.items():
index[k] = v
self.assertEqual(index, oldindex)
def check_save_after_load_with_no_index(self):
for i in range(10):
self._dostore()
self._storage.close()
os.remove('FileStorageTests.fs.index')
self.open()
self.assertEqual(self._storage._saved, 1)
# This would make the unit tests too slow
# check_save_after_load_that_worked_hard(self)
def check_periodic_save_index(self):
# Check the basic algorithm
oldsaved = self._storage._saved
self._storage._records_before_save = 10
for i in range(4):
self._dostore()
self.assertEqual(self._storage._saved, oldsaved)
self._dostore()
self.assertEqual(self._storage._saved, oldsaved+1)
# Now make sure the parameter changes as we get bigger
for i in range(20):
self._dostore()
self.failUnless(self._storage._records_before_save > 20)
# There are a bunch of tests that the current pack() implementation
# does not past. We need to fix pack(), but don't want tests to
# fail until then.
def checkPackVersionsInPast(self):
pass
def checkPackAfterUndoDeletion(self):
pass
class FileStorageRecoveryTest(
StorageTestBase.StorageTestBase,
RecoveryStorage.RecoveryStorage,
):
def setUp(self):
StorageTestBase.removefs("Source.fs")
StorageTestBase.removefs("Dest.fs")
self._storage = ZODB.FileStorage.FileStorage('Source.fs')
self._dst = ZODB.FileStorage.FileStorage('Dest.fs')
def tearDown(self):
self._storage.close()
self._dst.close()
StorageTestBase.removefs("Source.fs")
StorageTestBase.removefs("Dest.fs")
def new_dest(self):
StorageTestBase.removefs('Dest.fs')
return ZODB.FileStorage.FileStorage('Dest.fs')
def test_suite():
suite = unittest.makeSuite(FileStorageTests, 'check')
suite2 = unittest.makeSuite(Corruption.FileStorageCorruptTests, 'check')
suite3 = unittest.makeSuite(FileStorageRecoveryTest, 'check')
suite.addTest(suite2)
suite.addTest(suite3)
return suite
if __name__=='__main__':
unittest.main()
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testMappingStorage.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import ZODB.MappingStorage
import os, unittest
from ZODB.tests import StorageTestBase, BasicStorage, Synchronization
class MappingStorageTests(StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
Synchronization.SynchronizedStorage,
):
def setUp(self):
self._storage = ZODB.MappingStorage.MappingStorage()
def tearDown(self):
self._storage.close()
def checkOversizeNote(self):
# This base class test checks for the common case where a storage
# doesnt support huge transaction metadata. This storage doesnt
# have this limit, so we inhibit this test here.
pass
def test_suite():
suite = unittest.makeSuite(MappingStorageTests, 'check')
return suite
if __name__ == "__main__":
loader = unittest.TestLoader()
loader.testMethodPrefix = "check"
unittest.main(testLoader=loader)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testPersistentList.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the list interface to PersistentList
"""
import unittest
from ZODB.PersistentList import PersistentList
l0 = []
l1 = [0]
l2 = [0, 1]
class TestPList(unittest.TestCase):
def checkTheWorld(self):
# Test constructors
u = PersistentList()
u0 = PersistentList(l0)
u1 = PersistentList(l1)
u2 = PersistentList(l2)
uu = PersistentList(u)
uu0 = PersistentList(u0)
uu1 = PersistentList(u1)
uu2 = PersistentList(u2)
v = PersistentList(tuple(u))
class OtherList:
def __init__(self, initlist):
self.__data = initlist
def __len__(self):
return len(self.__data)
def __getitem__(self, i):
return self.__data[i]
v0 = PersistentList(OtherList(u0))
vv = PersistentList("this is also a sequence")
# Test __repr__
eq = self.assertEqual
eq(str(u0), str(l0), "str(u0) == str(l0)")
eq(repr(u1), repr(l1), "repr(u1) == repr(l1)")
eq(`u2`, `l2`, "`u2` == `l2`")
# Test __cmp__ and __len__
def mycmp(a, b):
r = cmp(a, b)
if r < 0: return -1
if r > 0: return 1
return r
all = [l0, l1, l2, u, u0, u1, u2, uu, uu0, uu1, uu2]
for a in all:
for b in all:
eq(mycmp(a, b), mycmp(len(a), len(b)),
"mycmp(a, b) == mycmp(len(a), len(b))")
# Test __getitem__
for i in range(len(u2)):
eq(u2[i], i, "u2[i] == i")
# Test __setitem__
uu2[0] = 0
uu2[1] = 100
try:
uu2[2] = 200
except IndexError:
pass
else:
raise TestFailed("uu2[2] shouldn't be assignable")
# Test __delitem__
del uu2[1]
del uu2[0]
try:
del uu2[0]
except IndexError:
pass
else:
raise TestFailed("uu2[0] shouldn't be deletable")
# Test __getslice__
for i in range(-3, 4):
eq(u2[:i], l2[:i], "u2[:i] == l2[:i]")
eq(u2[i:], l2[i:], "u2[i:] == l2[i:]")
for j in range(-3, 4):
eq(u2[i:j], l2[i:j], "u2[i:j] == l2[i:j]")
# Test __setslice__
for i in range(-3, 4):
u2[:i] = l2[:i]
eq(u2, l2, "u2 == l2")
u2[i:] = l2[i:]
eq(u2, l2, "u2 == l2")
for j in range(-3, 4):
u2[i:j] = l2[i:j]
eq(u2, l2, "u2 == l2")
uu2 = u2[:]
uu2[:0] = [-2, -1]
eq(uu2, [-2, -1, 0, 1], "uu2 == [-2, -1, 0, 1]")
uu2[0:] = []
eq(uu2, [], "uu2 == []")
# Test __contains__
for i in u2:
self.failUnless(i in u2, "i in u2")
for i in min(u2)-1, max(u2)+1:
self.failUnless(i not in u2, "i not in u2")
# Test __delslice__
uu2 = u2[:]
del uu2[1:2]
del uu2[0:1]
eq(uu2, [], "uu2 == []")
uu2 = u2[:]
del uu2[1:]
del uu2[:1]
eq(uu2, [], "uu2 == []")
# Test __add__, __radd__, __mul__ and __rmul__
#self.failUnless(u1 + [] == [] + u1 == u1, "u1 + [] == [] + u1 == u1")
self.failUnless(u1 + [1] == u2, "u1 + [1] == u2")
#self.failUnless([-1] + u1 == [-1, 0], "[-1] + u1 == [-1, 0]")
self.failUnless(u2 == u2*1 == 1*u2, "u2 == u2*1 == 1*u2")
self.failUnless(u2+u2 == u2*2 == 2*u2, "u2+u2 == u2*2 == 2*u2")
self.failUnless(u2+u2+u2 == u2*3 == 3*u2, "u2+u2+u2 == u2*3 == 3*u2")
# Test append
u = u1[:]
u.append(1)
eq(u, u2, "u == u2")
# Test insert
u = u2[:]
u.insert(0, -1)
eq(u, [-1, 0, 1], "u == [-1, 0, 1]")
# Test pop
u = PersistentList([0, -1, 1])
u.pop()
eq(u, [0, -1], "u == [0, -1]")
u.pop(0)
eq(u, [-1], "u == [-1]")
# Test remove
u = u2[:]
u.remove(1)
eq(u, u1, "u == u1")
# Test count
u = u2*3
eq(u.count(0), 3, "u.count(0) == 3")
eq(u.count(1), 3, "u.count(1) == 3")
eq(u.count(2), 0, "u.count(2) == 0")
# Test index
eq(u2.index(0), 0, "u2.index(0) == 0")
eq(u2.index(1), 1, "u2.index(1) == 1")
try:
u2.index(2)
except ValueError:
pass
else:
raise TestFailed("expected ValueError")
# Test reverse
u = u2[:]
u.reverse()
eq(u, [1, 0], "u == [1, 0]")
u.reverse()
eq(u, u2, "u == u2")
# Test sort
u = PersistentList([1, 0])
u.sort()
eq(u, u2, "u == u2")
# Test extend
u = u1[:]
u.extend(u2)
eq(u, u1 + u2, "u == u1 + u2")
def test_suite():
return unittest.makeSuite(TestPList, 'check')
if __name__ == "__main__":
loader = unittest.TestLoader()
loader.testMethodPrefix = "check"
unittest.main(testLoader=loader)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testPersistentMapping.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Verify that PersistentMapping works with old versions of Zope.
The comments in PersistentMapping.py address the issue in some detail.
The pickled form of a PersistentMapping must use _container to store
the actual mapping, because old versions of Zope used this attribute.
If the new code doesn't generate pickles that are consistent with the
old code, developers will have a hard time testing the new code.
"""
import unittest
import ZODB
from ZODB.MappingStorage import MappingStorage
from ZODB.Transaction import Transaction
import cPickle
import cStringIO
import sys
# This pickle contains a persistent mapping pickle created from the
# old code.
pickle = ('((U\x0bPersistenceq\x01U\x11PersistentMappingtq\x02Nt.}q\x03U\n'
'_containerq\x04}q\x05U\x07versionq\x06U\x03oldq\x07ss.\n')
class PMTests(unittest.TestCase):
def checkOldStyleRoot(self):
# insert the pickle in place of the root
s = MappingStorage()
t = Transaction()
s.tpc_begin(t)
s.store('\000' * 8, None, pickle, '', t)
s.tpc_vote(t)
s.tpc_finish(t)
db = ZODB.DB(s)
# If the root can be loaded successfully, we should be okay.
r = db.open().root()
# But make sure it looks like a new mapping
self.assert_(hasattr(r, 'data'))
self.assert_(not hasattr(r, '_container'))
def checkNewPicklesAreSafe(self):
s = MappingStorage()
db = ZODB.DB(s)
r = db.open().root()
r[1] = 1
r[2] = 2
## r[3] = r
get_transaction().commit()
# MappingStorage stores serialno + pickle in its _index.
root_pickle = s._index['\000' * 8][8:]
f = cStringIO.StringIO(root_pickle)
u = cPickle.Unpickler(f)
klass_info = u.load()
klass = find_global(*klass_info[0])
inst = klass()
state = u.load()
inst.__setstate__(state)
self.assert_(hasattr(inst, '_container'))
self.assert_(not hasattr(inst, 'data'))
def find_global(modulename, classname):
"""Helper for this test suite to get special PersistentMapping"""
if classname == "PersistentMapping":
class PersistentMapping:
def __setstate__(self, state):
self.__dict__.update(state)
return PersistentMapping
else:
__import__(modulename)
mod = sys.modules[modulename]
return getattr(mod, classname)
def test_suite():
return unittest.makeSuite(PMTests, 'check')
if __name__ == "__main__":
loader = unittest.TestLoader()
loader.testMethodPrefix = "check"
unittest.main(testLoader=loader)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testRecover.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of the file storage recovery script."""
import base64
import os
import random
import sys
import tempfile
import unittest
import StringIO
import ZODB
from ZODB.FileStorage import FileStorage
from ZODB.PersistentMapping import PersistentMapping
from ZODB.fsrecover import recover
from ZODB.tests.StorageTestBase import removefs
from ZODB.fsdump import Dumper
class RecoverTest(unittest.TestCase):
level = 2
path = None
def setUp(self):
self.path = tempfile.mktemp(suffix=".fs")
self.storage = FileStorage(self.path)
self.populate()
self.dest = tempfile.mktemp(suffix=".fs")
self.recovered = None
def tearDown(self):
self.storage.close()
if self.recovered is not None:
self.recovered.close()
removefs(self.path)
removefs(self.dest)
def populate(self):
db = ZODB.DB(self.storage)
cn = db.open()
rt = cn.root()
# create a whole bunch of objects,
# looks like a Data.fs > 1MB
for i in range(50):
d = rt[i] = PersistentMapping()
get_transaction().commit()
for j in range(50):
d[j] = "a" * j
get_transaction().commit()
def damage(self, num, size):
self.storage.close()
# Drop size null bytes into num random spots.
for i in range(num):
offset = random.randint(0, self.storage._pos - size)
f = open(self.path, "a+b")
f.seek(offset)
f.write("\0" * size)
f.close()
ITERATIONS = 5
def recover(self, source, dest):
orig = sys.stdout
try:
sys.stdout = StringIO.StringIO()
try:
recover(self.path, self.dest,
verbose=0, partial=1, force=0, pack=1)
except SystemExit:
raise RuntimeError, "recover tried to exit"
finally:
sys.stdout = orig
def testOneBlock(self):
for i in range(self.ITERATIONS):
self.damage(1, 1024)
self.recover(self.path, self.dest)
self.recovered = FileStorage(self.dest)
self.recovered.close()
os.remove(self.path)
os.rename(self.dest, self.path)
def testFourBlocks(self):
for i in range(self.ITERATIONS):
self.damage(4, 512)
self.recover(self.path, self.dest)
self.recovered = FileStorage(self.dest)
self.recovered.close()
os.remove(self.path)
os.rename(self.dest, self.path)
def testBigBlock(self):
for i in range(self.ITERATIONS):
self.damage(1, 32 * 1024)
self.recover(self.path, self.dest)
self.recovered = FileStorage(self.dest)
self.recovered.close()
os.remove(self.path)
os.rename(self.dest, self.path)
def testBadTransaction(self):
# Find transaction headers and blast them.
L = self.storage.undoLog()
r = L[3]
tid = base64.decodestring(r["id"] + "\n")
pos1 = self.storage._txn_find(tid, 0)
r = L[8]
tid = base64.decodestring(r["id"] + "\n")
pos2 = self.storage._txn_find(tid, 0)
self.storage.close()
# Overwrite the entire header.
f = open(self.path, "a+b")
f.seek(pos1 - 50)
f.write("\0" * 100)
f.close()
self.recover(self.path, self.dest)
self.recovered = FileStorage(self.dest)
self.recovered.close()
os.remove(self.path)
os.rename(self.dest, self.path)
# Overwrite part of the header.
f = open(self.path, "a+b")
f.seek(pos2 + 10)
f.write("\0" * 100)
f.close()
self.recover(self.path, self.dest)
self.recovered = FileStorage(self.dest)
self.recovered.close()
def test_suite():
return unittest.makeSuite(RecoverTest)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testTimeStamp.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the TimeStamp utility type"""
import time
import unittest
from ZODB.TimeStamp import TimeStamp
EPSILON = 0.000001
class TimeStampTests(unittest.TestCase):
def checkYMDTimeStamp(self):
self._check_ymd(2001, 6, 3)
def _check_ymd(self, yr, mo, dy):
ts = TimeStamp(yr, mo, dy)
self.assertEqual(ts.year(), yr)
self.assertEqual(ts.month(), mo)
self.assertEqual(ts.day(), dy)
self.assertEquals(ts.hour(), 0)
self.assertEquals(ts.minute(), 0)
self.assertEquals(ts.second(), 0)
t = time.gmtime(ts.timeTime())
self.assertEquals(yr, t[0])
self.assertEquals(mo, t[1])
self.assertEquals(dy, t[2])
def checkFullTimeStamp(self):
t = time.gmtime()
ts = TimeStamp(*t[:6])
# XXX floating point comparison
self.assertEquals(ts.timeTime() + time.timezone, time.mktime(t))
self.assertEqual(ts.year(), t[0])
self.assertEqual(ts.month(), t[1])
self.assertEqual(ts.day(), t[2])
self.assertEquals(ts.hour(), t[3])
self.assertEquals(ts.minute(), t[4])
self.assert_(abs(ts.second() - t[5]) < EPSILON)
def checkRawTimestamp(self):
t = time.gmtime()
ts1 = TimeStamp(*t[:6])
ts2 = TimeStamp(`ts1`)
self.assertEquals(ts1, ts2)
self.assertEquals(ts1.timeTime(), ts2.timeTime())
self.assertEqual(ts1.year(), ts2.year())
self.assertEqual(ts1.month(), ts2.month())
self.assertEqual(ts1.day(), ts2.day())
self.assertEquals(ts1.hour(), ts2.hour())
self.assertEquals(ts1.minute(), ts2.minute())
self.assert_(abs(ts1.second() - ts2.second()) < EPSILON)
def checkDictKey(self):
t = time.gmtime()
ts1 = TimeStamp(*t[:6])
ts2 = TimeStamp(2000, *t[1:6])
d = {}
d[ts1] = 1
d[ts2] = 2
self.assertEquals(len(d), 2)
def checkCompare(self):
ts1 = TimeStamp(1972, 6, 27)
ts2 = TimeStamp(1971, 12, 12)
self.assert_(ts1 > ts2)
self.assert_(ts2 <= ts1)
def checkLaterThan(self):
t = time.gmtime()
ts = TimeStamp(*t[:6])
ts2 = ts.laterThan(ts)
self.assert_(ts2 > ts)
# XXX should test for bogus inputs to TimeStamp constructor
def checkTimeStamp(self):
# Alternate test suite
t = TimeStamp(2002, 1, 23, 10, 48, 5) # GMT
self.assertEquals(str(t), '2002-01-23 10:48:05.000000')
self.assertEquals(repr(t), '\x03B9H\x15UUU')
self.assertEquals(TimeStamp('\x03B9H\x15UUU'), t)
self.assertEquals(t.year(), 2002)
self.assertEquals(t.month(), 1)
self.assertEquals(t.day(), 23)
self.assertEquals(t.hour(), 10)
self.assertEquals(t.minute(), 48)
self.assertEquals(round(t.second()), 5)
self.assertEquals(t.second(), t.seconds()) # Alias
self.assertEquals(t.timeTime(), 1011782885)
t1 = TimeStamp(2002, 1, 23, 10, 48, 10)
self.assertEquals(str(t1), '2002-01-23 10:48:10.000000')
self.assert_(t == t)
self.assert_(t != t1)
self.assert_(t < t1)
self.assert_(t <= t1)
self.assert_(t1 >= t)
self.assert_(t1 > t)
self.failIf(t == t1)
self.failIf(t != t)
self.failIf(t > t1)
self.failIf(t >= t1)
self.failIf(t1 < t)
self.failIf(t1 <= t)
self.assertEquals(cmp(t, t), 0)
self.assertEquals(cmp(t, t1), -1)
self.assertEquals(cmp(t1, t), 1)
self.assertEquals(t1.laterThan(t), t1)
self.assert_(t.laterThan(t1) > t1)
self.assertEquals(TimeStamp(2002,1,23), TimeStamp(2002,1,23,0,0,0))
def test_suite():
return unittest.makeSuite(TimeStampTests, 'check')
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testTransaction.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
Revision information:
$Id: testTransaction.py,v 1.1 2004/04/01 18:26:43 sidnei Exp $
"""
"""
I wrote these unittests to investigate some odd transaction
behavior when doing unittests of integrating non sub transaction
aware objects, and to insure proper txn behavior. these
tests test the transaction system independent of the rest of the
zodb.
you can see the method calls to a jar by passing the
keyword arg tracing to the modify method of a dataobject.
the value of the arg is a prefix used for tracing print calls
to that objects jar.
the number of times a jar method was called can be inspected
by looking at an attribute of the jar that is the method
name prefixed with a c (count/check).
i've included some tracing examples for tests that i thought
were illuminating as doc strings below.
TODO
add in tests for objects which are modified multiple times,
for example an object that gets modified in multiple sub txns.
"""
import random
from types import TupleType
import unittest
from ZODB import Transaction
class TransactionTests(unittest.TestCase):
def setUp(self):
Transaction.hosed = 0
self.sub1 = DataObject()
self.sub2 = DataObject()
self.sub3 = DataObject()
self.nosub1 = DataObject(nost=1)
def tearDown(self):
Transaction.free_transaction()
# basic tests with two sub trans jars
# really we only need one, so tests for
# sub1 should identical to tests for sub2
def testTransactionCommit(self):
self.sub1.modify()
self.sub2.modify()
get_transaction().commit()
assert self.sub1._p_jar.ccommit_sub == 0
assert self.sub1._p_jar.ctpc_finish == 1
def testTransactionAbort(self):
self.sub1.modify()
self.sub2.modify()
get_transaction().abort()
assert self.sub2._p_jar.cabort == 1
def testTransactionNote(self):
t = get_transaction()
t.note('This is a note.')
self.assertEqual(t.description, 'This is a note.')
t.note('Another.')
self.assertEqual(t.description, 'This is a note.\n\nAnother.')
t.abort()
def testSubTransactionCommitCommit(self):
self.sub1.modify()
self.sub2.modify()
get_transaction().commit(1)
assert self.sub1._p_jar.ctpc_vote == 0
assert self.sub1._p_jar.ctpc_finish == 1
get_transaction().commit()
assert self.sub1._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_vote == 1
def testSubTransactionCommitAbort(self):
self.sub1.modify()
self.sub2.modify()
get_transaction().commit(1)
get_transaction().abort()
assert self.sub1._p_jar.ctpc_vote == 0
assert self.sub1._p_jar.cabort == 0
assert self.sub1._p_jar.cabort_sub == 1
def testMultipleSubTransactionCommitCommit(self):
# add it
self.sub1.modify()
get_transaction().commit(1)
# add another
self.sub2.modify()
# reset a flag on the original to test it again
self.sub1.ctpc_finish = 0
get_transaction().commit(1)
# this is interesting.. we go through
# every subtrans commit with all subtrans capable
# objects... i don't like this but its an impl artifact
assert self.sub1._p_jar.ctpc_vote == 0
assert self.sub1._p_jar.ctpc_finish > 0
# add another before we do the entire txn commit
self.sub3.modify()
get_transaction().commit()
# we did an implicit sub commit, is this impl artifiact?
assert self.sub3._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_finish > 1
def testMultipleSubTransactionCommitAbortSub(self):
"""
sub1 calling method commit
sub1 calling method tpc_finish
sub2 calling method tpc_begin
sub2 calling method commit
sub2 calling method tpc_finish
sub3 calling method abort
sub1 calling method commit_sub
sub2 calling method commit_sub
sub2 calling method tpc_vote
sub1 calling method tpc_vote
sub1 calling method tpc_finish
sub2 calling method tpc_finish
"""
# add it
self.sub1.modify()
get_transaction().commit(1)
# add another
self.sub2.modify()
get_transaction().commit(1)
assert self.sub1._p_jar.ctpc_vote == 0
assert self.sub1._p_jar.ctpc_finish > 0
# add another before we do the entire txn commit
self.sub3.modify()
# abort the sub transaction
get_transaction().abort(1)
# commit the container transaction
get_transaction().commit()
assert self.sub3._p_jar.cabort == 1
assert self.sub1._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_finish > 1
# repeat adding in a nonsub trans jars
def testNSJTransactionCommit(self):
self.nosub1.modify()
get_transaction().commit()
assert self.nosub1._p_jar.ctpc_finish == 1
def testNSJTransactionAbort(self):
self.nosub1.modify()
get_transaction().abort()
assert self.nosub1._p_jar.ctpc_finish == 0
assert self.nosub1._p_jar.cabort == 1
# XXX
def BUGtestNSJSubTransactionCommitAbort(self):
"""
this reveals a bug in transaction.py
the nosub jar should not have tpc_finish
called on it till the containing txn
ends.
sub calling method commit
nosub calling method tpc_begin
sub calling method tpc_finish
nosub calling method tpc_finish
nosub calling method abort
sub calling method abort_sub
"""
self.sub1.modify(tracing='sub')
self.nosub1.modify(tracing='nosub')
get_transaction().commit(1)
assert self.sub1._p_jar.ctpc_finish == 1
# bug, non sub trans jars are getting finished
# in a subtrans
assert self.nosub1._p_jar.ctpc_finish == 0
get_transaction().abort()
assert self.nosub1._p_jar.cabort == 1
assert self.sub1._p_jar.cabort_sub == 1
def testNSJSubTransactionCommitCommit(self):
self.sub1.modify()
self.nosub1.modify()
get_transaction().commit(1)
assert self.nosub1._p_jar.ctpc_vote == 0
get_transaction().commit()
#assert self.nosub1._p_jar.ccommit_sub == 0
assert self.nosub1._p_jar.ctpc_vote == 1
assert self.sub1._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_vote == 1
def testNSJMultipleSubTransactionCommitCommit(self):
"""
sub1 calling method tpc_begin
sub1 calling method commit
sub1 calling method tpc_finish
nosub calling method tpc_begin
nosub calling method tpc_finish
sub2 calling method tpc_begin
sub2 calling method commit
sub2 calling method tpc_finish
nosub calling method tpc_begin
nosub calling method commit
sub1 calling method commit_sub
sub2 calling method commit_sub
sub1 calling method tpc_vote
nosub calling method tpc_vote
sub2 calling method tpc_vote
sub2 calling method tpc_finish
nosub calling method tpc_finish
sub1 calling method tpc_finish
"""
# add it
self.sub1.modify()
get_transaction().commit(1)
# add another
self.nosub1.modify()
get_transaction().commit(1)
assert self.sub1._p_jar.ctpc_vote == 0
assert self.nosub1._p_jar.ctpc_vote == 0
assert self.sub1._p_jar.ctpc_finish > 0
# add another before we do the entire txn commit
self.sub2.modify()
# commit the container transaction
get_transaction().commit()
# we did an implicit sub commit
assert self.sub2._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_finish > 1
### Failure Mode Tests
#
# ok now we do some more interesting
# tests that check the implementations
# error handling by throwing errors from
# various jar methods
###
# first the recoverable errors
def testExceptionInAbort(self):
self.sub1._p_jar = SubTransactionJar(errors='abort')
self.nosub1.modify()
self.sub1.modify(nojar=1)
self.sub2.modify()
try:
get_transaction().abort()
except TestTxnException: pass
assert self.nosub1._p_jar.cabort == 1
assert self.sub2._p_jar.cabort == 1
def testExceptionInCommit(self):
self.sub1._p_jar = SubTransactionJar(errors='commit')
self.nosub1.modify()
self.sub1.modify(nojar=1)
try:
get_transaction().commit()
except TestTxnException: pass
assert self.nosub1._p_jar.ctpc_finish == 0
assert self.nosub1._p_jar.ccommit == 1
assert self.nosub1._p_jar.ctpc_abort == 1
assert Transaction.hosed == 0
def testExceptionInTpcVote(self):
self.sub1._p_jar = SubTransactionJar(errors='tpc_vote')
self.nosub1.modify()
self.sub1.modify(nojar=1)
try:
get_transaction().commit()
except TestTxnException: pass
assert self.nosub1._p_jar.ctpc_finish == 0
assert self.nosub1._p_jar.ccommit == 1
assert self.nosub1._p_jar.ctpc_abort == 1
assert self.sub1._p_jar.ctpc_abort == 1
assert Transaction.hosed == 0
def testExceptionInTpcBegin(self):
"""
ok this test reveals a bug in the TM.py
as the nosub tpc_abort there is ignored.
nosub calling method tpc_begin
nosub calling method commit
sub calling method tpc_begin
sub calling method abort
sub calling method tpc_abort
nosub calling method tpc_abort
"""
self.sub1._p_jar = SubTransactionJar(errors='tpc_begin')
self.nosub1.modify()
self.sub1.modify(nojar=1)
try:
get_transaction().commit()
except TestTxnException: pass
assert self.nosub1._p_jar.ctpc_abort == 1
assert self.sub1._p_jar.ctpc_abort == 1
def testExceptionInTpcAbort(self):
self.sub1._p_jar = SubTransactionJar(
errors=('tpc_abort', 'tpc_vote'))
self.nosub1.modify()
self.sub1.modify(nojar=1)
try:
get_transaction().commit()
except TestTxnException: pass
assert self.nosub1._p_jar.ctpc_abort == 1
assert Transaction.hosed == 0
### More Failure modes...
# now we mix in some sub transactions
###
def testExceptionInSubCommitSub(self):
"""
this tests exhibits some odd behavior,
nothing thats technically incorrect...
basically it seems non deterministic, even
stranger the behavior seems dependent on what
values i test after the fact... very odd,
almost relativistic.
in-retrospect this is from the fact that
dictionaries are used to store jars at some point
"""
self.sub1.modify()
get_transaction().commit(1)
self.nosub1.modify()
self.sub2._p_jar = SubTransactionJar(errors='commit_sub')
self.sub2.modify(nojar=1)
get_transaction().commit(1)
self.sub3.modify()
try:
get_transaction().commit()
except TestTxnException: pass
# odd this doesn't seem to be entirely deterministic..
if self.sub1._p_jar.ccommit_sub:
assert self.sub1._p_jar.ctpc_abort == 1
else:
assert self.sub1._p_jar.cabort_sub == 1
if self.sub3._p_jar.ccommit_sub:
assert self.sub3._p_jar.ctpc_abort == 1
else:
assert self.sub3._p_jar.cabort_sub == 1
assert self.sub2._p_jar.ctpc_abort == 1
assert self.nosub1._p_jar.ctpc_abort == 1
def testExceptionInSubAbortSub(self):
self.sub1._p_jar = SubTransactionJar(errors='commit_sub')
self.sub1.modify(nojar=1)
get_transaction().commit(1)
self.nosub1.modify()
self.sub2._p_jar = SubTransactionJar(errors='abort_sub')
self.sub2.modify(nojar=1)
get_transaction().commit(1)
self.sub3.modify()
try:
get_transaction().commit()
except TestTxnException, err:
pass
else:
self.fail("expected transaction to fail")
# The last commit failed. If the commit_sub() method was
# called, then tpc_abort() should be called to abort the
# actual transaction. If not, then calling abort_sub() is
# sufficient.
if self.sub3._p_jar.ccommit_sub == 1:
self.assertEqual(self.sub3._p_jar.ctpc_abort, 1)
else:
self.assertEqual(self.sub3._p_jar.cabort_sub, 1)
# last test, check the hosing mechanism
def testHoserStoppage(self):
# XXX We should consult ZConfig to decide whether we can get into a
# hosed state or not.
return
# It's hard to test the "hosed" state of the database, where
# hosed means that a failure occurred in the second phase of
# the two phase commit. It's hard because the database can
# recover from such an error if it occurs during the very first
# tpc_finish() call of the second phase.
for obj in self.sub1, self.sub2:
j = HoserJar(errors='tpc_finish')
j.reset()
obj._p_jar = j
obj.modify(nojar=1)
try:
get_transaction().commit()
except TestTxnException:
pass
self.assert_(Transaction.hosed)
self.sub2.modify()
try:
get_transaction().commit()
except Transaction.POSException.TransactionError:
pass
else:
self.fail("Hosed Application didn't stop commits")
class DataObject:
def __init__(self, nost=0):
self.nost= nost
self._p_jar = None
def modify(self, nojar=0, tracing=0):
if not nojar:
if self.nost:
self._p_jar = NoSubTransactionJar(tracing=tracing)
else:
self._p_jar = SubTransactionJar(tracing=tracing)
get_transaction().register(self)
class TestTxnException(Exception):
pass
class BasicJar:
def __init__(self, errors=(), tracing=0):
if not isinstance(errors, TupleType):
errors = errors,
self.errors = errors
self.tracing = tracing
self.cabort = 0
self.ccommit = 0
self.ctpc_begin = 0
self.ctpc_abort = 0
self.ctpc_vote = 0
self.ctpc_finish = 0
self.cabort_sub = 0
self.ccommit_sub = 0
def __repr__(self):
return "<jar %X %s>" % (id(self), self.errors)
def check(self, method):
if self.tracing:
print '%s calling method %s'%(str(self.tracing),method)
if method in self.errors:
raise TestTxnException("error %s" % method)
## basic jar txn interface
def abort(self, *args):
self.check('abort')
self.cabort += 1
def commit(self, *args):
self.check('commit')
self.ccommit += 1
def tpc_begin(self, txn, sub=0):
self.check('tpc_begin')
self.ctpc_begin += 1
def tpc_vote(self, *args):
self.check('tpc_vote')
self.ctpc_vote += 1
def tpc_abort(self, *args):
self.check('tpc_abort')
self.ctpc_abort += 1
def tpc_finish(self, *args):
self.check('tpc_finish')
self.ctpc_finish += 1
class SubTransactionJar(BasicJar):
def abort_sub(self, txn):
self.check('abort_sub')
self.cabort_sub = 1
def commit_sub(self, txn):
self.check('commit_sub')
self.ccommit_sub = 1
class NoSubTransactionJar(BasicJar):
pass
class HoserJar(BasicJar):
# The HoserJars coordinate their actions via the class variable
# committed. The check() method will only raise its exception
# if committed > 0.
committed = 0
def reset(self):
# Calling reset() on any instance will reset the class variable.
HoserJar.committed = 0
def check(self, method):
if HoserJar.committed > 0:
BasicJar.check(self, method)
def tpc_finish(self, *args):
self.check('tpc_finish')
self.ctpc_finish += 1
HoserJar.committed += 1
def test_suite():
return unittest.makeSuite(TransactionTests)
if __name__ == '__main__':
unittest.TextTestRunner().run(test_suite())
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testUtils.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the routines to convert between long and 64-bit strings"""
import random
import unittest
NUM = 100
from ZODB.utils import U64, p64, u64
class TestUtils(unittest.TestCase):
small = [random.randrange(1, 1L<<32, int=long)
for i in range(NUM)]
large = [random.randrange(1L<<32, 1L<<64, int=long)
for i in range(NUM)]
all = small + large
def checkLongToStringToLong(self):
for num in self.all:
s = p64(num)
n = U64(s)
self.assertEquals(num, n, "U64() failed")
n2 = u64(s)
self.assertEquals(num, n2, "u64() failed")
def checkKnownConstants(self):
self.assertEquals("\000\000\000\000\000\000\000\001", p64(1))
self.assertEquals("\000\000\000\001\000\000\000\000", p64(1L<<32))
self.assertEquals(u64("\000\000\000\000\000\000\000\001"), 1)
self.assertEquals(U64("\000\000\000\000\000\000\000\001"), 1)
self.assertEquals(u64("\000\000\000\001\000\000\000\000"), 1L<<32)
self.assertEquals(U64("\000\000\000\001\000\000\000\000"), 1L<<32)
def test_suite():
return unittest.makeSuite(TestUtils, 'check')
if __name__ == "__main__":
loader = unittest.TestLoader()
loader.testMethodPrefix = "check"
unittest.main(testLoader=loader)
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testZODB.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from __future__ import nested_scopes
import unittest
import ZODB
import ZODB.FileStorage
from ZODB.PersistentMapping import PersistentMapping
from ZODB.POSException import ReadConflictError, ConflictError
from ZODB.tests.StorageTestBase import removefs
from Persistence import Persistent
class P(Persistent):
pass
class Independent(Persistent):
def _p_independent(self):
return 1
class DecoyIndependent(Persistent):
def _p_independent(self):
return 0
class ZODBTests(unittest.TestCase):
def setUp(self):
self._storage = ZODB.FileStorage.FileStorage(
'ZODBTests.fs', create=1)
self._db = ZODB.DB(self._storage)
def populate(self):
get_transaction().begin()
conn = self._db.open()
root = conn.root()
root['test'] = pm = PersistentMapping()
for n in range(100):
pm[n] = PersistentMapping({0: 100 - n})
get_transaction().note('created test data')
get_transaction().commit()
conn.close()
def tearDown(self):
self._db.close()
removefs("ZODBTests.fs")
def checkExportImport(self, abort_it=0, dup_name='test_duplicate'):
self.populate()
get_transaction().begin()
get_transaction().note('duplication')
# Duplicate the 'test' object.
conn = self._db.open()
try:
root = conn.root()
ob = root['test']
assert len(ob) > 10, 'Insufficient test data'
try:
import tempfile
f = tempfile.TemporaryFile()
ob._p_jar.exportFile(ob._p_oid, f)
assert f.tell() > 0, 'Did not export correctly'
f.seek(0)
new_ob = ob._p_jar.importFile(f)
root[dup_name] = new_ob
f.close()
if abort_it:
get_transaction().abort()
else:
get_transaction().commit()
except:
get_transaction().abort()
raise
finally:
conn.close()
get_transaction().begin()
# Verify the duplicate.
conn = self._db.open()
try:
root = conn.root()
ob = root['test']
try:
ob2 = root[dup_name]
except KeyError:
if abort_it:
# Passed the test.
return
else:
raise
else:
if abort_it:
assert 0, 'Did not abort duplication'
l1 = list(ob.items())
l1.sort()
l2 = list(ob2.items())
l2.sort()
l1 = map(lambda (k, v): (k, v[0]), l1)
l2 = map(lambda (k, v): (k, v[0]), l2)
assert l1 == l2, 'Duplicate did not match'
assert ob._p_oid != ob2._p_oid, 'Did not duplicate'
assert ob._p_jar == ob2._p_jar, 'Not same connection'
oids = {}
for v in ob.values():
oids[v._p_oid] = 1
for v in ob2.values():
assert not oids.has_key(v._p_oid), (
'Did not fully separate duplicate from original')
get_transaction().commit()
finally:
conn.close()
def checkExportImportAborted(self):
self.checkExportImport(abort_it=1, dup_name='test_duplicate_aborted')
def checkVersionOnly(self):
# Make sure the changes to make empty transactions a no-op
# still allow things like abortVersion(). This should work
# because abortVersion() calls tpc_begin() itself.
conn = self._db.open("version")
try:
r = conn.root()
r[1] = 1
get_transaction().commit()
finally:
conn.close()
self._db.abortVersion("version")
get_transaction().commit()
def checkResetCache(self):
# The cache size after a reset should be 0 and the GC attributes
# ought to be linked to it rather than the old cache.
self.populate()
conn = self._db.open()
conn.root()
self.assert_(len(conn._cache) > 0) # Precondition
conn._resetCache()
self.assertEqual(len(conn._cache), 0)
self.assert_(conn._incrgc == conn._cache.incrgc)
self.assert_(conn.cacheGC == conn._cache.incrgc)
def checkLocalTransactions(self):
# Test of transactions that apply to only the connection,
# not the thread.
conn1 = self._db.open()
conn2 = self._db.open()
try:
conn1.setLocalTransaction()
conn2.setLocalTransaction()
r1 = conn1.root()
r2 = conn2.root()
if r1.has_key('item'):
del r1['item']
conn1.getTransaction().commit()
r1.get('item')
r2.get('item')
r1['item'] = 1
conn1.getTransaction().commit()
self.assertEqual(r1['item'], 1)
# r2 has not seen a transaction boundary,
# so it should be unchanged.
self.assertEqual(r2.get('item'), None)
conn2.sync()
# Now r2 is updated.
self.assertEqual(r2['item'], 1)
# Now, for good measure, send an update in the other direction.
r2['item'] = 2
conn2.getTransaction().commit()
self.assertEqual(r1['item'], 1)
self.assertEqual(r2['item'], 2)
conn1.sync()
conn2.sync()
self.assertEqual(r1['item'], 2)
self.assertEqual(r2['item'], 2)
finally:
conn1.close()
conn2.close()
def checkReadConflict(self):
self.obj = P()
self.readConflict()
def readConflict(self, shouldFail=1):
# Two transactions run concurrently. Each reads some object,
# then one commits and the other tries to read an object
# modified by the first. This read should fail with a conflict
# error because the object state read is not necessarily
# consistent with the objects read earlier in the transaction.
conn = self._db.open()
conn.setLocalTransaction()
r1 = conn.root()
r1["p"] = self.obj
self.obj.child1 = P()
conn.getTransaction().commit()
# start a new transaction with a new connection
cn2 = self._db.open()
# start a new transaction with the other connection
cn2.setLocalTransaction()
r2 = cn2.root()
self.assertEqual(r1._p_serial, r2._p_serial)
self.obj.child2 = P()
conn.getTransaction().commit()
# resume the transaction using cn2
obj = r2["p"]
# An attempt to access obj should fail, because r2 was read
# earlier in the transaction and obj was modified by the othe
# transaction.
if shouldFail:
self.assertRaises(ReadConflictError, lambda: obj.child1)
else:
# make sure that accessing the object succeeds
obj.child1
cn2.getTransaction().abort()
def checkReadConflictIgnored(self):
# Test that an application that catches a read conflict and
# continues can not commit the transaction later.
root = self._db.open().root()
root["real_data"] = real_data = PersistentMapping()
root["index"] = index = PersistentMapping()
real_data["a"] = PersistentMapping({"indexed_value": 0})
real_data["b"] = PersistentMapping({"indexed_value": 1})
index[1] = PersistentMapping({"b": 1})
index[0] = PersistentMapping({"a": 1})
get_transaction().commit()
# load some objects from one connection
cn2 = self._db.open()
cn2.setLocalTransaction()
r2 = cn2.root()
real_data2 = r2["real_data"]
index2 = r2["index"]
real_data["b"]["indexed_value"] = 0
del index[1]["b"]
index[0]["b"] = 1
get_transaction().commit()
del real_data2["a"]
try:
del index2[0]["a"]
except ReadConflictError:
# This is the crux of the test. Ignore the error.
pass
else:
self.fail("No conflict occurred")
# real_data2 still ready to commit
self.assert_(real_data2._p_changed)
# index2 values not ready to commit
self.assert_(not index2._p_changed)
self.assert_(not index2[0]._p_changed)
self.assert_(not index2[1]._p_changed)
self.assertRaises(ConflictError, cn2.getTransaction().commit)
get_transaction().abort()
def checkIgnoreReadConflict(self):
# Test that an application that catches a read conflict and
# continues can not commit the transaction later.
root = self._db.open().root()
root["real_data"] = real_data = PersistentMapping()
root["index"] = index = PersistentMapping()
real_data["a"] = PersistentMapping({"indexed_value": 0})
real_data["b"] = PersistentMapping({"indexed_value": 1})
index[1] = PersistentMapping({"b": 1})
index[0] = PersistentMapping({"a": 1})
get_transaction().commit()
# load some objects from one connection
cn2 = self._db.open()
cn2.setLocalTransaction()
r2 = cn2.root()
real_data2 = r2["real_data"]
index2 = r2["index"]
real_data["b"]["indexed_value"] = 0
del index[1]["b"]
index[0]["b"] = 1
get_transaction().commit()
del real_data2["a"]
try:
del index2[0]["a"]
except ReadConflictError, obj:
# The point of this test is to make sure that the conflict
# can be ignored.
obj.ignore()
else:
self.fail("No conflict occurred")
# real_data2 still ready to commit
self.assert_(real_data2._p_changed)
# index2 values not ready to commit
self.assert_(not index2._p_changed)
self.assert_(not index2[0]._p_changed)
self.assert_(not index2[1]._p_changed)
# The commit should succeed despite the ReadConflictError, because
# we explicitly said to ignore it.
cn2.getTransaction().commit()
get_transaction().abort()
def checkIndependent(self):
self.obj = Independent()
self.readConflict(shouldFail=0)
def checkNotIndependent(self):
self.obj = DecoyIndependent()
self.readConflict()
def test_suite():
return unittest.makeSuite(ZODBTests, 'check')
=== Added File Zope3/src/zope/app/advanced/acquisition/tests/testfsIndex.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import unittest, sys
from ZODB.fsIndex import fsIndex
from ZODB.utils import p64
class Test(unittest.TestCase):
def testInserts(self):
index=fsIndex()
for i in range(200):
index[p64(i*1000)]=(i*1000L+1)
for i in range(0,200):
self.assertEqual((i,index[p64(i*1000)]), (i,(i*1000L+1)))
self.assertEqual(len(index), 200)
key=p64(2000)
self.assertEqual(index.get(key), 2001)
key=p64(2001)
self.assertEqual(index.get(key), None)
self.assertEqual(index.get(key, ''), '')
# self.failUnless(len(index._data) > 1)
def testUpdate(self):
index=fsIndex()
d={}
for i in range(200):
d[p64(i*1000)]=(i*1000L+1)
index.update(d)
for i in range(400,600):
d[p64(i*1000)]=(i*1000L+1)
index.update(d)
for i in range(100, 500):
d[p64(i*1000)]=(i*1000L+2)
index.update(d)
self.assertEqual(index.get(p64(2000)), 2001)
self.assertEqual(index.get(p64(599000)), 599001)
self.assertEqual(index.get(p64(399000)), 399002)
self.assertEqual(len(index), 600)
def test_suite():
loader=unittest.TestLoader()
return loader.loadTestsFromTestCase(Test)
if __name__=='__main__':
unittest.TextTestRunner().run(test_suite())
More information about the Zope3-Checkins
mailing list