[Zodb-checkins] CVS: ZODB3/ZEO/tests - ConnectionTests.py:1.1 testZEO.py:1.52
Jeremy Hylton
jeremy@zope.com
Tue, 1 Oct 2002 12:09:26 -0400
Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv7619/tests
Modified Files:
testZEO.py
Added Files:
ConnectionTests.py
Log Message:
Factor ConnectionTests into a separate module.
Also rename _startServer() to startServer() for consistency with
shutdownServer().
=== Added File ZODB3/ZEO/tests/ConnectionTests.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import os
import random
import select
import socket
import sys
import tempfile
import thread
import time
import zLOG
import ZEO.ClientStorage
from ZEO.Exceptions import Disconnected
from ZODB.Transaction import get_transaction
from ZODB.POSException import ReadOnlyError
from ZODB.tests import StorageTestBase
from ZODB.tests.StorageTestBase import zodb_unpickle, MinPO
class ConnectionTests(StorageTestBase.StorageTestBase):
"""Tests that explicitly manage the server process.
To test the cache or re-connection, these test cases explicit
start and stop a ZEO storage server.
"""
__super_tearDown = StorageTestBase.StorageTestBase.tearDown
def setUp(self):
"""Start a ZEO server using a Unix domain socket
The ZEO server uses the storage object returned by the
getStorage() method.
"""
zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
self.file = tempfile.mktemp()
self.addr = []
self._pids = []
self._servers = []
self._newAddr()
self.startServer()
# startServer(), shutdownServer() are defined in OS-specific subclasses
def _newAddr(self):
self.addr.append(self._getAddr())
def _getAddr(self):
# On windows, port+1 is also used (see winserver.py), so only
# draw even port numbers
return 'localhost', random.randrange(25000, 30000, 2)
def openClientStorage(self, cache='', cache_size=200000, wait=1,
read_only=0, read_only_fallback=0):
raise NotImplementedError
def shutdownServer(self, index=0):
raise NotImplementedError
def startServer(self, create=1, index=0, read_only=0, ro_svr=0):
raise NotImplementedError
def tearDown(self):
"""Try to cause the tests to halt"""
zLOG.LOG("testZEO", zLOG.INFO, "tearDown() %s" % self.id())
if getattr(self, '_storage', None) is not None:
self._storage.close()
for i in range(len(self._servers)):
self.shutdownServer(i)
# file storage appears to create four files
for i in range(len(self.addr)):
for ext in '', '.index', '.lock', '.tmp':
path = "%s.%s%s" % (self.file, i, ext)
if os.path.exists(path):
try:
os.unlink(path)
except os.error:
pass
for i in 0, 1:
path = "c1-test-%d.zec" % i
if os.path.exists(path):
try:
os.unlink(path)
except os.error:
pass
self.__super_tearDown()
def pollUp(self, timeout=30.0):
# Poll until we're connected
now = time.time()
giveup = now + timeout
while not self._storage.is_connected():
asyncore.poll(0.1)
now = time.time()
if now > giveup:
self.fail("timed out waiting for storage to connect")
def pollDown(self, timeout=30.0):
# Poll until we're disconnected
now = time.time()
giveup = now + timeout
while self._storage.is_connected():
asyncore.poll(0.1)
now = time.time()
if now > giveup:
self.fail("timed out waiting for storage to disconnect")
def checkMultipleAddresses(self):
for i in range(4):
self._newAddr()
self._storage = self.openClientStorage('test', 100000, wait=1)
oid = self._storage.new_oid()
obj = MinPO(12)
self._dostore(oid, data=obj)
self._storage.close()
def checkMultipleServers(self):
# XXX crude test at first -- just start two servers and do a
# commit at each one.
self._newAddr()
self._storage = self.openClientStorage('test', 100000, wait=1)
self._dostore()
self.shutdownServer(index=0)
self.startServer(index=1)
# If we can still store after shutting down one of the
# servers, we must be reconnecting to the other server.
for i in range(10):
try:
self._dostore()
break
except Disconnected:
time.sleep(0.5)
def checkReadOnlyClient(self):
# Open a read-only client to a read-write server; stores fail
# Start a read-only client for a read-write server
self._storage = self.openClientStorage(read_only=1)
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
def checkReadOnlyStorage(self):
# Open a read-only client to a read-only *storage*; stores fail
# We don't want the read-write server created by setUp()
self.shutdownServer()
self._servers = []
self._pids = []
# Start a read-only server
self.startServer(create=0, index=0, read_only=1)
# Start a read-only client
self._storage = self.openClientStorage(read_only=1)
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
def checkReadOnlyServer(self):
# Open a read-only client to a read-only *server*; stores fail
# We don't want the read-write server created by setUp()
self.shutdownServer()
self._servers = []
self._pids = []
# Start a read-only server
self.startServer(create=0, index=0, ro_svr=1)
# Start a read-only client
self._storage = self.openClientStorage(read_only=1)
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
def checkReadOnlyFallbackWritable(self):
# Open a fallback client to a read-write server; stores succeed
# Start a read-only-fallback client for a read-write server
self._storage = self.openClientStorage(read_only_fallback=1)
# Stores should succeed here
self._dostore()
def checkReadOnlyFallbackReadOnlyStorage(self):
# Open a fallback client to a read-only *storage*; stores fail
# We don't want the read-write server created by setUp()
self.shutdownServer()
self._servers = []
self._pids = []
# Start a read-only server
self.startServer(create=0, index=0, read_only=1)
# Start a read-only-fallback client
self._storage = self.openClientStorage(wait=1, read_only_fallback=1)
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
def checkReadOnlyFallbackReadOnlyServer(self):
# Open a fallback client to a read-only *server*; stores fail
# We don't want the read-write server created by setUp()
self.shutdownServer()
self._servers = []
self._pids = []
# Start a read-only server
self.startServer(create=0, index=0, ro_svr=1)
# Start a read-only-fallback client
self._storage = self.openClientStorage(wait=1, read_only_fallback=1)
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
# XXX Compare checkReconnectXXX() here to checkReconnection()
# further down. Is the code here hopelessly naive, or is
# checkReconnection() overwrought?
def checkReconnectWritable(self):
# A read-write client reconnects to a read-write server
# Start a client
self._storage = self.openClientStorage(wait=1)
# Stores should succeed here
self._dostore()
# Shut down the server
self.shutdownServer()
self._servers = []
self._pids = []
# Poll until the client disconnects
self.pollDown()
# Stores should fail now
self.assertRaises(Disconnected, self._dostore)
# Restart the server
self.startServer(create=0)
# Poll until the client connects
self.pollUp()
# Stores should succeed here
self._dostore()
def checkReconnectReadOnly(self):
# A read-only client reconnects from a read-write to a
# read-only server
# Start a client
self._storage = self.openClientStorage(wait=1, read_only=1)
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
# Shut down the server
self.shutdownServer()
self._servers = []
self._pids = []
# Poll until the client disconnects
self.pollDown()
# Stores should still fail
self.assertRaises(ReadOnlyError, self._dostore)
# Restart the server
self.startServer(create=0, read_only=1)
# Poll until the client connects
self.pollUp()
# Stores should still fail
self.assertRaises(ReadOnlyError, self._dostore)
def checkReconnectFallback(self):
# A fallback client reconnects from a read-write to a
# read-only server
# Start a client in fallback mode
self._storage = self.openClientStorage(wait=1, read_only_fallback=1)
# Stores should succeed here
self._dostore()
# Shut down the server
self.shutdownServer()
self._servers = []
self._pids = []
# Poll until the client disconnects
self.pollDown()
# Stores should fail now
self.assertRaises(Disconnected, self._dostore)
# Restart the server
self.startServer(create=0, read_only=1)
# Poll until the client connects
self.pollUp()
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
def checkReconnectUpgrade(self):
# A fallback client reconnects from a read-only to a
# read-write server
# We don't want the read-write server created by setUp()
self.shutdownServer()
self._servers = []
self._pids = []
# Start a read-only server
self.startServer(create=0, read_only=1)
# Start a client in fallback mode
self._storage = self.openClientStorage(wait=1, read_only_fallback=1)
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
# Shut down the server
self.shutdownServer()
self._servers = []
self._pids = []
# Poll until the client disconnects
self.pollDown()
# Stores should fail now
self.assertRaises(Disconnected, self._dostore)
# Restart the server, this time read-write
self.startServer(create=0)
# Poll until the client sconnects
self.pollUp()
# Stores should now succeed
self._dostore()
def checkReconnectSwitch(self):
# A fallback client initially connects to a read-only server,
# then discovers a read-write server and switches to that
# We don't want the read-write server created by setUp()
self.shutdownServer()
self._servers = []
self._pids = []
# Allocate a second address (for the second server)
self._newAddr()
# Start a read-only server
self.startServer(create=0, index=0, read_only=1)
# Start a client in fallback mode
self._storage = self.openClientStorage(wait=1, read_only_fallback=1)
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
# Start a read-write server
self.startServer(index=1, read_only=0)
# After a while, stores should work
for i in range(300): # Try for 30 seconds
try:
self._dostore()
break
except (Disconnected, ReadOnlyError,
select.error, thread.error, socket.error):
time.sleep(0.1)
else:
self.fail("Couldn't store after starting a read-write server")
def checkDisconnectionError(self):
# Make sure we get a Disconnected when we try to read an
# object when we're not connected to a storage server and the
# object is not in the cache.
self.shutdownServer()
self._storage = self.openClientStorage('test', 1000, wait=0)
self.assertRaises(Disconnected, self._storage.load, 'fredwash', '')
def checkBasicPersistence(self):
# Verify cached data persists across client storage instances.
# To verify that the cache is being used, the test closes the
# server and then starts a new client with the server down.
# When the server is down, a load() gets the data from its cache.
self._storage = self.openClientStorage('test', 100000, wait=1)
oid = self._storage.new_oid()
obj = MinPO(12)
revid1 = self._dostore(oid, data=obj)
self._storage.close()
self.shutdownServer()
self._storage = self.openClientStorage('test', 100000, wait=0)
data, revid2 = self._storage.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO(12))
self.assertEqual(revid1, revid2)
self._storage.close()
def checkRollover(self):
# Check that the cache works when the files are swapped.
# In this case, only one object fits in a cache file. When the
# cache files swap, the first object is effectively uncached.
self._storage = self.openClientStorage('test', 1000, wait=1)
oid1 = self._storage.new_oid()
obj1 = MinPO("1" * 500)
self._dostore(oid1, data=obj1)
oid2 = self._storage.new_oid()
obj2 = MinPO("2" * 500)
self._dostore(oid2, data=obj2)
self._storage.close()
self.shutdownServer()
self._storage = self.openClientStorage('test', 1000, wait=0)
self._storage.load(oid1, '')
self._storage.load(oid2, '')
def checkReconnection(self):
# Check that the client reconnects when a server restarts.
# XXX Seem to get occasional errors that look like this:
# File ZEO/zrpc2.py, line 217, in handle_request
# File ZEO/StorageServer.py, line 325, in storea
# File ZEO/StorageServer.py, line 209, in _check_tid
# StorageTransactionError: (None, <tid>)
# could system reconnect and continue old transaction?
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
obj = MinPO(12)
self._dostore(oid, data=obj)
zLOG.LOG("checkReconnection", zLOG.INFO,
"About to shutdown server")
self.shutdownServer()
zLOG.LOG("checkReconnection", zLOG.INFO,
"About to restart server")
self.startServer(create=0)
oid = self._storage.new_oid()
obj = MinPO(12)
while 1:
try:
self._dostore(oid, data=obj)
break
except (Disconnected, select.error, thread.error, socket.error):
zLOG.LOG("checkReconnection", zLOG.INFO,
"Error after server restart; retrying.",
error=sys.exc_info())
get_transaction().abort()
time.sleep(0.1) # XXX how long to sleep
# XXX This is a bloody pain. We're placing a heavy burden
# on users to catch a plethora of exceptions in order to
# write robust code. Need to think about implementing
# John Heintz's suggestion to make sure all exceptions
# inherit from POSException.
zLOG.LOG("checkReconnection", zLOG.INFO, "finished")
=== ZODB3/ZEO/tests/testZEO.py 1.51 => 1.52 === (417/517 lines abridged)
--- ZODB3/ZEO/tests/testZEO.py:1.51 Mon Sep 30 13:05:48 2002
+++ ZODB3/ZEO/tests/testZEO.py Tue Oct 1 12:09:25 2002
@@ -24,7 +24,6 @@
import time
import unittest
-import ZEO.ClientStorage
from ZODB.Transaction import get_transaction
from ZODB.POSException import ReadOnlyError
import zLOG
@@ -46,7 +45,9 @@
raise
+from ZEO.ClientStorage import ClientStorage
from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
+from ZEO.tests.ConnectionTests import ConnectionTests
from ZEO.Exceptions import Disconnected
from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage, \
@@ -117,9 +118,7 @@
addr = self._storage._addr
self._storage.close()
- self._storage = ZEO.ClientStorage.ClientStorage(addr,
- read_only=read_only,
- wait=1)
+ self._storage = ClientStorage(addr, read_only=read_only, wait=1)
def checkLargeUpdate(self):
obj = MinPO("X" * (10 * 128 * 1024))
@@ -127,8 +126,7 @@
def checkZEOInvalidation(self):
addr = self._storage._addr
- storage2 = ZEO.ClientStorage.ClientStorage(addr, wait=1,
- min_disconnect_poll=0.1)
+ storage2 = ClientStorage(addr, wait=1, min_disconnect_poll=0.1)
try:
oid = self._storage.new_oid()
ob = MinPO('first')
@@ -178,8 +176,7 @@
args = args[1]
zeo_addr, self.test_addr, self.test_pid = \
forker.start_zeo_server(name, args)
- storage = ZEO.ClientStorage.ClientStorage(zeo_addr, wait=1,
- min_disconnect_poll=0.1)
+ storage = ClientStorage(zeo_addr, wait=1, min_disconnect_poll=0.1)
self._storage = PackWaitWrapper(storage)
storage.registerDB(DummyDB(), None)
[-=- -=- -=- 417 lines omitted -=- -=- -=-]
- self.shutdownServer()
- zLOG.LOG("checkReconnection", zLOG.INFO,
- "About to restart server")
- self._startServer(create=0)
- oid = self._storage.new_oid()
- obj = MinPO(12)
- while 1:
- try:
- revid1 = self._dostore(oid, data=obj)
- break
- except (Disconnected, select.error, thread.error, socket.error), \
- err:
- zLOG.LOG("checkReconnection", zLOG.INFO,
- "Error after server restart; retrying.",
- error=sys.exc_info())
- get_transaction().abort()
- time.sleep(0.1) # XXX how long to sleep
- # XXX This is a bloody pain. We're placing a heavy burden
- # on users to catch a plethora of exceptions in order to
- # write robust code. Need to think about implementing
- # John Heintz's suggestion to make sure all exceptions
- # inherit from POSException.
- zLOG.LOG("checkReconnection", zLOG.INFO, "finished")
-
-class UnixConnectionTests(ConnectionTests):
+class UnixConnectionTests(BaseConnectionTests):
- def _startServer(self, create=1, index=0, read_only=0, ro_svr=0):
+ def startServer(self, create=1, index=0, read_only=0, ro_svr=0):
zLOG.LOG("testZEO", zLOG.INFO,
- "_startServer(create=%d, index=%d, read_only=%d)" %
+ "startServer(create=%d, index=%d, read_only=%d)" %
(create, index, read_only))
path = "%s.%d" % (self.file, index)
addr = self.addr[index]
@@ -650,11 +237,11 @@
except os.error, err:
print err
-class WindowsConnectionTests(ConnectionTests):
+class WindowsConnectionTests(BaseConnectionTests):
- def _startServer(self, create=1, index=0, read_only=0, ro_svr=0):
+ def startServer(self, create=1, index=0, read_only=0, ro_svr=0):
zLOG.LOG("testZEO", zLOG.INFO,
- "_startServer(create=%d, index=%d, read_only=%d)" %
+ "startServer(create=%d, index=%d, read_only=%d)" %
(create, index, read_only))
path = "%s.%d" % (self.file, index)
addr = self.addr[index]