[Zodb-checkins]
SVN: ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py
Queue message processing for main thread so we can support sane file
Jim Fulton
jim at zope.com
Tue May 15 15:49:01 EDT 2007
Log message for revision 75778:
Queue message processing for main thread so we can support sane file
iteration.
Changed:
U ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py
-=-
Modified: ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py 2007-05-15 19:31:56 UTC (rev 75777)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/zrpc/smac.py 2007-05-15 19:49:01 UTC (rev 75778)
@@ -101,7 +101,7 @@
self.__state = 0
self.__has_mac = 0
self.__msg_size = 4
- self.__output_lock = threading.Lock() # Protects __output
+ self.__output_messages = []
self.__output = []
self.__closed = False
# Each side of the connection sends and receives messages. A
@@ -129,9 +129,14 @@
def setSessionKey(self, sesskey):
log("set session key %r" % sesskey)
- self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
- self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
+ def hack():
+ self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
+ self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
+ if False:
+ yield ''
+ self.message_output(hack())
+
def get_addr(self):
return self.addr
@@ -232,87 +237,86 @@
return True
def writable(self):
- if len(self.__output) == 0:
- return False
- else:
- return True
+ return bool(self.__output_messages or self.__output)
def should_close(self):
- self.__output.append(_close_marker)
+ self.__output_messages.append(_close_marker)
def handle_write(self):
- self.__output_lock.acquire()
- try:
- output = self.__output
- while output:
- # Accumulate output into a single string so that we avoid
- # multiple send() calls, but avoid accumulating too much
- # data. If we send a very small string and have more data
- # to send, we will likely incur delays caused by the
- # unfortunate interaction between the Nagle algorithm and
- # delayed acks. If we send a very large string, only a
- # portion of it will actually be delivered at a time.
+ output = self.__output
+ messages = self.__output_messages
- l = 0
- for i in range(len(output)):
- try:
- l += len(output[i])
- except TypeError:
- # We had an output marker, close the connection
- assert output[i] is _close_marker
- return self.close()
-
- if l > SEND_SIZE:
- break
+ while output or messages:
+ # Accumulate output into a single string so that we avoid
+ # multiple send() calls, but avoid accumulating too much
+ # data. If we send a very small string and have more data
+ # to send, we will likely incur delays caused by the
+ # unfortunate interaction between the Nagle algorithm and
+ # delayed acks. If we send a very large string, only a
+ # portion of it will actually be delivered at a time.
- i += 1
- # It is very unlikely that i will be 1.
- v = "".join(output[:i])
- del output[:i]
+ while messages:
+ message = messages.pop(0)
+ if message.__class__ is str:
+ self.__message_output(message)
+ elif message is _close_marker:
+ output.append(message)
+ else:
+ for m in message:
+ if m:
+ self.__message_output(m)
+
+ l = 0
+ for i in range(len(output)):
try:
- n = self.send(v)
- except socket.error, err:
- if err[0] in expected_socket_write_errors:
- break # we couldn't write anything
- raise
- if n < len(v):
- output.insert(0, v[n:])
- break # we can't write any more
- finally:
- self.__output_lock.release()
+ l += len(output[i])
+ except TypeError:
+ # We had an output marker, close the connection
+ assert output[i] is _close_marker
+ return self.close()
+ if l > SEND_SIZE:
+ break
+
+ i += 1
+ # It is very unlikely that i will be 1.
+ v = "".join(output[:i])
+ del output[:i]
+
+ try:
+ n = self.send(v)
+ except socket.error, err:
+ if err[0] in expected_socket_write_errors:
+ break # we couldn't write anything
+ raise
+ if n < len(v):
+ output.insert(0, v[n:])
+ break # we can't write any more
+
def handle_close(self):
self.close()
def message_output(self, message):
- if __debug__:
- if self._debug:
- log("message_output %d bytes: %s hmac=%d" %
- (len(message), short_repr(message),
- self.__hmac_send and 1 or 0),
- level=TRACE)
-
if self.__closed:
raise DisconnectedError(
"This action is temporarily unavailable.<p>")
- self.__output_lock.acquire()
- try:
- # do two separate appends to avoid copying the message string
- if self.__hmac_send:
- self.__output.append(struct.pack(">I", len(message) | MAC_BIT))
- self.__hmac_send.update(message)
- self.__output.append(self.__hmac_send.digest())
- else:
- self.__output.append(struct.pack(">I", len(message)))
- if len(message) <= SEND_SIZE:
- self.__output.append(message)
- else:
- for i in range(0, len(message), SEND_SIZE):
- self.__output.append(message[i:i+SEND_SIZE])
- finally:
- self.__output_lock.release()
+ self.__output_messages.append(message)
+ def __message_output(self, message):
+ # do two separate appends to avoid copying the message string
+ if self.__hmac_send:
+ self.__output.append(struct.pack(">I", len(message) | MAC_BIT))
+ self.__hmac_send.update(message)
+ self.__output.append(self.__hmac_send.digest())
+ else:
+ self.__output.append(struct.pack(">I", len(message)))
+ if len(message) <= SEND_SIZE:
+ self.__output.append(message)
+ else:
+ for i in range(0, len(message), SEND_SIZE):
+ self.__output.append(message[i:i+SEND_SIZE])
+
def close(self):
if not self.__closed:
self.__closed = True
More information about the Zodb-checkins
mailing list