[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/FTP/tests - testFTPServer.py:1.1.2.3
Shane Hathaway
shane@cvs.zope.org
Mon, 8 Apr 2002 16:33:12 -0400
Update of /cvs-repository/Zope3/lib/python/Zope/Server/FTP/tests
In directory cvs.zope.org:/tmp/cvs-serv18622/tests
Modified Files:
Tag: Zope3-Server-Branch
testFTPServer.py
Log Message:
- Used new readfile(), writefile(), and check_writable() methods.
This involved some new complexity, since we can't send the 150 response
until the VFS confirms the file is accessible, but it's better to have
this complexity in the FTP server than in the VFS implementation, which
open() would have required.
- Improved some status messages.
- Redid the LIST test, added a STOR test, and re-enabled the socket leak
detection.
=== Zope3/lib/python/Zope/Server/FTP/tests/testFTPServer.py 1.1.2.2 => 1.1.2.3 ===
import socket
from types import StringType
+from StringIO import StringIO
from threading import Thread
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
@@ -50,6 +51,12 @@
my_adj.inbuf_overflow = 10000
+def retrlines(ftpconn, cmd):
+ res = []
+ ftpconn.retrlines(cmd, res.append)
+ return ''.join(res)
+
+
class Tests(unittest.TestCase):
def setUp(self):
@@ -81,15 +88,14 @@
self.server.close()
# Make sure all sockets get closed by asyncore normally.
timeout = time() + 5
- #while 1:
- # if len(socket_map) == self.orig_map_size:
- # # Clean!
- # break
- # if time() >= timeout:
- # print 'Leaked a socket: %s' % `socket_map`
- # break
- # #self.fail('Leaked a socket: %s' % `socket_map`)
- # poll(0.1, socket_map)
+ while 1:
+ if len(socket_map) == self.orig_map_size:
+ # Clean!
+ break
+ if time() >= timeout:
+ self.fail('Leaked a socket: %s' % `socket_map`)
+ break
+ poll(0.1, socket_map)
def loop(self):
@@ -119,16 +125,18 @@
def execute(self, commands, login=1):
ftp = self.getFTPConnection(login)
- if type(commands) is StringType:
- commands = (commands,)
-
- for command in commands:
- ftp.send('%s\r\n' %command)
- result = ftp.recv(10000)
- #print result[:-2]
-
- self.failUnless(result.endswith('\r\n'))
- ftp.close()
+ try:
+ if type(commands) is StringType:
+ commands = (commands,)
+
+ for command in commands:
+ ftp.send('%s\r\n' %command)
+ result = ftp.recv(10000)
+ #print result[:-2]
+
+ self.failUnless(result.endswith('\r\n'))
+ finally:
+ ftp.close()
return result[:-2]
@@ -155,10 +163,10 @@
open(os.path.join(self.root_dir, 'foo'), 'w').write('blah')
self.assertEqual(self.execute('DELE foo', 1),
status_msgs['SUCCESS_250'] %'DELE')
- self.assertEqual(self.execute('DELE bar', 1),
- status_msgs['ERR_DELETE_FILE'])
+ res = self.execute('DELE bar', 1).split()[0]
+ self.assertEqual(res, '550')
self.assertEqual(self.execute('DELE', 1),
- status_msgs['CMD_UNKNOWN'] %'DELE')
+ status_msgs['ERR_ARGS'])
def testHELP(self):
@@ -169,14 +177,18 @@
self.assertEqual(self.execute('HELP', 1), result)
- # def testLIST(self):
- # path = os.path.join(self.root_dir, 'foo')
- # result = "[Errno 2] No such file or directory: '%s'" %path
- #
- # self.assertEqual(self.execute('LIST /foo', 1),
- # status_msgs['ERR_NO_LIST'] %result)
- # self.assertEqual(self.execute('LIST', 1),
- # status_msgs['SUCCESS_200'] %'NOOP')
+ def testLIST(self):
+ conn = ftplib.FTP()
+ try:
+ conn.connect(LOCALHOST, self.port)
+ conn.login('foo', 'bar')
+ self.assertRaises(ftplib.Error, retrlines, conn, 'LIST /foo')
+ listing = retrlines(conn, 'LIST')
+ self.assert_(len(listing) > 0)
+ finally:
+ conn.close()
+ # Make sure no garbage was left behind.
+ self.testNOOP()
def testNOOP(self):
@@ -200,11 +212,29 @@
status_msgs['GOODBYE'])
+ def testSTOR(self):
+ conn = ftplib.FTP()
+ try:
+ conn.connect(LOCALHOST, self.port)
+ conn.login('foo', 'bar')
+ fp = StringIO('Speak softly')
+ # Can't overwrite directory
+ self.assertRaises(
+ ftplib.error_perm, conn.storbinary, 'STOR /test', fp)
+ fp = StringIO('Charity never faileth')
+ # Successful write
+ conn.storbinary('STOR /stuff', fp)
+ finally:
+ conn.close()
+ # Make sure no garbage was left behind.
+ self.testNOOP()
+
+
def testUSER(self):
self.assertEqual(self.execute('USER foo', 0),
status_msgs['PASS_REQUIRED'])
self.assertEqual(self.execute('USER', 0),
- status_msgs['CMD_UNKNOWN'] %'USER')
+ status_msgs['ERR_ARGS'])