[Zope3-checkins] CVS: ZODB4/ZEO/tests - forker.py:1.18
Barry Warsaw
barry@wooz.org
Mon, 16 Dec 2002 16:11:52 -0500
Update of /cvs-repository/ZODB4/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv7707
Modified Files:
forker.py
Log Message:
Forward port from zodb3, use one true way to spawn the zeo server
subproc for both windows and unix.
Also use True/False for all booleans.
One addition made here that needs to be backported to zodb3: the
`keep' flag to start_zeo_server() which tells the subprocess whether
the database files should be cleaned up or not when done. The
read-only tests need to preserve the temp files.
=== ZODB4/ZEO/tests/forker.py 1.17 => 1.18 ===
--- ZODB4/ZEO/tests/forker.py:1.17 Fri Nov 22 16:24:53 2002
+++ ZODB4/ZEO/tests/forker.py Mon Dec 16 16:11:52 2002
@@ -13,20 +13,23 @@
##############################################################################
"""Library for forking storage server and connecting client storage"""
-import asyncore
import os
+import sys
+import time
+import errno
import random
import socket
-import sys
+import tempfile
import traceback
-import types
-import ZEO.ClientStorage
+
+import zLOG
# Change value of PROFILE to enable server-side profiling
-PROFILE = 0
+PROFILE = False
if PROFILE:
import hotshot
+
def get_port():
"""Return a port that is not in use.
@@ -48,130 +51,62 @@
s.close()
raise RuntimeError, "Can't find port"
-if os.name == "nt":
-
- def start_zeo_server(storage_name, args, addr=None, ro_svr=0):
- """Start a ZEO server in a separate process.
- Returns the ZEO port, the test server port, and the pid.
- """
- import ZEO.tests.winserver
- if addr is None:
- port = get_port()
- else:
- port = addr[1]
- script = ZEO.tests.winserver.__file__
- if script.endswith('.pyc'):
- script = script[:-1]
- if ro_svr:
- prefix = (sys.executable, script, "-r")
- else:
- prefix = (sys.executable, script)
- args = prefix + (str(port), storage_name) + args
- d = os.environ.copy()
- d['PYTHONPATH'] = os.pathsep.join(sys.path)
- pid = os.spawnve(os.P_NOWAIT, sys.executable, args, d)
- return ('localhost', port), ('localhost', port + 1), pid
-
-else:
-
- class ZEOServerExit(asyncore.file_dispatcher):
- """Used to exit ZEO.StorageServer when run is done"""
-
- def writable(self):
- return 0
-
- def readable(self):
- return 1
-
- def handle_read(self):
- buf = self.recv(4)
- if buf:
- assert buf == "done"
- server.close_server()
- asyncore.socket_map.clear()
-
- def handle_close(self):
- server.close_server()
- asyncore.socket_map.clear()
-
- class ZEOClientExit:
- """Used by client to cause server to exit"""
- def __init__(self, pipe):
- self.pipe = pipe
+def start_zeo_server(conf, addr=None, ro_svr=False, keep=False):
+ """Start a ZEO server in a separate process.
- def close(self):
- try:
- os.write(self.pipe, "done")
- os.close(self.pipe)
- except os.error:
- pass
-
- def start_zeo_server(storage_name, args, addr, ro_svr=0):
- assert isinstance(args, types.TupleType)
- rd, wr = os.pipe()
- pid = os.fork()
- if pid == 0:
- asyncore.socket_map.clear() # Don't service the parent's sockets
- import ZEO.zrpc.log
- reload(ZEO.zrpc.log) # Don't share the logging file object
- try:
- if PROFILE:
- p = hotshot.Profile("stats.s.%d" % os.getpid())
- p.runctx(
- "run_server(addr, rd, wr, storage_name, args, ro_svr)",
- globals(), locals())
- p.close()
- else:
- run_server(addr, rd, wr, storage_name, args, ro_svr)
- except:
- print "Exception in ZEO server process"
- traceback.print_exc()
- os._exit(0)
- else:
- os.close(rd)
- return pid, ZEOClientExit(wr)
-
- def load_storage(name, args):
- package = __import__("ZODB." + name)
- mod = getattr(package, name)
- klass = getattr(mod, name)
- return klass(*args)
-
- def run_server(addr, rd, wr, storage_name, args, ro_svr):
- # in the child, run the storage server
- global server
- os.close(wr)
- ZEOServerExit(rd)
- import ZEO.StorageServer, ZEO.zrpc.server
- storage = load_storage(storage_name, args)
- server = ZEO.StorageServer.StorageServer(addr, {'1':storage}, ro_svr)
- ZEO.zrpc.server.loop()
- storage.close()
- if isinstance(addr, types.StringType):
- os.unlink(addr)
-
- def start_zeo(storage_name, args, cache=None, cleanup=None,
- domain="AF_INET", storage_id="1", cache_size=20000000):
- """Setup ZEO client-server for storage.
-
- Returns a ClientStorage instance and a ZEOClientExit instance.
-
- XXX Don't know if os.pipe() will work on Windows.
- """
-
- if domain == "AF_INET":
- addr = '', get_port()
- elif domain == "AF_UNIX":
- import tempfile
- addr = tempfile.mktemp()
- else:
- raise ValueError, "bad domain: %s" % domain
-
- pid, exit = start_zeo_server(storage_name, args, addr)
- s = ZEO.ClientStorage.ClientStorage(addr, storage_id,
- client=cache,
- cache_size=cache_size,
- min_disconnect_poll=0.5,
- wait=1)
- return s, exit, pid
+ Returns the ZEO port, the test server port, and the pid.
+ """
+ # Store the config info in a temp file.
+ tmpfile = tempfile.mktemp()
+ fp = open(tmpfile, 'w')
+ fp.write(conf)
+ fp.close()
+ # Create the server
+ import ZEO.tests.zeoserver
+ if addr is None:
+ port = get_port()
+ else:
+ port = addr[1]
+ script = ZEO.tests.zeoserver.__file__
+ if script.endswith('.pyc'):
+ script = script[:-1]
+ # Create a list of arguments, which we'll tuplify below
+ args = [sys.executable, script, '-C', tmpfile]
+ if ro_svr:
+ args.append('-r')
+ if keep:
+ args.append('-k')
+ args.append(str(port))
+ d = os.environ.copy()
+ d['PYTHONPATH'] = os.pathsep.join(sys.path)
+ pid = os.spawnve(os.P_NOWAIT, sys.executable, tuple(args), d)
+ adminaddr = ('localhost', port+1)
+ # We need to wait until the server starts, but not forever
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ for i in range(5):
+ try:
+ zLOG.LOG('forker', zLOG.DEBUG, 'connect %s' % i)
+ s.connect(adminaddr)
+ ack = s.recv(1024)
+ zLOG.LOG('forker', zLOG.DEBUG, 'acked: %s' % ack)
+ break
+ except socket.error, e:
+ if e[0] <> errno.ECONNREFUSED: raise
+ time.sleep(1)
+ else:
+ zLOG.LOG('forker', zLOG.DEBUG, 'boo hoo')
+ raise
+ return ('localhost', port), adminaddr, pid
+
+
+def shutdown_zeo_server(adminaddr):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(adminaddr)
+ try:
+ ack = s.recv(1024)
+ except socket.error, e:
+ if e[0] <> errno.ECONNRESET: raise
+ ack = 'no ack received'
+ zLOG.LOG('shutdownServer', zLOG.DEBUG, 'acked: %s' % ack)
+ s.close()