[Zodb-checkins] CVS: ZODB3/ZEO/zrpc - connection.py:1.24

Guido van Rossum guido@python.org
Mon, 16 Sep 2002 18:06:41 -0400


Update of /cvs-repository/ZODB3/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv895

Modified Files:
	connection.py 
Log Message:
This checkin contains changes (by me) that will allow multiple
parallel outstanding calls.  However it also contains code (by Jeremy,
with one notifyAll() call added by me) that enforces the old rule of a
single outstanding call.  This is hopefully unnecessary, but we
haven't reviewed the server side yet to make sure that that's really
the case (the server was until now getting serialized calls per
connection).


=== ZODB3/ZEO/zrpc/connection.py 1.23 => 1.24 ===
--- ZODB3/ZEO/zrpc/connection.py:1.23	Sun Sep 15 22:42:43 2002
+++ ZODB3/ZEO/zrpc/connection.py	Mon Sep 16 18:06:41 2002
@@ -74,14 +74,12 @@
     and asynchronous calls, which do not.
 
     It uses the Marshaller class to handle encoding and decoding of
-    method calls are arguments.  Marshaller uses pickle to encode
+    method calls and arguments.  Marshaller uses pickle to encode
     arbitrary Python objects.  The code here doesn't ever see the wire
     format.
 
     A Connection is designed for use in a multithreaded application,
     where a synchronous call must block until a response is ready.
-    The current design only allows a single synchronous call to be
-    outstanding.
 
     A socket connection between a client and a server allows either
     side to invoke methods on the other side.  The processes on each
@@ -137,13 +135,10 @@
         self._map = {self._fileno: self}
         # __msgid_lock guards access to msgid
         self.__msgid_lock = threading.Lock()
-        # __call_lock prevents more than one synchronous call from
-        # being issued at one time.
-        self.__call_lock = threading.Lock()
-        # __reply_lock is used to block when a synchronous call is
+        # __replies_cond is used to block when a synchronous call is
         # waiting for a response
-        self.__reply_lock = threading.Lock()
-        self.__reply_lock.acquire()
+        self.__replies_cond = threading.Condition()
+        self.__replies = {}
         self.register_object(obj)
         self.handshake()
 
@@ -153,6 +148,7 @@
     def close(self):
         if self.closed:
             return
+        self._map.clear()
         self.closed = 1
         self.close_trigger()
         self.__super_close()
@@ -183,6 +179,9 @@
     def recv_handshake(self, message):
         if message == self.protocol_version:
             self.message_input = self._message_input
+        else:
+            log("recv_handshake: bad handshake %s" % repr(message),
+                level=zLOG.ERROR)
         # otherwise do something else...
 
     def message_input(self, message):
@@ -208,8 +207,12 @@
         if __debug__:
             log("recv reply: %s, %s, %s" % (msgid, flags, short_repr(args)),
                 level=zLOG.DEBUG)
-        self.__reply = msgid, flags, args
-        self.__reply_lock.release() # will fail if lock is unlocked
+        self.__replies_cond.acquire()
+        try:
+            self.__replies[msgid] = flags, args
+            self.__replies_cond.notifyAll()
+        finally:
+            self.__replies_cond.release()
 
     def handle_request(self, msgid, flags, name, args):
         if not self.check_method(name):
@@ -225,6 +228,10 @@
             raise
         except Exception, msg:
             error = sys.exc_info()
+            # XXX Since we're just passing this on to the caller, and
+            # there are several cases where this happens during the
+            # normal course of action, shouldn't this be logged at the
+            # INFO level?
             log("%s() raised exception: %s" % (name, msg), zLOG.ERROR,
                 error=error)
             error = error[:2]
@@ -298,22 +305,19 @@
         return msgid
 
     def call(self, method, *args):
-        self.__call_lock.acquire()
+        self.__replies_cond.acquire()
         try:
-            return self._call(method, args)
+            while self.__replies and not self.closed:
+                log("waiting for previous call to finish %s" %
+                    repr(self.__replies.values()[0]))
+                self.__replies_cond.wait(30)
+            if self.closed:
+                raise DisconnectedError()
+            msgid = self.send_call(method, args, 0)
+            self.__replies[msgid] = None
         finally:
-            self.__call_lock.release()
-
-    def _call(self, method, args):
-        if self.closed:
-            raise DisconnectedError()
-        msgid = self.send_call(method, args, 0)
-        self.__reply = None
-        self.wait() # will release reply lock before returning
-        r_msgid, r_flags, r_args = self.__reply
-        self.__reply_lock.acquire()
-        assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
-
+            self.__replies_cond.release()
+        r_flags, r_args = self.wait(msgid)
         if (isinstance(r_args, types.TupleType)
             and type(r_args[0]) == types.ClassType
             and issubclass(r_args[0], Exception)):
@@ -347,27 +351,39 @@
         else:
             return 0
 
-    def wait(self):
+    def wait(self, msgid):
         """Invoke asyncore mainloop and wait for reply."""
         if __debug__:
             log("wait() async=%d" % self.is_async(), level=zLOG.TRACE)
         if self.is_async():
             self.trigger.pull_trigger()
-            self.__reply_lock.acquire()
-            # wait until reply...
-        else:
-            # Do loop until asyncore handler unlocks the lock.
-            assert not self.__reply_lock.acquire(0)
-            while not self.__reply_lock.acquire(0):
-                try:
-                    asyncore.poll(10.0, self._map)
-                except select.error, err:
-                    log("Closing.  asyncore.poll() raised %s." % err,
-                        level=zLOG.BLATHER)
-                    self.close()
+            
+        self.__replies_cond.acquire()
+        try:
+            while 1:
                 if self.closed:
                     raise DisconnectedError()
-        self.__reply_lock.release()
+                reply = self.__replies.get(msgid)
+                if reply is not None:
+                    del self.__replies[msgid]
+                    assert len(self.__replies) == 0
+                    self.__replies_cond.notifyAll()
+                    return reply
+                if self.is_async():
+                    self.__replies_cond.wait(10.0)
+                else:
+                    self.__replies_cond.release()
+                    try:
+                        try:
+                            asyncore.poll(10.0, self._map)
+                        except select.error, err:
+                            log("Closing.  asyncore.poll() raised %s." % err,
+                                level=zLOG.BLATHER)
+                            self.close()
+                    finally:
+                        self.__replies_cond.acquire()
+        finally:
+            self.__replies_cond.release()
 
     def poll(self, wait_for_reply=0):
         """Invoke asyncore mainloop to get pending message out."""