[Zope3-checkins] CVS: Zope3/src/ZODB/tests - testZODB.py:1.24.2.2

Jeremy Hylton jeremy at zope.com
Mon Mar 29 12:14:10 EST 2004


Update of /cvs-repository/Zope3/src/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv11388

Modified Files:
      Tag: jeremy-txn-branch
	testZODB.py 
Log Message:
Restructure long test into a couple of helper methods.

I hope that putting the try/finally in a separate function will help
with debugging.


=== Zope3/src/ZODB/tests/testZODB.py 1.24.2.1 => 1.24.2.2 ===
--- Zope3/src/ZODB/tests/testZODB.py:1.24.2.1	Mon Mar 29 12:02:23 2004
+++ Zope3/src/ZODB/tests/testZODB.py	Mon Mar 29 12:14:09 2004
@@ -54,70 +54,73 @@
         self._db.close()
         self._storage.cleanup()
 
-    def checkExportImport(self, abort_it=0, dup_name='test_duplicate'):
+    def checkExportImport(self, abort_it=False, dup_name='test_duplicate'):
         self.populate()
-        get_transaction().begin()
-        get_transaction().note('duplication')
-        # Duplicate the 'test' object.
         conn = self._db.open()
         try:
-            root = conn.root()
-            ob = root['test']
-            assert len(ob) > 10, 'Insufficient test data'
-            try:
-                import tempfile
-                f = tempfile.TemporaryFile()
-                ob._p_jar.exportFile(ob._p_oid, f)
-                assert f.tell() > 0, 'Did not export correctly'
-                f.seek(0)
-                new_ob = ob._p_jar.importFile(f)
-                self.assertEqual(new_ob, ob)
-                root[dup_name] = new_ob
-                f.close()
-                if abort_it:
-                    get_transaction().abort()
-                else:
-                    get_transaction().commit()
-            except:
-                get_transaction().abort()
-                raise
+            self.duplicate(conn, dup_name, abort_it)
         finally:
             conn.close()
-        get_transaction().begin()
-        # Verify the duplicate.
         conn = self._db.open()
         try:
-            root = conn.root()
-            ob = root['test']
-            try:
-                ob2 = root[dup_name]
-            except KeyError:
-                if abort_it:
-                    # Passed the test.
-                    return
-                else:
-                    raise
-            else:
-                if abort_it:
-                    assert 0, 'Did not abort duplication'
-            l1 = list(ob.items())
-            l1.sort()
-            l2 = list(ob2.items())
-            l2.sort()
-            l1 = map(lambda (k, v): (k, v[0]), l1)
-            l2 = map(lambda (k, v): (k, v[0]), l2)
-            assert l1 == l2, 'Duplicate did not match'
-            assert ob._p_oid != ob2._p_oid, 'Did not duplicate'
-            assert ob._p_jar == ob2._p_jar, 'Not same connection'
-            oids = {}
-            for v in ob.values():
-                oids[v._p_oid] = 1
-            for v in ob2.values():
-                assert not oids.has_key(v._p_oid), (
-                    'Did not fully separate duplicate from original')
-            get_transaction().commit()
+            self.verify(conn, dup_name, abort_it)
         finally:
             conn.close()
+
+    def duplicate(self, conn, dup_name, abort_it):
+        get_transaction().begin()
+        get_transaction().note('duplication')
+        root = conn.root()
+        ob = root['test']
+        assert len(ob) > 10, 'Insufficient test data'
+        try:
+            import tempfile
+            f = tempfile.TemporaryFile()
+            ob._p_jar.exportFile(ob._p_oid, f)
+            assert f.tell() > 0, 'Did not export correctly'
+            f.seek(0)
+            new_ob = ob._p_jar.importFile(f)
+            self.assertEqual(new_ob, ob)
+            root[dup_name] = new_ob
+            f.close()
+            if abort_it:
+                get_transaction().abort()
+            else:
+                get_transaction().commit()
+        except:
+            get_transaction().abort()
+            raise
+
+    def verify(self, conn, dup_name, abort_it):
+        get_transaction().begin()
+        root = conn.root()
+        ob = root['test']
+        try:
+            ob2 = root[dup_name]
+        except KeyError:
+            if abort_it:
+                # Passed the test.
+                return
+            else:
+                raise
+        else:
+            self.failUnless(not abort_it, 'Did not abort duplication')
+        l1 = list(ob.items())
+        l1.sort()
+        l2 = list(ob2.items())
+        l2.sort()
+        l1 = map(lambda (k, v): (k, v[0]), l1)
+        l2 = map(lambda (k, v): (k, v[0]), l2)
+        self.assertEqual(l1, l2)
+        self.assert_(ob._p_oid != ob2._p_oid)
+        self.assertEqual(ob._p_jar, ob2._p_jar)
+        oids = {}
+        for v in ob.values():
+            oids[v._p_oid] = 1
+        for v in ob2.values():
+            assert not oids.has_key(v._p_oid), (
+                'Did not fully separate duplicate from original')
+        get_transaction().commit()
 
     def checkExportImportAborted(self):
         self.checkExportImport(abort_it=1, dup_name='test_duplicate_aborted')




More information about the Zope3-Checkins mailing list