[Zope3-checkins] SVN: Zope3/branches/ZopeX3-3.0/src/zope/server/ Merged from trunk:

Jim Fulton jim at zope.com
Thu Sep 2 14:31:38 EDT 2004


Log message for revision 27426:
  Merged from trunk:
  
    r27408 | shane | 2004-09-02 01:31:25 -0400 (Thu, 02 Sep 2004) | 12 lines
  
    DualModeChannel no longer attempts to close except in the main thread.
  
    Also simplified by removing the experimental SimultaneousModeChannel 
    class.  DualModeChannel is ambitious enough already.  We don't need to 
    expose even more bugs in asyncore.
  
    The advantage of closing in application threads is that it forces the 
    TCP stack to flush buffers immediately, resulting in a quick response.  
    However, TCP_NODELAY should have the same effect, so in theory, no speed 
    has been lost.
  
  
    ------------------------------------------------------------------------
    r27407 | shane | 2004-09-02 01:24:05 -0400 (Thu, 02 Sep 2004) | 5 lines
  
    Added a method of setting socket options and turned on TCP_NODELAY.
  
    Zope buffers everything already, so the Nagle algorithm is likely 
    to delay unnecessarily in most foreseeable cases.
  
    ------------------------------------------------------------------------
    r27406 | shane | 2004-09-02 01:20:28 -0400 (Thu, 02 Sep 2004) | 2 lines
  
    Improved wording in interface docstrings
    
  


Changed:
  U   Zope3/branches/ZopeX3-3.0/src/zope/server/adjustments.py
  U   Zope3/branches/ZopeX3-3.0/src/zope/server/dualmodechannel.py
  U   Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/server.py
  U   Zope3/branches/ZopeX3-3.0/src/zope/server/interfaces/__init__.py
  U   Zope3/branches/ZopeX3-3.0/src/zope/server/serverbase.py
  U   Zope3/branches/ZopeX3-3.0/src/zope/server/serverchannelbase.py


-=-
Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/adjustments.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/adjustments.py	2004-09-02 18:15:15 UTC (rev 27425)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/adjustments.py	2004-09-02 18:31:38 UTC (rev 27426)
@@ -15,6 +15,8 @@
 
 $Id$
 """
+import socket
+
 from zope.server import maxsockets
 
 
@@ -33,7 +35,11 @@
     recv_bytes = 8192
 
     # send_bytes is the number of bytes to send to socket.send().
-    send_bytes = 8192
+    # Multiples of 9000 should avoid partly-filled packets, but don't
+    # set this larger than the TCP write buffer size.  In Linux,
+    # /proc/sys/net/ipv4/tcp_wmem controls the minimum, default, and
+    # maximum sizes of TCP write buffers.
+    send_bytes = 9000
 
     # copy_bytes is the number of bytes to copy from one file to another.
     copy_bytes = 65536
@@ -61,5 +67,13 @@
     # Boolean: turn off to not log premature client disconnects.
     log_socket_errors = 1
 
+    # The socket options to set on receiving a connection.
+    # It is a list of (level, optname, value) tuples.
+    # TCP_NODELAY is probably good for Zope, since Zope buffers
+    # data itself.
+    socket_options = [
+        (socket.SOL_TCP, socket.TCP_NODELAY, 1),
+        ]
 
+
 default_adj = Adjustments()

Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/dualmodechannel.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/dualmodechannel.py	2004-09-02 18:15:15 UTC (rev 27425)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/dualmodechannel.py	2004-09-02 18:31:38 UTC (rev 27426)
@@ -68,9 +68,6 @@
     def handle_write(self):
         if not self.async_mode:
             return
-        self.inner_handle_write()
-
-    def inner_handle_write(self):
         if self.outbuf:
             try:
                 self._flush_some()
@@ -87,9 +84,6 @@
     def handle_read(self):
         if not self.async_mode:
             return
-        self.inner_handle_read()
-
-    def inner_handle_read(self):
         try:
             data = self.recv(self.adj.recv_bytes)
         except socket.error:
@@ -122,16 +116,6 @@
     # SYNCHRONOUS METHODS
     #
 
-    def write(self, data):
-        if data:
-            self.outbuf.append(data)
-        while len(self.outbuf) >= self.adj.send_bytes:
-            # Send what we can without blocking.
-            # We propagate errors to the application on purpose
-            # (to stop the application if the connection closes).
-            if not self._flush_some():
-                break
-
     def flush(self, block=1):
         """Sends pending data.
 
@@ -167,6 +151,16 @@
     # METHODS USED IN BOTH MODES
     #
 
+    def write(self, data):
+        if data:
+            self.outbuf.append(data)
+        while len(self.outbuf) >= self.adj.send_bytes:
+            # Send what we can without blocking.
+            # We propagate errors to the application on purpose
+            # (to stop the application if the connection closes).
+            if not self._flush_some():
+                break
+
     def pull_trigger(self):
         """Wakes up the main loop.
         """
@@ -186,96 +180,12 @@
         return 0
 
     def close_when_done(self):
-        # We might be able close immediately.
+        # Flush all possible.
         while self._flush_some():
             pass
-        if not self.outbuf:
-            # Quick exit.
-            self.close()
-        else:
-            # Wait until outbuf is flushed.
-            self.will_close = 1
-            if not self.async_mode:
-                self.async_mode = 1
-                self.pull_trigger()
-
-
-allocate_lock = None
-
-
-class SimultaneousModeChannel(DualModeChannel):
-    """Layer on top of DualModeChannel that allows communication in
-    both the main thread and other threads at the same time.
-
-    The channel operates in synchronous mode with an asynchronous
-    helper.  The asynchronous callbacks empty the output buffer
-    and fill the input buffer.
-    """
-
-    def __init__(self, conn, addr, adj=None):
-        global allocate_lock
-        if allocate_lock is None:
-            from thread import allocate_lock
-
-        # writelock protects all accesses to outbuf, since reads and
-        # writes of buffers in this class need to be serialized.
-        writelock = allocate_lock()
-        self._writelock_acquire = writelock.acquire
-        self._writelock_release = writelock.release
-        self._writelock_locked = writelock.locked
-        DualModeChannel.__init__(self, conn, addr, adj)
-
-    #
-    # ASYNCHRONOUS METHODS
-    #
-
-    def writable(self):
-        return self.will_close or (
-            self.outbuf and not self._writelock_locked())
-
-    def handle_write(self):
-        if not self._writelock_acquire(0):
-            # A synchronous method is writing.
-            return
-        try:
-            self.inner_handle_write()
-        finally:
-            self._writelock_release()
-
-    def readable(self):
-        return not self.will_close
-
-    def handle_read(self):
-        self.inner_handle_read()
-
-    def set_sync(self):
-        pass
-
-    #
-    # SYNCHRONOUS METHODS
-    #
-
-    def write(self, data):
-        self._writelock_acquire()
-        try:
-            DualModeChannel.write(self, data)
-        finally:
-            self._writelock_release()
-
-    def flush(self, block=1):
-        self._writelock_acquire()
-        try:
-            DualModeChannel.flush(self, block)
-        finally:
-            self._writelock_release()
-
-    def set_async(self):
-        pass
-
-    #
-    # METHODS USED IN BOTH MODES
-    #
-
-    def close_when_done(self):
         self.will_close = 1
-        self.pull_trigger()
+        if not self.async_mode:
+            # For safety, don't close the socket until the
+            # main thread calls handle_write().
+            self.async_mode = 1
+            self.pull_trigger()

Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/server.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/server.py	2004-09-02 18:15:15 UTC (rev 27425)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/ftp/server.py	2004-09-02 18:31:38 UTC (rev 27426)
@@ -29,7 +29,7 @@
 from zope.server.interfaces.ftp import IFTPCommandHandler
 from zope.server.linereceiver.lineserverchannel import LineServerChannel
 from zope.server.serverbase import ServerBase
-from zope.server.serverchannelbase import ChannelBaseClass
+from zope.server.dualmodechannel import DualModeChannel
 
 status_messages = {
     'OPEN_DATA_CONN'   : '150 Opening %s mode data connection for file list',
@@ -722,7 +722,7 @@
         self.close()
 
 
-class RecvChannel(ChannelBaseClass):
+class RecvChannel(DualModeChannel):
     """ """
 
     complete_transfer = 0
@@ -732,7 +732,7 @@
         self.control_channel = control_channel
         self.finish_args = finish_args
         self.inbuf = OverflowableBuffer(control_channel.adj.inbuf_overflow)
-        ChannelBaseClass.__init__(self, None, None, control_channel.adj)
+        DualModeChannel.__init__(self, None, None, control_channel.adj)
         # Note that this channel starts out in async mode.
 
     def writable (self):
@@ -765,7 +765,7 @@
         finally:
             if self.socket is not None:
                 # XXX asyncore.dispatcher.close() doesn't like socket == None
-                ChannelBaseClass.close(self)
+                DualModeChannel.close(self)
 
 
 
@@ -804,7 +804,7 @@
         pass
 
 
-class XmitChannel(ChannelBaseClass):
+class XmitChannel(DualModeChannel):
 
     opened = 0
     _fileno = None  # provide a default for asyncore.dispatcher._fileno
@@ -813,7 +813,7 @@
         self.control_channel = control_channel
         self.ok_reply_args = ok_reply_args
         self.set_sync()
-        ChannelBaseClass.__init__(self, None, None, control_channel.adj)
+        DualModeChannel.__init__(self, None, None, control_channel.adj)
 
     def _open(self):
         """Signal the client to open the connection."""
@@ -826,7 +826,7 @@
             raise IOError, 'Client FTP connection closed'
         if not self.opened:
             self._open()
-        ChannelBaseClass.write(self, data)
+        DualModeChannel.write(self, data)
 
     def readable(self):
         return not self.connected
@@ -864,7 +864,7 @@
         finally:
             if self.socket is not None:
                 # XXX asyncore.dispatcher.close() doesn't like socket == None
-                ChannelBaseClass.close(self)
+                DualModeChannel.close(self)
 
 
 class ApplicationXmitStream(object):

Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/interfaces/__init__.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/interfaces/__init__.py	2004-09-02 18:15:15 UTC (rev 27425)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/interfaces/__init__.py	2004-09-02 18:31:38 UTC (rev 27426)
@@ -21,8 +21,8 @@
 class ISocket(Interface):
     """Represents a socket.
 
-       Note: Most of this documentation is taken from the Python Library
-             Reference.
+    Note: Most of this documentation is taken from the Python Library
+    Reference.
     """
 
     def listen(backlog):
@@ -240,10 +240,9 @@
        implement this interface or inherit its base class) is that it
        uses a mix of asynchronous and thread-based mechanism to
        serve. While the low-level socket listener uses async, the
-       actual request is executed in a thread.  This has the huge
-       advantage that if a request takes really long to process, the
-       server does not hang at that point to wait for the request to
-       finish.
+       actual request is executed in a thread.  This is important
+       because even if a request takes a long time to process, the
+       server can service other requests simultaneously.
     """
 
     channel_class = Attribute("""

Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/serverbase.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/serverbase.py	2004-09-02 18:15:15 UTC (rev 27425)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/serverbase.py	2004-09-02 18:31:38 UTC (rev 27426)
@@ -145,4 +145,6 @@
                 self.log_info ('warning: server accept() threw an exception',
                                'warning')
             return
+        for (level, optname, value) in self.adj.socket_options:
+            conn.setsockopt(level, optname, value)
         self.channel_class(self, conn, addr, self.adj)

Modified: Zope3/branches/ZopeX3-3.0/src/zope/server/serverchannelbase.py
===================================================================
--- Zope3/branches/ZopeX3-3.0/src/zope/server/serverchannelbase.py	2004-09-02 18:15:15 UTC (rev 27425)
+++ Zope3/branches/ZopeX3-3.0/src/zope/server/serverchannelbase.py	2004-09-02 18:31:38 UTC (rev 27426)
@@ -25,22 +25,14 @@
 from thread import allocate_lock
 from zope.interface import implements
 
-# Enable ZOPE_SERVER_SIMULT_MODE to enable experimental
-# simultaneous channel mode, which may improve or degrade
-# throughput depending on load characteristics.
-if os.environ.get('ZOPE_SERVER_SIMULT_MODE'):
-    from zope.server.dualmodechannel import SimultaneousModeChannel as \
-         ChannelBaseClass
-else:
-    from zope.server.dualmodechannel import DualModeChannel as ChannelBaseClass
-
+from zope.server.dualmodechannel import DualModeChannel
 from zope.server.interfaces import IServerChannel
 
 # Synchronize access to the "running_tasks" attributes.
 running_lock = allocate_lock()
 
 
-class ServerChannelBase(ChannelBaseClass, object):
+class ServerChannelBase(DualModeChannel, object):
     """Base class for a high-performance, mixed-mode server-side channel."""
 
     implements(IServerChannel)
@@ -59,12 +51,12 @@
     running_tasks = 0         # boolean: true when any task is being executed
 
     #
-    # ASYNCHRONOUS METHODS (incl. __init__)
+    # ASYNCHRONOUS METHODS (including __init__)
     #
 
     def __init__(self, server, conn, addr, adj=None):
         """See async.dispatcher"""
-        ChannelBaseClass.__init__(self, conn, addr, adj)
+        DualModeChannel.__init__(self, conn, addr, adj)
         self.server = server
         self.last_activity = t = self.creation_time
         self.check_maintenance(t)
@@ -72,17 +64,17 @@
     def add_channel(self, map=None):
         """See async.dispatcher
 
-        This hook keeps track of opened HTTP channels.
+        This hook keeps track of opened channels.
         """
-        ChannelBaseClass.add_channel(self, map)
+        DualModeChannel.add_channel(self, map)
         self.__class__.active_channels[self._fileno] = self
 
     def del_channel(self, map=None):
         """See async.dispatcher
 
-        This hook keeps track of closed HTTP channels.
+        This hook keeps track of closed channels.
         """
-        ChannelBaseClass.del_channel(self, map)
+        DualModeChannel.del_channel(self, map)
         ac = self.__class__.active_channels
         fd = self._fileno
         if fd in ac:



More information about the Zope3-Checkins mailing list