[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/FTP - FTPServerChannel.py:1.1.2.14 RecvChannel.py:1.1.2.5 XmitChannel.py:1.1.2.4

Shane Hathaway shane@cvs.zope.org
Fri, 5 Apr 2002 18:09:14 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server/FTP
In directory cvs.zope.org:/tmp/cvs-serv28562

Modified Files:
      Tag: Zope3-Server-Branch
	FTPServerChannel.py RecvChannel.py XmitChannel.py 
Log Message:
Primarily made sure file transfers don't span threads, while avoiding blocking
the application and flushing buffers as quickly as possible.  Fixed bugs
and refactored some.


=== Zope3/lib/python/Zope/Server/FTP/FTPServerChannel.py 1.1.2.13 => 1.1.2.14 ===
 from Zope.Server.LineReceiver.LineServerChannel import LineServerChannel
 from FTPStatusMessages import status_msgs
-from FileProducer import FileProducer
 
 from IFTPCommandHandler import IFTPCommandHandler
 from PassiveAcceptor import PassiveAcceptor
@@ -74,13 +73,10 @@
 
         self.client_addr = (addr[0], 21)
 
-        self.active_channels = {}        # Class-specific channel tracker
-        self.next_channel_cleanup = [0]  # Class-specific cleanup time
-
         self.passive_acceptor = None
         self.client_dc = None
 
-        self.transfer_mode = 'i'  # Use binary by default
+        self.transfer_mode = 'a'  # Have to default to ASCII :-|
         self.passive_mode = 0
         self.cwd = '/'
         self._rnfr = None
@@ -104,7 +100,7 @@
 
     def cmd_appe (self, args):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
-        return self.cmd_stor(args, 'ab')
+        return self.cmd_stor(args, 'a')
 
 
     def cmd_cdup(self, args):
@@ -150,19 +146,21 @@
         self.reply('HELP_END')
 
 
-    def cmd_list(self, args):
+    def cmd_list(self, args, long=1):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
         args = args.split()
         try:
-            producer = self.getDirectoryList(args, 1)
+            s = self.getDirectoryList(args, long)
         except os.error, why:
-            self.reply('ERR_NO_LIST', repr(why))
+            self.reply('ERR_NO_LIST', str(why))
             return
-
-        self.reply('OPEN_DATA_CONN', self.type_map[self.transfer_mode])
-        self.createXmitChannel()
-        self.client_dc.push_with_producer(producer)
-        self.client_dc.close_when_done()
+        try:
+            self.reply('OPEN_DATA_CONN', self.type_map[self.transfer_mode])
+            self.openDataChannel(XmitChannel, s)
+        finally:
+            if hasattr(s, 'close'):
+                # Close the stream.
+                s.close()
 
 
     def cmd_mdtm(self, args):
@@ -202,21 +200,7 @@
 
     def cmd_nlst(self, args):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
-        # ncftp adds the -FC argument for the user-visible 'nlist'
-        # command.  We could try to emulate ls flags, but not just yet.
-        args = args.split()
-        if '-FC' in args:
-            args.remove('-FC')
-        try:
-            producer = self.getDirectoryList(args, 0)
-        except os.error, why:
-            self.reply('ERR_NO_LIST', repr(why))
-            return
-
-        self.createXmitChannel()
-        self.client_dc.push_with_producer(producer)
-        self.client_dc.close_when_done()
-        self.reply('OPEN_DATA_CONN', (self.type_map[self.transfer_mode],))
+        self.cmd_list(args, 0)
 
 
     def cmd_noop(self, args):
@@ -267,7 +251,7 @@
 
     def cmd_pwd(self, args):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
-        self.reply('SUCCESS_257', self.cwd)
+        self.reply('ALREADY_CURRENT', self.cwd)
 
 
     def cmd_quit(self, args):
@@ -284,34 +268,34 @@
             path = self._generatePath(args)
 
             if not self.server.filesystem.isfile(path):
-                #self.log_info ('checking %s' % file)
-                self.reply(550, 2, path)
+                self.reply('ERR_IS_NOT_FILE', path)
             else:
                 try:
-                    # FIXME: for some reason, 'rt' isn't working on win95
-                    mode = 'r'+self.type_mode_map[self.transfer_mode]
+                    mode = 'r' + self.type_mode_map[self.transfer_mode]
                     fd = self.server.filesystem.open(path, mode)
                 except IOError, why:
-                    self.reply('ERR_OPEN_READ', repre(why))
+                    self.reply('ERR_OPEN_READ', str(why))
                     return
-                self.reply('OPEN_CONN',
-                           (self.type_map[self.transfer_mode], path) )
-                self.createXmitChannel()
-
-                if self.restart_position:
-                    # try to position the file as requested, but
-                    # give up silently on failure (the 'file object'
-                    # may not support seek())
-                    try:
-                        fd.seek(self.restart_position)
-                    except:
-                        pass
-                    self.restart_position = 0
-
-                self.client_dc.push_with_producer (
-                        FileProducer(self.server, self.client_dc, fd)
-                        )
-                self.client_dc.close_when_done()
+                try:
+                    self.reply('OPEN_CONN',
+                               (self.type_map[self.transfer_mode], path) )
+
+                    if self.restart_position:
+                        # try to position the file as requested, but
+                        # give up silently on failure (the 'file object'
+                        # may not support seek())
+                        try:
+                            fd.seek(self.restart_position)
+                        except:
+                            pass
+                        self.restart_position = 0
+
+                    self.openDataChannel(XmitChannel, fd)
+
+                finally:
+                    if hasattr(fd, 'close'):
+                        # Close the stream.
+                        fd.close()
 
 
     def cmd_rest(self, args):
@@ -370,7 +354,7 @@
                        self.server.filesystem.stat(path)[stat.ST_SIZE])
 
 
-    def cmd_stor(self, args, mode='wb'):
+    def cmd_stor(self, args, write_mode='w'):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
         if not args:
             self.reply('CMD_UNKNOWN', 'STOR')
@@ -381,14 +365,44 @@
                 restart_position = 0
                 self.reply('ERR_RESTART_STOR')
                 return
-            # todo: handle that type flag
+            mode = write_mode + self.type_mode_map[self.transfer_mode]
+
             try:
-                fd = self.server.filesystem.open(args, mode)
+                # Optionally verify the file can be opened,
+                # but don't open it yet.  The actually write
+                # should be transactional without holding up the
+                # application.
+                fs = self.server.filesystem
+                if hasattr(fs, 'check_open'):
+                    fs.check_open(path, mode)
             except IOError, why:
-                self.reply('ERR_OPEN_WRITE', repr(why))
+                self.reply('ERR_OPEN_WRITE', str(why))
                 return
             self.reply('OPEN_CONN', (self.type_map[self.transfer_mode], file) )
-            self.createRecvChannel(fd)
+            self.openDataChannel(RecvChannel, (path, mode))
+
+
+    def finishedRecv(self, buffer, path, mode):
+        """Called by RecvChannel when the transfer is finished."""
+        # Always called in a task.
+        try:
+            outfile = self.server.filesystem.open(path, mode)
+            try:
+                copy_bytes = self.adj.copy_bytes
+                while 1:
+                    data = buffer.get(copy_bytes)
+                    if not data:
+                        break
+                    buffer.skip(len(data), 1)
+                    outfile.write(data)
+            finally:
+                print 'cp4'
+                outfile.close()
+                print 'cp5'
+        except IOError, why:
+            self.reply('ERR_OPEN_WRITE', str(why))
+            return
+        self.reply('TRANS_SUCCESS')
 
 
     def cmd_stru(self, args):
@@ -418,8 +432,7 @@
             self.reply('WRONG_BYTE_SIZE')
 
         else:
-            if t == 'a':
-                self.transfer_mode = t
+            self.transfer_mode = t
             self.reply('TYPE_SET_OK', self.type_map[t])
 
 
@@ -454,7 +467,7 @@
 
 
     def listdir (self, path, long=0):
-        """returns a producer"""
+        """returns a string or stream"""
         return self.server.filesystem.listdir(path, long)
 
 
@@ -476,66 +489,33 @@
         return self.listdir(dir, long)
 
 
-    def createXmitChannel(self):
-        """In PASV mode, the connection may or may _not_ have been
-           made yet.  [although in most cases it is... FTP Explorer
-           being the only exception I've yet seen].  This gets
-           somewhat confusing because things may happen in any
-           order...
-        """
+    def openDataChannel(self, class_, *args):
         pa = self.passive_acceptor
         if pa:
+            # PASV mode.
             if pa.ready:
                 # a connection has already been made.
-                conn, addr = self.passive_acceptor.ready
-                cdc = XmitChannel(self, addr)
-                cdc.set_socket(conn)
+                conn, addr = pa.ready
+                cdc = class_(self, addr, *args)
+                cdc.set_socket (conn)
                 cdc.connected = 1
                 self.passive_acceptor.close()
                 self.passive_acceptor = None
             else:
                 # we're still waiting for a connect to the PASV port.
-                cdc = XmitChannel(self)
-
+                # FTP Explorer is known to do this.
+                cdc = class_(self, None, *args)
         else:
             # not in PASV mode.
             ip, port = self.client_addr
-            cdc = XmitChannel(self, self.client_addr)
+            cdc = class_(self, self.client_addr, *args)
             cdc.create_socket(socket.AF_INET, socket.SOCK_STREAM)
             if self.bind_local_minus_one:
                 cdc.bind(('', self.server.port - 1))
             try:
-                cdc.connect ((ip, port))
+                cdc.connect((ip, port))
             except socket.error, why:
                 self.reply(425)
 
         self.client_dc = cdc
 
-
-    # pretty much the same as xmit, but only right on the verge of
-    # being worth a merge.
-    def createRecvChannel(self, fd):
-        pa = self.passive_acceptor
-        if pa:
-            if pa.ready:
-                # a connection has already been made.
-                conn, addr = pa.ready
-                cdc = RecvChannel(self, addr, fd)
-                cdc.set_socket (conn)
-                cdc.connected = 1
-                self.passive_acceptor.close()
-                self.passive_acceptor = None
-            else:
-                # we're still waiting for a connect to the PASV port.
-                cdc = RecvChannel(self, None, fd)
-        else:
-            # not in PASV mode.
-            ip, port = self.client_addr
-            cdc = RecvChannel(self, self.client_addr, fd)
-            cdc.create_socket(socket.AF_INET, socket.SOCK_STREAM)
-            try:
-                cdc.connect ((ip, port))
-            except socket.error, why:
-                self.reply(425)
-
-        self.client_dc = cdc


=== Zope3/lib/python/Zope/Server/FTP/RecvChannel.py 1.1.2.4 => 1.1.2.5 ===
 $Id$
 """
-import asyncore
-from Zope.Server.Counter import Counter
+from Zope.Server.ServerChannelBase import ChannelBaseClass
+from Zope.Server.Buffers import OverflowableBuffer
+from Zope.Server.ITask import ITask
 
-class RecvChannel(asyncore.dispatcher):
+
+class RecvChannel(ChannelBaseClass):
     """ """
 
-    def __init__ (self, channel, client_addr, fd):
-        self.channel = channel
+    def __init__ (self, cmd_channel, client_addr, finish_args,
+                  adj=None, socket_map=None):
+        self.cmd_channel = cmd_channel
         self.client_addr = client_addr
-        self.fd = fd
-        asyncore.dispatcher.__init__ (self)
-        self.bytes_in = Counter()
-
+        self.finish_args = finish_args
+        ChannelBaseClass.__init__(self, None, None, adj, socket_map)
+        self.inbuf = OverflowableBuffer(self.adj.inbuf_overflow)
 
-    def log (self, *ignore):
-        pass
+##    def log (self, *ignore):
+##        pass
 
+    def writable (self):
+        return 0
 
     def handle_connect (self):
         pass
 
+    def received (self, data):
+        self.inbuf.append(data)
+
+    def handle_close (self):
+        c = self.cmd_channel
+        task = FinishedRecvTask(c, self.inbuf, self.finish_args)
+        self.close()
+        self.cmd_channel = None
+        c.start_task(task)
 
-    def writable (self):
-        return 0
 
 
-    def recv (*args):
-        result = apply (asyncore.dispatcher.recv, args)
-        self = args[0]
-        self.bytes_in.increment(len(result))
-        return result
+class FinishedRecvTask:
 
+    __implements__ = ITask
 
-    buffer_size = 8192
+    def __init__(self, cmd_channel, inbuf, args):
+        self.cmd_channel = cmd_channel
+        self.inbuf = inbuf
+        self.args = args
 
 
-    def handle_read (self):
-        block = self.recv (self.buffer_size)
-        if block:
+    ############################################################
+    # Implementation methods for interface
+    # Zope.Server.ITask
+
+    def service(self):
+        """Called to execute the task.
+        """
+        close_on_finish = 0
+        c = self.cmd_channel
+        try:
             try:
-                self.fd.write (block)
-            except IOError:
-                self.log_info ('got exception writing block...', 'error')
+                c.finishedRecv(self.inbuf, *self.args)
+            except socket.error:
+                close_on_finish = 1
+                if c.adj.log_socket_errors:
+                    raise
+        finally:
+            c.end_task(close_on_finish)
+
+
+    def cancel(self):
+        'See Zope.Server.ITask.ITask'
+        self.cmd_channel.close_when_done()
 
 
-    def handle_close (self):
-        s = self.channel.server
-        s.total_files_in.increment()
-        s.total_bytes_in.increment(self.bytes_in.as_long())
-        self.fd.close()
-        self.channel.reply('TRANS_SUCCESS')
-        self.close()
+    def defer(self):
+        'See Zope.Server.ITask.ITask'
+        pass
+
+    #
+    ############################################################
+


=== Zope3/lib/python/Zope/Server/FTP/XmitChannel.py 1.1.2.3 => 1.1.2.4 ===
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""
-
-$Id$
-"""
-
-import asynchat
-
-
-class XmitChannel(asynchat.async_chat, object):
-
-    # for an ethernet, you want this to be fairly large, in fact, it
-    # _must_ be large for performance comparable to an ftpd.  [64k] we
-    # ought to investigate automatically-sized buffers...
-    ac_out_buffer_size = 16384
-
-    bytes_out = 0
-
-    def __init__ (self, channel, client_addr=None):
-        self.channel = channel
-        self.client_addr = client_addr
-        super(XmitChannel, self).__init__()
-
-
-    def log (*args):
-        pass
-
-
-    def readable (self):
-        return not self.connected
-
-
-    def writable (self):
-        return 1
-
-
-    def send (self, data):
-        result = super(XmitChannel, self).send(data)
-        self.bytes_out = self.bytes_out + result
-        return result
-
-
-    def handle_error (self):
-        # usually this is to catch an unexpected disconnect.
-        # XXX: Helpfule for debugging
-        import traceback
-        traceback.print_exc()
-        self.log_info ('unexpected disconnect on data xmit channel', 'error')
-        try:
-            self.close()
-        except:
-            pass
-
-    # TODO: there's a better way to do this.  we need to be able to
-    # put 'events' in the producer fifo.  to do this cleanly we need
-    # to reposition the 'producer' fifo as an 'event' fifo.
-
-    # dummy function to suppress warnings caused by some FTP clients
-    def handle_connect(self):
-        pass
-
-
-    def close (self):
-        c = self.channel
-        s = c.server
-        c.client_dc = None
-        s.total_files_out.increment()
-        s.total_bytes_out.increment (self.bytes_out)
-        if not len(self.producer_fifo):
-            c.reply('TRANS_SUCCESS')
-        elif not c.closed:
-            c.reply('TRANSFER_ABORTED')
-        del c
-        del s
-        del self.channel
-        asynchat.async_chat.close(self)
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+from Zope.Server.ServerChannelBase import ChannelBaseClass
+
+
+class XmitChannel(ChannelBaseClass):
+
+    def __init__ (self, cmd_channel, client_addr, s,
+                  adj=None, socket_map=None):
+        self.cmd_channel = cmd_channel
+        self.client_addr = client_addr
+        ChannelBaseClass.__init__(self, None, None, adj, socket_map)
+        # Send the stream and close when finished.
+        self.writeAll(s)
+        self.close_when_done()
+
+    def writeAll(self, s):
+        """Sends a stream or string.
+
+        Returns the number of bytes sent.
+        """
+        sent = 0
+        if hasattr(s, 'read'):
+            # s is a stream.
+            copy_bytes = self.adj.copy_bytes
+            while 1:
+                data = s.read(copy_bytes)
+                if not data:
+                    break
+                sent += len(data)
+                self.write(data)
+        else:
+            # s is a string.
+            sent = len(s)
+            self.write(s)
+        return sent
+
+    def readable(self):
+        return not self.connected
+
+    def handle_connect(self):
+        pass
+
+    def handle_comm_error(self):
+        if self.adj.log_socket_errors:
+            self.handle_error()
+        else:
+            self.close()
+
+    def close (self):
+        c = self.cmd_channel
+        s = c.server
+        c.client_dc = None
+        #s.total_files_out += 1
+        #s.total_bytes_out += self.bytes_out
+        if not len(self.outbuf):
+            # All data transferred
+            c.reply('TRANS_SUCCESS')
+        else:
+            # Not all data transferred
+            c.reply('TRANSFER_ABORTED')
+        del c
+        del s
+        del self.cmd_channel
+        ChannelBaseClass.close(self)
+