[Zodb-checkins] CVS: Zope3/src/zodb/zeo/tests - thread.py:1.2.6.1 common.py:1.2.6.1 zeoserver.py:1.7.8.1 threadtests.py:1.4.4.1 test_zeo.py:1.7.4.2 test_conn.py:1.3.8.1 test_cache.py:1.2.12.1 forker.py:1.4.8.1 connection.py:1.5.4.1 commitlock.py:1.3.4.1
Jeremy Hylton
jeremy@zope.com
Wed, 12 Mar 2003 16:41:57 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv29357/zeo/tests
Modified Files:
Tag: opaque-pickles-branch
zeoserver.py threadtests.py test_zeo.py test_conn.py
test_cache.py forker.py connection.py commitlock.py
Added Files:
Tag: opaque-pickles-branch
thread.py common.py
Log Message:
Update from trunk.
Resolve a few import conflicts.
=== Added File Zope3/src/zodb/zeo/tests/thread.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A Thread base class for use with unittest."""
from cStringIO import StringIO
import threading
import traceback
class TestThread(threading.Thread):
    """Thread base class for use from unittest test cases.

    Subclasses supply a ``testrun()`` method containing the work to do in
    the thread.  ``run()`` calls it and, if it raises, reports the formatted
    traceback through the ``fail()`` method of the test case passed to the
    constructor.  ``cleanup()`` joins the thread and reports a failure if it
    is still alive afterwards.
    """

    __super_init = threading.Thread.__init__
    __super_run = threading.Thread.run

    def __init__(self, testcase, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Create a daemon thread bound to `testcase`.

        `testcase` is the object whose ``fail()`` method is used to report
        problems.  The remaining arguments are handed unchanged to
        ``threading.Thread.__init__``.
        """
        # A literal ``{}`` default would be one dict shared by every
        # instance; use None as the sentinel and build a fresh dict per call.
        if kwargs is None:
            kwargs = {}
        self.__super_init(group, target, name, args, kwargs, verbose)
        self.setDaemon(1)
        self._testcase = testcase

    def run(self):
        """Call ``self.testrun()`` and report any exception it raises."""
        try:
            self.testrun()
        except Exception:
            # Capture the traceback as text so it can be embedded in the
            # failure message handed to the test case.
            s = StringIO()
            traceback.print_exc(file=s)
            # NOTE(review): presumably `fail()` raises in *this* thread, not
            # in the thread running the test -- confirm how callers observe it.
            self._testcase.fail("Exception in thread %s:\n%s\n" %
                                (self, s.getvalue()))

    def cleanup(self, timeout=15):
        """Join the thread for up to `timeout` seconds; fail if still alive."""
        self.join(timeout)
        if self.isAlive():
            self._testcase.fail("Thread did not finish: %s" % self)
=== Added File Zope3/src/zodb/zeo/tests/common.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Common helper classes for testing."""
import threading
from zodb.zeo.client import ClientStorage
class TestClientStorage(ClientStorage):
    """ClientStorage subclass that exposes cache-verification state to tests.

    After ``verify_cache()`` has run, ``verify_result`` holds whatever the
    base class returned and ``end_verify`` is a ``threading.Event`` that is
    set once ``endVerify()`` has been called.
    """

    def verify_cache(self, stub):
        """Delegate to the base class and remember what it returned.

        A fresh ``end_verify`` event is installed *before* delegating, so it
        already exists if ``endVerify()`` is invoked while the base-class
        call is in progress.  This override itself returns nothing.
        """
        self.end_verify = threading.Event()
        outcome = super(TestClientStorage, self).verify_cache(stub)
        self.verify_result = outcome

    def endVerify(self):
        """Run the base-class ``endVerify()``, then set ``end_verify``."""
        base = super(TestClientStorage, self)
        base.endVerify()
        self.end_verify.set()
class DummyDB:
    """Minimal stand-in for a database object; its invalidate() is a no-op."""

    def invalidate(self, *args, **kws):
        """Accept any positional and keyword arguments and ignore them."""
        return None
=== Zope3/src/zodb/zeo/tests/zeoserver.py 1.7 => 1.7.8.1 ===
--- Zope3/src/zodb/zeo/tests/zeoserver.py:1.7 Mon Jan 27 14:44:14 2003
+++ Zope3/src/zodb/zeo/tests/zeoserver.py Wed Mar 12 16:41:21 2003
@@ -29,24 +29,12 @@
import zodb.zeo.server
from zodb.zeo import threadedasync
-
def load_storage(fp):
context = ZConfig.Context.Context()
rootconf = context.loadFile(fp)
storageconf = rootconf.getSection('Storage')
return config.createStorage(storageconf)
-
-def cleanup(storage):
- # FileStorage and the Berkeley storages have this method, which deletes
- # all files and directories used by the storage. This prevents @-files
- # from clogging up /tmp
- try:
- storage.cleanup()
- except AttributeError:
- pass
-
-
class ZEOTestServer(asyncore.dispatcher):
"""A server for killing the whole process at the end of a test.
@@ -64,10 +52,10 @@
"""
__super_init = asyncore.dispatcher.__init__
- def __init__(self, addr, storage, keep):
+ def __init__(self, addr, server, keep):
self.__super_init()
self._sockets = [self]
- self._storage = storage
+ self._server = server
self._keep = keep
# Count down to zero, the number of connects
self._count = 1
@@ -95,9 +83,10 @@
# the ack character until the storage is finished closing.
if self._count <= 0:
self.logger.info('closing the storage')
- self._storage.close()
+ self._server.close_server()
if not self._keep:
- cleanup(self._storage)
+ for storage in self._server.storages.values():
+ storage.cleanup()
self.logger.info('exiting')
# Close all the other sockets so that we don't have to wait
# for os._exit() to get to it before starting the next
@@ -137,8 +126,11 @@
ro_svr = False
keep = False
configfile = None
+ invalidation_queue_size = 100
+ transaction_timeout = None
+ monitor_address = None
# Parse the arguments and let getopt.error percolate
- opts, args = getopt.getopt(sys.argv[1:], 'rkC:')
+ opts, args = getopt.getopt(sys.argv[1:], 'rkC:Q:T:m:')
for opt, arg in opts:
if opt == '-r':
ro_svr = True
@@ -146,6 +138,12 @@
keep = True
elif opt == '-C':
configfile = arg
+ elif opt == '-Q':
+ invalidation_queue_size = int(arg)
+ elif opt == '-T':
+ transaction_timeout = int(arg)
+ elif opt == '-m':
+ monitor_address = '', int(arg)
# Open the config file and let ZConfig parse the data there. Then remove
# the config file, otherwise we'll leave turds.
fp = open(configfile, 'r')
@@ -156,19 +154,24 @@
zeo_port = int(args[0])
test_port = zeo_port + 1
test_addr = ('', test_port)
+ addr = ('', zeo_port)
+ serv = zodb.zeo.server.StorageServer(
+ addr, {'1': storage}, ro_svr,
+ invalidation_queue_size=invalidation_queue_size,
+ transaction_timeout=transaction_timeout,
+ monitor_address=monitor_address)
try:
logger.info('creating the test server, ro: %s, keep: %s',
ro_svr, keep)
- t = ZEOTestServer(test_addr, storage, keep)
+ t = ZEOTestServer(test_addr, serv, keep)
except socket.error, e:
if e[0] <> errno.EADDRINUSE: raise
logger.info('addr in use, closing and exiting')
storage.close()
- cleanup(storage)
+ storage.cleanup()
sys.exit(2)
addr = ('', zeo_port)
logger.info('creating the storage server')
- serv = zodb.zeo.server.StorageServer(addr, {'1': storage}, ro_svr)
t.register_socket(serv.dispatcher)
# Loop for socket events
logger.info('entering threadedasync loop')
=== Zope3/src/zodb/zeo/tests/threadtests.py 1.4 => 1.4.4.1 ===
--- Zope3/src/zodb/zeo/tests/threadtests.py:1.4 Wed Feb 5 18:28:22 2003
+++ Zope3/src/zodb/zeo/tests/threadtests.py Wed Mar 12 16:41:21 2003
@@ -19,7 +19,7 @@
from zodb.storage.tests.base import zodb_pickle, MinPO
from zodb.zeo.client import ClientStorageError
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.interfaces import ClientDisconnected
ZERO = '\0'*8
@@ -86,7 +86,7 @@
self.gotValueError = 1
try:
self.storage.tpcAbort(self.trans)
- except Disconnected:
+ except ClientDisconnected:
self.gotDisconnected = 1
class MTStoresThread(threading.Thread):
@@ -142,24 +142,6 @@
thread2.join()
self.assertEqual(thread1.gotValueError, 1)
self.assertEqual(thread2.gotValueError, 1)
-
- def checkThatFailedBeginDoesNotHaveLock(self):
- doNextEvent = threading.Event()
- threadStartedEvent = threading.Event()
- thread1 = GetsThroughVoteThread(self._storage,
- doNextEvent, threadStartedEvent)
- thread2 = AbortsAfterBeginFailsThread(self._storage,
- doNextEvent, threadStartedEvent)
- thread1.start()
- threadStartedEvent.wait(1)
- thread2.start()
- self._storage.close()
- doNextEvent.set()
- thread1.join()
- thread2.join()
- self.assertEqual(thread1.gotValueError, 1)
- self.assertEqual(thread2.gotValueError, 1)
- self.assertEqual(thread2.gotDisconnected, 1)
# Run a bunch of threads doing small and large stores in parallel
def checkMTStores(self):
=== Zope3/src/zodb/zeo/tests/test_zeo.py 1.7.4.1 => 1.7.4.2 ===
--- Zope3/src/zodb/zeo/tests/test_zeo.py:1.7.4.1 Mon Mar 10 14:40:48 2003
+++ Zope3/src/zodb/zeo/tests/test_zeo.py Wed Mar 12 16:41:21 2003
@@ -31,26 +31,18 @@
# ZODB test mixin classes
-from zodb.storage.tests import base, basic, version, \
- undo, undoversion, \
- packable, synchronization, conflict, revision, \
- mt, readonly
+from zodb.storage.tests import base, basic, version, undo, undoversion, \
+ packable, synchronization, conflict, revision, mt, readonly
-# ZEO imports
from zodb.zeo.client import ClientStorage
-from zodb.zeo.interfaces import Disconnected
-
-# ZEO test support
from zodb.zeo.tests import forker, cache
-
-# ZEO test mixin classes
from zodb.zeo.tests import commitlock, threadtests
+from zodb.zeo.tests.common import TestClientStorage, DummyDB
class DummyDB:
def invalidate(self, *args):
pass
-
class MiscZEOTests:
"""ZEO tests that don't fit in elsewhere."""
@@ -60,7 +52,7 @@
def checkZEOInvalidation(self):
addr = self._storage._addr
- storage2 = ClientStorage(addr, wait=1, min_disconnect_poll=0.1)
+ storage2 = TestClientStorage(addr, wait=True, min_disconnect_poll=0.1)
try:
oid = self._storage.newObjectId()
ob = MinPO('first')
@@ -80,59 +72,38 @@
finally:
storage2.close()
+class ZEOConflictTests(
+ conflict.ConflictResolvingStorage,
+ conflict.ConflictResolvingTransUndoStorage):
+
+ def unresolvable(self, klass):
+ # This helper method is used to test the implementation of
+ # conflict resolution. That code runs in the server, and there
+ # is no way for the test suite (a client) to inquire about it.
+ return False
-class GenericTests(
+class StorageTests(
# Base class for all ZODB tests
base.StorageTestBase,
# ZODB test mixin classes
basic.BasicStorage,
- version.VersionStorage,
- undo.TransactionalUndoStorage,
- undoversion.TransactionalUndoVersionStorage,
- packable.PackableStorage,
- synchronization.SynchronizedStorage,
- conflict.ConflictResolvingStorage,
- conflict.ConflictResolvingTransUndoStorage,
- revision.RevisionStorage,
- mt.MTStorage,
readonly.ReadOnlyStorage,
+ synchronization.SynchronizedStorage,
# ZEO test mixin classes
- cache.StorageWithCache,
- cache.TransUndoStorageWithCache,
commitlock.CommitLockTests,
threadtests.ThreadTests,
# Locally defined (see above)
MiscZEOTests
):
-
- """Combine tests from various origins in one class."""
-
- def open(self, read_only=0):
- # XXX Needed to support ReadOnlyStorage tests. Ought to be a
- # cleaner way.
- addr = self._storage._addr
- self._storage.close()
- self._storage = ClientStorage(addr, read_only=read_only, wait=1)
-
- _open = open
-
- def unresolvable(self, klass):
- # This helper method is used to test the implementation of
- # conflict resolution. That code runs in the server, and there
- # is no way for the test suite (a client) to inquire about it.
- pass
-
-
-class FileStorageTests(GenericTests):
- """Test ZEO backed by a FileStorage."""
+ """Tests for storage that supports IStorage."""
def setUp(self):
logging.info("testZEO: setUp() %s", self.id())
zeoport, adminaddr, pid = forker.start_zeo_server(self.getConfig())
self._pids = [pid]
self._servers = [adminaddr]
- self._storage = ClientStorage(zeoport, '1', cache_size=20000000,
- min_disconnect_poll=0.5, wait=1)
+ self._storage = TestClientStorage(zeoport, '1', cache_size=20000000,
+ min_disconnect_poll=0.5, wait=1)
self._storage.registerDB(DummyDB())
def tearDown(self):
@@ -146,9 +117,39 @@
for pid in self._pids:
os.waitpid(pid, 0)
+ def open(self, read_only=False):
+ # XXX Needed to support ReadOnlyStorage tests. Ought to be a
+ # cleaner way.
+ addr = self._storage._addr
+ self._storage.close()
+ self._storage = TestClientStorage(addr, read_only=read_only, wait=True)
+
+class UndoVersionStorageTests(
+ StorageTests,
+ ZEOConflictTests,
+ cache.StorageWithCache,
+ cache.TransUndoStorageWithCache,
+ commitlock.CommitLockUndoTests,
+ mt.MTStorage,
+ packable.PackableStorage,
+ revision.RevisionStorage,
+ undo.TransactionalUndoStorage,
+ undoversion.TransactionalUndoVersionStorage,
+ version.VersionStorage,
+ ):
+ """Tests for storage that supports IUndoStorage and IVersionStorage."""
+
+ # XXX Some of the pack tests should really be run for the mapping
+ # storage, but the pack tests assume that the storage also supports
+ # multiple revisions.
+
+class FileStorageTests(UndoVersionStorageTests):
+ """Test ZEO backed by a FileStorage."""
+
+ level = 2
+
def getConfig(self):
filename = self.__fs_base = tempfile.mktemp()
- # Return a 1-tuple
return """\
<Storage>
type FileStorage
@@ -157,13 +158,13 @@
</Storage>
""" % filename
-
-class BDBTests(FileStorageTests):
+class BDBTests(UndoVersionStorageTests):
"""ZEO backed by a Berkeley Full storage."""
- def getStorage(self):
+ level = 2
+
+ def getConfig(self):
self._envdir = tempfile.mktemp()
- # Return a 1-tuple
return """\
<Storage>
type BDBFullStorage
@@ -171,25 +172,35 @@
</Storage>
""" % self._envdir
+ # XXX These tests seem to have massive failures when I run them.
+ # I don't think they should fail, but need Barry's help to debug.
+
+ def checkCommitLockUndoClose(self):
+ pass
+
+ def checkCommitLockUndoAbort(self):
+ pass
+
+class MappingStorageTests(StorageTests):
+
+ def getConfig(self):
+ self._envdir = tempfile.mktemp()
+ return """\
+ <Storage>
+ type MappingStorage
+ name %s
+ </Storage>
+ """ % self._envdir
-test_classes = [FileStorageTests]
+test_classes = [FileStorageTests, MappingStorageTests]
from zodb.storage.base import berkeley_is_available
if berkeley_is_available:
test_classes.append(BDBTests)
-
def test_suite():
- # shutup warnings about mktemp
- import warnings
- warnings.filterwarnings("ignore", "mktemp")
-
suite = unittest.TestSuite()
for klass in test_classes:
sub = unittest.makeSuite(klass, 'check')
suite.addTest(sub)
return suite
-
-
-if __name__ == "__main__":
- unittest.main(defaultTest='test_suite')
=== Zope3/src/zodb/zeo/tests/test_conn.py 1.3 => 1.3.8.1 ===
--- Zope3/src/zodb/zeo/tests/test_conn.py:1.3 Wed Jan 22 16:46:53 2003
+++ Zope3/src/zodb/zeo/tests/test_conn.py Wed Mar 12 16:41:21 2003
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -17,11 +17,10 @@
platform-dependent scaffolding.
"""
-# System imports
import unittest
-# Import the actual test class
-from zodb.zeo.tests import connection
+from zodb.zeo.tests.connection import ConnectionTests, ReconnectionTests
+from zodb.storage.base import berkeley_is_available
class FileStorageConfig:
def getConfig(self, path, create, read_only):
@@ -35,7 +34,6 @@
create and 'yes' or 'no',
read_only and 'yes' or 'no')
-
class BerkeleyStorageConfig:
def getConfig(self, path, create, read_only):
# Full always creates and doesn't have a read_only flag
@@ -47,54 +45,33 @@
</Storage>""" % (path,
read_only and 'yes' or 'no')
+class MappingStorageConfig:
+ def getConfig(self, path, create, read_only):
+ return """\
+ <Storage>
+ type MappingStorage
+ name %s
+ </Storage>""" % path
-class FileStorageConnectionTests(
- FileStorageConfig,
- connection.ConnectionTests
- ):
- """FileStorage-specific connection tests."""
-
-
-class FileStorageReconnectionTests(
- FileStorageConfig,
- connection.ReconnectionTests
- ):
- """FileStorage-specific re-connection tests."""
-
-
-class BDBConnectionTests(
- BerkeleyStorageConfig,
- connection.ConnectionTests
- ):
- """Berkeley storage connection tests."""
-
-
-class BDBReconnectionTests(
- BerkeleyStorageConfig,
- connection.ReconnectionTests
- ):
- """Berkeley storage re-connection tests."""
-
-
-test_classes = [FileStorageConnectionTests, FileStorageReconnectionTests]
-
-from zodb.storage.base import berkeley_is_available
+tests = [
+ (MappingStorageConfig, ConnectionTests, 1),
+ (FileStorageConfig, ReconnectionTests, 1),
+ (FileStorageConfig, ConnectionTests, 2),
+ ]
+
if berkeley_is_available:
- test_classes.append(BDBConnectionTests)
- test_classes.append(BDBReconnectionTests)
-
+ tests += [
+ (BerkeleyStorageConfig, ConnectionTests, 2),
+ (BerkeleyStorageConfig, ReconnectionTests, 2),
+ ]
def test_suite():
- # shutup warnings about mktemp
- import warnings
- warnings.filterwarnings("ignore", "mktemp")
-
suite = unittest.TestSuite()
- for klass in test_classes:
- sub = unittest.makeSuite(klass, 'check')
+ for testclass, configclass, level in tests:
+ # synthesize a concrete class combining tests and configuration
+ name = "%s:%s" % (testclass.__name__, configclass.__name__)
+ aclass = type.__new__(type, name, (configclass, testclass, object), {})
+ aclass.level = level
+ sub = unittest.makeSuite(aclass, "check")
suite.addTest(sub)
return suite
-
-
-if __name__ == "__main__":
- unittest.main(defaultTest='test_suite')
=== Zope3/src/zodb/zeo/tests/test_cache.py 1.2 => 1.2.12.1 ===
--- Zope3/src/zodb/zeo/tests/test_cache.py:1.2 Wed Dec 25 09:12:22 2002
+++ Zope3/src/zodb/zeo/tests/test_cache.py Wed Mar 12 16:41:21 2003
@@ -16,7 +16,6 @@
At times, we do 'white box' testing, i.e. we know about the internals
of the ClientCache object.
"""
-from __future__ import nested_scopes
import os
import time
=== Zope3/src/zodb/zeo/tests/forker.py 1.4 => 1.4.8.1 ===
--- Zope3/src/zodb/zeo/tests/forker.py:1.4 Mon Jan 27 14:44:14 2003
+++ Zope3/src/zodb/zeo/tests/forker.py Wed Mar 12 16:41:21 2003
@@ -51,8 +51,8 @@
raise RuntimeError, "Can't find port"
-def start_zeo_server(conf, addr=None, ro_svr=False, keep=False,
- admin_retries=10):
+def start_zeo_server(conf, addr=None, ro_svr=False, monitor=False, keep=False,
+ invq=None, timeout=None):
"""Start a ZEO server in a separate process.
Returns the ZEO port, the test server port, and the pid.
@@ -72,11 +72,19 @@
if script.endswith('.pyc'):
script = script[:-1]
# Create a list of arguments, which we'll tuplify below
- args = [sys.executable, script, '-C', tmpfile]
+ qa = _quote_arg
+ args = [qa(sys.executable), qa(script), '-C', qa(tmpfile)]
if ro_svr:
args.append('-r')
if keep:
args.append('-k')
+ if invq:
+ args += ['-Q', str(invq)]
+ if timeout:
+ args += ['-T', str(timeout)]
+ if monitor:
+ # XXX Is it safe to reuse the port?
+ args += ['-m', '42000']
args.append(str(port))
d = os.environ.copy()
d['PYTHONPATH'] = os.pathsep.join(sys.path)
@@ -86,7 +94,7 @@
# Always do a sleep as the first thing, since we don't expect
# the spawned process to get started right away.
delay = 0.25
- for i in range(admin_retries):
+ for i in range(10):
time.sleep(delay)
logging.debug('forker: connect %s', i)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -107,6 +115,12 @@
raise
return ('localhost', port), adminaddr, pid
+if sys.platform[:3].lower() == "win":
+ def _quote_arg(s):
+ return '"%s"' % s
+else:
+ def _quote_arg(s):
+ return s
def shutdown_zeo_server(adminaddr):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
=== Zope3/src/zodb/zeo/tests/connection.py 1.5 => 1.5.4.1 ===
--- Zope3/src/zodb/zeo/tests/connection.py:1.5 Wed Feb 5 18:28:22 2003
+++ Zope3/src/zodb/zeo/tests/connection.py Wed Mar 12 16:41:21 2003
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -24,9 +24,10 @@
import logging
from zodb.zeo.client import ClientStorage
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.interfaces import ClientDisconnected
from zodb.zeo.zrpc.marshal import Marshaller
from zodb.zeo.tests import forker
+from zodb.zeo.tests.common import TestClientStorage, DummyDB
from transaction import get_transaction
from zodb.ztransaction import Transaction
@@ -36,22 +37,17 @@
from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
from zodb.storage.tests.base import handle_all_serials, ZERO
-
-class DummyDB:
- def invalidate(self, *args, **kws):
- pass
-
-
class CommonSetupTearDown(StorageTestBase):
- """Tests that explicitly manage the server process.
-
- To test the cache or re-connection, these test cases explicit
- start and stop a ZEO storage server.
- """
+ """Common boilerplate"""
__super_setUp = StorageTestBase.setUp
__super_tearDown = StorageTestBase.tearDown
+ keep = 0
+ invq = None
+ timeout = None
+ monitor = 0
+
def setUp(self):
"""Test setup for connection tests.
@@ -67,7 +63,7 @@
self._pids = []
self._servers = []
self._newAddr()
- self.startServer(keep=self.keep)
+ self.startServer()
def tearDown(self):
"""Try to cause the tests to halt"""
@@ -99,21 +95,18 @@
# port+1 is also used, so only draw even port numbers
return 'localhost', random.randrange(25000, 30000, 2)
- def getConfig(self):
- raise NotImplementedError
-
def openClientStorage(self, cache='', cache_size=200000, wait=True,
read_only=False, read_only_fallback=False,
addr=None):
if addr is None:
addr = self.addr
- storage = ClientStorage(addr,
- client=cache,
- cache_size=cache_size,
- wait=wait,
- min_disconnect_poll=0.1,
- read_only=read_only,
- read_only_fallback=read_only_fallback)
+ storage = TestClientStorage(addr,
+ client=cache,
+ cache_size=cache_size,
+ wait=wait,
+ min_disconnect_poll=0.1,
+ read_only=read_only,
+ read_only_fallback=read_only_fallback)
storage.registerDB(DummyDB())
return storage
@@ -122,15 +115,18 @@
# it fails with an error.
forker_admin_retries = 10
- def startServer(self, create=True, index=0,
- read_only=False, ro_svr=False, keep=False):
+ # Concrete test classes must provide a getConfig() method
+
+ def startServer(self, create=True, index=0, read_only=False, ro_svr=False,
+ keep=False):
addr = self.addr[index]
self.logger.warn("startServer(create=%d, index=%d, read_only=%d) @ %s",
create, index, read_only, addr)
path = "%s.%d" % (self.file, index)
conf = self.getConfig(path, create, read_only)
zeoport, adminaddr, pid = forker.start_zeo_server(
- conf, addr, ro_svr, keep, self.forker_admin_retries)
+ conf, addr, ro_svr,
+ self.monitor, self.keep, self.invq, self.timeout)
self._pids.append(pid)
self._servers.append(adminaddr)
@@ -142,11 +138,13 @@
forker.shutdown_zeo_server(adminaddr)
self._servers[index] = None
- def pollUp(self, timeout=30.0):
+ def pollUp(self, timeout=30.0, storage=None):
# Poll until we're connected
+ if storage is None:
+ storage = self._storage
now = time.time()
giveup = now + timeout
- while not self._storage.is_connected():
+ while not storage.is_connected():
asyncore.poll(0.1)
now = time.time()
if now > giveup:
@@ -164,7 +162,11 @@
class ConnectionTests(CommonSetupTearDown):
- keep = False
+ """Tests that explicitly manage the server process.
+
+ To test the cache or re-connection, these test cases explicit
+ start and stop a ZEO storage server.
+ """
def checkMultipleAddresses(self):
for i in range(4):
@@ -193,8 +195,9 @@
try:
self._dostore()
break
- except Disconnected:
+ except ClientDisconnected:
time.sleep(0.5)
+ self._storage.sync()
def checkReadOnlyClient(self):
# Open a read-only client to a read-write server; stores fail
@@ -256,7 +259,7 @@
# Poll until the client disconnects
self.pollDown()
# Stores should fail now
- self.assertRaises(Disconnected, self._dostore)
+ self.assertRaises(ClientDisconnected, self._dostore)
# Restart the server
self.startServer(create=False)
@@ -266,12 +269,29 @@
self._dostore()
def checkDisconnectionError(self):
- # Make sure we get a Disconnected when we try to read an
+ # Make sure we get a ClientDisconnected when we try to read an
# object when we're not connected to a storage server and the
# object is not in the cache.
self.shutdownServer()
self._storage = self.openClientStorage('test', 1000, wait=False)
- self.assertRaises(Disconnected, self._storage.load, 'fredwash', '')
+ self.assertRaises(ClientDisconnected,
+ self._storage.load, 'fredwash', '')
+
+ def checkDisconnectedAbort(self):
+ self._storage = self.openClientStorage()
+ self._dostore()
+ oids = [self._storage.newObjectId() for i in range(5)]
+ txn = Transaction()
+ self._storage.tpcBegin(txn)
+ for oid in oids:
+ data = zodb_pickle(MinPO(oid))
+ self._storage.store(oid, None, data, '', txn)
+ self.shutdownServer()
+ self.assertRaises(ClientDisconnected, self._storage.tpcVote, txn)
+ self._storage.tpcAbort(txn)
+ self.startServer(create=0)
+ self._storage._wait()
+ self._dostore()
def checkBasicPersistence(self):
# Verify cached data persists across client storage instances.
@@ -335,18 +355,14 @@
try:
self._dostore(oid, data=obj)
break
- except (Disconnected, select.error,
- threading.ThreadError, socket.error):
+ except ClientDisconnected:
self.logger.warn("checkReconnection: "
"Error after server restart; retrying.",
exc_info=True)
get_transaction().abort()
- time.sleep(0.1) # XXX how long to sleep
- # XXX This is a bloody pain. We're placing a heavy burden
- # on users to catch a plethora of exceptions in order to
- # write robust code. Need to think about implementing
- # John Heintz's suggestion to make sure all exceptions
- # inherit from POSException.
+ self._storage.sync()
+ else:
+ self.fail("Could not reconnect to server")
self.logger.warn("checkReconnection: finished")
def checkBadMessage1(self):
@@ -377,7 +393,7 @@
try:
self._dostore()
- except Disconnected:
+ except ClientDisconnected:
pass
else:
self._storage.close()
@@ -427,6 +443,7 @@
class ReconnectionTests(CommonSetupTearDown):
keep = True
forker_admin_retries = 20
+ invq = 2
def checkReadOnlyStorage(self):
# Open a read-only client to a read-only *storage*; stores fail
@@ -493,7 +510,7 @@
# Poll until the client disconnects
self.pollDown()
# Stores should fail now
- self.assertRaises(Disconnected, self._dostore)
+ self.assertRaises(ClientDisconnected, self._dostore)
# Restart the server
self.startServer(create=False, read_only=True)
@@ -522,7 +539,7 @@
# Poll until the client disconnects
self.pollDown()
# Stores should fail now
- self.assertRaises(Disconnected, self._dostore)
+ self.assertRaises(ClientDisconnected, self._dostore)
# Restart the server, this time read-write
self.startServer(create=False)
@@ -554,12 +571,142 @@
try:
self._dostore()
break
- except (Disconnected, ReadOnlyError,
- select.error, threading.ThreadError, socket.error):
+ except (ClientDisconnected, ReadOnlyError):
time.sleep(0.1)
+ self._storage.sync()
else:
self.fail("Couldn't store after starting a read-write server")
+ def checkNoVerificationOnServerRestart(self):
+ self._storage = self.openClientStorage()
+ # When we create a new storage, it should always do a full
+ # verification
+ self.assertEqual(self._storage.verify_result, "full verification")
+ self._dostore()
+ self.shutdownServer()
+ self.pollDown()
+ self._storage.verify_result = None
+ self.startServer(create=0)
+ self.pollUp()
+ # There were no transactions committed, so no verification
+ # should be needed.
+ self.assertEqual(self._storage.verify_result, "no verification")
+
+ def checkNoVerificationOnServerRestartWith2Clients(self):
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+
+ self._storage = self.openClientStorage()
+ oid = self._storage.newObjectId()
+ # When we create a new storage, it should always do a full
+ # verification
+ self.assertEqual(self._storage.verify_result, "full verification")
+ # do two storages of the object to make sure an invalidation
+ # message is generated
+ revid = self._dostore(oid)
+ self._dostore(oid, revid)
+
+ perstorage.load(oid, '')
+
+ self.shutdownServer()
+
+ self.pollDown()
+ self._storage.verify_result = None
+ perstorage.verify_result = None
+ self.startServer(create=0)
+ self.pollUp()
+ self.pollUp(storage=perstorage)
+ # There were no transactions committed, so no verification
+ # should be needed.
+ self.assertEqual(self._storage.verify_result, "no verification")
+ self.assertEqual(perstorage.verify_result, "no verification")
+ perstorage.close()
+
+ def checkQuickVerificationWith2Clients(self):
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+
+ self._storage = self.openClientStorage()
+ oid = self._storage.newObjectId()
+ # When we create a new storage, it should always do a full
+ # verification
+ self.assertEqual(self._storage.verify_result, "full verification")
+ # do two storages of the object to make sure an invalidation
+ # message is generated
+ revid = self._dostore(oid)
+ revid = self._dostore(oid, revid)
+
+ perstorage.load(oid, '')
+ perstorage.close()
+
+ revid = self._dostore(oid, revid)
+
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "quick verification")
+
+ self.assertEqual(perstorage.load(oid, ''),
+ self._storage.load(oid, ''))
+
+
+
+ def checkVerificationWith2ClientsInvqOverflow(self):
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+
+ self._storage = self.openClientStorage()
+ oid = self._storage.newObjectId()
+ # When we create a new storage, it should always do a full
+ # verification
+ self.assertEqual(self._storage.verify_result, "full verification")
+ # do two storages of the object to make sure an invalidation
+ # message is generated
+ revid = self._dostore(oid)
+ revid = self._dostore(oid, revid)
+
+ perstorage.load(oid, '')
+ perstorage.close()
+
+ # the test code sets invq bound to 2
+ for i in range(5):
+ revid = self._dostore(oid, revid)
+
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+ t = time.time() + 30
+ while not perstorage.end_verify.isSet():
+ perstorage.sync()
+ if time.time() > t:
+ self.fail("timed out waiting for endVerify")
+
+ self.assertEqual(self._storage.load(oid, '')[1], revid)
+ self.assertEqual(perstorage.load(oid, ''),
+ self._storage.load(oid, ''))
+
+ perstorage.close()
+
+class TimeoutTests(CommonSetupTearDown):
+ timeout = 1
+
+ def checkTimeout(self):
+ storage = self.openClientStorage()
+ txn = Transaction()
+ storage.tpcBegin(txn)
+ storage.tpcVote(txn)
+ time.sleep(2)
+ self.assertRaises(ClientDisconnected, storage.tpcFinish, txn)
+
+ def checkTimeoutOnAbort(self):
+ storage = self.openClientStorage()
+ txn = Transaction()
+ storage.tpcBegin(txn)
+ storage.tpcVote(txn)
+ storage.tpcAbort(txn)
+
+ def checkTimeoutOnAbortNoLock(self):
+ storage = self.openClientStorage()
+ txn = Transaction()
+ storage.tpcBegin(txn)
+ storage.tpcAbort(txn)
class MSTThread(threading.Thread):
=== Zope3/src/zodb/zeo/tests/commitlock.py 1.3 => 1.3.4.1 ===
--- Zope3/src/zodb/zeo/tests/commitlock.py:1.3 Wed Feb 5 18:28:22 2003
+++ Zope3/src/zodb/zeo/tests/commitlock.py Wed Mar 12 16:41:21 2003
@@ -21,44 +21,18 @@
from zodb.storage.tests.base import zodb_pickle, MinPO
from zodb.zeo.client import ClientStorage
-from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.interfaces import ClientDisconnected
+from zodb.zeo.tests.thread import TestThread
+from zodb.zeo.tests.common import DummyDB
ZERO = '\0'*8
-class DummyDB:
- def invalidate(self, *args):
- pass
-
-class TestThread(threading.Thread):
- __super_init = threading.Thread.__init__
- __super_run = threading.Thread.run
-
- def __init__(self, testcase, group=None, target=None, name=None,
- args=(), kwargs={}, verbose=None):
- self.__super_init(group, target, name, args, kwargs, verbose)
- self.setDaemon(1)
- self._testcase = testcase
-
- def run(self):
- try:
- self.testrun()
- except Exception:
- s = StringIO()
- traceback.print_exc(file=s)
- self._testcase.fail("Exception in thread %s:\n%s\n" %
- (self, s.getvalue()))
-
- def cleanup(self, timeout=15):
- self.join(timeout)
- if self.isAlive():
- self._testcase.fail("Thread did not finish: %s" % self)
-
class WorkerThread(TestThread):
# run the entire test in a thread so that the blocking call for
# tpc_vote() doesn't hang the test suite.
- def __init__(self, testcase, storage, trans, method="tpc_finish"):
+ def __init__(self, testcase, storage, trans, method="tpcFinish"):
self.storage = storage
self.trans = trans
self.method = method
@@ -74,53 +48,45 @@
oid = self.storage.newObjectId()
p = zodb_pickle(MinPO("c"))
self.storage.store(oid, ZERO, p, '', self.trans)
- self.ready.set()
- self.storage.tpcVote(self.trans)
- if self.method == "tpc_finish":
+ self.myvote()
+ if self.method == "tpcFinish":
self.storage.tpcFinish(self.trans)
else:
self.storage.tpcAbort(self.trans)
- except Disconnected:
+ except ClientDisconnected:
pass
-class CommitLockTests:
-
- # The commit lock tests verify that the storage successfully
- # blocks and restarts transactions when there is content for a
- # single storage. There are a lot of cases to cover.
-
- # CommitLock1 checks the case where a single transaction delays
- # other transactions before they actually block. IOW, by the time
- # the other transactions get to the vote stage, the first
- # transaction has finished.
+ def myvote(self):
+ # The vote() call is synchronous, which makes it difficult to
+ # coordinate the action of multiple threads that all call
+ # vote(). This method sends the vote call, then sets the
+ # event saying vote was called, then waits for the vote
+ # response. It digs deep into the implementation of the client.
+
+ # This method is a replacement for:
+ # self.ready.set()
+ # self.storage.tpcVote(self.trans)
+
+ rpc = self.storage._server.rpc
+ msgid = rpc._deferred_call("tpcVote", self.storage._serial)
+ self.ready.set()
+ rpc._deferred_wait(msgid)
+ self.storage._check_serials()
- def checkCommitLock1OnCommit(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
- finally:
- self._cleanup()
+class CommitLockTests:
- def checkCommitLock1OnAbort(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
- finally:
- self._cleanup()
+ NUM_CLIENTS = 5
- def checkCommitLock2OnCommit(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
- finally:
- self._cleanup()
-
- def checkCommitLock2OnAbort(self):
- self._storages = []
- try:
- self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
- finally:
- self._cleanup()
+ # The commit lock tests verify that the storage successfully
+ # blocks and restarts transactions when there is contention for a
+ # single storage. There are a lot of cases to
+ # cover.
+
+ # The general flow of these tests is to start a transaction by
+ # getting far enough into 2PC to acquire the commit lock. Then
+ # begin one or more other connections that also want to commit.
+ # This causes the commit lock code to be exercised. Once the
+ # other connections are started, the first transaction completes.
def _cleanup(self):
for store, trans in self._storages:
@@ -128,77 +94,74 @@
store.close()
self._storages = []
- def _checkCommitLock(self, method_name, dosetup, dowork):
- # check the commit lock when a client attemps a transaction,
- # but fails/exits before finishing the commit.
-
- # The general flow of these tests is to start a transaction by
- # calling tpc_begin(). Then begin one or more other
- # connections that also want to commit. This causes the
- # commit lock code to be exercised. Once the other
- # connections are started, the first transaction completes.
- # Either by commit or abort, depending on whether method_name
- # is "tpc_finish."
-
- # The tests are parameterized by method_name, dosetup(), and
- # dowork(). The dosetup() function is called with a
- # connectioned client storage, transaction, and timestamp.
- # Any work it does occurs after the first transaction has
- # started, but before it finishes. The dowork() function
- # executes after the first transaction has completed.
-
- # Start on transaction normally and get the lock.
- t = Transaction()
- self._storage.tpcBegin(t)
+ def _start_txn(self):
+ txn = Transaction()
+ self._storage.tpcBegin(txn)
oid = self._storage.newObjectId()
- self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
- self._storage.tpcVote(t)
+ self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
+ return oid, txn
- # Start a second transaction on a different connection without
- # blocking the test thread.
- self._storages = []
- for i in range(4):
- storage2 = self._duplicate_client()
- t2 = Transaction()
- tid = self._get_timestamp()
- dosetup(storage2, t2, tid)
- if i == 0:
- storage2.close()
- else:
- self._storages.append((storage2, t2))
+ def checkCommitLockVoteFinish(self):
+ oid, txn = self._start_txn()
+ self._storage.tpcVote(txn)
+
+ self._begin_threads()
- if method_name == "tpc_finish":
- self._storage.tpcFinish(t)
- self._storage.load(oid, '')
- else:
- self._storage.tpcAbort(t)
+ self._storage.tpcFinish(txn)
+ self._storage.load(oid, '')
- dowork(method_name)
+ self._finish_threads()
- # Make sure the server is still responsive
self._dostore()
+ self._cleanup()
+
+ def checkCommitLockVoteAbort(self):
+ oid, txn = self._start_txn()
+ self._storage.tpcVote(txn)
- def _dosetup1(self, storage, trans, tid):
- storage.tpcBegin(trans, tid)
+ self._begin_threads()
- def _dowork1(self, method_name):
- for store, trans in self._storages:
- oid = store.newObjectId()
- store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
- store.tpcVote(trans)
- if method_name == "tpc_finish":
- store.tpcFinish(trans)
- else:
- store.tpcAbort(trans)
+ self._storage.tpcAbort(txn)
+
+ self._finish_threads()
+
+ self._dostore()
+ self._cleanup()
+
+ def checkCommitLockVoteClose(self):
+ oid, txn = self._start_txn()
+ self._storage.tpcVote(txn)
+
+ self._begin_threads()
+
+ self._storage.close()
- def _dosetup2(self, storage, trans, tid):
+ self._finish_threads()
+ self._cleanup()
+
+ def _begin_threads(self):
+ # Start a second transaction on a different connection without
+ # blocking the test thread.
+ self._storages = []
self._threads = []
- t = WorkerThread(self, storage, trans)
- self._threads.append(t)
- t.start()
- t.ready.wait()
+
+ for i in range(self.NUM_CLIENTS):
+ storage = self._duplicate_client()
+ txn = Transaction()
+ tid = self._get_timestamp()
+
+ t = WorkerThread(self, storage, txn)
+ self._threads.append(t)
+ t.start()
+ t.ready.wait()
+
+ # Close one of the connections abnormally to test server response
+ if i == 0:
+ storage.close()
+ else:
+ self._storages.append((storage, txn))
- def _dowork2(self, method_name):
+ def _finish_threads(self):
for t in self._threads:
t.cleanup()
@@ -215,5 +178,67 @@
def _get_timestamp(self):
t = time.time()
- ts = TimeStamp(*(time.gmtime(t)[:5] + (t % 60,)))
- return ts.raw()
+ t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
+ return `t`
+
+class CommitLockUndoTests(CommitLockTests):
+
+ def _get_trans_id(self):
+ self._dostore()
+ L = self._storage.undoInfo()
+ return L[0]['id']
+
+ def _begin_undo(self, trans_id):
+ rpc = self._storage._server.rpc
+ return rpc._deferred_call("undo", trans_id, self._storage._serial)
+
+ def _finish_undo(self, msgid):
+ return self._storage._server.rpc._deferred_wait(msgid)
+
+ def checkCommitLockUndoFinish(self):
+ trans_id = self._get_trans_id()
+ oid, txn = self._start_txn()
+ msgid = self._begin_undo(trans_id)
+
+ self._begin_threads()
+
+ self._finish_undo(msgid)
+ self._storage.tpcVote(txn)
+ self._storage.tpcFinish(txn)
+ self._storage.load(oid, '')
+
+ self._finish_threads()
+
+ self._dostore()
+ self._cleanup()
+
+ def checkCommitLockUndoAbort(self):
+ trans_id = self._get_trans_id()
+ oid, txn = self._start_txn()
+ msgid = self._begin_undo(trans_id)
+
+ self._begin_threads()
+
+ self._finish_undo(msgid)
+ self._storage.tpcVote(txn)
+ self._storage.tpcAbort(txn)
+
+ self._finish_threads()
+
+ self._dostore()
+ self._cleanup()
+
+ def checkCommitLockUndoClose(self):
+ trans_id = self._get_trans_id()
+ oid, txn = self._start_txn()
+ msgid = self._begin_undo(trans_id)
+
+ self._begin_threads()
+
+ self._finish_undo(msgid)
+ self._storage.tpcVote(txn)
+ self._storage.close()
+
+ self._finish_threads()
+
+ self._cleanup()