[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/FTP - FTPServerChannel.py:1.1.2.16 FTPStatusMessages.py:1.1.2.8 RecvChannel.py:1.1.2.6 XmitChannel.py:1.1.2.5

Shane Hathaway shane@cvs.zope.org
Mon, 8 Apr 2002 16:33:12 -0400


Update of /cvs-repository/Zope3/lib/python/Zope/Server/FTP
In directory cvs.zope.org:/tmp/cvs-serv18622

Modified Files:
      Tag: Zope3-Server-Branch
	FTPServerChannel.py FTPStatusMessages.py RecvChannel.py 
	XmitChannel.py 
Log Message:
- Used new readfile(), writefile(), and check_writable() methods.
  This involved some new complexity, since we can't send the 150 response
  until the VFS confirms the file is accessible, but it's better to have
  this complexity in the FTP server than to push it into the VFS
  implementation, as an open()-based API would have required.

- Improved some status messages.

- Redid the LIST test, added a STOR test, and re-enabled the socket leak
  detection.
	  


=== Zope3/lib/python/Zope/Server/FTP/FTPServerChannel.py 1.1.2.15 => 1.1.2.16 ===
 import os
 import stat
+import sys
 import socket
 import time
 
@@ -27,7 +28,7 @@
 from IFTPCommandHandler import IFTPCommandHandler
 from PassiveAcceptor import PassiveAcceptor
 from RecvChannel import RecvChannel
-from XmitChannel import XmitChannel
+from XmitChannel import XmitChannel, ApplicationXmitStream
 
 
 class FTPServerChannel(LineServerChannel):
@@ -94,9 +95,10 @@
 
     def cmd_abor(self, args):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
-        if self.client_dc:
-            self.client_dc.close()
-        self.reply('TRANSFER_ABORTED')
+        if self.client_dc is not None:
+            self.client_dc.close('TRANSFER_ABORTED')
+        else:
+            self.reply('TRANSFER_ABORTED')
 
 
     def cmd_appe (self, args):
@@ -127,17 +129,16 @@
     def cmd_dele(self, args):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
         if not args:
-            self.reply('CMD_UNKNOWN', 'DELE')
-        else:
-            path = self._generatePath(args)
+            self.reply('ERR_ARGS')
+            return
+        path = self._generatePath(args)
 
-            try:
-                self.server.filesystem.unlink(path)
-                self.reply('SUCCESS_250', 'DELE')
-            except:
-                self.reply('ERR_DELETE_FILE')
-            else:
-                self.reply('NO_FILE', file)
+        try:
+            self.server.filesystem.remove(path)
+        except OSError, err:
+            self.reply('ERR_DELETE_FILE', str(err))
+        else:
+            self.reply('SUCCESS_250', 'DELE')
 
 
     def cmd_help(self, args):
@@ -152,16 +153,17 @@
         args = args.split()
         try:
             s = self.getDirectoryList(args, long)
-        except os.error, why:
-            self.reply('ERR_NO_LIST', str(why))
+        except OSError, err:
+            self.reply('ERR_NO_LIST', str(err))
             return
+        ok_reply = ('OPEN_DATA_CONN', self.type_map[self.transfer_mode])
+        cdc = XmitChannel(self, ok_reply)
+        self.client_dc = cdc
         try:
-            self.reply('OPEN_DATA_CONN', self.type_map[self.transfer_mode])
-            self.openDataChannel(XmitChannel, s)
-        finally:
-            if hasattr(s, 'close'):
-                # Close the stream.
-                s.close()
+            cdc.write(s)
+            cdc.close_when_done()
+        except OSError, err:
+            cdc.close('ERR_NO_LIST', str(err))
 
 
     def cmd_mdtm(self, args):
@@ -180,15 +182,15 @@
     def cmd_mkd(self, args):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
         if not args:
-            self.reply('CMD_UNKNOWN', 'MKD')
+            self.reply('ERR_ARGS')
+            return
+        path = self._generatePath(args)
+        try:
+            self.server.filesystem.mkdir(path)
+        except OSError, err:
+            self.reply('ERR_CREATE_DIR', str(err))
         else:
-            path = self._generatePath(args)
-
-            try:
-                self.server.filesystem.mkdir(path)
-                self.reply('SUCCESS_257', 'MKD')
-            except:
-                self.reply('ERR_CREATE_DIR')
+            self.reply('SUCCESS_257', 'MKD')
 
 
     def cmd_mode(self, args):
@@ -236,18 +238,15 @@
     def cmd_port(self, args):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
 
-        try:
-            info = args.split(',')
-            ip = '.'.join(info[:4])
-            port = int(info[4])*256 + int(info[5])
-            # how many data connections at a time?
-            # I'm assuming one for now...
-            # XXX: we should (optionally) verify that the
-            # ip number belongs to the client.  [wu-ftpd does this?]
-            self.client_addr = (ip, port)
-            self.reply('SUCCESS_200', 'PORT')
-        except:
-            return self.reply('CMD_UNKNOWN', 'PORT')
+        info = args.split(',')
+        ip = '.'.join(info[:4])
+        port = int(info[4])*256 + int(info[5])
+        # how many data connections at a time?
+        # I'm assuming one for now...
+        # XXX: we should (optionally) verify that the
+        # ip number belongs to the client.  [wu-ftpd does this?]
+        self.client_addr = (ip, port)
+        self.reply('SUCCESS_200', 'PORT')
 
 
     def cmd_pwd(self, args):
@@ -265,38 +264,32 @@
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
         if not args:
             self.reply('CMD_UNKNOWN', 'RETR')
-        else:
-            path = self._generatePath(args)
+        path = self._generatePath(args)
 
-            if not self.server.filesystem.isfile(path):
-                self.reply('ERR_IS_NOT_FILE', path)
-            else:
-                try:
-                    mode = 'r' + self.type_mode_map[self.transfer_mode]
-                    fd = self.server.filesystem.open(path, mode)
-                except IOError, why:
-                    self.reply('ERR_OPEN_READ', str(why))
-                    return
-                try:
-                    self.reply('OPEN_CONN',
-                               (self.type_map[self.transfer_mode], path) )
-
-                    if self.restart_position:
-                        # try to position the file as requested, but
-                        # give up silently on failure (the 'file object'
-                        # may not support seek())
-                        try:
-                            fd.seek(self.restart_position)
-                        except:
-                            pass
-                        self.restart_position = 0
-
-                    self.openDataChannel(XmitChannel, fd)
-
-                finally:
-                    if hasattr(fd, 'close'):
-                        # Close the stream.
-                        fd.close()
+        if not self.server.filesystem.isfile(path):
+            self.reply('ERR_IS_NOT_FILE', path)
+            return
+
+        mode = 'r' + self.type_mode_map[self.transfer_mode]
+        start = 0
+        if self.restart_position:
+            start = self.restart_position
+            self.restart_position = 0
+
+        ok_reply = (
+            'OPEN_CONN', (self.type_map[self.transfer_mode], path) )
+        cdc = XmitChannel(self, ok_reply)
+        self.client_dc = cdc
+        outstream = ApplicationXmitStream(cdc)
+
+        try:
+            self.server.filesystem.readfile(
+                path, mode, outstream, start)
+            cdc.close_when_done()
+        except OSError, err:
+            cdc.close('ERR_OPEN_READ', str(err))
+        except IOError, err:
+            cdc.close('ERR_IO', str(err))
 
 
     def cmd_rest(self, args):
@@ -304,7 +297,8 @@
         try:
             pos = int(args)
         except ValueError:
-            self.reply('CMD_UNKNWON', 'REST')
+            self.reply('ERR_ARGS')
+            return
         self.restart_position = pos
         self.reply('RESTART_TRANSFER', pos)
 
@@ -312,14 +306,15 @@
     def cmd_rmd(self, args):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
         if not args:
-            self.reply('CMD_UNKNOWN', 'RMD')
+            self.reply('ERR_ARGS')
+            return
+        path = self._generatePath(args)
+        try:
+            self.server.filesystem.rmdir(path)
+        except OSError, err:
+            self.reply('ERR_DELETE_DIR', str(err))
         else:
-            path = self._generatePath(args)
-            try:
-                self.server.filesystem.rmdir(path)
-                self.reply('SUCCESS_250', 'RMD')
-            except:
-                self.reply('ERR_DELETE_DIR')
+            self.reply('SUCCESS_250', 'RMD')
 
 
     def cmd_rnfr(self, args):
@@ -339,9 +334,10 @@
             self.reply('ERR_RENAME')
         try:
             self.server.filesystem.rename(self._rnfr, path)
+        except OSError, err:
+            self.reply('ERR_RENAME', (self._rnfr, rnto, str(err)))
+        else:
             self.reply('SUCCESS_250', 'RNTO')
-        except:
-            self.reply('ERR_RENAME', (self._rnfr, rnto))
         self._rnfr = None
 
 
@@ -358,50 +354,49 @@
     def cmd_stor(self, args, write_mode='w'):
         'See Zope.Server.FTP.IFTPCommandHandler.IFTPCommandHandler'
         if not args:
-            self.reply('CMD_UNKNOWN', 'STOR')
-        else:
-            path = self._generatePath(args)
+            self.reply('ERR_ARGS')
+            return
+        path = self._generatePath(args)
 
-            if self.restart_position:
-                restart_position = 0
-                self.reply('ERR_RESTART_STOR')
-                return
-            mode = write_mode + self.type_mode_map[self.transfer_mode]
+        start = 0
+        if self.restart_position:
+            self.start = self.restart_position
+            restart_position = 0
+        mode = write_mode + self.type_mode_map[self.transfer_mode]
 
-            try:
-                # Optionally verify the file can be opened,
-                # but don't open it yet.  The actually write
-                # should be transactional without holding up the
-                # application.
-                fs = self.server.filesystem
-                if hasattr(fs, 'check_open'):
-                    fs.check_open(path, mode)
-            except IOError, why:
-                self.reply('ERR_OPEN_WRITE', str(why))
-                return
-            self.reply('OPEN_CONN', (self.type_map[self.transfer_mode], file) )
-            self.openDataChannel(RecvChannel, (path, mode))
+        try:
+            # Verify the file can be opened, but don't open it yet.
+            # The actually write should be transactional without
+            # holding up the application.
+            fs = self.server.filesystem
+            fs.check_writable(path, mode)
+        except OSError, err:
+            self.reply('ERR_OPEN_WRITE', str(err))
+            return
+        except IOError, err:
+            self.reply('ERR_IO', str(err))
+            return
+        cdc = RecvChannel(self, (path, mode, start))
+        self.client_dc = cdc
+        self.reply('OPEN_CONN', (self.type_map[self.transfer_mode], path))
+        self.connectDataChannel(cdc)
 
 
-    def finishedRecv(self, buffer, path, mode):
+    def finishedRecv(self, buffer, (path, mode, start)):
         """Called by RecvChannel when the transfer is finished."""
         # Always called in a task.
         try:
-            outfile = self.server.filesystem.open(path, mode)
-            try:
-                copy_bytes = self.adj.copy_bytes
-                while 1:
-                    data = buffer.get(copy_bytes)
-                    if not data:
-                        break
-                    buffer.skip(len(data), 1)
-                    outfile.write(data)
-            finally:
-                outfile.close()
-        except IOError, why:
-            self.reply('ERR_OPEN_WRITE', str(why))
-            return
-        self.reply('TRANS_SUCCESS')
+            infile = buffer.getfile()
+            infile.seek(0)
+            self.server.filesystem.writefile(path, mode, infile, start)
+        except OSError, err:
+            self.reply('ERR_OPEN_WRITE', str(err))
+        except IOError, err:
+            self.reply('ERR_IO', str(err))
+        except:
+            self.exception()
+        else:
+            self.reply('TRANS_SUCCESS')
 
 
     def cmd_stru(self, args):
@@ -425,7 +420,7 @@
         # no support for EBCDIC
         # if t not in ['a','e','i','l']:
         if t not in ['a','i','l']:
-            self.reply('CMD_UNKNOWN', 'TYPE')
+            self.reply('ERR_ARGS')
 
         elif t == 'l' and (len(args) > 2 and args[2] != '8'):
             self.reply('WRONG_BYTE_SIZE')
@@ -441,7 +436,7 @@
             self.username = args
             self.reply('PASS_REQUIRED')
         else:
-            self.reply('CMD_UNKNOWN', 'USER')
+            self.reply('ERR_ARGS')
 
     #
     ############################################################
@@ -466,8 +461,12 @@
 
 
     def listdir (self, path, long=0):
-        """returns a string or stream"""
-        return self.server.filesystem.listdir(path, long)
+        """returns a string"""
+        res = self.server.filesystem.listdir(path, long)
+        if hasattr(res, 'read'):
+            # Dump the stream.
+            res = res.read()
+        return res
 
 
     def getDirectoryList(self, args, long=0):
@@ -488,33 +487,34 @@
         return self.listdir(dir, long)
 
 
-    def openDataChannel(self, class_, *args):
+    def connectDataChannel(self, cdc):
         pa = self.passive_acceptor
         if pa:
             # PASV mode.
             if pa.ready:
                 # a connection has already been made.
                 conn, addr = pa.ready
-                cdc = class_(self, addr, *args)
                 cdc.set_socket (conn)
                 cdc.connected = 1
                 self.passive_acceptor.close()
                 self.passive_acceptor = None
-            else:
-                # we're still waiting for a connect to the PASV port.
-                # FTP Explorer is known to do this.
-                cdc = class_(self, None, *args)
+            # else we're still waiting for a connect to the PASV port.
+            # FTP Explorer is known to do this.
         else:
             # not in PASV mode.
             ip, port = self.client_addr
-            cdc = class_(self, self.client_addr, *args)
             cdc.create_socket(socket.AF_INET, socket.SOCK_STREAM)
             if self.bind_local_minus_one:
                 cdc.bind(('', self.server.port - 1))
             try:
                 cdc.connect((ip, port))
-            except socket.error, why:
-                self.reply(425)
+            except socket.error, err:
+                cdc.close('NO_DATA_CONN')
 
-        self.client_dc = cdc
+
+    def notifyClientDCClosing(self, *reply_args):
+        if self.client_dc is not None:
+            self.client_dc = None
+            if reply_args:
+                self.reply(*reply_args)
 


=== Zope3/lib/python/Zope/Server/FTP/FTPStatusMessages.py 1.1.2.7 => 1.1.2.8 ===
     'TRANSFER_ABORTED' : '426 Connection closed; transfer aborted.',
     'CMD_UNKNOWN'      : "500 '%s': command not understood.",
+    'INTERNAL_ERROR'   : "500 Internal error: %s",
+    'ERR_ARGS'         : '500 Bad command arguments',
     'MODE_UNKOWN'      : '502 Unimplemented MODE type',
     'WRONG_BYTE_SIZE'  : '504 Byte size must be 8',
     'STRU_UNKNOWN'     : '504 Unimplemented STRU type',
@@ -57,13 +59,13 @@
     'ERR_NO_FILE'      : '550 "%s": No such file.',
     'ERR_IS_NOT_FILE'  : '550 "%s": Is not a file',
     'ERR_CREATE_FILE'  : '550 Error creating file.',
-    'ERR_CREATE_DIR'   : '550 Error creating directory.',
-    'ERR_DELETE_FILE'  : '550 Error deleting file.',
-    'ERR_DELETE_DIR'   : '550 Error removing directory.',
+    'ERR_CREATE_DIR'   : '550 Error creating directory: %s',
+    'ERR_DELETE_FILE'  : '550 Error deleting file: %s',
+    'ERR_DELETE_DIR'   : '550 Error removing directory: %s',
     'ERR_OPEN_READ'    : '553 Could not open file for reading: %s',
     'ERR_OPEN_WRITE'   : '553 Could not open file for writing: %s',
-    'ERR_RESTART_STOR' : '553 Restart on STOR not yet supported',
-    'ERR_RENAME'       : '560 Could not rename "%s" to "%s".',
+    'ERR_IO'           : '553 I/O Error: %s',
+    'ERR_RENAME'       : '560 Could not rename "%s" to "%s": %s',
     'ERR_RNFR_SOURCE'  : '560 No source filename specify. Call RNFR first.',
     }
 


=== Zope3/lib/python/Zope/Server/FTP/RecvChannel.py 1.1.2.5 => 1.1.2.6 ===
     """ """
 
-    def __init__ (self, cmd_channel, client_addr, finish_args,
+    complete_transfer = 0
+    _fileno = None  # provide a default for asyncore.dispatcher._fileno
+
+    def __init__ (self, control_channel, finish_args,
                   adj=None, socket_map=None):
-        self.cmd_channel = cmd_channel
-        self.client_addr = client_addr
+        self.control_channel = control_channel
         self.finish_args = finish_args
+        self.inbuf = OverflowableBuffer(control_channel.adj.inbuf_overflow)
         ChannelBaseClass.__init__(self, None, None, adj, socket_map)
-        self.inbuf = OverflowableBuffer(self.adj.inbuf_overflow)
-
-##    def log (self, *ignore):
-##        pass
+        # Note that this channel starts out in async mode.
 
     def writable (self):
         return 0
@@ -41,25 +41,41 @@
         pass
 
     def received (self, data):
-        self.inbuf.append(data)
+        if data:
+            self.inbuf.append(data)
 
     def handle_close (self):
-        c = self.cmd_channel
+        """Client closed, indicating EOF."""
+        c = self.control_channel
         task = FinishedRecvTask(c, self.inbuf, self.finish_args)
+        self.complete_transfer = 1
         self.close()
-        self.cmd_channel = None
         c.start_task(task)
 
+    def close(self, *reply_args):
+        try:
+            c = self.control_channel
+            if c is not None:
+                self.control_channel = None
+                if not self.complete_transfer and not reply_args:
+                    # Not all data transferred
+                    reply_args = ('TRANSFER_ABORTED',)
+                c.notifyClientDCClosing(*reply_args)
+        finally:
+            if self.socket is not None:
+                # XXX asyncore.dispatcher.close() doesn't like socket == None
+                ChannelBaseClass.close(self)
+
 
 
 class FinishedRecvTask:
 
     __implements__ = ITask
 
-    def __init__(self, cmd_channel, inbuf, args):
-        self.cmd_channel = cmd_channel
+    def __init__(self, control_channel, inbuf, finish_args):
+        self.control_channel = control_channel
         self.inbuf = inbuf
-        self.args = args
+        self.finish_args = finish_args
 
 
     ############################################################
@@ -70,10 +86,10 @@
         """Called to execute the task.
         """
         close_on_finish = 0
-        c = self.cmd_channel
+        c = self.control_channel
         try:
             try:
-                c.finishedRecv(self.inbuf, *self.args)
+                c.finishedRecv(self.inbuf, self.finish_args)
             except socket.error:
                 close_on_finish = 1
                 if c.adj.log_socket_errors:
@@ -84,7 +100,7 @@
 
     def cancel(self):
         'See Zope.Server.ITask.ITask'
-        self.cmd_channel.close_when_done()
+        self.control_channel.close_when_done()
 
 
     def defer(self):


=== Zope3/lib/python/Zope/Server/FTP/XmitChannel.py 1.1.2.4 => 1.1.2.5 ===
 class XmitChannel(ChannelBaseClass):
 
-    def __init__ (self, cmd_channel, client_addr, s,
+    opened = 0
+    _fileno = None  # provide a default for asyncore.dispatcher._fileno
+
+    def __init__ (self, control_channel, ok_reply_args,
                   adj=None, socket_map=None):
-        self.cmd_channel = cmd_channel
-        self.client_addr = client_addr
+        self.control_channel = control_channel
+        self.ok_reply_args = ok_reply_args
+        self.set_sync()
         ChannelBaseClass.__init__(self, None, None, adj, socket_map)
-        # Send the stream and close when finished.
-        self.writeAll(s)
-        self.close_when_done()
-
-    def writeAll(self, s):
-        """Sends a stream or string.
-
-        Returns the number of bytes sent.
-        """
-        sent = 0
-        if hasattr(s, 'read'):
-            # s is a stream.
-            copy_bytes = self.adj.copy_bytes
-            while 1:
-                data = s.read(copy_bytes)
-                if not data:
-                    break
-                sent += len(data)
-                self.write(data)
-        else:
-            # s is a string.
-            sent = len(s)
-            self.write(s)
-        return sent
+
+    def _open(self):
+        """Signal the client to open the connection."""
+        self.opened = 1
+        self.control_channel.reply(*self.ok_reply_args)
+        self.control_channel.connectDataChannel(self)
+
+    def write(self, data):
+        if self.control_channel is None:
+            raise IOError, 'Client FTP connection closed'
+        if not self.opened:
+            self._open()
+        ChannelBaseClass.write(self, data)
 
     def readable(self):
         return not self.connected
 
+    def handle_read(self):
+        # This is only called when making the connection.
+        try:
+            self.recv(1)
+        except:
+            # The connection failed.
+            self.close('NO_DATA_CONN')
+
     def handle_connect(self):
         pass
 
     def handle_comm_error(self):
-        if self.adj.log_socket_errors:
-            self.handle_error()
-        else:
-            self.close()
-
-    def close (self):
-        c = self.cmd_channel
-        s = c.server
-        c.client_dc = None
-        #s.total_files_out += 1
-        #s.total_bytes_out += self.bytes_out
-        if not len(self.outbuf):
-            # All data transferred
-            c.reply('TRANS_SUCCESS')
-        else:
-            # Not all data transferred
-            c.reply('TRANSFER_ABORTED')
-        del c
-        del s
-        del self.cmd_channel
-        ChannelBaseClass.close(self)
+        self.close('TRANSFER_ABORTED')
+
+    def close(self, *reply_args):
+        try:
+            c = self.control_channel
+            if c is not None:
+                self.control_channel = None
+                if not reply_args:
+                    if not len(self.outbuf):
+                        # All data transferred
+                        if not self.opened:
+                            # Zero-length file
+                            self._open()
+                        reply_args = ('TRANS_SUCCESS',)
+                    else:
+                        # Not all data transferred
+                        reply_args = ('TRANSFER_ABORTED',)
+                c.notifyClientDCClosing(*reply_args)
+        finally:
+            if self.socket is not None:
+                # XXX asyncore.dispatcher.close() doesn't like socket == None
+                ChannelBaseClass.close(self)
+
+
+class ApplicationXmitStream:
+    """Provide stream output, remapping close() to close_when_done().
+    """
+
+    def __init__(self, xmit_channel):
+        self.write = xmit_channel.write
+        self.flush = xmit_channel.flush
+        self.close = xmit_channel.close_when_done