[Zope-Checkins] CVS: ZODB3/ZEO/tests - testZEO.py:1.69
Jeremy Hylton
jeremy@zope.com
Thu, 29 May 2003 14:42:04 -0400
Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv1353/ZEO/tests
Modified Files:
testZEO.py
Log Message:
Revise the test framework to use ZConfig instead of a custom argv.
=== ZODB3/ZEO/tests/testZEO.py 1.68 => 1.69 ===
--- ZODB3/ZEO/tests/testZEO.py:1.68 Fri May 23 17:33:29 2003
+++ ZODB3/ZEO/tests/testZEO.py Thu May 29 14:42:04 2003
@@ -17,6 +17,7 @@
import os
import sys
import time
+import random
import socket
import asyncore
import tempfile
@@ -37,13 +38,8 @@
PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
MTStorage, ReadOnlyStorage
-# ZEO imports
from ZEO.ClientStorage import ClientStorage
-
-# ZEO test support
from ZEO.tests import forker, Cache
-
-# ZEO test mixin classes
from ZEO.tests import CommitLockTests, ThreadTests
class DummyDB:
@@ -80,6 +76,26 @@
finally:
storage2.close()
+def get_port():
+ """Return a port that is not in use.
+
+ Checks if a port is in use by trying to connect to it. Assumes it
+ is not in use if connect raises an exception.
+
+ Raises RuntimeError after 10 tries.
+ """
+ for i in range(10):
+ port = random.randrange(20000, 30000)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ try:
+ s.connect(('localhost', port))
+ except socket.error:
+ # XXX check value of error?
+ return port
+ finally:
+ s.close()
+ raise RuntimeError, "Can't find port"
class GenericTests(
# Base class for all ZODB tests
@@ -109,15 +125,20 @@
def setUp(self):
zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
- zeoport, adminaddr, pid = forker.start_zeo_server(self.getConfig())
+ port = get_port()
+ zconf = forker.ZEOConfig(('', port))
+ zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
+ zconf, port)
self._pids = [pid]
self._servers = [adminaddr]
- self._storage = ClientStorage(zeoport, '1', cache_size=20000000,
+ self._conf_path = path
+ self._storage = ClientStorage(zport, '1', cache_size=20000000,
min_disconnect_poll=0.5, wait=1)
self._storage.registerDB(DummyDB(), None)
def tearDown(self):
self._storage.close()
+ os.remove(self._conf_path)
for server in self._servers:
forker.shutdown_zeo_server(server)
if hasattr(os, 'waitpid'):
@@ -150,7 +171,7 @@
def getConfig(self):
filename = self.__fs_base = tempfile.mktemp()
return """\
- <filestorage>
+ <filestorage 1>
path %s
</filestorage>
""" % filename
@@ -162,7 +183,7 @@
def getConfig(self):
self._envdir = tempfile.mktemp()
return """\
- <fullstorage>
+ <fullstorage 1>
name %s
</fullstorage>
""" % self._envdir
@@ -171,7 +192,7 @@
"""ZEO backed by a Mapping storage."""
def getConfig(self):
- return """<mappingstorage/>"""
+ return """<mappingstorage 1/>"""
# Tests which MappingStorage can't possibly pass, because it doesn't
# support versions or undo.