[Zope3-checkins] CVS: Zope3/src/zodb/zeo/tests - __init__.py:1.2 cache.py:1.2 commitlock.py:1.2 connection.py:1.2 forker.py:1.2 multi.py:1.2 speed.py:1.2 stress.py:1.2 test_cache.py:1.2 test_conn.py:1.2 test_tbuf.py:1.2 test_zeo.py:1.2 threadtests.py:1.2 zeoserver.py:1.2
Jim Fulton
jim@zope.com
Wed, 25 Dec 2002 09:13:55 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv15352/src/zodb/zeo/tests
Added Files:
__init__.py cache.py commitlock.py connection.py forker.py
multi.py speed.py stress.py test_cache.py test_conn.py
test_tbuf.py test_zeo.py threadtests.py zeoserver.py
Log Message:
Grand renaming:
- Renamed most files (especially python modules) to lower case.
- Moved views and interfaces into separate hierarchies within each
project, where each top-level directory under the zope package
is a separate project.
- Moved everything to src from lib/python.
lib/python will eventually go away. I need access to the cvs
repository to make this happen, however.
There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.
=== Zope3/src/zodb/zeo/tests/__init__.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/tests/__init__.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,13 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
=== Zope3/src/zodb/zeo/tests/cache.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/tests/cache.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,105 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Tests of the ZEO cache"""
+
+from zodb.ztransaction import Transaction
+from zodb.storage.tests.minpo import MinPO
+from zodb.storage.tests.base import zodb_unpickle
+
+class TransUndoStorageWithCache:
+
+ def checkUndoInvalidation(self):
+ oid = self._storage.new_oid()
+ revid = self._dostore(oid, data=MinPO(23))
+ revid = self._dostore(oid, revid=revid, data=MinPO(24))
+ revid = self._dostore(oid, revid=revid, data=MinPO(25))
+
+ info = self._storage.undoInfo()
+ if not info:
+ # XXX perhaps we have an old storage implementation that
+ # does do the negative nonsense
+ info = self._storage.undoInfo(0, 20)
+ tid = info[0]['id']
+
+ # We may need to bail at this point if the storage doesn't
+ # support transactional undo
+ if not self._storage.supportsTransactionalUndo():
+ return
+
+ # Now start an undo transaction
+ t = Transaction()
+ t.note('undo1')
+ self._storage.tpc_begin(t)
+
+ oids = self._storage.transactionalUndo(tid, t)
+
+ # Make sure this doesn't load invalid data into the cache
+ self._storage.load(oid, '')
+
+ self._storage.tpc_vote(t)
+ self._storage.tpc_finish(t)
+
+ assert len(oids) == 1
+ assert oids[0] == oid
+ data, revid = self._storage.load(oid, '')
+ obj = zodb_unpickle(data)
+ assert obj == MinPO(24)
+
+class StorageWithCache:
+
+ def checkAbortVersionInvalidation(self):
+ oid = self._storage.new_oid()
+ revid = self._dostore(oid, data=MinPO(1))
+ revid = self._dostore(oid, revid=revid, data=MinPO(2))
+ revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
+ revid = self._dostore(oid, revid=revid, data=MinPO(4), version="foo")
+ t = Transaction()
+ self._storage.tpc_begin(t)
+ self._storage.abortVersion("foo", t)
+ self._storage.load(oid, "foo")
+ self._storage.tpc_vote(t)
+ self._storage.tpc_finish(t)
+ data, revid = self._storage.load(oid, "foo")
+ obj = zodb_unpickle(data)
+ assert obj == MinPO(2), obj
+
+ def checkCommitEmptyVersionInvalidation(self):
+ oid = self._storage.new_oid()
+ revid = self._dostore(oid, data=MinPO(1))
+ revid = self._dostore(oid, revid=revid, data=MinPO(2))
+ revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
+ t = Transaction()
+ self._storage.tpc_begin(t)
+ self._storage.commitVersion("foo", "", t)
+ self._storage.load(oid, "")
+ self._storage.tpc_vote(t)
+ self._storage.tpc_finish(t)
+ data, revid = self._storage.load(oid, "")
+ obj = zodb_unpickle(data)
+ assert obj == MinPO(3), obj
+
+ def checkCommitVersionInvalidation(self):
+ oid = self._storage.new_oid()
+ revid = self._dostore(oid, data=MinPO(1))
+ revid = self._dostore(oid, revid=revid, data=MinPO(2))
+ revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
+ t = Transaction()
+ self._storage.tpc_begin(t)
+ self._storage.commitVersion("foo", "bar", t)
+ self._storage.load(oid, "")
+ self._storage.tpc_vote(t)
+ self._storage.tpc_finish(t)
+ data, revid = self._storage.load(oid, "bar")
+ obj = zodb_unpickle(data)
+ assert obj == MinPO(3), obj
=== Zope3/src/zodb/zeo/tests/commitlock.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/tests/commitlock.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,219 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Tests of the distributed commit lock."""
+
+import threading
+import time
+
+from zodb.ztransaction import Transaction
+from zodb.timestamp import TimeStamp
+from zodb.storage.tests.base import zodb_pickle, MinPO
+
+from zodb.zeo.client import ClientStorage
+from zodb.zeo.interfaces import Disconnected
+
+ZERO = '\0'*8
+
+class DummyDB:
+ def invalidate(self, *args):
+ pass
+
+class TestThread(threading.Thread):
+ __super_init = threading.Thread.__init__
+ __super_run = threading.Thread.run
+
+ def __init__(self, testcase, group=None, target=None, name=None,
+ args=(), kwargs={}, verbose=None):
+ self.__super_init(group, target, name, args, kwargs, verbose)
+ self.setDaemon(1)
+ self._testcase = testcase
+
+ def run(self):
+ try:
+ self.testrun()
+ except Exception:
+ s = StringIO()
+ traceback.print_exc(file=s)
+ self._testcase.fail("Exception in thread %s:\n%s\n" %
+ (self, s.getvalue()))
+
+ def cleanup(self, timeout=15):
+ self.join(timeout)
+ if self.isAlive():
+ self._testcase.fail("Thread did not finish: %s" % self)
+
+class WorkerThread(TestThread):
+
+ # run the entire test in a thread so that the blocking call for
+ # tpc_vote() doesn't hang the test suite.
+
+ def __init__(self, testcase, storage, trans, method="tpc_finish"):
+ self.storage = storage
+ self.trans = trans
+ self.method = method
+ self.ready = threading.Event()
+ TestThread.__init__(self, testcase)
+
+ def testrun(self):
+ try:
+ self.storage.tpc_begin(self.trans)
+ oid = self.storage.new_oid()
+ p = zodb_pickle(MinPO("c"))
+ self.storage.store(oid, ZERO, p, '', self.trans)
+ oid = self.storage.new_oid()
+ p = zodb_pickle(MinPO("c"))
+ self.storage.store(oid, ZERO, p, '', self.trans)
+ self.ready.set()
+ self.storage.tpc_vote(self.trans)
+ if self.method == "tpc_finish":
+ self.storage.tpc_finish(self.trans)
+ else:
+ self.storage.tpc_abort(self.trans)
+ except Disconnected:
+ pass
+
+class CommitLockTests:
+
+ # The commit lock tests verify that the storage successfully
+ # blocks and restarts transactions when there is content for a
+ # single storage. There are a lot of cases to cover.
+
+ # CommitLock1 checks the case where a single transaction delays
+ # other transactions before they actually block. IOW, by the time
+ # the other transactions get to the vote stage, the first
+ # transaction has finished.
+
+ def checkCommitLock1OnCommit(self):
+ self._storages = []
+ try:
+ self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
+ finally:
+ self._cleanup()
+
+ def checkCommitLock1OnAbort(self):
+ self._storages = []
+ try:
+ self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
+ finally:
+ self._cleanup()
+
+ def checkCommitLock2OnCommit(self):
+ self._storages = []
+ try:
+ self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
+ finally:
+ self._cleanup()
+
+ def checkCommitLock2OnAbort(self):
+ self._storages = []
+ try:
+ self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
+ finally:
+ self._cleanup()
+
+ def _cleanup(self):
+ for store, trans in self._storages:
+ store.tpc_abort(trans)
+ store.close()
+ self._storages = []
+
+ def _checkCommitLock(self, method_name, dosetup, dowork):
+ # check the commit lock when a client attemps a transaction,
+ # but fails/exits before finishing the commit.
+
+ # The general flow of these tests is to start a transaction by
+ # calling tpc_begin(). Then begin one or more other
+ # connections that also want to commit. This causes the
+ # commit lock code to be exercised. Once the other
+ # connections are started, the first transaction completes.
+ # Either by commit or abort, depending on whether method_name
+ # is "tpc_finish."
+
+ # The tests are parameterized by method_name, dosetup(), and
+ # dowork(). The dosetup() function is called with a
+ # connectioned client storage, transaction, and timestamp.
+ # Any work it does occurs after the first transaction has
+ # started, but before it finishes. The dowork() function
+ # executes after the first transaction has completed.
+
+ # Start on transaction normally and get the lock.
+ t = Transaction()
+ self._storage.tpc_begin(t)
+ oid = self._storage.new_oid()
+ self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
+ self._storage.tpc_vote(t)
+
+ # Start a second transaction on a different connection without
+ # blocking the test thread.
+ self._storages = []
+ for i in range(4):
+ storage2 = self._duplicate_client()
+ t2 = Transaction()
+ tid = self._get_timestamp()
+ dosetup(storage2, t2, tid)
+ if i == 0:
+ storage2.close()
+ else:
+ self._storages.append((storage2, t2))
+
+ if method_name == "tpc_finish":
+ self._storage.tpc_finish(t)
+ self._storage.load(oid, '')
+ else:
+ self._storage.tpc_abort(t)
+
+ dowork(method_name)
+
+ # Make sure the server is still responsive
+ self._dostore()
+
+ def _dosetup1(self, storage, trans, tid):
+ storage.tpc_begin(trans, tid)
+
+ def _dowork1(self, method_name):
+ for store, trans in self._storages:
+ oid = store.new_oid()
+ store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
+ store.tpc_vote(trans)
+ if method_name == "tpc_finish":
+ store.tpc_finish(trans)
+ else:
+ store.tpc_abort(trans)
+
+ def _dosetup2(self, storage, trans, tid):
+ self._threads = []
+ t = WorkerThread(self, storage, trans)
+ self._threads.append(t)
+ t.start()
+ t.ready.wait()
+
+ def _dowork2(self, method_name):
+ for t in self._threads:
+ t.cleanup()
+
+ def _duplicate_client(self):
+ "Open another ClientStorage to the same server."
+ # XXX argh it's hard to find the actual address
+ # The rpc mgr addr attribute is a list. Each element in the
+ # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
+ # address.
+ addr = self._storage._addr
+ new = ClientStorage(addr, wait=1)
+ new.registerDB(DummyDB())
+ return new
+
+ def _get_timestamp(self):
+ t = time.time()
+ ts = TimeStamp(*(time.gmtime(t)[:5] + (t % 60,)))
+ return ts.raw()
=== Zope3/src/zodb/zeo/tests/connection.py 1.1 => 1.2 === (533/633 lines abridged)
--- /dev/null Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/tests/connection.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,630 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+import os
+import sys
+import time
+import random
+import select
+import socket
+import asyncore
+import tempfile
+import threading
+import logging
+
+from zodb.zeo.client import ClientStorage
+from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.zrpc.marshal import Marshaller
+from zodb.zeo.tests import forker
+
+from transaction import get_transaction
+from zodb.interfaces import ReadOnlyError
+from zodb.ztransaction import Transaction
+from zodb.storage.tests.base import StorageTestBase
+from zodb.storage.tests.minpo import MinPO
+from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
+from zodb.storage.tests.base import handle_all_serials, ZERO
+
+
+class DummyDB:
+ def invalidate(self, *args, **kws):
+ pass
+
+
+class CommonSetupTearDown(StorageTestBase):
+ """Tests that explicitly manage the server process.
+
[-=- -=- -=- 533 lines omitted -=- -=- -=-]
+ # expanded in-line (mostly).
+
+ # Create oid->serial mappings
+ for c in clients:
+ c.__oids = []
+ c.__serials = {}
+
+ # Begin a transaction
+ t = Transaction()
+ for c in clients:
+ #print "%s.%s.%s begin\n" % (tname, c.__name, i),
+ c.tpc_begin(t)
+
+ for j in range(testcase.nobj):
+ for c in clients:
+ # Create and store a new object on each server
+ oid = c.new_oid()
+ c.__oids.append(oid)
+ data = MinPO("%s.%s.t%d.o%d" % (tname, c.__name, i, j))
+ #print data.value
+ data = zodb_pickle(data)
+ s = c.store(oid, ZERO, data, '', t)
+ c.__serials.update(handle_all_serials(oid, s))
+
+ # Vote on all servers and handle serials
+ for c in clients:
+ #print "%s.%s.%s vote\n" % (tname, c.__name, i),
+ s = c.tpc_vote(t)
+ c.__serials.update(handle_all_serials(None, s))
+
+ # Finish on all servers
+ for c in clients:
+ #print "%s.%s.%s finish\n" % (tname, c.__name, i),
+ c.tpc_finish(t)
+
+ for c in clients:
+ # Check that we got serials for all oids
+ for oid in c.__oids:
+ testcase.failUnless(c.__serials.has_key(oid))
+ # Check that we got serials for no other oids
+ for oid in c.__serials.keys():
+ testcase.failUnless(oid in c.__oids)
+
+ def closeclients(self):
+ # Close clients opened by run()
+ for c in self.clients:
+ try:
+ c.close()
+ except:
+ pass
=== Zope3/src/zodb/zeo/tests/forker.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tests/forker.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,111 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Library for forking storage server and connecting client storage"""
+
+import os
+import sys
+import time
+import errno
+import random
+import socket
+import tempfile
+import traceback
+import logging
+
+# Change value of PROFILE to enable server-side profiling
+PROFILE = False
+if PROFILE:
+ import hotshot
+
+
+def get_port():
+ """Return a port that is not in use.
+
+ Checks if a port is in use by trying to connect to it. Assumes it
+ is not in use if connect raises an exception.
+
+ Raises RuntimeError after 10 tries.
+ """
+ for i in range(10):
+ port = random.randrange(20000, 30000)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ try:
+ s.connect(('localhost', port))
+ except socket.error:
+ # XXX check value of error?
+ return port
+ finally:
+ s.close()
+ raise RuntimeError, "Can't find port"
+
+
+def start_zeo_server(conf, addr=None, ro_svr=False, keep=False):
+ """Start a ZEO server in a separate process.
+
+ Returns the ZEO port, the test server port, and the pid.
+ """
+ # Store the config info in a temp file.
+ tmpfile = tempfile.mktemp()
+ fp = open(tmpfile, 'w')
+ fp.write(conf)
+ fp.close()
+ # Create the server
+ import zodb.zeo.tests.zeoserver
+ if addr is None:
+ port = get_port()
+ else:
+ port = addr[1]
+ script = zodb.zeo.tests.zeoserver.__file__
+ if script.endswith('.pyc'):
+ script = script[:-1]
+ # Create a list of arguments, which we'll tuplify below
+ args = [sys.executable, script, '-C', tmpfile]
+ if ro_svr:
+ args.append('-r')
+ if keep:
+ args.append('-k')
+ args.append(str(port))
+ d = os.environ.copy()
+ d['PYTHONPATH'] = os.pathsep.join(sys.path)
+ pid = os.spawnve(os.P_NOWAIT, sys.executable, tuple(args), d)
+ adminaddr = ('localhost', port+1)
+ # We need to wait until the server starts, but not forever
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ for i in range(5):
+ try:
+ logging.debug('forker: connect %s', i)
+ s.connect(adminaddr)
+ ack = s.recv(1024)
+ logging.debug('forker: acked: %s', ack)
+ break
+ except socket.error, e:
+ if e[0] <> errno.ECONNREFUSED: raise
+ time.sleep(1)
+ else:
+ logging.debug('forker: boo hoo')
+ raise
+ return ('localhost', port), adminaddr, pid
+
+
+def shutdown_zeo_server(adminaddr):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(adminaddr)
+ try:
+ ack = s.recv(1024)
+ except socket.error, e:
+ if e[0] <> errno.ECONNRESET: raise
+ ack = 'no ack received'
+ logging.debug('shutdownServer: acked: %s', ack)
+ s.close()
=== Zope3/src/zodb/zeo/tests/multi.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tests/multi.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,160 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""A multi-client test of the ZEO storage server"""
+# XXX This code is currently broken.
+
+import zodb.db
+from zodb.storage.file import FileStorage
+import zodb.interfaces
+import persistence
+import PersistentMapping
+from zodb.zeo.tests import forker
+
+import asyncore
+import os
+import tempfile
+import time
+import types
+
+VERBOSE = 1
+CLIENTS = 4
+RECORDS_PER_CLIENT = 100
+CONFLICT_DELAY = 0.1
+CONNECT_DELAY = 0.1
+CLIENT_CACHE = '' # use temporary cache
+
+class Record(Persistence.Persistent):
+ def __init__(self, client=None, value=None):
+ self.client = client
+ self.value = None
+ self.next = None
+
+ def set_next(self, next):
+ self.next = next
+
+class Stats(Persistence.Persistent):
+ def __init__(self):
+ self.begin = time.time()
+ self.end = None
+
+ def done(self):
+ self.end = time.time()
+
+def init_storage():
+ path = tempfile.mktemp()
+ if VERBOSE:
+ print "FileStorage path:", path
+ fs = FileStorage(path)
+
+ db = zodb.db(fs)
+ root = db.open().root()
+ root["multi"] = PersistentMapping.PersistentMapping()
+ get_transaction().commit()
+
+ return fs
+
+def start_server(addr):
+ storage = init_storage()
+ pid, exit = forker.start_zeo_server(storage, addr)
+ return pid, exit
+
+def start_client(addr, client_func=None):
+ pid = os.fork()
+ if pid == 0:
+ try:
+ import ZEO.ClientStorage
+ if VERBOSE:
+ print "Client process started:", os.getpid()
+ cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
+ if client_func is None:
+ run(cli)
+ else:
+ client_func(cli)
+ cli.close()
+ finally:
+ os._exit(0)
+ else:
+ return pid
+
+def run(storage):
+ if hasattr(storage, 'is_connected'):
+ while not storage.is_connected():
+ time.sleep(CONNECT_DELAY)
+ pid = os.getpid()
+ print "Client process connected:", pid, storage
+ db = zodb.db(storage)
+ root = db.open().root()
+ while 1:
+ try:
+ s = root[pid] = Stats()
+ get_transaction().commit()
+ except zodb.interfaces.ConflictError:
+ get_transaction().abort()
+ time.sleep(CONFLICT_DELAY)
+ else:
+ break
+
+ dict = root["multi"]
+ prev = None
+ i = 0
+ while i < RECORDS_PER_CLIENT:
+ try:
+ size = len(dict)
+ r = dict[size] = Record(pid, size)
+ if prev:
+ prev.set_next(r)
+ get_transaction().commit()
+ except zodb.interfaces.ConflictError, err:
+ get_transaction().abort()
+ time.sleep(CONFLICT_DELAY)
+ else:
+ i = i + 1
+ if VERBOSE and (i < 5 or i % 10 == 0):
+ print "Client %s: %s of %s" % (pid, i, RECORDS_PER_CLIENT)
+ s.done()
+ get_transaction().commit()
+
+ print "Client completed:", pid
+
+def main(client_func=None):
+ if VERBOSE:
+ print "Main process:", os.getpid()
+ addr = tempfile.mktemp()
+ t0 = time.time()
+ server_pid, server = start_server(addr)
+ t1 = time.time()
+ pids = []
+ for i in range(CLIENTS):
+ pids.append(start_client(addr, client_func))
+ for pid in pids:
+ assert type(pid) == types.IntType, "invalid pid type: %s (%s)" % \
+ (repr(pid), type(pid))
+ try:
+ if VERBOSE:
+ print "waitpid(%s)" % repr(pid)
+ os.waitpid(pid, 0)
+ except os.error, err:
+ print "waitpid(%s) failed: %s" % (repr(pid), err)
+ t2 = time.time()
+ server.close()
+ os.waitpid(server_pid, 0)
+
+ # XXX Should check that the results are consistent!
+
+ print "Total time:", t2 - t0
+ print "Server start time", t1 - t0
+ print "Client time:", t2 - t1
+
+if __name__ == "__main__":
+ main()
=== Zope3/src/zodb/zeo/tests/speed.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tests/speed.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,215 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+usage="""Test speed of a ZODB storage
+
+Options:
+
+ -d file The data file to use as input.
+ The default is this script.
+
+ -n n The number of repititions
+
+ -s module A module that defines a 'Storage'
+ attribute, which is an open storage.
+ If not specified, a FileStorage will ne
+ used.
+
+ -z Test compressing data
+
+ -D Run in debug mode
+
+ -L Test loads as well as stores by minimizing
+ the cache after eachrun
+
+ -M Output means only
+
+ -C Run with a persistent client cache
+
+ -U Run ZEO using a Unix domain socket
+
+ -t n Number of concurrent threads to run.
+"""
+
+import asyncore
+import sys, os, getopt, string, time
+##sys.path.insert(0, os.getcwd())
+
+import zodb.db
+import persistence
+import ZEO.ClientStorage, ZEO.StorageServer
+from zodb.zeo.tests import forker
+from zodb.interfaces import ConflictError
+
+class P(Persistence.Persistent):
+ pass
+
+fs_name = "zeo-speed.fs"
+
+class ZEOExit(asyncore.file_dispatcher):
+ """Used to exit ZEO.StorageServer when run is done"""
+ def writable(self):
+ return 0
+ def readable(self):
+ return 1
+ def handle_read(self):
+ buf = self.recv(4)
+ assert buf == "done"
+ self.delete_fs()
+ os._exit(0)
+ def handle_close(self):
+ print "Parent process exited unexpectedly"
+ self.delete_fs()
+ os._exit(0)
+ def delete_fs(self):
+ os.unlink(fs_name)
+ os.unlink(fs_name + ".lock")
+ os.unlink(fs_name + ".tmp")
+
+def work(db, results, nrep, compress, data, detailed, minimize, threadno=None):
+ for j in range(nrep):
+ for r in 1, 10, 100, 1000:
+ t = time.time()
+ conflicts = 0
+
+ jar = db.open()
+ while 1:
+ try:
+ get_transaction().begin()
+ rt = jar.root()
+ key = 's%s' % r
+ if rt.has_key(key):
+ p = rt[key]
+ else:
+ rt[key] = p =P()
+ for i in range(r):
+ v = getattr(p, str(i), P())
+ if compress is not None:
+ v.d = compress(data)
+ else:
+ v.d = data
+ setattr(p, str(i), v)
+ get_transaction().commit()
+ except ConflictError:
+ conflicts = conflicts + 1
+ else:
+ break
+ jar.close()
+
+ t = time.time() - t
+ if detailed:
+ if threadno is None:
+ print "%s\t%s\t%.4f\t%d" % (j, r, t, conflicts)
+ else:
+ print "%s\t%s\t%.4f\t%d\t%d" % (j, r, t, conflicts,
+ threadno)
+ results[r].append((t, conflicts))
+ rt=d=p=v=None # release all references
+ if minimize:
+ time.sleep(3)
+ jar.cacheMinimize(3)
+
+def main(args):
+ opts, args = getopt.getopt(args, 'zd:n:Ds:LMt:U')
+ s = None
+ compress = None
+ data=sys.argv[0]
+ nrep=5
+ minimize=0
+ detailed=1
+ cache = None
+ domain = 'AF_INET'
+ threads = 1
+ for o, v in opts:
+ if o=='-n': nrep = int(v)
+ elif o=='-d': data = v
+ elif o=='-s': s = v
+ elif o=='-z':
+ import zlib
+ compress = zlib.compress
+ elif o=='-L':
+ minimize=1
+ elif o=='-M':
+ detailed=0
+ elif o=='-D':
+ global debug
+ os.environ['STUPID_LOG_FILE']=''
+ os.environ['STUPID_LOG_SEVERITY']='-999'
+ debug = 1
+ elif o == '-C':
+ cache = 'speed'
+ elif o == '-U':
+ domain = 'AF_UNIX'
+ elif o == '-t':
+ threads = int(v)
+
+ zeo_pipe = None
+ if s:
+ s = __import__(s, globals(), globals(), ('__doc__',))
+ s = s.Storage
+ server = None
+ else:
+ s, server, pid = forker.start_zeo("FileStorage",
+ (fs_name, 1), domain=domain)
+
+ data=open(data).read()
+ db=zodb.db(s,
+ # disable cache deactivation
+ cache_size=4000,
+ cache_deactivate_after=6000,)
+
+ print "Beginning work..."
+ results={1:[], 10:[], 100:[], 1000:[]}
+ if threads > 1:
+ import threading
+ l = []
+ for i in range(threads):
+ t = threading.Thread(target=work,
+ args=(db, results, nrep, compress, data,
+ detailed, minimize, i))
+ l.append(t)
+ for t in l:
+ t.start()
+ for t in l:
+ t.join()
+
+ else:
+ work(db, results, nrep, compress, data, detailed, minimize)
+
+ if server is not None:
+ server.close()
+ os.waitpid(pid, 0)
+
+ if detailed:
+ print '-'*24
+ print "num\tmean\tmin\tmax"
+ for r in 1, 10, 100, 1000:
+ times = []
+ for time, conf in results[r]:
+ times.append(time)
+ t = mean(times)
+ print "%d\t%.4f\t%.4f\t%.4f" % (r, t, min(times), max(times))
+
+def mean(l):
+ tot = 0
+ for v in l:
+ tot = tot + v
+ return tot / len(l)
+
+##def compress(s):
+## c = zlib.compressobj()
+## o = c.compress(s)
+## return o + c.flush()
+
+if __name__=='__main__':
+ main(sys.argv[1:])
=== Zope3/src/zodb/zeo/tests/stress.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tests/stress.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,139 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""A ZEO client-server stress test to look for leaks.
+
+The stress test should run in an infinite loop and should involve
+multiple connections.
+"""
+# XXX This code is currently broken.
+
+from __future__ import nested_scopes
+
+import zodb.db
+from zodb.zeo.client import ClientStorage
+from zodb.storage.mapping import MappingStorage
+from zodb.zeo.tests import forker
+from zodb.storage.tests import MinPO
+
+import os
+import random
+import sys
+import types
+
+NUM_TRANSACTIONS_PER_CONN = 10
+NUM_CONNECTIONS = 10
+NUM_ROOTS = 20
+MAX_DEPTH = 20
+MIN_OBJSIZE = 128
+MAX_OBJSIZE = 2048
+
+def an_object():
+ """Return an object suitable for a PersistentMapping key"""
+ size = random.randrange(MIN_OBJSIZE, MAX_OBJSIZE)
+ if os.path.exists("/dev/urandom"):
+ f = open("/dev/urandom")
+ buf = f.read(size)
+ f.close()
+ return buf
+ else:
+ f = open(MinPO.__file__)
+ l = list(f.read(size))
+ f.close()
+ random.shuffle(l)
+ return "".join(l)
+
+def setup(cn):
+ """Initialize the database with some objects"""
+ root = cn.root()
+ for i in range(NUM_ROOTS):
+ prev = an_object()
+ for j in range(random.randrange(1, MAX_DEPTH)):
+ o = MinPO.MinPO(prev)
+ prev = o
+ root[an_object()] = o
+ get_transaction().commit()
+ cn.close()
+
+def work(cn):
+ """Do some work with a transaction"""
+ cn.sync()
+ root = cn.root()
+ obj = random.choice(root.values())
+ # walk down to the bottom
+ while not isinstance(obj.value, types.StringType):
+ obj = obj.value
+ obj.value = an_object()
+ get_transaction().commit()
+
+def main():
+ # Yuck! Need to cleanup forker so that the API is consistent
+ # across Unix and Windows, at least if that's possible.
+ if os.name == "nt":
+ zaddr, tport, pid = forker.start_zeo_server('MappingStorage', ())
+ def exitserver():
+ import socket
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(tport)
+ s.close()
+ else:
+ zaddr = '', random.randrange(20000, 30000)
+ pid, exitobj = forker.start_zeo_server(MappingStorage(), zaddr)
+ def exitserver():
+ exitobj.close()
+
+ while 1:
+ pid = start_child(zaddr)
+ print "started", pid
+ os.waitpid(pid, 0)
+
+ exitserver()
+
+def start_child(zaddr):
+
+ pid = os.fork()
+ if pid != 0:
+ return pid
+ try:
+ _start_child(zaddr)
+ finally:
+ os._exit(0)
+
+def _start_child(zaddr):
+ storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5, wait=1)
+ db = zodb.db(storage, pool_size=NUM_CONNECTIONS)
+ setup(db.open())
+ conns = []
+ conn_count = 0
+
+ for i in range(NUM_CONNECTIONS):
+ c = db.open()
+ c.__count = 0
+ conns.append(c)
+ conn_count += 1
+
+ while conn_count < 25:
+ c = random.choice(conns)
+ if c.__count > NUM_TRANSACTIONS_PER_CONN:
+ conns.remove(c)
+ c.close()
+ conn_count += 1
+ c = db.open()
+ c.__count = 0
+ conns.append(c)
+ else:
+ c.__count += 1
+ work(c)
+
+if __name__ == "__main__":
+ main()
=== Zope3/src/zodb/zeo/tests/test_cache.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tests/test_cache.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,358 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Test suite for the ZEO.ClientCache module.
+
+At times, we do 'white box' testing, i.e. we know about the internals
+of the ClientCache object.
+"""
+from __future__ import nested_scopes
+
+import os
+import time
+import tempfile
+import unittest
+
+from zodb.zeo.cache import ClientCache
+
+class ClientCacheTests(unittest.TestCase):
+
+ def setUp(self):
+ unittest.TestCase.setUp(self)
+ self.cachesize = 10*1000*1000
+ self.cache = ClientCache(size=self.cachesize)
+ self.cache.open()
+
+ def tearDown(self):
+ self.cache.close()
+ unittest.TestCase.tearDown(self)
+
+ def testOpenClose(self):
+ pass # All the work is done by setUp() / tearDown()
+
+ def testStoreLoad(self):
+ cache = self.cache
+ oid = 'abcdefgh'
+ data = '1234'*100
+ serial = 'ABCDEFGH'
+ cache.store(oid, data, serial, '', '', '')
+ loaded = cache.load(oid, '')
+ self.assertEqual(loaded, (data, serial))
+
+ def testMissingLoad(self):
+ cache = self.cache
+ oid = 'abcdefgh'
+ data = '1234'*100
+ serial = 'ABCDEFGH'
+ cache.store(oid, data, serial, '', '', '')
+ loaded = cache.load('garbage1', '')
+ self.assertEqual(loaded, None)
+
+ def testInvalidate(self):
+ cache = self.cache
+ oid = 'abcdefgh'
+ data = '1234'*100
+ serial = 'ABCDEFGH'
+ cache.store(oid, data, serial, '', '', '')
+ loaded = cache.load(oid, '')
+ self.assertEqual(loaded, (data, serial))
+ cache.invalidate(oid, '')
+ loaded = cache.load(oid, '')
+ self.assertEqual(loaded, None)
+
+ def testVersion(self):
+ cache = self.cache
+ oid = 'abcdefgh'
+ data = '1234'*100
+ serial = 'ABCDEFGH'
+ vname = 'myversion'
+ vdata = '5678'*200
+ vserial = 'IJKLMNOP'
+ cache.store(oid, data, serial, vname, vdata, vserial)
+ loaded = cache.load(oid, '')
+ self.assertEqual(loaded, (data, serial))
+ vloaded = cache.load(oid, vname)
+ self.assertEqual(vloaded, (vdata, vserial))
+
+ def testVersionOnly(self):
+ cache = self.cache
+ oid = 'abcdefgh'
+ data = ''
+ serial = ''
+ vname = 'myversion'
+ vdata = '5678'*200
+ vserial = 'IJKLMNOP'
+ cache.store(oid, data, serial, vname, vdata, vserial)
+ loaded = cache.load(oid, '')
+ self.assertEqual(loaded, None)
+ vloaded = cache.load(oid, vname)
+ self.assertEqual(vloaded, (vdata, vserial))
+
+ def testInvalidateNonVersion(self):
+ cache = self.cache
+ oid = 'abcdefgh'
+ data = '1234'*100
+ serial = 'ABCDEFGH'
+ vname = 'myversion'
+ vdata = '5678'*200
+ vserial = 'IJKLMNOP'
+ cache.store(oid, data, serial, vname, vdata, vserial)
+ loaded = cache.load(oid, '')
+ self.assertEqual(loaded, (data, serial))
+ vloaded = cache.load(oid, vname)
+ self.assertEqual(vloaded, (vdata, vserial))
+ cache.invalidate(oid, '')
+ loaded = cache.load(oid, '')
+ self.assertEqual(loaded, None)
+ # The version data is also invalidated at this point
+ vloaded = cache.load(oid, vname)
+ self.assertEqual(vloaded, None)
+
+ def testInvalidateVersion(self):
+ # Invalidating a version should not invalidate the non-version data.
+ # (This tests for the same bug as testInvalidatePersists below.)
+ cache = self.cache
+ oid = 'abcdefgh'
+ data = '1234'*100
+ serial = 'ABCDEFGH'
+ cache.store(oid, data, serial, '', '', '')
+ loaded = cache.load(oid, '')
+ self.assertEqual(loaded, (data, serial))
+ cache.invalidate(oid, 'bogus')
+ loaded = cache.load(oid, '')
+ self.assertEqual(loaded, (data, serial))
+
+ def testVerify(self):
+ cache = self.cache
+ results = []
+ def verifier(oid, serial, vserial):
+ results.append((oid, serial, vserial))
+ cache.verify(verifier)
+ self.assertEqual(results, [])
+ oid = 'abcdefgh'
+ data = '1234'*100
+ serial = 'ABCDEFGH'
+ cache.store(oid, data, serial, '', '', '')
+ results = []
+ cache.verify(verifier)
+ self.assertEqual(results, [(oid, serial, None)])
+
+ def testCheckSize(self):
+ # Make sure that cache._index[oid] is erased for oids that are
+ # stored in the cache file that's rewritten after a flip.
+ cache = self.cache
+ oid = 'abcdefgh'
+ data = '1234'*100
+ serial = 'ABCDEFGH'
+ cache.store(oid, data, serial, '', '', '')
+ cache.checkSize(10*self.cachesize) # Force a file flip
+ oid2 = 'abcdefgz'
+ data2 = '1234'*10
+ serial2 = 'ABCDEFGZ'
+ cache.store(oid2, data2, serial2, '', '', '')
+ cache.checkSize(10*self.cachesize) # Force another file flip
+ self.assertNotEqual(cache._index.get(oid2), None)
+ self.assertEqual(cache._index.get(oid), None)
+
+ def testCopyToCurrent(self):
+ # - write some objects to cache file 0
+ # - force a flip
+ # - write some objects to cache file 1
+ # - load some objects that are in cache file 0
+ # - load the same objects, making sure they are now in file 1
+ # - write some more objects
+ # - force another flip
+ # - load the same objects again
+ # - make sure they are now in file 0 again
+
+ cache = self.cache
+
+ # Create some objects
+ oid1 = 'abcdefgh'
+ data1 = '1234' * 100
+ serial1 = 'ABCDEFGH'
+ oid2 = 'bcdefghi'
+ data2 = '2345' * 200
+ serial2 = 'BCDEFGHI'
+ version2 = 'myversion'
+ nonversion = 'nada'
+ vdata2 = '5432' * 250
+ vserial2 = 'IHGFEDCB'
+ oid3 = 'cdefghij'
+ data3 = '3456' * 300
+ serial3 = 'CDEFGHIJ'
+
+ # Store them in the cache
+ cache.store(oid1, data1, serial1, '', '', '')
+ cache.store(oid2, data2, serial2, version2, vdata2, vserial2)
+ cache.store(oid3, data3, serial3, '', '', '')
+
+ # Verify that they are in file 0
+ self.assert_(None is not cache._index.get(oid1) > 0)
+ self.assert_(None is not cache._index.get(oid2) > 0)
+ self.assert_(None is not cache._index.get(oid3) > 0)
+
+ # Load them and verify that the loads return correct data
+ self.assertEqual(cache.load(oid1, ''), (data1, serial1))
+ self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+ self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
+ self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
+ self.assertEqual(cache.load(oid3, ''), (data3, serial3))
+
+ # Verify that they are still in file 0
+ self.assert_(None is not cache._index.get(oid1) > 0)
+ self.assert_(None is not cache._index.get(oid2) > 0)
+ self.assert_(None is not cache._index.get(oid3) > 0)
+
+ # Cause a cache flip
+ cache.checkSize(10*self.cachesize)
+
+ # Load o1, o2, o4 again and verify that the loads return correct data
+ self.assertEqual(cache.load(oid1, ''), (data1, serial1))
+ self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
+ self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
+ self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+
+ # Verify that o1, o2, 04 are now in file 1, o3 still in file 0
+ self.assert_(None is not cache._index.get(oid1) < 0)
+ self.assert_(None is not cache._index.get(oid2) < 0)
+ self.assert_(None is not cache._index.get(oid3) > 0)
+
+ # Cause another cache flip
+ cache.checkSize(10*self.cachesize)
+
+ # Load o1 and o2 again and verify that the loads return correct data
+ self.assertEqual(cache.load(oid1, ''), (data1, serial1))
+ self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
+ self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
+ self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+
+ # Verify that o1 and o2 are now back in file 0, o3 is lost
+ self.assert_(None is not cache._index.get(oid1) > 0)
+ self.assert_(None is not cache._index.get(oid2) > 0)
+ self.assert_(None is cache._index.get(oid3))
+
+ # Invalidate version data for o2
+ cache.invalidate(oid2, nonversion)
+ self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+ self.assertEqual(cache.load(oid2, nonversion), None)
+ self.assertEqual(cache.load(oid2, version2), None)
+
+ # Cause another cache flip
+ cache.checkSize(10*self.cachesize)
+
+ # Load o1 and o2 again and verify that the loads return correct data
+ self.assertEqual(cache.load(oid1, ''), (data1, serial1))
+ self.assertEqual(cache.load(oid2, version2), None)
+ self.assertEqual(cache.load(oid2, nonversion), None)
+ self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+
+ # Verify that o1 and o2 are now in file 1
+ self.assert_(None is not cache._index.get(oid1) < 0)
+ self.assert_(None is not cache._index.get(oid2) < 0)
+
+class PersistentClientCacheTests(unittest.TestCase):
+
+ def setUp(self):
+ unittest.TestCase.setUp(self)
+ self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
+ self.cachesize = 10*1000*1000
+ self.storagename = 'foo'
+ self.clientname = 'test'
+ # Predict file names
+ fn0 = 'c%s-%s-0.zec' % (self.storagename, self.clientname)
+ fn1 = 'c%s-%s-1.zec' % (self.storagename, self.clientname)
+ for fn in fn0, fn1:
+ fn = os.path.join(self.vardir, fn)
+ try:
+ os.unlink(fn)
+ except os.error:
+ pass
+ self.openCache()
+
+ def openCache(self):
+ self.cache = ClientCache(storage=self.storagename,
+ size=self.cachesize,
+ client=self.clientname,
+ var=self.vardir)
+ self.cache.open()
+
+ def reopenCache(self):
+ self.cache.close()
+ self.openCache()
+ return self.cache
+
+ def tearDown(self):
+ self.cache.close()
+ for filename in self.cache._p:
+ if filename is not None:
+ try:
+ os.unlink(filename)
+ except os.error:
+ pass
+ unittest.TestCase.tearDown(self)
+
+ def testCacheFileSelection(self):
+ # A bug in __init__ read the wrong slice of the file to determine
+ # the serial number of the first record, reading the
+ # last byte of the data size plus the first seven bytes of the
+ # serial number. This caused random selection of the proper
+ # 'current' file when a persistent cache was opened.
+ cache = self.cache
+ self.assertEqual(cache._current, 0) # Check that file 0 is current
+ oid = 'abcdefgh'
+ data = '1234'
+ serial = 'ABCDEFGH'
+ cache.store(oid, data, serial, '', '', '')
+ cache.checkSize(10*self.cachesize) # Force a file flip
+ self.assertEqual(cache._current, 1) # Check that the flip worked
+ oid = 'abcdefgh'
+ data = '123'
+ serial = 'ABCDEFGZ'
+ cache.store(oid, data, serial, '', '', '')
+ cache = self.reopenCache()
+ loaded = cache.load(oid, '')
+ # Check that we got the most recent data:
+ self.assertEqual(loaded, (data, serial))
+ self.assertEqual(cache._current, 1) # Double check that 1 is current
+
+ def testInvalidationPersists(self):
+ # A bug in invalidate() caused invalidation to overwrite the
+ # 2nd byte of the data size on disk, rather rather than
+ # overwriting the status byte. For certain data sizes this
+ # can be observed by reopening a persistent cache: the
+ # invalidated data will appear valid (but with altered size).
+ cache = self.cache
+ magicsize = (ord('i') + 1) << 16
+ cache = self.cache
+ oid = 'abcdefgh'
+ data = '!'*magicsize
+ serial = 'ABCDEFGH'
+ cache.store(oid, data, serial, '', '', '')
+ loaded = cache.load(oid, '')
+ self.assertEqual(loaded, (data, serial))
+ cache.invalidate(oid, '')
+ cache = self.reopenCache()
+ loaded = cache.load(oid, '')
+ if loaded != None:
+ self.fail("invalidated data resurrected, size %d, was %d" %
+ (len(loaded[0]), len(data)))
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(ClientCacheTests))
+ suite.addTest(unittest.makeSuite(PersistentClientCacheTests))
+ return suite
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
=== Zope3/src/zodb/zeo/tests/test_conn.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/tests/test_conn.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,102 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Test setup for ZEO connection logic.
+
+The actual tests are in ConnectionTests.py; this file provides the
+platform-dependent scaffolding.
+"""
+
+# System imports
+import unittest
+# Import the actual test class
+from zodb.zeo.tests import connection
+
+
+class FileStorageConfig:
+ def getConfig(self, path, create, read_only):
+ return """\
+ <Storage>
+ type FileStorage
+ file_name %s
+ create %s
+ read_only %s
+ </Storage>""" % (path,
+ create and 'yes' or 'no',
+ read_only and 'yes' or 'no')
+
+
+class BerkeleyStorageConfig:
+ def getConfig(self, path, create, read_only):
+ # Full always creates and doesn't have a read_only flag
+ return """\
+ <Storage>
+ type BDBFullStorage
+ name %s
+ read_only %s
+ </Storage>""" % (path,
+ read_only and 'yes' or 'no')
+
+
+class FileStorageConnectionTests(
+ FileStorageConfig,
+ connection.ConnectionTests
+ ):
+ """FileStorage-specific connection tests."""
+
+
+class FileStorageReconnectionTests(
+ FileStorageConfig,
+ connection.ReconnectionTests
+ ):
+ """FileStorage-specific re-connection tests."""
+
+
+class BDBConnectionTests(
+ BerkeleyStorageConfig,
+ connection.ConnectionTests
+ ):
+ """Berkeley storage connection tests."""
+
+
+class BDBReconnectionTests(
+ BerkeleyStorageConfig,
+ connection.ReconnectionTests
+ ):
+ """Berkeley storage re-connection tests."""
+
+
+test_classes = [FileStorageConnectionTests, FileStorageReconnectionTests]
+try:
+ from zodb.storage.bdbfull import BDBFullStorage
+except ImportError:
+ pass
+else:
+ test_classes.append(BDBConnectionTests)
+ test_classes.append(BDBReconnectionTests)
+
+
+def test_suite():
+ # shutup warnings about mktemp
+ import warnings
+ warnings.filterwarnings("ignore", "mktemp")
+
+ suite = unittest.TestSuite()
+ for klass in test_classes:
+ sub = unittest.makeSuite(klass, 'check')
+ suite.addTest(sub)
+ return suite
+
+
+if __name__ == "__main__":
+ unittest.main(defaultTest='test_suite')
=== Zope3/src/zodb/zeo/tests/test_tbuf.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/tests/test_tbuf.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,76 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import random
+import unittest
+
+from zodb.zeo.tbuf import TransactionBuffer
+
+def random_string(size):
+ """Return a random string of size size."""
+ l = [chr(random.randrange(256)) for i in range(size)]
+ return "".join(l)
+
+def new_store_data():
+ """Return arbitrary data to use as argument to store() method."""
+ return random_string(8), '', random_string(random.randrange(1000))
+
+def new_invalidate_data():
+ """Return arbitrary data to use as argument to invalidate() method."""
+ return random_string(8), ''
+
+class TransBufTests(unittest.TestCase):
+
+ def checkTypicalUsage(self):
+ tbuf = TransactionBuffer()
+ tbuf.store(*new_store_data())
+ tbuf.invalidate(*new_invalidate_data())
+ tbuf.begin_iterate()
+ while 1:
+ o = tbuf.next()
+ if o is None:
+ break
+ tbuf.clear()
+
+ def doUpdates(self, tbuf):
+ data = []
+ for i in range(10):
+ d = new_store_data()
+ tbuf.store(*d)
+ data.append(d)
+ d = new_invalidate_data()
+ tbuf.invalidate(*d)
+ data.append(d)
+
+ tbuf.begin_iterate()
+ for i in range(len(data)):
+ x = tbuf.next()
+ if x[2] is None:
+ # the tbuf add a dummy None to invalidates
+ x = x[:2]
+ self.assertEqual(x, data[i])
+
+ def checkOrderPreserved(self):
+ tbuf = TransactionBuffer()
+ self.doUpdates(tbuf)
+
+ def checkReusable(self):
+ tbuf = TransactionBuffer()
+ self.doUpdates(tbuf)
+ tbuf.clear()
+ self.doUpdates(tbuf)
+ tbuf.clear()
+ self.doUpdates(tbuf)
+
+def test_suite():
+ return unittest.makeSuite(TransBufTests, 'check')
=== Zope3/src/zodb/zeo/tests/test_zeo.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/tests/test_zeo.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,192 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Test suite for ZEO based on zodb.tests."""
+
+# System imports
+import os
+import sys
+import time
+import socket
+import asyncore
+import tempfile
+import unittest
+import logging
+
+# ZODB test support
+import zodb
+from zodb.storage.tests.minpo import MinPO
+from zodb.storage.tests.base import zodb_unpickle
+
+
+# ZODB test mixin classes
+from zodb.storage.tests import base, basic, version, \
+ undo, undoversion, \
+ packable, synchronization, conflict, revision, \
+ mt, readonly
+
+# ZEO imports
+from zodb.zeo.client import ClientStorage
+from zodb.zeo.interfaces import Disconnected
+
+# ZEO test support
+from zodb.zeo.tests import forker, cache
+
+# ZEO test mixin classes
+from zodb.zeo.tests import commitlock, threadtests
+
+class DummyDB:
+ def invalidate(self, *args):
+ pass
+
+
+class MiscZEOTests:
+ """ZEO tests that don't fit in elsewhere."""
+
+ def checkLargeUpdate(self):
+ obj = MinPO("X" * (10 * 128 * 1024))
+ self._dostore(data=obj)
+
+ def checkZEOInvalidation(self):
+ addr = self._storage._addr
+ storage2 = ClientStorage(addr, wait=1, min_disconnect_poll=0.1)
+ try:
+ oid = self._storage.new_oid()
+ ob = MinPO('first')
+ revid1 = self._dostore(oid, data=ob)
+ data, serial = storage2.load(oid, '')
+ self.assertEqual(zodb_unpickle(data), MinPO('first'))
+ self.assertEqual(serial, revid1)
+ revid2 = self._dostore(oid, data=MinPO('second'), revid=revid1)
+ for n in range(3):
+ # Let the server and client talk for a moment.
+ # Is there a better way to do this?
+ asyncore.poll(0.1)
+ data, serial = storage2.load(oid, '')
+ self.assertEqual(zodb_unpickle(data), MinPO('second'),
+ 'Invalidation message was not sent!')
+ self.assertEqual(serial, revid2)
+ finally:
+ storage2.close()
+
+
+class GenericTests(
+ # Base class for all ZODB tests
+ base.StorageTestBase,
+ # ZODB test mixin classes (in the same order as imported)
+ basic.BasicStorage,
+ version.VersionStorage,
+ undo.TransactionalUndoStorage,
+ undoversion.TransactionalUndoVersionStorage,
+ packable.PackableStorage,
+ synchronization.SynchronizedStorage,
+ conflict.ConflictResolvingStorage,
+ conflict.ConflictResolvingTransUndoStorage,
+ revision.RevisionStorage,
+ mt.MTStorage,
+ readonly.ReadOnlyStorage,
+ # ZEO test mixin classes (in the same order as imported)
+ cache.StorageWithCache,
+ cache.TransUndoStorageWithCache,
+ commitlock.CommitLockTests,
+ threadtests.ThreadTests,
+ # Locally defined (see above)
+ MiscZEOTests
+ ):
+
+ """Combine tests from various origins in one class."""
+
+ def open(self, read_only=0):
+ # XXX Needed to support ReadOnlyStorage tests. Ought to be a
+ # cleaner way.
+ addr = self._storage._addr
+ self._storage.close()
+ self._storage = ClientStorage(addr, read_only=read_only, wait=1)
+
+ def unresolvable(self, klass):
+ # This helper method is used to test the implementation of
+ # conflict resolution. That code runs in the server, and there
+ # is no way for the test suite (a client) to inquire about it.
+ pass
+
+
+class FileStorageTests(GenericTests):
+ """Test ZEO backed by a FileStorage."""
+
+ def setUp(self):
+ logging.info("testZEO: setUp() %s", self.id())
+ zeoport, adminaddr, pid = forker.start_zeo_server(self.getConfig())
+ self._pids = [pid]
+ self._servers = [adminaddr]
+ self._storage = ClientStorage(zeoport, '1', cache_size=20000000,
+ min_disconnect_poll=0.5, wait=1)
+ self._storage.registerDB(DummyDB())
+
+ def tearDown(self):
+ self._storage.close()
+ for server in self._servers:
+ forker.shutdown_zeo_server(server)
+ if hasattr(os, 'waitpid'):
+ # Not in Windows Python until 2.3
+ for pid in self._pids:
+ os.waitpid(pid, 0)
+
+ def getConfig(self):
+ filename = self.__fs_base = tempfile.mktemp()
+ # Return a 1-tuple
+ return """\
+ <Storage>
+ type FileStorage
+ file_name %s
+ create yes
+ </Storage>
+ """ % filename
+
+
+class BDBTests(FileStorageTests):
+ """ZEO backed by a Berkeley Full storage."""
+
+ def getStorage(self):
+ self._envdir = tempfile.mktemp()
+ # Return a 1-tuple
+ return """\
+ <Storage>
+ type BDBFullStorage
+ name %s
+ </Storage>
+ """ % self._envdir
+
+
+test_classes = [FileStorageTests]
+try:
+ from zodb.storage.bdbfull import BDBFullStorage
+except ImportError:
+ pass
+else:
+ test_classes.append(BDBTests)
+
+
+def test_suite():
+ # shutup warnings about mktemp
+ import warnings
+ warnings.filterwarnings("ignore", "mktemp")
+
+ suite = unittest.TestSuite()
+ for klass in test_classes:
+ sub = unittest.makeSuite(klass, 'check')
+ suite.addTest(sub)
+ return suite
+
+
+if __name__ == "__main__":
+ unittest.main(defaultTest='test_suite')
=== Zope3/src/zodb/zeo/tests/threadtests.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/tests/threadtests.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,169 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Compromising positions involving threads."""
+
+import threading
+
+from zodb.ztransaction import Transaction
+from zodb.storage.tests.base import zodb_pickle, MinPO
+
+from zodb.zeo.client import ClientStorageError
+from zodb.zeo.interfaces import Disconnected
+
+ZERO = '\0'*8
+
+class BasicThread(threading.Thread):
+ def __init__(self, storage, doNextEvent, threadStartedEvent):
+ self.storage = storage
+ self.trans = Transaction()
+ self.doNextEvent = doNextEvent
+ self.threadStartedEvent = threadStartedEvent
+ self.gotValueError = 0
+ self.gotDisconnected = 0
+ threading.Thread.__init__(self)
+ self.setDaemon(1)
+
+ def join(self):
+ threading.Thread.join(self, 10)
+ assert not self.isAlive()
+
+
+class GetsThroughVoteThread(BasicThread):
+ # This thread gets partially through a transaction before it turns
+ # execution over to another thread. We're trying to establish that a
+ # tpc_finish() after a storage has been closed by another thread will get
+ # a ClientStorageError error.
+ #
+ # This class gets does a tpc_begin(), store(), tpc_vote() and is waiting
+ # to do the tpc_finish() when the other thread closes the storage.
+ def run(self):
+ self.storage.tpc_begin(self.trans)
+ oid = self.storage.new_oid()
+ self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
+ self.storage.tpc_vote(self.trans)
+ self.threadStartedEvent.set()
+ self.doNextEvent.wait(10)
+ try:
+ self.storage.tpc_finish(self.trans)
+ except ClientStorageError:
+ self.gotValueError = 1
+ self.storage.tpc_abort(self.trans)
+
+
+class GetsThroughBeginThread(BasicThread):
+ # This class is like the above except that it is intended to be run when
+ # another thread is already in a tpc_begin(). Thus, this thread will
+ # block in the tpc_begin until another thread closes the storage. When
+ # that happens, this one will get disconnected too.
+ def run(self):
+ try:
+ self.storage.tpc_begin(self.trans)
+ except ClientStorageError:
+ self.gotValueError = 1
+
+
+class AbortsAfterBeginFailsThread(BasicThread):
+ # This class is identical to GetsThroughBeginThread except that it
+ # attempts to tpc_abort() after the tpc_begin() fails. That will raise a
+ # ClientDisconnected exception which implies that we don't have the lock,
+ # and that's what we really want to test (but it's difficult given the
+ # threading module's API).
+ def run(self):
+ try:
+ self.storage.tpc_begin(self.trans)
+ except ClientStorageError:
+ self.gotValueError = 1
+ try:
+ self.storage.tpc_abort(self.trans)
+ except Disconnected:
+ self.gotDisconnected = 1
+
+
+class ThreadTests:
+ # Thread 1 should start a transaction, but not get all the way through it.
+ # Main thread should close the connection. Thread 1 should then get
+ # disconnected.
+ def checkDisconnectedOnThread2Close(self):
+ doNextEvent = threading.Event()
+ threadStartedEvent = threading.Event()
+ thread1 = GetsThroughVoteThread(self._storage,
+ doNextEvent, threadStartedEvent)
+ thread1.start()
+ threadStartedEvent.wait(10)
+ self._storage.close()
+ doNextEvent.set()
+ thread1.join()
+ self.assertEqual(thread1.gotValueError, 1)
+
+ # Thread 1 should start a transaction, but not get all the way through
+ # it. While thread 1 is in the middle of the transaction, a second thread
+ # should start a transaction, and it will block in the tcp_begin() --
+ # because thread 1 has acquired the lock in its tpc_begin(). Now the main
+ # thread closes the storage and both sub-threads should get disconnected.
+ def checkSecondBeginFails(self):
+ doNextEvent = threading.Event()
+ threadStartedEvent = threading.Event()
+ thread1 = GetsThroughVoteThread(self._storage,
+ doNextEvent, threadStartedEvent)
+ thread2 = GetsThroughBeginThread(self._storage,
+ doNextEvent, threadStartedEvent)
+ thread1.start()
+ threadStartedEvent.wait(1)
+ thread2.start()
+ self._storage.close()
+ doNextEvent.set()
+ thread1.join()
+ thread2.join()
+ self.assertEqual(thread1.gotValueError, 1)
+ self.assertEqual(thread2.gotValueError, 1)
+
+ def checkThatFailedBeginDoesNotHaveLock(self):
+ doNextEvent = threading.Event()
+ threadStartedEvent = threading.Event()
+ thread1 = GetsThroughVoteThread(self._storage,
+ doNextEvent, threadStartedEvent)
+ thread2 = AbortsAfterBeginFailsThread(self._storage,
+ doNextEvent, threadStartedEvent)
+ thread1.start()
+ threadStartedEvent.wait(1)
+ thread2.start()
+ self._storage.close()
+ doNextEvent.set()
+ thread1.join()
+ thread2.join()
+ self.assertEqual(thread1.gotValueError, 1)
+ self.assertEqual(thread2.gotValueError, 1)
+ self.assertEqual(thread2.gotDisconnected, 1)
+
+ # Run a bunch of threads doing small and large stores in parallel
+ def checkMTStores(self):
+ threads = []
+ for i in range(5):
+ t = threading.Thread(target=self.mtstorehelper)
+ threads.append(t)
+ t.start()
+ for t in threads:
+ t.join(30)
+ for i in threads:
+ self.failUnless(not t.isAlive())
+
+ # Helper for checkMTStores
+ def mtstorehelper(self):
+ name = threading.currentThread().getName()
+ objs = []
+ for i in range(10):
+ objs.append(MinPO("X" * 200000))
+ objs.append(MinPO("X"))
+ for obj in objs:
+ self._dostore(data=obj)
=== Zope3/src/zodb/zeo/tests/zeoserver.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/tests/zeoserver.py Wed Dec 25 09:12:22 2002
@@ -0,0 +1,161 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Helper file used to launch a ZEO server cross platform"""
+
+import os
+import sys
+import errno
+import getopt
+import random
+import socket
+import logging
+import asyncore
+
+import ZConfig
+from zodb import config
+import zodb.zeo.server
+from zodb.zeo import threadedasync
+
+
+def load_storage(fp):
+ rootconf = ZConfig.loadfile(fp)
+ storageconf = rootconf.getSection('Storage')
+ return config.createStorage(storageconf)
+
+
+def cleanup(storage):
+ # FileStorage and the Berkeley storages have this method, which deletes
+ # all files and directories used by the storage. This prevents @-files
+ # from clogging up /tmp
+ try:
+ storage.cleanup()
+ except AttributeError:
+ pass
+
+
+class ZEOTestServer(asyncore.dispatcher):
+ """A server for killing the whole process at the end of a test.
+
+ The first time we connect to this server, we write an ack character down
+ the socket. The other end should block on a recv() of the socket so it
+ can guarantee the server has started up before continuing on.
+
+ The second connect to the port immediately exits the process, via
+ os._exit(), without writing data on the socket. It does close and clean
+ up the storage first. The other end will get the empty string from its
+ recv() which will be enough to tell it that the server has exited.
+
+ I think this should prevent us from ever getting a legitimate addr-in-use
+ error.
+ """
+ __super_init = asyncore.dispatcher.__init__
+
+ def __init__(self, addr, storage, keep):
+ self.__super_init()
+ self._storage = storage
+ self._keep = keep
+ # Count down to zero, the number of connects
+ self._count = 1
+ # Create a logger
+ self.logger = logging.getLogger('zeoserver.%d.%s' %
+ (os.getpid(), addr))
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ # Some ZEO tests attempt a quick start of the server using the same
+ # port so we have to set the reuse flag.
+ self.set_reuse_addr()
+ try:
+ self.bind(addr)
+ except:
+ # We really want to see these exceptions
+ import traceback
+ traceback.print_exc()
+ raise
+ self.listen(5)
+ self.logger.info('bound and listening')
+
+ def handle_accept(self):
+ sock, addr = self.accept()
+ self.logger.info('in handle_accept()')
+ # When we're done with everything, close the storage. Do not write
+ # the ack character until the storage is finished closing.
+ if self._count <= 0:
+ self.logger.info('closing the storage')
+ self._storage.close()
+ if not self._keep:
+ cleanup(self._storage)
+ self.logger.info('exiting')
+ os._exit(0)
+ self.logger.info('continuing')
+ sock.send('X')
+ self._count -= 1
+
+
+def main():
+
+ # Initialize the logging module.
+ import logging.config
+ logging.basicConfig()
+ level = os.getenv("LOGGING")
+ if level:
+ level = int(level)
+ else:
+ level = logging.CRITICAL
+ logging.root.setLevel(level)
+
+ # Create a logger
+ logger = logging.getLogger('zeoserver.%d' % os.getpid())
+ logger.info('starting')
+
+ # We don't do much sanity checking of the arguments, since if we get it
+ # wrong, it's a bug in the test suite.
+ ro_svr = False
+ keep = False
+ configfile = None
+ # Parse the arguments and let getopt.error percolate
+ opts, args = getopt.getopt(sys.argv[1:], 'rkC:')
+ for opt, arg in opts:
+ if opt == '-r':
+ ro_svr = True
+ elif opt == '-k':
+ keep = True
+ elif opt == '-C':
+ configfile = arg
+ # Open the config file and let ZConfig parse the data there. Then remove
+ # the config file, otherwise we'll leave turds.
+ fp = open(configfile, 'r')
+ storage = load_storage(fp)
+ fp.close()
+ os.remove(configfile)
+ # The rest of the args are hostname, portnum
+ zeo_port = int(args[0])
+ test_port = zeo_port + 1
+ try:
+ logger.info('creating the test server, ro: %s, keep: %s',
+ ro_svr, keep)
+ t = ZEOTestServer(('', test_port), storage, keep)
+ except socket.error, e:
+ if e[0] <> errno.EADDRINUSE: raise
+ logger.info('addr in use, closing and exiting')
+ storage.close()
+ cleanup(storage)
+ sys.exit(2)
+ addr = ('', zeo_port)
+ logger.info('creating the storage server')
+ serv = zodb.zeo.server.StorageServer(addr, {'1': storage}, ro_svr)
+ logger.info('entering threadedasync loop')
+ threadedasync.loop()
+
+
+if __name__ == '__main__':
+ main()