[Zodb-checkins] CVS: ZEO/ZEO/tests - winserver.py:1.2.2.1
Jeremy Hylton
jeremy@zope.com
Thu, 23 Aug 2001 20:58:06 -0400
Update of /cvs-repository/ZEO/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv25479
Added Files:
Tag: zeo-1_0-branch
winserver.py
Log Message:
Adding file from trunk (for Windows test success)
=== Added File ZEO/ZEO/tests/winserver.py ===
"""Helper file used to launch ZEO server for Windows tests"""
import asyncore
import os
import random
import socket
import threading
import types
import ZEO.StorageServer
class ZEOTestServer(asyncore.dispatcher):
"""A trivial server for killing a server at the end of a test
The server calls os._exit() as soon as it is connected to. No
chance to even send some data down the socket.
"""
__super_init = asyncore.dispatcher.__init__
def __init__(self, addr, storage):
self.__super_init()
self.storage = storage
if type(addr) == types.StringType:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind(addr)
self.listen(5)
def handle_accept(self):
sock, addr = self.accept()
self.storage.close()
os._exit(0)
def load_storage_class(name):
package = __import__("ZODB." + name)
mod = getattr(package, name)
return getattr(mod, name)
def main(port, storage_name, args):
klass = load_storage_class(storage_name)
storage = klass(*args)
zeo_port = int(port)
test_port = zeo_port + 1
t = ZEOTestServer(('', test_port), storage)
serv = ZEO.StorageServer.StorageServer(('', zeo_port), {'1': storage})
asyncore.loop()
if __name__ == "__main__":
import sys
main(sys.argv[1], sys.argv[2], sys.argv[3:])