[Zodb-checkins] CVS: Zope3/src/ZODB/tests - testZODB.py:1.24.2.2
Jeremy Hylton
jeremy at zope.com
Mon Mar 29 12:14:11 EST 2004
Update of /cvs-repository/Zope3/src/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv11388
Modified Files:
Tag: jeremy-txn-branch
testZODB.py
Log Message:
Restructure long test into a couple of helper methods.
I hope that putting the try/finally in a separate function will help
with debugging.
=== Zope3/src/ZODB/tests/testZODB.py 1.24.2.1 => 1.24.2.2 ===
--- Zope3/src/ZODB/tests/testZODB.py:1.24.2.1 Mon Mar 29 12:02:23 2004
+++ Zope3/src/ZODB/tests/testZODB.py Mon Mar 29 12:14:09 2004
@@ -54,70 +54,73 @@
self._db.close()
self._storage.cleanup()
- def checkExportImport(self, abort_it=0, dup_name='test_duplicate'):
+ def checkExportImport(self, abort_it=False, dup_name='test_duplicate'):
self.populate()
- get_transaction().begin()
- get_transaction().note('duplication')
- # Duplicate the 'test' object.
conn = self._db.open()
try:
- root = conn.root()
- ob = root['test']
- assert len(ob) > 10, 'Insufficient test data'
- try:
- import tempfile
- f = tempfile.TemporaryFile()
- ob._p_jar.exportFile(ob._p_oid, f)
- assert f.tell() > 0, 'Did not export correctly'
- f.seek(0)
- new_ob = ob._p_jar.importFile(f)
- self.assertEqual(new_ob, ob)
- root[dup_name] = new_ob
- f.close()
- if abort_it:
- get_transaction().abort()
- else:
- get_transaction().commit()
- except:
- get_transaction().abort()
- raise
+ self.duplicate(conn, dup_name, abort_it)
finally:
conn.close()
- get_transaction().begin()
- # Verify the duplicate.
conn = self._db.open()
try:
- root = conn.root()
- ob = root['test']
- try:
- ob2 = root[dup_name]
- except KeyError:
- if abort_it:
- # Passed the test.
- return
- else:
- raise
- else:
- if abort_it:
- assert 0, 'Did not abort duplication'
- l1 = list(ob.items())
- l1.sort()
- l2 = list(ob2.items())
- l2.sort()
- l1 = map(lambda (k, v): (k, v[0]), l1)
- l2 = map(lambda (k, v): (k, v[0]), l2)
- assert l1 == l2, 'Duplicate did not match'
- assert ob._p_oid != ob2._p_oid, 'Did not duplicate'
- assert ob._p_jar == ob2._p_jar, 'Not same connection'
- oids = {}
- for v in ob.values():
- oids[v._p_oid] = 1
- for v in ob2.values():
- assert not oids.has_key(v._p_oid), (
- 'Did not fully separate duplicate from original')
- get_transaction().commit()
+ self.verify(conn, dup_name, abort_it)
finally:
conn.close()
+
+ def duplicate(self, conn, dup_name, abort_it):
+ get_transaction().begin()
+ get_transaction().note('duplication')
+ root = conn.root()
+ ob = root['test']
+ assert len(ob) > 10, 'Insufficient test data'
+ try:
+ import tempfile
+ f = tempfile.TemporaryFile()
+ ob._p_jar.exportFile(ob._p_oid, f)
+ assert f.tell() > 0, 'Did not export correctly'
+ f.seek(0)
+ new_ob = ob._p_jar.importFile(f)
+ self.assertEqual(new_ob, ob)
+ root[dup_name] = new_ob
+ f.close()
+ if abort_it:
+ get_transaction().abort()
+ else:
+ get_transaction().commit()
+ except:
+ get_transaction().abort()
+ raise
+
+ def verify(self, conn, dup_name, abort_it):
+ get_transaction().begin()
+ root = conn.root()
+ ob = root['test']
+ try:
+ ob2 = root[dup_name]
+ except KeyError:
+ if abort_it:
+ # Passed the test.
+ return
+ else:
+ raise
+ else:
+ self.failUnless(not abort_it, 'Did not abort duplication')
+ l1 = list(ob.items())
+ l1.sort()
+ l2 = list(ob2.items())
+ l2.sort()
+ l1 = map(lambda (k, v): (k, v[0]), l1)
+ l2 = map(lambda (k, v): (k, v[0]), l2)
+ self.assertEqual(l1, l2)
+ self.assert_(ob._p_oid != ob2._p_oid)
+ self.assertEqual(ob._p_jar, ob2._p_jar)
+ oids = {}
+ for v in ob.values():
+ oids[v._p_oid] = 1
+ for v in ob2.values():
+ assert not oids.has_key(v._p_oid), (
+ 'Did not fully separate duplicate from original')
+ get_transaction().commit()
def checkExportImportAborted(self):
self.checkExportImport(abort_it=1, dup_name='test_duplicate_aborted')
More information about the Zodb-checkins
mailing list