[Zodb-checkins] CVS: Zope3/src/zodb/zeo/tests - zeoserver.py:1.7.6.1 test_conn.py:1.3.6.1 forker.py:1.4.6.1 connection.py:1.5.2.3
Jeremy Hylton
jeremy@zope.com
Thu, 13 Feb 2003 19:29:11 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv25656/zeo/tests
Modified Files:
Tag: ZODB3-2-integration-branch
zeoserver.py test_conn.py forker.py connection.py
Log Message:
Port new connection tests from ZODB3.2.
Not all of the tests pass, but it's a good start.
=== Zope3/src/zodb/zeo/tests/zeoserver.py 1.7 => 1.7.6.1 ===
--- Zope3/src/zodb/zeo/tests/zeoserver.py:1.7 Mon Jan 27 14:44:14 2003
+++ Zope3/src/zodb/zeo/tests/zeoserver.py Thu Feb 13 19:29:10 2003
@@ -29,24 +29,12 @@
import zodb.zeo.server
from zodb.zeo import threadedasync
-
def load_storage(fp):
context = ZConfig.Context.Context()
rootconf = context.loadFile(fp)
storageconf = rootconf.getSection('Storage')
return config.createStorage(storageconf)
-
-def cleanup(storage):
- # FileStorage and the Berkeley storages have this method, which deletes
- # all files and directories used by the storage. This prevents @-files
- # from clogging up /tmp
- try:
- storage.cleanup()
- except AttributeError:
- pass
-
-
class ZEOTestServer(asyncore.dispatcher):
"""A server for killing the whole process at the end of a test.
@@ -64,10 +52,10 @@
"""
__super_init = asyncore.dispatcher.__init__
- def __init__(self, addr, storage, keep):
+ def __init__(self, addr, server, keep):
self.__super_init()
self._sockets = [self]
- self._storage = storage
+ self._server = server
self._keep = keep
# Count down to zero, the number of connects
self._count = 1
@@ -95,9 +83,9 @@
# the ack character until the storage is finished closing.
if self._count <= 0:
self.logger.info('closing the storage')
- self._storage.close()
- if not self._keep:
- cleanup(self._storage)
+ self._server.close_server()
+ for storage in self._server.storages.values():
+ storage.cleanup()
self.logger.info('exiting')
# Close all the other sockets so that we don't have to wait
# for os._exit() to get to it before starting the next
@@ -137,8 +125,11 @@
ro_svr = False
keep = False
configfile = None
+ invalidation_queue_size = 100
+ transaction_timeout = None
+ monitor_address = None
# Parse the arguments and let getopt.error percolate
- opts, args = getopt.getopt(sys.argv[1:], 'rkC:')
+ opts, args = getopt.getopt(sys.argv[1:], 'rkC:Q:T:m:')
for opt, arg in opts:
if opt == '-r':
ro_svr = True
@@ -146,6 +137,12 @@
keep = True
elif opt == '-C':
configfile = arg
+ elif opt == '-Q':
+ invalidation_queue_size = int(arg)
+ elif opt == '-T':
+ transaction_timeout = int(arg)
+ elif opt == '-m':
+ monitor_address = '', int(arg)
# Open the config file and let ZConfig parse the data there. Then remove
# the config file, otherwise we'll leave turds.
fp = open(configfile, 'r')
@@ -156,19 +153,24 @@
zeo_port = int(args[0])
test_port = zeo_port + 1
test_addr = ('', test_port)
+ addr = ('', zeo_port)
+ serv = zodb.zeo.server.StorageServer(
+ addr, {'1': storage}, ro_svr,
+ invalidation_queue_size=invalidation_queue_size,
+ transaction_timeout=transaction_timeout,
+ monitor_address=monitor_address)
try:
logger.info('creating the test server, ro: %s, keep: %s',
ro_svr, keep)
- t = ZEOTestServer(test_addr, storage, keep)
+ t = ZEOTestServer(test_addr, serv, keep)
except socket.error, e:
if e[0] <> errno.EADDRINUSE: raise
logger.info('addr in use, closing and exiting')
storage.close()
- cleanup(storage)
+ storage.cleanup()
sys.exit(2)
addr = ('', zeo_port)
logger.info('creating the storage server')
- serv = zodb.zeo.server.StorageServer(addr, {'1': storage}, ro_svr)
t.register_socket(serv.dispatcher)
# Loop for socket events
logger.info('entering threadedasync loop')
=== Zope3/src/zodb/zeo/tests/test_conn.py 1.3 => 1.3.6.1 ===
--- Zope3/src/zodb/zeo/tests/test_conn.py:1.3 Wed Jan 22 16:46:53 2003
+++ Zope3/src/zodb/zeo/tests/test_conn.py Thu Feb 13 19:29:10 2003
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -17,11 +17,10 @@
platform-dependent scaffolding.
"""
-# System imports
import unittest
-# Import the actual test class
-from zodb.zeo.tests import connection
+from zodb.zeo.tests import connection
+from zodb.storage.base import berkeley_is_available
class FileStorageConfig:
def getConfig(self, path, create, read_only):
@@ -35,7 +34,6 @@
create and 'yes' or 'no',
read_only and 'yes' or 'no')
-
class BerkeleyStorageConfig:
def getConfig(self, path, create, read_only):
# Full always creates and doesn't have a read_only flag
@@ -47,54 +45,28 @@
</Storage>""" % (path,
read_only and 'yes' or 'no')
+class MappingStorageConfig:
+ def getConfig(self, path, create, read_only):
+ return """\
+ <Storage>
+ type MappingStorage
+ name %s
+ </Storage>""" % path
-class FileStorageConnectionTests(
- FileStorageConfig,
- connection.ConnectionTests
- ):
- """FileStorage-specific connection tests."""
-
-
-class FileStorageReconnectionTests(
- FileStorageConfig,
- connection.ReconnectionTests
- ):
- """FileStorage-specific re-connection tests."""
-
-
-class BDBConnectionTests(
- BerkeleyStorageConfig,
- connection.ConnectionTests
- ):
- """Berkeley storage connection tests."""
-
-
-class BDBReconnectionTests(
- BerkeleyStorageConfig,
- connection.ReconnectionTests
- ):
- """Berkeley storage re-connection tests."""
-
-
-test_classes = [FileStorageConnectionTests, FileStorageReconnectionTests]
-
-from zodb.storage.base import berkeley_is_available
+tests = [connection.ConnectionTests, connection.ReconnectionTests]
+configs = [FileStorageConfig, MappingStorageConfig]
if berkeley_is_available:
- test_classes.append(BDBConnectionTests)
- test_classes.append(BDBReconnectionTests)
-
+ configs += [BerkeleyStorageConfig]
def test_suite():
- # shutup warnings about mktemp
- import warnings
- warnings.filterwarnings("ignore", "mktemp")
-
suite = unittest.TestSuite()
- for klass in test_classes:
- sub = unittest.makeSuite(klass, 'check')
- suite.addTest(sub)
+ for testclass in tests:
+ for configclass in configs:
+ # synthesize a concrete class combining tests and configuration
+ name = "%s:%s" % (testclass.__name__,
+ configclass.__name__)
+ aclass = type.__new__(type, name,
+ (configclass, testclass, object), {})
+ sub = unittest.makeSuite(aclass, "check")
+ suite.addTest(sub)
return suite
-
-
-if __name__ == "__main__":
- unittest.main(defaultTest='test_suite')
=== Zope3/src/zodb/zeo/tests/forker.py 1.4 => 1.4.6.1 ===
--- Zope3/src/zodb/zeo/tests/forker.py:1.4 Mon Jan 27 14:44:14 2003
+++ Zope3/src/zodb/zeo/tests/forker.py Thu Feb 13 19:29:10 2003
@@ -51,8 +51,8 @@
raise RuntimeError, "Can't find port"
-def start_zeo_server(conf, addr=None, ro_svr=False, keep=False,
- admin_retries=10):
+def start_zeo_server(conf, addr=None, ro_svr=False, monitor=False, keep=False,
+ invq=None, timeout=None):
"""Start a ZEO server in a separate process.
Returns the ZEO port, the test server port, and the pid.
@@ -72,11 +72,19 @@
if script.endswith('.pyc'):
script = script[:-1]
# Create a list of arguments, which we'll tuplify below
- args = [sys.executable, script, '-C', tmpfile]
+ qa = _quote_arg
+ args = [qa(sys.executable), qa(script), '-C', qa(tmpfile)]
if ro_svr:
args.append('-r')
if keep:
args.append('-k')
+ if invq:
+ args += ['-Q', str(invq)]
+ if timeout:
+ args += ['-T', str(timeout)]
+ if monitor:
+ # XXX Is it safe to reuse the port?
+ args += ['-m', '42000']
args.append(str(port))
d = os.environ.copy()
d['PYTHONPATH'] = os.pathsep.join(sys.path)
@@ -86,7 +94,7 @@
# Always do a sleep as the first thing, since we don't expect
# the spawned process to get started right away.
delay = 0.25
- for i in range(admin_retries):
+ for i in range(10):
time.sleep(delay)
logging.debug('forker: connect %s', i)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -107,6 +115,12 @@
raise
return ('localhost', port), adminaddr, pid
+if sys.platform[:3].lower() == "win":
+ def _quote_arg(s):
+ return '"%s"' % s
+else:
+ def _quote_arg(s):
+ return s
def shutdown_zeo_server(adminaddr):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
=== Zope3/src/zodb/zeo/tests/connection.py 1.5.2.2 => 1.5.2.3 ===
--- Zope3/src/zodb/zeo/tests/connection.py:1.5.2.2 Thu Feb 13 12:40:14 2003
+++ Zope3/src/zodb/zeo/tests/connection.py Thu Feb 13 19:29:10 2003
@@ -36,22 +36,31 @@
from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
from zodb.storage.tests.base import handle_all_serials, ZERO
+class TestClientStorage(ClientStorage):
+
+ def verify_cache(self, stub):
+ self.end_verify = threading.Event()
+ self.verify_result = ClientStorage.verify_cache(self, stub)
+
+ def endVerify(self):
+ ClientStorage.endVerify(self)
+ self.end_verify.set()
class DummyDB:
def invalidate(self, *args, **kws):
pass
-
class CommonSetupTearDown(StorageTestBase):
- """Tests that explicitly manage the server process.
-
- To test the cache or re-connection, these test cases explicit
- start and stop a ZEO storage server.
- """
+ """Common boilerplate"""
__super_setUp = StorageTestBase.setUp
__super_tearDown = StorageTestBase.tearDown
+ keep = 0
+ invq = None
+ timeout = None
+ monitor = 0
+
def setUp(self):
"""Test setup for connection tests.
@@ -67,7 +76,7 @@
self._pids = []
self._servers = []
self._newAddr()
- self.startServer(keep=self.keep)
+ self.startServer()
def tearDown(self):
"""Try to cause the tests to halt"""
@@ -99,21 +108,18 @@
# port+1 is also used, so only draw even port numbers
return 'localhost', random.randrange(25000, 30000, 2)
- def getConfig(self):
- raise NotImplementedError
-
def openClientStorage(self, cache='', cache_size=200000, wait=True,
read_only=False, read_only_fallback=False,
addr=None):
if addr is None:
addr = self.addr
- storage = ClientStorage(addr,
- client=cache,
- cache_size=cache_size,
- wait=wait,
- min_disconnect_poll=0.1,
- read_only=read_only,
- read_only_fallback=read_only_fallback)
+ storage = TestClientStorage(addr,
+ client=cache,
+ cache_size=cache_size,
+ wait=wait,
+ min_disconnect_poll=0.1,
+ read_only=read_only,
+ read_only_fallback=read_only_fallback)
storage.registerDB(DummyDB())
return storage
@@ -122,6 +128,8 @@
# it fails with an error.
forker_admin_retries = 10
+ # Concrete test classes must provide a getConfig() method
+
def startServer(self, create=True, index=0,
read_only=False, ro_svr=False, keep=False):
addr = self.addr[index]
@@ -130,7 +138,8 @@
path = "%s.%d" % (self.file, index)
conf = self.getConfig(path, create, read_only)
zeoport, adminaddr, pid = forker.start_zeo_server(
- conf, addr, ro_svr, keep, self.forker_admin_retries)
+ conf, addr, ro_svr,
+ self.monitor, self.keep, self.invq, self.timeout)
self._pids.append(pid)
self._servers.append(adminaddr)
@@ -142,11 +151,13 @@
forker.shutdown_zeo_server(adminaddr)
self._servers[index] = None
- def pollUp(self, timeout=30.0):
+ def pollUp(self, timeout=30.0, storage=None):
# Poll until we're connected
+ if storage is None:
+ storage = self._storage
now = time.time()
giveup = now + timeout
- while not self._storage.is_connected():
+ while not storage.is_connected():
asyncore.poll(0.1)
now = time.time()
if now > giveup:
@@ -164,7 +175,11 @@
class ConnectionTests(CommonSetupTearDown):
- keep = False
+ """Tests that explicitly manage the server process.
+
+ To test the cache or re-connection, these test cases explicitly
+ start and stop a ZEO storage server.
+ """
def checkMultipleAddresses(self):
for i in range(4):
@@ -267,7 +282,7 @@
self._dostore()
def checkDisconnectionError(self):
- # Make sure we get a Disconnected when we try to read an
+ # Make sure we get a ClientDisconnected when we try to read an
# object when we're not connected to a storage server and the
# object is not in the cache.
self.shutdownServer()
@@ -275,6 +290,22 @@
self.assertRaises(ClientDisconnected,
self._storage.load, 'fredwash', '')
+ def checkDisconnectedAbort(self):
+ self._storage = self.openClientStorage()
+ self._dostore()
+ oids = [self._storage.newObjectId() for i in range(5)]
+ txn = Transaction()
+ self._storage.tpcBegin(txn)
+ for oid in oids:
+ data = zodb_pickle(MinPO(oid))
+ self._storage.store(oid, None, data, '', txn)
+ self.shutdownServer()
+ self.assertRaises(ClientDisconnected, self._storage.tpcVote, txn)
+ self._storage.tpcAbort(txn)
+ self.startServer(create=0)
+ self._storage._wait()
+ self._dostore()
+
def checkBasicPersistence(self):
# Verify cached data persists across client storage instances.
@@ -425,6 +456,7 @@
class ReconnectionTests(CommonSetupTearDown):
keep = True
forker_admin_retries = 20
+ invq = 2
def checkReadOnlyStorage(self):
# Open a read-only client to a read-only *storage*; stores fail
@@ -558,6 +590,136 @@
else:
self.fail("Couldn't store after starting a read-write server")
+ def checkNoVerificationOnServerRestart(self):
+ self._storage = self.openClientStorage()
+ # When we create a new storage, it should always do a full
+ # verification
+ self.assertEqual(self._storage.verify_result, "full verification")
+ self._dostore()
+ self.shutdownServer()
+ self.pollDown()
+ self._storage.verify_result = None
+ self.startServer(create=0)
+ self.pollUp()
+ # There were no transactions committed, so no verification
+ # should be needed.
+ self.assertEqual(self._storage.verify_result, "no verification")
+
+ def checkNoVerificationOnServerRestartWith2Clients(self):
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+
+ self._storage = self.openClientStorage()
+ oid = self._storage.newObjectId()
+ # When we create a new storage, it should always do a full
+ # verification
+ self.assertEqual(self._storage.verify_result, "full verification")
+ # do two storages of the object to make sure an invalidation
+ # message is generated
+ revid = self._dostore(oid)
+ self._dostore(oid, revid)
+
+ perstorage.load(oid, '')
+
+ self.shutdownServer()
+
+ self.pollDown()
+ self._storage.verify_result = None
+ perstorage.verify_result = None
+ self.startServer(create=0)
+ self.pollUp()
+ self.pollUp(storage=perstorage)
+ # There were no transactions committed, so no verification
+ # should be needed.
+ self.assertEqual(self._storage.verify_result, "no verification")
+ self.assertEqual(perstorage.verify_result, "no verification")
+ perstorage.close()
+
+ def checkQuickVerificationWith2Clients(self):
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+
+ self._storage = self.openClientStorage()
+ oid = self._storage.newObjectId()
+ # When we create a new storage, it should always do a full
+ # verification
+ self.assertEqual(self._storage.verify_result, "full verification")
+ # do two storages of the object to make sure an invalidation
+ # message is generated
+ revid = self._dostore(oid)
+ revid = self._dostore(oid, revid)
+
+ perstorage.load(oid, '')
+ perstorage.close()
+
+ revid = self._dostore(oid, revid)
+
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "quick verification")
+
+ self.assertEqual(perstorage.load(oid, ''),
+ self._storage.load(oid, ''))
+
+
+
+ def checkVerificationWith2ClientsInvqOverflow(self):
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+
+ self._storage = self.openClientStorage()
+ oid = self._storage.newObjectId()
+ # When we create a new storage, it should always do a full
+ # verification
+ self.assertEqual(self._storage.verify_result, "full verification")
+ # do two storages of the object to make sure an invalidation
+ # message is generated
+ revid = self._dostore(oid)
+ revid = self._dostore(oid, revid)
+
+ perstorage.load(oid, '')
+ perstorage.close()
+
+ # the test code sets invq bound to 2
+ for i in range(5):
+ revid = self._dostore(oid, revid)
+
+ perstorage = self.openClientStorage(cache="test")
+ self.assertEqual(perstorage.verify_result, "full verification")
+ t = time.time() + 30
+ while not perstorage.end_verify.isSet():
+ perstorage.sync()
+ if time.time() > t:
+ self.fail("timed out waiting for endVerify")
+
+ self.assertEqual(self._storage.load(oid, '')[1], revid)
+ self.assertEqual(perstorage.load(oid, ''),
+ self._storage.load(oid, ''))
+
+ perstorage.close()
+
+class TimeoutTests(CommonSetupTearDown):
+ timeout = 1
+
+ def checkTimeout(self):
+ storage = self.openClientStorage()
+ txn = Transaction()
+ storage.tpcBegin(txn)
+ storage.tpcVote(txn)
+ time.sleep(2)
+ self.assertRaises(ClientDisconnected, storage.tpcFinish, txn)
+
+ def checkTimeoutOnAbort(self):
+ storage = self.openClientStorage()
+ txn = Transaction()
+ storage.tpcBegin(txn)
+ storage.tpcVote(txn)
+ storage.tpcAbort(txn)
+
+ def checkTimeoutOnAbortNoLock(self):
+ storage = self.openClientStorage()
+ txn = Transaction()
+ storage.tpcBegin(txn)
+ storage.tpcAbort(txn)
class MSTThread(threading.Thread):