[Zope3-checkins] SVN: Zope3/trunk/src/zope/server/ DualModeChannel
no longer attempts to close except in the main thread.
Shane Hathaway
shane at zope.com
Thu Sep 2 01:31:27 EDT 2004
Log message for revision 27408:
DualModeChannel no longer attempts to close except in the main thread.
Also simplified by removing the experimental SimultaneousModeChannel
class. DualModeChannel is ambitious enough already. We don't need to
expose even more bugs in asyncore.
The advantage of closing in application threads was that it forced the
TCP stack to flush buffers immediately, resulting in a quick response.
However, TCP_NODELAY should have the same effect, so in theory no speed
has been lost.
Changed:
U Zope3/trunk/src/zope/server/dualmodechannel.py
U Zope3/trunk/src/zope/server/ftp/server.py
U Zope3/trunk/src/zope/server/serverchannelbase.py
-=-
Modified: Zope3/trunk/src/zope/server/dualmodechannel.py
===================================================================
--- Zope3/trunk/src/zope/server/dualmodechannel.py 2004-09-02 05:24:05 UTC (rev 27407)
+++ Zope3/trunk/src/zope/server/dualmodechannel.py 2004-09-02 05:31:25 UTC (rev 27408)
@@ -68,9 +68,6 @@
def handle_write(self):
if not self.async_mode:
return
- self.inner_handle_write()
-
- def inner_handle_write(self):
if self.outbuf:
try:
self._flush_some()
@@ -87,9 +84,6 @@
def handle_read(self):
if not self.async_mode:
return
- self.inner_handle_read()
-
- def inner_handle_read(self):
try:
data = self.recv(self.adj.recv_bytes)
except socket.error:
@@ -122,16 +116,6 @@
# SYNCHRONOUS METHODS
#
- def write(self, data):
- if data:
- self.outbuf.append(data)
- while len(self.outbuf) >= self.adj.send_bytes:
- # Send what we can without blocking.
- # We propagate errors to the application on purpose
- # (to stop the application if the connection closes).
- if not self._flush_some():
- break
-
def flush(self, block=1):
"""Sends pending data.
@@ -167,6 +151,16 @@
# METHODS USED IN BOTH MODES
#
+ def write(self, data):
+ if data:
+ self.outbuf.append(data)
+ while len(self.outbuf) >= self.adj.send_bytes:
+ # Send what we can without blocking.
+ # We propagate errors to the application on purpose
+ # (to stop the application if the connection closes).
+ if not self._flush_some():
+ break
+
def pull_trigger(self):
"""Wakes up the main loop.
"""
@@ -186,96 +180,12 @@
return 0
def close_when_done(self):
- # We might be able close immediately.
+ # Flush all possible.
while self._flush_some():
pass
- if not self.outbuf:
- # Quick exit.
- self.close()
- else:
- # Wait until outbuf is flushed.
- self.will_close = 1
- if not self.async_mode:
- self.async_mode = 1
- self.pull_trigger()
-
-
-allocate_lock = None
-
-
-class SimultaneousModeChannel(DualModeChannel):
- """Layer on top of DualModeChannel that allows communication in
- both the main thread and other threads at the same time.
-
- The channel operates in synchronous mode with an asynchronous
- helper. The asynchronous callbacks empty the output buffer
- and fill the input buffer.
- """
-
- def __init__(self, conn, addr, adj=None):
- global allocate_lock
- if allocate_lock is None:
- from thread import allocate_lock
-
- # writelock protects all accesses to outbuf, since reads and
- # writes of buffers in this class need to be serialized.
- writelock = allocate_lock()
- self._writelock_acquire = writelock.acquire
- self._writelock_release = writelock.release
- self._writelock_locked = writelock.locked
- DualModeChannel.__init__(self, conn, addr, adj)
-
- #
- # ASYNCHRONOUS METHODS
- #
-
- def writable(self):
- return self.will_close or (
- self.outbuf and not self._writelock_locked())
-
- def handle_write(self):
- if not self._writelock_acquire(0):
- # A synchronous method is writing.
- return
- try:
- self.inner_handle_write()
- finally:
- self._writelock_release()
-
- def readable(self):
- return not self.will_close
-
- def handle_read(self):
- self.inner_handle_read()
-
- def set_sync(self):
- pass
-
- #
- # SYNCHRONOUS METHODS
- #
-
- def write(self, data):
- self._writelock_acquire()
- try:
- DualModeChannel.write(self, data)
- finally:
- self._writelock_release()
-
- def flush(self, block=1):
- self._writelock_acquire()
- try:
- DualModeChannel.flush(self, block)
- finally:
- self._writelock_release()
-
- def set_async(self):
- pass
-
- #
- # METHODS USED IN BOTH MODES
- #
-
- def close_when_done(self):
self.will_close = 1
- self.pull_trigger()
+ if not self.async_mode:
+ # For safety, don't close the socket until the
+ # main thread calls handle_write().
+ self.async_mode = 1
+ self.pull_trigger()
Modified: Zope3/trunk/src/zope/server/ftp/server.py
===================================================================
--- Zope3/trunk/src/zope/server/ftp/server.py 2004-09-02 05:24:05 UTC (rev 27407)
+++ Zope3/trunk/src/zope/server/ftp/server.py 2004-09-02 05:31:25 UTC (rev 27408)
@@ -29,7 +29,7 @@
from zope.server.interfaces.ftp import IFTPCommandHandler
from zope.server.linereceiver.lineserverchannel import LineServerChannel
from zope.server.serverbase import ServerBase
-from zope.server.serverchannelbase import ChannelBaseClass
+from zope.server.dualmodechannel import DualModeChannel
status_messages = {
'OPEN_DATA_CONN' : '150 Opening %s mode data connection for file list',
@@ -722,7 +722,7 @@
self.close()
-class RecvChannel(ChannelBaseClass):
+class RecvChannel(DualModeChannel):
""" """
complete_transfer = 0
@@ -732,7 +732,7 @@
self.control_channel = control_channel
self.finish_args = finish_args
self.inbuf = OverflowableBuffer(control_channel.adj.inbuf_overflow)
- ChannelBaseClass.__init__(self, None, None, control_channel.adj)
+ DualModeChannel.__init__(self, None, None, control_channel.adj)
# Note that this channel starts out in async mode.
def writable (self):
@@ -765,7 +765,7 @@
finally:
if self.socket is not None:
# XXX asyncore.dispatcher.close() doesn't like socket == None
- ChannelBaseClass.close(self)
+ DualModeChannel.close(self)
@@ -804,7 +804,7 @@
pass
-class XmitChannel(ChannelBaseClass):
+class XmitChannel(DualModeChannel):
opened = 0
_fileno = None # provide a default for asyncore.dispatcher._fileno
@@ -813,7 +813,7 @@
self.control_channel = control_channel
self.ok_reply_args = ok_reply_args
self.set_sync()
- ChannelBaseClass.__init__(self, None, None, control_channel.adj)
+ DualModeChannel.__init__(self, None, None, control_channel.adj)
def _open(self):
"""Signal the client to open the connection."""
@@ -826,7 +826,7 @@
raise IOError, 'Client FTP connection closed'
if not self.opened:
self._open()
- ChannelBaseClass.write(self, data)
+ DualModeChannel.write(self, data)
def readable(self):
return not self.connected
@@ -864,7 +864,7 @@
finally:
if self.socket is not None:
# XXX asyncore.dispatcher.close() doesn't like socket == None
- ChannelBaseClass.close(self)
+ DualModeChannel.close(self)
class ApplicationXmitStream(object):
Modified: Zope3/trunk/src/zope/server/serverchannelbase.py
===================================================================
--- Zope3/trunk/src/zope/server/serverchannelbase.py 2004-09-02 05:24:05 UTC (rev 27407)
+++ Zope3/trunk/src/zope/server/serverchannelbase.py 2004-09-02 05:31:25 UTC (rev 27408)
@@ -25,22 +25,14 @@
from thread import allocate_lock
from zope.interface import implements
-# Enable ZOPE_SERVER_SIMULT_MODE to enable experimental
-# simultaneous channel mode, which may improve or degrade
-# throughput depending on load characteristics.
-if os.environ.get('ZOPE_SERVER_SIMULT_MODE'):
- from zope.server.dualmodechannel import SimultaneousModeChannel as \
- ChannelBaseClass
-else:
- from zope.server.dualmodechannel import DualModeChannel as ChannelBaseClass
-
+from zope.server.dualmodechannel import DualModeChannel
from zope.server.interfaces import IServerChannel
# Synchronize access to the "running_tasks" attributes.
running_lock = allocate_lock()
-class ServerChannelBase(ChannelBaseClass, object):
+class ServerChannelBase(DualModeChannel, object):
"""Base class for a high-performance, mixed-mode server-side channel."""
implements(IServerChannel)
@@ -59,12 +51,12 @@
running_tasks = 0 # boolean: true when any task is being executed
#
- # ASYNCHRONOUS METHODS (incl. __init__)
+ # ASYNCHRONOUS METHODS (including __init__)
#
def __init__(self, server, conn, addr, adj=None):
"""See async.dispatcher"""
- ChannelBaseClass.__init__(self, conn, addr, adj)
+ DualModeChannel.__init__(self, conn, addr, adj)
self.server = server
self.last_activity = t = self.creation_time
self.check_maintenance(t)
@@ -72,17 +64,17 @@
def add_channel(self, map=None):
"""See async.dispatcher
- This hook keeps track of opened HTTP channels.
+ This hook keeps track of opened channels.
"""
- ChannelBaseClass.add_channel(self, map)
+ DualModeChannel.add_channel(self, map)
self.__class__.active_channels[self._fileno] = self
def del_channel(self, map=None):
"""See async.dispatcher
- This hook keeps track of closed HTTP channels.
+ This hook keeps track of closed channels.
"""
- ChannelBaseClass.del_channel(self, map)
+ DualModeChannel.del_channel(self, map)
ac = self.__class__.active_channels
fd = self._fileno
if fd in ac:
More information about the Zope3-Checkins
mailing list