[Zodb-checkins] CVS: StandaloneZODB/ZEO/tests - forker.py:1.7 testZEO.py:1.11 winserver.py:1.2
Jeremy Hylton
jeremy@zope.com
Fri, 17 Aug 2001 15:16:54 -0400
Update of /cvs-repository/StandaloneZODB/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv6800
Modified Files:
forker.py testZEO.py winserver.py
Log Message:
Update ZEO connection tests to run on Windows
forker.py:
Extend start_zeo_server() on Windows with arg that specifies port,
which is used to restart the server on the same port.
Remove unnecessary imports.
Set PYTHONPATH for the spawned script.
testZEO.py:
Refactor ConnectionTests into abstract base class with two platform-
specific subclasses.
Add goofy test_suite() function, which is goofy because I can't figure
out how to use unittest any other way.
winserver.py:
Keep track of storage in ZEOTestServer so that its close() method can be
called when the server exits.
=== StandaloneZODB/ZEO/tests/forker.py 1.6 => 1.7 ===
import asyncore
-import atexit
import os
import profile
import random
-import socket
import sys
-import threading
-import time
import types
-import ThreadedAsync
import ZEO.ClientStorage, ZEO.StorageServer
PROFILE = 0
if os.name == "nt":
- def start_zeo_server(storage_name, args):
+ def start_zeo_server(storage_name, args, port=None):
"""Start a ZEO server in a separate process.
Returns the ZEO port, the test server port, and the pid.
"""
import ZEO.tests.winserver
- port = random.randrange(20000, 30000)
+ if port is None:
+ port = random.randrange(20000, 30000)
script = ZEO.tests.winserver.__file__
if script.endswith('.pyc'):
script = script[:-1]
args = (sys.executable, script, str(port), storage_name) + args
- pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
+ d = os.environ.copy()
+ d['PYTHONPATH'] = os.pathsep.join(sys.path)
+ pid = os.spawnve(os.P_NOWAIT, sys.executable, args, os.environ)
return ('localhost', port), ('localhost', port + 1), pid
else:
=== StandaloneZODB/ZEO/tests/testZEO.py 1.10 => 1.11 ===
from ZODB.tests.StorageTestBase import zodb_unpickle
-
ZERO = '\0'*8
class DummyDB:
@@ -223,44 +222,18 @@
start and stop a ZEO storage server.
"""
- __super_setUp = StorageTestBase.StorageTestBase.setUp
- __super_tearDown = StorageTestBase.StorageTestBase.tearDown
-
ports = range(29000, 30000, 10) # enough for 100 tests
random.shuffle(ports)
- def setUp(self):
- """Start a ZEO server using a Unix domain socket
-
- The ZEO server uses the storage object returned by the
- getStorage() method.
- """
- self.running = 1
- self.__fs_base = tempfile.mktemp()
- self.addr = '', random.randrange(2000, 3000)
- pid, exit = self._startServer()
- self._pid = pid
- self._server = exit
- self.__super_setUp()
-
- def _startServer(self, create=1):
- fs = FileStorage(self.__fs_base, create=create)
- return forker.start_zeo_server(fs, self.addr)
+ __super_tearDown = StorageTestBase.StorageTestBase.tearDown
def openClientStorage(self, cache='', cache_size=200000, wait=1):
- base = ZEO.ClientStorage.ClientStorage(self.addr,
- client=cache,
- cache_size=cache_size,
- wait_for_server_on_startup=wait)
- storage = PackWaitWrapper(base)
- storage.registerDB(DummyDB(), None)
- return storage
+ # defined by subclasses
+ pass
def shutdownServer(self):
- if self.running:
- self.running = 0
- self._server.close()
- os.waitpid(self._pid, 0)
+ # defined by subclasses
+ pass
def tearDown(self):
"""Try to cause the tests to halt"""
@@ -323,7 +296,7 @@
revid1 = self._dostore(oid, data=obj)
self.shutdownServer()
self.running = 1
- self._pid, self._server = self._startServer(create=0)
+ self._startServer(create=0)
oid = self._storage.new_oid()
obj = MinPO(12)
while 1:
@@ -335,6 +308,81 @@
else:
break
+class UnixConnectionTests(ConnectionTests):
+ __super_setUp = StorageTestBase.StorageTestBase.setUp
+
+ def setUp(self):
+ """Start a ZEO server using a Unix domain socket
+
+ The ZEO server uses the storage object returned by the
+ getStorage() method.
+ """
+ self.running = 1
+ self.__fs_base = tempfile.mktemp()
+ self.addr = '', self.ports.pop()
+ self._startServer()
+ self.__super_setUp()
+
+ def _startServer(self, create=1):
+ fs = FileStorage(self.__fs_base, create=create)
+ self._pid, self._server = forker.start_zeo_server(fs, self.addr)
+
+ def openClientStorage(self, cache='', cache_size=200000, wait=1):
+ base = ZEO.ClientStorage.ClientStorage(self.addr,
+ client=cache,
+ cache_size=cache_size,
+ wait_for_server_on_startup=wait)
+ storage = PackWaitWrapper(base)
+ storage.registerDB(DummyDB(), None)
+ return storage
+
+ def shutdownServer(self):
+ if self.running:
+ self.running = 0
+ self._server.close()
+ os.waitpid(self._pid, 0)
+
+class WindowsConnectionTests(ConnectionTests):
+ __super_setUp = StorageTestBase.StorageTestBase.setUp
+
+ def setUp(self):
+ self.file = tempfile.mktemp()
+ self._startServer()
+ self.__super_setUp()
+
+ def _startServer(self, create=1):
+ if create == 0:
+ port = self.addr[1]
+ else:
+ port = None
+ self.addr, self.test_a, pid = forker.start_zeo_server('FileStorage',
+ (self.file,
+ str(create)),
+ port)
+ self.running = 1
+
+ def openClientStorage(self, cache='', cache_size=200000, wait=1):
+ base = ZEO.ClientStorage.ClientStorage(self.addr,
+ client=cache,
+ cache_size=cache_size,
+ debug=1,
+ wait_for_server_on_startup=wait)
+ storage = PackWaitWrapper(base)
+ storage.registerDB(DummyDB(), None)
+ return storage
+
+ def shutdownServer(self):
+ if self.running:
+ self.running = 0
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(self.test_a)
+ s.close()
+ time.sleep(1.0)
+
+ def tearDown(self):
+ self.shutdownServer()
+
+
def get_methods(klass):
l = [klass]
meth = {}
@@ -348,9 +396,9 @@
return meth.keys()
if os.name == "posix":
- test_classes = ZEOFileStorageTests, ConnectionTests
+ test_classes = ZEOFileStorageTests, UnixConnectionTests
elif os.name == "nt":
- test_classes = WindowsZEOFileStorageTests
+ test_classes = WindowsZEOFileStorageTests, WindowsConnectionTests
else:
raise RuntimeError, "unsupported os: %s" % os.name
@@ -364,7 +412,13 @@
return suite
def test_suite():
- return unittest.makeSuite(WindowsZEOFileStorageTests, 'check')
+ t = unittest.TestLoader()
+ t.testMethodPrefix = 'check'
+ tests = []
+ for klass in test_classes:
+ s = t.loadTestsFromTestCase(klass)
+ tests.extend(s._tests)
+ return unittest.TestSuite(tests)
def main():
import sys, getopt
@@ -385,4 +439,5 @@
runner.run(tests)
if __name__ == "__main__":
+ print sys.path
main()
=== StandaloneZODB/ZEO/tests/winserver.py 1.1 => 1.2 ===
__super_init = asyncore.dispatcher.__init__
- def __init__(self, addr):
+ def __init__(self, addr, storage):
self.__super_init()
+ self.storage = storage
if type(addr) == types.StringType:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
@@ -28,6 +29,7 @@
def handle_accept(self):
sock, addr = self.accept()
+ self.storage.close()
os._exit(0)
def load_storage_class(name):
@@ -40,12 +42,11 @@
storage = klass(*args)
zeo_port = int(port)
test_port = zeo_port + 1
- t = ZEOTestServer(('', test_port))
-## t = threading.Thread(target=ZEOTestServer, args=(('', test_port),))
-## t.start()
+ t = ZEOTestServer(('', test_port), storage)
serv = ZEO.StorageServer.StorageServer(('', zeo_port), {'1': storage})
asyncore.loop()
if __name__ == "__main__":
import sys
+
main(sys.argv[1], sys.argv[2], sys.argv[3:])