[Zope3-checkins] SVN: Zope3/trunk/src/zope/server/ Another go at
fixing the asyncore errors.
Shane Hathaway
shane at zope.com
Sun Sep 12 02:02:39 EDT 2004
Log message for revision 27500:
Another go at fixing the asyncore errors.
Strategy:
1. Move work into the main thread, since it's more predictable than
threads.
2. Work around asyncore idiosyncracies. For example, I just found out
that if you close() twice, you might remove some other socket from the
map. Wow, dude.
3. Run the unit tests repeatedly, although they still pass 100% of the
time for me. Bummer.
Changed:
U Zope3/trunk/src/zope/server/dualmodechannel.py
U Zope3/trunk/src/zope/server/ftp/server.py
U Zope3/trunk/src/zope/server/serverchannelbase.py
-=-
Modified: Zope3/trunk/src/zope/server/dualmodechannel.py
===================================================================
--- Zope3/trunk/src/zope/server/dualmodechannel.py 2004-09-11 22:24:02 UTC (rev 27499)
+++ Zope3/trunk/src/zope/server/dualmodechannel.py 2004-09-12 06:02:38 UTC (rev 27500)
@@ -38,11 +38,11 @@
the main loop.
"""
- # will_close is set to 1 to close the socket.
- will_close = 0
+ # will_close is set to True to close the socket.
+ will_close = False
# boolean: async or sync mode
- async_mode = 1
+ async_mode = True
def __init__(self, conn, addr, adj=None):
self.addr = addr
@@ -110,13 +110,13 @@
The main thread will stop calling received().
"""
- self.async_mode = 0
+ self.async_mode = False
#
# SYNCHRONOUS METHODS
#
- def flush(self, block=1):
+ def flush(self, block=True):
"""Sends pending data.
If block is set, this pauses the application. If it is turned
@@ -127,13 +127,13 @@
while self._flush_some():
pass
return
- blocked = 0
+ blocked = False
try:
while self.outbuf:
# We propagate errors to the application on purpose.
if not blocked:
self.socket.setblocking(1)
- blocked = 1
+ blocked = True
self._flush_some()
finally:
if blocked:
@@ -144,7 +144,7 @@
The main thread will begin calling received() again.
"""
- self.async_mode = 1
+ self.async_mode = True
self.pull_trigger()
#
@@ -183,11 +183,11 @@
# Flush all possible.
while self._flush_some():
pass
- self.will_close = 1
+ self.will_close = True
if not self.async_mode:
# For safety, don't close the socket until the
# main thread calls handle_write().
- self.async_mode = 1
+ self.async_mode = True
self.pull_trigger()
def close(self):
@@ -195,4 +195,5 @@
# closed in a thread, the main loop can end up with a bad file
# descriptor.
assert self.async_mode
+ self.connected = False
asyncore.dispatcher.close(self)
Modified: Zope3/trunk/src/zope/server/ftp/server.py
===================================================================
--- Zope3/trunk/src/zope/server/ftp/server.py 2004-09-11 22:24:02 UTC (rev 27499)
+++ Zope3/trunk/src/zope/server/ftp/server.py 2004-09-12 06:02:38 UTC (rev 27500)
@@ -29,7 +29,7 @@
from zope.server.interfaces.ftp import IFTPCommandHandler
from zope.server.linereceiver.lineserverchannel import LineServerChannel
from zope.server.serverbase import ServerBase
-from zope.server.dualmodechannel import DualModeChannel
+from zope.server.dualmodechannel import DualModeChannel, the_trigger
status_messages = {
'OPEN_DATA_CONN' : '150 Opening %s mode data connection for file list',
@@ -92,16 +92,17 @@
# List of commands that are always available
- special_commands = ('cmd_quit', 'cmd_type', 'cmd_noop', 'cmd_user',
- 'cmd_pass')
+ special_commands = (
+ 'cmd_quit', 'cmd_type', 'cmd_noop', 'cmd_user', 'cmd_pass')
# These are the commands that are accessing the filesystem.
# Since this could be also potentially a longer process, these commands
# are also the ones that are executed in a different thread.
- thread_commands = ('cmd_appe', 'cmd_cdup', 'cmd_cwd', 'cmd_dele',
- 'cmd_list', 'cmd_nlst', 'cmd_mdtm', 'cmd_mkd',
- 'cmd_pass', 'cmd_retr', 'cmd_rmd', 'cmd_rnfr',
- 'cmd_rnto', 'cmd_size', 'cmd_stor', 'cmd_stru')
+ thread_commands = (
+ 'cmd_appe', 'cmd_cdup', 'cmd_cwd', 'cmd_dele',
+ 'cmd_list', 'cmd_nlst', 'cmd_mdtm', 'cmd_mkd',
+ 'cmd_pass', 'cmd_retr', 'cmd_rmd', 'cmd_rnfr',
+ 'cmd_rnto', 'cmd_size', 'cmd_stor', 'cmd_stru')
# Define the status messages
status_messages = status_messages
@@ -125,11 +126,10 @@
def __init__(self, server, conn, addr, adj=None):
super(FTPServerChannel, self).__init__(server, conn, addr, adj)
- self.client_addr = (addr[0], 21)
+ self.port_addr = None # The client's PORT address
+ self.passive_listener = None # The PASV listener
+ self.client_dc = None # The data connection
- self.passive_acceptor = None
- self.client_dc = None
-
self.transfer_mode = 'a' # Have to default to ASCII :-|
self.passive_mode = 0
self.cwd = '/'
@@ -145,14 +145,15 @@
"""Open the filesystem using the current credentials."""
return self.server.fs_access.open(self.credentials)
+
def cmd_abor(self, args):
'See IFTPCommandHandler'
assert self.async_mode
self.reply('TRANSFER_ABORTED')
- if self.client_dc is not None:
- self.client_dc.reported = True
- self.client_dc.close()
+ self.abortPassive()
+ self.abortData()
+
def cmd_appe (self, args):
'See IFTPCommandHandler'
return self.cmd_stor(args, 'a')
@@ -229,8 +230,7 @@
self.reply('ERR_NO_LIST', str(err))
return
ok_reply = ('OPEN_DATA_CONN', self.type_map[self.transfer_mode])
- cdc = XmitChannel(self, ok_reply)
- self.client_dc = cdc
+ cdc = RETRChannel(self, ok_reply)
try:
cdc.write(s)
cdc.close_when_done()
@@ -256,7 +256,6 @@
path = self._generatePath(path)
-
if fs.type(path) == 'd' and not directory:
if long:
file_list = map(ls, fs.ls(path))
@@ -342,11 +341,13 @@
def cmd_pasv(self, args):
'See IFTPCommandHandler'
- pc = self.newPassiveAcceptor()
- self.client_dc = None
- port = pc.addr[1]
- ip_addr = pc.control_channel.getsockname()[0]
- self.reply('PASV_MODE_MSG', (','.join(ip_addr.split('.')),
+ assert self.async_mode
+ # Kill any existing passive listener first.
+ self.abortPassive()
+ local_addr = self.getsockname()[0]
+ self.passive_listener = PassiveListener(self, local_addr)
+ port = self.passive_listener.port
+ self.reply('PASV_MODE_MSG', (','.join(local_addr.split('.')),
port/256,
port%256 ) )
@@ -360,7 +361,7 @@
# I'm assuming one for now...
# TODO: we should (optionally) verify that the
# ip number belongs to the client. [wu-ftpd does this?]
- self.client_addr = (ip, port)
+ self.port_addr = (ip, port)
self.reply('SUCCESS_200', 'PORT')
@@ -392,9 +393,8 @@
self.restart_position = 0
ok_reply = 'OPEN_CONN', (self.type_map[self.transfer_mode], path)
- cdc = XmitChannel(self, ok_reply)
- self.client_dc = cdc
- outstream = ApplicationXmitStream(cdc)
+ cdc = RETRChannel(self, ok_reply)
+ outstream = ApplicationOutputStream(cdc)
try:
fs.readfile(path, outstream, start)
@@ -484,14 +484,13 @@
self.reply('ERR_OPEN_WRITE', "Can't write file")
return
- cdc = RecvChannel(self, (path, mode, start))
- self.client_dc = cdc
+ cdc = STORChannel(self, (path, mode, start))
+ self.syncConnectData(cdc)
self.reply('OPEN_CONN', (self.type_map[self.transfer_mode], path))
- self.connectDataChannel(cdc)
- def finishedRecv(self, buffer, (path, mode, start)):
- """Called by RecvChannel when the transfer is finished."""
+ def finishSTOR(self, buffer, (path, mode, start)):
+ """Called by STORChannel when the client has sent all data."""
assert not self.async_mode
try:
infile = buffer.getfile()
@@ -530,10 +529,8 @@
# if t not in ['a','e','i','l']:
if t not in ['a','i','l']:
self.reply('ERR_ARGS')
-
elif t == 'l' and (len(args) > 2 and args[2] != '8'):
self.reply('WRONG_BYTE_SIZE')
-
else:
self.transfer_mode = t
self.reply('TYPE_SET_OK', self.type_map[t])
@@ -548,7 +545,6 @@
else:
self.reply('ERR_ARGS')
- #
############################################################
def _generatePath(self, args):
@@ -558,55 +554,61 @@
path = posixpath.join(self.cwd, args)
return posixpath.normpath(path)
- def newPassiveAcceptor(self):
- # ensure that only one of these exists at a time.
- assert self.async_mode
- if self.passive_acceptor is not None:
- self.passive_acceptor.close()
- self.passive_acceptor = None
- self.passive_acceptor = PassiveAcceptor(self)
- return self.passive_acceptor
+ def syncConnectData(self, cdc):
+ """Calls asyncConnectData in the asynchronous thread."""
+ the_trigger.pull_trigger(lambda: self.asyncConnectData(cdc))
- def connectDataChannel(self, cdc):
- """Attempt to connect the data channel."""
- pa = self.passive_acceptor
- if pa:
- # PASV mode.
- if pa.ready:
- # a connection has already been made.
- conn, addr = pa.ready
- cdc.set_socket (conn)
- cdc.connected = 1
- self.passive_acceptor = None
- # else we're still waiting for a connect to the PASV port.
- # FTP Explorer is known to do this.
- else:
- # not in PASV mode.
- ip, port = self.client_addr
- cdc.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- if self.bind_local_minus_one:
- cdc.bind(('', self.server.port - 1))
- try:
- cdc.connect((ip, port))
- except socket.error:
- self.reply('NO_DATA_CONN')
- cdc.reported = True
- cdc.close_when_done()
+ def asyncConnectData(self, cdc):
+ """Starts connecting the data channel.
+ This is a little complicated because the data connection might
+ be established already (in passive mode) or might be
+ established in the near future (in port or passive mode.) If
+ the connection has already been established,
+ self.passive_listener already has a socket and is waiting for
+ a call to connectData(). If the connection has not been
+ established in passive mode, the passive listener will
+ remember the data channel and send it when it's ready. In port
+ mode, this method tells the data connection to connect.
+ """
+ self.abortData()
+ self.client_dc = cdc
+ if self.passive_listener is not None:
+ # Connect via PASV
+ self.passive_listener.connectData(cdc)
+ if self.port_addr:
+ # Connect via PORT
+ a = self.port_addr
+ self.port_addr = None
+ cdc.connectPort(a)
+
+ def connectedPassive(self):
+ """Accepted a passive connection."""
+ self.passive_listener = None
+
+ def abortPassive(self):
+ """Close the passive listener."""
+ if self.passive_listener is not None:
+ self.passive_listener.abort()
+ self.passive_listener = None
+
+ def abortData(self):
+ """Close the data connection."""
+ if self.client_dc is not None:
+ self.client_dc.abort()
+ self.client_dc = None
+
def closedData(self):
self.client_dc = None
def close(self):
+ # Make sure the passive listener and active client DC get closed.
+ self.abortPassive()
+ self.abortData()
LineServerChannel.close(self)
- # Make sure the client DC gets closed too.
- cdc = self.client_dc
- if cdc is not None:
- self.client_dc = None
- cdc.close()
-
def ls(ls_info):
"""Formats a directory entry similarly to the 'ls' command.
"""
@@ -664,15 +666,15 @@
)
-class PassiveAcceptor(asyncore.dispatcher):
+class PassiveListener(asyncore.dispatcher):
"""This socket accepts a data connection, used when the server has
been placed in passive mode. Although the RFC implies that we
- ought to be able to use the same acceptor over and over again,
+ ought to be able to use the same listener over and over again,
this presents a problem: how do we shut it off, so that we are
accepting connections only when we expect them? [we can't]
wuftpd, and probably all the other servers, solve this by
- allowing only one connection to hit this acceptor. They then
+ allowing only one connection to hit this listener. They then
close it. Any subsequent data-connection command will then try
for the default port on the client side [which is of course
never there]. So the 'always-send-PORT/PASV' behavior seems
@@ -681,51 +683,89 @@
Another note: wuftpd will also be listening on the channel as
soon as the PASV command is sent. It does not wait for a data
command first.
+ """
- --- we need to queue up a particular behavior:
- 1) xmit : queue up producer[s]
- 2) recv : the file object
-
- It would be nice if we could make both channels the same.
- Hmmm.."""
-
- ready = None
-
- def __init__ (self, control_channel):
+ def __init__ (self, control_channel, local_addr):
asyncore.dispatcher.__init__ (self)
self.control_channel = control_channel
+ self.accepted = None # The accepted socket address
+ self.client_dc = None # The data connection to accept the socket
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- # bind to an address on the interface that the
- # control connection is coming from.
- self.bind((self.control_channel.getsockname()[0], 0))
- self.addr = self.getsockname()
+ self.closed = False
+ # bind to an address on the interface where the
+ # control connection is connected.
+ self.bind((local_addr, 0))
+ self.port = self.getsockname()[1]
self.listen(1)
def log (self, *ignore):
pass
+ def abort(self):
+ """Abort the passive listener."""
+ if not self.closed:
+ self.closed = True
+ self.close()
+ if self.accepted is not None:
+ self.accepted.close()
+
def handle_accept (self):
- conn, addr = self.accept()
- conn.setblocking(0)
- dc = self.control_channel.client_dc
- if dc is not None:
- dc.set_socket(conn)
- dc.addr = addr
- dc.connected = 1
- self.control_channel.passive_acceptor = None
- else:
- self.ready = conn, addr
+ self.accepted, addr = self.accept()
+ self.accepted.setblocking(0)
+ self.closed = True
self.close()
+ if self.client_dc is not None:
+ self.connectData(self.client_dc)
+ def connectData(self, cdc):
+ """Sends the connection to the data channel.
+ If the connection has not yet been made, sends the connection
+ when it becomes available.
+ """
+ if self.accepted is not None:
+ cdc.set_socket(self.accepted)
+ # Note that this method will be called twice, once by the
+ # control channel, and once by handle_accept, and the two
+ # calls may come in either order. If handle_accept calls
+ # first, we don't want to call set_socket() on the data
+ # connection twice, so set self.accepted = None to keep a
+ # record that the data connection already has the socket.
+ self.accepted = None
+ self.control_channel.connectedPassive()
+ else:
+ self.client_dc = cdc
+
+
class FTPDataChannel(DualModeChannel):
- """Base class for FTP data connections"""
+ """Base class for FTP data connections.
+
+ Note that data channels are always in async mode.
+ """
def __init__ (self, control_channel):
self.control_channel = control_channel
self.reported = False
+ self.closed = False
DualModeChannel.__init__(self, None, None, control_channel.adj)
+ def connectPort(self, client_addr):
+ """Connect to a port on the client"""
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ #if bind_local_minus_one:
+ # self.bind(('', self.control_channel.server.port - 1))
+ try:
+ self.sock.connect(self.client_addr)
+ except socket.error:
+ self.report('NO_DATA_CONN')
+
+ def abort(self):
+ """Abort the data connection without reporting."""
+ self.reported = True
+ if not self.closed:
+ self.closed = True
+ self.close()
+
def report(self, *reply_args):
"""Reports the result of the data transfer."""
self.reported = True
@@ -740,18 +780,18 @@
"""Notifies the control channel when the data connection closes."""
c = self.control_channel
try:
- if c is not None and not self.reported:
+ if c is not None and c.connected and not self.reported:
self.reportDefault()
finally:
self.control_channel = None
DualModeChannel.close(self)
if c is not None:
c.closedData()
-
-class RecvChannel(FTPDataChannel):
- """FTP data receive channel"""
+class STORChannel(FTPDataChannel):
+ """Channel for uploading one file from client to server"""
+
complete_transfer = 0
_fileno = None # provide a default for asyncore.dispatcher._fileno
@@ -774,7 +814,7 @@
def handle_close (self):
"""Client closed, indicating EOF."""
c = self.control_channel
- task = FinishedRecvTask(c, self.inbuf, self.finish_args)
+ task = FinishSTORTask(c, self.inbuf, self.finish_args)
self.complete_transfer = 1
self.close()
c.queue_task(task)
@@ -782,12 +822,16 @@
def reportDefault(self):
if not self.complete_transfer:
self.report('TRANSFER_ABORTED')
- # else the transfer completed and FinishedRecvTask will
- # provide a complete reply through finishedRecv().
+ # else the transfer completed and FinishSTORTask will
+ # provide a complete reply through finishSTOR().
-class FinishedRecvTask(object):
+class FinishSTORTask(object):
+ """Calls control_channel.finishSTOR() in an application thread.
+ This task executes after the client has finished uploading.
+ """
+
implements(ITask)
def __init__(self, control_channel, inbuf, finish_args):
@@ -802,7 +846,7 @@
c = self.control_channel
try:
try:
- c.finishedRecv(self.inbuf, self.finish_args)
+ c.finishSTOR(self.inbuf, self.finish_args)
except socket.error:
close_on_finish = 1
if c.adj.log_socket_errors:
@@ -820,22 +864,21 @@
pass
-class XmitChannel(FTPDataChannel):
- """FTP data send channel"""
+class RETRChannel(FTPDataChannel):
+ """Channel for downloading one file from server to client"""
opened = 0
_fileno = None # provide a default for asyncore.dispatcher._fileno
def __init__ (self, control_channel, ok_reply_args):
self.ok_reply_args = ok_reply_args
- self.set_sync()
FTPDataChannel.__init__(self, control_channel)
def _open(self):
"""Signal the client to open the connection."""
self.opened = 1
self.control_channel.reply(*self.ok_reply_args)
- self.control_channel.connectDataChannel(self)
+ self.control_channel.asyncConnectData(self)
def write(self, data):
if self.control_channel is None:
@@ -875,14 +918,16 @@
self.report('TRANSFER_ABORTED')
-class ApplicationXmitStream(object):
- """Provide stream output, remapping close() to close_when_done().
+class ApplicationOutputStream(object):
+ """Provide stream output to RETRChannel.
+
+ Maps close() to close_when_done().
"""
- def __init__(self, xmit_channel):
- self.write = xmit_channel.write
- self.flush = xmit_channel.flush
- self.close = xmit_channel.close_when_done
+ def __init__(self, retr_channel):
+ self.write = retr_channel.write
+ self.flush = retr_channel.flush
+ self.close = retr_channel.close_when_done
class FTPServer(ServerBase):
Modified: Zope3/trunk/src/zope/server/serverchannelbase.py
===================================================================
--- Zope3/trunk/src/zope/server/serverchannelbase.py 2004-09-11 22:24:02 UTC (rev 27499)
+++ Zope3/trunk/src/zope/server/serverchannelbase.py 2004-09-12 06:02:38 UTC (rev 27500)
@@ -211,6 +211,7 @@
raise
def cancel(self):
+ """Cancels all pending tasks"""
task_lock.acquire()
try:
if self.tasks:
More information about the Zope3-Checkins
mailing list