[Zodb-checkins] CVS: Packages/ZEO  - forker.py:1.1 testZEO.py:1.2
   
    jeremy@digicool.com
     
    jeremy@digicool.com
       
    Fri, 27 Apr 2001 13:32:20 -0400 (EDT)
    
    
  
Update of /cvs-repository/Packages/ZEO/tests
In directory korak:/tmp/cvs-serv30049
Added Files:
	forker.py testZEO.py 
Log Message:
Add ZEO tests using Python 2.1 unittest framework.
All tests except checkLen() pass.
--- Added File forker.py in package Packages/ZEO ---
"""Library for forking storage server and connecting client storage"""
import asyncore
import os
import sys
import ThreadedAsync
import ZEO.ClientStorage, ZEO.StorageServer
class ZEOServerExit(asyncore.file_dispatcher):
    """Dispatcher that stops the ZEO.StorageServer loop on request.

    It watches a file descriptor and empties asyncore.socket_map once
    data arrives on it or once it is closed.
    """

    def writable(self):
        """Report that this dispatcher never has anything to write."""
        return 0

    def readable(self):
        """Report that this dispatcher always wants to read."""
        return 1

    def handle_read(self):
        """Read up to four bytes; any data received empties the socket map."""
        data = self.recv(4)
        if not data:
            # Nothing was read, so there is nothing to act on here.
            return
        assert data == "done"
        asyncore.socket_map.clear()

    def handle_close(self):
        """Empty the socket map when the watched descriptor is closed."""
        asyncore.socket_map.clear()
class ZEOClientExit:
    """Client-side handle used to make the storage server exit."""

    def __init__(self, pipe):
        # pipe is the file descriptor that close() writes to.
        self.pipe = pipe

    def close(self):
        """Write the string "done" to the stored pipe descriptor."""
        fd = self.pipe
        os.write(fd, "done")
def start_zeo(storage, cache=None, cleanup=None, domain="AF_INET"):
    """Setup ZEO client-server for storage.

    Forks a child process that serves storage (registered under the
    name '1') through ZEO.StorageServer and runs asyncore.loop().  After
    the loop returns, the child closes the storage, removes the AF_UNIX
    socket path if one was used, calls cleanup if given, and exits.  The
    child exits with status 0 on success; if any of those steps raises,
    it prints the traceback to stderr and exits with status 1.

    Arguments:
    storage -- storage object served by the child process
    cache -- passed to ZEO.ClientStorage as its client argument
    cleanup -- optional callable taking no arguments, run in the child
               after the server loop has ended
    domain -- "AF_INET" (empty host, random port in 2000-2999) or
              "AF_UNIX" (path obtained from tempfile.mktemp())

    Returns a 3-tuple: a ClientStorage instance, a ZEOClientExit
    instance wrapping the write end of the control pipe, and the pid of
    the child process.

    Raises ValueError if domain is not one of the two supported names.

    XXX Don't know if os.pipe() will work on Windows.
    """
    # The address is chosen before forking so that parent and child
    # agree on it.
    if domain == "AF_INET":
        import random
        addr = '', random.randrange(2000, 3000)
    elif domain == "AF_UNIX":
        import tempfile
        addr = tempfile.mktemp()
    else:
        raise ValueError("bad domain: %s" % domain)

    rd, wr = os.pipe()
    pid = os.fork()
    if pid == 0:
        # in the child, run the storage server
        import traceback
        status = 1  # only reset to 0 once every step has succeeded
        try:
            try:
                os.close(wr)
                ZEOServerExit(rd)
                serv = ZEO.StorageServer.StorageServer(addr, {'1': storage})
                asyncore.loop()
                storage.close()
                if domain == "AF_UNIX":
                    os.unlink(addr)
                if cleanup:
                    cleanup()
                status = 0
            except Exception:
                # Report the failure instead of dropping it silently;
                # os._exit() does not flush stdio, so flush by hand.
                traceback.print_exc()
                sys.stderr.flush()
        finally:
            # Never return into the caller's code from the child.
            os._exit(status)
    else:
        os.close(rd)
        s = ZEO.ClientStorage.ClientStorage(addr, debug=1, client=cache)
        return s, ZEOClientExit(wr), pid
--- Updated File testZEO.py in package Packages/ZEO --