[Zodb-checkins] CVS: ZEO/ZEO/tests - Cache.py:1.1.2.3 forker.py:1.1.2.5 multi.py:1.1.2.6 speed.py:1.1.2.3 testZEO.py:1.1.2.13

Jeremy Hylton jeremy@zope.com
Thu, 20 Dec 2001 13:21:09 -0500


Update of /cvs-repository/ZEO/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv27019/ZEO/tests

Modified Files:
      Tag: ZEO-ZRPC-Dev
	Cache.py forker.py multi.py speed.py testZEO.py 
Log Message:
Merge tests from the ZEO head.

XXX Three of these tests are known to fail.  Apparently the client
does not properly reconnect to the server.




=== ZEO/ZEO/tests/Cache.py 1.1.2.2 => 1.1.2.3 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL).  A copy of the ZPL should accompany this
+# distribution.  THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
+# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
+# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
+
 """Tests of the ZEO cache"""
 
 from ZODB.Transaction import Transaction
@@ -13,7 +22,17 @@
         revid = self._dostore(oid, revid=revid, data=MinPO(25))
 
         info = self._storage.undoInfo()
+        if not info:
+            # XXX perhaps we have an old storage implementation that
+            # does do the negative nonsense
+            info = self._storage.undoInfo(0, 20)
         tid = info[0]['id']
+
+        # We may need to bail at this point if the storage doesn't
+        # support transactional undo
+        if not self._storage.supportsTransactionalUndo():
+            return
+
         # Now start an undo transaction
         self._transaction.note('undo1')
         self._storage.tpc_begin(self._transaction)
@@ -50,7 +69,7 @@
         obj = zodb_unpickle(data)
         assert obj == MinPO(2), obj
 
-    def checkCommitVersionInvalidation(self):
+    def checkCommitEmptyVersionInvalidation(self):
         oid = self._storage.new_oid()
         revid = self._dostore(oid, data=MinPO(1))
         revid = self._dostore(oid, revid=revid, data=MinPO(2))
@@ -62,5 +81,20 @@
         self._storage.tpc_vote(t)
         self._storage.tpc_finish(t)
         data, revid = self._storage.load(oid, "")
+        obj = zodb_unpickle(data)
+        assert obj == MinPO(3), obj
+
+    def checkCommitVersionInvalidation(self):
+        oid = self._storage.new_oid()
+        revid = self._dostore(oid, data=MinPO(1))
+        revid = self._dostore(oid, revid=revid, data=MinPO(2))
+        revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
+        t = Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.commitVersion("foo", "bar", t)
+        self._storage.load(oid, "")
+        self._storage.tpc_vote(t)
+        self._storage.tpc_finish(t)
+        data, revid = self._storage.load(oid, "bar")
         obj = zodb_unpickle(data)
         assert obj == MinPO(3), obj


=== ZEO/ZEO/tests/forker.py 1.1.2.4 => 1.1.2.5 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL).  A copy of the ZPL should accompany this
+# distribution.  THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
+# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
+# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
+
 """Library for forking storage server and connecting client storage"""
 
 import asyncore
-import atexit
 import os
 import profile
+import random
+import socket
 import sys
-import time
 import types
-import ThreadedAsync
 import ZEO.ClientStorage, ZEO.StorageServer
 
 PROFILE = 0
 
-class ZEOServerExit(asyncore.file_dispatcher):
-    """Used to exit ZEO.StorageServer when run is done"""
+def get_port():
+    """Return a port that is not in use.
 
-    def writable(self):
-        return 0
+    Checks if a port is in use by trying to connect to it.  Assumes it
+    is not in use if connect raises an exception.
 
-    def readable(self):
-        return 1
+    Raises RuntimeError after 10 tries.
+    """
+    for i in range(10):
+        port = random.randrange(20000, 30000)
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            try:
+                s.connect(('localhost', port))
+            except socket.error:
+                # XXX check value of error?
+                return port
+        finally:
+            s.close()
+    raise RuntimeError, "Can't find port"
+
+if os.name == "nt":
+
+    def start_zeo_server(storage_name, args, port=None):
+        """Start a ZEO server in a separate process.
+
+        Returns the ZEO port, the test server port, and the pid.
+        """
+        import ZEO.tests.winserver
+        if port is None:
+            port = get_port()
+        script = ZEO.tests.winserver.__file__
+        if script.endswith('.pyc'):
+            script = script[:-1]
+        args = (sys.executable, script, str(port), storage_name) + args
+        d = os.environ.copy()
+        d['PYTHONPATH'] = os.pathsep.join(sys.path)
+        pid = os.spawnve(os.P_NOWAIT, sys.executable, args, os.environ)
+        return ('localhost', port), ('localhost', port + 1), pid
+
+else:
+
+    class ZEOServerExit(asyncore.file_dispatcher):
+        """Used to exit ZEO.StorageServer when run is done"""
+
+        def writable(self):
+            return 0
+
+        def readable(self):
+            return 1
+
+        def handle_read(self):
+            buf = self.recv(4)
+            if buf:
+                assert buf == "done"
+                asyncore.socket_map.clear()
 
-    def handle_read(self):
-        buf = self.recv(4)
-        if buf:
-            assert buf == "done"
+        def handle_close(self):
             asyncore.socket_map.clear()
-        
-    def handle_close(self):
-        asyncore.socket_map.clear()
-
-class ZEOClientExit:
-    """Used by client to cause server to exit"""
-    def __init__(self, pipe):
-        self.pipe = pipe
-
-    def close(self):
-        os.write(self.pipe, "done")
-
-def start_zeo_server(storage, addr):
-    rd, wr = os.pipe()
-    pid = os.fork()
-    if pid == 0:
-        if PROFILE:
-            p = profile.Profile()
-            p.runctx("run_server(storage, addr, rd, wr)", globals(),
-                     locals())
-            p.dump_stats("stats.s.%d" % os.getpid())
-        else:
-            run_server(storage, addr, rd, wr)
-        os._exit(0)
-    else:
-        os.close(rd)
-        return pid, ZEOClientExit(wr)
-
-def run_server(storage, addr, rd, wr):
-    # in the child, run the storage server
-    os.close(wr)
-    ZEOServerExit(rd)
-    serv = ZEO.StorageServer.StorageServer(addr, {'1':storage})
-    asyncore.loop()
-    storage.close()
-    if isinstance(addr, types.StringType):
-        os.unlink(addr)
-
-def start_zeo(storage, cache=None, cleanup=None, domain="AF_INET",
-              storage_id="1"):
-    """Setup ZEO client-server for storage.
 
-    Returns a ClientStorage instance and a ZEOClientExit instance.
-
-    XXX Don't know if os.pipe() will work on Windows.
-    """
+    class ZEOClientExit:
+        """Used by client to cause server to exit"""
+        def __init__(self, pipe):
+            self.pipe = pipe
+
+        def close(self):
+            os.write(self.pipe, "done")
+            os.close(self.pipe)
+
+    def start_zeo_server(storage, addr):
+        rd, wr = os.pipe()
+        pid = os.fork()
+        if pid == 0:
+            if PROFILE:
+                p = profile.Profile()
+                p.runctx("run_server(storage, addr, rd, wr)", globals(),
+                         locals())
+                p.dump_stats("stats.s.%d" % os.getpid())
+            else:
+                run_server(storage, addr, rd, wr)
+            os._exit(0)
+        else:
+            os.close(rd)
+            return pid, ZEOClientExit(wr)
 
-    if domain == "AF_INET":
-        import random
-        addr = '', random.randrange(2000, 3000)
-    elif domain == "AF_UNIX":
-        import tempfile
-        addr = tempfile.mktemp()
-    else:
-        raise ValueError, "bad domain: %s" % domain
+    def run_server(storage, addr, rd, wr):
+        # in the child, run the storage server
+        os.close(wr)
+        ZEOServerExit(rd)
+        serv = ZEO.StorageServer.StorageServer(addr, {'1':storage})
+        asyncore.loop()
+        os.close(rd)
+        storage.close()
+        if isinstance(addr, types.StringType):
+            os.unlink(addr)
+
+    def start_zeo(storage, cache=None, cleanup=None, domain="AF_INET",
+                  storage_id="1", cache_size=20000000):
+        """Setup ZEO client-server for storage.
+
+        Returns a ClientStorage instance and a ZEOClientExit instance.
+
+        XXX Don't know if os.pipe() will work on Windows.
+        """
+
+        if domain == "AF_INET":
+            addr = '', get_port()
+        elif domain == "AF_UNIX":
+            import tempfile
+            addr = tempfile.mktemp()
+        else:
+            raise ValueError, "bad domain: %s" % domain
 
-    pid, exit = start_zeo_server(storage, addr)
-    try:
+        pid, exit = start_zeo_server(storage, addr)
         s = ZEO.ClientStorage.ClientStorage(addr, storage_id,
-                                            debug=1, client=cache)
-        if hasattr(s, 'is_connected'):
-            while not s.is_connected():
-                time.sleep(0.1)
-    except Exception, err:
-        raise RuntimeError, (err, exit, pid)
-    return s, exit, pid
+                                            debug=1, client=cache,
+                                            cache_size=cache_size,
+                                            min_disconnect_poll=0.5)
+        return s, exit, pid
 


=== ZEO/ZEO/tests/multi.py 1.1.2.5 => 1.1.2.6 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL).  A copy of the ZPL should accompany this
+# distribution.  THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
+# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
+# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
+
 """A multi-client test of the ZEO storage server"""
 
 import ZODB, ZODB.DB, ZODB.FileStorage, ZODB.POSException
@@ -11,8 +20,6 @@
 import time
 import types
 
-PROFILE = 0
-
 VERBOSE = 1
 CLIENTS = 4
 RECORDS_PER_CLIENT = 100
@@ -118,7 +125,9 @@
     t0 = time.time()
     server_pid, server = start_server(addr)
     t1 = time.time()
-    pids = [start_client(addr, client_func) for i in range(CLIENTS)]
+    pids = []
+    for i in range(CLIENTS):
+        pids.append(start_client(addr, client_func))
     for pid in pids:
         assert type(pid) == types.IntType, "invalid pid type: %s (%s)" % \
                (repr(pid), type(pid))


=== ZEO/ZEO/tests/speed.py 1.1.2.2 => 1.1.2.3 ===
-# 
-# Zope Public License (ZPL) Version 1.0
-# -------------------------------------
-# 
-# Copyright (c) Digital Creations.  All rights reserved.
-# 
-# This license has been certified as Open Source(tm).
-# 
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-# 
-# 1. Redistributions in source code must retain the above copyright
-#    notice, this list of conditions, and the following disclaimer.
-# 
-# 2. Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions, and the following disclaimer in
-#    the documentation and/or other materials provided with the
-#    distribution.
-# 
-# 3. Digital Creations requests that attribution be given to Zope
-#    in any manner possible. Zope includes a "Powered by Zope"
-#    button that is installed by default. While it is not a license
-#    violation to remove this button, it is requested that the
-#    attribution remain. A significant investment has been put
-#    into Zope, and this effort will continue if the Zope community
-#    continues to grow. This is one way to assure that growth.
-# 
-# 4. All advertising materials and documentation mentioning
-#    features derived from or use of this software must display
-#    the following acknowledgement:
-# 
-#      "This product includes software developed by Digital Creations
-#      for use in the Z Object Publishing Environment
-#      (http://www.zope.org/)."
-# 
-#    In the event that the product being advertised includes an
-#    intact Zope distribution (with copyright and license included)
-#    then this clause is waived.
-# 
-# 5. Names associated with Zope or Digital Creations must not be used to
-#    endorse or promote products derived from this software without
-#    prior written permission from Digital Creations.
-# 
-# 6. Modified redistributions of any form whatsoever must retain
-#    the following acknowledgment:
-# 
-#      "This product includes software developed by Digital Creations
-#      for use in the Z Object Publishing Environment
-#      (http://www.zope.org/)."
-# 
-#    Intact (re-)distributions of any official Zope release do not
-#    require an external acknowledgement.
-# 
-# 7. Modifications are encouraged but must be packaged separately as
-#    patches to official Zope releases.  Distributions that do not
-#    clearly separate the patches from the original work must be clearly
-#    labeled as unofficial distributions.  Modifications which do not
-#    carry the name Zope may be packaged in any form, as long as they
-#    conform to all of the clauses above.
-# 
-# 
-# Disclaimer
-# 
-#   THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
-#   EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-#   IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-#   PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
-#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-#   USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
-#   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-#   OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
-#   OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-#   SUCH DAMAGE.
-# 
-# 
-# This software consists of contributions made by Digital Creations and
-# many individuals on behalf of Digital Creations.  Specific
-# attributions are listed in the accompanying credits file.
-# 
-##############################################################################
+# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL).  A copy of the ZPL should accompany this
+# distribution.  THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
+# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
+# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
+
 usage="""Test speed of a ZODB storage
 
 Options:
@@ -151,6 +76,7 @@
     for j in range(nrep):
         for r in 1, 10, 100, 1000:
             t = time.time()
+            conflicts = 0
             
             jar = db.open()
             while 1:
@@ -171,7 +97,7 @@
                         setattr(p, str(i), v)
                     get_transaction().commit()
                 except ConflictError:
-                    pass
+                    conflicts = conflicts + 1
                 else:
                     break
             jar.close()
@@ -179,10 +105,11 @@
             t = time.time() - t
             if detailed:
                 if threadno is None:
-                    print "%s\t%s\t%.4f" % (j, r, t)
+                    print "%s\t%s\t%.4f\t%d" % (j, r, t, conflicts)
                 else:
-                    print "%s\t%s\t%.4f\t%d" % (j, r, t, threadno)
-            results[r] = results[r] + t
+                    print "%s\t%s\t%.4f\t%d\t%d" % (j, r, t, conflicts,
+                                                    threadno)
+            results[r].append((t, conflicts))
             rt=d=p=v=None # release all references
             if minimize:
                 time.sleep(3)
@@ -230,8 +157,6 @@
     else:
         fs = ZODB.FileStorage.FileStorage(fs_name, create=1)
         s, server, pid = forker.start_zeo(fs, domain=domain)
-        while not s.is_connected():
-            time.sleep(0.1)
 
     data=open(data).read()
     db=ZODB.DB(s,
@@ -240,13 +165,15 @@
                cache_deactivate_after=6000,)
 
     print "Beginning work..."
-    results={1:0, 10:0, 100:0, 1000:0}
+    results={1:[], 10:[], 100:[], 1000:[]}
     if threads > 1:
         import threading
-        l = [threading.Thread(target=work,
-                                    args=(db, results, nrep, compress, data,
-                                          detailed, minimize, i))
-                   for i in range(threads)]
+        l = []
+        for i in range(threads):
+            t = threading.Thread(target=work,
+                                 args=(db, results, nrep, compress, data,
+                                       detailed, minimize, i))
+            l.append(t)
         for t in l:
             t.start()
         for t in l:
@@ -261,9 +188,19 @@
 
     if detailed:
         print '-'*24
+    print "num\tmean\tmin\tmax"
     for r in 1, 10, 100, 1000:
-        t=results[r]/(nrep * threads)
-        print "mean:\t%s\t%.4f\t%.4f (s/o)" % (r, t, t/r)
+        times = []
+        for time, conf in results[r]:
+            times.append(time)
+        t = mean(times)
+        print "%d\t%.4f\t%.4f\t%.4f" % (r, t, min(times), max(times))
+
+def mean(l):
+    tot = 0
+    for v in l:
+        tot = tot + v
+    return tot / len(l)
     
 ##def compress(s):
 ##    c = zlib.compressobj()


=== ZEO/ZEO/tests/testZEO.py 1.1.2.12 => 1.1.2.13 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL).  A copy of the ZPL should accompany this
+# distribution.  THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
+# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
+# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
+
 """Test suite for ZEO based on ZODB.tests"""
 
 import asyncore
 import os
+import random
+import socket
+import sys
 import tempfile
 import time
 import types
@@ -10,19 +22,34 @@
 import ZEO.ClientStorage, ZEO.StorageServer
 import ThreadedAsync, ZEO.trigger
 from ZODB.FileStorage import FileStorage
+import thread
 
-from ZEO import zeolog
 from ZEO.tests import forker, Cache
+from ZEO.smac import Disconnected
 
 # Sorry Jim...
 from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage, \
      TransactionalUndoStorage, TransactionalUndoVersionStorage, \
-     PackableStorage, Synchronization, ConflictResolution
+     PackableStorage, Synchronization, ConflictResolution, RevisionStorage
 from ZODB.tests.MinPO import MinPO
-
+from ZODB.tests.StorageTestBase import zodb_unpickle
 
 ZERO = '\0'*8
 
+class DummyDB:
+    def invalidate(self, *args):
+        pass
+
+class PackWaitWrapper:
+    def __init__(self, storage):
+        self.storage = storage
+
+    def __getattr__(self, attr):
+        return getattr(self.storage, attr)
+
+    def pack(self, t, f):
+        self.storage.pack(t, f, wait=1)
+
 class ZEOTestBase(StorageTestBase.StorageTestBase):
     """Version of the storage test class that supports ZEO.
     
@@ -31,31 +58,6 @@
     will get no later than the return value from vote.
     """
     
-    __super_setUp = StorageTestBase.StorageTestBase.setUp
-    __super_tearDown = StorageTestBase.StorageTestBase.tearDown
-
-    def setUp(self):
-        """Start a ZEO server using a Unix domain socket
-
-        The ZEO server uses the storage object returned by the
-        getStorage() method.
-        """
-        self.running = 1
-        client, exit, pid = forker.start_zeo(self.getStorage())
-        self._pid = pid
-        self._server = exit
-        self._storage = client
-        while not self._storage.is_connected():
-            time.sleep(0.1)
-        self.__super_setUp()
-
-    def tearDown(self):
-        """Try to cause the tests to halt"""
-        self.running = 0
-        self._server.close()
-        os.waitpid(self._pid, 0)
-        self.__super_tearDown()
-
     def _dostore(self, oid=None, revid=None, data=None, version=None,
                  already_pickled=0):
         """Do a complete storage transaction.
@@ -101,24 +103,37 @@
             raise RuntimeError, "unexpected ZEO response: no oid"
         else:
             for oid, serial in r:
+                if isinstance(serial, Exception):
+                    raise serial
                 d[oid] = serial
         return d
 
-    def checkLargeUpdate(self):
-        obj = MinPO("X" * (10 * 128 * 1024))
-        self._dostore(data=obj)
+# Some of the ZEO tests depend on the version of FileStorage available
+# for the tests.  If we run these tests using Zope 2.3, FileStorage
+# doesn't support TransactionalUndo.
+
+if hasattr(FileStorage, 'supportsTransactionalUndo'):
+    # XXX Assume that a FileStorage that supports transactional undo
+    # also supports conflict resolution.
+    class VersionDependentTests(
+        TransactionalUndoStorage.TransactionalUndoStorage,
+        TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
+        ConflictResolution.ConflictResolvingStorage,
+        ConflictResolution.ConflictResolvingTransUndoStorage):
+        pass
+else:
+    class VersionDependentTests:
+        pass
         
 class GenericTests(ZEOTestBase,
+                   VersionDependentTests,
                    Cache.StorageWithCache,
                    Cache.TransUndoStorageWithCache,
                    BasicStorage.BasicStorage,
                    VersionStorage.VersionStorage,
+                   RevisionStorage.RevisionStorage,
                    PackableStorage.PackableStorage,
                    Synchronization.SynchronizedStorage,
-                   ConflictResolution.ConflictResolvingStorage,
-                   ConflictResolution.ConflictResolvingTransUndoStorage,
-                   TransactionalUndoStorage.TransactionalUndoStorage,
-      TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
                    ):
     """An abstract base class for ZEO tests
 
@@ -128,8 +143,37 @@
     returns a specific storage, e.g. FileStorage.
     """
 
+    __super_setUp = StorageTestBase.StorageTestBase.setUp
+    __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+
+    def setUp(self):
+        """Start a ZEO server using a Unix domain socket
+
+        The ZEO server uses the storage object returned by the
+        getStorage() method.
+        """
+        self.running = 1
+        client, exit, pid = forker.start_zeo(self.getStorage())
+        self._pid = pid
+        self._server = exit
+        self._storage = PackWaitWrapper(client)
+        client.registerDB(DummyDB(), None)
+        self.__super_setUp()
+
+    def tearDown(self):
+        """Try to cause the tests to halt"""
+        self.running = 0
+        self._storage.close()
+        self._server.close()
+        os.waitpid(self._pid, 0)
+        self.delStorage()
+        self.__super_tearDown()
+
+    def checkLargeUpdate(self):
+        obj = MinPO("X" * (10 * 128 * 1024))
+        self._dostore(data=obj)
+
 class ZEOFileStorageTests(GenericTests):
-    """Tests of storage behavior using FileStorage underneath ZEO"""
     __super_setUp = GenericTests.setUp
     
     def setUp(self):
@@ -140,32 +184,269 @@
         return FileStorage(self.__fs_base, create=1)
 
     def delStorage(self):
-        # file storage appears to create three files
+        # file storage appears to create four files
+        for ext in '', '.index', '.lock', '.tmp':
+            path = self.__fs_base + ext
+            try:
+                os.remove(path)
+            except os.error:
+                pass
+
+class WindowsGenericTests(GenericTests):
+    """Subclass to support server creation on Windows.
+
+    On Windows, the getStorage() design won't work because the storage
+    can't be created in the parent process and passed to the child.
+    All the work has to be done in the server's process.
+    """
+    __super_setUp = StorageTestBase.StorageTestBase.setUp
+    __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+
+    def setUp(self):
+        self.__super_setUp()
+        args = self.getStorageInfo()
+        name = args[0]
+        args = args[1:]
+        zeo_addr, self.test_addr, self.test_pid = \
+                  forker.start_zeo_server(name, args)
+        storage = ZEO.ClientStorage.ClientStorage(zeo_addr, debug=1,
+                                                  min_disconnect_poll=0.1)
+        self._storage = PackWaitWrapper(storage)
+        storage.registerDB(DummyDB(), None)
+
+    def tearDown(self):
+        self._storage.close()
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.connect(self.test_addr)
+        s.close()
+        # the connection should cause the storage server to die
+##        os.waitpid(self.test_pid, 0)
+        time.sleep(0.5)
+        self.delStorage()
+        self.__super_tearDown()
+
+class WindowsZEOFileStorageTests(WindowsGenericTests):
+
+    def getStorageInfo(self):
+        self.__fs_base = tempfile.mktemp()
+        return 'FileStorage', self.__fs_base, '1'
+
+    def delStorage(self):
+        # file storage appears to create four files
         for ext in '', '.index', '.lock', '.tmp':
             path = self.__fs_base + ext
-            os.unlink(path)
+            try:
+                os.remove(path)
+            except os.error:
+                pass
 
-class OtherZEOTests(unittest.TestCase):
-    """Tests of ZEO that are basically independent of the storage"""
+class ConnectionTests(ZEOTestBase):
+    """Tests that explicitly manage the server process.
 
-    def checkBadStorageID(self):
-        tmp = tempfile.mktemp()
-        fs = FileStorage(tmp, create=1)
-        try:
-            client, exit, pid = forker.start_zeo(fs, storage_id="gotcha")
-        except RuntimeError, msg:
-            err, exit, pid = msg
-            assert isinstance(err, ValueError)
-        else:
-            assert 0, "Established ZEO connection with invalid storage id"
-        exit.close()
-        os.waitpid(pid, 0)
-        fs.close()
+    To test the cache or re-connection, these test cases explicit
+    start and stop a ZEO storage server.
+    """
+    
+    __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+
+    ports = []
+    for i in range(200):
+        ports.append(random.randrange(25000, 30000))
+    del i
+
+    def openClientStorage(self, cache='', cache_size=200000, wait=1):
+        # defined by subclasses
+        pass
+
+    def shutdownServer(self):
+        # defined by subclasses
+        pass
+
+    def tearDown(self):
+        """Try to cause the tests to halt"""
+        self.shutdownServer()
+        # file storage appears to create four files
         for ext in '', '.index', '.lock', '.tmp':
-            path = tmp + ext
+            path = self.file + ext
             if os.path.exists(path):
                 os.unlink(path)
-        
+        for i in 0, 1:
+            path = "c1-test-%d.zec" % i
+            if os.path.exists(path):
+                os.unlink(path)
+        self.__super_tearDown()
+
+    def checkBasicPersistence(self):
+        """Verify cached data persists across client storage instances.
+
+        To verify that the cache is being used, the test closes the
+        server and then starts a new client with the server down.
+        """
+        self._storage = self.openClientStorage('test', 100000, 1)
+        oid = self._storage.new_oid()
+        obj = MinPO(12)
+        revid1 = self._dostore(oid, data=obj)
+        self._storage.close()
+        self.shutdownServer()
+        self._storage = self.openClientStorage('test', 100000, 0)
+        data, revid2 = self._storage.load(oid, '')
+        assert zodb_unpickle(data) == MinPO(12)
+        assert revid1 == revid2
+        self._storage.close()
+
+    def checkRollover(self):
+        """Check that the cache works when the files are swapped.
+
+        In this case, only one object fits in a cache file.  When the
+        cache files swap, the first object is effectively uncached.
+        """
+        self._storage = self.openClientStorage('test', 1000, 1)
+        oid1 = self._storage.new_oid()
+        obj1 = MinPO("1" * 500)
+        revid1 = self._dostore(oid1, data=obj1)
+        oid2 = self._storage.new_oid()
+        obj2 = MinPO("2" * 500)
+        revid2 = self._dostore(oid2, data=obj2)
+        self._storage.close()
+        self.shutdownServer()
+        self._storage = self.openClientStorage('test', 1000, 0)
+        self._storage.load(oid2, '')
+        self.assertRaises(Disconnected, self._storage.load, oid1, '')
+
+    def checkReconnection(self):
+        """Check that the client reconnects when a server restarts."""
+
+        from ZEO.ClientStorage import ClientDisconnected
+        self._storage = self.openClientStorage()
+        oid = self._storage.new_oid()
+        obj = MinPO(12)
+        revid1 = self._dostore(oid, data=obj)
+        self.shutdownServer()
+        self.running = 1
+        self._startServer(create=0)
+        oid = self._storage.new_oid()
+        obj = MinPO(12)
+        while 1:
+            try:
+                revid1 = self._dostore(oid, data=obj)
+            except (ClientDisconnected, thread.error, socket.error), err:
+                get_transaction().abort()
+                time.sleep(0.1)
+            else:
+                break
+            # XXX This is a bloody pain.  We're placing a heavy burden
+            # on users to catch a plethora of exceptions in order to
+            # write robust code.  Need to think about implementing
+            # John Heintz's suggestion to make sure all exceptions
+            # inherit from POSException. 
+
+class UnixConnectionTests(ConnectionTests):
+    __super_setUp = StorageTestBase.StorageTestBase.setUp
+
+    def setUp(self):
+        """Start a ZEO server using a Unix domain socket
+
+        The ZEO server uses the storage object returned by the
+        getStorage() method.
+        """
+        self.running = 1
+        self.file = tempfile.mktemp()
+        self.addr = '', self.ports.pop()
+        self._startServer()
+        self.__super_setUp()
+
+    def _startServer(self, create=1):
+        fs = FileStorage(self.file, create=create)
+        self._pid, self._server = forker.start_zeo_server(fs, self.addr)
+
+    def openClientStorage(self, cache='', cache_size=200000, wait=1):
+        base = ZEO.ClientStorage.ClientStorage(self.addr,
+                                               client=cache,
+                                               cache_size=cache_size,
+                                               wait_for_server_on_startup=wait)
+        storage = PackWaitWrapper(base)
+        storage.registerDB(DummyDB(), None)
+        return storage
+
+    def shutdownServer(self):
+        if self.running:
+            self.running = 0
+            self._server.close()
+            os.waitpid(self._pid, 0)
+
+class WindowsConnectionTests(ConnectionTests):
+    __super_setUp = StorageTestBase.StorageTestBase.setUp
+
+    def setUp(self):
+        self.file = tempfile.mktemp()
+        self._startServer()
+        self.__super_setUp()
+
+    def _startServer(self, create=1):
+        if create == 0:
+            port = self.addr[1]
+        else:
+            port = None
+        self.addr, self.test_a, pid = forker.start_zeo_server('FileStorage',
+                                                              (self.file,
+                                                               str(create)),
+                                                              port)
+        self.running = 1
+
+    def openClientStorage(self, cache='', cache_size=200000, wait=1):
+        base = ZEO.ClientStorage.ClientStorage(self.addr,
+                                               client=cache,
+                                               cache_size=cache_size,
+                                               debug=1,
+                                               wait_for_server_on_startup=wait)
+        storage = PackWaitWrapper(base)
+        storage.registerDB(DummyDB(), None)
+        return storage
+
+    def shutdownServer(self):
+        if self.running:
+            self.running = 0
+            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            s.connect(self.test_a)
+            s.close()
+            time.sleep(1.0)
+
+    def tearDown(self):
+        self.shutdownServer()
+
+
+def get_methods(klass):
+    l = [klass]
+    meth = {}
+    while l:
+        klass = l.pop(0)
+        for base in klass.__bases__:
+            l.append(base)
+        for k, v in klass.__dict__.items():
+            if callable(v):
+                meth[k] = 1
+    return meth.keys()
+
+if os.name == "posix":
+    test_classes = ZEOFileStorageTests, UnixConnectionTests
+elif os.name == "nt":
+    test_classes = WindowsZEOFileStorageTests, WindowsConnectionTests
+else:
+    raise RuntimeError, "unsupported os: %s" % os.name
+
+def makeTestSuite(testname=''):
+    suite = unittest.TestSuite()
+    name = 'check' + testname
+    lname = len(name)
+    for klass in test_classes:
+        for meth in get_methods(klass):
+            if meth[:lname] == name:
+                suite.addTest(klass(meth))
+    return suite
+
+def test_suite():
+    return makeTestSuite()
+
 def main():
     import sys, getopt
 
@@ -177,11 +458,10 @@
             name_of_test = val
 
     if args:
-        print >> sys.stderr, "Did not expect arguments.  Got %s" % args
+        print "Did not expect arguments.  Got %s" % args
         return 0
     
-    tests = unittest.makeSuite(ZEOFileStorageTests, 'check' + name_of_test)
-    tests.addTest(OtherZEOTests("checkBadStorageID"))
+    tests = makeTestSuite(name_of_test)
     runner = unittest.TextTestRunner()
     runner.run(tests)