[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/zrpc/smac.py Added support for message iterators. This allows one, for example, to

Jim Fulton jim at zope.com
Fri May 18 14:02:00 EDT 2007


Log message for revision 75838:
  Added support for message iterators.  This allows one, for example, to
  use an iterator to send a large file without loading it in memory.
  

Changed:
  U   ZODB/trunk/src/ZEO/zrpc/smac.py

-=-
Modified: ZODB/trunk/src/ZEO/zrpc/smac.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/smac.py	2007-05-18 18:01:57 UTC (rev 75837)
+++ ZODB/trunk/src/ZEO/zrpc/smac.py	2007-05-18 18:01:59 UTC (rev 75838)
@@ -101,7 +101,7 @@
         self.__state = 0
         self.__has_mac = 0
         self.__msg_size = 4
-        self.__output_lock = threading.Lock() # Protects __output
+        self.__output_messages = []
         self.__output = []
         self.__closed = False
         # Each side of the connection sends and receives messages.  A
@@ -129,9 +129,31 @@
 
     def setSessionKey(self, sesskey):
         log("set session key %r" % sesskey)
-        self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
-        self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
 
+        # Low-level construction is now delayed until data are sent.
+        # This is to allow use of iterators that generate messages
+        # only when we're ready to do I/O so that we can effeciently
+        # transmit large files.  Because we delay messages, we also
+        # have to delay setting the session key to retain proper
+        # ordering.
+
+        # The low-level output queue supports strings, a special close
+        # marker, and iterators.  It doesn't support callbacks.  We
+        # can create a allback by providing an iterator that doesn't
+        # yield anything.
+
+        # The hack fucntion below is a callback in iterator's
+        # clothing. :)  It never yields anything, but is a generator
+        # and thus iterator, because it contains a yield statement.
+
+        def hack():
+            self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
+            self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
+            if False:
+                yield ''
+
+        self.message_output(hack())
+        
     def get_addr(self):
         return self.addr
 
@@ -232,87 +254,91 @@
         return True
 
     def writable(self):
-        if len(self.__output) == 0:
-            return False
-        else:
-            return True
+        return bool(self.__output_messages or self.__output)
 
     def should_close(self):
-        self.__output.append(_close_marker)
+        self.__output_messages.append(_close_marker)
 
     def handle_write(self):
-        self.__output_lock.acquire()
-        try:
-            output = self.__output
-            while output:
-                # Accumulate output into a single string so that we avoid
-                # multiple send() calls, but avoid accumulating too much
-                # data.  If we send a very small string and have more data
-                # to send, we will likely incur delays caused by the
-                # unfortunate interaction between the Nagle algorithm and
-                # delayed acks.  If we send a very large string, only a
-                # portion of it will actually be delivered at a time.
+        output = self.__output
+        messages = self.__output_messages
+        while output or messages:
 
-                l = 0
-                for i in range(len(output)):
+            # Process queued messages until we have enough output
+            size = sum((len(s) for s in output))
+            while (size <= SEND_SIZE) and messages:
+                message = messages[0]
+                if message.__class__ is str:
+                    size += self.__message_output(messages.pop(0), output)
+                elif message is _close_marker:
+                    del messages[:]
+                    del output[:]
+                    return self.close()
+                else:
                     try:
-                        l += len(output[i])
-                    except TypeError:
-                        # We had an output marker, close the connection
-                        assert output[i] is _close_marker
-                        return self.close()
-                    
-                    if l > SEND_SIZE:
-                        break
+                        message = message.next()
+                    except StopIteration:
+                        messages.pop(0)
+                    else:
+                        size += self.__message_output(message, output)
 
-                i += 1
-                # It is very unlikely that i will be 1.
-                v = "".join(output[:i])
-                del output[:i]
+            # Accumulate output into a single string so that we avoid
+            # multiple send() calls, but avoid accumulating too much
+            # data.  If we send a very small string and have more data
+            # to send, we will likely incur delays caused by the
+            # unfortunate interaction between the Nagle algorithm and
+            # delayed acks.  If we send a very large string, only a
+            # portion of it will actually be delivered at a time.
+            l = 0
+            for i in range(len(output)):
+                l += len(output[i])
+                if l > SEND_SIZE:
+                    break
 
-                try:
-                    n = self.send(v)
-                except socket.error, err:
-                    if err[0] in expected_socket_write_errors:
-                        break # we couldn't write anything
-                    raise
-                if n < len(v):
-                    output.insert(0, v[n:])
-                    break # we can't write any more
-        finally:
-            self.__output_lock.release()
+            i += 1
+            # It is very unlikely that i will be 1.
+            v = "".join(output[:i])
+            del output[:i]
 
+            try:
+                n = self.send(v)
+            except socket.error, err:
+                if err[0] in expected_socket_write_errors:
+                    break # we couldn't write anything
+                raise
+            
+            if n < l:
+                output.insert(0, v[n:])
+                break # we can't write any more
+
     def handle_close(self):
         self.close()
 
     def message_output(self, message):
-        if __debug__:
-            if self._debug:
-                log("message_output %d bytes: %s hmac=%d" %
-                    (len(message), short_repr(message),
-                    self.__hmac_send and 1 or 0),
-                    level=TRACE)
-
         if self.__closed:
             raise DisconnectedError(
                 "This action is temporarily unavailable.<p>")
-        self.__output_lock.acquire()
-        try:
-            # do two separate appends to avoid copying the message string
-            if self.__hmac_send:
-                self.__output.append(struct.pack(">I", len(message) | MAC_BIT))
-                self.__hmac_send.update(message)
-                self.__output.append(self.__hmac_send.digest())
-            else:
-                self.__output.append(struct.pack(">I", len(message)))
-            if len(message) <= SEND_SIZE:
-                self.__output.append(message)
-            else:
-                for i in range(0, len(message), SEND_SIZE):
-                    self.__output.append(message[i:i+SEND_SIZE])
-        finally:
-            self.__output_lock.release()
+        self.__output_messages.append(message)
 
+    def __message_output(self, message, output):
+        # do two separate appends to avoid copying the message string
+        size = 4        
+        if self.__hmac_send:
+            output.append(struct.pack(">I", len(message) | MAC_BIT))
+            self.__hmac_send.update(message)
+            output.append(self.__hmac_send.digest())
+            size += 20
+        else:
+            output.append(struct.pack(">I", len(message)))
+
+        if len(message) <= SEND_SIZE:
+            output.append(message)
+        else:
+            for i in range(0, len(message), SEND_SIZE):
+                output.append(message[i:i+SEND_SIZE])
+
+        return size + len(message)
+
     def close(self):
         if not self.__closed:
             self.__closed = True



More information about the Zodb-checkins mailing list