[Zope3-checkins] CVS: Zope3/src/zodb/zeo/tests - thread.py:1.2.6.1 common.py:1.2.6.1 zeoserver.py:1.7.8.1 threadtests.py:1.4.4.1 test_zeo.py:1.7.4.2 test_conn.py:1.3.8.1 test_cache.py:1.2.12.1 forker.py:1.4.8.1 connection.py:1.5.4.1 commitlock.py:1.3.4.1

Jeremy Hylton jeremy@zope.com
Wed, 12 Mar 2003 16:41:57 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv29357/zeo/tests

Modified Files:
      Tag: opaque-pickles-branch
	zeoserver.py threadtests.py test_zeo.py test_conn.py 
	test_cache.py forker.py connection.py commitlock.py 
Added Files:
      Tag: opaque-pickles-branch
	thread.py common.py 
Log Message:
Update from trunk.

Resolve a few import conflicts.


=== Added File Zope3/src/zodb/zeo/tests/thread.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A Thread base class for use with unittest."""

from cStringIO import StringIO
import threading
import traceback

class TestThread(threading.Thread):
    """Base class for threads that are started from a unittest test case.

    Subclasses implement testrun() rather than run().  run() wraps
    testrun() and turns any exception into a failure report on the
    owning test case; the test should call cleanup() when it is done
    with the thread.

    The thread is marked as a daemon so that a thread which never
    finishes cannot keep the test process alive.
    """
    __super_init = threading.Thread.__init__
    __super_run = threading.Thread.run

    # Formatted report of an exception raised by testrun(), or None if
    # testrun() has not failed.  This is a class-level default so that
    # cleanup() also works for a subclass whose __init__ does not chain
    # to the one defined here.
    _failure = None

    def __init__(self, testcase, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Create the thread on behalf of testcase.

        testcase must provide a fail(msg) method (normally it is the
        running unittest.TestCase).  The remaining arguments are passed
        to threading.Thread.__init__ unchanged.
        """
        # Use a fresh dict per instance instead of a shared mutable
        # default argument.
        if kwargs is None:
            kwargs = {}
        self.__super_init(group, target, name, args, kwargs, verbose)
        self.setDaemon(1)
        self._testcase = testcase

    def run(self):
        """Run testrun() and record any exception it raises."""
        try:
            self.testrun()
        except Exception:
            s = StringIO()
            traceback.print_exc(file=s)
            # Remember the report *before* calling fail().  If testcase
            # is a unittest.TestCase, fail() raises inside this worker
            # thread, which only terminates the thread and never reaches
            # the test runner; cleanup() re-reports the saved message
            # from the thread that is actually running the test.
            self._failure = ("Exception in thread %s:\n%s\n" %
                             (self, s.getvalue()))
            self._testcase.fail(self._failure)

    def cleanup(self, timeout=15):
        """Wait for the thread and report its outcome to the test case.

        Joins the thread for at most timeout seconds, then calls
        testcase.fail() if testrun() raised an exception or if the
        thread is still running.
        """
        self.join(timeout)
        if self._failure is not None:
            self._testcase.fail(self._failure)
        if self.isAlive():
            self._testcase.fail("Thread did not finish: %s" % self)


=== Added File Zope3/src/zodb/zeo/tests/common.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Common helper classes for testing."""

import threading
from zodb.zeo.client import ClientStorage

class TestClientStorage(ClientStorage):
    """ClientStorage subclass that lets tests observe cache verification.

    After verify_cache() has run, two extra attributes are available:

    end_verify    -- a threading.Event that is set by endVerify()
    verify_result -- whatever the inherited verify_cache() returned
    """

    def verify_cache(self, stub):
        """Run the inherited verification and keep its result."""
        # A new event is installed for every verification, before
        # delegating to the base class, so that endVerify() always sets
        # the event that belongs to the current run.
        self.end_verify = threading.Event()
        base = super(TestClientStorage, self)
        self.verify_result = base.verify_cache(stub)

    def endVerify(self):
        """Finish verification in the base class, then signal waiters."""
        base = super(TestClientStorage, self)
        base.endVerify()
        self.end_verify.set()

class DummyDB:
    """Stand-in database object whose invalidate() does nothing."""

    def invalidate(self, *args, **kws):
        """Accept any arguments and ignore them."""
        return None




=== Zope3/src/zodb/zeo/tests/zeoserver.py 1.7 => 1.7.8.1 ===
--- Zope3/src/zodb/zeo/tests/zeoserver.py:1.7	Mon Jan 27 14:44:14 2003
+++ Zope3/src/zodb/zeo/tests/zeoserver.py	Wed Mar 12 16:41:21 2003
@@ -29,24 +29,12 @@
 import zodb.zeo.server
 from zodb.zeo import threadedasync
 
-
 def load_storage(fp):
     context = ZConfig.Context.Context()
     rootconf = context.loadFile(fp)
     storageconf = rootconf.getSection('Storage')
     return config.createStorage(storageconf)
 
-
-def cleanup(storage):
-    # FileStorage and the Berkeley storages have this method, which deletes
-    # all files and directories used by the storage.  This prevents @-files
-    # from clogging up /tmp
-    try:
-        storage.cleanup()
-    except AttributeError:
-        pass
-
-
 class ZEOTestServer(asyncore.dispatcher):
     """A server for killing the whole process at the end of a test.
 
@@ -64,10 +52,10 @@
     """
     __super_init = asyncore.dispatcher.__init__
 
-    def __init__(self, addr, storage, keep):
+    def __init__(self, addr, server, keep):
         self.__super_init()
         self._sockets = [self]
-        self._storage = storage
+        self._server = server
         self._keep = keep
         # Count down to zero, the number of connects
         self._count = 1
@@ -95,9 +83,10 @@
         # the ack character until the storage is finished closing.
         if self._count <= 0:
             self.logger.info('closing the storage')
-            self._storage.close()
+            self._server.close_server()
             if not self._keep:
-                cleanup(self._storage)
+                for storage in self._server.storages.values():
+                    storage.cleanup()
             self.logger.info('exiting')
             # Close all the other sockets so that we don't have to wait
             # for os._exit() to get to it before starting the next
@@ -137,8 +126,11 @@
     ro_svr = False
     keep = False
     configfile = None
+    invalidation_queue_size = 100
+    transaction_timeout = None
+    monitor_address = None
     # Parse the arguments and let getopt.error percolate
-    opts, args = getopt.getopt(sys.argv[1:], 'rkC:')
+    opts, args = getopt.getopt(sys.argv[1:], 'rkC:Q:T:m:')
     for opt, arg in opts:
         if opt == '-r':
             ro_svr = True
@@ -146,6 +138,12 @@
             keep = True
         elif opt == '-C':
             configfile = arg
+        elif opt == '-Q':
+            invalidation_queue_size = int(arg)
+        elif opt == '-T':
+            transaction_timeout = int(arg)
+        elif opt == '-m':
+            monitor_address = '', int(arg)
     # Open the config file and let ZConfig parse the data there.  Then remove
     # the config file, otherwise we'll leave turds.
     fp = open(configfile, 'r')
@@ -156,19 +154,24 @@
     zeo_port = int(args[0])
     test_port = zeo_port + 1
     test_addr = ('', test_port)
+    addr = ('', zeo_port)
+    serv = zodb.zeo.server.StorageServer(
+        addr, {'1': storage}, ro_svr,
+        invalidation_queue_size=invalidation_queue_size,
+        transaction_timeout=transaction_timeout,
+        monitor_address=monitor_address)
     try:
         logger.info('creating the test server, ro: %s, keep: %s',
                     ro_svr, keep)
-        t = ZEOTestServer(test_addr, storage, keep)
+        t = ZEOTestServer(test_addr, serv, keep)
     except socket.error, e:
         if e[0] <> errno.EADDRINUSE: raise
         logger.info('addr in use, closing and exiting')
         storage.close()
-        cleanup(storage)
+        storage.cleanup()
         sys.exit(2)
     addr = ('', zeo_port)
     logger.info('creating the storage server')
-    serv = zodb.zeo.server.StorageServer(addr, {'1': storage}, ro_svr)
     t.register_socket(serv.dispatcher)
     # Loop for socket events
     logger.info('entering threadedasync loop')


=== Zope3/src/zodb/zeo/tests/threadtests.py 1.4 => 1.4.4.1 ===
--- Zope3/src/zodb/zeo/tests/threadtests.py:1.4	Wed Feb  5 18:28:22 2003
+++ Zope3/src/zodb/zeo/tests/threadtests.py	Wed Mar 12 16:41:21 2003
@@ -19,7 +19,7 @@
 from zodb.storage.tests.base import zodb_pickle, MinPO
 
 from zodb.zeo.client import ClientStorageError
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.interfaces import ClientDisconnected
 
 ZERO = '\0'*8
 
@@ -86,7 +86,7 @@
             self.gotValueError = 1
         try:
             self.storage.tpcAbort(self.trans)
-        except Disconnected:
+        except ClientDisconnected:
             self.gotDisconnected = 1
 
 class MTStoresThread(threading.Thread):
@@ -142,24 +142,6 @@
         thread2.join()
         self.assertEqual(thread1.gotValueError, 1)
         self.assertEqual(thread2.gotValueError, 1)
-
-    def checkThatFailedBeginDoesNotHaveLock(self):
-        doNextEvent = threading.Event()
-        threadStartedEvent = threading.Event()
-        thread1 = GetsThroughVoteThread(self._storage,
-                                        doNextEvent, threadStartedEvent)
-        thread2 = AbortsAfterBeginFailsThread(self._storage,
-                                              doNextEvent, threadStartedEvent)
-        thread1.start()
-        threadStartedEvent.wait(1)
-        thread2.start()
-        self._storage.close()
-        doNextEvent.set()
-        thread1.join()
-        thread2.join()
-        self.assertEqual(thread1.gotValueError, 1)
-        self.assertEqual(thread2.gotValueError, 1)
-        self.assertEqual(thread2.gotDisconnected, 1)
 
     # Run a bunch of threads doing small and large stores in parallel
     def checkMTStores(self):


=== Zope3/src/zodb/zeo/tests/test_zeo.py 1.7.4.1 => 1.7.4.2 ===
--- Zope3/src/zodb/zeo/tests/test_zeo.py:1.7.4.1	Mon Mar 10 14:40:48 2003
+++ Zope3/src/zodb/zeo/tests/test_zeo.py	Wed Mar 12 16:41:21 2003
@@ -31,26 +31,18 @@
 
 
 # ZODB test mixin classes
-from zodb.storage.tests import base, basic, version, \
-     undo, undoversion, \
-     packable, synchronization, conflict, revision, \
-     mt, readonly
+from zodb.storage.tests import base, basic, version, undo, undoversion, \
+     packable, synchronization, conflict, revision, mt, readonly
 
-# ZEO imports
 from zodb.zeo.client import ClientStorage
-from zodb.zeo.interfaces import Disconnected
-
-# ZEO test support
 from zodb.zeo.tests import forker, cache
-
-# ZEO test mixin classes
 from zodb.zeo.tests import commitlock, threadtests
+from zodb.zeo.tests.common import TestClientStorage, DummyDB
 
 class DummyDB:
     def invalidate(self, *args):
         pass
 
-
 class MiscZEOTests:
     """ZEO tests that don't fit in elsewhere."""
 
@@ -60,7 +52,7 @@
 
     def checkZEOInvalidation(self):
         addr = self._storage._addr
-        storage2 = ClientStorage(addr, wait=1, min_disconnect_poll=0.1)
+        storage2 = TestClientStorage(addr, wait=True, min_disconnect_poll=0.1)
         try:
             oid = self._storage.newObjectId()
             ob = MinPO('first')
@@ -80,59 +72,38 @@
         finally:
             storage2.close()
 
+class ZEOConflictTests(
+    conflict.ConflictResolvingStorage,
+    conflict.ConflictResolvingTransUndoStorage):
+
+    def unresolvable(self, klass):
+        # This helper method is used to test the implementation of
+        # conflict resolution.  That code runs in the server, and there
+        # is no way for the test suite (a client) to inquire about it.
+        return False
 
-class GenericTests(
+class StorageTests(
     # Base class for all ZODB tests
     base.StorageTestBase,
     # ZODB test mixin classes 
     basic.BasicStorage,
-    version.VersionStorage,
-    undo.TransactionalUndoStorage,
-    undoversion.TransactionalUndoVersionStorage,
-    packable.PackableStorage,
-    synchronization.SynchronizedStorage,
-    conflict.ConflictResolvingStorage,
-    conflict.ConflictResolvingTransUndoStorage,
-    revision.RevisionStorage,
-    mt.MTStorage,
     readonly.ReadOnlyStorage,
+    synchronization.SynchronizedStorage,
     # ZEO test mixin classes 
-    cache.StorageWithCache,
-    cache.TransUndoStorageWithCache,
     commitlock.CommitLockTests,
     threadtests.ThreadTests,
     # Locally defined (see above)
     MiscZEOTests
     ):
-
-    """Combine tests from various origins in one class."""
-
-    def open(self, read_only=0):
-        # XXX Needed to support ReadOnlyStorage tests.  Ought to be a
-        # cleaner way.
-        addr = self._storage._addr
-        self._storage.close()
-        self._storage = ClientStorage(addr, read_only=read_only, wait=1)
-
-    _open = open
-
-    def unresolvable(self, klass):
-        # This helper method is used to test the implementation of
-        # conflict resolution.  That code runs in the server, and there
-        # is no way for the test suite (a client) to inquire about it.
-        pass
-
-
-class FileStorageTests(GenericTests):
-    """Test ZEO backed by a FileStorage."""
+    """Tests for storage that supports IStorage."""
 
     def setUp(self):
         logging.info("testZEO: setUp() %s", self.id())
         zeoport, adminaddr, pid = forker.start_zeo_server(self.getConfig())
         self._pids = [pid]
         self._servers = [adminaddr]
-        self._storage = ClientStorage(zeoport, '1', cache_size=20000000,
-                                      min_disconnect_poll=0.5, wait=1)
+        self._storage = TestClientStorage(zeoport, '1', cache_size=20000000,
+                                          min_disconnect_poll=0.5, wait=1)
         self._storage.registerDB(DummyDB())
 
     def tearDown(self):
@@ -146,9 +117,39 @@
             for pid in self._pids:
                 os.waitpid(pid, 0)
 
+    def open(self, read_only=False):
+        # XXX Needed to support ReadOnlyStorage tests.  Ought to be a
+        # cleaner way.
+        addr = self._storage._addr
+        self._storage.close()
+        self._storage = TestClientStorage(addr, read_only=read_only, wait=True)
+
+class UndoVersionStorageTests(
+    StorageTests,
+    ZEOConflictTests,
+    cache.StorageWithCache,
+    cache.TransUndoStorageWithCache,
+    commitlock.CommitLockUndoTests,
+    mt.MTStorage,
+    packable.PackableStorage,
+    revision.RevisionStorage,
+    undo.TransactionalUndoStorage,
+    undoversion.TransactionalUndoVersionStorage,
+    version.VersionStorage,
+    ):
+    """Tests for storage that supports IUndoStorage and IVersionStorage."""
+
+    # XXX Some of the pack tests should really be run for the mapping
+    # storage, but the pack tests assume that the storage also supports
+    # multiple revisions.
+
+class FileStorageTests(UndoVersionStorageTests):
+    """Test ZEO backed by a FileStorage."""
+
+    level = 2
+
     def getConfig(self):
         filename = self.__fs_base = tempfile.mktemp()
-        # Return a 1-tuple
         return """\
         <Storage>
             type FileStorage
@@ -157,13 +158,13 @@
         </Storage>
         """ % filename
 
-
-class BDBTests(FileStorageTests):
+class BDBTests(UndoVersionStorageTests):
     """ZEO backed by a Berkeley Full storage."""
 
-    def getStorage(self):
+    level = 2
+
+    def getConfig(self):
         self._envdir = tempfile.mktemp()
-        # Return a 1-tuple
         return """\
         <Storage>
             type BDBFullStorage
@@ -171,25 +172,35 @@
         </Storage>
         """ % self._envdir
 
+    # XXX These test seems to have massive failures when I run them.
+    # I don't think they should fail, but need Barry's help to debug.
+    
+    def checkCommitLockUndoClose(self):
+        pass
+    
+    def checkCommitLockUndoAbort(self):
+        pass
+
+class MappingStorageTests(StorageTests):
+    
+    def getConfig(self):
+        self._envdir = tempfile.mktemp()
+        return """\
+        <Storage>
+            type MappingStorage
+            name %s
+        </Storage>
+        """ % self._envdir
 
-test_classes = [FileStorageTests]
+test_classes = [FileStorageTests, MappingStorageTests]
 
 from zodb.storage.base import berkeley_is_available
 if berkeley_is_available:
     test_classes.append(BDBTests)
 
-
 def test_suite():
-    # shutup warnings about mktemp
-    import warnings
-    warnings.filterwarnings("ignore", "mktemp")
-
     suite = unittest.TestSuite()
     for klass in test_classes:
         sub = unittest.makeSuite(klass, 'check')
         suite.addTest(sub)
     return suite
-
-
-if __name__ == "__main__":
-    unittest.main(defaultTest='test_suite')


=== Zope3/src/zodb/zeo/tests/test_conn.py 1.3 => 1.3.8.1 ===
--- Zope3/src/zodb/zeo/tests/test_conn.py:1.3	Wed Jan 22 16:46:53 2003
+++ Zope3/src/zodb/zeo/tests/test_conn.py	Wed Mar 12 16:41:21 2003
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2001 Zope Corporation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -17,11 +17,10 @@
 platform-dependent scaffolding.
 """
 
-# System imports
 import unittest
-# Import the actual test class
-from zodb.zeo.tests import connection
 
+from zodb.zeo.tests.connection import ConnectionTests, ReconnectionTests
+from zodb.storage.base import berkeley_is_available
 
 class FileStorageConfig:
     def getConfig(self, path, create, read_only):
@@ -35,7 +34,6 @@
                          create and 'yes' or 'no',
                          read_only and 'yes' or 'no')
 
-
 class BerkeleyStorageConfig:
     def getConfig(self, path, create, read_only):
         # Full always creates and doesn't have a read_only flag
@@ -47,54 +45,33 @@
         </Storage>""" % (path,
                          read_only and 'yes' or 'no')
 
+class MappingStorageConfig:
+    def getConfig(self, path, create, read_only):
+        return """\
+        <Storage>
+            type MappingStorage
+            name %s
+        </Storage>""" % path
 
-class FileStorageConnectionTests(
-    FileStorageConfig,
-    connection.ConnectionTests
-    ):
-    """FileStorage-specific connection tests."""
-
-
-class FileStorageReconnectionTests(
-    FileStorageConfig,
-    connection.ReconnectionTests
-    ):
-    """FileStorage-specific re-connection tests."""
-
-
-class BDBConnectionTests(
-    BerkeleyStorageConfig,
-    connection.ConnectionTests
-    ):
-    """Berkeley storage connection tests."""
-
-
-class BDBReconnectionTests(
-    BerkeleyStorageConfig,
-    connection.ReconnectionTests
-    ):
-    """Berkeley storage re-connection tests."""
-
-
-test_classes = [FileStorageConnectionTests, FileStorageReconnectionTests]
-
-from zodb.storage.base import berkeley_is_available
+tests = [
+    (MappingStorageConfig, ConnectionTests, 1),
+    (FileStorageConfig, ReconnectionTests, 1),
+    (FileStorageConfig, ConnectionTests, 2),
+    ]
+         
 if berkeley_is_available:
-    test_classes.append(BDBConnectionTests)
-    test_classes.append(BDBReconnectionTests)
-
+    tests += [
+        (BerkeleyStorageConfig, ConnectionTests, 2),
+        (BerkeleyStorageConfig, ReconnectionTests, 2),
+        ]
 
 def test_suite():
-    # shutup warnings about mktemp
-    import warnings
-    warnings.filterwarnings("ignore", "mktemp")
-
     suite = unittest.TestSuite()
-    for klass in test_classes:
-        sub = unittest.makeSuite(klass, 'check')
+    for testclass, configclass, level in tests:
+        # synthesize a concrete class combining tests and configuration
+        name = "%s:%s" % (testclass.__name__, configclass.__name__)
+        aclass = type.__new__(type, name, (configclass, testclass, object), {})
+        aclass.level = level
+        sub = unittest.makeSuite(aclass, "check")
         suite.addTest(sub)
     return suite
-
-
-if __name__ == "__main__":
-    unittest.main(defaultTest='test_suite')


=== Zope3/src/zodb/zeo/tests/test_cache.py 1.2 => 1.2.12.1 ===
--- Zope3/src/zodb/zeo/tests/test_cache.py:1.2	Wed Dec 25 09:12:22 2002
+++ Zope3/src/zodb/zeo/tests/test_cache.py	Wed Mar 12 16:41:21 2003
@@ -16,7 +16,6 @@
 At times, we do 'white box' testing, i.e. we know about the internals
 of the ClientCache object.
 """
-from __future__ import nested_scopes
 
 import os
 import time


=== Zope3/src/zodb/zeo/tests/forker.py 1.4 => 1.4.8.1 ===
--- Zope3/src/zodb/zeo/tests/forker.py:1.4	Mon Jan 27 14:44:14 2003
+++ Zope3/src/zodb/zeo/tests/forker.py	Wed Mar 12 16:41:21 2003
@@ -51,8 +51,8 @@
     raise RuntimeError, "Can't find port"
 
 
-def start_zeo_server(conf, addr=None, ro_svr=False, keep=False,
-                     admin_retries=10):
+def start_zeo_server(conf, addr=None, ro_svr=False, monitor=False, keep=False,
+                     invq=None, timeout=None):
     """Start a ZEO server in a separate process.
 
     Returns the ZEO port, the test server port, and the pid.
@@ -72,11 +72,19 @@
     if script.endswith('.pyc'):
         script = script[:-1]
     # Create a list of arguments, which we'll tuplify below
-    args = [sys.executable, script, '-C', tmpfile]
+    qa = _quote_arg
+    args = [qa(sys.executable), qa(script), '-C', qa(tmpfile)]
     if ro_svr:
         args.append('-r')
     if keep:
         args.append('-k')
+    if invq:
+        args += ['-Q', str(invq)]
+    if timeout:
+        args += ['-T', str(timeout)]
+    if monitor:
+        # XXX Is it safe to reuse the port?
+        args += ['-m', '42000']
     args.append(str(port))
     d = os.environ.copy()
     d['PYTHONPATH'] = os.pathsep.join(sys.path)
@@ -86,7 +94,7 @@
     # Always do a sleep as the first thing, since we don't expect
     # the spawned process to get started right away.
     delay = 0.25
-    for i in range(admin_retries):
+    for i in range(10):
         time.sleep(delay)
         logging.debug('forker: connect %s', i)
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -107,6 +115,12 @@
         raise
     return ('localhost', port), adminaddr, pid
 
+if sys.platform[:3].lower() == "win":
+    def _quote_arg(s):
+        return '"%s"' % s
+else:
+    def _quote_arg(s):
+        return s
 
 def shutdown_zeo_server(adminaddr):
     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


=== Zope3/src/zodb/zeo/tests/connection.py 1.5 => 1.5.4.1 ===
--- Zope3/src/zodb/zeo/tests/connection.py:1.5	Wed Feb  5 18:28:22 2003
+++ Zope3/src/zodb/zeo/tests/connection.py	Wed Mar 12 16:41:21 2003
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2001 Zope Corporation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -24,9 +24,10 @@
 import logging
 
 from zodb.zeo.client import ClientStorage
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.interfaces import ClientDisconnected
 from zodb.zeo.zrpc.marshal import Marshaller
 from zodb.zeo.tests import forker
+from zodb.zeo.tests.common import TestClientStorage, DummyDB
 
 from transaction import get_transaction
 from zodb.ztransaction import Transaction
@@ -36,22 +37,17 @@
 from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
 from zodb.storage.tests.base import handle_all_serials, ZERO
 
-
-class DummyDB:
-    def invalidate(self, *args, **kws):
-        pass
-
-
 class CommonSetupTearDown(StorageTestBase):
-    """Tests that explicitly manage the server process.
-
-    To test the cache or re-connection, these test cases explicit
-    start and stop a ZEO storage server.
-    """
+    """Common boilerplate"""
 
     __super_setUp = StorageTestBase.setUp
     __super_tearDown = StorageTestBase.tearDown
 
+    keep = 0
+    invq = None
+    timeout = None
+    monitor = 0
+
     def setUp(self):
         """Test setup for connection tests.
 
@@ -67,7 +63,7 @@
         self._pids = []
         self._servers = []
         self._newAddr()
-        self.startServer(keep=self.keep)
+        self.startServer()
 
     def tearDown(self):
         """Try to cause the tests to halt"""
@@ -99,21 +95,18 @@
         # port+1 is also used, so only draw even port numbers
         return 'localhost', random.randrange(25000, 30000, 2)
 
-    def getConfig(self):
-        raise NotImplementedError
-
     def openClientStorage(self, cache='', cache_size=200000, wait=True,
                           read_only=False, read_only_fallback=False,
                           addr=None):
         if addr is None:
             addr = self.addr
-        storage = ClientStorage(addr,
-                                client=cache,
-                                cache_size=cache_size,
-                                wait=wait,
-                                min_disconnect_poll=0.1,
-                                read_only=read_only,
-                                read_only_fallback=read_only_fallback)
+        storage = TestClientStorage(addr,
+                                    client=cache,
+                                    cache_size=cache_size,
+                                    wait=wait,
+                                    min_disconnect_poll=0.1,
+                                    read_only=read_only,
+                                    read_only_fallback=read_only_fallback)
         storage.registerDB(DummyDB())
         return storage
 
@@ -122,15 +115,18 @@
     # it fails with an error.
     forker_admin_retries = 10
 
-    def startServer(self, create=True, index=0,
-                    read_only=False, ro_svr=False, keep=False):
+    # Concrete test classes must provide a getConfig() method
+
+    def startServer(self, create=True, index=0, read_only=False, ro_svr=False,
+                    keep=False):
         addr = self.addr[index]
         self.logger.warn("startServer(create=%d, index=%d, read_only=%d) @ %s",
                          create, index, read_only, addr)
         path = "%s.%d" % (self.file, index)
         conf = self.getConfig(path, create, read_only)
         zeoport, adminaddr, pid = forker.start_zeo_server(
-            conf, addr, ro_svr, keep, self.forker_admin_retries)
+            conf, addr, ro_svr,
+            self.monitor, self.keep, self.invq, self.timeout)
         self._pids.append(pid)
         self._servers.append(adminaddr)
 
@@ -142,11 +138,13 @@
             forker.shutdown_zeo_server(adminaddr)
             self._servers[index] = None
 
-    def pollUp(self, timeout=30.0):
+    def pollUp(self, timeout=30.0, storage=None):
         # Poll until we're connected
+        if storage is None:
+            storage = self._storage
         now = time.time()
         giveup = now + timeout
-        while not self._storage.is_connected():
+        while not storage.is_connected():
             asyncore.poll(0.1)
             now = time.time()
             if now > giveup:
@@ -164,7 +162,11 @@
 
 
 class ConnectionTests(CommonSetupTearDown):
-    keep = False
+    """Tests that explicitly manage the server process.
+
+    To test the cache or re-connection, these test cases explicit
+    start and stop a ZEO storage server.
+    """
 
     def checkMultipleAddresses(self):
         for i in range(4):
@@ -193,8 +195,9 @@
             try:
                 self._dostore()
                 break
-            except Disconnected:
+            except ClientDisconnected:
                 time.sleep(0.5)
+                self._storage.sync()
 
     def checkReadOnlyClient(self):
         # Open a read-only client to a read-write server; stores fail
@@ -256,7 +259,7 @@
         # Poll until the client disconnects
         self.pollDown()
         # Stores should fail now
-        self.assertRaises(Disconnected, self._dostore)
+        self.assertRaises(ClientDisconnected, self._dostore)
 
         # Restart the server
         self.startServer(create=False)
@@ -266,12 +269,29 @@
         self._dostore()
 
     def checkDisconnectionError(self):
-        # Make sure we get a Disconnected when we try to read an
+        # Make sure we get a ClientDisconnected when we try to read an
         # object when we're not connected to a storage server and the
         # object is not in the cache.
         self.shutdownServer()
         self._storage = self.openClientStorage('test', 1000, wait=False)
-        self.assertRaises(Disconnected, self._storage.load, 'fredwash', '')
+        self.assertRaises(ClientDisconnected,
+                          self._storage.load, 'fredwash', '')
+
+    def checkDisconnectedAbort(self):
+        self._storage = self.openClientStorage()
+        self._dostore()
+        oids = [self._storage.newObjectId() for i in range(5)]
+        txn = Transaction()
+        self._storage.tpcBegin(txn)
+        for oid in oids:
+            data = zodb_pickle(MinPO(oid))
+            self._storage.store(oid, None, data, '', txn)
+        self.shutdownServer()
+        self.assertRaises(ClientDisconnected, self._storage.tpcVote, txn)
+        self._storage.tpcAbort(txn)
+        self.startServer(create=0)
+        self._storage._wait()
+        self._dostore()
 
     def checkBasicPersistence(self):
         # Verify cached data persists across client storage instances.
@@ -335,18 +355,14 @@
             try:
                 self._dostore(oid, data=obj)
                 break
-            except (Disconnected, select.error,
-                    threading.ThreadError, socket.error):
+            except ClientDisconnected:
                 self.logger.warn("checkReconnection: "
                                  "Error after server restart; retrying.",
                                  exc_info=True)
                 get_transaction().abort()
-                time.sleep(0.1) # XXX how long to sleep
-            # XXX This is a bloody pain.  We're placing a heavy burden
-            # on users to catch a plethora of exceptions in order to
-            # write robust code.  Need to think about implementing
-            # John Heintz's suggestion to make sure all exceptions
-            # inherit from POSException.
+                self._storage.sync()
+        else:
+            self.fail("Could not reconnect to server")
         self.logger.warn("checkReconnection: finished")
 
     def checkBadMessage1(self):
@@ -377,7 +393,7 @@
 
         try:
             self._dostore()
-        except Disconnected:
+        except ClientDisconnected:
             pass
         else:
             self._storage.close()
@@ -427,6 +443,7 @@
 class ReconnectionTests(CommonSetupTearDown):
     keep = True
     forker_admin_retries = 20
+    invq = 2
 
     def checkReadOnlyStorage(self):
         # Open a read-only client to a read-only *storage*; stores fail
@@ -493,7 +510,7 @@
         # Poll until the client disconnects
         self.pollDown()
         # Stores should fail now
-        self.assertRaises(Disconnected, self._dostore)
+        self.assertRaises(ClientDisconnected, self._dostore)
 
         # Restart the server
         self.startServer(create=False, read_only=True)
@@ -522,7 +539,7 @@
         # Poll until the client disconnects
         self.pollDown()
         # Stores should fail now
-        self.assertRaises(Disconnected, self._dostore)
+        self.assertRaises(ClientDisconnected, self._dostore)
 
         # Restart the server, this time read-write
         self.startServer(create=False)
@@ -554,12 +571,142 @@
             try:
                 self._dostore()
                 break
-            except (Disconnected, ReadOnlyError,
-                    select.error, threading.ThreadError, socket.error):
+            except (ClientDisconnected, ReadOnlyError):
                 time.sleep(0.1)
+                self._storage.sync()
         else:
             self.fail("Couldn't store after starting a read-write server")
 
+    def checkNoVerificationOnServerRestart(self):
+        self._storage = self.openClientStorage()
+        # When we create a new storage, it should always do a full
+        # verification
+        self.assertEqual(self._storage.verify_result, "full verification")
+        self._dostore()
+        self.shutdownServer()
+        self.pollDown()
+        self._storage.verify_result = None
+        self.startServer(create=0)
+        self.pollUp()
+        # There were no transactions committed, so no verification
+        # should be needed.
+        self.assertEqual(self._storage.verify_result, "no verification")
+        
+    def checkNoVerificationOnServerRestartWith2Clients(self):
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+        
+        self._storage = self.openClientStorage()
+        oid = self._storage.newObjectId()
+        # When we create a new storage, it should always do a full
+        # verification
+        self.assertEqual(self._storage.verify_result, "full verification")
+        # do two stores of the object to make sure an invalidation
+        # message is generated
+        revid = self._dostore(oid)
+        self._dostore(oid, revid)
+
+        perstorage.load(oid, '')
+
+        self.shutdownServer()
+
+        self.pollDown()
+        self._storage.verify_result = None
+        perstorage.verify_result = None
+        self.startServer(create=0)
+        self.pollUp()
+        self.pollUp(storage=perstorage)
+        # There were no transactions committed, so no verification
+        # should be needed.
+        self.assertEqual(self._storage.verify_result, "no verification")
+        self.assertEqual(perstorage.verify_result, "no verification")
+        perstorage.close()
+
+    def checkQuickVerificationWith2Clients(self):
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+        
+        self._storage = self.openClientStorage()
+        oid = self._storage.newObjectId()
+        # When we create a new storage, it should always do a full
+        # verification
+        self.assertEqual(self._storage.verify_result, "full verification")
+        # do two stores of the object to make sure an invalidation
+        # message is generated
+        revid = self._dostore(oid)
+        revid = self._dostore(oid, revid)
+
+        perstorage.load(oid, '')
+        perstorage.close()
+        
+        revid = self._dostore(oid, revid)
+
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "quick verification")
+
+        self.assertEqual(perstorage.load(oid, ''),
+                         self._storage.load(oid, ''))
+
+
+
+    def checkVerificationWith2ClientsInvqOverflow(self):
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+        
+        self._storage = self.openClientStorage()
+        oid = self._storage.newObjectId()
+        # When we create a new storage, it should always do a full
+        # verification
+        self.assertEqual(self._storage.verify_result, "full verification")
+        # do two stores of the object to make sure an invalidation
+        # message is generated
+        revid = self._dostore(oid)
+        revid = self._dostore(oid, revid)
+
+        perstorage.load(oid, '')
+        perstorage.close()
+
+        # the test code sets invq bound to 2
+        for i in range(5):
+            revid = self._dostore(oid, revid)
+
+        perstorage = self.openClientStorage(cache="test")
+        self.assertEqual(perstorage.verify_result, "full verification")
+        t = time.time() + 30
+        while not perstorage.end_verify.isSet():
+            perstorage.sync()
+            if time.time() > t:
+                self.fail("timed out waiting for endVerify")
+
+        self.assertEqual(self._storage.load(oid, '')[1], revid)
+        self.assertEqual(perstorage.load(oid, ''),
+                         self._storage.load(oid, ''))
+
+        perstorage.close()
+
+class TimeoutTests(CommonSetupTearDown):
+    timeout = 1
+
+    def checkTimeout(self):
+        storage = self.openClientStorage()
+        txn = Transaction()
+        storage.tpcBegin(txn)
+        storage.tpcVote(txn)
+        time.sleep(2)
+        self.assertRaises(ClientDisconnected, storage.tpcFinish, txn)
+
+    def checkTimeoutOnAbort(self):
+        storage = self.openClientStorage()
+        txn = Transaction()
+        storage.tpcBegin(txn)
+        storage.tpcVote(txn)
+        storage.tpcAbort(txn)
+
+    def checkTimeoutOnAbortNoLock(self):
+        storage = self.openClientStorage()
+        txn = Transaction()
+        storage.tpcBegin(txn)
+        storage.tpcAbort(txn)
 
 class MSTThread(threading.Thread):
 


=== Zope3/src/zodb/zeo/tests/commitlock.py 1.3 => 1.3.4.1 ===
--- Zope3/src/zodb/zeo/tests/commitlock.py:1.3	Wed Feb  5 18:28:22 2003
+++ Zope3/src/zodb/zeo/tests/commitlock.py	Wed Mar 12 16:41:21 2003
@@ -21,44 +21,18 @@
 from zodb.storage.tests.base import zodb_pickle, MinPO
 
 from zodb.zeo.client import ClientStorage
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.interfaces import ClientDisconnected
+from zodb.zeo.tests.thread import TestThread
+from zodb.zeo.tests.common import DummyDB
 
 ZERO = '\0'*8
 
-class DummyDB:
-    def invalidate(self, *args):
-        pass
-
-class TestThread(threading.Thread):
-    __super_init = threading.Thread.__init__
-    __super_run = threading.Thread.run
-
-    def __init__(self, testcase, group=None, target=None, name=None,
-                 args=(), kwargs={}, verbose=None):
-        self.__super_init(group, target, name, args, kwargs, verbose)
-        self.setDaemon(1)
-        self._testcase = testcase
-
-    def run(self):
-        try:
-            self.testrun()
-        except Exception:
-            s = StringIO()
-            traceback.print_exc(file=s)
-            self._testcase.fail("Exception in thread %s:\n%s\n" %
-                                (self, s.getvalue()))
-
-    def cleanup(self, timeout=15):
-        self.join(timeout)
-        if self.isAlive():
-            self._testcase.fail("Thread did not finish: %s" % self)
-
 class WorkerThread(TestThread):
 
     # run the entire test in a thread so that the blocking call for
     # tpc_vote() doesn't hang the test suite.
 
-    def __init__(self, testcase, storage, trans, method="tpc_finish"):
+    def __init__(self, testcase, storage, trans, method="tpcFinish"):
         self.storage = storage
         self.trans = trans
         self.method = method
@@ -74,53 +48,45 @@
             oid = self.storage.newObjectId()
             p = zodb_pickle(MinPO("c"))
             self.storage.store(oid, ZERO, p, '', self.trans)
-            self.ready.set()
-            self.storage.tpcVote(self.trans)
-            if self.method == "tpc_finish":
+            self.myvote()
+            if self.method == "tpcFinish":
                 self.storage.tpcFinish(self.trans)
             else:
                 self.storage.tpcAbort(self.trans)
-        except Disconnected:
+        except ClientDisconnected:
             pass
 
-class CommitLockTests:
-
-    # The commit lock tests verify that the storage successfully
-    # blocks and restarts transactions when there is content for a
-    # single storage.  There are a lot of cases to cover.
-
-    # CommitLock1 checks the case where a single transaction delays
-    # other transactions before they actually block.  IOW, by the time
-    # the other transactions get to the vote stage, the first
-    # transaction has finished.
+    def myvote(self):
+        # The vote() call is synchronous, which makes it difficult to
+        # coordinate the action of multiple threads that all call
+        # vote().  This method sends the vote call, then sets the
+        # event saying vote was called, then waits for the vote
+        # response.  It digs deep into the implementation of the client.
+
+        # This method is a replacement for:
+        #     self.ready.set()
+        #     self.storage.tpcVote(self.trans)
+
+        rpc = self.storage._server.rpc
+        msgid = rpc._deferred_call("tpcVote", self.storage._serial)
+        self.ready.set()
+        rpc._deferred_wait(msgid)
+        self.storage._check_serials()
 
-    def checkCommitLock1OnCommit(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
-        finally:
-            self._cleanup()
+class CommitLockTests:
 
-    def checkCommitLock1OnAbort(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
-        finally:
-            self._cleanup()
+    NUM_CLIENTS = 5
 
-    def checkCommitLock2OnCommit(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
-        finally:
-            self._cleanup()
-
-    def checkCommitLock2OnAbort(self):
-        self._storages = []
-        try:
-            self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
-        finally:
-            self._cleanup()
+    # The commit lock tests verify that the storage successfully
+    # blocks and restarts transactions when there is contention for a
+    # single storage.  There are a lot of cases to cover; the
+    # general flow they share is described below.
+
+    # The general flow of these tests is to start a transaction by
+    # getting far enough into 2PC to acquire the commit lock.  Then
+    # begin one or more other connections that also want to commit.
+    # This causes the commit lock code to be exercised.  Once the
+    # other connections are started, the first transaction completes.
 
     def _cleanup(self):
         for store, trans in self._storages:
@@ -128,77 +94,74 @@
             store.close()
         self._storages = []
 
-    def _checkCommitLock(self, method_name, dosetup, dowork):
-        # check the commit lock when a client attemps a transaction,
-        # but fails/exits before finishing the commit.
-
-        # The general flow of these tests is to start a transaction by
-        # calling tpc_begin().  Then begin one or more other
-        # connections that also want to commit.  This causes the
-        # commit lock code to be exercised.  Once the other
-        # connections are started, the first transaction completes.
-        # Either by commit or abort, depending on whether method_name
-        # is "tpc_finish."
-
-        # The tests are parameterized by method_name, dosetup(), and
-        # dowork().  The dosetup() function is called with a
-        # connectioned client storage, transaction, and timestamp.
-        # Any work it does occurs after the first transaction has
-        # started, but before it finishes.  The dowork() function
-        # executes after the first transaction has completed.
-
-        # Start on transaction normally and get the lock.
-        t = Transaction()
-        self._storage.tpcBegin(t)
+    def _start_txn(self):
+        txn = Transaction()
+        self._storage.tpcBegin(txn)
         oid = self._storage.newObjectId()
-        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
-        self._storage.tpcVote(t)
+        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
+        return oid, txn
 
-        # Start a second transaction on a different connection without
-        # blocking the test thread.
-        self._storages = []
-        for i in range(4):
-            storage2 = self._duplicate_client()
-            t2 = Transaction()
-            tid = self._get_timestamp()
-            dosetup(storage2, t2, tid)
-            if i == 0:
-                storage2.close()
-            else:
-                self._storages.append((storage2, t2))
+    def checkCommitLockVoteFinish(self):
+        oid, txn = self._start_txn()
+        self._storage.tpcVote(txn)
+
+        self._begin_threads()
 
-        if method_name == "tpc_finish":
-            self._storage.tpcFinish(t)
-            self._storage.load(oid, '')
-        else:
-            self._storage.tpcAbort(t)
+        self._storage.tpcFinish(txn)
+        self._storage.load(oid, '')
 
-        dowork(method_name)
+        self._finish_threads()
 
-        # Make sure the server is still responsive
         self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockVoteAbort(self):
+        oid, txn = self._start_txn()
+        self._storage.tpcVote(txn)
 
-    def _dosetup1(self, storage, trans, tid):
-        storage.tpcBegin(trans, tid)
+        self._begin_threads()
 
-    def _dowork1(self, method_name):
-        for store, trans in self._storages:
-            oid = store.newObjectId()
-            store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
-            store.tpcVote(trans)
-            if method_name == "tpc_finish":
-                store.tpcFinish(trans)
-            else:
-                store.tpcAbort(trans)
+        self._storage.tpcAbort(txn)
+
+        self._finish_threads()
+
+        self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockVoteClose(self):
+        oid, txn = self._start_txn()
+        self._storage.tpcVote(txn)
+
+        self._begin_threads()
+
+        self._storage.close()
 
-    def _dosetup2(self, storage, trans, tid):
+        self._finish_threads()
+        self._cleanup()
+
+    def _begin_threads(self):
+        # Start a second transaction on a different connection without
+        # blocking the test thread.
+        self._storages = []
         self._threads = []
-        t = WorkerThread(self, storage, trans)
-        self._threads.append(t)
-        t.start()
-        t.ready.wait()
+        
+        for i in range(self.NUM_CLIENTS):
+            storage = self._duplicate_client()
+            txn = Transaction()
+            tid = self._get_timestamp()
+            
+            t = WorkerThread(self, storage, txn)
+            self._threads.append(t)
+            t.start()
+            t.ready.wait()
+        
+            # Close one of the connections abnormally to test server response
+            if i == 0:
+                storage.close()
+            else:
+                self._storages.append((storage, txn))
 
-    def _dowork2(self, method_name):
+    def _finish_threads(self):
         for t in self._threads:
             t.cleanup()
 
@@ -215,5 +178,67 @@
 
     def _get_timestamp(self):
         t = time.time()
-        ts = TimeStamp(*(time.gmtime(t)[:5] + (t % 60,)))
-        return ts.raw()
+        t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
+        return `t`
+
+class CommitLockUndoTests(CommitLockTests):
+    
+    def _get_trans_id(self):
+        self._dostore()
+        L = self._storage.undoInfo()
+        return L[0]['id']
+
+    def _begin_undo(self, trans_id):
+        rpc = self._storage._server.rpc
+        return rpc._deferred_call("undo", trans_id, self._storage._serial)
+
+    def _finish_undo(self, msgid):
+        return self._storage._server.rpc._deferred_wait(msgid)
+        
+    def checkCommitLockUndoFinish(self):
+        trans_id = self._get_trans_id()
+        oid, txn = self._start_txn()
+        msgid = self._begin_undo(trans_id)
+
+        self._begin_threads()
+
+        self._finish_undo(msgid)
+        self._storage.tpcVote(txn)
+        self._storage.tpcFinish(txn)
+        self._storage.load(oid, '')
+
+        self._finish_threads()
+
+        self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockUndoAbort(self):
+        trans_id = self._get_trans_id()
+        oid, txn = self._start_txn()
+        msgid = self._begin_undo(trans_id)
+
+        self._begin_threads()
+
+        self._finish_undo(msgid)
+        self._storage.tpcVote(txn)
+        self._storage.tpcAbort(txn)
+
+        self._finish_threads()
+
+        self._dostore()
+        self._cleanup()
+        
+    def checkCommitLockUndoClose(self):
+        trans_id = self._get_trans_id()
+        oid, txn = self._start_txn()
+        msgid = self._begin_undo(trans_id)
+
+        self._begin_threads()
+
+        self._finish_undo(msgid)
+        self._storage.tpcVote(txn)
+        self._storage.close()
+
+        self._finish_threads()
+
+        self._cleanup()