[Zope3-checkins] SVN: Zope3/trunk/src/zope/server/ Call
close_when_done() rather than close() in FTP data connections.
Shane Hathaway
shane at zope.com
Mon Sep 6 21:52:17 EDT 2004
Log message for revision 27460:
Call close_when_done() rather than close() in FTP data connections.
ftp/server.py: a lot of rearranging was necessary to fix code that
called close() with arguments. It's important to send a report at the
end of FTP data connections, but the old way relied on passing the
report to close() as arguments, and we sometimes need to call
close_when_done() instead of close().
dualmodechannel.py: added an assertion that verifies close() is always
called in asynchronous mode. See the comment in the new close() method.
Changed:
U Zope3/trunk/src/zope/server/dualmodechannel.py
U Zope3/trunk/src/zope/server/ftp/server.py
-=-
Modified: Zope3/trunk/src/zope/server/dualmodechannel.py
===================================================================
--- Zope3/trunk/src/zope/server/dualmodechannel.py 2004-09-07 01:45:52 UTC (rev 27459)
+++ Zope3/trunk/src/zope/server/dualmodechannel.py 2004-09-07 01:52:16 UTC (rev 27460)
@@ -82,7 +82,7 @@
return not self.will_close
def handle_read(self):
- if not self.async_mode:
+ if not self.async_mode or self.will_close:
return
try:
data = self.recv(self.adj.recv_bytes)
@@ -189,3 +189,10 @@
# main thread calls handle_write().
self.async_mode = 1
self.pull_trigger()
+
+ def close(self):
+ # Always close in asynchronous mode. If the connection is
+ # closed in a thread, the main loop can end up with a bad file
+ # descriptor.
+ assert self.async_mode
+ asyncore.dispatcher.close(self)
Modified: Zope3/trunk/src/zope/server/ftp/server.py
===================================================================
--- Zope3/trunk/src/zope/server/ftp/server.py 2004-09-07 01:45:52 UTC (rev 27459)
+++ Zope3/trunk/src/zope/server/ftp/server.py 2004-09-07 01:52:16 UTC (rev 27460)
@@ -147,10 +147,11 @@
def cmd_abor(self, args):
'See IFTPCommandHandler'
+ assert self.async_mode
+ self.reply('TRANSFER_ABORTED')
if self.client_dc is not None:
- self.client_dc.close('TRANSFER_ABORTED')
- else:
- self.reply('TRANSFER_ABORTED')
+ self.client_dc.reported = True
+ self.client_dc.close()
def cmd_appe (self, args):
'See IFTPCommandHandler'
@@ -208,14 +209,11 @@
except GetoptError:
self.reply('ERR_ARGS')
return
-
if len(args) > 1:
self.reply('ERR_ARGS')
return
-
args = args and args[0] or ''
-
fs = self._getFileSystem()
path = self._generatePath(args)
if not fs.type(path):
@@ -237,7 +235,9 @@
cdc.write(s)
cdc.close_when_done()
except OSError, err:
- cdc.close('ERR_NO_LIST', str(err))
+ self.reply('ERR_NO_LIST', str(err))
+ cdc.reported = True
+ cdc.close_when_done()
def getList(self, args, long=0, directory=0):
# we need to scan the command line for arguments to '/bin/ls'...
@@ -271,7 +271,6 @@
return '\r\n'.join(file_list) + '\r\n'
-
def cmd_mdtm(self, args):
'See IFTPCommandHandler'
fs = self._getFileSystem()
@@ -401,9 +400,13 @@
fs.readfile(path, outstream, start)
cdc.close_when_done()
except OSError, err:
- cdc.close('ERR_OPEN_READ', str(err))
+ self.reply('ERR_OPEN_READ', str(err))
+ cdc.reported = True
+ cdc.close_when_done()
except IOError, err:
- cdc.close('ERR_IO', str(err))
+ self.reply('ERR_IO', str(err))
+ cdc.reported = True
+ cdc.close_when_done()
def cmd_rest(self, args):
@@ -489,7 +492,7 @@
def finishedRecv(self, buffer, (path, mode, start)):
"""Called by RecvChannel when the transfer is finished."""
- # Always called in a task.
+ assert not self.async_mode
try:
infile = buffer.getfile()
infile.seek(0)
@@ -555,18 +558,17 @@
path = posixpath.join(self.cwd, args)
return posixpath.normpath(path)
-
def newPassiveAcceptor(self):
# ensure that only one of these exists at a time.
+ assert self.async_mode
if self.passive_acceptor is not None:
self.passive_acceptor.close()
self.passive_acceptor = None
self.passive_acceptor = PassiveAcceptor(self)
return self.passive_acceptor
-
-
def connectDataChannel(self, cdc):
+ """Attempt to connect the data channel."""
pa = self.passive_acceptor
if pa:
# PASV mode.
@@ -575,7 +577,6 @@
conn, addr = pa.ready
cdc.set_socket (conn)
cdc.connected = 1
- self.passive_acceptor.close()
self.passive_acceptor = None
# else we're still waiting for a connect to the PASV port.
# FTP Explorer is known to do this.
@@ -588,16 +589,13 @@
try:
cdc.connect((ip, port))
except socket.error:
- cdc.close('NO_DATA_CONN')
+ self.reply('NO_DATA_CONN')
+ cdc.reported = True
+ cdc.close_when_done()
-
- def notifyClientDCClosing(self, *reply_args):
- if self.client_dc is not None:
- self.client_dc = None
- if reply_args:
- self.reply(*reply_args)
+ def closedData(self):
+ self.client_dc = None
-
def close(self):
LineServerChannel.close(self)
# Make sure the client DC gets closed too.
@@ -703,11 +701,9 @@
self.addr = self.getsockname()
self.listen(1)
-
def log (self, *ignore):
pass
-
def handle_accept (self):
conn, addr = self.accept()
conn.setblocking(0)
@@ -722,18 +718,48 @@
self.close()
-class RecvChannel(DualModeChannel):
- """ """
+class FTPDataChannel(DualModeChannel):
+ """Base class for FTP data connections"""
+
+ def __init__ (self, control_channel):
+ self.control_channel = control_channel
+ self.reported = False
+ DualModeChannel.__init__(self, None, None, control_channel.adj)
+ def report(self, *reply_args):
+ """Reports the result of the data transfer."""
+ self.reported = True
+ if self.control_channel is not None:
+ self.control_channel.reply(*reply_args)
+
+ def reportDefault(self):
+ """Provide a default report on close."""
+ pass
+
+ def close(self):
+ """Notifies the control channel when the data connection closes."""
+ c = self.control_channel
+ try:
+ if c is not None and not self.reported:
+ self.reportDefault()
+ finally:
+ self.control_channel = None
+ DualModeChannel.close(self)
+ if c is not None:
+ c.closedData()
+
+
+class RecvChannel(FTPDataChannel):
+ """FTP data receive channel"""
+
complete_transfer = 0
_fileno = None # provide a default for asyncore.dispatcher._fileno
def __init__ (self, control_channel, finish_args):
- self.control_channel = control_channel
self.finish_args = finish_args
self.inbuf = OverflowableBuffer(control_channel.adj.inbuf_overflow)
- DualModeChannel.__init__(self, None, None, control_channel.adj)
- # Note that this channel starts out in async mode.
+ FTPDataChannel.__init__(self, control_channel)
+ # Note that this channel starts in async mode.
def writable (self):
return 0
@@ -753,22 +779,13 @@
self.close()
c.queue_task(task)
- def close(self, *reply_args):
- try:
- c = self.control_channel
- if c is not None:
- self.control_channel = None
- if not self.complete_transfer and not reply_args:
- # Not all data transferred
- reply_args = ('TRANSFER_ABORTED',)
- c.notifyClientDCClosing(*reply_args)
- finally:
- if self.socket is not None:
- # XXX asyncore.dispatcher.close() doesn't like socket == None
- DualModeChannel.close(self)
+ def reportDefault(self):
+ if not self.complete_transfer:
+ self.report('TRANSFER_ABORTED')
+ # else the transfer completed and FinishedRecvTask will
+ # provide a complete reply through finishedRecv().
-
class FinishedRecvTask(object):
implements(ITask)
@@ -803,16 +820,16 @@
pass
-class XmitChannel(DualModeChannel):
+class XmitChannel(FTPDataChannel):
+ """FTP data send channel"""
opened = 0
_fileno = None # provide a default for asyncore.dispatcher._fileno
def __init__ (self, control_channel, ok_reply_args):
- self.control_channel = control_channel
self.ok_reply_args = ok_reply_args
self.set_sync()
- DualModeChannel.__init__(self, None, None, control_channel.adj)
+ FTPDataChannel.__init__(self, control_channel)
def _open(self):
"""Signal the client to open the connection."""
@@ -825,7 +842,7 @@
raise IOError, 'Client FTP connection closed'
if not self.opened:
self._open()
- DualModeChannel.write(self, data)
+ FTPDataChannel.write(self, data)
def readable(self):
return not self.connected
@@ -836,34 +853,26 @@
self.recv(1)
except:
# The connection failed.
- self.close('NO_DATA_CONN')
+ self.report('NO_DATA_CONN')
+ self.close()
def handle_connect(self):
pass
def handle_comm_error(self):
- self.close('TRANSFER_ABORTED')
+ self.report('TRANSFER_ABORTED')
+ self.close()
- def close(self, *reply_args):
- try:
- c = self.control_channel
- if c is not None:
- self.control_channel = None
- if not reply_args:
- if not len(self.outbuf):
- # All data transferred
- if not self.opened:
- # Zero-length file
- self._open()
- reply_args = ('TRANS_SUCCESS',)
- else:
- # Not all data transferred
- reply_args = ('TRANSFER_ABORTED',)
- c.notifyClientDCClosing(*reply_args)
- finally:
- if self.socket is not None:
- # XXX asyncore.dispatcher.close() doesn't like socket == None
- DualModeChannel.close(self)
+ def reportDefault(self):
+ if not len(self.outbuf):
+ # All data transferred
+ if not self.opened:
+ # Zero-length file
+ self._open()
+ self.report('TRANS_SUCCESS')
+ else:
+ # Not all data transferred
+ self.report('TRANSFER_ABORTED')
class ApplicationXmitStream(object):
More information about the Zope3-Checkins
mailing list