[Zope3-checkins] CVS: Zope3/src/zodb/zeo/tests - zeoserver.py:1.7.6.1 test_conn.py:1.3.6.1 forker.py:1.4.6.1 connection.py:1.5.2.3

Jeremy Hylton jeremy@zope.com
Thu, 13 Feb 2003 19:29:11 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv25656/zeo/tests

Modified Files:
      Tag: ZODB3-2-integration-branch
	zeoserver.py test_conn.py forker.py connection.py 
Log Message:
Port new connection tests from ZODB3.2.

Not all of the tests pass, but it's a good start.


=== Zope3/src/zodb/zeo/tests/zeoserver.py 1.7 => 1.7.6.1 ===
--- Zope3/src/zodb/zeo/tests/zeoserver.py:1.7	Mon Jan 27 14:44:14 2003
+++ Zope3/src/zodb/zeo/tests/zeoserver.py	Thu Feb 13 19:29:10 2003
@@ -29,24 +29,12 @@
 import zodb.zeo.server
 from zodb.zeo import threadedasync
 
-
 def load_storage(fp):
     context = ZConfig.Context.Context()
     rootconf = context.loadFile(fp)
     storageconf = rootconf.getSection('Storage')
     return config.createStorage(storageconf)
 
-
-def cleanup(storage):
-    # FileStorage and the Berkeley storages have this method, which deletes
-    # all files and directories used by the storage.  This prevents @-files
-    # from clogging up /tmp
-    try:
-        storage.cleanup()
-    except AttributeError:
-        pass
-
-
 class ZEOTestServer(asyncore.dispatcher):
     """A server for killing the whole process at the end of a test.
 
@@ -64,10 +52,10 @@
     """
     __super_init = asyncore.dispatcher.__init__
 
-    def __init__(self, addr, storage, keep):
+    def __init__(self, addr, server, keep):
         self.__super_init()
         self._sockets = [self]
-        self._storage = storage
+        self._server = server
         self._keep = keep
         # Count down to zero, the number of connects
         self._count = 1
@@ -95,9 +83,9 @@
         # the ack character until the storage is finished closing.
         if self._count <= 0:
             self.logger.info('closing the storage')
-            self._storage.close()
-            if not self._keep:
-                cleanup(self._storage)
+            self._server.close_server()
+            for storage in self._server.storages.values():
+                storage.cleanup()
             self.logger.info('exiting')
             # Close all the other sockets so that we don't have to wait
             # for os._exit() to get to it before starting the next
@@ -137,8 +125,11 @@
     ro_svr = False
     keep = False
     configfile = None
+    invalidation_queue_size = 100
+    transaction_timeout = None
+    monitor_address = None
     # Parse the arguments and let getopt.error percolate
-    opts, args = getopt.getopt(sys.argv[1:], 'rkC:')
+    opts, args = getopt.getopt(sys.argv[1:], 'rkC:Q:T:m:')
     for opt, arg in opts:
         if opt == '-r':
             ro_svr = True
@@ -146,6 +137,12 @@
             keep = True
         elif opt == '-C':
             configfile = arg
+        elif opt == '-Q':
+            invalidation_queue_size = int(arg)
+        elif opt == '-T':
+            transaction_timeout = int(arg)
+        elif opt == '-m':
+            monitor_address = '', int(arg)
     # Open the config file and let ZConfig parse the data there.  Then remove
     # the config file, otherwise we'll leave turds.
     fp = open(configfile, 'r')
@@ -156,19 +153,24 @@
     zeo_port = int(args[0])
     test_port = zeo_port + 1
     test_addr = ('', test_port)
+    addr = ('', zeo_port)
+    serv = zodb.zeo.server.StorageServer(
+        addr, {'1': storage}, ro_svr,
+        invalidation_queue_size=invalidation_queue_size,
+        transaction_timeout=transaction_timeout,
+        monitor_address=monitor_address)
     try:
         logger.info('creating the test server, ro: %s, keep: %s',
                     ro_svr, keep)
-        t = ZEOTestServer(test_addr, storage, keep)
+        t = ZEOTestServer(test_addr, serv, keep)
     except socket.error, e:
         if e[0] <> errno.EADDRINUSE: raise
         logger.info('addr in use, closing and exiting')
         storage.close()
-        cleanup(storage)
+        storage.cleanup()
         sys.exit(2)
     addr = ('', zeo_port)
     logger.info('creating the storage server')
-    serv = zodb.zeo.server.StorageServer(addr, {'1': storage}, ro_svr)
     t.register_socket(serv.dispatcher)
     # Loop for socket events
     logger.info('entering threadedasync loop')


=== Zope3/src/zodb/zeo/tests/test_conn.py 1.3 => 1.3.6.1 ===
--- Zope3/src/zodb/zeo/tests/test_conn.py:1.3	Wed Jan 22 16:46:53 2003
+++ Zope3/src/zodb/zeo/tests/test_conn.py	Thu Feb 13 19:29:10 2003
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2001 Zope Corporation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -17,11 +17,10 @@
 platform-dependent scaffolding.
 """
 
-# System imports
 import unittest
-# Import the actual test class
-from zodb.zeo.tests import connection
 
+from zodb.zeo.tests import connection
+from zodb.storage.base import berkeley_is_available
 
 class FileStorageConfig:
     def getConfig(self, path, create, read_only):
@@ -35,7 +34,6 @@
                          create and 'yes' or 'no',
                          read_only and 'yes' or 'no')
 
-
 class BerkeleyStorageConfig:
     def getConfig(self, path, create, read_only):
         # Full always creates and doesn't have a read_only flag
@@ -47,54 +45,28 @@
         </Storage>""" % (path,
                          read_only and 'yes' or 'no')
 
+class MappingStorageConfig:
+    def getConfig(self, path, create, read_only):
+        return """\
+        <Storage>
+            type MappingStorage
+            name %s
+        </Storage>""" % path
 
-class FileStorageConnectionTests(
-    FileStorageConfig,
-    connection.ConnectionTests
-    ):
-    """FileStorage-specific connection tests."""
-
-
-class FileStorageReconnectionTests(
-    FileStorageConfig,
-    connection.ReconnectionTests
-    ):
-    """FileStorage-specific re-connection tests."""
-
-
-class BDBConnectionTests(
-    BerkeleyStorageConfig,
-    connection.ConnectionTests
-    ):
-    """Berkeley storage connection tests."""
-
-
-class BDBReconnectionTests(
-    BerkeleyStorageConfig,
-    connection.ReconnectionTests
-    ):
-    """Berkeley storage re-connection tests."""
-
-
-test_classes = [FileStorageConnectionTests, FileStorageReconnectionTests]
-
-from zodb.storage.base import berkeley_is_available
+tests = [connection.ConnectionTests, connection.ReconnectionTests]
+configs = [FileStorageConfig, MappingStorageConfig]
 if berkeley_is_available:
-    test_classes.append(BDBConnectionTests)
-    test_classes.append(BDBReconnectionTests)
-
+    configs += [BerkeleyStorageConfig]
 
 def test_suite():
-    # shutup warnings about mktemp
-    import warnings
-    warnings.filterwarnings("ignore", "mktemp")
-
     suite = unittest.TestSuite()
-    for klass in test_classes:
-        sub = unittest.makeSuite(klass, 'check')
-        suite.addTest(sub)
+    for testclass in tests:
+        for configclass in configs:
+            # synthesize a concrete class combining tests and configuration
+            name = "%s:%s" % (testclass.__name__,
+                              configclass.__name__)
+            aclass = type.__new__(type, name,
+                                  (configclass, testclass, object), {})
+            sub = unittest.makeSuite(aclass, "check")
+            suite.addTest(sub)
     return suite
-
-
-if __name__ == "__main__":
-    unittest.main(defaultTest='test_suite')


=== Zope3/src/zodb/zeo/tests/forker.py 1.4 => 1.4.6.1 ===
--- Zope3/src/zodb/zeo/tests/forker.py:1.4	Mon Jan 27 14:44:14 2003
+++ Zope3/src/zodb/zeo/tests/forker.py	Thu Feb 13 19:29:10 2003
@@ -51,8 +51,8 @@
     raise RuntimeError, "Can't find port"
 
 
-def start_zeo_server(conf, addr=None, ro_svr=False, keep=False,
-                     admin_retries=10):
+def start_zeo_server(conf, addr=None, ro_svr=False, monitor=False, keep=False,
+                     invq=None, timeout=None):
     """Start a ZEO server in a separate process.
 
     Returns the ZEO port, the test server port, and the pid.
@@ -72,11 +72,19 @@
     if script.endswith('.pyc'):
         script = script[:-1]
     # Create a list of arguments, which we'll tuplify below
-    args = [sys.executable, script, '-C', tmpfile]
+    qa = _quote_arg
+    args = [qa(sys.executable), qa(script), '-C', qa(tmpfile)]
     if ro_svr:
         args.append('-r')
     if keep:
         args.append('-k')
+    if invq:
+        args += ['-Q', str(invq)]
+    if timeout:
+        args += ['-T', str(timeout)]
+    if monitor:
+        # XXX Is it safe to reuse the port?
+        args += ['-m', '42000']
     args.append(str(port))
     d = os.environ.copy()
     d['PYTHONPATH'] = os.pathsep.join(sys.path)
@@ -86,7 +94,7 @@
     # Always do a sleep as the first thing, since we don't expect
     # the spawned process to get started right away.
     delay = 0.25
-    for i in range(admin_retries):
+    for i in range(10):
         time.sleep(delay)
         logging.debug('forker: connect %s', i)
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -107,6 +115,12 @@
         raise
     return ('localhost', port), adminaddr, pid
 
+if sys.platform[:3].lower() == "win":
+    def _quote_arg(s):
+        return '"%s"' % s
+else:
+    def _quote_arg(s):
+        return s
 
 def shutdown_zeo_server(adminaddr):
     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


=== Zope3/src/zodb/zeo/tests/connection.py 1.5.2.2 => 1.5.2.3 ===
--- Zope3/src/zodb/zeo/tests/connection.py:1.5.2.2	Thu Feb 13 12:40:14 2003
+++ Zope3/src/zodb/zeo/tests/connection.py	Thu Feb 13 19:29:10 2003
@@ -36,22 +36,31 @@
 from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
 from zodb.storage.tests.base import handle_all_serials, ZERO
 
+class TestClientStorage(ClientStorage):
+
+    def verify_cache(self, stub):
+        self.end_verify = threading.Event()
+        self.verify_result = ClientStorage.verify_cache(self, stub)
+
+    def endVerify(self):
+        ClientStorage.endVerify(self)
+        self.end_verify.set()
 
 class DummyDB:
     def invalidate(self, *args, **kws):
         pass
 
-
 class CommonSetupTearDown(StorageTestBase):
-    """Tests that explicitly manage the server process.
-
-    To test the cache or re-connection, these test cases explicit
-    start and stop a ZEO storage server.
-    """
+    """Common boilerplate"""
 
     __super_setUp = StorageTestBase.setUp
     __super_tearDown = StorageTestBase.tearDown
 
+    keep = 0
+    invq = None
+    timeout = None
+    monitor = 0
+
     def setUp(self):
         """Test setup for connection tests.
 
@@ -67,7 +76,7 @@
         self._pids = []
         self._servers = []
         self._newAddr()
-        self.startServer(keep=self.keep)
+        self.startServer()
 
     def tearDown(self):
         """Try to cause the tests to halt"""
@@ -99,21 +108,18 @@
         # port+1 is also used, so only draw even port numbers
         return 'localhost', random.randrange(25000, 30000, 2)
 
-    def getConfig(self):
-        raise NotImplementedError
-
     def openClientStorage(self, cache='', cache_size=200000, wait=True,
                           read_only=False, read_only_fallback=False,
                           addr=None):
         if addr is None:
             addr = self.addr
-        storage = ClientStorage(addr,
-                                client=cache,
-                                cache_size=cache_size,
-                                wait=wait,
-                                min_disconnect_poll=0.1,
-                                read_only=read_only,
-                                read_only_fallback=read_only_fallback)
+        storage = TestClientStorage(addr,
+                                    client=cache,
+                                    cache_size=cache_size,
+                                    wait=wait,
+                                    min_disconnect_poll=0.1,
+                                    read_only=read_only,
+                                    read_only_fallback=read_only_fallback)
         storage.registerDB(DummyDB())
         return storage
 
@@ -122,6 +128,8 @@
     # it fails with an error.
     forker_admin_retries = 10
 
+    # Concrete test classes must provide a getConfig() method
+
     def startServer(self, create=True, index=0,
                     read_only=False, ro_svr=False, keep=False):
         addr = self.addr[index]
@@ -130,7 +138,8 @@
         path = "%s.%d" % (self.file, index)
         conf = self.getConfig(path, create, read_only)
         zeoport, adminaddr, pid = forker.start_zeo_server(
-            conf, addr, ro_svr, keep, self.forker_admin_retries)
+            conf, addr, ro_svr,
+            self.monitor, self.keep, self.invq, self.timeout)
         self._pids.append(pid)
         self._servers.append(adminaddr)
 
@@ -142,11 +151,13 @@
             forker.shutdown_zeo_server(adminaddr)
             self._servers[index] = None
 
-    def pollUp(self, timeout=30.0):
+    def pollUp(self, timeout=30.0, storage=None):
         # Poll until we're connected
+        if storage is None:
+            storage = self._storage
         now = time.time()
         giveup = now + timeout
-        while not self._storage.is_connected():
+        while not storage.is_connected():
             asyncore.poll(0.1)
             now = time.time()
             if now > giveup:
@@ -164,7 +175,11 @@
 
 
 class ConnectionTests(CommonSetupTearDown):
-    keep = False
+    """Tests that explicitly manage the server process.
+
+    To test the cache or re-connection, these test cases explicit
+    start and stop a ZEO storage server.
+    """
 
     def checkMultipleAddresses(self):
         for i in range(4):
@@ -267,7 +282,7 @@
         self._dostore()
 
     def checkDisconnectionError(self):
-        # Make sure we get a Disconnected when we try to read an
+        # Make sure we get a ClientDisconnected when we try to read an
         # object when we're not connected to a storage server and the
         # object is not in the cache.
         self.shutdownServer()
@@ -275,6 +290,22 @@
         self.assertRaises(ClientDisconnected,
                           self._storage.load, 'fredwash', '')
 
+    def checkDisconnectedAbort(self):
+        self._storage = self.openClientStorage()
+        self._dostore()
+        oids = [self._storage.newObjectId() for i in range(5)]
+        txn = Transaction()
+        self._storage.tpcBegin(txn)
+        for oid in oids:
+            data = zodb_pickle(MinPO(oid))
+            self._storage.store(oid, None, data, '', txn)
+        self.shutdownServer()
+        self.assertRaises(ClientDisconnected, self._storage.tpcVote, txn)
+        self._storage.tpcAbort(txn)
+        self.startServer(create=0)
+        self._storage._wait()
+        self._dostore()
+
     def checkBasicPersistence(self):
         # Verify cached data persists across client storage instances.
 
@@ -425,6 +456,7 @@
 class ReconnectionTests(CommonSetupTearDown):
     keep = True
     forker_admin_retries = 20
+    invq = 2
 
     def checkReadOnlyStorage(self):
         # Open a read-only client to a read-only *storage*; stores fail
@@ -558,6 +590,136 @@
         else:
             self.fail("Couldn't store after starting a read-write server")
 
+    def checkNoVerificationOnServerRestart(self):
+        self._storage = self.openClientStorage()
+        # When we create a new storage, it should always do a full
+        # verification
+        self.assertEqual(self._storage.verify_result, "full verification")
+        self._dostore()
+        self.shutdownServer()
+        self.pollDown()
+        self._storage.verify_result = None
+        self.startServer(create=0)
+        self.pollUp()
+        # There were no transactions committed, so no verification
+        # should be needed.
+        self.assertEqual(self._storage.verify_result, "no verification")
+        
+    def checkNoVerificationOnServerRestartWith2Clients(self):
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+        
+        self._storage = self.openClientStorage()
+        oid = self._storage.newObjectId()
+        # When we create a new storage, it should always do a full
+        # verification
+        self.assertEqual(self._storage.verify_result, "full verification")
+        # do two storages of the object to make sure an invalidation
+        # message is generated
+        revid = self._dostore(oid)
+        self._dostore(oid, revid)
+
+        perstorage.load(oid, '')
+
+        self.shutdownServer()
+
+        self.pollDown()
+        self._storage.verify_result = None
+        perstorage.verify_result = None
+        self.startServer(create=0)
+        self.pollUp()
+        self.pollUp(storage=perstorage)
+        # There were no transactions committed, so no verification
+        # should be needed.
+        self.assertEqual(self._storage.verify_result, "no verification")
+        self.assertEqual(perstorage.verify_result, "no verification")
+        perstorage.close()
+
+    def checkQuickVerificationWith2Clients(self):
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+        
+        self._storage = self.openClientStorage()
+        oid = self._storage.newObjectId()
+        # When we create a new storage, it should always do a full
+        # verification
+        self.assertEqual(self._storage.verify_result, "full verification")
+        # do two storages of the object to make sure an invalidation
+        # message is generated
+        revid = self._dostore(oid)
+        revid = self._dostore(oid, revid)
+
+        perstorage.load(oid, '')
+        perstorage.close()
+        
+        revid = self._dostore(oid, revid)
+
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "quick verification")
+
+        self.assertEqual(perstorage.load(oid, ''),
+                         self._storage.load(oid, ''))
+
+
+
+    def checkVerificationWith2ClientsInvqOverflow(self):
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+        
+        self._storage = self.openClientStorage()
+        oid = self._storage.newObjectId()
+        # When we create a new storage, it should always do a full
+        # verification
+        self.assertEqual(self._storage.verify_result, "full verification")
+        # do two storages of the object to make sure an invalidation
+        # message is generated
+        revid = self._dostore(oid)
+        revid = self._dostore(oid, revid)
+
+        perstorage.load(oid, '')
+        perstorage.close()
+
+        # the test code sets invq bound to 2
+        for i in range(5):
+            revid = self._dostore(oid, revid)
+
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+        t = time.time() + 30
+        while not perstorage.end_verify.isSet():
+            perstorage.sync()
+            if time.time() > t:
+                self.fail("timed out waiting for endVerify")
+
+        self.assertEqual(self._storage.load(oid, '')[1], revid)
+        self.assertEqual(perstorage.load(oid, ''),
+                         self._storage.load(oid, ''))
+
+        perstorage.close()
+
+class TimeoutTests(CommonSetupTearDown):
+    timeout = 1
+
+    def checkTimeout(self):
+        storage = self.openClientStorage()
+        txn = Transaction()
+        storage.tpcBegin(txn)
+        storage.tpcVote(txn)
+        time.sleep(2)
+        self.assertRaises(ClientDisconnected, storage.tpcFinish, txn)
+
+    def checkTimeoutOnAbort(self):
+        storage = self.openClientStorage()
+        txn = Transaction()
+        storage.tpcBegin(txn)
+        storage.tpcVote(txn)
+        storage.tpcAbort(txn)
+
+    def checkTimeoutOnAbortNoLock(self):
+        storage = self.openClientStorage()
+        txn = Transaction()
+        storage.tpcBegin(txn)
+        storage.tpcAbort(txn)
 
 class MSTThread(threading.Thread):