[Zodb-checkins] CVS: ZEO/ZEO/tests - Cache.py:1.1.2.3 forker.py:1.1.2.5 multi.py:1.1.2.6 speed.py:1.1.2.3 testZEO.py:1.1.2.13
Jeremy Hylton
jeremy@zope.com
Thu, 20 Dec 2001 13:21:09 -0500
Update of /cvs-repository/ZEO/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv27019/ZEO/tests
Modified Files:
Tag: ZEO-ZRPC-Dev
Cache.py forker.py multi.py speed.py testZEO.py
Log Message:
Merge tests from the ZEO head.
XXX Three of these tests are known to fail. Apparently the client
does not properly reconnect to the server.
=== ZEO/ZEO/tests/Cache.py 1.1.2.2 => 1.1.2.3 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL). A copy of the ZPL should accompany this
+# distribution. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
+# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
+# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
+
"""Tests of the ZEO cache"""
from ZODB.Transaction import Transaction
@@ -13,7 +22,17 @@
revid = self._dostore(oid, revid=revid, data=MinPO(25))
info = self._storage.undoInfo()
+ if not info:
+ # XXX perhaps we have an old storage implementation that
+ # does do the negative nonsense
+ info = self._storage.undoInfo(0, 20)
tid = info[0]['id']
+
+ # We may need to bail at this point if the storage doesn't
+ # support transactional undo
+ if not self._storage.supportsTransactionalUndo():
+ return
+
# Now start an undo transaction
self._transaction.note('undo1')
self._storage.tpc_begin(self._transaction)
@@ -50,7 +69,7 @@
obj = zodb_unpickle(data)
assert obj == MinPO(2), obj
- def checkCommitVersionInvalidation(self):
+ def checkCommitEmptyVersionInvalidation(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(1))
revid = self._dostore(oid, revid=revid, data=MinPO(2))
@@ -62,5 +81,20 @@
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
data, revid = self._storage.load(oid, "")
+ obj = zodb_unpickle(data)
+ assert obj == MinPO(3), obj
+
+ def checkCommitVersionInvalidation(self):
+ oid = self._storage.new_oid()
+ revid = self._dostore(oid, data=MinPO(1))
+ revid = self._dostore(oid, revid=revid, data=MinPO(2))
+ revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
+ t = Transaction()
+ self._storage.tpc_begin(t)
+ self._storage.commitVersion("foo", "bar", t)
+ self._storage.load(oid, "")
+ self._storage.tpc_vote(t)
+ self._storage.tpc_finish(t)
+ data, revid = self._storage.load(oid, "bar")
obj = zodb_unpickle(data)
assert obj == MinPO(3), obj
=== ZEO/ZEO/tests/forker.py 1.1.2.4 => 1.1.2.5 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL). A copy of the ZPL should accompany this
+# distribution. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
+# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
+# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
+
"""Library for forking storage server and connecting client storage"""
import asyncore
-import atexit
import os
import profile
+import random
+import socket
import sys
-import time
import types
-import ThreadedAsync
import ZEO.ClientStorage, ZEO.StorageServer
PROFILE = 0
-class ZEOServerExit(asyncore.file_dispatcher):
- """Used to exit ZEO.StorageServer when run is done"""
+def get_port():
+ """Return a port that is not in use.
- def writable(self):
- return 0
+ Checks if a port is in use by trying to connect to it. Assumes it
+ is not in use if connect raises an exception.
- def readable(self):
- return 1
+ Raises RuntimeError after 10 tries.
+ """
+ for i in range(10):
+ port = random.randrange(20000, 30000)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ try:
+ s.connect(('localhost', port))
+ except socket.error:
+ # XXX check value of error?
+ return port
+ finally:
+ s.close()
+ raise RuntimeError, "Can't find port"
+
+if os.name == "nt":
+
+ def start_zeo_server(storage_name, args, port=None):
+ """Start a ZEO server in a separate process.
+
+ Returns the ZEO port, the test server port, and the pid.
+ """
+ import ZEO.tests.winserver
+ if port is None:
+ port = get_port()
+ script = ZEO.tests.winserver.__file__
+ if script.endswith('.pyc'):
+ script = script[:-1]
+ args = (sys.executable, script, str(port), storage_name) + args
+ d = os.environ.copy()
+ d['PYTHONPATH'] = os.pathsep.join(sys.path)
+ pid = os.spawnve(os.P_NOWAIT, sys.executable, args, os.environ)
+ return ('localhost', port), ('localhost', port + 1), pid
+
+else:
+
+ class ZEOServerExit(asyncore.file_dispatcher):
+ """Used to exit ZEO.StorageServer when run is done"""
+
+ def writable(self):
+ return 0
+
+ def readable(self):
+ return 1
+
+ def handle_read(self):
+ buf = self.recv(4)
+ if buf:
+ assert buf == "done"
+ asyncore.socket_map.clear()
- def handle_read(self):
- buf = self.recv(4)
- if buf:
- assert buf == "done"
+ def handle_close(self):
asyncore.socket_map.clear()
-
- def handle_close(self):
- asyncore.socket_map.clear()
-
-class ZEOClientExit:
- """Used by client to cause server to exit"""
- def __init__(self, pipe):
- self.pipe = pipe
-
- def close(self):
- os.write(self.pipe, "done")
-
-def start_zeo_server(storage, addr):
- rd, wr = os.pipe()
- pid = os.fork()
- if pid == 0:
- if PROFILE:
- p = profile.Profile()
- p.runctx("run_server(storage, addr, rd, wr)", globals(),
- locals())
- p.dump_stats("stats.s.%d" % os.getpid())
- else:
- run_server(storage, addr, rd, wr)
- os._exit(0)
- else:
- os.close(rd)
- return pid, ZEOClientExit(wr)
-
-def run_server(storage, addr, rd, wr):
- # in the child, run the storage server
- os.close(wr)
- ZEOServerExit(rd)
- serv = ZEO.StorageServer.StorageServer(addr, {'1':storage})
- asyncore.loop()
- storage.close()
- if isinstance(addr, types.StringType):
- os.unlink(addr)
-
-def start_zeo(storage, cache=None, cleanup=None, domain="AF_INET",
- storage_id="1"):
- """Setup ZEO client-server for storage.
- Returns a ClientStorage instance and a ZEOClientExit instance.
-
- XXX Don't know if os.pipe() will work on Windows.
- """
+ class ZEOClientExit:
+ """Used by client to cause server to exit"""
+ def __init__(self, pipe):
+ self.pipe = pipe
+
+ def close(self):
+ os.write(self.pipe, "done")
+ os.close(self.pipe)
+
+ def start_zeo_server(storage, addr):
+ rd, wr = os.pipe()
+ pid = os.fork()
+ if pid == 0:
+ if PROFILE:
+ p = profile.Profile()
+ p.runctx("run_server(storage, addr, rd, wr)", globals(),
+ locals())
+ p.dump_stats("stats.s.%d" % os.getpid())
+ else:
+ run_server(storage, addr, rd, wr)
+ os._exit(0)
+ else:
+ os.close(rd)
+ return pid, ZEOClientExit(wr)
- if domain == "AF_INET":
- import random
- addr = '', random.randrange(2000, 3000)
- elif domain == "AF_UNIX":
- import tempfile
- addr = tempfile.mktemp()
- else:
- raise ValueError, "bad domain: %s" % domain
+ def run_server(storage, addr, rd, wr):
+ # in the child, run the storage server
+ os.close(wr)
+ ZEOServerExit(rd)
+ serv = ZEO.StorageServer.StorageServer(addr, {'1':storage})
+ asyncore.loop()
+ os.close(rd)
+ storage.close()
+ if isinstance(addr, types.StringType):
+ os.unlink(addr)
+
+ def start_zeo(storage, cache=None, cleanup=None, domain="AF_INET",
+ storage_id="1", cache_size=20000000):
+ """Setup ZEO client-server for storage.
+
+ Returns a ClientStorage instance and a ZEOClientExit instance.
+
+ XXX Don't know if os.pipe() will work on Windows.
+ """
+
+ if domain == "AF_INET":
+ addr = '', get_port()
+ elif domain == "AF_UNIX":
+ import tempfile
+ addr = tempfile.mktemp()
+ else:
+ raise ValueError, "bad domain: %s" % domain
- pid, exit = start_zeo_server(storage, addr)
- try:
+ pid, exit = start_zeo_server(storage, addr)
s = ZEO.ClientStorage.ClientStorage(addr, storage_id,
- debug=1, client=cache)
- if hasattr(s, 'is_connected'):
- while not s.is_connected():
- time.sleep(0.1)
- except Exception, err:
- raise RuntimeError, (err, exit, pid)
- return s, exit, pid
+ debug=1, client=cache,
+ cache_size=cache_size,
+ min_disconnect_poll=0.5)
+ return s, exit, pid
=== ZEO/ZEO/tests/multi.py 1.1.2.5 => 1.1.2.6 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL). A copy of the ZPL should accompany this
+# distribution. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
+# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
+# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
+
"""A multi-client test of the ZEO storage server"""
import ZODB, ZODB.DB, ZODB.FileStorage, ZODB.POSException
@@ -11,8 +20,6 @@
import time
import types
-PROFILE = 0
-
VERBOSE = 1
CLIENTS = 4
RECORDS_PER_CLIENT = 100
@@ -118,7 +125,9 @@
t0 = time.time()
server_pid, server = start_server(addr)
t1 = time.time()
- pids = [start_client(addr, client_func) for i in range(CLIENTS)]
+ pids = []
+ for i in range(CLIENTS):
+ pids.append(start_client(addr, client_func))
for pid in pids:
assert type(pid) == types.IntType, "invalid pid type: %s (%s)" % \
(repr(pid), type(pid))
=== ZEO/ZEO/tests/speed.py 1.1.2.2 => 1.1.2.3 ===
-#
-# Zope Public License (ZPL) Version 1.0
-# -------------------------------------
-#
-# Copyright (c) Digital Creations. All rights reserved.
-#
-# This license has been certified as Open Source(tm).
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# 1. Redistributions in source code must retain the above copyright
-# notice, this list of conditions, and the following disclaimer.
-#
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions, and the following disclaimer in
-# the documentation and/or other materials provided with the
-# distribution.
-#
-# 3. Digital Creations requests that attribution be given to Zope
-# in any manner possible. Zope includes a "Powered by Zope"
-# button that is installed by default. While it is not a license
-# violation to remove this button, it is requested that the
-# attribution remain. A significant investment has been put
-# into Zope, and this effort will continue if the Zope community
-# continues to grow. This is one way to assure that growth.
-#
-# 4. All advertising materials and documentation mentioning
-# features derived from or use of this software must display
-# the following acknowledgement:
-#
-# "This product includes software developed by Digital Creations
-# for use in the Z Object Publishing Environment
-# (http://www.zope.org/)."
-#
-# In the event that the product being advertised includes an
-# intact Zope distribution (with copyright and license included)
-# then this clause is waived.
-#
-# 5. Names associated with Zope or Digital Creations must not be used to
-# endorse or promote products derived from this software without
-# prior written permission from Digital Creations.
-#
-# 6. Modified redistributions of any form whatsoever must retain
-# the following acknowledgment:
-#
-# "This product includes software developed by Digital Creations
-# for use in the Z Object Publishing Environment
-# (http://www.zope.org/)."
-#
-# Intact (re-)distributions of any official Zope release do not
-# require an external acknowledgement.
-#
-# 7. Modifications are encouraged but must be packaged separately as
-# patches to official Zope releases. Distributions that do not
-# clearly separate the patches from the original work must be clearly
-# labeled as unofficial distributions. Modifications which do not
-# carry the name Zope may be packaged in any form, as long as they
-# conform to all of the clauses above.
-#
-#
-# Disclaimer
-#
-# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
-# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
-# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
-# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
-#
-#
-# This software consists of contributions made by Digital Creations and
-# many individuals on behalf of Digital Creations. Specific
-# attributions are listed in the accompanying credits file.
-#
-##############################################################################
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL). A copy of the ZPL should accompany this
+# distribution. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
+# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
+# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
+
usage="""Test speed of a ZODB storage
Options:
@@ -151,6 +76,7 @@
for j in range(nrep):
for r in 1, 10, 100, 1000:
t = time.time()
+ conflicts = 0
jar = db.open()
while 1:
@@ -171,7 +97,7 @@
setattr(p, str(i), v)
get_transaction().commit()
except ConflictError:
- pass
+ conflicts = conflicts + 1
else:
break
jar.close()
@@ -179,10 +105,11 @@
t = time.time() - t
if detailed:
if threadno is None:
- print "%s\t%s\t%.4f" % (j, r, t)
+ print "%s\t%s\t%.4f\t%d" % (j, r, t, conflicts)
else:
- print "%s\t%s\t%.4f\t%d" % (j, r, t, threadno)
- results[r] = results[r] + t
+ print "%s\t%s\t%.4f\t%d\t%d" % (j, r, t, conflicts,
+ threadno)
+ results[r].append((t, conflicts))
rt=d=p=v=None # release all references
if minimize:
time.sleep(3)
@@ -230,8 +157,6 @@
else:
fs = ZODB.FileStorage.FileStorage(fs_name, create=1)
s, server, pid = forker.start_zeo(fs, domain=domain)
- while not s.is_connected():
- time.sleep(0.1)
data=open(data).read()
db=ZODB.DB(s,
@@ -240,13 +165,15 @@
cache_deactivate_after=6000,)
print "Beginning work..."
- results={1:0, 10:0, 100:0, 1000:0}
+ results={1:[], 10:[], 100:[], 1000:[]}
if threads > 1:
import threading
- l = [threading.Thread(target=work,
- args=(db, results, nrep, compress, data,
- detailed, minimize, i))
- for i in range(threads)]
+ l = []
+ for i in range(threads):
+ t = threading.Thread(target=work,
+ args=(db, results, nrep, compress, data,
+ detailed, minimize, i))
+ l.append(t)
for t in l:
t.start()
for t in l:
@@ -261,9 +188,19 @@
if detailed:
print '-'*24
+ print "num\tmean\tmin\tmax"
for r in 1, 10, 100, 1000:
- t=results[r]/(nrep * threads)
- print "mean:\t%s\t%.4f\t%.4f (s/o)" % (r, t, t/r)
+ times = []
+ for time, conf in results[r]:
+ times.append(time)
+ t = mean(times)
+ print "%d\t%.4f\t%.4f\t%.4f" % (r, t, min(times), max(times))
+
+def mean(l):
+ tot = 0
+ for v in l:
+ tot = tot + v
+ return tot / len(l)
##def compress(s):
## c = zlib.compressobj()
=== ZEO/ZEO/tests/testZEO.py 1.1.2.12 => 1.1.2.13 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL). A copy of the ZPL should accompany this
+# distribution. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
+# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
+# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
+
"""Test suite for ZEO based on ZODB.tests"""
import asyncore
import os
+import random
+import socket
+import sys
import tempfile
import time
import types
@@ -10,19 +22,34 @@
import ZEO.ClientStorage, ZEO.StorageServer
import ThreadedAsync, ZEO.trigger
from ZODB.FileStorage import FileStorage
+import thread
-from ZEO import zeolog
from ZEO.tests import forker, Cache
+from ZEO.smac import Disconnected
# Sorry Jim...
from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage, \
TransactionalUndoStorage, TransactionalUndoVersionStorage, \
- PackableStorage, Synchronization, ConflictResolution
+ PackableStorage, Synchronization, ConflictResolution, RevisionStorage
from ZODB.tests.MinPO import MinPO
-
+from ZODB.tests.StorageTestBase import zodb_unpickle
ZERO = '\0'*8
+class DummyDB:
+ def invalidate(self, *args):
+ pass
+
+class PackWaitWrapper:
+ def __init__(self, storage):
+ self.storage = storage
+
+ def __getattr__(self, attr):
+ return getattr(self.storage, attr)
+
+ def pack(self, t, f):
+ self.storage.pack(t, f, wait=1)
+
class ZEOTestBase(StorageTestBase.StorageTestBase):
"""Version of the storage test class that supports ZEO.
@@ -31,31 +58,6 @@
will get no later than the return value from vote.
"""
- __super_setUp = StorageTestBase.StorageTestBase.setUp
- __super_tearDown = StorageTestBase.StorageTestBase.tearDown
-
- def setUp(self):
- """Start a ZEO server using a Unix domain socket
-
- The ZEO server uses the storage object returned by the
- getStorage() method.
- """
- self.running = 1
- client, exit, pid = forker.start_zeo(self.getStorage())
- self._pid = pid
- self._server = exit
- self._storage = client
- while not self._storage.is_connected():
- time.sleep(0.1)
- self.__super_setUp()
-
- def tearDown(self):
- """Try to cause the tests to halt"""
- self.running = 0
- self._server.close()
- os.waitpid(self._pid, 0)
- self.__super_tearDown()
-
def _dostore(self, oid=None, revid=None, data=None, version=None,
already_pickled=0):
"""Do a complete storage transaction.
@@ -101,24 +103,37 @@
raise RuntimeError, "unexpected ZEO response: no oid"
else:
for oid, serial in r:
+ if isinstance(serial, Exception):
+ raise serial
d[oid] = serial
return d
- def checkLargeUpdate(self):
- obj = MinPO("X" * (10 * 128 * 1024))
- self._dostore(data=obj)
+# Some of the ZEO tests depend on the version of FileStorage available
+# for the tests. If we run these tests using Zope 2.3, FileStorage
+# doesn't support TransactionalUndo.
+
+if hasattr(FileStorage, 'supportsTransactionalUndo'):
+ # XXX Assume that a FileStorage that supports transactional undo
+ # also supports conflict resolution.
+ class VersionDependentTests(
+ TransactionalUndoStorage.TransactionalUndoStorage,
+ TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
+ ConflictResolution.ConflictResolvingStorage,
+ ConflictResolution.ConflictResolvingTransUndoStorage):
+ pass
+else:
+ class VersionDependentTests:
+ pass
class GenericTests(ZEOTestBase,
+ VersionDependentTests,
Cache.StorageWithCache,
Cache.TransUndoStorageWithCache,
BasicStorage.BasicStorage,
VersionStorage.VersionStorage,
+ RevisionStorage.RevisionStorage,
PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage,
- ConflictResolution.ConflictResolvingStorage,
- ConflictResolution.ConflictResolvingTransUndoStorage,
- TransactionalUndoStorage.TransactionalUndoStorage,
- TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
):
"""An abstract base class for ZEO tests
@@ -128,8 +143,37 @@
returns a specific storage, e.g. FileStorage.
"""
+ __super_setUp = StorageTestBase.StorageTestBase.setUp
+ __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+
+ def setUp(self):
+ """Start a ZEO server using a Unix domain socket
+
+ The ZEO server uses the storage object returned by the
+ getStorage() method.
+ """
+ self.running = 1
+ client, exit, pid = forker.start_zeo(self.getStorage())
+ self._pid = pid
+ self._server = exit
+ self._storage = PackWaitWrapper(client)
+ client.registerDB(DummyDB(), None)
+ self.__super_setUp()
+
+ def tearDown(self):
+ """Try to cause the tests to halt"""
+ self.running = 0
+ self._storage.close()
+ self._server.close()
+ os.waitpid(self._pid, 0)
+ self.delStorage()
+ self.__super_tearDown()
+
+ def checkLargeUpdate(self):
+ obj = MinPO("X" * (10 * 128 * 1024))
+ self._dostore(data=obj)
+
class ZEOFileStorageTests(GenericTests):
- """Tests of storage behavior using FileStorage underneath ZEO"""
__super_setUp = GenericTests.setUp
def setUp(self):
@@ -140,32 +184,269 @@
return FileStorage(self.__fs_base, create=1)
def delStorage(self):
- # file storage appears to create three files
+ # file storage appears to create four files
+ for ext in '', '.index', '.lock', '.tmp':
+ path = self.__fs_base + ext
+ try:
+ os.remove(path)
+ except os.error:
+ pass
+
+class WindowsGenericTests(GenericTests):
+ """Subclass to support server creation on Windows.
+
+ On Windows, the getStorage() design won't work because the storage
+ can't be created in the parent process and passed to the child.
+ All the work has to be done in the server's process.
+ """
+ __super_setUp = StorageTestBase.StorageTestBase.setUp
+ __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+
+ def setUp(self):
+ self.__super_setUp()
+ args = self.getStorageInfo()
+ name = args[0]
+ args = args[1:]
+ zeo_addr, self.test_addr, self.test_pid = \
+ forker.start_zeo_server(name, args)
+ storage = ZEO.ClientStorage.ClientStorage(zeo_addr, debug=1,
+ min_disconnect_poll=0.1)
+ self._storage = PackWaitWrapper(storage)
+ storage.registerDB(DummyDB(), None)
+
+ def tearDown(self):
+ self._storage.close()
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(self.test_addr)
+ s.close()
+ # the connection should cause the storage server to die
+## os.waitpid(self.test_pid, 0)
+ time.sleep(0.5)
+ self.delStorage()
+ self.__super_tearDown()
+
+class WindowsZEOFileStorageTests(WindowsGenericTests):
+
+ def getStorageInfo(self):
+ self.__fs_base = tempfile.mktemp()
+ return 'FileStorage', self.__fs_base, '1'
+
+ def delStorage(self):
+ # file storage appears to create four files
for ext in '', '.index', '.lock', '.tmp':
path = self.__fs_base + ext
- os.unlink(path)
+ try:
+ os.remove(path)
+ except os.error:
+ pass
-class OtherZEOTests(unittest.TestCase):
- """Tests of ZEO that are basically independent of the storage"""
+class ConnectionTests(ZEOTestBase):
+ """Tests that explicitly manage the server process.
- def checkBadStorageID(self):
- tmp = tempfile.mktemp()
- fs = FileStorage(tmp, create=1)
- try:
- client, exit, pid = forker.start_zeo(fs, storage_id="gotcha")
- except RuntimeError, msg:
- err, exit, pid = msg
- assert isinstance(err, ValueError)
- else:
- assert 0, "Established ZEO connection with invalid storage id"
- exit.close()
- os.waitpid(pid, 0)
- fs.close()
+ To test the cache or re-connection, these test cases explicit
+ start and stop a ZEO storage server.
+ """
+
+ __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+
+ ports = []
+ for i in range(200):
+ ports.append(random.randrange(25000, 30000))
+ del i
+
+ def openClientStorage(self, cache='', cache_size=200000, wait=1):
+ # defined by subclasses
+ pass
+
+ def shutdownServer(self):
+ # defined by subclasses
+ pass
+
+ def tearDown(self):
+ """Try to cause the tests to halt"""
+ self.shutdownServer()
+ # file storage appears to create four files
for ext in '', '.index', '.lock', '.tmp':
- path = tmp + ext
+ path = self.file + ext
if os.path.exists(path):
os.unlink(path)
-
+ for i in 0, 1:
+ path = "c1-test-%d.zec" % i
+ if os.path.exists(path):
+ os.unlink(path)
+ self.__super_tearDown()
+
+ def checkBasicPersistence(self):
+ """Verify cached data persists across client storage instances.
+
+ To verify that the cache is being used, the test closes the
+ server and then starts a new client with the server down.
+ """
+ self._storage = self.openClientStorage('test', 100000, 1)
+ oid = self._storage.new_oid()
+ obj = MinPO(12)
+ revid1 = self._dostore(oid, data=obj)
+ self._storage.close()
+ self.shutdownServer()
+ self._storage = self.openClientStorage('test', 100000, 0)
+ data, revid2 = self._storage.load(oid, '')
+ assert zodb_unpickle(data) == MinPO(12)
+ assert revid1 == revid2
+ self._storage.close()
+
+ def checkRollover(self):
+ """Check that the cache works when the files are swapped.
+
+ In this case, only one object fits in a cache file. When the
+ cache files swap, the first object is effectively uncached.
+ """
+ self._storage = self.openClientStorage('test', 1000, 1)
+ oid1 = self._storage.new_oid()
+ obj1 = MinPO("1" * 500)
+ revid1 = self._dostore(oid1, data=obj1)
+ oid2 = self._storage.new_oid()
+ obj2 = MinPO("2" * 500)
+ revid2 = self._dostore(oid2, data=obj2)
+ self._storage.close()
+ self.shutdownServer()
+ self._storage = self.openClientStorage('test', 1000, 0)
+ self._storage.load(oid2, '')
+ self.assertRaises(Disconnected, self._storage.load, oid1, '')
+
+ def checkReconnection(self):
+ """Check that the client reconnects when a server restarts."""
+
+ from ZEO.ClientStorage import ClientDisconnected
+ self._storage = self.openClientStorage()
+ oid = self._storage.new_oid()
+ obj = MinPO(12)
+ revid1 = self._dostore(oid, data=obj)
+ self.shutdownServer()
+ self.running = 1
+ self._startServer(create=0)
+ oid = self._storage.new_oid()
+ obj = MinPO(12)
+ while 1:
+ try:
+ revid1 = self._dostore(oid, data=obj)
+ except (ClientDisconnected, thread.error, socket.error), err:
+ get_transaction().abort()
+ time.sleep(0.1)
+ else:
+ break
+ # XXX This is a bloody pain. We're placing a heavy burden
+ # on users to catch a plethora of exceptions in order to
+ # write robust code. Need to think about implementing
+ # John Heintz's suggestion to make sure all exceptions
+ # inherit from POSException.
+
+class UnixConnectionTests(ConnectionTests):
+ __super_setUp = StorageTestBase.StorageTestBase.setUp
+
+ def setUp(self):
+ """Start a ZEO server using a Unix domain socket
+
+ The ZEO server uses the storage object returned by the
+ getStorage() method.
+ """
+ self.running = 1
+ self.file = tempfile.mktemp()
+ self.addr = '', self.ports.pop()
+ self._startServer()
+ self.__super_setUp()
+
+ def _startServer(self, create=1):
+ fs = FileStorage(self.file, create=create)
+ self._pid, self._server = forker.start_zeo_server(fs, self.addr)
+
+ def openClientStorage(self, cache='', cache_size=200000, wait=1):
+ base = ZEO.ClientStorage.ClientStorage(self.addr,
+ client=cache,
+ cache_size=cache_size,
+ wait_for_server_on_startup=wait)
+ storage = PackWaitWrapper(base)
+ storage.registerDB(DummyDB(), None)
+ return storage
+
+ def shutdownServer(self):
+ if self.running:
+ self.running = 0
+ self._server.close()
+ os.waitpid(self._pid, 0)
+
+class WindowsConnectionTests(ConnectionTests):
+ __super_setUp = StorageTestBase.StorageTestBase.setUp
+
+ def setUp(self):
+ self.file = tempfile.mktemp()
+ self._startServer()
+ self.__super_setUp()
+
+ def _startServer(self, create=1):
+ if create == 0:
+ port = self.addr[1]
+ else:
+ port = None
+ self.addr, self.test_a, pid = forker.start_zeo_server('FileStorage',
+ (self.file,
+ str(create)),
+ port)
+ self.running = 1
+
+ def openClientStorage(self, cache='', cache_size=200000, wait=1):
+ base = ZEO.ClientStorage.ClientStorage(self.addr,
+ client=cache,
+ cache_size=cache_size,
+ debug=1,
+ wait_for_server_on_startup=wait)
+ storage = PackWaitWrapper(base)
+ storage.registerDB(DummyDB(), None)
+ return storage
+
+ def shutdownServer(self):
+ if self.running:
+ self.running = 0
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(self.test_a)
+ s.close()
+ time.sleep(1.0)
+
+ def tearDown(self):
+ self.shutdownServer()
+
+
+def get_methods(klass):
+ l = [klass]
+ meth = {}
+ while l:
+ klass = l.pop(0)
+ for base in klass.__bases__:
+ l.append(base)
+ for k, v in klass.__dict__.items():
+ if callable(v):
+ meth[k] = 1
+ return meth.keys()
+
+if os.name == "posix":
+ test_classes = ZEOFileStorageTests, UnixConnectionTests
+elif os.name == "nt":
+ test_classes = WindowsZEOFileStorageTests, WindowsConnectionTests
+else:
+ raise RuntimeError, "unsupported os: %s" % os.name
+
+def makeTestSuite(testname=''):
+ suite = unittest.TestSuite()
+ name = 'check' + testname
+ lname = len(name)
+ for klass in test_classes:
+ for meth in get_methods(klass):
+ if meth[:lname] == name:
+ suite.addTest(klass(meth))
+ return suite
+
+def test_suite():
+ return makeTestSuite()
+
def main():
import sys, getopt
@@ -177,11 +458,10 @@
name_of_test = val
if args:
- print >> sys.stderr, "Did not expect arguments. Got %s" % args
+ print "Did not expect arguments. Got %s" % args
return 0
- tests = unittest.makeSuite(ZEOFileStorageTests, 'check' + name_of_test)
- tests.addTest(OtherZEOTests("checkBadStorageID"))
+ tests = makeTestSuite(name_of_test)
runner = unittest.TextTestRunner()
runner.run(tests)