[Zodb-checkins] CVS: StandaloneZODB/ZEO/zrpc - NOTES:1.1.2.1 __init__.py:1.1.2.1 client.py:1.1.2.1 connection.py:1.1.2.1 error.py:1.1.2.1 log.py:1.1.2.1 marshal.py:1.1.2.1 server.py:1.1.2.1 trigger.py:1.1.2.1

Jeremy Hylton jeremy@zope.com
Wed, 16 Jan 2002 09:52:39 -0500


Update of /cvs-repository/StandaloneZODB/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv22529/zrpc

Added Files:
      Tag: Standby-branch
	NOTES __init__.py client.py connection.py error.py log.py 
	marshal.py server.py trigger.py 
Log Message:
Convert zrpc from module zrpc2 to zrpc package.


=== Added File StandaloneZODB/ZEO/zrpc/NOTES ===
The Connection object should be extended to support more flexible
handling for outstanding calls.  In particular, it should be possible
to have multiple calls with return values outstanding.

The mechanism described here is based on the promises mechanism in
Argus, which was influenced by futures in Multilisp.

    Promises: Linguistic Support for Efficient Asynchronous Procedure
    Calls in Distributed Systems.  Barbara Liskov and Liuba Shrira.
    Proc. of Conf. on Programming Language Design and Implementation
    (PLDI), June 1988.

We want to support two different kinds of calls:

  - send : invoke a method that returns no value
  - call : invoke a method that returns a value

On the client, a call immediately returns a promise.  A promise is an
object that can be used to claim the return value when it becomes
available. 

  - ready(): returns true if the return value is ready or an exception
             occurred
  - claim(): returns the call's return value or raises an exception,
             blocking if necessary

The server side of a zrpc connection can be implemented using
asyncore.  In that case, a method call blocks other RPC activity until
it returns.  If a call needs to return a value, but can't return
immediately, it returns a delay object (ZEO.zrpc.server.Delay).  

When the zrpc connection receives a Delay object, it does not
immediately return to the caller.  Instead, it returns when the
reply() method is called.  A Delay has two methods:

  - set_sender()
  - reply(obj): returns obj to the sender

-----------------------------------------

Open issues:

Delayed exception

There is currently no mechanism to raise an exception from a delayed
pcall. 

Synchronization

The following item is part of Argus, but the motivation isn't entirely
clear.

    For any two calls, C1 and C2, C1 always starts on the server
    first.  For the promises, C2 is ready() iff C1 is also ready().
    The promises can be claimed in any order.

A related notion:

    The connection should also support a synch() method that returns
    only when all outstanding calls have completed.  If any of these
    calls raised an exception, the synch() call raises an exception.

XXX synch() sounds potentially useful, but it's not clear if it would
be useful for ZEO.  In ZEO a single connection object handles multiple
threads, each thread is going to make independent calls.  When a
particular tpc_begin() returns and a thread commits its transaction,
it makes more calls.  These calls will before any of the other
tpc_begin() calls.

I think the Argus approach would be to use separate handlers for each
thread (not sure Argus had threads), so that a single thread could
rely on ordering guarantees.

Multithreaded server

There are lots of issues to work out here.

Delays may not be necessary if the connecftion handler runs in a
different thread than the object the handles the calls.  

=== Added File StandaloneZODB/ZEO/zrpc/__init__.py ===
# zrpc is a package with the following modules
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
# connection -- object dispatcher
# client -- manages connection creation to remote server
# server -- manages incoming connections from remote clients
# trigger -- medusa's trigger


=== Added File StandaloneZODB/ZEO/zrpc/client.py ===
import errno
import select
import socket
import sys
import threading
import time
import types

import ThreadedAsync
import zLOG

from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection

class ConnectionManager:
    """Keeps a connection up over time"""

    def __init__(self, addr, client, tmin=1, tmax=180):
        self.set_addr(addr)
        self.client = client
        self.tmin = tmin
        self.tmax = tmax
        self.connected = 0
        self.connection = None
        # If _thread is not None, then there is a helper thread
        # attempting to connect.  _thread is protected by _connect_lock.
        self._thread = None
        self._connect_lock = threading.Lock()
        self.trigger = None
        self.thr_async = 0
        self.closed = 0
        ThreadedAsync.register_loop_callback(self.set_async)

    def __repr__(self):
        return "<%s for %s>" % (self.__class__.__name__, self.addr)

    def set_addr(self, addr):
        "Set one or more addresses to use for server."

        # For backwards compatibility (and simplicity?) the
        # constructor accepts a single address in the addr argument --
        # a string for a Unix domain socket or a 2-tuple with a
        # hostname and port.  It can also accept a list of such addresses.

        addr_type = self._guess_type(addr)
        if addr_type is not None:
            self.addr = [(addr_type, addr)]
        else:
            self.addr = []
            for a in addr:
                addr_type = self._guess_type(a)
                if addr_type is None:
                    raise ValueError, "unknown address in list: %s" % repr(a)
                self.addr.append((addr_type, a))

    def _guess_type(self, addr):
        if isinstance(addr, types.StringType):
            return socket.AF_UNIX

        if (len(addr) == 2
            and isinstance(addr[0], types.StringType)
            and isinstance(addr[1], types.IntType)):
            return socket.AF_INET

        # not anything I know about
        return None

    def close(self):
        """Prevent ConnectionManager from opening new connections"""
        self.closed = 1
        self._connect_lock.acquire()
        try:
            if self._thread is not None:
                # XXX race on _thread
                self._thread.stop()
                self._thread.join()
        finally:
            self._connect_lock.release()
        if self.connection:
            self.connection.close()

    # XXX get rid of this?
    def register_client(self, client):
        self.client = client

    def set_async(self, map):
        # XXX need each connection started with async==0 to have a callback
        self.trigger = trigger()
        self.thr_async = 1 # XXX needs to be set on the Connection

    def attempt_connect(self):
        # XXX will a single attempt take too long?
        self.connect()
        try:
            event = self._thread.one_attempt
        except AttributeError:
            pass
        else:
            event.wait()
        return self.connected

    def connect(self, sync=0):
        if self.connected == 1:
            return
        self._connect_lock.acquire()
        try:
            if self._thread is None:
                log("starting thread to connect to server")
                self._thread = ConnectThread(self, self.client, self.addr,
                                             self.tmin, self.tmax)
                self._thread.start()
            if sync:
                try:
                    self._thread.join()
                except AttributeError:
                    # probably means the thread exited quickly
                    pass
        finally:
            self._connect_lock.release()

    def connect_done(self, c):
        log("connect_done()")
        self.connected = 1
        self.connection = c
        self._thread = None

    def notify_closed(self, conn):
        self.connected = 0
        self.connection = None
        self.client.notifyDisconnected()
        if not self.closed:
            self.connect()

class Connected(Exception):
    # helper for non-local exit
    def __init__(self, sock):
        self.sock = sock
            
class ConnectThread(threading.Thread):

    __super_init = threading.Thread.__init__

    def __init__(self, mgr, client, addr, tmin, tmax):
        self.__super_init(name="Connect(%s)" % addr)
        self.mgr = mgr
        self.client = client
        self.addr = addr
        self.tmin = tmin
        self.tmax = tmax
        self.stopped = 0
        self.one_attempt = threading.Event()

    def stop(self):
        self.stopped = 1

    def run(self):
        delay = self.tmin
        while not (self.stopped or self.attempt_connects()):
            if not self.one_attempt.isSet():
                self.one_attempt.set()
            time.sleep(delay)
            delay *= 2
            if delay > self.tmax:
                delay = self.tmax
        log("thread exiting: %s" % self.getName())
                
    def attempt_connects(self):
        "Return true if any connect attempt succeeds."
        sockets = {}

        log("attempting connection on %d sockets" % len(self.addr))
        try:
            for domain, addr in self.addr:
                if __debug__:
                    log("attempt connection to %s" % repr(addr),
                        level=zLOG.DEBUG)
                s = socket.socket(domain, socket.SOCK_STREAM)
                s.setblocking(0)
                # XXX can still block for a while if addr requires DNS
                e = self.connect(s, addr)
                if e is not None:
                    sockets[s] = addr

            # next wait until they actually connect
            while sockets:
                if self.stopped:
                    for s in sockets.keys():
                        s.close()
                    return 0
                try:
                    r, w, x = select.select([], sockets.keys(), [], 1.0)
                except select.error:
                    continue
                for s in w:
                    e = self.connect(s, sockets[s])
                    if e is None:
                        del sockets[s]
        except Connected, container:
            s = container.sock
            del sockets[s]
            # close all the other sockets
            for s in sockets.keys():
                s.close()
            return 1
        return 0

    def connect(self, s, addr):
        """Call s.connect_ex(addr) and return true if loop should continue.

        We have to handle several possible return values from
        connect_ex().  If the socket is connected and the initial ZEO
        setup works, we're done.  Report success by raising an
        exception.  Yes, the is odd, but we need to bail out of the
        select() loop in the caller and an exception is a principled
        way to do the abort.

        If the socket sonnects and the initial ZEO setup fails or the
        connect_ex() returns an error, we close the socket and ignore it.

        If connect_ex() returns EINPROGRESS, we need to try again later.
        """
        
        e = s.connect_ex(addr)
        if e == errno.EINPROGRESS:
            return 1
        elif e == 0:
            c = self.test_connection(s, addr)
            log("connected to %s" % repr(addr), level=zLOG.DEBUG)
            if c:
                raise Connected(s)
        else:
            if __debug__:
                log("error connecting to %s: %s" % (addr, errno.errorcode[e]),
                    level=zLOG.DEBUG)
            s.close()

    def test_connection(self, s, addr):
        c = ManagedConnection(s, addr, self.client, self.mgr)
        try:
            self.client.notifyConnected(c)
        except:
            log("error connecting to server: %s" % str(addr),
                level=zLOG.ERROR, error=sys.exc_info())
            c.close()
            return 0
        self.mgr.connect_done(c)
        return 1


=== Added File StandaloneZODB/ZEO/zrpc/connection.py ===
import asyncore
import sys
import threading
import types

import ThreadedAsync
from ZEO import smac # XXX put smac in zrpc?
from ZEO.zrpc.error import ZRPCError, DisconnectedError, DecodingError
from ZEO.zrpc.log import log
from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.trigger import trigger
import zLOG
from ZODB import POSException

REPLY = ".reply" # message name used for replies
ASYNC = 1

# XXX get rid of this class and use hasattr()
class Handler:
    """Base class used to handle RPC caller discovery"""

    def set_caller(self, addr):
        self.__caller = addr

    def get_caller(self):
        return self.__caller

    def clear_caller(self):
        self.__caller = None

class Delay:
    """Used to delay response to client for synchronous calls

    When a synchronous call is made and the original handler returns
    without handling the call, it returns a Delay object that prevents
    the mainloop from sending a response.
    """

    def set_sender(self, msgid, send_reply):
        self.msgid = msgid
        self.send_reply = send_reply

    def reply(self, obj):
        self.send_reply(self.msgid, obj)

class Connection(smac.SizedMessageAsyncConnection):
    """Dispatcher for RPC on object

    The connection supports synchronous calls, which expect a return,
    and asynchronous calls that do not.

    It uses the Marshaller class to handle encoding and decoding of
    method calls are arguments.

    A Connection is designed for use in a multithreaded application,
    where a synchronous call must block until a response is ready.
    The current design only allows a single synchronous call to be
    outstanding. 
    """

    __super_init = smac.SizedMessageAsyncConnection.__init__
    __super_close = smac.SizedMessageAsyncConnection.close
    __super_writable = smac.SizedMessageAsyncConnection.writable

    def __init__(self, sock, addr, obj=None):
        self.obj = obj
        self.marshal = Marshaller()
        self.closed = 0
        self.msgid = 0
        self.__super_init(sock, addr)
        # A Connection either uses asyncore directly or relies on an
        # asyncore mainloop running in a separate thread.  If
        # thr_async is true, then the mainloop is running in a
        # separate thread.  If thr_async is true, then the asyncore
        # trigger (self.trigger) is used to notify that thread of
        # activity on the current thread.
        self.thr_async = 0
        self.trigger = None
        self._prepare_async()
        self._map = {self._fileno: self}
        self.__call_lock = threading.Lock()
        # The reply lock is used to block when a synchronous call is
        # waiting for a response
        self.__reply_lock = threading.Lock()
        self.__reply_lock.acquire()
        # If the object implements the Handler interface (XXX checked
        # by isinstance), it wants to know who the caller is.
        if isinstance(obj, Handler):
            self.set_caller = 1
        else:
            self.set_caller = 0

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.addr)

    def close(self):
        caller = sys._getframe(1).f_code.co_name
        log("close() caller=%s" % caller)
        if self.closed:
            return
        self.closed = 1
        self.close_trigger()
        self.__super_close()

    def close_trigger(self):
        if self.trigger is not None:
            self.trigger.close()

    def register_object(self, obj):
        """Register obj as the true object to invoke methods on"""
        self.obj = obj

    def message_input(self, message):
        """Decoding an incoming message and dispatch it"""
        # XXX Not sure what to do with errors that reach this level.
        # Need to catch ZRPCErrors in handle_reply() and
        # handle_request() so that they get back to the client.
        try:
            msgid, flags, name, args = self.marshal.decode(message)
        except DecodingError, msg:
            return self.return_error(None, None, DecodingError, msg)

        if __debug__:
            log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
                                              repr(args)[:40]),
                level=zLOG.DEBUG)
        if name == REPLY:
            self.handle_reply(msgid, flags, args)
        else:
            self.handle_request(msgid, flags, name, args)

    def handle_reply(self, msgid, flags, args):
        if __debug__:
            log("recv reply: %s, %s, %s" % (msgid, flags, str(args)[:40]),
                level=zLOG.DEBUG)
        self.__reply = msgid, flags, args
        self.__reply_lock.release() # will fail if lock is unlocked

    def handle_request(self, msgid, flags, name, args):
        if __debug__:
            log("call %s%s on %s" % (name, repr(args)[:40], repr(self.obj)),
                zLOG.DEBUG)
        if not self.check_method(name):
            raise ZRPCError("Invalid method name: %s on %s" % (name,
                                                               `self.obj`))

        meth = getattr(self.obj, name)
        try:
            if self.set_caller:
                self.obj.set_caller(self)
                try:
                    ret = meth(*args)
                finally:
                    self.obj.clear_caller()
            else:
                ret = meth(*args)
        except (POSException.UndoError,
                POSException.VersionCommitError), msg:
            error = sys.exc_info()[:2]
            log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
            return self.return_error(msgid, flags, error[0], error[1])
        except Exception, msg:
            error = sys.exc_info()[:2]
            log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
            return self.return_error(msgid, flags, error[0], error[1])

        if flags & ASYNC:
            if ret is not None:
                log("async method %s returned value %s" % (name, repr(ret)),
                    zLOG.ERROR)
                raise ZRPCError("async method returned value")
        else:
            if __debug__:
                log("%s return %s" % (name, repr(ret)[:40]), zLOG.DEBUG)
            if isinstance(ret, Delay):
                ret.set_sender(msgid, self.send_reply)
            else:
                self.send_reply(msgid, ret)

    def handle_error(self):
        self.log_error()
        self.close()

    def log_error(self, msg="No error message supplied"):
        error = sys.exc_info()
        log(msg, zLOG.ERROR, error=error)
        del error

    def check_method(self, name):
        # XXX minimal security check should go here: Is name exported?
        return hasattr(self.obj, name)

    def send_reply(self, msgid, ret):
        msg = self.marshal.encode(msgid, 0, REPLY, ret)
        self.message_output(msg)
    
    def return_error(self, msgid, flags, err_type, err_value):
        if flags is None:
            self.log_error("Exception raised during decoding")
            return
        if flags & ASYNC:
            self.log_error("Asynchronous call raised exception: %s" % self)
            return
        if type(err_value) is not types.InstanceType:
            err_value = err_type, err_value

        try:
            msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
        except self.marshal.errors:
            err = ZRPCError("Couldn't pickle error %s" % `err_value`)
            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
        self.message_output(msg)
        self._do_io()

    # The next two methods are used by clients to invoke methods on
    # remote objects  

    # XXX Should revise design to allow multiple outstanding
    # synchronous calls

    def call(self, method, *args):
        self.__call_lock.acquire()
        try:
            return self._call(method, args)
        finally:
            self.__call_lock.release()

    def _call(self, method, args):
        if self.closed:
            raise DisconnectedError("This action is temporarily unavailable")
        msgid = self.msgid
        self.msgid = self.msgid + 1
        if __debug__:
            log("send msg: %d, 0, %s, ..." % (msgid, method))
        self.message_output(self.marshal.encode(msgid, 0, method, args))

        self.__reply = None
        # lock is currently held
        self._do_io(wait=1)
        # lock is held again...
        r_msgid, r_flags, r_args = self.__reply
        self.__reply_lock.acquire()
        assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)

        if type(r_args) == types.TupleType \
           and type(r_args[0]) == types.ClassType \
           and issubclass(r_args[0], Exception):
            raise r_args[1] # error raised by server
        return r_args

    def callAsync(self, method, *args):
        self.__call_lock.acquire()
        try:
            self._callAsync(method, args)
        finally:
            self.__call_lock.release()

    def _callAsync(self, method, args):
        if self.closed:
            raise DisconnectedError("This action is temporarily unavailable")
        msgid = self.msgid
        self.msgid += 1
        if __debug__:
            log("send msg: %d, %d, %s, ..." % (msgid, ASYNC, method))
        self.message_output(self.marshal.encode(msgid, ASYNC, method, args))
        self._do_io()

    # handle IO, possibly in async mode

    def _prepare_async(self):
        self.thr_async = 0
        ThreadedAsync.register_loop_callback(self.set_async)
        # XXX If we are not in async mode, this will cause dead
        # Connections to be leaked.

    def set_async(self, map):
        # XXX do we need a lock around this?  I'm not sure there is
        # any harm to a race with _do_io().
        self.trigger = trigger()
        self.thr_async = 1

    def is_async(self):
        if self.thr_async:
            return 1
        else:
            return 0
            
    def _do_io(self, wait=0): # XXX need better name
        # XXX invariant? lock must be held when calling with wait==1
        # otherwise, in non-async mode, there will be no poll

        if __debug__:
            log("_do_io(wait=%d), async=%d" % (wait, self.is_async()),
                level=zLOG.DEBUG)
        if self.is_async():
            self.trigger.pull_trigger()
            if wait:
                self.__reply_lock.acquire()
                # wait until reply...
                self.__reply_lock.release()
        else:
            if wait:
                # do loop only if lock is already acquired
                while not self.__reply_lock.acquire(0):
                    asyncore.poll(10.0, self._map)
                    if self.closed:
                        raise DisconnectedError()
                self.__reply_lock.release()
            else:
                asyncore.poll(0.0, self._map)

        # XXX it seems that we need to release before returning if
        # called with wait==1.  perhaps the caller need not acquire
        # upon return...

class ServerConnection(Connection):
    # XXX this is a hack
    def _do_io(self, wait=0):
        """If this is a server, there is no explicit IO to do"""
        pass

class ManagedServerConnection(ServerConnection):
    """A connection that notifies its ConnectionManager of closing"""
    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        self.__mgr = mgr
        self.__super_init(sock, addr, obj)

    def close(self):
        self.__super_close()
        self.__mgr.close(self)

class ManagedConnection(Connection):
    """A connection that notifies its ConnectionManager of closing.

    A managed connection also defers the ThreadedAsync work to its
    manager. 
    """
    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        self.__mgr = mgr
        self.__super_init(sock, addr, obj)
        self.check_mgr_async()

    def close_trigger(self):
        # the manager should actually close the trigger
        del self.trigger

    def _prepare_async(self):
        # Don't do the register_loop_callback that the superclass does
        pass

    def check_mgr_async(self):
        if not self.thr_async and self.__mgr.thr_async:
            assert self.__mgr.trigger is not None, \
                   "manager (%s) has no trigger" % self.__mgr
            self.thr_async = 1
            self.trigger = self.__mgr.trigger
            return 1
        return 0

    def is_async(self):
        if self.thr_async:
            return 1
        return self.check_mgr_async()

    def close(self):
        self.__super_close()
        self.__mgr.notify_closed(self)



=== Added File StandaloneZODB/ZEO/zrpc/error.py ===
from ZODB import POSException
from ZEO.Exceptions import Disconnected

class ZRPCError(POSException.StorageError):
    pass

class DecodingError(ZRPCError):
    """A ZRPC message could not be decoded."""

class DisconnectedError(ZRPCError, Disconnected):
    """The database storage is disconnected from the storage server."""



=== Added File StandaloneZODB/ZEO/zrpc/log.py ===
import os
import zLOG

_label = "zrpc:%s" % os.getpid()

def new_label():
    global _label
    _label = "zrpc:%s" % os.getpid()

def log(message, level=zLOG.BLATHER, label=None, error=None):
    zLOG.LOG(label or _label, level, message, error=error)


=== Added File StandaloneZODB/ZEO/zrpc/marshal.py ===
import cPickle
from cStringIO import StringIO
import types

class Marshaller:
    """Marshal requests and replies to second across network"""

    # It's okay to share a single Pickler as long as it's in fast
    # mode, which means that it doesn't have a memo.
    
    pickler = cPickle.Pickler()
    pickler.fast = 1
    pickle = pickler.dump

    errors = (cPickle.UnpickleableError,
              cPickle.UnpicklingError,
              cPickle.PickleError,
              cPickle.PicklingError)

    def encode(self, msgid, flags, name, args):
        """Returns an encoded message"""
        return self.pickle((msgid, flags, name, args), 1)

    def decode(self, msg):
        """Decodes msg and returns its parts"""
        unpickler = cPickle.Unpickler(StringIO(msg))
        unpickler.find_global = find_global

        try:
            return unpickler.load() # msgid, flags, name, args
        except (cPickle.UnpicklingError, IndexError), err_msg:
            log("can't decode %s" % repr(msg), level=zLOG.ERROR)
            raise DecodingError(msg)

_globals = globals()
_silly = ('__doc__',)

def find_global(module, name):
    """Helper for message unpickler"""
    try:
        m = __import__(module, _globals, _globals, _silly)
    except ImportError, msg:
        raise ZRPCError("import error %s: %s" % (module, msg))

    try:
        r = getattr(m, name)
    except AttributeError:
        raise ZRPCError("module %s has no global %s" % (module, name))
        
    safe = getattr(r, '__no_side_effects__', 0)
    if safe:
        return r

    # XXX what's a better way to do this?  esp w/ 2.1 & 2.2
    if type(r) == types.ClassType and issubclass(r, Exception):
        return r

    raise ZRPCError("Unsafe global: %s.%s" % (module, name))


=== Added File StandaloneZODB/ZEO/zrpc/server.py ===
import asyncore
import socket
import types

from ZEO.zrpc.connection import Connection, Delay
from ZEO.zrpc.log import log

# Export the main asyncore loop
loop = asyncore.loop

class Dispatcher(asyncore.dispatcher):
    """A server that accepts incoming RPC connections"""
    __super_init = asyncore.dispatcher.__init__

    reuse_addr = 1

    def __init__(self, addr, factory=Connection, reuse_addr=None):  
        self.__super_init()
        self.addr = addr
        self.factory = factory
        self.clients = []
        if reuse_addr is not None:
            self.reuse_addr = reuse_addr
        self._open_socket()

    def _open_socket(self):
        if type(self.addr) == types.TupleType:
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.set_reuse_addr()
        log("listening on %s" % str(self.addr))
        self.bind(self.addr)
        self.listen(5)

    def writable(self):
        return 0

    def readable(self):
        return 1

    def handle_accept(self):
        try:
            sock, addr = self.accept()
        except socket.error, msg:
            log("accepted failed: %s" % msg)
            return
        c = self.factory(sock, addr)
        log("connect from %s: %s" % (repr(addr), c))
        self.clients.append(c)


=== Added File StandaloneZODB/ZEO/zrpc/trigger.py ===
##############################################################################
# 
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
# 
# Copyright (c) Digital Creations.  All rights reserved.
# 
# This license has been certified as Open Source(tm).
# 
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# 
# 1. Redistributions in source code must retain the above copyright
#    notice, this list of conditions, and the following disclaimer.
# 
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions, and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
# 
# 3. Digital Creations requests that attribution be given to Zope
#    in any manner possible. Zope includes a "Powered by Zope"
#    button that is installed by default. While it is not a license
#    violation to remove this button, it is requested that the
#    attribution remain. A significant investment has been put
#    into Zope, and this effort will continue if the Zope community
#    continues to grow. This is one way to assure that growth.
# 
# 4. All advertising materials and documentation mentioning
#    features derived from or use of this software must display
#    the following acknowledgement:
# 
#      "This product includes software developed by Digital Creations
#      for use in the Z Object Publishing Environment
#      (http://www.zope.org/)."
# 
#    In the event that the product being advertised includes an
#    intact Zope distribution (with copyright and license included)
#    then this clause is waived.
# 
# 5. Names associated with Zope or Digital Creations must not be used to
#    endorse or promote products derived from this software without
#    prior written permission from Digital Creations.
# 
# 6. Modified redistributions of any form whatsoever must retain
#    the following acknowledgment:
# 
#      "This product includes software developed by Digital Creations
#      for use in the Z Object Publishing Environment
#      (http://www.zope.org/)."
# 
#    Intact (re-)distributions of any official Zope release do not
#    require an external acknowledgement.
# 
# 7. Modifications are encouraged but must be packaged separately as
#    patches to official Zope releases.  Distributions that do not
#    clearly separate the patches from the original work must be clearly
#    labeled as unofficial distributions.  Modifications which do not
#    carry the name Zope may be packaged in any form, as long as they
#    conform to all of the clauses above.
# 
# 
# Disclaimer
# 
#   THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
#   EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#   IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
#   PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
#   USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
#   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
#   OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
#   OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
#   SUCH DAMAGE.
# 
# 
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations.  Specific
# attributions are listed in the accompanying credits file.
# 
##############################################################################

# This module is a simplified version of the select_trigger module
# from Sam Rushing's Medusa server.


import asyncore
#import asynchat

import os
import socket
import string
import thread
    
if os.name == 'posix':

    class trigger (asyncore.file_dispatcher):

        "Wake up a call to select() running in the main thread"

        # This is useful in a context where you are using Medusa's I/O
        # subsystem to deliver data, but the data is generated by another
        # thread.  Normally, if Medusa is in the middle of a call to
        # select(), new output data generated by another thread will have
        # to sit until the call to select() either times out or returns.
        # If the trigger is 'pulled' by another thread, it should immediately
        # generate a READ event on the trigger object, which will force the
        # select() invocation to return.

        # A common use for this facility: letting Medusa manage I/O for a
        # large number of connections; but routing each request through a
        # thread chosen from a fixed-size thread pool.  When a thread is
        # acquired, a transaction is performed, but output data is
        # accumulated into buffers that will be emptied more efficiently
        # by Medusa. [picture a server that can process database queries
        # rapidly, but doesn't want to tie up threads waiting to send data
        # to low-bandwidth connections]

        # The other major feature provided by this class is the ability to
        # move work back into the main thread: if you call pull_trigger()
        # with a thunk argument, when select() wakes up and receives the
        # event it will call your thunk from within that thread.  The main
        # purpose of this is to remove the need to wrap thread locks around
        # Medusa's data structures, which normally do not need them.  [To see
        # why this is true, imagine this scenario: A thread tries to push some
        # new data onto a channel's outgoing data queue at the same time that
        # the main thread is trying to remove some]

        def __init__ (self):
            r, w = os.pipe()
            self.trigger = w
            asyncore.file_dispatcher.__init__ (self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []

        def __repr__ (self):
            return '<select-trigger (pipe) at %x>' % id(self)

        def readable (self):
            return 1

        def writable (self):
            return 0

        def handle_connect (self):
            pass

        def pull_trigger (self, thunk=None):
            # print 'PULL_TRIGGER: ', len(self.thunks)
            if thunk:
                try:
                    self.lock.acquire()
                    self.thunks.append (thunk)
                finally:
                    self.lock.release()
            os.write (self.trigger, 'x')

        def handle_read (self):
            self.recv (8192)
            try:
                self.lock.acquire()
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
                        print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
                self.thunks = []
            finally:
                self.lock.release()

else:

    # win32-safe version

    class trigger (asyncore.dispatcher):

        address = ('127.9.9.9', 19999)

        def __init__ (self):
            a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
            w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)

            # set TCP_NODELAY to true to avoid buffering
            w.setsockopt(socket.IPPROTO_TCP, 1, 1)

            # tricky: get a pair of connected sockets
            host='127.0.0.1'
            port=19999
            while 1:
                try:
                    self.address=(host, port)
                    a.bind(self.address)
                    break
                except:
                    if port <= 19950:
                        raise 'Bind Error', 'Cannot bind trigger!'
                    port=port - 1
            
            a.listen (1)
            w.setblocking (0)
            try:
                w.connect (self.address)
            except:
                pass
            r, addr = a.accept()
            a.close()
            w.setblocking (1)
            self.trigger = w

            asyncore.dispatcher.__init__ (self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._trigger_connected = 0

        def __repr__ (self):
            return '<select-trigger (loopback) at %x>' % id(self)

        def readable (self):
            return 1

        def writable (self):
            return 0

        def handle_connect (self):
            pass

        def pull_trigger (self, thunk=None):
            if thunk:
                try:
                    self.lock.acquire()
                    self.thunks.append (thunk)
                finally:
                    self.lock.release()
            self.trigger.send ('x')

        def handle_read (self):
            self.recv (8192)
            try:
                self.lock.acquire()
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
                        print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
                self.thunks = []
            finally:
                self.lock.release()