[Zodb-checkins] CVS: Zope3/src/zodb/code/tests - test_module.py:1.4

Jeremy Hylton jeremy@zope.com
Tue, 31 Dec 2002 13:35:43 -0500


Update of /cvs-repository/Zope3/src/zodb/code/tests
In directory cvs.zope.org:/tmp/cvs-serv20044/zodb/code/tests

Modified Files:
	test_module.py 
Log Message:
Extend the test suite to load objects using a separate connection,
and use the new ManagedRegistry objects to run the tests.

Before these changes, the tests only exercised manipulation of the
original object, not the creation of a new object from a pickle.


=== Zope3/src/zodb/code/tests/test_module.py 1.3 => 1.4 ===
--- Zope3/src/zodb/code/tests/test_module.py:1.3	Mon Dec 30 14:21:42 2002
+++ Zope3/src/zodb/code/tests/test_module.py	Tue Dec 31 13:35:42 2002
@@ -14,17 +14,15 @@
 import os
 import unittest
 
-from zodb.storage.mapping import DB, MappingStorage
-import zodb.db
-
 from persistence.dict import PersistentDict
-from zodb.code.module import \
-     PersistentModuleManager, PersistentModuleRegistry, \
-     PersistentModuleImporter, PersistentPackage
+from persistence._persistence import GHOST, UPTODATE
+from transaction import get_transaction
 
+import zodb.db
+from zodb.storage.mapping import DB, MappingStorage
 from zodb.code import tests # import this package, to get at __file__ reliably
-
-from transaction import get_transaction
+from zodb.code.module \
+     import ManagedRegistry, PersistentModuleImporter, PersistentPackage
 
 # snippets of source code used by testModules
 foo_src = """\
@@ -87,7 +85,7 @@
     def setUp(self):
         self.db = DB()
         self.root = self.db.open().root()
-        self.registry = PersistentModuleRegistry()
+        self.registry = ManagedRegistry()
         self.importer = TestPersistentModuleImporter(self.registry)
         self.importer.install()
         self.root["registry"] = self.registry
@@ -97,35 +95,58 @@
 
     def tearDown(self):
         self.importer.uninstall()
+        # just in case
+        get_transaction().abort()
+
+    def sameModules(self, registry):
+        m1 = self.registry.modules()
+        m1.sort()
+        m2 = registry.modules()
+        m2.sort()
+        self.assertEqual(m1, m2)
+
+    def useNewConnection(self):
+        # load modules using a separate connection to test that
+        # modules can be recreated from the database
+        cn = self.db.open()
+        reg = cn.root()["registry"]
+        self.sameModules(reg)
+        for name in reg.modules():
+            mod = reg.findModule(name)
+            mod._p_activate()
+            self.assertEqual(mod._p_state, UPTODATE)
+            for obj in mod.__dict__.values():
+                if hasattr(obj, "_p_activate"):
+                    obj._p_activate()
+        cn.close()
 
 class TestModule(TestBase):
     
     def testModule(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("pmtest", open(self._pmtest).read())
+        self.registry.newModule("pmtest", open(self._pmtest).read())
         get_transaction().commit()
         self.assert_(self.registry.findModule("pmtest"))
         import pmtest
         pmtest._p_deactivate()
         self.assertEqual(pmtest.a, 1)
         pmtest.f(4)
+        self.useNewConnection()
 
     def testUpdateFunction(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("pmtest", "def f(x): return x")
+        self.registry.newModule("pmtest", "def f(x): return x")
         get_transaction().commit()
         import pmtest
         self.assertEqual(pmtest.f(3), 3)
         copy = pmtest.f
-        mgr.update("def f(x): return x + 1")
+        self.registry.updateModule("pmtest", "def f(x): return x + 1")
         get_transaction().commit()
         pmtest._p_deactivate()
         self.assertEqual(pmtest.f(3), 4)
         self.assertEqual(copy(3), 4)
+        self.useNewConnection()
 
     def testUpdateClass(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("pmtest", src)
+        self.registry.newModule("pmtest", src)
         get_transaction().commit()
         import pmtest
         inst = pmtest.Foo()
@@ -134,23 +155,20 @@
         v2 = inst.n()
         self.assertEqual(v1 - 1, v2)
         self.assertEqual(v0 + 1, v1)
-        mgr.update(src2)
+        self.registry.updateModule("pmtest", src2)
         get_transaction().commit()
         self.assertRaises(AttributeError, getattr, inst, "n")
+        self.useNewConnection()
 
     def testModules(self):
-        foomgr = PersistentModuleManager(self.registry)
-        foomgr.new("foo", foo_src)
+        self.registry.newModule("foo", foo_src)
         # quux has a copy of foo.x
-        quuxmgr = PersistentModuleManager(self.registry)
-        quuxmgr.new("quux", quux_src)
+        self.registry.newModule("quux", quux_src)
         # bar has a reference to foo
-        barmgr = PersistentModuleManager(self.registry)
-        barmgr.new("bar", "import foo")
+        self.registry.newModule("bar", "import foo")
         # baz has reference to f and copy of x,
         # remember the the global x in f is looked up in foo
-        bazmgr = PersistentModuleManager(self.registry)
-        bazmgr.new("baz", "from foo import *")
+        self.registry.newModule("baz", "from foo import *")
         import foo, bar, baz, quux
         self.assert_(foo._p_oid is None)
         get_transaction().commit()
@@ -175,10 +193,10 @@
         self.assertEqual(baz.f(4), 46)
         self.assertEqual(bar.foo.f(4), 46)
         self.assertEqual(foo.f(4), 46)
+        self.useNewConnection()
 
     def testFunctionAttrs(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("foo", foo_src)
+        self.registry.newModule("foo", foo_src)
         import foo
         A = foo.f.attr = "attr"
         self.assertEqual(foo.f.attr, A)
@@ -189,53 +207,44 @@
         del foo.f.attr
         self.assertRaises(AttributeError, getattr, foo.f, "attr")
         foo.f.func_code
+        self.useNewConnection()
 
     def testFunctionSideEffects(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("effect", side_effect_src)
+        self.registry.newModule("effect", side_effect_src)
         import effect
         effect.inc()
         get_transaction().commit()
         effect.inc()
         self.assert_(effect._p_changed)
+        self.useNewConnection()
 
     def testBuiltins(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("test", builtin_src)
+        self.registry.newModule("test", builtin_src)
         get_transaction().commit()
         import test
         self.assertEqual(test.f(), len(test.x))
         test._p_deactivate()
         self.assertEqual(test.f(), len(test.x))
+        self.useNewConnection()
 
     def testNested(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("nested", nested_src)
+        self.registry.newModule("nested", nested_src)
         get_transaction().commit()
         import nested
         self.assertEqual(nested.g(5), 8)
+        self.useNewConnection()
 
     def testLambda(self):
-        mgr = PersistentModuleManager(self.registry)
         # test a lambda that contains another lambda as a default
-        src = "f = lambda x, y = lambda: 1: x + y()"
-        mgr.new("test", src)
+        self.registry.newModule("test",
+                                "f = lambda x, y = lambda: 1: x + y()")
         get_transaction().commit()
         import test
         self.assertEqual(test.f(1), 2)
-
-##    def testClosure(self):
-##        # This test causes a seg fault because ???
-##        self.importer.module_from_source("closure", closure_src)
-##        get_transaction().commit()
-##        import closure
-##        self.assertEqual(closure.inc(5), 6)
-##        closure._p_deactivate()
-##        self.assertEqual(closure.inc(5), 6)
+        self.useNewConnection()
 
     def testClass(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("foo", src)
+        self.registry.newModule("foo", src)
         get_transaction().commit()
         import foo
         obj = foo.Foo()
@@ -247,59 +256,54 @@
         i = o.m()
         j = o.m()
         self.assertEqual(i + 1, j)
+        self.useNewConnection()
 
     def testPackage(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("A.B.C", "def f(x): return x")
+        self.registry.newModule("A.B.C", "def f(x): return x")
         get_transaction().commit()
 
         import A.B.C
         self.assert_(isinstance(A, PersistentPackage))
         self.assertEqual(A.B.C.f("A"), "A")
 
-        mgr = PersistentModuleManager(self.registry)
-        self.assertRaises(ValueError,
-                          mgr.new, "A.B", "def f(x): return x + 1")
+        self.assertRaises(ValueError, self.registry.newModule,
+                          "A.B", "def f(x): return x + 1")
 
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("A.B.D", "def f(x): return x")
+        self.registry.newModule("A.B.D", "def f(x): return x")
         get_transaction().commit()
 
         from A.B import D
         self.assert_(hasattr(A.B.D, "f"))
+        self.useNewConnection()
 
     def testPackageInit(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("A.B.C", "def f(x): return x")
+        self.registry.newModule("A.B.C", "def f(x): return x")
         get_transaction().commit()
 
         import A.B.C
 
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("A.B.__init__", "x = 2")
+        self.registry.newModule("A.B.__init__", "x = 2")
         get_transaction().commit()
 
         import A.B
         self.assert_(hasattr(A.B, "C"))
         self.assertEqual(A.B.x, 2)
 
-        mgr = PersistentModuleManager(self.registry)
-        self.assertRaises(ValueError,
-                          mgr.new, "A.__init__.D", "x = 2")
+        self.assertRaises(ValueError, self.registry.newModule,
+                          "A.__init__.D", "x = 2")
+        self.useNewConnection()
 
     def testPackageRelativeImport(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("A.B.C", "def f(x): return x")
+        self.registry.newModule("A.B.C", "def f(x): return x")
         get_transaction().commit()
 
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("A.Q", "from B.C import f")
+        self.registry.newModule("A.Q", "from B.C import f")
         get_transaction().commit()
 
         import A.Q
         self.assertEqual(A.B.C.f, A.Q.f)
 
-        mgr.update("import B.C")
+        self.registry.updateModule("A.Q", "import B.C")
         get_transaction().commit()
 
         self.assertEqual(A.B.C.f, A.Q.B.C.f)
@@ -308,10 +312,11 @@
             import A.B.Q
         except ImportError:
             pass
+        self.useNewConnection()
 
     def testImportAll(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("A.B.C", """__all__ = ["a", "b"]; a, b, c = 1, 2, 3""")
+        self.registry.newModule("A.B.C",
+                                """__all__ = ["a", "b"]; a, b, c = 1, 2, 3""")
         get_transaction().commit()
 
         d = {}
@@ -320,8 +325,7 @@
         self.assertEqual(d['b'], 2)
         self.assertRaises(KeyError, d.__getitem__, "c")
 
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("A.B.D", "from C import *")
+        self.registry.newModule("A.B.D", "from C import *")
         get_transaction().commit()
 
         import A.B.D
@@ -329,16 +333,15 @@
         self.assert_(hasattr(A.B.D, "b"))
         self.assert_(not hasattr(A.B.D, "c"))
 
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("A.__init__", """__all__ = ["B", "F"]""")
+        self.registry.newModule("A.__init__", """__all__ = ["B", "F"]""")
         get_transaction().commit()
 
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("A.F", "spam = 1")
+        self.registry.newModule("A.F", "spam = 1")
         get_transaction().commit()
 
         import A
         self.assertEqual(A.F.spam, 1)
+        self.useNewConnection()
 
 class TestModuleReload(unittest.TestCase):
     """Test reloading of modules"""
@@ -355,7 +358,7 @@
         self.root = self.db.open().root()
         self.registry = self.root.get("registry")
         if self.registry is None:
-            self.root["registry"] = self.registry = PersistentModuleRegistry()
+            self.root["registry"] = self.registry = ManagedRegistry()
         self.importer = TestPersistentModuleImporter(self.registry)
         self.importer.install()
         get_transaction().commit()
@@ -365,8 +368,7 @@
         self.db.close()
 
     def testModuleReload(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("pmtest", open(self._pmtest).read())
+        self.registry.newModule("pmtest", open(self._pmtest).read())
         get_transaction().commit()
         import pmtest
         pmtest._p_deactivate()
@@ -380,8 +382,7 @@
         import pmtest
 
     def testClassReload(self):
-        mgr = PersistentModuleManager(self.registry)
-        mgr.new("foo", src)
+        self.registry.newModule("foo", src)
         get_transaction().commit()
         import foo
         obj = foo.Foo()