[Zodb-checkins] CVS: StandaloneZODB/ZODB/tests - BasicStorage.py:1.7 ConflictResolution.py:1.5 Corruption.py:1.2 HistoryStorage.py:1.4 IteratorStorage.py:1.3 LocalStorage.py:1.2 PackableStorage.py:1.6 TransactionalUndoStorage.py:1.11 VersionStorage.py:1.7
Barry Warsaw
barry@wooz.org
Fri, 21 Sep 2001 16:56:47 -0400
Update of /cvs-repository/StandaloneZODB/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv27707/ZODB/tests
Modified Files:
BasicStorage.py ConflictResolution.py Corruption.py
HistoryStorage.py IteratorStorage.py LocalStorage.py
PackableStorage.py TransactionalUndoStorage.py
VersionStorage.py
Log Message:
Don't use assert keyword to ensure that a test passes; use
self.assertEqual() and friends since this gives much better output.
=== StandaloneZODB/ZODB/tests/BasicStorage.py 1.6 => 1.7 ===
newrevid = self._dostore(revid=revid)
# Finish the transaction.
- assert newrevid <> revid
+ self.assertNotEqual(newrevid, revid)
def checkNonVersionStoreAndLoad(self):
+ eq = self.assertEqual
oid = self._storage.new_oid()
self._dostore(oid=oid, data=MinPO(7))
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
- assert value == MinPO(7)
+ eq(value, MinPO(7))
# Now do a bunch of updates to an object
for i in range(13, 22):
revid = self._dostore(oid, revid=revid, data=MinPO(i))
# Now get the latest revision of the object
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(21)
+ eq(zodb_unpickle(data), MinPO(21))
def checkNonVersionModifiedInVersion(self):
oid = self._storage.new_oid()
self._dostore(oid=oid)
- assert self._storage.modifiedInVersion(oid) == ''
+ self.assertEqual(self._storage.modifiedInVersion(oid), '')
def checkLoadSerial(self):
oid = self._storage.new_oid()
@@ -98,7 +99,7 @@
# Now make sure all the revisions have the correct value
for revid, value in revisions.items():
data = self._storage.loadSerial(oid, revid)
- assert zodb_unpickle(data) == value
+ self.assertEqual(zodb_unpickle(data), value)
def checkConflicts(self):
oid = self._storage.new_oid()
=== StandaloneZODB/ZODB/tests/ConflictResolution.py 1.4 => 1.5 ===
data, serialno = self._storage.load(oid, '')
inst = zodb_unpickle(data)
- self.assert_(inst._value == 5)
+ self.assertEqual(inst._value, 5)
def checkUnresolvable(self):
obj = PCounter2()
=== StandaloneZODB/ZODB/tests/Corruption.py 1.1 => 1.2 ===
# truncation the index file
path = self.path + '.index'
- self.assert_(os.path.exists(path))
+ self.failUnless(os.path.exists(path))
f = open(path, 'r+')
f.seek(0, 2)
size = f.tell()
@@ -61,7 +61,7 @@
# truncation the index file
path = self.path + '.index'
- self.assert_(os.path.exists(path))
+ self.failUnless(os.path.exists(path))
size = os.stat(path)[stat.ST_SIZE]
f = open(path, 'r+')
while f.tell() < size:
=== StandaloneZODB/ZODB/tests/HistoryStorage.py 1.3 => 1.4 ===
class HistoryStorage:
def checkSimpleHistory(self):
+ eq = self.assertEqual
# Store a couple of non-version revisions of the object
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
@@ -15,36 +16,46 @@
revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
# Now get various snapshots of the object's history
h = self._storage.history(oid, size=1)
- assert len(h) == 1
+ eq(len(h), 1)
d = h[0]
- assert d['serial'] == revid3 and d['version'] == ''
+ eq(d['serial'], revid3)
+ eq(d['version'], '')
# Try to get 2 historical revisions
h = self._storage.history(oid, size=2)
- assert len(h) == 2
+ eq(len(h), 2)
d = h[0]
- assert d['serial'] == revid3 and d['version'] == ''
+ eq(d['serial'], revid3)
+ eq(d['version'], '')
d = h[1]
- assert d['serial'] == revid2 and d['version'] == ''
+ eq(d['serial'], revid2)
+ eq(d['version'], '')
# Try to get all 3 historical revisions
h = self._storage.history(oid, size=3)
- assert len(h) == 3
+ eq(len(h), 3)
d = h[0]
- assert d['serial'] == revid3 and d['version'] == ''
+ eq(d['serial'], revid3)
+ eq(d['version'], '')
d = h[1]
- assert d['serial'] == revid2 and d['version'] == ''
+ eq(d['serial'], revid2)
+ eq(d['version'], '')
d = h[2]
- assert d['serial'] == revid1 and d['version'] == ''
+ eq(d['serial'], revid1)
+ eq(d['version'], '')
# There should be no more than 3 revisions
h = self._storage.history(oid, size=4)
- assert len(h) == 3
+ eq(len(h), 3)
d = h[0]
- assert d['serial'] == revid3 and d['version'] == ''
+ eq(d['serial'], revid3)
+ eq(d['version'], '')
d = h[1]
- assert d['serial'] == revid2 and d['version'] == ''
+ eq(d['serial'], revid2)
+ eq(d['version'], '')
d = h[2]
- assert d['serial'] == revid1 and d['version'] == ''
+ eq(d['serial'], revid1)
+ eq(d['version'], '')
def checkVersionHistory(self):
+ eq = self.assertEqual
# Store a couple of non-version revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
@@ -61,21 +72,28 @@
# Now, try to get the six historical revisions (first three are in
# 'test-version', followed by the non-version revisions).
h = self._storage.history(oid, version, 100)
- assert len(h) == 6
+ eq(len(h), 6)
d = h[0]
- assert d['serial'] == revid6 and d['version'] == version
+ eq(d['serial'], revid6)
+ eq(d['version'], version)
d = h[1]
- assert d['serial'] == revid5 and d['version'] == version
+ eq(d['serial'], revid5)
+ eq(d['version'], version)
d = h[2]
- assert d['serial'] == revid4 and d['version'] == version
+ eq(d['serial'], revid4)
+ eq(d['version'], version)
d = h[3]
- assert d['serial'] == revid3 and d['version'] == ''
+ eq(d['serial'], revid3)
+ eq(d['version'], '')
d = h[4]
- assert d['serial'] == revid2 and d['version'] == ''
+ eq(d['serial'], revid2)
+ eq(d['version'], '')
d = h[5]
- assert d['serial'] == revid1 and d['version'] == ''
+ eq(d['serial'], revid1)
+ eq(d['version'], '')
def checkHistoryAfterVersionCommit(self):
+ eq = self.assertEqual
# Store a couple of non-version revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
@@ -108,23 +126,31 @@
# Now, try to get the six historical revisions (first three are in
# 'test-version', followed by the non-version revisions).
h = self._storage.history(oid, version, 100)
- assert len(h) == 7
+ eq(len(h), 7)
d = h[0]
- assert d['serial'] == revid7 and d['version'] == ''
+ eq(d['serial'], revid7)
+ eq(d['version'], '')
d = h[1]
- assert d['serial'] == revid6 and d['version'] == version
+ eq(d['serial'], revid6)
+ eq(d['version'], version)
d = h[2]
- assert d['serial'] == revid5 and d['version'] == version
+ eq(d['serial'], revid5)
+ eq(d['version'], version)
d = h[3]
- assert d['serial'] == revid4 and d['version'] == version
+ eq(d['serial'], revid4)
+ eq(d['version'], version)
d = h[4]
- assert d['serial'] == revid3 and d['version'] == ''
+ eq(d['serial'], revid3)
+ eq(d['version'], '')
d = h[5]
- assert d['serial'] == revid2 and d['version'] == ''
+ eq(d['serial'], revid2)
+ eq(d['version'], '')
d = h[6]
- assert d['serial'] == revid1 and d['version'] == ''
+ eq(d['serial'], revid1)
+ eq(d['version'], '')
def checkHistoryAfterVersionAbort(self):
+ eq = self.assertEqual
# Store a couple of non-version revisions
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
@@ -157,18 +183,25 @@
# Now, try to get the six historical revisions (first three are in
# 'test-version', followed by the non-version revisions).
h = self._storage.history(oid, version, 100)
- assert len(h) == 7
+ eq(len(h), 7)
d = h[0]
- assert d['serial'] == revid7 and d['version'] == ''
+ eq(d['serial'], revid7)
+ eq(d['version'], '')
d = h[1]
- assert d['serial'] == revid6 and d['version'] == version
+ eq(d['serial'], revid6)
+ eq(d['version'], version)
d = h[2]
- assert d['serial'] == revid5 and d['version'] == version
+ eq(d['serial'], revid5)
+ eq(d['version'], version)
d = h[3]
- assert d['serial'] == revid4 and d['version'] == version
+ eq(d['serial'], revid4)
+ eq(d['version'], version)
d = h[4]
- assert d['serial'] == revid3 and d['version'] == ''
+ eq(d['serial'], revid3)
+ eq(d['version'], '')
d = h[5]
- assert d['serial'] == revid2 and d['version'] == ''
+ eq(d['serial'], revid2)
+ eq(d['version'], '')
d = h[6]
- assert d['serial'] == revid1 and d['version'] == ''
+ eq(d['serial'], revid1)
+ eq(d['version'], '')
=== StandaloneZODB/ZODB/tests/IteratorStorage.py 1.2 => 1.3 ===
class IteratorStorage:
def checkSimpleIteration(self):
+ eq = self.assertEqual
# Store a bunch of revisions of a single object
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
@@ -17,10 +18,10 @@
val = 11
txniter = self._storage.iterator()
for reciter, revid in zip(txniter, (revid1, revid2, revid3)):
- assert reciter.tid == revid
+ eq(reciter.tid, revid)
for rec in reciter:
- assert rec.oid == oid
- assert rec.serial == revid
- assert rec.version == ''
- assert zodb_unpickle(rec.data) == MinPO(val)
+ eq(rec.oid, oid)
+ eq(rec.serial, revid)
+ eq(rec.version, '')
+ eq(zodb_unpickle(rec.data), MinPO(val))
val = val + 1
=== StandaloneZODB/ZODB/tests/LocalStorage.py 1.1 => 1.2 ===
"""
def checkLen(self):
+ eq = self.assertEqual
# The length of the database ought to grow by one each time
- assert len(self._storage) == 0
+ eq(len(self._storage), 0)
self._dostore()
- assert len(self._storage) == 1
+ eq(len(self._storage), 1)
self._dostore()
- assert len(self._storage) == 2
-
+ eq(len(self._storage), 2)
=== StandaloneZODB/ZODB/tests/PackableStorage.py 1.5 => 1.6 ===
class PackableStorage(PackableStorageBase):
def checkPackAllRevisions(self):
+ eq = self.assertEqual
+ raises = self.assertRaises
# Create a `persistent' object
obj = self._newobj()
oid = obj.getoid()
@@ -117,25 +119,27 @@
# Now make sure all three revisions can be extracted
data = self._storage.loadSerial(oid, revid1)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid and pobj.value == 1
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 1)
data = self._storage.loadSerial(oid, revid2)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid and pobj.value == 2
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 2)
data = self._storage.loadSerial(oid, revid3)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid and pobj.value == 3
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 3)
# Now pack all transactions
self._storage.pack(time.time(), referencesf)
# All revisions of the object should be gone, since there is no
# reference from the root object to this object.
- self.assertRaises(KeyError,
- self._storage.loadSerial, oid, revid1)
- self.assertRaises(KeyError,
- self._storage.loadSerial, oid, revid2)
- self.assertRaises(KeyError,
- self._storage.loadSerial, oid, revid3)
+ raises(KeyError, self._storage.loadSerial, oid, revid1)
+ raises(KeyError, self._storage.loadSerial, oid, revid2)
+ raises(KeyError, self._storage.loadSerial, oid, revid3)
def checkPackJustOldRevisions(self):
+ eq = self.assertEqual
+ raises = self.assertRaises
loads = self._makeloader()
# Create a root object. This can't be an instance of Object,
# otherwise the pickling machinery will serialize it as a persistent
@@ -152,7 +156,8 @@
revid0 = self._dostoreNP(ZERO, data=dumps(root))
# Make sure the root can be retrieved
data, revid = self._storage.load(ZERO, '')
- assert revid == revid0 and loads(data).value == 0
+ eq(revid, revid0)
+ eq(loads(data).value, 0)
# Commit three different revisions of the other object
obj.value = 1
revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
@@ -163,33 +168,39 @@
# Now make sure all three revisions can be extracted
data = self._storage.loadSerial(oid, revid1)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid and pobj.value == 1
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 1)
data = self._storage.loadSerial(oid, revid2)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid and pobj.value == 2
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 2)
data = self._storage.loadSerial(oid, revid3)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid and pobj.value == 3
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 3)
# Now pack just revisions 1 and 2. The object's current revision
# should stay alive because it's pointed to by the root.
self._storage.pack(time.time(), referencesf)
# Make sure the revisions are gone, but that object zero and revision
# 3 are still there and correct
data, revid = self._storage.load(ZERO, '')
- assert revid == revid0 and loads(data).value == 0
- self.assertRaises(KeyError,
- self._storage.loadSerial, oid, revid1)
- self.assertRaises(KeyError,
- self._storage.loadSerial, oid, revid2)
+ eq(revid, revid0)
+ eq(loads(data).value, 0)
+ raises(KeyError, self._storage.loadSerial, oid, revid1)
+ raises(KeyError, self._storage.loadSerial, oid, revid2)
data = self._storage.loadSerial(oid, revid3)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid and pobj.value == 3
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 3)
data, revid = self._storage.load(oid, '')
- assert revid == revid3
+ eq(revid, revid3)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid and pobj.value == 3
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 3)
def checkPackOnlyOneObject(self):
+ eq = self.assertEqual
+ raises = self.assertRaises
loads = self._makeloader()
# Create a root object. This can't be an instance of Object,
# otherwise the pickling machinery will serialize it as a persistent
@@ -203,7 +214,7 @@
# sure it's oid is greater than the first object's oid.
obj2 = self._newobj()
oid2 = obj2.getoid()
- assert oid2 > oid1
+ self.failUnless(oid2 > oid1)
# Link the root object to the persistent objects, in order to keep
# them alive. Store the root object.
root.obj1 = obj1
@@ -212,7 +223,8 @@
revid0 = self._dostoreNP(ZERO, data=dumps(root))
# Make sure the root can be retrieved
data, revid = self._storage.load(ZERO, '')
- assert revid == revid0 and loads(data).value == 0
+ eq(revid, revid0)
+ eq(loads(data).value, 0)
# Commit three different revisions of the first object
obj1.value = 1
revid1 = self._dostoreNP(oid1, data=pickle.dumps(obj1))
@@ -223,20 +235,24 @@
# Now make sure all three revisions can be extracted
data = self._storage.loadSerial(oid1, revid1)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid1 and pobj.value == 1
+ eq(pobj.getoid(), oid1)
+ eq(pobj.value, 1)
data = self._storage.loadSerial(oid1, revid2)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid1 and pobj.value == 2
+ eq(pobj.getoid(), oid1)
+ eq(pobj.value, 2)
data = self._storage.loadSerial(oid1, revid3)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid1 and pobj.value == 3
+ eq(pobj.getoid(), oid1)
+ eq(pobj.value, 3)
# Now commit a revision of the second object
obj2.value = 11
revid4 = self._dostoreNP(oid2, data=pickle.dumps(obj2))
# And make sure the revision can be extracted
data = self._storage.loadSerial(oid2, revid4)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid2 and pobj.value == 11
+ eq(pobj.getoid(), oid2)
+ eq(pobj.value, 11)
# Now pack just revisions 1 and 2 of object1. Object1's current
# revision should stay alive because it's pointed to by the root, as
# should Object2's current revision.
@@ -244,20 +260,23 @@
# Make sure the revisions are gone, but that object zero, object2, and
# revision 3 of object1 are still there and correct.
data, revid = self._storage.load(ZERO, '')
- assert revid == revid0 and loads(data).value == 0
- self.assertRaises(KeyError,
- self._storage.loadSerial, oid1, revid1)
- self.assertRaises(KeyError,
- self._storage.loadSerial, oid1, revid2)
+ eq(revid, revid0)
+ eq(loads(data).value, 0)
+ raises(KeyError, self._storage.loadSerial, oid1, revid1)
+ raises(KeyError, self._storage.loadSerial, oid1, revid2)
data = self._storage.loadSerial(oid1, revid3)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid1 and pobj.value == 3
+ eq(pobj.getoid(), oid1)
+ eq(pobj.value, 3)
data, revid = self._storage.load(oid1, '')
- assert revid == revid3
+ eq(revid, revid3)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid1 and pobj.value == 3
+ eq(pobj.getoid(), oid1)
+ eq(pobj.value, 3)
data, revid = self._storage.load(oid2, '')
- assert revid == revid4 and loads(data).value == 11
+ eq(revid, revid4)
+ eq(loads(data).value, 11)
data = self._storage.loadSerial(oid2, revid4)
pobj = pickle.loads(data)
- assert pobj.getoid() == oid2 and pobj.value == 11
+ eq(pobj.getoid(), oid2)
+ eq(pobj.value, 11)
=== StandaloneZODB/ZODB/tests/TransactionalUndoStorage.py 1.10 => 1.11 ===
def checkSimpleTransactionalUndo(self):
+ eq = self.assertEqual
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(23))
revid = self._dostore(oid, revid=revid, data=MinPO(24))
@@ -60,10 +61,10 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid
+ eq(len(oids), 1)
+ eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(24)
+ eq(zodb_unpickle(data), MinPO(24))
# Do another one
info = self._storage.undoInfo()
tid = info[2]['id']
@@ -72,10 +73,10 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid
+ eq(len(oids), 1)
+ eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(23)
+ eq(zodb_unpickle(data), MinPO(23))
# Try to undo the first record
info = self._storage.undoInfo()
tid = info[4]['id']
@@ -85,8 +86,8 @@
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid
+ eq(len(oids), 1)
+ eq(oids[0], oid)
# This should fail since we've undone the object's creation
self.assertRaises(KeyError,
self._storage.load, oid, '')
@@ -97,12 +98,13 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid
+ eq(len(oids), 1)
+ eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(23)
+ eq(zodb_unpickle(data), MinPO(23))
def checkUndoCreationBranch1(self):
+ eq = self.assertEqual
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12))
@@ -113,10 +115,10 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid
+ eq(len(oids), 1)
+ eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(11)
+ eq(zodb_unpickle(data), MinPO(11))
# Now from here, we can either redo the last undo, or undo the object
# creation. Let's undo the object creation.
info = self._storage.undoInfo()
@@ -125,11 +127,12 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid
+ eq(len(oids), 1)
+ eq(oids[0], oid)
self.assertRaises(KeyError, self._storage.load, oid, '')
def checkUndoCreationBranch2(self):
+ eq = self.assertEqual
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
revid = self._dostore(oid, revid=revid, data=MinPO(12))
@@ -140,10 +143,10 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid
+ eq(len(oids), 1)
+ eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(11)
+ eq(zodb_unpickle(data), MinPO(11))
# Now from here, we can either redo the last undo, or undo the object
# creation. Let's redo the last undo
info = self._storage.undoInfo()
@@ -152,12 +155,13 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid
+ eq(len(oids), 1)
+ eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(12)
+ eq(zodb_unpickle(data), MinPO(12))
def checkTwoObjectUndo(self):
+ eq = self.assertEqual
# Convenience
p31, p32, p51, p52 = map(zodb_pickle,
map(MinPO, (31, 32, 51, 52)))
@@ -174,7 +178,7 @@
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(self._transaction)
- assert revid1 == revid2
+ eq(revid1, revid2)
# Update those same two objects
self._storage.tpc_begin(self._transaction)
self._transaction_begin()
@@ -185,12 +189,12 @@
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(self._transaction)
- assert revid1 == revid2
+ eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(32)
+ eq(zodb_unpickle(data), MinPO(32))
data, revid2 = self._storage.load(oid2, '')
- assert zodb_unpickle(data) == MinPO(52)
+ eq(zodb_unpickle(data), MinPO(52))
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[0]['id']
@@ -198,15 +202,18 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 2
- assert oid1 in oids and oid2 in oids
+ eq(len(oids), 2)
+ self.failUnless(oid1 in oids)
+ self.failUnless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(31)
+ eq(zodb_unpickle(data), MinPO(31))
data, revid2 = self._storage.load(oid2, '')
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
def checkTwoObjectUndoAtOnce(self):
# Convenience
+ eq = self.assertEqual
+ unless = self.failUnless
p30, p31, p32, p50, p51, p52 = map(zodb_pickle,
map(MinPO,
(30, 31, 32, 50, 51, 52)))
@@ -217,25 +224,25 @@
d = self._multi_obj_transaction([(oid1, revid1, p30),
(oid2, revid2, p50),
])
- assert d[oid1] == d[oid2]
+ eq(d[oid1], d[oid2])
# Update those same two objects
d = self._multi_obj_transaction([(oid1, d[oid1], p31),
(oid2, d[oid2], p51),
])
- assert d[oid1] == d[oid2]
+ eq(d[oid1], d[oid2])
# Update those same two objects
d = self._multi_obj_transaction([(oid1, d[oid1], p32),
(oid2, d[oid2], p52),
])
- assert d[oid1] == d[oid2]
+ eq(d[oid1], d[oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- assert revid1 == revid2
+ eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(32)
+ eq(zodb_unpickle(data), MinPO(32))
data, revid2 = self._storage.load(oid2, '')
- assert zodb_unpickle(data) == MinPO(52)
+ eq(zodb_unpickle(data), MinPO(52))
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[0]['id']
@@ -248,13 +255,14 @@
# We get the finalization stuff called an extra time:
## self._storage.tpc_vote(self._transaction)
## self._storage.tpc_finish(self._transaction)
- assert len(oids) == 2
- assert len(oids1) == 2
- assert oid1 in oids and oid2 in oids
+ eq(len(oids), 2)
+ eq(len(oids1), 2)
+ unless(oid1 in oids)
+ unless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(30)
+ eq(zodb_unpickle(data), MinPO(30))
data, revid2 = self._storage.load(oid2, '')
- assert zodb_unpickle(data) == MinPO(50)
+ eq(zodb_unpickle(data), MinPO(50))
# Now try to undo the one we just did to undo, whew
info = self._storage.undoInfo()
tid = info[0]['id']
@@ -262,14 +270,16 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 2
- assert oid1 in oids and oid2 in oids
+ eq(len(oids), 2)
+ unless(oid1 in oids)
+ unless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(32)
+ eq(zodb_unpickle(data), MinPO(32))
data, revid2 = self._storage.load(oid2, '')
- assert zodb_unpickle(data) == MinPO(52)
+ eq(zodb_unpickle(data), MinPO(52))
def checkTwoObjectUndoAgain(self):
+ eq = self.assertEqual
p32, p33, p52, p53 = map(zodb_pickle,
map(MinPO, (32, 33, 52, 53)))
# Like the above, but the first revision of the objects are stored in
@@ -288,7 +298,7 @@
self._storage.tpc_finish(self._transaction)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- assert revid1 == revid2
+ eq(revid1, revid2)
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[0]['id']
@@ -296,12 +306,13 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 2
- assert oid1 in oids and oid2 in oids
+ eq(len(oids), 2)
+ self.failUnless(oid1 in oids)
+ self.failUnless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(31)
+ eq(zodb_unpickle(data), MinPO(31))
data, revid2 = self._storage.load(oid2, '')
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
# Like the above, but this time, the second transaction contains only
# one object.
self._storage.tpc_begin(self._transaction)
@@ -313,7 +324,7 @@
self._storage.tpc_finish(self._transaction)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- assert revid1 == revid2
+ eq(revid1, revid2)
# Update in different transactions
revid1 = self._dostore(oid1, revid=revid1, data=MinPO(34))
revid2 = self._dostore(oid2, revid=revid2, data=MinPO(54))
@@ -324,15 +335,17 @@
oids = self._storage.transactionalUndo(tid, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oid1 in oids and not oid2 in oids
+ eq(len(oids), 1)
+ self.failUnless(oid1 in oids)
+ self.failUnless(not oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(33)
+ eq(zodb_unpickle(data), MinPO(33))
data, revid2 = self._storage.load(oid2, '')
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
def checkNotUndoable(self):
+ eq = self.assertEqual
# Set things up so we've got a transaction that can't be undone
oid = self._storage.new_oid()
revid_a = self._dostore(oid, data=MinPO(51))
@@ -364,16 +377,18 @@
self._storage.tpc_finish(self._transaction)
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- assert revid1 == revid2
+ eq(revid1, revid2)
# Make sure the objects have the expected values
data, revid_11 = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(81)
+ eq(zodb_unpickle(data), MinPO(81))
data, revid_22 = self._storage.load(oid2, '')
- assert zodb_unpickle(data) == MinPO(91)
- assert revid_11 == revid1 and revid_22 == revid2
+ eq(zodb_unpickle(data), MinPO(91))
+ eq(revid_11, revid1)
+ eq(revid_22, revid2)
# Now modify oid2
revid2 = self._dostore(oid2, revid=revid2, data=MinPO(92))
- assert revid1 <> revid2 and revid2 <> revid_22
+ self.assertNotEqual(revid1, revid2)
+ self.assertNotEqual(revid2, revid_22)
info = self._storage.undoInfo()
tid = info[1]['id']
self._storage.tpc_begin(self._transaction)
=== StandaloneZODB/ZODB/tests/VersionStorage.py 1.6 => 1.7 ===
class VersionStorage:
def checkVersionedStoreAndLoad(self):
+ eq = self.assertEqual
# Store a couple of non-version revisions of the object
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
@@ -23,9 +24,9 @@
# Now read back the object in both the non-version and version and
# make sure the values jive.
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(12)
+ eq(zodb_unpickle(data), MinPO(12))
data, revid = self._storage.load(oid, version)
- assert zodb_unpickle(data) == MinPO(15)
+ eq(zodb_unpickle(data), MinPO(15))
def checkVersionedLoadErrors(self):
oid = self._storage.new_oid()
@@ -43,7 +44,7 @@
#JF# self._storage.load,
#JF# oid, 'bogus')
data, revid = self._storage.load(oid, 'bogus')
- assert zodb_unpickle(data) == MinPO(11)
+ self.assertEqual(zodb_unpickle(data), MinPO(11))
def checkVersionLock(self):
@@ -63,7 +64,7 @@
#JF# The empty string is not a valid version. I think that this should
#JF# be an error. Let's punt for now.
#JF# assert self._storage.versionEmpty('')
- assert self._storage.versionEmpty(version)
+ self.failUnless(self._storage.versionEmpty(version))
# Now store some objects
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(11))
@@ -78,11 +79,12 @@
#JF# assert not self._storage.versionEmpty('')
# Neither should 'test-version'
- assert not self._storage.versionEmpty(version)
+ self.failUnless(not self._storage.versionEmpty(version))
# But this non-existant version should be empty
- assert self._storage.versionEmpty('bogus')
+ self.failUnless(self._storage.versionEmpty('bogus'))
def checkVersions(self):
+ unless = self.failUnless
# Store some objects in the non-version
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
@@ -99,13 +101,13 @@
version='three')
# Ask for the versions
versions = self._storage.versions()
- assert 'one' in versions
- assert 'two' in versions
- assert 'three' in versions
+ unless('one' in versions)
+ unless('two' in versions)
+ unless('three' in versions)
# Now flex the `max' argument
versions = self._storage.versions(1)
- assert len(versions) == 1
- assert 'one' in versions or 'two' in versions or 'three' in versions
+ self.assertEqual(len(versions), 1)
+ unless('one' in versions or 'two' in versions or 'three' in versions)
def _setup_version(self, version='test-version'):
# Store some revisions in the non-version
@@ -123,18 +125,20 @@
return oid, version
def checkAbortVersion(self):
+ eq = self.assertEqual
oid, version = self._setup_version()
# Now abort the version -- must be done in a transaction
self._storage.tpc_begin(self._transaction)
oids = self._storage.abortVersion(version, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid
+ eq(len(oids), 1)
+ eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
def checkAbortVersionErrors(self):
+ eq = self.assertEqual
oid, version = self._setup_version()
# Now abort a bogus version
self._storage.tpc_begin(self._transaction)
@@ -154,12 +158,13 @@
oids = self._storage.abortVersion(version, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid
+ eq(len(oids), 1)
+ eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
def checkModifyAfterAbortVersion(self):
+ eq = self.assertEqual
oid, version = self._setup_version()
# Now abort the version
self._storage.tpc_begin(self._transaction)
@@ -173,37 +178,39 @@
revid = self._dostore(oid, revid=revid, data=MinPO(53))
revid = self._dostore(oid, revid=revid, data=MinPO(54))
data, newrevid = self._storage.load(oid, '')
- assert newrevid == revid
- assert zodb_unpickle(data) == MinPO(54)
+ eq(newrevid, revid)
+ eq(zodb_unpickle(data), MinPO(54))
def checkCommitToNonVersion(self):
+ eq = self.assertEqual
oid, version = self._setup_version()
data, revid = self._storage.load(oid, version)
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
# Try committing this version to the empty version
self._storage.tpc_begin(self._transaction)
oids = self._storage.commitVersion(version, '', self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
data, revid = self._storage.load(oid, '')
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
def checkCommitToOtherVersion(self):
+ eq = self.assertEqual
oid1, version1 = self._setup_version('one')
data, revid1 = self._storage.load(oid1, version1)
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
oid2, version2 = self._setup_version('two')
data, revid2 = self._storage.load(oid2, version2)
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
# Let's make sure we can't get object1 in version2
#JF# This won't fail because we fall back to non-version data.
#JF# In fact, it must succed and give us 51
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid1, version2)
data, revid2 = self._storage.load(oid1, version2)
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
# Okay, now let's commit object1 to version2
self._storage.tpc_begin(self._transaction)
@@ -211,25 +218,26 @@
self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid1
+ eq(len(oids), 1)
+ eq(oids[0], oid1)
data, revid = self._storage.load(oid1, version2)
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
data, revid = self._storage.load(oid2, version2)
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
#JF# Ditto, sort of
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid1, version1)
data, revid2 = self._storage.load(oid1, version1)
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
def checkAbortOneVersionCommitTheOther(self):
+ eq = self.assertEqual
oid1, version1 = self._setup_version('one')
data, revid1 = self._storage.load(oid1, version1)
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
oid2, version2 = self._setup_version('two')
data, revid2 = self._storage.load(oid2, version2)
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
# Let's make sure we can't get object1 in version2
#JF# It's not an error to load data in a different version when data
@@ -238,52 +246,52 @@
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid1, version2)
data, revid2 = self._storage.load(oid1, version2)
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
# First, let's abort version1
self._storage.tpc_begin(self._transaction)
oids = self._storage.abortVersion(version1, self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid1
+ eq(len(oids), 1)
+ eq(oids[0], oid1)
data, revid = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
#JF# Ditto
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid1, version1)
data, revid = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid1, version2)
data, revid = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
data, revid = self._storage.load(oid2, '')
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
data, revid = self._storage.load(oid2, version2)
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
# Okay, now let's commit version2 back to the trunk
self._storage.tpc_begin(self._transaction)
oids = self._storage.commitVersion(version2, '', self._transaction)
self._storage.tpc_vote(self._transaction)
self._storage.tpc_finish(self._transaction)
- assert len(oids) == 1
- assert oids[0] == oid2
+ eq(len(oids), 1)
+ eq(oids[0], oid2)
# These objects should not be found in version 2
#JF# Ditto
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid1, version2)
data, revid = self._storage.load(oid1, '')
- assert zodb_unpickle(data) == MinPO(51)
+ eq(zodb_unpickle(data), MinPO(51))
#JF# self.assertRaises(POSException.VersionError,
#JF# self._storage.load, oid2, version2)
# But the trunk should be up to date now
data, revid = self._storage.load(oid2, version2)
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
data, revid = self._storage.load(oid2, '')
- assert zodb_unpickle(data) == MinPO(54)
+ eq(zodb_unpickle(data), MinPO(54))
#JF# To do a test like you want, you have to add the data in a version
oid = self._storage.new_oid()