[Zodb-checkins] CVS: Zope/lib/python/ZEO/tests - Cache.py:1.9.8.1 CommitLockTests.py:1.9.8.1 ConnectionTests.py:1.5.2.1 TestThread.py:1.3.8.1 ThreadTests.py:1.5.4.1 __init__.py:1.4.8.1 forker.py:1.21.4.1 multi.py:1.10.4.1 speed.py:1.9.8.1 stress.py:1.8.4.1 testClientCache.py:1.6.2.1 testConnection.py:1.2.2.1 testStart.py:1.12.4.1 testTransactionBuffer.py:1.5.8.1 testZEO.py:1.55.2.1 winserver.py:1.11.4.1

Chris McDonough chrism@zope.com
Tue, 8 Oct 2002 20:41:45 -0400


Update of /cvs-repository/Zope/lib/python/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv15249/ZEO/tests

Added Files:
      Tag: chrism-install-branch
	Cache.py CommitLockTests.py ConnectionTests.py TestThread.py 
	ThreadTests.py __init__.py forker.py multi.py speed.py 
	stress.py testClientCache.py testConnection.py testStart.py 
	testTransactionBuffer.py testZEO.py winserver.py 
Log Message:
Committing ZEO to chrism-install-branch.


=== Added File Zope/lib/python/ZEO/tests/Cache.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Tests of the ZEO cache"""

from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle

class TransUndoStorageWithCache:
    """Cache check for storages that offer transactional undo.

    The host test case is expected to supply self._storage and
    self._dostore().
    """

    def checkUndoInvalidation(self):
        storage = self._storage
        oid = storage.new_oid()
        # Three successive revisions: 23, then 24, then 25.
        serial = self._dostore(oid, data=MinPO(23))
        for value in (24, 25):
            serial = self._dostore(oid, revid=serial, data=MinPO(value))

        # XXX an empty answer may mean an old storage implementation
        # that needs explicit first/last arguments, so ask again.
        info = storage.undoInfo() or storage.undoInfo(0, 20)
        tid = info[0]['id']

        # Nothing further to check when the storage cannot do
        # transactional undo.
        if not storage.supportsTransactionalUndo():
            return

        # Undo the most recent transaction inside a new one.
        txn = Transaction()
        txn.note('undo1')
        storage.tpc_begin(txn)

        undone = storage.transactionalUndo(tid, txn)

        # A load in the middle of the commit must not leave invalid
        # data in the cache.
        storage.load(oid, '')

        storage.tpc_vote(txn)
        storage.tpc_finish(txn)

        assert len(undone) == 1
        assert undone[0] == oid
        data, serial = storage.load(oid, '')
        assert zodb_unpickle(data) == MinPO(24)

class StorageWithCache:
    """Cache checks around abortVersion() and commitVersion().

    Every check performs a load() between the version operation and
    tpc_vote(), then loads again after tpc_finish() and compares the
    object that comes back.  The host test case is expected to supply
    self._storage and self._dostore().
    """

    def checkAbortVersionInvalidation(self):
        storage = self._storage
        oid = storage.new_oid()
        # Two non-version revisions followed by two in version "foo".
        serial = self._dostore(oid, data=MinPO(1))
        serial = self._dostore(oid, revid=serial, data=MinPO(2))
        for value in (3, 4):
            serial = self._dostore(oid, revid=serial, data=MinPO(value),
                                   version="foo")
        txn = Transaction()
        storage.tpc_begin(txn)
        storage.abortVersion("foo", txn)
        storage.load(oid, "foo")
        storage.tpc_vote(txn)
        storage.tpc_finish(txn)
        data, serial = storage.load(oid, "foo")
        obj = zodb_unpickle(data)
        # After the abort the last non-version revision is expected.
        assert obj == MinPO(2), obj

    def checkCommitEmptyVersionInvalidation(self):
        storage = self._storage
        oid = storage.new_oid()
        serial = self._dostore(oid, data=MinPO(1))
        serial = self._dostore(oid, revid=serial, data=MinPO(2))
        serial = self._dostore(oid, revid=serial, data=MinPO(3),
                               version="foo")
        txn = Transaction()
        storage.tpc_begin(txn)
        # Commit version "foo" into the empty (non-version) name.
        storage.commitVersion("foo", "", txn)
        storage.load(oid, "")
        storage.tpc_vote(txn)
        storage.tpc_finish(txn)
        data, serial = storage.load(oid, "")
        obj = zodb_unpickle(data)
        assert obj == MinPO(3), obj

    def checkCommitVersionInvalidation(self):
        storage = self._storage
        oid = storage.new_oid()
        serial = self._dostore(oid, data=MinPO(1))
        serial = self._dostore(oid, revid=serial, data=MinPO(2))
        serial = self._dostore(oid, revid=serial, data=MinPO(3),
                               version="foo")
        txn = Transaction()
        storage.tpc_begin(txn)
        # Commit version "foo" into version "bar".
        storage.commitVersion("foo", "bar", txn)
        storage.load(oid, "")
        storage.tpc_vote(txn)
        storage.tpc_finish(txn)
        data, serial = storage.load(oid, "bar")
        obj = zodb_unpickle(data)
        assert obj == MinPO(3), obj


=== Added File Zope/lib/python/ZEO/tests/CommitLockTests.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Tests of the distributed commit lock."""

import threading
import time

from ZODB.Transaction import Transaction
from ZODB.TimeStamp import TimeStamp
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO

import ZEO.ClientStorage
from ZEO.Exceptions import Disconnected
from ZEO.tests.TestThread import TestThread

# Eight NUL characters.  Passed as the second positional argument of
# store() whenever this module stores a freshly created oid
# (presumably the "no previous revision" serial -- confirm against the
# storage API).
ZERO = '\0'*8

class DummyDB:
    """Stand-in database object whose invalidate() discards its arguments."""

    def invalidate(self, *args):
        # Accept any invalidation and do nothing with it.
        return None

class WorkerThread(TestThread):

    # The whole transaction runs in its own thread so that a blocking
    # tpc_vote() cannot hang the test suite.  The `ready` event is set
    # once both stores have been issued, just before the vote.

    def __init__(self, testcase, storage, trans, method="tpc_finish"):
        self.ready = threading.Event()
        self.method = method
        self.trans = trans
        self.storage = storage
        TestThread.__init__(self, testcase)

    def testrun(self):
        storage, txn = self.storage, self.trans
        try:
            storage.tpc_begin(txn)
            # Store two new objects in the transaction.
            for attempt in range(2):
                oid = storage.new_oid()
                storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', txn)
            self.ready.set()
            storage.tpc_vote(txn)
            # Anything other than "tpc_finish" means abort.
            if self.method != "tpc_finish":
                storage.tpc_abort(txn)
            else:
                storage.tpc_finish(txn)
        except Disconnected:
            # Being disconnected part-way through is tolerated.
            pass

class CommitLockTests:

    # The commit lock tests verify that the storage successfully
    # blocks and restarts transactions when there is contention for a
    # single storage.  There are a lot of cases to cover.
    #
    # The host test case is expected to supply self._storage and
    # self._dostore().

    # CommitLock1 checks the case where a single transaction delays
    # other transactions before they actually block.  IOW, by the time
    # the other transactions get to the vote stage, the first
    # transaction has finished.

    def checkCommitLock1OnCommit(self):
        self._storages = []
        try:
            self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
        finally:
            self._cleanup()

    def checkCommitLock1OnAbort(self):
        self._storages = []
        try:
            self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
        finally:
            self._cleanup()

    # CommitLock2 runs every competing transaction in a WorkerThread,
    # which gets as far as tpc_vote() while the first transaction is
    # still in progress.

    def checkCommitLock2OnCommit(self):
        self._storages = []
        try:
            self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
        finally:
            self._cleanup()

    def checkCommitLock2OnAbort(self):
        self._storages = []
        try:
            self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
        finally:
            self._cleanup()

    def _cleanup(self):
        # Abort and close every secondary client that is still open.
        for store, trans in self._storages:
            store.tpc_abort(trans)
            store.close()
        self._storages = []

    def _checkCommitLock(self, method_name, dosetup, dowork):
        # check the commit lock when a client attempts a transaction,
        # but fails/exits before finishing the commit.

        # The general flow of these tests is to start a transaction by
        # calling tpc_begin().  Then begin one or more other
        # connections that also want to commit.  This causes the
        # commit lock code to be exercised.  Once the other
        # connections are started, the first transaction completes.
        # Either by commit or abort, depending on whether method_name
        # is "tpc_finish."

        # The tests are parameterized by method_name, dosetup(), and
        # dowork().  The dosetup() function is called with a
        # connected client storage, transaction, and timestamp.
        # Any work it does occurs after the first transaction has
        # started, but before it finishes.  The dowork() function
        # executes after the first transaction has completed.

        # Start one transaction normally.
        t = Transaction()
        self._storage.tpc_begin(t)

        # Start further transactions on different connections without
        # blocking the test thread.
        self._storages = []
        # Reset the worker list once per test rather than once per
        # client, so that every thread started by _dosetup2() is still
        # known to _dowork2() and gets joined.
        self._threads = []
        for i in range(4):
            storage2 = self._duplicate_client()
            t2 = Transaction()
            tid = self._get_timestamp()
            dosetup(storage2, t2, tid)
            if i == 0:
                # The first extra client is closed immediately after
                # setup instead of being kept for dowork()/_cleanup().
                storage2.close()
            else:
                self._storages.append((storage2, t2))

        oid = self._storage.new_oid()
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
        self._storage.tpc_vote(t)
        if method_name == "tpc_finish":
            self._storage.tpc_finish(t)
            self._storage.load(oid, '')
        else:
            self._storage.tpc_abort(t)

        dowork(method_name)

        # Make sure the server is still responsive
        self._dostore()

    def _dosetup1(self, storage, trans, tid):
        # Only begin the transaction; the rest happens in _dowork1().
        storage.tpc_begin(trans, tid)

    def _dowork1(self, method_name):
        # Complete each pending transaction in the test thread.
        for store, trans in self._storages:
            oid = store.new_oid()
            store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
            store.tpc_vote(trans)
            if method_name == "tpc_finish":
                store.tpc_finish(trans)
            else:
                store.tpc_abort(trans)

    def _dosetup2(self, storage, trans, tid):
        # Run the transaction in a worker thread and wait until the
        # worker signals that it is about to vote.  self._threads is
        # initialised by _checkCommitLock().
        # NOTE(review): the worker is always created with its default
        # method ("tpc_finish"), also for the OnAbort variant --
        # confirm that this is intended.
        t = WorkerThread(self, storage, trans)
        self._threads.append(t)
        t.start()
        t.ready.wait()

    def _dowork2(self, method_name):
        # Join every worker; cleanup() fails the test if one hangs.
        for t in self._threads:
            t.cleanup()

    def _duplicate_client(self):
        "Open another ClientStorage to the same server."
        # XXX argh it's hard to find the actual address
        # The rpc mgr addr attribute is a list.  Each element in the
        # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
        # address.
        addr = self._storage._addr
        new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
        new.registerDB(DummyDB(), None)
        return new

    def _get_timestamp(self):
        # Build a TimeStamp from the current UTC time (year, month,
        # day, hour, minute, fractional seconds) and return its repr().
        now = time.time()
        stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
        return repr(stamp)


=== Added File Zope/lib/python/ZEO/tests/ConnectionTests.py === (405/505 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import os
import random
import select
import socket
import sys
import tempfile
import thread
import time

import zLOG

from ZEO.ClientStorage import ClientStorage
from ZEO.Exceptions import Disconnected
from ZEO.zrpc.marshal import Marshaller

from ZODB.Transaction import get_transaction
from ZODB.POSException import ReadOnlyError
from ZODB.tests import StorageTestBase
from ZODB.tests.StorageTestBase import zodb_unpickle, MinPO

class DummyDB:
    """Placeholder database: invalidate() accepts anything and does nothing."""

    def invalidate(self, *args):
        # Invalidation messages are ignored.
        return None

class ConnectionTests(StorageTestBase.StorageTestBase):
    """Tests that explicitly manage the server process.

    To test the cache or re-connection, these test cases explicit
    start and stop a ZEO storage server.
    """

    __super_tearDown = StorageTestBase.StorageTestBase.tearDown

    def setUp(self):
        """Start a ZEO server using a Unix domain socket


[-=- -=- -=- 405 lines omitted -=- -=- -=-]

            except (Disconnected, select.error, thread.error, socket.error):
                zLOG.LOG("checkReconnection", zLOG.INFO,
                         "Error after server restart; retrying.",
                         error=sys.exc_info())
                get_transaction().abort()
                time.sleep(0.1) # XXX how long to sleep
            # XXX This is a bloody pain.  We're placing a heavy burden
            # on users to catch a plethora of exceptions in order to
            # write robust code.  Need to think about implementing
            # John Heintz's suggestion to make sure all exceptions
            # inherit from POSException.
        zLOG.LOG("checkReconnection", zLOG.INFO, "finished")

    def checkBadMessage1(self):
        # A payload that bears no resemblance to a real message.
        garbage = "salty"
        self._bad_message(garbage)

    def checkBadMessage2(self):
        # just like a real message, but with an unpicklable argument
        #
        # Hack is made a module global only for the duration of this
        # check.  The try/finally removes it again even when encoding
        # or _bad_message() fails, so a failing run does not leave the
        # name behind in the module.
        global Hack
        class Hack:
            pass

        try:
            msg = Marshaller().encode(1, 0, "foo", (Hack(),))
            self._bad_message(msg)
        finally:
            del Hack

    def _bad_message(self, msg):
        # Connect, push an ill-formatted request at the server, and
        # verify two things: the connection gets closed, and a new
        # connection can be established afterwards.

        self._storage = self.openClientStorage()
        self._dostore()

        # Reach into the client's internals and hand the bogus
        # message straight to the zrpc connection.
        self._storage._server.rpc.message_output(msg)

        dropped = 0
        try:
            self._dostore()
        except Disconnected:
            dropped = 1
        # The storage is closed on both paths before the verdict.
        self._storage.close()
        if not dropped:
            self.fail("Server did not disconnect after bogus message")

        self._storage = self.openClientStorage()
        self._dostore()


=== Added File Zope/lib/python/ZEO/tests/TestThread.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A Thread base class for use with unittest."""

from cStringIO import StringIO
import threading
import traceback

class TestThread(threading.Thread):
    """Daemon thread that reports problems through a unittest test case.

    Subclasses implement testrun().  An exception escaping testrun()
    is formatted and passed to testcase.fail(); cleanup() joins the
    thread and calls testcase.fail() if it is still running after the
    timeout.
    """
    __super_init = threading.Thread.__init__
    __super_run = threading.Thread.run

    def __init__(self, testcase, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        # testcase -- object whose fail() method receives error reports.
        # The remaining arguments are handed to threading.Thread.
        #
        # A None default replaces the shared mutable {} default; the
        # base class still receives a dictionary.
        if kwargs is None:
            kwargs = {}
        self.__super_init(group, target, name, args, kwargs, verbose)
        self.setDaemon(1)
        self._testcase = testcase

    def run(self):
        # Run testrun() and turn any exception into a test failure
        # message that carries the full traceback.
        try:
            self.testrun()
        except Exception:
            s = StringIO()
            traceback.print_exc(file=s)
            self._testcase.fail("Exception in thread %s:\n%s\n" %
                                (self, s.getvalue()))

    def cleanup(self, timeout=15):
        # Wait up to `timeout` seconds for the thread to finish.
        self.join(timeout)
        if self.isAlive():
            self._testcase.fail("Thread did not finish: %s" % self)


=== Added File Zope/lib/python/ZEO/tests/ThreadTests.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Compromising positions involving threads."""

import threading

from ZODB.Transaction import Transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO

import ZEO.ClientStorage
from ZEO.Exceptions import Disconnected

# Eight NUL characters.  Passed as the second positional argument of
# store() for a freshly created oid (presumably the "no previous
# revision" serial -- confirm against the storage API).
ZERO = '\0'*8

class BasicThread(threading.Thread):
    """Common base for the helper threads used by ThreadTests.

    Holds the storage, a new Transaction, the two events shared with
    the test, and two result flags that subclasses set from run().
    """

    def __init__(self, storage, doNextEvent, threadStartedEvent):
        threading.Thread.__init__(self)
        self.setDaemon(1)
        self.storage = storage
        self.trans = Transaction()
        # Events shared with the test that drives this thread.
        self.doNextEvent = doNextEvent
        self.threadStartedEvent = threadStartedEvent
        # Outcome flags, read by the test after join().
        self.gotValueError = 0
        self.gotDisconnected = 0

    def join(self):
        # Wait at most ten seconds, then insist the thread is done.
        threading.Thread.join(self, 10)
        assert not self.isAlive()


class GetsThroughVoteThread(BasicThread):
    # This thread gets partially through a transaction before it turns
    # execution over to another thread.  We're trying to establish that a
    # tpc_finish() after a storage has been closed by another thread will get
    # a ClientStorageError error.
    #
    # This class does a tpc_begin(), store(), tpc_vote() and is waiting
    # to do the tpc_finish() when the other thread closes the storage.
    def run(self):
        self.storage.tpc_begin(self.trans)
        oid = self.storage.new_oid()
        self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
        self.storage.tpc_vote(self.trans)
        # Tell the test that the vote is done, then wait (at most ten
        # seconds) for its signal to carry on.
        self.threadStartedEvent.set()
        self.doNextEvent.wait(10)
        try:
            self.storage.tpc_finish(self.trans)
        except ZEO.ClientStorage.ClientStorageError:
            # Record the failure for the test, then abort.
            self.gotValueError = 1
            self.storage.tpc_abort(self.trans)


class GetsThroughBeginThread(BasicThread):
    # Counterpart of GetsThroughVoteThread, meant to be started while
    # another thread is already inside a transaction.  This thread is
    # then expected to block in tpc_begin() until the storage is
    # closed by someone else, at which point it is disconnected too.
    def run(self):
        begin_failed = 0
        try:
            self.storage.tpc_begin(self.trans)
        except ZEO.ClientStorage.ClientStorageError:
            begin_failed = 1
        if begin_failed:
            self.gotValueError = 1


class AbortsAfterBeginFailsThread(BasicThread):
    # Same as GetsThroughBeginThread, with one addition: after the
    # tpc_begin() attempt it also calls tpc_abort().  That abort is
    # expected to raise Disconnected, which implies this thread does
    # not hold the lock -- the property we actually want to test (but
    # it's difficult given the threading module's API).
    def run(self):
        begin_failed = 0
        try:
            self.storage.tpc_begin(self.trans)
        except ZEO.ClientStorage.ClientStorageError:
            begin_failed = 1
        if begin_failed:
            self.gotValueError = 1

        abort_disconnected = 0
        try:
            self.storage.tpc_abort(self.trans)
        except Disconnected:
            abort_disconnected = 1
        if abort_disconnected:
            self.gotDisconnected = 1


class ThreadTests:
    # The host test case is expected to supply self._storage,
    # self._dostore() and the unittest assertion methods.

    # Thread 1 should start a transaction, but not get all the way through it.
    # Main thread should close the connection.  Thread 1 should then get
    # disconnected.
    def checkDisconnectedOnThread2Close(self):
        doNextEvent = threading.Event()
        threadStartedEvent = threading.Event()
        thread1 = GetsThroughVoteThread(self._storage,
                                        doNextEvent, threadStartedEvent)
        thread1.start()
        threadStartedEvent.wait(10)
        self._storage.close()
        doNextEvent.set()
        thread1.join()
        self.assertEqual(thread1.gotValueError, 1)

    # Thread 1 should start a transaction, but not get all the way through
    # it.  While thread 1 is in the middle of the transaction, a second thread
    # should start a transaction, and it will block in the tpc_begin() --
    # because thread 1 has acquired the lock in its tpc_begin().  Now the main
    # thread closes the storage and both sub-threads should get disconnected.
    def checkSecondBeginFails(self):
        doNextEvent = threading.Event()
        threadStartedEvent = threading.Event()
        thread1 = GetsThroughVoteThread(self._storage,
                                        doNextEvent, threadStartedEvent)
        thread2 = GetsThroughBeginThread(self._storage,
                                         doNextEvent, threadStartedEvent)
        thread1.start()
        threadStartedEvent.wait(1)
        thread2.start()
        self._storage.close()
        doNextEvent.set()
        thread1.join()
        thread2.join()
        self.assertEqual(thread1.gotValueError, 1)
        self.assertEqual(thread2.gotValueError, 1)

    # Same scenario as checkSecondBeginFails, but the second thread
    # also attempts a tpc_abort() after its tpc_begin() and is
    # expected to be told that it is disconnected.
    def checkThatFailedBeginDoesNotHaveLock(self):
        doNextEvent = threading.Event()
        threadStartedEvent = threading.Event()
        thread1 = GetsThroughVoteThread(self._storage,
                                        doNextEvent, threadStartedEvent)
        thread2 = AbortsAfterBeginFailsThread(self._storage,
                                              doNextEvent, threadStartedEvent)
        thread1.start()
        threadStartedEvent.wait(1)
        thread2.start()
        self._storage.close()
        doNextEvent.set()
        thread1.join()
        thread2.join()
        self.assertEqual(thread1.gotValueError, 1)
        self.assertEqual(thread2.gotValueError, 1)
        self.assertEqual(thread2.gotDisconnected, 1)

    # Run a bunch of threads doing small and large stores in parallel
    def checkMTStores(self):
        threads = []
        for i in range(5):
            t = threading.Thread(target=self.mtstorehelper)
            threads.append(t)
            t.start()
        for t in threads:
            t.join(30)
        # Check every thread.  The loop used to bind a different
        # variable and therefore tested only the last thread, five
        # times over.
        for t in threads:
            self.failUnless(not t.isAlive())

    # Helper for checkMTStores
    def mtstorehelper(self):
        # Alternate large (200000 character) and tiny payloads, ten
        # of each, and store them one after another.
        objs = []
        for i in range(10):
            objs.append(MinPO("X" * 200000))
            objs.append(MinPO("X"))
        for obj in objs:
            self._dostore(data=obj)


=== Added File Zope/lib/python/ZEO/tests/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################


=== Added File Zope/lib/python/ZEO/tests/forker.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Library for forking storage server and connecting client storage"""

import asyncore
import os
import random
import socket
import sys
import traceback
import types
import ZEO.ClientStorage

# Change value of PROFILE to enable server-side profiling
# When true, the forked server process runs run_server() under a
# hotshot profiler that writes to "stats.s.<pid>"; hotshot is only
# imported in that case.
PROFILE = 0
if PROFILE:
    import hotshot

def get_port():
    """Pick a random port between 20000 and 29999 that looks unused.

    A port counts as unused when connecting to it on localhost raises
    socket.error.  Up to ten random candidates are probed.

    Raises RuntimeError when all ten candidates accept a connection.
    """
    attempts_left = 10
    while attempts_left > 0:
        attempts_left = attempts_left - 1
        candidate = random.randrange(20000, 30000)
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                probe.connect(('localhost', candidate))
            except socket.error:
                # XXX check value of error?
                return candidate
        finally:
            # The probe socket is closed on every path.
            probe.close()
    raise RuntimeError("Can't find port")

if os.name == "nt":

    def start_zeo_server(storage_name, args, addr=None, ro_svr=0):
        """Spawn ZEO/tests/winserver.py as a separate server process.

        Returns a 3-tuple: the ZEO address ('localhost', port), the
        address ('localhost', port + 1), and the pid of the spawned
        process.  When addr is None a port is obtained from
        get_port(); otherwise addr[1] is used.
        """
        import ZEO.tests.winserver
        if addr is not None:
            port = addr[1]
        else:
            port = get_port()
        # Run the source file even if the module was loaded from a .pyc.
        script = ZEO.tests.winserver.__file__
        if script[-4:] == '.pyc':
            script = script[:-1]
        command = [sys.executable, script]
        if ro_svr:
            command.append("-r")
        command.extend([str(port), storage_name])
        argv = tuple(command) + args
        # Let the child import from the same locations as this process.
        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
        pid = os.spawnve(os.P_NOWAIT, sys.executable, argv, env)
        return ('localhost', port), ('localhost', port + 1), pid

else:

    class ZEOServerExit(asyncore.file_dispatcher):
        """Pipe watcher used to exit ZEO.StorageServer when run is done.

        Reading "done" from the pipe, or the pipe being closed, closes
        the server and empties the asyncore socket map.
        """

        def readable(self):
            return 1

        def writable(self):
            return 0

        def _stop_server(self):
            # `server` is the module global assigned by run_server().
            server.close_server()
            asyncore.socket_map.clear()

        def handle_read(self):
            data = self.recv(4)
            if not data:
                return
            assert data == "done"
            self._stop_server()

        def handle_close(self):
            self._stop_server()

    class ZEOClientExit:
        """Used by client to cause server to exit"""
        def __init__(self, pipe):
            # pipe -- file descriptor of the write end of the control
            # pipe shared with the server process.
            self.pipe = pipe

        def close(self):
            # Ask the server to stop, then release the descriptor.
            # The descriptor is closed even when the write fails, so
            # it no longer leaks in that case.  OS-level errors are
            # still ignored, as before: shutdown is best-effort.
            try:
                try:
                    os.write(self.pipe, "done")
                finally:
                    os.close(self.pipe)
            except os.error:
                pass

    def start_zeo_server(storage_name, args, addr, ro_svr=0):
        """Fork a child process that runs a ZEO storage server.

        args must be a tuple.  In the parent, returns (pid, exit),
        where exit is a ZEOClientExit wrapping the write end of a pipe
        shared with the child.  The child runs run_server() and never
        returns from this function.
        """
        assert isinstance(args, types.TupleType)
        rd, wr = os.pipe()
        pid = os.fork()
        if pid == 0:
            asyncore.socket_map.clear() # Don't service the parent's sockets
            import ZEO.zrpc.log
            reload(ZEO.zrpc.log) # Don't share the logging file object
            try:
                if PROFILE:
                    p = hotshot.Profile("stats.s.%d" % os.getpid())
                    # The statement string refers to this function's
                    # local names, so they must keep these spellings.
                    p.runctx(
                        "run_server(addr, rd, wr, storage_name, args, ro_svr)",
                        globals(), locals())
                    p.close()
                else:
                    run_server(addr, rd, wr, storage_name, args, ro_svr)
            except:
                # Report any failure, then fall through to the exit.
                print "Exception in ZEO server process"
                traceback.print_exc()
            # Leave the child immediately, with status 0 in all cases.
            os._exit(0)
        else:
            # The parent keeps only the write end of the pipe.
            os.close(rd)
            return pid, ZEOClientExit(wr)

    def load_storage(name, args):
        """Instantiate the class `name` found in the module ZODB.<name>.

        args is the sequence of positional arguments for the class.
        """
        # __import__ of a dotted name returns the top-level package,
        # so the module and then the class are fetched by attribute.
        zodb = __import__("ZODB." + name)
        storage_class = getattr(getattr(zodb, name), name)
        return storage_class(*args)

    def run_server(addr, rd, wr, storage_name, args, ro_svr):
        """Run the storage server loop; called in the forked child.

        Closes the write end of the control pipe, watches the read end
        with a ZEOServerExit, serves the storage under the name '1'
        until the loop returns, then closes the storage.  A string
        addr is unlinked afterwards.
        """
        global server
        os.close(wr)
        ZEOServerExit(rd)
        import ZEO.StorageServer
        import ZEO.zrpc.server
        backend = load_storage(storage_name, args)
        storages = {'1': backend}
        server = ZEO.StorageServer.StorageServer(addr, storages, ro_svr)
        ZEO.zrpc.server.loop()
        backend.close()
        if not isinstance(addr, types.StringType):
            return
        os.unlink(addr)

    def start_zeo(storage_name, args, cache=None, cleanup=None,
                  domain="AF_INET", storage_id="1", cache_size=20000000):
        """Setup ZEO client-server for storage.

        Returns a 3-tuple: a ClientStorage instance, a ZEOClientExit
        instance, and the pid of the server process.

        domain selects the address: "AF_INET" uses ('', get_port()),
        "AF_UNIX" uses a temporary file name; anything else raises
        ValueError.  The cleanup argument is accepted but not used.

        XXX Don't know if os.pipe() will work on Windows.
        """

        if domain == "AF_UNIX":
            import tempfile
            addr = tempfile.mktemp()
        elif domain == "AF_INET":
            addr = ('', get_port())
        else:
            raise ValueError("bad domain: %s" % domain)

        server_pid, stopper = start_zeo_server(storage_name, args, addr)
        client = ZEO.ClientStorage.ClientStorage(addr, storage_id,
                                                 client=cache,
                                                 cache_size=cache_size,
                                                 min_disconnect_poll=0.5,
                                                 wait=1)
        return client, stopper, server_pid


=== Added File Zope/lib/python/ZEO/tests/multi.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A multi-client test of the ZEO storage server"""
# XXX This code is currently broken.

import ZODB, ZODB.DB, ZODB.FileStorage, ZODB.POSException
import Persistence
import PersistentMapping
from ZEO.tests import forker

import asyncore
import os
import tempfile
import time
import types

# True enables the progress messages printed by this module.
VERBOSE = 1
# Presumably the number of client processes main() starts; its use
# is not visible in this part of the file -- confirm.
CLIENTS = 4
# Number of Record objects each client commits in run().
RECORDS_PER_CLIENT = 100
# Seconds to sleep after a ConflictError before retrying.
CONFLICT_DELAY = 0.1
# Seconds to sleep between is_connected() polls in run().
CONNECT_DELAY = 0.1
# Passed as the client= argument of ClientStorage in start_client().
CLIENT_CACHE = '' # use temporary cache

class Record(Persistence.Persistent):
    """Persistent record that can be linked to a following record.

    client -- identifier of the client that created the record
    value  -- payload supplied by the creator
    next   -- the following record, or None until set_next() is called
    """
    def __init__(self, client=None, value=None):
        self.client = client
        # Keep the caller's value.  It used to be discarded here and
        # the attribute was always None.
        self.value = value
        self.next = None

    def set_next(self, next):
        # Link this record to the one that follows it.
        self.next = next

class Stats(Persistence.Persistent):
    """Persistent pair of wall-clock timestamps.

    begin is taken at construction; end stays None until done().
    """

    def __init__(self):
        self.end = None
        self.begin = time.time()

    def done(self):
        # Stamp the completion time.
        self.end = time.time()

def init_storage():
    path = tempfile.mktemp()
    if VERBOSE:
        print "FileStorage path:", path
    fs = ZODB.FileStorage.FileStorage(path)

    db = ZODB.DB(fs)
    root = db.open().root()
    root["multi"] = PersistentMapping.PersistentMapping()
    get_transaction().commit()

    return fs

def start_server(addr):
    """Start a ZEO server at addr for a freshly initialized storage.

    Returns the (pid, exit) pair produced by forker.start_zeo_server().
    """
    server_pid, exit_handle = forker.start_zeo_server(init_storage(), addr)
    return server_pid, exit_handle

def start_client(addr, client_func=None):
    """Fork a child process that runs a client against the server at addr.

    addr        -- address handed to ClientStorage
    client_func -- optional callable taking the ClientStorage; when None,
                   run() is used

    In the parent, returns the child's pid.  The child never returns: it
    always leaves through os._exit(0), even if the client raised.
    """
    pid = os.fork()
    if pid == 0:
        # Child process.
        try:
            import ZEO.ClientStorage
            if VERBOSE:
                print "Client process started:", os.getpid()
            cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
            if client_func is None:
                run(cli)
            else:
                client_func(cli)
            cli.close()
        finally:
            # Exit without unwinding back into the parent's code path.
            os._exit(0)
    else:
        return pid

def run(storage):
    if hasattr(storage, 'is_connected'):
        while not storage.is_connected():
            time.sleep(CONNECT_DELAY)
    pid = os.getpid()
    print "Client process connected:", pid, storage
    db = ZODB.DB(storage)
    root = db.open().root()
    while 1:
        try:
            s = root[pid] = Stats()
            get_transaction().commit()
        except ZODB.POSException.ConflictError:
            get_transaction().abort()
            time.sleep(CONFLICT_DELAY)
        else:
            break

    dict = root["multi"]
    prev = None
    i = 0
    while i < RECORDS_PER_CLIENT:
        try:
            size = len(dict)
            r = dict[size] = Record(pid, size)
            if prev:
                prev.set_next(r)
            get_transaction().commit()
        except ZODB.POSException.ConflictError, err:
            get_transaction().abort()
            time.sleep(CONFLICT_DELAY)
        else:
            i = i + 1
            if VERBOSE and (i < 5 or i % 10 == 0):
                print "Client %s: %s of %s" % (pid, i, RECORDS_PER_CLIENT)
    s.done()
    get_transaction().commit()

    print "Client completed:", pid

def main(client_func=None):
    """Start a server and CLIENTS client processes, then report timings.

    client_func -- optional callable forwarded to start_client(); when
                   None each client executes run()

    Prints the total time, the server start time and the client time.
    """
    if VERBOSE:
        print "Main process:", os.getpid()
    addr = tempfile.mktemp()
    t0 = time.time()
    server_pid, server = start_server(addr)
    t1 = time.time()
    pids = []
    for i in range(CLIENTS):
        pids.append(start_client(addr, client_func))
    # Wait for every client to finish before stopping the clock.
    for pid in pids:
        assert type(pid) == types.IntType, "invalid pid type: %s (%s)" % \
               (repr(pid), type(pid))
        try:
            if VERBOSE:
                print "waitpid(%s)" % repr(pid)
            os.waitpid(pid, 0)
        except os.error, err:
            print "waitpid(%s) failed: %s" % (repr(pid), err)
    t2 = time.time()
    # Shut the server down and reap its process.
    server.close()
    os.waitpid(server_pid, 0)

    # XXX Should check that the results are consistent!

    print "Total time:", t2 - t0
    print "Server start time", t1 - t0
    print "Client time:", t2 - t1

if __name__ == "__main__":
    main()


=== Added File Zope/lib/python/ZEO/tests/speed.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Command-line help text describing the options parsed by main().
usage="""Test speed of a ZODB storage

Options:

    -d file    The data file to use as input.
               The default is this script.

    -n n       The number of repetitions

    -s module  A module that defines a 'Storage'
               attribute, which is an open storage.
               If not specified, a FileStorage will be
               used.

    -z         Test compressing data

    -D         Run in debug mode

    -L         Test loads as well as stores by minimizing
               the cache after each run

    -M         Output means only

    -C         Run with a persistent client cache

    -U         Run ZEO using a Unix domain socket

    -t n       Number of concurrent threads to run.
"""

import asyncore
import sys, os, getopt, string, time
##sys.path.insert(0, os.getcwd())

import ZODB, ZODB.FileStorage
import Persistence
import ZEO.ClientStorage, ZEO.StorageServer
from ZEO.tests import forker
from ZODB.POSException import ConflictError

class P(Persistence.Persistent):
    # Bare persistent object; work() attaches its test data to instances
    # of this class as attributes.
    pass

fs_name = "zeo-speed.fs"

class ZEOExit(asyncore.file_dispatcher):
    """Used to exit ZEO.StorageServer when run is done"""
    def writable(self):
        return 0
    def readable(self):
        return 1
    def handle_read(self):
        buf = self.recv(4)
        assert buf == "done"
        self.delete_fs()
        os._exit(0)
    def handle_close(self):
        print "Parent process exited unexpectedly"
        self.delete_fs()
        os._exit(0)
    def delete_fs(self):
        os.unlink(fs_name)
        os.unlink(fs_name + ".lock")
        os.unlink(fs_name + ".tmp")

def work(db, results, nrep, compress, data, detailed, minimize, threadno=None):
    """Run the timed store workload and append timings to results.

    db       -- database to open connections on
    results  -- dict keyed by batch size (1, 10, 100, 1000); a
                (seconds, conflicts) tuple is appended per batch
    nrep     -- number of repetitions of the whole series
    compress -- callable applied to data before storing, or None
    data     -- the payload stored on every object
    detailed -- when true, print one line per batch
    minimize -- when true, sleep and minimize the cache after each batch
    threadno -- optional thread number included in the detailed output
    """
    for j in range(nrep):
        for r in 1, 10, 100, 1000:
            t = time.time()
            conflicts = 0

            jar = db.open()
            # Retry the whole batch until it commits without a conflict.
            while 1:
                try:
                    get_transaction().begin()
                    rt = jar.root()
                    key = 's%s' % r
                    if rt.has_key(key):
                        p = rt[key]
                    else:
                        rt[key] = p =P()
                    # Store r sub-objects as attributes named "0".."r-1".
                    for i in range(r):
                        v = getattr(p, str(i), P())
                        if compress is not None:
                            v.d = compress(data)
                        else:
                            v.d = data
                        setattr(p, str(i), v)
                    get_transaction().commit()
                except ConflictError:
                    conflicts = conflicts + 1
                else:
                    break
            jar.close()

            t = time.time() - t
            if detailed:
                if threadno is None:
                    print "%s\t%s\t%.4f\t%d" % (j, r, t, conflicts)
                else:
                    print "%s\t%s\t%.4f\t%d\t%d" % (j, r, t, conflicts,
                                                    threadno)
            results[r].append((t, conflicts))
            rt=d=p=v=None # release all references
            if minimize:
                time.sleep(3)
                # NOTE(review): jar was already closed above when this is
                # called -- confirm that this is intended.
                jar.cacheMinimize(3)

def main(args):
    opts, args = getopt.getopt(args, 'zd:n:Ds:LMt:U')
    s = None
    compress = None
    data=sys.argv[0]
    nrep=5
    minimize=0
    detailed=1
    cache = None
    domain = 'AF_INET'
    threads = 1
    for o, v in opts:
        if o=='-n': nrep = int(v)
        elif o=='-d': data = v
        elif o=='-s': s = v
        elif o=='-z':
            import zlib
            compress = zlib.compress
        elif o=='-L':
            minimize=1
        elif o=='-M':
            detailed=0
        elif o=='-D':
            global debug
            os.environ['STUPID_LOG_FILE']=''
            os.environ['STUPID_LOG_SEVERITY']='-999'
            debug = 1
        elif o == '-C':
            cache = 'speed'
        elif o == '-U':
            domain = 'AF_UNIX'
        elif o == '-t':
            threads = int(v)

    zeo_pipe = None
    if s:
        s = __import__(s, globals(), globals(), ('__doc__',))
        s = s.Storage
        server = None
    else:
        s, server, pid = forker.start_zeo("FileStorage",
                                          (fs_name, 1), domain=domain)

    data=open(data).read()
    db=ZODB.DB(s,
               # disable cache deactivation
               cache_size=4000,
               cache_deactivate_after=6000,)

    print "Beginning work..."
    results={1:[], 10:[], 100:[], 1000:[]}
    if threads > 1:
        import threading
        l = []
        for i in range(threads):
            t = threading.Thread(target=work,
                                 args=(db, results, nrep, compress, data,
                                       detailed, minimize, i))
            l.append(t)
        for t in l:
            t.start()
        for t in l:
            t.join()

    else:
        work(db, results, nrep, compress, data, detailed, minimize)

    if server is not None:
        server.close()
        os.waitpid(pid, 0)

    if detailed:
        print '-'*24
    print "num\tmean\tmin\tmax"
    for r in 1, 10, 100, 1000:
        times = []
        for time, conf in results[r]:
            times.append(time)
        t = mean(times)
        print "%d\t%.4f\t%.4f\t%.4f" % (r, t, min(times), max(times))

def mean(l):
    """Return the arithmetic mean of the numbers in l.

    Raises ZeroDivisionError when l is empty.
    """
    count = len(l)
    total = 0
    for sample in l:
        total += sample
    return total / count

##def compress(s):
##    c = zlib.compressobj()
##    o = c.compress(s)
##    return o + c.flush()

if __name__=='__main__':
    main(sys.argv[1:])


=== Added File Zope/lib/python/ZEO/tests/stress.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A ZEO client-server stress test to look for leaks.

The stress test should run in an infinite loop and should involve
multiple connections.
"""
# XXX This code is currently broken.

from __future__ import nested_scopes

import ZODB
from ZEO.ClientStorage import ClientStorage
from ZODB.MappingStorage import MappingStorage
from ZEO.tests import forker
from ZODB.tests import MinPO
import zLOG

import os
import random
import sys
import types

# Tunable parameters for the stress test.
# NUM_TRANSACTIONS_PER_CONN: use count above which _start_child() replaces
# a connection with a new one.
NUM_TRANSACTIONS_PER_CONN = 10
# NUM_CONNECTIONS: connection pool size and number of initial connections.
NUM_CONNECTIONS = 10
# NUM_ROOTS: number of object chains setup() stores in the root.
NUM_ROOTS = 20
# MAX_DEPTH: exclusive upper bound on the length of each chain.
MAX_DEPTH = 20
# MIN_OBJSIZE / MAX_OBJSIZE: range bounds for the size of an_object().
MIN_OBJSIZE = 128
MAX_OBJSIZE = 2048

def an_object():
    """Build a random string usable as a PersistentMapping key or value.

    The requested length is drawn from [MIN_OBJSIZE, MAX_OBJSIZE).  The
    data is read from /dev/urandom when that path exists; otherwise the
    leading characters of the MinPO module file are read and shuffled.
    """
    nbytes = random.randrange(MIN_OBJSIZE, MAX_OBJSIZE)
    if not os.path.exists("/dev/urandom"):
        source = open(MinPO.__file__)
        chars = list(source.read(nbytes))
        source.close()
        random.shuffle(chars)
        return "".join(chars)
    source = open("/dev/urandom")
    data = source.read(nbytes)
    source.close()
    return data

def setup(cn):
    """Populate the database behind cn with NUM_ROOTS object chains.

    Each chain is a random string wrapped in between 1 and MAX_DEPTH - 1
    nested MinPO objects.  The outermost MinPO is stored in the root under
    a random key and committed.  The connection is closed at the end.
    """
    root = cn.root()
    for i in range(NUM_ROOTS):
        node = an_object()
        depth = random.randrange(1, MAX_DEPTH)
        for j in range(depth):
            node = MinPO.MinPO(node)
        root[an_object()] = node
        get_transaction().commit()
    cn.close()

def work(cn):
    """Replace the string at the bottom of one randomly chosen chain.

    The connection is synced, a random root value is picked, its chain of
    .value attributes is followed down to the object holding a string, and
    that string is replaced by a new random one.  The change is committed.
    """
    cn.sync()
    node = random.choice(cn.root().values())
    # Descend until node is the object whose value is a plain string.
    child = node.value
    while not isinstance(child, types.StringType):
        node = child
        child = node.value
    node.value = an_object()
    get_transaction().commit()

def main():
    # Yuck!  Need to cleanup forker so that the API is consistent
    # across Unix and Windows, at least if that's possible.
    if os.name == "nt":
        zaddr, tport, pid = forker.start_zeo_server('MappingStorage', ())
        def exitserver():
            import socket
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(tport)
            s.close()
    else:
        zaddr = '', random.randrange(20000, 30000)
        pid, exitobj = forker.start_zeo_server(MappingStorage(), zaddr)
        def exitserver():
            exitobj.close()

    while 1:
        pid = start_child(zaddr)
        print "started", pid
        os.waitpid(pid, 0)

    exitserver()

def start_child(zaddr):
    """Fork a process that runs _start_child(zaddr).

    Returns the child's pid in the parent.  The child always leaves
    through os._exit(0) and never returns from this function.
    """
    pid = os.fork()
    if pid == 0:
        try:
            _start_child(zaddr)
        finally:
            os._exit(0)
    return pid

def _start_child(zaddr):
    """Body of a stress child: cycle connections while doing work.

    Connects a ClientStorage to zaddr, populates the database through
    setup(), opens NUM_CONNECTIONS connections and then repeatedly picks
    one at random to run work() on.  A connection whose use count exceeds
    NUM_TRANSACTIONS_PER_CONN is closed and replaced by a new one.  The
    function returns once 25 connections have been opened in total.
    """
    storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5, wait=1)
    db = ZODB.DB(storage, pool_size=NUM_CONNECTIONS)
    setup(db.open())
    conns = []
    conn_count = 0

    for i in range(NUM_CONNECTIONS):
        c = db.open()
        # Per-connection use counter, kept as an attribute on the connection.
        c.__count = 0
        conns.append(c)
        conn_count += 1

    while conn_count < 25:
        c = random.choice(conns)
        if c.__count > NUM_TRANSACTIONS_PER_CONN:
            # Retire the connection and do the work on its replacement.
            conns.remove(c)
            c.close()
            conn_count += 1
            c = db.open()
            c.__count = 0
            conns.append(c)
        else:
            c.__count += 1
        work(c)

if __name__ == "__main__":
    main()


=== Added File Zope/lib/python/ZEO/tests/testClientCache.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test suite for the ZEO.ClientCache module.

At times, we do 'white box' testing, i.e. we know about the internals
of the ClientCache object.
"""
from __future__ import nested_scopes

import os
import time
import tempfile
import unittest

from ZEO.ClientCache import ClientCache

class ClientCacheTests(unittest.TestCase):
    """Tests of a ClientCache created with only a size argument.

    Cache entries are stored with
    store(oid, data, serial, version, vdata, vserial) and read back with
    load(oid, version).
    """

    def setUp(self):
        unittest.TestCase.setUp(self)
        self.cachesize = 10*1000*1000
        self.cache = ClientCache(size=self.cachesize)
        self.cache.open()

    def tearDown(self):
        self.cache.close()
        unittest.TestCase.tearDown(self)

    def testOpenClose(self):
        pass # All the work is done by setUp() / tearDown()

    def testStoreLoad(self):
        # Non-version data that was stored can be loaded back unchanged.
        cache = self.cache
        oid = 'abcdefgh'
        data = '1234'*100
        serial = 'ABCDEFGH'
        cache.store(oid, data, serial, '', '', '')
        loaded = cache.load(oid, '')
        self.assertEqual(loaded, (data, serial))

    def testMissingLoad(self):
        # Loading an oid that was never stored returns None.
        cache = self.cache
        oid = 'abcdefgh'
        data = '1234'*100
        serial = 'ABCDEFGH'
        cache.store(oid, data, serial, '', '', '')
        loaded = cache.load('garbage1', '')
        self.assertEqual(loaded, None)

    def testInvalidate(self):
        # After invalidate(), a previously loadable oid loads as None.
        cache = self.cache
        oid = 'abcdefgh'
        data = '1234'*100
        serial = 'ABCDEFGH'
        cache.store(oid, data, serial, '', '', '')
        loaded = cache.load(oid, '')
        self.assertEqual(loaded, (data, serial))
        cache.invalidate(oid, '')
        loaded = cache.load(oid, '')
        self.assertEqual(loaded, None)

    def testVersion(self):
        # Version and non-version data stored together load independently.
        cache = self.cache
        oid = 'abcdefgh'
        data = '1234'*100
        serial = 'ABCDEFGH'
        vname = 'myversion'
        vdata = '5678'*200
        vserial = 'IJKLMNOP'
        cache.store(oid, data, serial, vname, vdata, vserial)
        loaded = cache.load(oid, '')
        self.assertEqual(loaded, (data, serial))
        vloaded = cache.load(oid, vname)
        self.assertEqual(vloaded, (vdata, vserial))

    def testVersionOnly(self):
        # With empty non-version data only the version load succeeds.
        cache = self.cache
        oid = 'abcdefgh'
        data = ''
        serial = ''
        vname = 'myversion'
        vdata = '5678'*200
        vserial = 'IJKLMNOP'
        cache.store(oid, data, serial, vname, vdata, vserial)
        loaded = cache.load(oid, '')
        self.assertEqual(loaded, None)
        vloaded = cache.load(oid, vname)
        self.assertEqual(vloaded, (vdata, vserial))

    def testInvalidateNonVersion(self):
        # Invalidating the non-version data also drops the version data.
        cache = self.cache
        oid = 'abcdefgh'
        data = '1234'*100
        serial = 'ABCDEFGH'
        vname = 'myversion'
        vdata = '5678'*200
        vserial = 'IJKLMNOP'
        cache.store(oid, data, serial, vname, vdata, vserial)
        loaded = cache.load(oid, '')
        self.assertEqual(loaded, (data, serial))
        vloaded = cache.load(oid, vname)
        self.assertEqual(vloaded, (vdata, vserial))
        cache.invalidate(oid, '')
        loaded = cache.load(oid, '')
        self.assertEqual(loaded, None)
        # The version data is also invalidated at this point
        vloaded = cache.load(oid, vname)
        self.assertEqual(vloaded, None)

    def testInvalidateVersion(self):
        # Invalidating a version should not invalidate the non-version data.
        # (This tests for the same bug as testInvalidatePersists below.)
        cache = self.cache
        oid = 'abcdefgh'
        data = '1234'*100
        serial = 'ABCDEFGH'
        cache.store(oid, data, serial, '', '', '')
        loaded = cache.load(oid, '')
        self.assertEqual(loaded, (data, serial))
        cache.invalidate(oid, 'bogus')
        loaded = cache.load(oid, '')
        self.assertEqual(loaded, (data, serial))

    def testVerify(self):
        # verify() calls the verifier once per cached object; an empty
        # cache produces no calls.
        cache = self.cache
        results = []
        def verifier(oid, serial, vserial):
            results.append((oid, serial, vserial))
        cache.verify(verifier)
        self.assertEqual(results, [])
        oid = 'abcdefgh'
        data = '1234'*100
        serial = 'ABCDEFGH'
        cache.store(oid, data, serial, '', '', '')
        results = []
        cache.verify(verifier)
        self.assertEqual(results, [(oid, serial, None)])

    def testCheckSize(self):
        # Make sure that cache._index[oid] is erased for oids that are
        # stored in the cache file that's rewritten after a flip.
        cache = self.cache
        oid = 'abcdefgh'
        data = '1234'*100
        serial = 'ABCDEFGH'
        cache.store(oid, data, serial, '', '', '')
        cache.checkSize(10*self.cachesize) # Force a file flip
        oid2 = 'abcdefgz'
        data2 = '1234'*10
        serial2 = 'ABCDEFGZ'
        cache.store(oid2, data2, serial2, '', '', '')
        cache.checkSize(10*self.cachesize) # Force another file flip
        self.assertNotEqual(cache._index.get(oid2), None)
        self.assertEqual(cache._index.get(oid), None)

    def testCopyToCurrent(self):
        # - write some objects to cache file 0
        # - force a flip
        # - write some objects to cache file 1
        # - load some objects that are in cache file 0
        # - load the same objects, making sure they are now in file 1
        # - write some more objects
        # - force another flip
        # - load the same objects again
        # - make sure they are now in file 0 again
        #
        # The assertions of the form "None is not x > 0" are chained
        # comparisons: x must not be None and must be positive (file 0);
        # the "< 0" variants check for a negative index entry (file 1).

        cache = self.cache

        # Create some objects
        oid1 = 'abcdefgh'
        data1 = '1234' * 100
        serial1 = 'ABCDEFGH'
        oid2 = 'bcdefghi'
        data2 = '2345' * 200
        serial2 = 'BCDEFGHI'
        version2 = 'myversion'
        nonversion = 'nada'
        vdata2 = '5432' * 250
        vserial2 = 'IHGFEDCB'
        oid3 = 'cdefghij'
        data3 = '3456' * 300
        serial3 = 'CDEFGHIJ'

        # Store them in the cache
        cache.store(oid1, data1, serial1, '', '', '')
        cache.store(oid2, data2, serial2, version2, vdata2, vserial2)
        cache.store(oid3, data3, serial3, '', '', '')

        # Verify that they are in file 0
        self.assert_(None is not cache._index.get(oid1) > 0)
        self.assert_(None is not cache._index.get(oid2) > 0)
        self.assert_(None is not cache._index.get(oid3) > 0)

        # Load them and verify that the loads return correct data
        self.assertEqual(cache.load(oid1, ''), (data1, serial1))
        self.assertEqual(cache.load(oid2, ''), (data2, serial2))
        self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
        self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
        self.assertEqual(cache.load(oid3, ''), (data3, serial3))

        # Verify that they are still in file 0
        self.assert_(None is not cache._index.get(oid1) > 0)
        self.assert_(None is not cache._index.get(oid2) > 0)
        self.assert_(None is not cache._index.get(oid3) > 0)

        # Cause a cache flip
        cache.checkSize(10*self.cachesize)

        # Load o1 and o2 again and verify that the loads return correct data
        self.assertEqual(cache.load(oid1, ''), (data1, serial1))
        self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
        self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
        self.assertEqual(cache.load(oid2, ''), (data2, serial2))

        # Verify that o1 and o2 are now in file 1, o3 still in file 0
        self.assert_(None is not cache._index.get(oid1) < 0)
        self.assert_(None is not cache._index.get(oid2) < 0)
        self.assert_(None is not cache._index.get(oid3) > 0)

        # Cause another cache flip
        cache.checkSize(10*self.cachesize)

        # Load o1 and o2 again and verify that the loads return correct data
        self.assertEqual(cache.load(oid1, ''), (data1, serial1))
        self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
        self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
        self.assertEqual(cache.load(oid2, ''), (data2, serial2))

        # Verify that o1 and o2 are now back in file 0, o3 is lost
        self.assert_(None is not cache._index.get(oid1) > 0)
        self.assert_(None is not cache._index.get(oid2) > 0)
        self.assert_(None is cache._index.get(oid3))

        # Invalidate version data for o2
        cache.invalidate(oid2, nonversion)
        self.assertEqual(cache.load(oid2, ''), (data2, serial2))
        self.assertEqual(cache.load(oid2, nonversion), None)
        self.assertEqual(cache.load(oid2, version2), None)

        # Cause another cache flip
        cache.checkSize(10*self.cachesize)

        # Load o1 and o2 again and verify that the loads return correct data
        self.assertEqual(cache.load(oid1, ''), (data1, serial1))
        self.assertEqual(cache.load(oid2, version2), None)
        self.assertEqual(cache.load(oid2, nonversion), None)
        self.assertEqual(cache.load(oid2, ''), (data2, serial2))

        # Verify that o1 and o2 are now in file 1
        self.assert_(None is not cache._index.get(oid1) < 0)
        self.assert_(None is not cache._index.get(oid2) < 0)

class PersistentClientCacheTests(unittest.TestCase):
    """Tests of a ClientCache that is closed and reopened from its files.

    The cache is created with a storage name, a client name and a var
    directory, and the tests reopen it to check what was kept.
    """

    def setUp(self):
        unittest.TestCase.setUp(self)
        self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
        self.cachesize = 10*1000*1000
        self.storagename = 'foo'
        self.clientname = 'test'
        # Predict file names
        fn0 = 'c%s-%s-0.zec' % (self.storagename, self.clientname)
        fn1 = 'c%s-%s-1.zec' % (self.storagename, self.clientname)
        # Remove leftovers from earlier runs so each test starts clean.
        for fn in fn0, fn1:
            fn = os.path.join(self.vardir, fn)
            try:
                os.unlink(fn)
            except os.error:
                pass
        self.openCache()

    def openCache(self):
        """Create and open self.cache from the configured names."""
        self.cache = ClientCache(storage=self.storagename,
                                 size=self.cachesize,
                                 client=self.clientname,
                                 var=self.vardir)
        self.cache.open()

    def reopenCache(self):
        """Close the current cache, open a new one and return it."""
        self.cache.close()
        self.openCache()
        return self.cache

    def tearDown(self):
        self.cache.close()
        # Remove the files the cache reports in _p; entries may be None.
        for filename in self.cache._p:
            if filename is not None:
                try:
                    os.unlink(filename)
                except os.error:
                    pass
        unittest.TestCase.tearDown(self)

    def testCacheFileSelection(self):
        # A bug in __init__ read the wrong slice of the file to determine
        # the serial number of the first record, reading the
        # last byte of the data size plus the first seven bytes of the
        # serial number.  This caused random selection of the proper
        # 'current' file when a persistent cache was opened.
        cache = self.cache
        self.assertEqual(cache._current, 0) # Check that file 0 is current
        oid = 'abcdefgh'
        data = '1234'
        serial = 'ABCDEFGH'
        cache.store(oid, data, serial, '', '', '')
        cache.checkSize(10*self.cachesize) # Force a file flip
        self.assertEqual(cache._current, 1) # Check that the flip worked
        oid = 'abcdefgh'
        data = '123'
        serial = 'ABCDEFGZ'
        cache.store(oid, data, serial, '', '', '')
        cache = self.reopenCache()
        loaded = cache.load(oid, '')
        # Check that we got the most recent data:
        self.assertEqual(loaded, (data, serial))
        self.assertEqual(cache._current, 1) # Double check that 1 is current

    def testInvalidationPersists(self):
        # A bug in invalidate() caused invalidation to overwrite the
        # 2nd byte of the data size on disk, rather than
        # overwriting the status byte.  For certain data sizes this
        # can be observed by reopening a persistent cache: the
        # invalidated data will appear valid (but with altered size).
        cache = self.cache
        magicsize = (ord('i') + 1) << 16
        oid = 'abcdefgh'
        data = '!'*magicsize
        serial = 'ABCDEFGH'
        cache.store(oid, data, serial, '', '', '')
        loaded = cache.load(oid, '')
        self.assertEqual(loaded, (data, serial))
        cache.invalidate(oid, '')
        cache = self.reopenCache()
        loaded = cache.load(oid, '')
        if loaded is not None:
            self.fail("invalidated data resurrected, size %d, was %d" %
                      (len(loaded[0]), len(data)))

def test_suite():
    """Return a suite holding the tests of both cache test cases."""
    cases = (ClientCacheTests, PersistentClientCacheTests)
    suite = unittest.TestSuite()
    for case in cases:
        suite.addTest(unittest.makeSuite(case))
    return suite

if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope/lib/python/ZEO/tests/testConnection.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test setup for ZEO connection logic.

The actual tests are in ConnectionTests.py; this file provides the
platform-dependent scaffolding.
"""

# System imports
import os
import time
import socket
import unittest

# Zope/ZODB3 imports
import zLOG

# ZEO test support
from ZEO.tests import forker

# Import the actual test class
from ZEO.tests.ConnectionTests import ConnectionTests

class UnixConnectionTests(ConnectionTests):

    """Add Unix-specific scaffolding to the generic test suite."""

    def startServer(self, create=1, index=0, read_only=0, ro_svr=0):
        """Start the ZEO server with the given index.

        The server uses a FileStorage at "<self.file>.<index>" and the
        address self.addr[index].  Its pid and the object returned for
        shutting it down are appended to self._pids and self._servers.
        """
        zLOG.LOG("testZEO", zLOG.INFO,
                 "startServer(create=%d, index=%d, read_only=%d)" %
                 (create, index, read_only))
        path = "%s.%d" % (self.file, index)
        addr = self.addr[index]
        pid, server = forker.start_zeo_server(
            'FileStorage', (path, create, read_only), addr, ro_svr)
        self._pids.append(pid)
        self._servers.append(server)

    def shutdownServer(self, index=0):
        """Close the server with the given index and reap its process."""
        zLOG.LOG("testZEO", zLOG.INFO, "shutdownServer(index=%d)" % index)
        self._servers[index].close()
        if self._pids[index] is not None:
            try:
                os.waitpid(self._pids[index], 0)
                # Mark the process as reaped so it is not waited for again.
                self._pids[index] = None
            except os.error, err:
                print err

class WindowsConnectionTests(ConnectionTests):

    """Add Windows-specific scaffolding to the generic test suite."""

    def startServer(self, create=1, index=0, read_only=0, ro_svr=0):
        """Start the ZEO server with the given index.

        The storage arguments are passed as strings, with create and
        read_only written as '=<value>'.  The returned test pid is
        appended to self._pids and the test address to self._servers.
        """
        zLOG.LOG("testZEO", zLOG.INFO,
                 "startServer(create=%d, index=%d, read_only=%d)" %
                 (create, index, read_only))
        path = "%s.%d" % (self.file, index)
        addr = self.addr[index]
        args = (path, '='+str(create), '='+str(read_only))
        _addr, test_addr, test_pid = forker.start_zeo_server(
            'FileStorage', args, addr, ro_svr)
        self._pids.append(test_pid)
        self._servers.append(test_addr)

    def shutdownServer(self, index=0):
        """Shut down the server with the given index.

        A connection is opened to the stored test address and closed
        again, and the entry in self._servers is cleared.
        """
        zLOG.LOG("testZEO", zLOG.INFO, "shutdownServer(index=%d)" % index)
        if self._servers[index] is not None:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect(self._servers[index])
            s.close()
            self._servers[index] = None
            # XXX waitpid() isn't available until Python 2.3
            time.sleep(0.5)

# Pick the scaffolding class that matches the current platform; importing
# this module on any other platform fails with RuntimeError.
if os.name == "posix":
    test_classes = [UnixConnectionTests]
elif os.name == "nt":
    test_classes = [WindowsConnectionTests]
else:
    raise RuntimeError, "unsupported os: %s" % os.name

def test_suite():
    """Return a suite of the 'check*' methods of every class in test_classes."""
    import warnings

    # shutup warnings about mktemp
    warnings.filterwarnings("ignore", "mktemp")

    suite = unittest.TestSuite()
    for case in test_classes:
        suite.addTest(unittest.makeSuite(case, 'check'))
    return suite

if __name__ == "__main__":
    unittest.main(defaultTest='test_suite')


=== Added File Zope/lib/python/ZEO/tests/testStart.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

import os
import signal
import sys
import tempfile
import time
import unittest
import errno

import ZEO.start
from ZEO.ClientStorage import ClientStorage
from ZEO.util import Environment

# Prefer the removefs helper from ZODB's test support; fall back to a
# local implementation when it cannot be imported.
try:
    from ZODB.tests.StorageTestBase import removefs
except ImportError:
    # for compatibility with Zope 2.5 &c.
    import errno

    def removefs(base):
        """Remove all files created by FileStorage with path base."""
        for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
            path = base + ext
            try:
                os.remove(path)
            except os.error, err:
                # A missing file is fine; any other failure is re-raised.
                if err[0] != errno.ENOENT:
                    raise


class StartTests(unittest.TestCase):
    """Run the ZEO start script as a real subprocess and talk to it.

    Each test launches the script through a forked helper process (see
    fork()) and then either inspects the script's output or connects to
    the server with a ClientStorage.
    """

    def setUp(self):
        # Run the .py source even when the module was loaded from a
        # compiled .pyc file.
        startfile = ZEO.start.__file__
        if startfile[-1] == 'c':
            startfile = startfile[:-1]
        self.env = Environment(startfile)
        self.cmd = '%s %s' % (sys.executable, startfile)
        # Maps the pid of every helper child created by fork() to the
        # temp file that child writes the script's output to.
        self.pids = {}

    def tearDown(self):
        try:
            self.stop_server()
            self.shutdown()
        finally:
            removefs("Data.fs")
            # The pid file and the helper output files may never have
            # been created, so a failed removal is not an error.
            for path in [self.env.zeo_pid] + self.pids.values():
                try:
                    os.remove(path)
                except os.error:
                    pass

    def _read_file(self, path, mode="r"):
        """Return the whole content of path, always closing the file."""
        f = open(path, mode)
        try:
            return f.read()
        finally:
            f.close()

    def getpids(self):
        """Return the two pids stored in the ZEO pid file.

        Returns (None, None) when the pid file does not exist.
        """
        if not os.path.exists(self.env.zeo_pid):
            # If there's no pid file, assume the server isn't running
            return None, None
        return map(int, self._read_file(self.env.zeo_pid).split())

    def stop_server(self):
        """Send SIGTERM to the second pid of the pid file, if there is one."""
        ppid, pid = self.getpids()
        if ppid is None:
            return
        self.kill(pids=[pid])

    def kill(self, sig=signal.SIGTERM, pids=None):
        """Send sig to each pid (default: every helper child).

        A failure to signal a process is printed, not raised.
        """
        if pids is None:
            pids = self.pids.keys()
        for pid in pids:
            try:
                os.kill(pid, sig)
            except os.error, err:
                print err

    def wait(self, flag=0, pids=None):
        """Call os.waitpid(pid, flag) on each pid (default: every helper).

        Returns the list of pids that are still running.  That list can
        only be non-empty when flag includes os.WNOHANG; with the
        default flag the call blocks until each child has exited.
        """
        if pids is None:
            pids = self.pids.keys()
        alive = []
        for pid in pids:
            try:
                _pid, status = os.waitpid(pid, flag)
            except os.error, err:
                # ECHILD: not (or no longer) a child of this process,
                # e.g. because it has already been waited for.
                if err[0] == errno.ECHILD:
                    continue
                print err
            else:
                # With WNOHANG, waitpid() reports a pid of 0 for a child
                # that has not exited yet.  The exit status says nothing
                # about liveness: a child that exited cleanly also has
                # status 0.
                if _pid == 0:
                    alive.append(pid)
        return alive

    def shutdown(self):
        """Make sure every helper child has exited.

        Children that are still running get SIGTERM first, then SIGKILL,
        and are finally waited for.
        """
        # XXX This is probably too complicated, but I'm not sure what
        # the right thing to do is.
        alive = self.wait(os.WNOHANG)
        if not alive:
            return
        self.kill(pids=alive)
        alive = self.wait(os.WNOHANG, alive)
        if not alive:
            return
        self.kill(signal.SIGKILL, pids=alive)
        alive = self.wait(pids=alive)

    def fork(self, *args):
        """Run the start script with args in a forked helper process.

        The helper reads the script's output through a pipe, writes it
        to a temp file and exits.  Returns the path of that temp file;
        the helper's pid is recorded in self.pids.
        """
        # mktemp() only picks a name; the helper creates the file later.
        # (test_suite() silences the warning mktemp() triggers.)
        outfile = tempfile.mktemp()
        pid = os.fork()
        if pid:
            self.pids[pid] = outfile
            return outfile
        else:
            try:
                f = os.popen(self.cmd + " " + " ".join(args))
                buf = f.read()
                f.close()
                f = open(outfile, "wb")
                f.write(buf)
                f.close()
            finally:
                # The child must never return into the test runner.
                os._exit(0)

    def system(self, *args):
        """Run the start script with args and return everything it printed."""
        outfile = self.fork(*args)
        self.wait()
        return self._read_file(outfile, "rb")

    def connect(self, port=None, wait=1):
        """Open a ClientStorage on the given local port and close it again."""
        cs = ClientStorage(('', port), wait=wait)
        cs.close()

    def testNoPort(self):
        outp = self.system("-s")
        self.assert_(outp.find("No port specified") != -1)

    def testStart(self):
        port = 9090
        self.fork("-s", "-p", str(port))
        self.connect(port=port)

    def testLogRestart(self):
        port = 9090
        logfile1 = tempfile.mktemp(suffix="log")
        logfile2 = tempfile.mktemp(suffix="log")
        # Point the server's log at logfile1, remembering the previous
        # settings so they can be put back and do not leak into later
        # tests.
        saved_env = {}
        for name in ("STUPID_LOG_FILE", "EVENT_LOG_FILE"):
            saved_env[name] = os.environ.get(name)
            os.environ[name] = logfile1

        try:
            self.fork("-s", "-p", str(port))
            self.connect(port=port)
            # Give the server up to ten seconds to create its log file.
            buf1 = None
            for i in range(10):
                try:
                    buf1 = self._read_file(logfile1)
                except IOError, e:
                    if e.errno != errno.ENOENT:
                        raise
                    time.sleep(1)
                else:
                    break
            self.assert_(buf1)
            # Move the log away and send SIGHUP to the second pid of the
            # pid file; after the next connection a log must exist under
            # the original name again.
            os.rename(logfile1, logfile2)
            ppid, pid = self.getpids()
            os.kill(pid, signal.SIGHUP)
            self.connect(port=port)
            buf2 = self._read_file(logfile1)
            self.assert_(buf2)
        finally:
            for name, value in saved_env.items():
                if value is None:
                    del os.environ[name]
                else:
                    os.environ[name] = value
            self.shutdown()
            for path in (logfile1, logfile2):
                try:
                    os.unlink(path)
                except os.error:
                    pass

def test_suite():
    """Return the StartTests suite on POSIX systems, and None elsewhere."""
    # The tests call tempfile.mktemp(); keep its warning out of the output.
    import warnings
    warnings.filterwarnings("ignore", "mktemp")

    if os.name != "posix":
        # Don't even bother with these tests on Windows
        return None
    return unittest.makeSuite(StartTests)


=== Added File Zope/lib/python/ZEO/tests/testTransactionBuffer.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import random
import unittest

from ZEO.TransactionBuffer import TransactionBuffer

def random_string(size):
    """Return a string of size characters with random ordinals below 256."""
    chars = []
    for _ in range(size):
        chars.append(chr(random.randrange(256)))
    return "".join(chars)

def new_store_data():
    """Build a random three-item argument tuple for the store() method.

    The items are an 8-character random string, the empty string and a
    random string of fewer than 1000 characters.
    """
    # presumably (oid, version, data) -- confirm against
    # TransactionBuffer.store
    ident = random_string(8)
    payload_size = random.randrange(1000)
    return ident, '', random_string(payload_size)

def new_invalidate_data():
    """Build a random two-item argument tuple for the invalidate() method.

    The items are an 8-character random string and the empty string.
    """
    ident = random_string(8)
    return ident, ''

class TransBufTests(unittest.TestCase):
    """Checks that a TransactionBuffer replays what was put into it."""

    def checkTypicalUsage(self):
        tbuf = TransactionBuffer()
        tbuf.store(*new_store_data())
        tbuf.invalidate(*new_invalidate_data())
        tbuf.begin_iterate()
        # Drain the buffer; next() signals the end by returning None.
        item = tbuf.next()
        while item is not None:
            item = tbuf.next()
        tbuf.clear()

    def doUpdates(self, tbuf):
        """Feed ten store/invalidate pairs to tbuf and verify the replay.

        The records must come back from tbuf.next() in the order they
        were added.
        """
        expected = []
        for _ in range(10):
            stored = new_store_data()
            tbuf.store(*stored)
            expected.append(stored)
            invalidated = new_invalidate_data()
            tbuf.invalidate(*invalidated)
            expected.append(invalidated)

        tbuf.begin_iterate()
        for want in expected:
            got = tbuf.next()
            if got[2] is None:
                # the tbuf adds a dummy None to invalidates
                got = got[:2]
            self.assertEqual(got, want)

    def checkOrderPreserved(self):
        self.doUpdates(TransactionBuffer())

    def checkReusable(self):
        # The same buffer must work again after each clear().
        tbuf = TransactionBuffer()
        self.doUpdates(tbuf)
        for _ in range(2):
            tbuf.clear()
            self.doUpdates(tbuf)

def test_suite():
    """Return a suite of every 'check*' method of TransBufTests."""
    suite = unittest.makeSuite(TransBufTests, 'check')
    return suite


=== Added File Zope/lib/python/ZEO/tests/testZEO.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test suite for ZEO based on ZODB.tests."""

# System imports
import os
import sys
import time
import socket
import asyncore
import tempfile
import unittest

# Zope/ZODB3 imports
import zLOG

# ZODB test support
import ZODB
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle

# Handle potential absence of removefs
try:
    from ZODB.tests.StorageTestBase import removefs
except ImportError:
    # Older ZODB releases (Zope 2.5 &c.) do not provide removefs, so an
    # equivalent is defined here.
    import errno

    def removefs(base):
        """Delete base and each FileStorage companion file derived from it.

        Files that do not exist are skipped silently; any other failure
        to delete is re-raised.
        """
        suffixes = ('', '.old', '.tmp', '.lock', '.index', '.pack')
        for suffix in suffixes:
            try:
                os.remove(base + suffix)
            except os.error, err:
                if err.errno == errno.ENOENT:
                    continue
                raise

# ZODB test mixin classes
from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage, \
     TransactionalUndoStorage, TransactionalUndoVersionStorage, \
     PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
     MTStorage, ReadOnlyStorage

# ZEO imports
from ZEO.ClientStorage import ClientStorage
from ZEO.Exceptions import Disconnected

# ZEO test support
from ZEO.tests import forker, Cache

# ZEO test mixin classes
from ZEO.tests import CommitLockTests, ThreadTests

class DummyDB:
    """Stand-in database object whose invalidate() ignores every call."""

    def invalidate(self, *args):
        """Accept any positional arguments and do nothing."""

class MiscZEOTests:

    """ZEO checks that have no better home.

    Meant to be mixed into a test class that provides self._storage and
    self._dostore().
    """

    def checkLargeUpdate(self):
        # Store a single object wrapping a string of 10 * 128 * 1024
        # characters.
        payload = "X" * (10 * 128 * 1024)
        self._dostore(data=MinPO(payload))

    def checkZEOInvalidation(self):
        """A second client must see a revision stored by the first one."""
        second_client = ClientStorage(self._storage._addr, wait=1,
                                      min_disconnect_poll=0.1)
        try:
            oid = self._storage.new_oid()
            first_rev = self._dostore(oid, data=MinPO('first'))
            data, serial = second_client.load(oid, '')
            self.assertEqual(zodb_unpickle(data), MinPO('first'))
            self.assertEqual(serial, first_rev)

            second_rev = self._dostore(oid, data=MinPO('second'),
                                       revid=first_rev)
            # Let the server and client talk for a moment.
            # Is there a better way to do this?
            for _ in range(3):
                asyncore.poll(0.1)

            data, serial = second_client.load(oid, '')
            self.assertEqual(zodb_unpickle(data), MinPO('second'),
                             'Invalidation message was not sent!')
            self.assertEqual(serial, second_rev)
        finally:
            second_client.close()

class GenericTests(
    # Root of every ZODB storage test
    StorageTestBase.StorageTestBase,

    # Mixins from ZODB.tests, listed in import order
    BasicStorage.BasicStorage,
    VersionStorage.VersionStorage,
    TransactionalUndoStorage.TransactionalUndoStorage,
    TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
    PackableStorage.PackableStorage,
    Synchronization.SynchronizedStorage,
    ConflictResolution.ConflictResolvingStorage,
    ConflictResolution.ConflictResolvingTransUndoStorage,
    RevisionStorage.RevisionStorage,
    MTStorage.MTStorage,
    ReadOnlyStorage.ReadOnlyStorage,

    # Mixins from ZEO.tests, listed in import order
    Cache.StorageWithCache,
    Cache.TransUndoStorageWithCache,
    CommitLockTests.CommitLockTests,
    ThreadTests.ThreadTests,

    # Defined earlier in this module
    MiscZEOTests
    ):

    """Single test class that gathers the ZODB and ZEO test mixins."""

    def open(self, read_only=0):
        """Replace self._storage with a fresh client on the same address.

        XXX Needed to support ReadOnlyStorage tests.  Ought to be a
        cleaner way.
        """
        server_addr = self._storage._addr
        self._storage.close()
        self._storage = ClientStorage(server_addr, read_only=read_only,
                                      wait=1)

    def checkWriteMethods(self):
        # ReadOnlyStorage defines checkWriteMethods.  The decision about
        # where to raise the read-only error was changed after Zope 2.5
        # was released, so the inherited check is skipped for ZODB of
        # that vintage.  Zope 2.5 is recognised by its missing
        # ZODB.__version__ attribute.
        if not hasattr(ZODB, "__version__"):
            return
        ReadOnlyStorage.ReadOnlyStorage.checkWriteMethods(self)

class UnixTests(GenericTests):

    """Add Unix-specific scaffolding to the generic test suite."""

    def setUp(self):
        """Start a ZEO server through forker.start_zeo() and keep its client."""
        zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
        # presumably closing `exit` tells the forked server to shut
        # down -- confirm in forker.start_zeo
        client, exit, pid = forker.start_zeo(*self.getStorage())
        self._pids = [pid]
        self._servers = [exit]
        self._storage = client
        client.registerDB(DummyDB(), None)

    def tearDown(self):
        """Close the client, stop and reap the server, delete its files.

        Each step still runs when an earlier one raised, so a failing
        close() leaves neither an unreaped server process nor stray
        storage files behind.
        """
        try:
            self._storage.close()
        finally:
            try:
                for server in self._servers:
                    server.close()
                # Only wait once every server has been told to stop;
                # waiting for a server that was never closed could block
                # forever.
                for pid in self._pids:
                    os.waitpid(pid, 0)
            finally:
                self.delStorage()

    def getStorage(self):
        """Return the storage class name and constructor arguments.

        A fresh temp file name is chosen and remembered for delStorage().
        """
        self.__fs_base = tempfile.mktemp()
        return 'FileStorage', (self.__fs_base, '1')

    def delStorage(self):
        """Remove the files created for the storage picked by getStorage()."""
        removefs(self.__fs_base)

class WindowsTests(GenericTests):

    """Add Windows-specific scaffolding to the generic test suite."""

    def setUp(self):
        """Start a ZEO server process and connect a ClientStorage to it."""
        zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
        name, args = self.getStorageInfo()
        zeo_addr, self.test_addr, self.test_pid = \
                  forker.start_zeo_server(name, args)
        storage = ClientStorage(zeo_addr, wait=1, min_disconnect_poll=0.1)
        self._storage = storage
        storage.registerDB(DummyDB(), None)

    def tearDown(self):
        """Close the client, tell the server to die, delete its files.

        Each step still runs when an earlier one raised, so a failing
        close() does not leave the server process running.
        """
        try:
            self._storage.close()
        finally:
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    s.connect(self.test_addr)
                finally:
                    s.close()
                # the connection should cause the storage server to die
                time.sleep(0.5)
            finally:
                self.delStorage()

    def getStorageInfo(self):
        """Return the storage class name and constructor arguments.

        A fresh temp file name is chosen and remembered for delStorage().
        """
        self.__fs_base = tempfile.mktemp()
        return 'FileStorage', (self.__fs_base, '1') # create=1

    def delStorage(self):
        """Remove the files created for the storage from getStorageInfo()."""
        removefs(self.__fs_base)

# Select the test class that matches the host platform.  Any platform
# other than POSIX or Windows is rejected as soon as this module is
# imported.
if os.name == "nt":
    test_classes = [WindowsTests]
elif os.name == "posix":
    test_classes = [UnixTests]
else:
    raise RuntimeError("unsupported os: %s" % os.name)

def test_suite():
    """Return a suite holding every 'check*' method of the platform classes.

    The classes come from the module-level test_classes list.
    """
    # The tests call tempfile.mktemp(); keep its warning out of the output.
    import warnings
    warnings.filterwarnings("ignore", "mktemp")

    combined = unittest.TestSuite()
    for test_class in test_classes:
        checks = unittest.makeSuite(test_class, 'check')
        combined.addTest(checks)
    return combined

# When run as a script, let unittest build and run the suite returned by
# test_suite().
if __name__ == "__main__":
    unittest.main(defaultTest='test_suite')


=== Added File Zope/lib/python/ZEO/tests/winserver.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Helper file used to launch ZEO server for Windows tests"""

import asyncore
import os
import random
import socket
import threading
import types

import ZEO.StorageServer

class ZEOTestServer(asyncore.dispatcher):
    """A trivial server for killing a server at the end of a test

    The server calls os._exit() as soon as it is connected to.  No
    chance to even send some data down the socket.
    """
    __super_init = asyncore.dispatcher.__init__

    def __init__(self, addr, storage):
        """Listen on addr; storage is closed when a connection arrives.

        A string addr selects an AF_UNIX socket, anything else an
        AF_INET socket.
        """
        self.__super_init()
        self.storage = storage
        # isinstance() also accepts subclasses of the string type.
        if isinstance(addr, types.StringType):
            family = socket.AF_UNIX
        else:
            family = socket.AF_INET
        self.create_socket(family, socket.SOCK_STREAM)
        self.bind(addr)
        self.listen(5)

    def handle_accept(self):
        """Close the storage and terminate the whole process."""
        import traceback
        try:
            try:
                self.accept()
                self.storage.close()
            except:
                # Report the problem but still exit below: a server that
                # outlives the kill request would be left running after
                # the test.
                traceback.print_exc()
        finally:
            os._exit(0)

def load_storage_class(name):
    """Return the class called name from the module ZODB.<name>."""
    # __import__ with a dotted name returns the top-level package, so the
    # module and then the class are looked up as attributes.
    top_package = __import__("ZODB." + name)
    return getattr(getattr(top_package, name), name)

def main(args):
    """Start a ZEO storage server plus its kill-switch server, then loop.

    args is the command line without the program name:

        [-r] port storage_name [storage_arg ...]

    With -r, a true value is passed to StorageServer as its third
    argument.  storage_name names both a module of the ZODB package and
    the class inside it.  Each storage_arg is handed to that class as a
    string, unless it starts with '=', in which case the rest of it is
    evaluated as a Python expression first.  The storage server listens
    on port and the ZEOTestServer on port + 1.  The caller's list is not
    modified.
    """
    ro_svr = 0
    if args[0] == "-r":
        ro_svr = 1
        # Slice instead of del so the caller's list is left untouched.
        args = args[1:]
    port, storage_name, rawargs = args[0], args[1], args[2:]
    klass = load_storage_class(storage_name)
    storage_args = []
    for arg in rawargs:
        if arg.startswith('='):
            # NOTE(review): eval() of command-line text.  Emptying
            # __builtins__ is not a real sandbox; this is only
            # acceptable because the arguments come from the test
            # harness.
            arg = eval(arg[1:], {'__builtins__': {}})
        storage_args.append(arg)
    storage = klass(*storage_args)
    zeo_port = int(port)
    test_port = zeo_port + 1
    # Both servers are bound to locals so they are referenced for as
    # long as the loop below runs.
    t = ZEOTestServer(('', test_port), storage)
    addr = ('', zeo_port)
    serv = ZEO.StorageServer.StorageServer(addr, {'1': storage}, ro_svr)
    asyncore.loop()
    # XXX The code below is evil because it can cause deadlocks in zrpc.
    # (To fix it, calling ThreadedAsync._start_loop() might help.)
##    import zLOG
##    label = "winserver:%d" % os.getpid()
##    while asyncore.socket_map:
##        zLOG.LOG(label, zLOG.DEBUG, "map: %r" % asyncore.socket_map)
##        asyncore.poll(30.0)

# Script entry point: pass the command line, minus the program name, to
# main().
if __name__ == "__main__":
    import sys
    main(sys.argv[1:])