[Zodb-checkins] CVS: ZODB3/ZEO/zrpc - smac.py:1.31

Guido van Rossum guido@python.org
Sat, 28 Sep 2002 23:22:28 -0400


Update of /cvs-repository/ZODB3/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv27189

Modified Files:
	smac.py 
Log Message:
Tim saw Win2k crashes that made me realize that the input and output
buffer variables here are accessed from multiple threads without any
locking@  Add such locking: a separate lock for input and one for
output.  XXX Note: handle_read() keeps the lock for a potentially long
time.  But this is required to serialize incoming calls anyway.

Unrelated nicety: use short_repr() when logging message output, for
consistency with other places.


=== ZODB3/ZEO/zrpc/smac.py 1.30 => 1.31 ===
--- ZODB3/ZEO/zrpc/smac.py:1.30	Sat Sep 28 22:46:58 2002
+++ ZODB3/ZEO/zrpc/smac.py	Sat Sep 28 23:22:28 2002
@@ -17,11 +17,12 @@
 """
 
 import asyncore, struct
+import threading
 from ZEO.Exceptions import Disconnected
 import zLOG
 from types import StringType
 
-from ZEO.zrpc.log import log
+from ZEO.zrpc.log import log, short_repr
 
 import socket, errno
 
@@ -63,6 +64,8 @@
             self._debug = debug
         elif not hasattr(self, '_debug'):
             self._debug = __debug__
+        # __input_lock protects __inp, __input_len, __state, __msg_size
+        self.__input_lock = threading.Lock()
         self.__inp = None # None, a single String, or a list
         self.__input_len = 0
         # Instance variables __state and __msg_size work together:
@@ -74,6 +77,7 @@
         # The state alternates between 0 and 1.
         self.__state = 0
         self.__msg_size = 4
+        self.__output_lock = threading.Lock() # Protects __output
         self.__output = []
         self.__closed = 0
         self.__super_init(sock, map)
@@ -95,47 +99,61 @@
         if not d:
             return
 
-        input_len = self.__input_len + len(d)
-        msg_size = self.__msg_size
-        state = self.__state
-
-        inp = self.__inp
-        if msg_size > input_len:
-            if inp is None:
-                self.__inp = d
-            elif type(self.__inp) is StringType:
-                self.__inp = [self.__inp, d]
+        self.__input_lock.acquire()
+        try:
+            input_len = self.__input_len + len(d)
+            msg_size = self.__msg_size
+            state = self.__state
+
+            inp = self.__inp
+            if msg_size > input_len:
+                if inp is None:
+                    self.__inp = d
+                elif type(self.__inp) is StringType:
+                    self.__inp = [self.__inp, d]
+                else:
+                    self.__inp.append(d)
+                self.__input_len = input_len
+                return # keep waiting for more input
+
+            # load all previous input and d into single string inp
+            if isinstance(inp, StringType):
+                inp = inp + d
+            elif inp is None:
+                inp = d
             else:
-                self.__inp.append(d)
-            self.__input_len = input_len
-            return # keep waiting for more input
-
-        # load all previous input and d into single string inp
-        if isinstance(inp, StringType):
-            inp = inp + d
-        elif inp is None:
-            inp = d
-        else:
-            inp.append(d)
-            inp = "".join(inp)
+                inp.append(d)
+                inp = "".join(inp)
 
-        offset = 0
-        while (offset + msg_size) <= input_len:
-            msg = inp[offset:offset + msg_size]
-            offset = offset + msg_size
-            if not state:
-                # waiting for message
-                msg_size = struct.unpack(">i", msg)[0]
-                state = 1
-            else:
-                msg_size = 4
-                state = 0
-                self.message_input(msg)
-
-        self.__state = state
-        self.__msg_size = msg_size
-        self.__inp = inp[offset:]
-        self.__input_len = input_len - offset
+            offset = 0
+            while (offset + msg_size) <= input_len:
+                msg = inp[offset:offset + msg_size]
+                offset = offset + msg_size
+                if not state:
+                    # waiting for message
+                    msg_size = struct.unpack(">i", msg)[0]
+                    state = 1
+                else:
+                    msg_size = 4
+                    state = 0
+                    # XXX We call message_input() with __input_lock
+                    # held!!!  And message_input() may end up calling
+                    # message_output(), which has its own lock.  But
+                    # message_output() cannot call message_input(), so
+                    # the locking order is always consistent, which
+                    # prevents deadlock.  Also, message_input() may
+                    # take a long time, because it can cause an
+                    # incoming call to be handled.  During all this
+                    # time, the __input_lock is held.  That's a good
+                    # thing, because it serializes incoming calls.
+                    self.message_input(msg)
+
+            self.__state = state
+            self.__msg_size = msg_size
+            self.__inp = inp[offset:]
+            self.__input_len = input_len - offset
+        finally:
+            self.__input_lock.release()
 
     def readable(self):
         return 1
@@ -147,36 +165,40 @@
             return 1
 
     def handle_write(self):
-        output = self.__output
-        while output:
-            # Accumulate output into a single string so that we avoid
-            # multiple send() calls, but avoid accumulating too much
-            # data.  If we send a very small string and have more data
-            # to send, we will likely incur delays caused by the
-            # unfortunate interaction between the Nagle algorithm and
-            # delayed acks.  If we send a very large string, only a
-            # portion of it will actually be delivered at a time.
-
-            l = 0
-            for i in range(len(output)):
-                l += len(output[i])
-                if l > SEND_SIZE:
-                    break
-
-            i += 1
-            # It is very unlikely that i will be 1.
-            v = "".join(output[:i])
-            del output[:i]
-
-            try:
-                n = self.send(v)
-            except socket.error, err:
-                if err[0] in expected_socket_write_errors:
-                    break # we couldn't write anything
-                raise
-            if n < len(v):
-                output.insert(0, v[n:])
-                break # we can't write any more
+        self.__output_lock.acquire()
+        try:
+            output = self.__output
+            while output:
+                # Accumulate output into a single string so that we avoid
+                # multiple send() calls, but avoid accumulating too much
+                # data.  If we send a very small string and have more data
+                # to send, we will likely incur delays caused by the
+                # unfortunate interaction between the Nagle algorithm and
+                # delayed acks.  If we send a very large string, only a
+                # portion of it will actually be delivered at a time.
+
+                l = 0
+                for i in range(len(output)):
+                    l += len(output[i])
+                    if l > SEND_SIZE:
+                        break
+
+                i += 1
+                # It is very unlikely that i will be 1.
+                v = "".join(output[:i])
+                del output[:i]
+
+                try:
+                    n = self.send(v)
+                except socket.error, err:
+                    if err[0] in expected_socket_write_errors:
+                        break # we couldn't write anything
+                    raise
+                if n < len(v):
+                    output.insert(0, v[n:])
+                    break # we can't write any more
+        finally:
+            self.__output_lock.release()
 
     def handle_close(self):
         self.close()
@@ -184,11 +206,8 @@
     def message_output(self, message):
         if __debug__:
             if self._debug:
-                if len(message) > 40:
-                    m = message[:40]+' ...'
-                else:
-                    m = message
-                log('message_output %d bytes: %s' % (len(message), `m`),
+                log('message_output %d bytes: %s' %
+                    (len(message), short_repr(message)),
                     level=zLOG.TRACE)
 
         if self.__closed:
@@ -196,13 +215,17 @@
                 "This action is temporarily unavailable."
                 "<p>"
                 )
-        # do two separate appends to avoid copying the message string
-        self.__output.append(struct.pack(">i", len(message)))
-        if len(message) <= SEND_SIZE:
-            self.__output.append(message)
-        else:
-            for i in range(0, len(message), SEND_SIZE):
-                self.__output.append(message[i:i+SEND_SIZE])
+        self.__output_lock.acquire()
+        try:
+            # do two separate appends to avoid copying the message string
+            self.__output.append(struct.pack(">i", len(message)))
+            if len(message) <= SEND_SIZE:
+                self.__output.append(message)
+            else:
+                for i in range(0, len(message), SEND_SIZE):
+                    self.__output.append(message[i:i+SEND_SIZE])
+        finally:
+            self.__output_lock.release()
 
     def close(self):
         if not self.__closed: