[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/ Refactored the zrpc implementation to:

Jim Fulton jim at zope.com
Thu Jan 28 16:49:35 EST 2010

Log message for revision 108624:
  Refactored the zrpc implementation to:
  - Most server methods now return data to clients more quickly by writing to
    client sockets immediately, rather than waiting for the asyncore
    select loop to get around to it.
  - More clearly define client and server responsibilities. Machinery
    needed for just clients or just servers has been moved to the
    corresponding connection subclasses.
  - Degeneralized "flags" argument to many methods. There's just one
    async flag.

  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/tests/servertesting.py
  U   ZODB/trunk/src/ZEO/tests/testZEO2.py
  U   ZODB/trunk/src/ZEO/zrpc/connection.py

Modified: ZODB/trunk/src/ZEO/StorageServer.py
--- ZODB/trunk/src/ZEO/StorageServer.py	2010-01-28 21:49:32 UTC (rev 108623)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2010-01-28 21:49:34 UTC (rev 108624)
@@ -1340,10 +1340,10 @@
         self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
     def serialnos(self, arg):
-        self.rpc.callAsync('serialnos', arg)
+        self.rpc.callAsyncNoPoll('serialnos', arg)
     def info(self, arg):
-        self.rpc.callAsync('info', arg)
+        self.rpc.callAsyncNoPoll('info', arg)
     def storeBlob(self, oid, serial, blobfilename):

Modified: ZODB/trunk/src/ZEO/tests/servertesting.py
--- ZODB/trunk/src/ZEO/tests/servertesting.py	2010-01-28 21:49:32 UTC (rev 108623)
+++ ZODB/trunk/src/ZEO/tests/servertesting.py	2010-01-28 21:49:34 UTC (rev 108624)
@@ -56,3 +56,5 @@
     def callAsync(self, meth, *args):
         print self.name, 'callAsync', meth, repr(args)
+    callAsyncNoPoll = callAsync

Modified: ZODB/trunk/src/ZEO/tests/testZEO2.py
--- ZODB/trunk/src/ZEO/tests/testZEO2.py	2010-01-28 21:49:32 UTC (rev 108623)
+++ ZODB/trunk/src/ZEO/tests/testZEO2.py	2010-01-28 21:49:34 UTC (rev 108624)
@@ -78,9 +78,10 @@
     >>> zs2.storeBlobEnd(oid, serial, data, '1')
     >>> delay = zs2.vote('1')
-    >>> def send_reply(id, reply):
-    ...     print 'reply', id, reply
-    >>> delay.set_sender(1, send_reply, None)
+    >>> class Sender:
+    ...     def send_reply(self, id, reply):
+    ...         print 'reply', id, reply
+    >>> delay.set_sender(1, Sender())
     >>> logger = logging.getLogger('ZEO')
     >>> handler = logging.StreamHandler(sys.stdout)

Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
--- ZODB/trunk/src/ZEO/zrpc/connection.py	2010-01-28 21:49:32 UTC (rev 108623)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py	2010-01-28 21:49:34 UTC (rev 108624)
@@ -30,7 +30,6 @@
 import ZODB.POSException
 REPLY = ".reply" # message name used for replies
-ASYNC = 1
 exception_type_type = type(Exception)
@@ -180,34 +179,33 @@
     the mainloop from sending a response.
-    def set_sender(self, msgid, send_reply, return_error):
+    def set_sender(self, msgid, conn):
         self.msgid = msgid
-        self.send_reply = send_reply
-        self.return_error = return_error
+        self.conn = conn
     def reply(self, obj):
-        self.send_reply(self.msgid, obj)
+        self.conn.send_reply(self.msgid, obj)
     def error(self, exc_info):
         log("Error raised in delayed method", logging.ERROR, exc_info=True)
-        self.return_error(self.msgid, 0, *exc_info[:2])
+        self.conn.return_error(self.msgid, *exc_info[:2])
 class MTDelay(Delay):
     def __init__(self):
         self.ready = threading.Event()
-    def set_sender(self, msgid, send_reply, return_error):
-        Delay.set_sender(self, msgid, send_reply, return_error)
+    def set_sender(self, *args):
+        Delay.set_sender(self, *args)
     def reply(self, obj):
-        Delay.reply(self, obj)
+        self.conn.call_from_thread(self.conn.send_reply, self.msgid, obj)
     def error(self, exc_info):
-        Delay.error(self, exc_info)
+        self.conn.call_from_thread(Delay.error, self, exc_info)
@@ -304,9 +302,7 @@
     client for that particular call.
     The protocol also supports asynchronous calls.  The client does
-    not wait for a return value for an asynchronous call.  The only
-    defined flag is ASYNC.  If a method call message has the ASYNC
-    flag set, the server will raise an exception.
+    not wait for a return value for an asynchronous call.
     If a method call raises an Exception, the exception is propagated
     back to the client via the REPLY message.  The client side will
@@ -428,15 +424,6 @@
         # The singleton dict is a socket map containing only this object.
         self._singleton = {self._fileno: self}
-        # msgid_lock guards access to msgid
-        self.msgid = 0
-        self.msgid_lock = threading.Lock()
-        # replies_cond is used to block when a synchronous call is
-        # waiting for a response
-        self.replies_cond = threading.Condition()
-        self.replies = {}
         # waiting_for_reply is used internally to indicate whether
         # a call is in progress.  setting a session key is deferred
         # until after the call returns.
@@ -488,9 +475,6 @@
         self.closed = True
-        self.replies_cond.acquire()
-        self.replies_cond.notifyAll()
-        self.replies_cond.release()
     def register_object(self, obj):
         """Register obj as the true object to invoke methods on."""
@@ -537,29 +521,19 @@
         # will raise an exception.  The exception will ultimately
         # result in asycnore calling handle_error(), which will
         # close the connection.
-        msgid, flags, name, args = self.marshal.decode(message)
+        msgid, async, name, args = self.marshal.decode(message)
         if debug_zrpc:
-            self.log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
+            self.log("recv msg: %s, %s, %s, %s" % (msgid, async, name,
         if name == REPLY:
-            self.handle_reply(msgid, flags, args)
+            assert not async
+            self.handle_reply(msgid, args)
-            self.handle_request(msgid, flags, name, args)
+            self.handle_request(msgid, async, name, args)
-    def handle_reply(self, msgid, flags, args):
-        if debug_zrpc:
-            self.log("recv reply: %s, %s, %s"
-                     % (msgid, flags, short_repr(args)), level=TRACE)
-        self.replies_cond.acquire()
-        try:
-            self.replies[msgid] = flags, args
-            self.replies_cond.notifyAll()
-        finally:
-            self.replies_cond.release()
-    def handle_request(self, msgid, flags, name, args):
+    def handle_request(self, msgid, async, name, args):
         obj = self.obj
         if name.startswith('_') or not hasattr(obj, name):
@@ -590,9 +564,14 @@
                 self.log("%s() raised exception: %s" % (name, msg),
                          logging.ERROR, exc_info=True)
             error = sys.exc_info()[:2]
-            return self.return_error(msgid, flags, *error)
+            if async:
+                self.log("Asynchronous call raised exception: %s" % self,
+                         level=logging.ERROR, exc_info=True)
+            else:
+                self.return_error(msgid, *error)
+            return
-        if flags & ASYNC:
+        if async:
             if ret is not None:
                 raise ZRPCError("async method %s returned value %s" %
                                 (name, short_repr(ret)))
@@ -601,43 +580,19 @@
                 self.log("%s returns %s" % (name, short_repr(ret)),
             if isinstance(ret, Delay):
-                ret.set_sender(msgid, self.send_reply, self.return_error)
+                ret.set_sender(msgid, self)
-                self.send_reply(msgid, ret)
+                self.send_reply(msgid, ret, not self.delay_sesskey)
         if self.delay_sesskey:
             self.delay_sesskey = None
-    def handle_error(self):
-        if sys.exc_info()[0] == SystemExit:
-            raise sys.exc_info()
-        self.log("Error caught in asyncore",
-                 level=logging.ERROR, exc_info=True)
-        self.close()
+    def return_error(self, msgid, err_type, err_value):
+        # Note that, ideally, this should be defined soley for
+        # servers, but a test arranges to get it called on
+        # a client. Too much trouble to fix it now. :/
-    def send_reply(self, msgid, ret):
-        # encode() can pass on a wide variety of exceptions from cPickle.
-        # While a bare `except` is generally poor practice, in this case
-        # it's acceptable -- we really do want to catch every exception
-        # cPickle may raise.
-        try:
-            msg = self.marshal.encode(msgid, 0, REPLY, ret)
-        except: # see above
-            try:
-                r = short_repr(ret)
-            except:
-                r = "<unreprable>"
-            err = ZRPCError("Couldn't pickle return %.100s" % r)
-            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
-        self.message_output(msg)
-        self.poll()
-    def return_error(self, msgid, flags, err_type, err_value):
-        if flags & ASYNC:
-            self.log("Asynchronous call raised exception: %s" % self,
-                     level=logging.ERROR, exc_info=True)
-            return
         if not isinstance(err_value, Exception):
             err_value = err_type, err_value
@@ -657,79 +612,37 @@
+    def handle_error(self):
+        if sys.exc_info()[0] == SystemExit:
+            raise sys.exc_info()
+        self.log("Error caught in asyncore",
+                 level=logging.ERROR, exc_info=True)
+        self.close()
     def setSessionKey(self, key):
         if self.waiting_for_reply:
             self.delay_sesskey = key
-    # The next two public methods (call and callAsync) are used by
-    # clients to invoke methods on remote objects
+    def send_call(self, method, args, async=False):
+        # send a message and return its msgid
+        if async:
+            msgid = 0
+        else:
+            msgid = self._new_msgid()
-    def __new_msgid(self):
-        self.msgid_lock.acquire()
-        try:
-            msgid = self.msgid
-            self.msgid = self.msgid + 1
-            return msgid
-        finally:
-            self.msgid_lock.release()
-    def __call_message(self, method, args, flags):
-        # compute a message and return it
-        msgid = self.__new_msgid()
         if debug_zrpc:
-            self.log("send msg: %d, %d, %s, ..." % (msgid, flags, method),
+            self.log("send msg: %d, %d, %s, ..." % (msgid, async, method),
-        return self.marshal.encode(msgid, flags, method, args)
-    def send_call(self, method, args, flags):
-        # send a message and return its msgid
-        msgid = self.__new_msgid()
-        if debug_zrpc:
-            self.log("send msg: %d, %d, %s, ..." % (msgid, flags, method),
-                     level=TRACE)
-        buf = self.marshal.encode(msgid, flags, method, args)
+        buf = self.marshal.encode(msgid, async, method, args)
         return msgid
-    def call(self, method, *args):
-        if self.closed:
-            raise DisconnectedError()
-        msgid = self.send_call(method, args, 0)
-        r_flags, r_args = self.wait(msgid)
-        if (isinstance(r_args, tuple) and len(r_args) > 1
-            and type(r_args[0]) == exception_type_type
-            and issubclass(r_args[0], Exception)):
-            inst = r_args[1]
-            raise inst # error raised by server
-        else:
-            return r_args
-    # For testing purposes, it is useful to begin a synchronous call
-    # but not block waiting for its response.
-    def _deferred_call(self, method, *args):
-        if self.closed:
-            raise DisconnectedError()
-        msgid = self.send_call(method, args, 0)
-        self.trigger.pull_trigger()
-        return msgid
-    def _deferred_wait(self, msgid):
-        r_flags, r_args = self.wait(msgid)
-        if (isinstance(r_args, tuple)
-            and type(r_args[0]) == exception_type_type
-            and issubclass(r_args[0], Exception)):
-            inst = r_args[1]
-            raise inst # error raised by server
-        else:
-            return r_args
     def callAsync(self, method, *args):
         if self.closed:
             raise DisconnectedError()
-        self.send_call(method, args, ASYNC)
+        self.send_call(method, args, 1)
     def callAsyncNoPoll(self, method, *args):
@@ -738,7 +651,7 @@
         # allowing any client to sneak in a load request.
         if self.closed:
             raise DisconnectedError()
-        self.send_call(method, args, ASYNC)
+        self.send_call(method, args, 1)
     def callAsyncIterator(self, iterator):
         """Queue a sequence of calls using an iterator
@@ -746,47 +659,12 @@
         The calls will not be interleaved with other calls from the same
-        self.message_output(self.__outputIterator(iterator))
+        self.message_output(self.marshal.encode(0, 1, method, args)
+                            for method, args in iterator)
-    def __outputIterator(self, iterator):
-        for method, args in iterator:
-            yield self.__call_message(method, args, ASYNC)
+    def handle_reply(self, msgid, ret):
+        assert msgid == -1 and ret is None
-    def wait(self, msgid):
-        """Invoke asyncore mainloop and wait for reply."""
-        if debug_zrpc:
-            self.log("wait(%d)" % msgid, level=TRACE)
-        self.trigger.pull_trigger()
-        # Delay used when we call asyncore.poll() directly.
-        # Start with a 1 msec delay, double until 1 sec.
-        delay = 0.001
-        self.replies_cond.acquire()
-        try:
-            while 1:
-                if self.closed:
-                    raise DisconnectedError()
-                reply = self.replies.get(msgid)
-                if reply is not None:
-                    del self.replies[msgid]
-                    if debug_zrpc:
-                        self.log("wait(%d): reply=%s" %
-                                 (msgid, short_repr(reply)), level=TRACE)
-                    return reply
-                self.replies_cond.wait()
-        finally:
-            self.replies_cond.release()
-    def flush(self):
-        """Invoke poll() until the output buffer is empty."""
-        if debug_zrpc:
-            self.log("flush")
-        while self.writable():
-            self.poll()
     def poll(self):
         """Invoke asyncore mainloop to get pending message out."""
         if debug_zrpc:
@@ -794,7 +672,6 @@
 class ManagedServerConnection(Connection):
     """Server-side Connection subclass."""
@@ -803,6 +680,7 @@
     # Servers use a shared server trigger that uses the asyncore socket map
     trigger = trigger()
+    call_from_thread = trigger.pull_trigger
     def __init__(self, sock, addr, obj, mgr):
         self.mgr = mgr
@@ -821,13 +699,33 @@
+    def send_reply(self, msgid, ret, immediately=True):
+        # encode() can pass on a wide variety of exceptions from cPickle.
+        # While a bare `except` is generally poor practice, in this case
+        # it's acceptable -- we really do want to catch every exception
+        # cPickle may raise.
+        try:
+            msg = self.marshal.encode(msgid, 0, REPLY, ret)
+        except: # see above
+            try:
+                r = short_repr(ret)
+            except:
+                r = "<unreprable>"
+            err = ZRPCError("Couldn't pickle return %.100s" % r)
+            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
+        self.message_output(msg)
+        if immediately:
+            self.poll()
+    poll = smac.SizedMessageAsyncConnection.handle_write
 class ManagedClientConnection(Connection):
     """Client-side Connection subclass."""
     __super_init = Connection.__init__
-    __super_close = Connection.close
     base_message_output = Connection.message_output
     trigger = client_trigger
+    call_from_thread = trigger.pull_trigger
     def __init__(self, sock, addr, mgr):
         self.mgr = mgr
@@ -846,9 +744,24 @@
         self.queue_output = True
         self.queued_messages = []
+        # msgid_lock guards access to msgid
+        self.msgid = 0
+        self.msgid_lock = threading.Lock()
+        # replies_cond is used to block when a synchronous call is
+        # waiting for a response
+        self.replies_cond = threading.Condition()
+        self.replies = {}
         self.__super_init(sock, addr, None, tag='C', map=client_map)
+    def close(self):
+        Connection.close(self)
+        self.replies_cond.acquire()
+        self.replies_cond.notifyAll()
+        self.replies_cond.release()
     # Our message_ouput() queues messages until recv_handshake() gets the
     # protocol handshake from the server.
     def message_output(self, message):
@@ -890,3 +803,88 @@
             self.queue_output = False
+    def _new_msgid(self):
+        self.msgid_lock.acquire()
+        try:
+            msgid = self.msgid
+            self.msgid = self.msgid + 1
+            return msgid
+        finally:
+            self.msgid_lock.release()
+    def call(self, method, *args):
+        if self.closed:
+            raise DisconnectedError()
+        msgid = self.send_call(method, args)
+        r_args = self.wait(msgid)
+        if (isinstance(r_args, tuple) and len(r_args) > 1
+            and type(r_args[0]) == exception_type_type
+            and issubclass(r_args[0], Exception)):
+            inst = r_args[1]
+            raise inst # error raised by server
+        else:
+            return r_args
+    def wait(self, msgid):
+        """Invoke asyncore mainloop and wait for reply."""
+        if debug_zrpc:
+            self.log("wait(%d)" % msgid, level=TRACE)
+        self.trigger.pull_trigger()
+        # Delay used when we call asyncore.poll() directly.
+        # Start with a 1 msec delay, double until 1 sec.
+        delay = 0.001
+        self.replies_cond.acquire()
+        try:
+            while 1:
+                if self.closed:
+                    raise DisconnectedError()
+                reply = self.replies.get(msgid, self)
+                if reply is not self:
+                    del self.replies[msgid]
+                    if debug_zrpc:
+                        self.log("wait(%d): reply=%s" %
+                                 (msgid, short_repr(reply)), level=TRACE)
+                    return reply
+                self.replies_cond.wait()
+        finally:
+            self.replies_cond.release()
+    # For testing purposes, it is useful to begin a synchronous call
+    # but not block waiting for its response.
+    def _deferred_call(self, method, *args):
+        if self.closed:
+            raise DisconnectedError()
+        msgid = self.send_call(method, args)
+        self.trigger.pull_trigger()
+        return msgid
+    def _deferred_wait(self, msgid):
+        r_args = self.wait(msgid)
+        if (isinstance(r_args, tuple)
+            and type(r_args[0]) == exception_type_type
+            and issubclass(r_args[0], Exception)):
+            inst = r_args[1]
+            raise inst # error raised by server
+        else:
+            return r_args
+    def handle_reply(self, msgid, args):
+        if debug_zrpc:
+            self.log("recv reply: %s, %s"
+                     % (msgid, short_repr(args)), level=TRACE)
+        self.replies_cond.acquire()
+        try:
+            self.replies[msgid] = args
+            self.replies_cond.notifyAll()
+        finally:
+            self.replies_cond.release()
+    def send_reply(self, msgid, ret):
+        # Whimper. Used to send heartbeat
+        assert msgid == -1 and ret is None
+        self.message_output('(J\xff\xff\xff\xffK\x00U\x06.replyNt.')

More information about the Zodb-checkins mailing list