[Zodb-checkins] CVS: ZODB3/ZEO/zrpc - smac.py:1.31
Guido van Rossum
guido@python.org
Sat, 28 Sep 2002 23:22:28 -0400
Update of /cvs-repository/ZODB3/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv27189
Modified Files:
smac.py
Log Message:
Tim saw Win2k crashes that made me realize that the input and output
buffer variables here are accessed from multiple threads without any
locking@ Add such locking: a separate lock for input and one for
output. XXX Note: handle_read() keeps the lock for a potentially long
time. But this is required to serialize incoming calls anyway.
Unrelated nicety: use short_repr() when logging message output, for
consistency with other places.
=== ZODB3/ZEO/zrpc/smac.py 1.30 => 1.31 ===
--- ZODB3/ZEO/zrpc/smac.py:1.30 Sat Sep 28 22:46:58 2002
+++ ZODB3/ZEO/zrpc/smac.py Sat Sep 28 23:22:28 2002
@@ -17,11 +17,12 @@
"""
import asyncore, struct
+import threading
from ZEO.Exceptions import Disconnected
import zLOG
from types import StringType
-from ZEO.zrpc.log import log
+from ZEO.zrpc.log import log, short_repr
import socket, errno
@@ -63,6 +64,8 @@
self._debug = debug
elif not hasattr(self, '_debug'):
self._debug = __debug__
+ # __input_lock protects __inp, __input_len, __state, __msg_size
+ self.__input_lock = threading.Lock()
self.__inp = None # None, a single String, or a list
self.__input_len = 0
# Instance variables __state and __msg_size work together:
@@ -74,6 +77,7 @@
# The state alternates between 0 and 1.
self.__state = 0
self.__msg_size = 4
+ self.__output_lock = threading.Lock() # Protects __output
self.__output = []
self.__closed = 0
self.__super_init(sock, map)
@@ -95,47 +99,61 @@
if not d:
return
- input_len = self.__input_len + len(d)
- msg_size = self.__msg_size
- state = self.__state
-
- inp = self.__inp
- if msg_size > input_len:
- if inp is None:
- self.__inp = d
- elif type(self.__inp) is StringType:
- self.__inp = [self.__inp, d]
+ self.__input_lock.acquire()
+ try:
+ input_len = self.__input_len + len(d)
+ msg_size = self.__msg_size
+ state = self.__state
+
+ inp = self.__inp
+ if msg_size > input_len:
+ if inp is None:
+ self.__inp = d
+ elif type(self.__inp) is StringType:
+ self.__inp = [self.__inp, d]
+ else:
+ self.__inp.append(d)
+ self.__input_len = input_len
+ return # keep waiting for more input
+
+ # load all previous input and d into single string inp
+ if isinstance(inp, StringType):
+ inp = inp + d
+ elif inp is None:
+ inp = d
else:
- self.__inp.append(d)
- self.__input_len = input_len
- return # keep waiting for more input
-
- # load all previous input and d into single string inp
- if isinstance(inp, StringType):
- inp = inp + d
- elif inp is None:
- inp = d
- else:
- inp.append(d)
- inp = "".join(inp)
+ inp.append(d)
+ inp = "".join(inp)
- offset = 0
- while (offset + msg_size) <= input_len:
- msg = inp[offset:offset + msg_size]
- offset = offset + msg_size
- if not state:
- # waiting for message
- msg_size = struct.unpack(">i", msg)[0]
- state = 1
- else:
- msg_size = 4
- state = 0
- self.message_input(msg)
-
- self.__state = state
- self.__msg_size = msg_size
- self.__inp = inp[offset:]
- self.__input_len = input_len - offset
+ offset = 0
+ while (offset + msg_size) <= input_len:
+ msg = inp[offset:offset + msg_size]
+ offset = offset + msg_size
+ if not state:
+ # waiting for message
+ msg_size = struct.unpack(">i", msg)[0]
+ state = 1
+ else:
+ msg_size = 4
+ state = 0
+ # XXX We call message_input() with __input_lock
+ # held!!! And message_input() may end up calling
+ # message_output(), which has its own lock. But
+ # message_output() cannot call message_input(), so
+ # the locking order is always consistent, which
+ # prevents deadlock. Also, message_input() may
+ # take a long time, because it can cause an
+ # incoming call to be handled. During all this
+ # time, the __input_lock is held. That's a good
+ # thing, because it serializes incoming calls.
+ self.message_input(msg)
+
+ self.__state = state
+ self.__msg_size = msg_size
+ self.__inp = inp[offset:]
+ self.__input_len = input_len - offset
+ finally:
+ self.__input_lock.release()
def readable(self):
return 1
@@ -147,36 +165,40 @@
return 1
def handle_write(self):
- output = self.__output
- while output:
- # Accumulate output into a single string so that we avoid
- # multiple send() calls, but avoid accumulating too much
- # data. If we send a very small string and have more data
- # to send, we will likely incur delays caused by the
- # unfortunate interaction between the Nagle algorithm and
- # delayed acks. If we send a very large string, only a
- # portion of it will actually be delivered at a time.
-
- l = 0
- for i in range(len(output)):
- l += len(output[i])
- if l > SEND_SIZE:
- break
-
- i += 1
- # It is very unlikely that i will be 1.
- v = "".join(output[:i])
- del output[:i]
-
- try:
- n = self.send(v)
- except socket.error, err:
- if err[0] in expected_socket_write_errors:
- break # we couldn't write anything
- raise
- if n < len(v):
- output.insert(0, v[n:])
- break # we can't write any more
+ self.__output_lock.acquire()
+ try:
+ output = self.__output
+ while output:
+ # Accumulate output into a single string so that we avoid
+ # multiple send() calls, but avoid accumulating too much
+ # data. If we send a very small string and have more data
+ # to send, we will likely incur delays caused by the
+ # unfortunate interaction between the Nagle algorithm and
+ # delayed acks. If we send a very large string, only a
+ # portion of it will actually be delivered at a time.
+
+ l = 0
+ for i in range(len(output)):
+ l += len(output[i])
+ if l > SEND_SIZE:
+ break
+
+ i += 1
+ # It is very unlikely that i will be 1.
+ v = "".join(output[:i])
+ del output[:i]
+
+ try:
+ n = self.send(v)
+ except socket.error, err:
+ if err[0] in expected_socket_write_errors:
+ break # we couldn't write anything
+ raise
+ if n < len(v):
+ output.insert(0, v[n:])
+ break # we can't write any more
+ finally:
+ self.__output_lock.release()
def handle_close(self):
self.close()
@@ -184,11 +206,8 @@
def message_output(self, message):
if __debug__:
if self._debug:
- if len(message) > 40:
- m = message[:40]+' ...'
- else:
- m = message
- log('message_output %d bytes: %s' % (len(message), `m`),
+ log('message_output %d bytes: %s' %
+ (len(message), short_repr(message)),
level=zLOG.TRACE)
if self.__closed:
@@ -196,13 +215,17 @@
"This action is temporarily unavailable."
"<p>"
)
- # do two separate appends to avoid copying the message string
- self.__output.append(struct.pack(">i", len(message)))
- if len(message) <= SEND_SIZE:
- self.__output.append(message)
- else:
- for i in range(0, len(message), SEND_SIZE):
- self.__output.append(message[i:i+SEND_SIZE])
+ self.__output_lock.acquire()
+ try:
+ # do two separate appends to avoid copying the message string
+ self.__output.append(struct.pack(">i", len(message)))
+ if len(message) <= SEND_SIZE:
+ self.__output.append(message)
+ else:
+ for i in range(0, len(message), SEND_SIZE):
+ self.__output.append(message[i:i+SEND_SIZE])
+ finally:
+ self.__output_lock.release()
def close(self):
if not self.__closed: