[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/zrpc/smac.py Added support
for message iterators. This allows one, for example, to
Jim Fulton
jim at zope.com
Fri May 18 14:02:00 EDT 2007
Log message for revision 75838:
Added support for message iterators. This allows one, for example, to
use an iterator to send a large file without loading it in memory.
Changed:
U ZODB/trunk/src/ZEO/zrpc/smac.py
-=-
Modified: ZODB/trunk/src/ZEO/zrpc/smac.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/smac.py 2007-05-18 18:01:57 UTC (rev 75837)
+++ ZODB/trunk/src/ZEO/zrpc/smac.py 2007-05-18 18:01:59 UTC (rev 75838)
@@ -101,7 +101,7 @@
self.__state = 0
self.__has_mac = 0
self.__msg_size = 4
- self.__output_lock = threading.Lock() # Protects __output
+ self.__output_messages = []
self.__output = []
self.__closed = False
# Each side of the connection sends and receives messages. A
@@ -129,9 +129,31 @@
def setSessionKey(self, sesskey):
log("set session key %r" % sesskey)
- self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
- self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
+ # Low-level construction is now delayed until data are sent.
+ # This is to allow use of iterators that generate messages
+ # only when we're ready to do I/O so that we can effeciently
+ # transmit large files. Because we delay messages, we also
+ # have to delay setting the session key to retain proper
+ # ordering.
+
+ # The low-level output queue supports strings, a special close
+ # marker, and iterators. It doesn't support callbacks. We
+ # can create a allback by providing an iterator that doesn't
+ # yield anything.
+
+ # The hack fucntion below is a callback in iterator's
+ # clothing. :) It never yields anything, but is a generator
+ # and thus iterator, because it contains a yield statement.
+
+ def hack():
+ self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
+ self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
+ if False:
+ yield ''
+
+ self.message_output(hack())
+
def get_addr(self):
return self.addr
@@ -232,87 +254,91 @@
return True
def writable(self):
- if len(self.__output) == 0:
- return False
- else:
- return True
+ return bool(self.__output_messages or self.__output)
def should_close(self):
- self.__output.append(_close_marker)
+ self.__output_messages.append(_close_marker)
def handle_write(self):
- self.__output_lock.acquire()
- try:
- output = self.__output
- while output:
- # Accumulate output into a single string so that we avoid
- # multiple send() calls, but avoid accumulating too much
- # data. If we send a very small string and have more data
- # to send, we will likely incur delays caused by the
- # unfortunate interaction between the Nagle algorithm and
- # delayed acks. If we send a very large string, only a
- # portion of it will actually be delivered at a time.
+ output = self.__output
+ messages = self.__output_messages
+ while output or messages:
- l = 0
- for i in range(len(output)):
+ # Process queued messages until we have enough output
+ size = sum((len(s) for s in output))
+ while (size <= SEND_SIZE) and messages:
+ message = messages[0]
+ if message.__class__ is str:
+ size += self.__message_output(messages.pop(0), output)
+ elif message is _close_marker:
+ del messages[:]
+ del output[:]
+ return self.close()
+ else:
try:
- l += len(output[i])
- except TypeError:
- # We had an output marker, close the connection
- assert output[i] is _close_marker
- return self.close()
-
- if l > SEND_SIZE:
- break
+ message = message.next()
+ except StopIteration:
+ messages.pop(0)
+ else:
+ size += self.__message_output(message, output)
- i += 1
- # It is very unlikely that i will be 1.
- v = "".join(output[:i])
- del output[:i]
+ # Accumulate output into a single string so that we avoid
+ # multiple send() calls, but avoid accumulating too much
+ # data. If we send a very small string and have more data
+ # to send, we will likely incur delays caused by the
+ # unfortunate interaction between the Nagle algorithm and
+ # delayed acks. If we send a very large string, only a
+ # portion of it will actually be delivered at a time.
+ l = 0
+ for i in range(len(output)):
+ l += len(output[i])
+ if l > SEND_SIZE:
+ break
- try:
- n = self.send(v)
- except socket.error, err:
- if err[0] in expected_socket_write_errors:
- break # we couldn't write anything
- raise
- if n < len(v):
- output.insert(0, v[n:])
- break # we can't write any more
- finally:
- self.__output_lock.release()
+ i += 1
+ # It is very unlikely that i will be 1.
+ v = "".join(output[:i])
+ del output[:i]
+ try:
+ n = self.send(v)
+ except socket.error, err:
+ if err[0] in expected_socket_write_errors:
+ break # we couldn't write anything
+ raise
+
+ if n < l:
+ output.insert(0, v[n:])
+ break # we can't write any more
+
def handle_close(self):
self.close()
def message_output(self, message):
- if __debug__:
- if self._debug:
- log("message_output %d bytes: %s hmac=%d" %
- (len(message), short_repr(message),
- self.__hmac_send and 1 or 0),
- level=TRACE)
-
if self.__closed:
raise DisconnectedError(
"This action is temporarily unavailable.<p>")
- self.__output_lock.acquire()
- try:
- # do two separate appends to avoid copying the message string
- if self.__hmac_send:
- self.__output.append(struct.pack(">I", len(message) | MAC_BIT))
- self.__hmac_send.update(message)
- self.__output.append(self.__hmac_send.digest())
- else:
- self.__output.append(struct.pack(">I", len(message)))
- if len(message) <= SEND_SIZE:
- self.__output.append(message)
- else:
- for i in range(0, len(message), SEND_SIZE):
- self.__output.append(message[i:i+SEND_SIZE])
- finally:
- self.__output_lock.release()
+ self.__output_messages.append(message)
+ def __message_output(self, message, output):
+ # do two separate appends to avoid copying the message string
+ size = 4
+ if self.__hmac_send:
+ output.append(struct.pack(">I", len(message) | MAC_BIT))
+ self.__hmac_send.update(message)
+ output.append(self.__hmac_send.digest())
+ size += 20
+ else:
+ output.append(struct.pack(">I", len(message)))
+
+ if len(message) <= SEND_SIZE:
+ output.append(message)
+ else:
+ for i in range(0, len(message), SEND_SIZE):
+ output.append(message[i:i+SEND_SIZE])
+
+ return size + len(message)
+
def close(self):
if not self.__closed:
self.__closed = True
More information about the Zodb-checkins
mailing list