[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/FTP/tests - __init__.py:1.2 testFTPServer.py:1.2

Jim Fulton jim@zope.com
Mon, 10 Jun 2002 19:30:06 -0400


Update of /cvs-repository/Zope3/lib/python/Zope/Server/FTP/tests
In directory cvs.zope.org:/tmp/cvs-serv20468/lib/python/Zope/Server/FTP/tests

Added Files:
	__init__.py testFTPServer.py 
Log Message:
Merged Zope-3x-branch into newly forked Zope3 CVS Tree.

=== Zope3/lib/python/Zope/Server/FTP/tests/__init__.py 1.1 => 1.2 ===
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+
+""" Unit tests for Zope.Server """


=== Zope3/lib/python/Zope/Server/FTP/tests/testFTPServer.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import unittest
+import tempfile
+import os
+from asyncore import socket_map, poll
+import socket
+from types import StringType
+from StringIO import StringIO
+
+from threading import Thread
+from Zope.Server.TaskThreads import ThreadedTaskDispatcher
+from Zope.Server.FTP.FTPServer import FTPServer
+from Zope.Server.FTP.FTPStatusMessages import status_msgs
+from Zope.Server.Adjustments import Adjustments
+from Zope.Server.ITask import ITask
+
+from Zope.Server.VFS.OSFileSystem import OSFileSystem
+from Zope.Server.FTP.TestFilesystemAccess import TestFilesystemAccess
+
+import ftplib
+
+from time import sleep, time
+
+td = ThreadedTaskDispatcher()
+
+LOCALHOST = '127.0.0.1'
+SERVER_PORT = 0      # Set these port numbers to 0 to auto-bind, or
+CONNECT_TO_PORT = 0  # use specific numbers to inspect using TCPWatch.
+
+
+my_adj = Adjustments()
+
+
+def retrlines(ftpconn, cmd):
+    res = []
+    ftpconn.retrlines(cmd, res.append)
+    return ''.join(res)
+
+
+class Tests(unittest.TestCase):
+
+    def setUp(self):
+        td.setThreadCount(1)
+        self.orig_map_size = len(socket_map)
+
+        self.root_dir = tempfile.mktemp()
+        os.mkdir(self.root_dir)
+        os.mkdir(os.path.join(self.root_dir, 'test'))
+
+        fs = OSFileSystem(self.root_dir)
+        fs_access = TestFilesystemAccess(fs)
+
+        self.server = FTPServer(LOCALHOST, SERVER_PORT, fs_access,
+                                task_dispatcher=td, adj=my_adj)
+        if CONNECT_TO_PORT == 0:
+            self.port = self.server.socket.getsockname()[1]
+        else:
+            self.port = CONNECT_TO_PORT
+        self.run_loop = 1
+        self.counter = 0
+        self.thread = Thread(target=self.loop)
+        self.thread.start()
+        sleep(0.1)  # Give the thread some time to start.
+
+
+    def tearDown(self):
+        self.run_loop = 0
+        self.thread.join()
+        td.shutdown()
+        self.server.close()
+        # Make sure all sockets get closed by asyncore normally.
+        timeout = time() + 2
+        while 1:
+            if len(socket_map) == self.orig_map_size:
+                # Clean!
+                break
+            if time() >= timeout:
+                self.fail('Leaked a socket: %s' % `socket_map`)
+                break
+            poll(0.1, socket_map)
+
+
+    def loop(self):
+        while self.run_loop:
+            self.counter = self.counter + 1
+            # print 'loop', self.counter
+            poll(0.1, socket_map)
+
+
+    def getFTPConnection(self, login=1):
+        ftp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        ftp.connect((LOCALHOST, self.port))
+        result = ftp.recv(10000).split()[0]
+        self.assertEqual(result, '220')
+        if login:
+            ftp.send('USER foo\r\n')
+            self.assertEqual(ftp.recv(1024),
+                             status_msgs['PASS_REQUIRED'] +'\r\n')
+            ftp.send('PASS bar\r\n')
+            self.assertEqual(ftp.recv(1024),
+                             status_msgs['LOGIN_SUCCESS'] +'\r\n')
+
+        return ftp
+
+
+    def execute(self, commands, login=1):
+        ftp = self.getFTPConnection(login)
+
+        try:
+            if type(commands) is StringType:
+                commands = (commands,)
+
+            for command in commands:
+                ftp.send('%s\r\n' %command)
+                result = ftp.recv(10000)
+
+            self.failUnless(result.endswith('\r\n'))
+        finally:
+            ftp.close()
+        return result.split('\r\n')[0]
+
+
+    def testABOR(self):
+        self.assertEqual(self.execute('ABOR', 1),
+                         status_msgs['TRANSFER_ABORTED'])
+
+
+    def testCDUP(self):
+        self.assertEqual(self.execute(('CWD test', 'CDUP'), 1),
+                         status_msgs['SUCCESS_250'] %'CDUP')
+        self.assertEqual(self.execute('CDUP', 1),
+                         status_msgs['SUCCESS_250'] %'CDUP')
+
+
+    def testCWD(self):
+        self.assertEqual(self.execute('CWD test', 1),
+                         status_msgs['SUCCESS_250'] %'CWD')
+        self.assertEqual(self.execute('CWD foo', 1),
+                         status_msgs['ERR_NO_DIR'] %'/foo')
+
+
+    def testDELE(self):
+        open(os.path.join(self.root_dir, 'foo'), 'w').write('blah')
+        self.assertEqual(self.execute('DELE foo', 1),
+                         status_msgs['SUCCESS_250'] %'DELE')
+        res = self.execute('DELE bar', 1).split()[0]
+        self.assertEqual(res, '550')
+        self.assertEqual(self.execute('DELE', 1),
+                         status_msgs['ERR_ARGS'])
+
+
+    def testHELP(self):
+        result = status_msgs['HELP_START'] #+ '\r\n'
+        #result += 'Help goes here somewhen.\r\n'
+        #result += status_msgs['HELP_END']
+
+        self.assertEqual(self.execute('HELP', 1), result)
+
+
+    def testLIST(self):
+        conn = ftplib.FTP()
+        try:
+            conn.connect(LOCALHOST, self.port)
+            conn.login('foo', 'bar')
+            self.assertRaises(ftplib.Error, retrlines, conn, 'LIST /foo')
+            listing = retrlines(conn, 'LIST')
+            self.assert_(len(listing) > 0)
+        finally:
+            conn.close()
+        # Make sure no garbage was left behind.
+        self.testNOOP()
+
+
+    def testNOOP(self):
+        self.assertEqual(self.execute('NOOP', 0),
+                         status_msgs['SUCCESS_200'] %'NOOP')
+        self.assertEqual(self.execute('NOOP', 1),
+                         status_msgs['SUCCESS_200'] %'NOOP')
+
+
+    def testPASS(self):
+        self.assertEqual(self.execute('PASS', 0),
+                         status_msgs['LOGIN_MISMATCH'])
+        self.assertEqual(self.execute(('USER blah', 'PASS bar'), 0),
+                         status_msgs['LOGIN_MISMATCH'])
+
+
+    def testQUIT(self):
+        self.assertEqual(self.execute('QUIT', 0),
+                         status_msgs['GOODBYE'])
+        self.assertEqual(self.execute('QUIT', 1),
+                         status_msgs['GOODBYE'])
+
+
+    def testSTOR(self):
+        conn = ftplib.FTP()
+        try:
+            conn.connect(LOCALHOST, self.port)
+            conn.login('foo', 'bar')
+            fp = StringIO('Speak softly')
+            # Can't overwrite directory
+            self.assertRaises(
+                ftplib.error_perm, conn.storbinary, 'STOR /test', fp)
+            fp = StringIO('Charity never faileth')
+            # Successful write
+            conn.storbinary('STOR /stuff', fp)
+        finally:
+            conn.close()
+        # Make sure no garbage was left behind.
+        self.testNOOP()
+
+
+    def testUSER(self):
+        self.assertEqual(self.execute('USER foo', 0),
+                         status_msgs['PASS_REQUIRED'])
+        self.assertEqual(self.execute('USER', 0),
+                         status_msgs['ERR_ARGS'])
+
+
+
+def test_suite():
+    loader = unittest.TestLoader()
+    return loader.loadTestsFromTestCase(Tests)
+
+if __name__=='__main__':
+    unittest.TextTestRunner().run( test_suite() )