[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/FTP - FTPServerChannel.py:1.1.2.14 RecvChannel.py:1.1.2.5 XmitChannel.py:1.1.2.4
Shane Hathaway
shane@cvs.zope.org
Fri, 5 Apr 2002 18:09:14 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server/FTP
In directory cvs.zope.org:/tmp/cvs-serv28562
Modified Files:
Tag: Zope3-Server-Branch
FTPServerChannel.py RecvChannel.py XmitChannel.py
Log Message:
Primarily made sure file transfers don't span threads, while avoiding blocking
the application and flushing buffers as quickly as possible. Fixed bugs
and refactored some.
=== Zope3/lib/python/Zope/Server/FTP/FTPServerChannel.py 1.1.2.13 => 1.1.2.14 ===
from Zope.Server.LineReceiver.LineServerChannel import LineServerChannel
from FTPStatusMessages import status_msgs
-from FileProducer import FileProducer
from IFTPCommandHandler import IFTPCommandHandler
from PassiveAcceptor import PassiveAcceptor
@@ -74,13 +73,10 @@
self.client_addr = (addr[0], 21)
- self.active_channels = {} # Class-specific channel tracker
- self.next_channel_cleanup = [0] # Class-specific cleanup time
-
self.passive_acceptor = None
self.client_dc = None
- self.transfer_mode = 'i' # Use binary by default
+ self.transfer_mode = 'a' # Have to default to ASCII :-|
self.passive_mode = 0
self.cwd = '/'
self._rnfr = None
@@ -104,7 +100,7 @@
def cmd_appe (self, args):
'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
- return self.cmd_stor(args, 'ab')
+ return self.cmd_stor(args, 'a')
def cmd_cdup(self, args):
@@ -150,19 +146,21 @@
self.reply('HELP_END')
- def cmd_list(self, args):
+ def cmd_list(self, args, long=1):
'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
args = args.split()
try:
- producer = self.getDirectoryList(args, 1)
+ s = self.getDirectoryList(args, long)
except os.error, why:
- self.reply('ERR_NO_LIST', repr(why))
+ self.reply('ERR_NO_LIST', str(why))
return
-
- self.reply('OPEN_DATA_CONN', self.type_map[self.transfer_mode])
- self.createXmitChannel()
- self.client_dc.push_with_producer(producer)
- self.client_dc.close_when_done()
+ try:
+ self.reply('OPEN_DATA_CONN', self.type_map[self.transfer_mode])
+ self.openDataChannel(XmitChannel, s)
+ finally:
+ if hasattr(s, 'close'):
+ # Close the stream.
+ s.close()
def cmd_mdtm(self, args):
@@ -202,21 +200,7 @@
def cmd_nlst(self, args):
'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
- # ncftp adds the -FC argument for the user-visible 'nlist'
- # command. We could try to emulate ls flags, but not just yet.
- args = args.split()
- if '-FC' in args:
- args.remove('-FC')
- try:
- producer = self.getDirectoryList(args, 0)
- except os.error, why:
- self.reply('ERR_NO_LIST', repr(why))
- return
-
- self.createXmitChannel()
- self.client_dc.push_with_producer(producer)
- self.client_dc.close_when_done()
- self.reply('OPEN_DATA_CONN', (self.type_map[self.transfer_mode],))
+ self.cmd_list(args, 0)
def cmd_noop(self, args):
@@ -267,7 +251,7 @@
def cmd_pwd(self, args):
'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
- self.reply('SUCCESS_257', self.cwd)
+ self.reply('ALREADY_CURRENT', self.cwd)
def cmd_quit(self, args):
@@ -284,34 +268,34 @@
path = self._generatePath(args)
if not self.server.filesystem.isfile(path):
- #self.log_info ('checking %s' % file)
- self.reply(550, 2, path)
+ self.reply('ERR_IS_NOT_FILE', path)
else:
try:
- # FIXME: for some reason, 'rt' isn't working on win95
- mode = 'r'+self.type_mode_map[self.transfer_mode]
+ mode = 'r' + self.type_mode_map[self.transfer_mode]
fd = self.server.filesystem.open(path, mode)
except IOError, why:
- self.reply('ERR_OPEN_READ', repre(why))
+ self.reply('ERR_OPEN_READ', str(why))
return
- self.reply('OPEN_CONN',
- (self.type_map[self.transfer_mode], path) )
- self.createXmitChannel()
-
- if self.restart_position:
- # try to position the file as requested, but
- # give up silently on failure (the 'file object'
- # may not support seek())
- try:
- fd.seek(self.restart_position)
- except:
- pass
- self.restart_position = 0
-
- self.client_dc.push_with_producer (
- FileProducer(self.server, self.client_dc, fd)
- )
- self.client_dc.close_when_done()
+ try:
+ self.reply('OPEN_CONN',
+ (self.type_map[self.transfer_mode], path) )
+
+ if self.restart_position:
+ # try to position the file as requested, but
+ # give up silently on failure (the 'file object'
+ # may not support seek())
+ try:
+ fd.seek(self.restart_position)
+ except:
+ pass
+ self.restart_position = 0
+
+ self.openDataChannel(XmitChannel, fd)
+
+ finally:
+ if hasattr(fd, 'close'):
+ # Close the stream.
+ fd.close()
def cmd_rest(self, args):
@@ -370,7 +354,7 @@
self.server.filesystem.stat(path)[stat.ST_SIZE])
- def cmd_stor(self, args, mode='wb'):
+ def cmd_stor(self, args, write_mode='w'):
'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
if not args:
self.reply('CMD_UNKNOWN', 'STOR')
@@ -381,14 +365,44 @@
restart_position = 0
self.reply('ERR_RESTART_STOR')
return
- # todo: handle that type flag
+ mode = write_mode + self.type_mode_map[self.transfer_mode]
+
try:
- fd = self.server.filesystem.open(args, mode)
+ # Optionally verify the file can be opened,
+ # but don't open it yet. The actually write
+ # should be transactional without holding up the
+ # application.
+ fs = self.server.filesystem
+ if hasattr(fs, 'check_open'):
+ fs.check_open(path, mode)
except IOError, why:
- self.reply('ERR_OPEN_WRITE', repr(why))
+ self.reply('ERR_OPEN_WRITE', str(why))
return
self.reply('OPEN_CONN', (self.type_map[self.transfer_mode], file) )
- self.createRecvChannel(fd)
+ self.openDataChannel(RecvChannel, (path, mode))
+
+
+ def finishedRecv(self, buffer, path, mode):
+ """Called by RecvChannel when the transfer is finished."""
+ # Always called in a task.
+ try:
+ outfile = self.server.filesystem.open(path, mode)
+ try:
+ copy_bytes = self.adj.copy_bytes
+ while 1:
+ data = buffer.get(copy_bytes)
+ if not data:
+ break
+ buffer.skip(len(data), 1)
+ outfile.write(data)
+ finally:
+ print 'cp4'
+ outfile.close()
+ print 'cp5'
+ except IOError, why:
+ self.reply('ERR_OPEN_WRITE', str(why))
+ return
+ self.reply('TRANS_SUCCESS')
def cmd_stru(self, args):
@@ -418,8 +432,7 @@
self.reply('WRONG_BYTE_SIZE')
else:
- if t == 'a':
- self.transfer_mode = t
+ self.transfer_mode = t
self.reply('TYPE_SET_OK', self.type_map[t])
@@ -454,7 +467,7 @@
def listdir (self, path, long=0):
- """returns a producer"""
+ """returns a string or stream"""
return self.server.filesystem.listdir(path, long)
@@ -476,66 +489,33 @@
return self.listdir(dir, long)
- def createXmitChannel(self):
- """In PASV mode, the connection may or may _not_ have been
- made yet. [although in most cases it is... FTP Explorer
- being the only exception I've yet seen]. This gets
- somewhat confusing because things may happen in any
- order...
- """
+ def openDataChannel(self, class_, *args):
pa = self.passive_acceptor
if pa:
+ # PASV mode.
if pa.ready:
# a connection has already been made.
- conn, addr = self.passive_acceptor.ready
- cdc = XmitChannel(self, addr)
- cdc.set_socket(conn)
+ conn, addr = pa.ready
+ cdc = class_(self, addr, *args)
+ cdc.set_socket (conn)
cdc.connected = 1
self.passive_acceptor.close()
self.passive_acceptor = None
else:
# we're still waiting for a connect to the PASV port.
- cdc = XmitChannel(self)
-
+ # FTP Explorer is known to do this.
+ cdc = class_(self, None, *args)
else:
# not in PASV mode.
ip, port = self.client_addr
- cdc = XmitChannel(self, self.client_addr)
+ cdc = class_(self, self.client_addr, *args)
cdc.create_socket(socket.AF_INET, socket.SOCK_STREAM)
if self.bind_local_minus_one:
cdc.bind(('', self.server.port - 1))
try:
- cdc.connect ((ip, port))
+ cdc.connect((ip, port))
except socket.error, why:
self.reply(425)
self.client_dc = cdc
-
- # pretty much the same as xmit, but only right on the verge of
- # being worth a merge.
- def createRecvChannel(self, fd):
- pa = self.passive_acceptor
- if pa:
- if pa.ready:
- # a connection has already been made.
- conn, addr = pa.ready
- cdc = RecvChannel(self, addr, fd)
- cdc.set_socket (conn)
- cdc.connected = 1
- self.passive_acceptor.close()
- self.passive_acceptor = None
- else:
- # we're still waiting for a connect to the PASV port.
- cdc = RecvChannel(self, None, fd)
- else:
- # not in PASV mode.
- ip, port = self.client_addr
- cdc = RecvChannel(self, self.client_addr, fd)
- cdc.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- cdc.connect ((ip, port))
- except socket.error, why:
- self.reply(425)
-
- self.client_dc = cdc
=== Zope3/lib/python/Zope/Server/FTP/RecvChannel.py 1.1.2.4 => 1.1.2.5 ===
$Id$
"""
-import asyncore
-from Zope.Server.Counter import Counter
+from Zope.Server.ServerChannelBase import ChannelBaseClass
+from Zope.Server.Buffers import OverflowableBuffer
+from Zope.Server.ITask import ITask
-class RecvChannel(asyncore.dispatcher):
+
+class RecvChannel(ChannelBaseClass):
""" """
- def __init__ (self, channel, client_addr, fd):
- self.channel = channel
+ def __init__ (self, cmd_channel, client_addr, finish_args,
+ adj=None, socket_map=None):
+ self.cmd_channel = cmd_channel
self.client_addr = client_addr
- self.fd = fd
- asyncore.dispatcher.__init__ (self)
- self.bytes_in = Counter()
-
+ self.finish_args = finish_args
+ ChannelBaseClass.__init__(self, None, None, adj, socket_map)
+ self.inbuf = OverflowableBuffer(self.adj.inbuf_overflow)
- def log (self, *ignore):
- pass
+## def log (self, *ignore):
+## pass
+ def writable (self):
+ return 0
def handle_connect (self):
pass
+ def received (self, data):
+ self.inbuf.append(data)
+
+ def handle_close (self):
+ c = self.cmd_channel
+ task = FinishedRecvTask(c, self.inbuf, self.finish_args)
+ self.close()
+ self.cmd_channel = None
+ c.start_task(task)
- def writable (self):
- return 0
- def recv (*args):
- result = apply (asyncore.dispatcher.recv, args)
- self = args[0]
- self.bytes_in.increment(len(result))
- return result
+class FinishedRecvTask:
+ __implements__ = ITask
- buffer_size = 8192
+ def __init__(self, cmd_channel, inbuf, args):
+ self.cmd_channel = cmd_channel
+ self.inbuf = inbuf
+ self.args = args
- def handle_read (self):
- block = self.recv (self.buffer_size)
- if block:
+ ############################################################
+ # Implementation methods for interface
+ # Zope.Server.ITask
+
+ def service(self):
+ """Called to execute the task.
+ """
+ close_on_finish = 0
+ c = self.cmd_channel
+ try:
try:
- self.fd.write (block)
- except IOError:
- self.log_info ('got exception writing block...', 'error')
+ c.finishedRecv(self.inbuf, *self.args)
+ except socket.error:
+ close_on_finish = 1
+ if c.adj.log_socket_errors:
+ raise
+ finally:
+ c.end_task(close_on_finish)
+
+
+ def cancel(self):
+ 'See Zope.Server.ITask.ITask'
+ self.cmd_channel.close_when_done()
- def handle_close (self):
- s = self.channel.server
- s.total_files_in.increment()
- s.total_bytes_in.increment(self.bytes_in.as_long())
- self.fd.close()
- self.channel.reply('TRANS_SUCCESS')
- self.close()
+ def defer(self):
+ 'See Zope.Server.ITask.ITask'
+ pass
+
+ #
+ ############################################################
+
=== Zope3/lib/python/Zope/Server/FTP/XmitChannel.py 1.1.2.3 => 1.1.2.4 ===
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""
-
-$Id$
-"""
-
-import asynchat
-
-
-class XmitChannel(asynchat.async_chat, object):
-
- # for an ethernet, you want this to be fairly large, in fact, it
- # _must_ be large for performance comparable to an ftpd. [64k] we
- # ought to investigate automatically-sized buffers...
- ac_out_buffer_size = 16384
-
- bytes_out = 0
-
- def __init__ (self, channel, client_addr=None):
- self.channel = channel
- self.client_addr = client_addr
- super(XmitChannel, self).__init__()
-
-
- def log (*args):
- pass
-
-
- def readable (self):
- return not self.connected
-
-
- def writable (self):
- return 1
-
-
- def send (self, data):
- result = super(XmitChannel, self).send(data)
- self.bytes_out = self.bytes_out + result
- return result
-
-
- def handle_error (self):
- # usually this is to catch an unexpected disconnect.
- # XXX: Helpfule for debugging
- import traceback
- traceback.print_exc()
- self.log_info ('unexpected disconnect on data xmit channel', 'error')
- try:
- self.close()
- except:
- pass
-
- # TODO: there's a better way to do this. we need to be able to
- # put 'events' in the producer fifo. to do this cleanly we need
- # to reposition the 'producer' fifo as an 'event' fifo.
-
- # dummy function to suppress warnings caused by some FTP clients
- def handle_connect(self):
- pass
-
-
- def close (self):
- c = self.channel
- s = c.server
- c.client_dc = None
- s.total_files_out.increment()
- s.total_bytes_out.increment (self.bytes_out)
- if not len(self.producer_fifo):
- c.reply('TRANS_SUCCESS')
- elif not c.closed:
- c.reply('TRANSFER_ABORTED')
- del c
- del s
- del self.channel
- asynchat.async_chat.close(self)
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+from Zope.Server.ServerChannelBase import ChannelBaseClass
+
+
+class XmitChannel(ChannelBaseClass):
+
+ def __init__ (self, cmd_channel, client_addr, s,
+ adj=None, socket_map=None):
+ self.cmd_channel = cmd_channel
+ self.client_addr = client_addr
+ ChannelBaseClass.__init__(self, None, None, adj, socket_map)
+ # Send the stream and close when finished.
+ self.writeAll(s)
+ self.close_when_done()
+
+ def writeAll(self, s):
+ """Sends a stream or string.
+
+ Returns the number of bytes sent.
+ """
+ sent = 0
+ if hasattr(s, 'read'):
+ # s is a stream.
+ copy_bytes = self.adj.copy_bytes
+ while 1:
+ data = s.read(copy_bytes)
+ if not data:
+ break
+ sent += len(data)
+ self.write(data)
+ else:
+ # s is a string.
+ sent = len(s)
+ self.write(s)
+ return sent
+
+ def readable(self):
+ return not self.connected
+
+ def handle_connect(self):
+ pass
+
+ def handle_comm_error(self):
+ if self.adj.log_socket_errors:
+ self.handle_error()
+ else:
+ self.close()
+
+ def close (self):
+ c = self.cmd_channel
+ s = c.server
+ c.client_dc = None
+ #s.total_files_out += 1
+ #s.total_bytes_out += self.bytes_out
+ if not len(self.outbuf):
+ # All data transferred
+ c.reply('TRANS_SUCCESS')
+ else:
+ # Not all data transferred
+ c.reply('TRANSFER_ABORTED')
+ del c
+ del s
+ del self.cmd_channel
+ ChannelBaseClass.close(self)
+