[Zope3-checkins] CVS: Zope3/src/zope/server/ftp/tests - demofs.py:1.1 fstests.py:1.1 test_demofs.py:1.1 test_publisher.py:1.1 test_ftpserver.py:1.4 test_osemulators.py:NONE
Jim Fulton
jim@zope.com
Mon, 3 Feb 2003 10:09:37 -0500
Update of /cvs-repository/Zope3/src/zope/server/ftp/tests
In directory cvs.zope.org:/tmp/cvs-serv15846/src/zope/server/ftp/tests
Modified Files:
test_ftpserver.py
Added Files:
demofs.py fstests.py test_demofs.py test_publisher.py
Removed Files:
test_osemulators.py
Log Message:
Refactored the ftp framework to make it much simpler, less general,
and easier to maintain. This included ripping out the vfs framework.
=== Added File Zope3/src/zope/server/ftp/tests/demofs.py ===
##############################################################################
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
##############################################################################
"""Demo file-system implementation, for testing
$Id: demofs.py,v 1.1 2003/02/03 15:09:01 jim Exp $
"""
__metaclass__ = type
import posixpath
from zope.exceptions import Unauthorized
from zope.server.interfaces.ftp import IFileSystem
from zope.server.interfaces.ftp import IFileSystemAccess
execute = 1
read = 2
write = 4
class File:
type = 'f'
modified=None
def __init__(self):
self.access = {'anonymous': read}
def accessable(self, user, access=read):
return (user == 'root'
or (self.access.get(user, 0) & access)
or (self.access.get('anonymous', 0) & access)
)
def grant(self, user, access):
self.access[user] = self.access.get(user, 0) | access
def revoke(self, user, access):
self.access[user] = self.access.get(user, 0) ^ access
class Directory(File):
type = 'd'
def __init__(self):
super(Directory, self).__init__()
self.files = {}
def get(self, name, default=None):
return self.files.get(name, default)
def __getitem__(self, name):
return self.files[name]
def __setitem__(self, name, v):
self.files[name] = v
def __delitem__(self, name):
del self.files[name]
def __contains__(self, name):
return name in self.files
def __iter__(self):
return iter(self.files)
class DemoFileSystem:
__doc__ = IFileSystem.__doc__
__implements__ = IFileSystem
File = File
Directory = Directory
def __init__(self, files, user=''):
self.files = files
self.user = user
def get(self, path, default=None):
while path.startswith('/'):
path = path[1:]
d = self.files
if path:
for name in path.split('/'):
if d.type is not 'd':
return default
if not d.accessable(self.user):
raise Unauthorized
d = d.get(name)
if d is None:
break
return d
def getany(self, path):
d = self.get(path)
if d is None:
raise OSError("No such file or directory:", path)
return d
def getdir(self, path):
d = self.getany(path)
if d.type != 'd':
raise OSError("Not a directory:", path)
return d
def getfile(self, path):
d = self.getany(path)
if d.type != 'f':
raise OSError("Not a file:", path)
return d
def getwdir(self, path):
d = self.getdir(path)
if not d.accessable(self.user, write):
raise OSError("Permission denied")
return d
def type(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
f = self.get(path)
return getattr(f, 'type', None)
def names(self, path, filter=None):
"See zope.server.interfaces.ftp.IFileSystem"
f = list(self.getdir(path))
if filter is not None:
f = [name for name in f if filter(name)]
return f
def _lsinfo(self, name, file):
info = {
'type': file.type,
'name': name,
'group_read': file.accessable(self.user, read),
'group_write': file.accessable(self.user, write),
}
if file.type == 'f':
info['size'] = len(file.data)
if file.modified is not None:
info['mtime'] = file.modified
return info
def ls(self, path, filter=None):
"See zope.server.interfaces.ftp.IFileSystem"
f = self.getdir(path)
if filter is None:
return [self._lsinfo(name, f.files[name])
for name in f
]
return [self._lsinfo(name, f.files[name])
for name in f
if filter(name)]
def readfile(self, path, outstream, start=0, end=None):
"See zope.server.interfaces.ftp.IFileSystem"
f = self.getfile(path)
data = f.data
if end is not None:
data = data[:end]
if start:
data = data[start:]
outstream.write(data)
def lsinfo(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
f = self.getany(path)
return self._lsinfo(posixpath.split(path)[1], f)
def mtime(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
f = self.getany(path)
return f.modified
def size(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
f = self.getany(path)
return len(getattr(f, 'data', ''))
def mkdir(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
path, name = posixpath.split(path)
d = self.getwdir(path)
if name in d.files:
raise OSError("Already exists:", name)
newdir = self.Directory()
newdir.grant(self.user, read | write)
d.files[name] = newdir
def remove(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
path, name = posixpath.split(path)
d = self.getwdir(path)
if name not in d.files:
raise OSError("Not exists:", name)
f = d.files[name]
if f.type == 'd':
raise OSError('Is a directory:', name)
del d.files[name]
def rmdir(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
path, name = posixpath.split(path)
d = self.getwdir(path)
if name not in d.files:
raise OSError("Not exists:", name)
f = d.files[name]
if f.type != 'd':
raise OSError('Is not a directory:', name)
del d.files[name]
def rename(self, old, new):
"See zope.server.interfaces.ftp.IFileSystem"
oldpath, oldname = posixpath.split(old)
newpath, newname = posixpath.split(new)
olddir = self.getwdir(oldpath)
newdir = self.getwdir(newpath)
if oldname not in olddir.files:
raise OSError("Not exists:", oldname)
if newname in newdir.files:
raise OSError("Already exists:", newname)
newdir.files[newname] = olddir.files[oldname]
del olddir.files[oldname]
def writefile(self, path, instream, start=None, end=None, append=False):
"See zope.server.interfaces.ftp.IFileSystem"
path, name = posixpath.split(path)
d = self.getdir(path)
f = d.files.get(name)
if f is None:
d = self.getwdir(path)
f = d.files[name] = self.File()
f.grant(self.user, read | write)
elif f.type != 'f':
raise OSError("Can't overwrite a directory")
if not f.accessable(self.user, write):
raise OSError("Permission denied")
if append:
f.data += instream.read()
else:
if start:
if start < 0:
raise ValueError("Negative starting file position")
prefix = f.data[:start]
if len(prefix) < start:
prefix += '\0' * (start - len(prefix))
else:
prefix = ''
start=0
if end:
if end < 0:
raise ValueError("Negative ending file position")
l = end - start
newdata = instream.read(l)
f.data = prefix+newdata+f.data[start+len(newdata):]
else:
f.data = prefix + instream.read()
def writable(self, path):
"See zope.server.interfaces.ftp.IFileSystem"
path, name = posixpath.split(path)
d = self.getdir(path)
if name not in d:
return d.accessable(self.user, write)
f = d[name]
return f.type == 'f' and f.accessable(self.user, write)
class DemoFileSystemAccess:
__doc__ = IFileSystemAccess.__doc__
__implements__ = IFileSystemAccess
def __init__(self, files, users):
self.files = files
self.users = users
def authenticate(self, credentials):
"See zope.server.interfaces.ftp.IFileSystemAccess"
user, password = credentials
if user != 'anonymous':
if self.users.get(user) != password:
raise Unauthorized
return user
def open(self, credentials):
"See zope.server.interfaces.ftp.IFileSystemAccess"
user = self.authenticate(credentials)
return DemoFileSystem(self.files, user)
=== Added File Zope3/src/zope/server/ftp/tests/fstests.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Abstract file-system tests
$Id: fstests.py,v 1.1 2003/02/03 15:09:01 jim Exp $
"""
import stat
from StringIO import StringIO
from zope.interface.verify import verifyObject
from zope.server.interfaces.ftp import IFileSystem
class FileSystemTests:
"""Tests of a readable filesystem
"""
filesystem = None
dir_name = '/dir'
file_name = '/dir/file.txt'
unwritable_filename = '/dir/protected.txt'
dir_contents = ['file.txt', 'protected.txt']
file_contents = 'Lengthen your stride'
def test_type(self):
self.assertEqual(self.filesystem.type(self.dir_name), 'd')
self.assertEqual(self.filesystem.type(self.file_name), 'f')
def test_names(self):
lst = self.filesystem.names(self.dir_name)
lst.sort()
self.assertEqual(lst, self.dir_contents)
def test_readfile(self):
s = StringIO()
self.filesystem.readfile(self.file_name, s)
self.assertEqual(s.getvalue(), self.file_contents)
def testReadPartOfFile(self):
s = StringIO()
self.filesystem.readfile(self.file_name, s, 2)
self.assertEqual(s.getvalue(), self.file_contents[2:])
def testReadPartOfFile2(self):
s = StringIO()
self.filesystem.readfile(self.file_name, s, 1, 5)
self.assertEqual(s.getvalue(), self.file_contents[1:5])
def test_IFileSystemInterface(self):
verifyObject(IFileSystem, self.filesystem)
def testRemove(self):
self.filesystem.remove(self.file_name)
self.failIf(self.filesystem.type(self.file_name))
def testMkdir(self):
path = self.dir_name + '/x'
self.filesystem.mkdir(path)
self.assertEqual(self.filesystem.type(path), 'd')
def testRmdir(self):
self.filesystem.remove(self.file_name)
self.filesystem.rmdir(self.dir_name)
self.failIf(self.filesystem.type(self.dir_name))
def testRename(self):
self.filesystem.rename(self.file_name, self.file_name + '.bak')
self.assertEqual(self.filesystem.type(self.file_name), None)
self.assertEqual(self.filesystem.type(self.file_name + '.bak'), 'f')
def testWriteFile(self):
s = StringIO()
self.filesystem.readfile(self.file_name, s)
self.assertEqual(s.getvalue(), self.file_contents)
data = 'Always ' + self.file_contents
s = StringIO(data)
self.filesystem.writefile(self.file_name, s)
s = StringIO()
self.filesystem.readfile(self.file_name, s)
self.assertEqual(s.getvalue(), data)
def testAppendToFile(self):
data = ' again'
s = StringIO(data)
self.filesystem.writefile(self.file_name, s, append=True)
s = StringIO()
self.filesystem.readfile(self.file_name, s)
self.assertEqual(s.getvalue(), self.file_contents + data)
def testWritePartOfFile(self):
data = '123'
s = StringIO(data)
self.filesystem.writefile(self.file_name, s, 3, 6)
expect = self.file_contents[:3] + data + self.file_contents[6:]
s = StringIO()
self.filesystem.readfile(self.file_name, s)
self.assertEqual(s.getvalue(), expect)
def testWritePartOfFile_and_truncate(self):
data = '123'
s = StringIO(data)
self.filesystem.writefile(self.file_name, s, 3)
expect = self.file_contents[:3] + data
s = StringIO()
self.filesystem.readfile(self.file_name, s)
self.assertEqual(s.getvalue(), expect)
def testWriteBeyondEndOfFile(self):
partlen = len(self.file_contents) - 6
data = 'daylight savings'
s = StringIO(data)
self.filesystem.writefile(self.file_name, s, partlen)
expect = self.file_contents[:partlen] + data
s = StringIO()
self.filesystem.readfile(self.file_name, s)
self.assertEqual(s.getvalue(), expect)
def testWriteNewFile(self):
s = StringIO(self.file_contents)
self.filesystem.writefile(self.file_name + '.new', s)
s = StringIO()
self.filesystem.readfile(self.file_name, s)
self.assertEqual(s.getvalue(), self.file_contents)
def test_writable(self):
self.failIf(self.filesystem.writable(self.dir_name))
self.failIf(self.filesystem.writable(self.unwritable_filename))
self.failUnless(self.filesystem.writable(self.file_name))
self.failUnless(self.filesystem.writable(self.file_name+'1'))
=== Added File Zope3/src/zope/server/ftp/tests/test_demofs.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""XXX short summary goes here.
XXX longer description goes here.
$Id: test_demofs.py,v 1.1 2003/02/03 15:09:01 jim Exp $
"""
import demofs
from unittest import TestCase, TestSuite, main, makeSuite
from fstests import FileSystemTests
from StringIO import StringIO
class Test(FileSystemTests, TestCase):
def setUp(self):
root = demofs.Directory()
root.grant('bob', demofs.write)
fs = self.filesystem = demofs.DemoFileSystem(root, 'bob')
fs.mkdir(self.dir_name)
fs.writefile(self.file_name, StringIO(self.file_contents))
fs.writefile(self.unwritable_filename, StringIO("save this"))
fs.get(self.unwritable_filename).revoke('bob', demofs.write)
def test_suite():
return TestSuite((
makeSuite(Test),
))
if __name__=='__main__':
main(defaultTest='test_suite')
=== Added File Zope3/src/zope/server/ftp/tests/test_publisher.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""XXX short summary goes here.
XXX longer description goes here.
$Id: test_publisher.py,v 1.1 2003/02/03 15:09:01 jim Exp $
"""
import demofs
from unittest import TestCase, TestSuite, main, makeSuite
from fstests import FileSystemTests
from StringIO import StringIO
from zope.publisher.publish import mapply
from zope.server.ftp.publisher import PublisherFileSystem
class DemoFileSystem(demofs.DemoFileSystem):
def rename(self, path, old, new):
return demofs.DemoFileSystem.rename(
self, "%s/%s" % (path, old), "%s/%s" % (path, new))
class Publication:
def __init__(self, root):
self.root = root
def beforeTraversal(self, request):
pass
def getApplication(self, request):
return self.root
def afterTraversal(self, request, ob):
pass
def callObject(self, request, ob):
command = getattr(ob, request.env['command'])
if 'name' in request.env:
request.env['path'] += "/" + request.env['name']
return mapply(command, request = request.env)
def afterCall(self, request):
pass
def handleException(self, object, request, info, retry_allowed=True):
request.response._exc = info[:2]
class Request:
def __init__(self, input, output, env):
self.env = env
self.response = Response()
self.user = env['credentials']
del env['credentials']
def processInputs(self):
pass
def traverse(self, root):
root.user = self.user
return root
def close(self):
pass
class Response:
_exc = _body = None
def setBody(self, result):
self._body = result
def outputBody(self):
pass
def getResult(self):
if self._exc:
raise self._exc[0], self._exc[1]
return self._body
class RequestFactory:
def __init__(self, root):
self.pub = Publication(root)
def __call__(self, input, output, env):
r = Request(input, output, env)
r.publication = self.pub
return r
class TestPublisherFileSystem(FileSystemTests, TestCase):
def setUp(self):
root = demofs.Directory()
root.grant('bob', demofs.write)
fs = DemoFileSystem(root, 'bob')
fs.mkdir(self.dir_name)
fs.writefile(self.file_name, StringIO(self.file_contents))
fs.writefile(self.unwritable_filename, StringIO("save this"))
fs.get(self.unwritable_filename).revoke('bob', demofs.write)
self.filesystem = PublisherFileSystem('bob', RequestFactory(fs))
def test_suite():
return TestSuite((
makeSuite(TestPublisherFileSystem),
))
if __name__=='__main__':
main(defaultTest='test_suite')
=== Zope3/src/zope/server/ftp/tests/test_ftpserver.py 1.3 => 1.4 ===
--- Zope3/src/zope/server/ftp/tests/test_ftpserver.py:1.3 Thu Jan 30 11:01:11 2003
+++ Zope3/src/zope/server/ftp/tests/test_ftpserver.py Mon Feb 3 10:09:01 2003
@@ -29,16 +29,15 @@
from threading import Thread, Event
from zope.server.taskthreads import ThreadedTaskDispatcher
-from zope.server.ftp.ftpserver import FTPServer
-from zope.server.ftp.ftpstatusmessages import status_msgs
+from zope.server.ftp.server import FTPServer, status_messages
from zope.server.adjustments import Adjustments
from zope.server.interfaces import ITask
-from zope.server.vfs.osfilesystem import OSFileSystem
-from zope.server.ftp.testfilesystemaccess import TestFilesystemAccess
+from zope.server.ftp.tests import demofs
from zope.server.tests.asyncerror import AsyncoreErrorHook
+
import ftplib
from time import sleep, time
@@ -66,13 +65,22 @@
self.orig_map_size = len(asyncore.socket_map)
self.hook_asyncore_error()
- self.root_dir = tempfile.mktemp()
- os.mkdir(self.root_dir)
- os.mkdir(os.path.join(self.root_dir, 'test'))
-
- fs = OSFileSystem(self.root_dir)
- fs_access = TestFilesystemAccess(fs)
+ root_dir = demofs.Directory()
+ root_dir['test'] = demofs.Directory()
+ root_dir['test'].access['foo'] = 7
+ root_dir['private'] = demofs.Directory()
+ root_dir['private'].access['foo'] = 7
+ root_dir['private'].access['anonymous'] = 0
+
+ fs = demofs.DemoFileSystem(root_dir, 'foo')
+ fs.writefile('/test/existing', StringIO('test initial data'))
+ fs.writefile('/private/existing', StringIO('private initial data'))
+
+ self.__fs = fs = demofs.DemoFileSystem(root_dir, 'root')
+ fs.writefile('/existing', StringIO('root initial data'))
+ fs_access = demofs.DemoFileSystemAccess(root_dir, {'foo': 'bar'})
+
self.server = FTPServer(LOCALHOST, SERVER_PORT, fs_access,
task_dispatcher=td, adj=my_adj)
if CONNECT_TO_PORT == 0:
@@ -105,9 +113,6 @@
self.unhook_asyncore_error()
- if os.path.exists(self.root_dir):
- shutil.rmtree(self.root_dir)
-
def loop(self):
self.thread_started.set()
import select
@@ -131,10 +136,10 @@
if login:
ftp.send('USER foo\r\n')
self.assertEqual(ftp.recv(1024),
- status_msgs['PASS_REQUIRED'] +'\r\n')
+ status_messages['PASS_REQUIRED'] +'\r\n')
ftp.send('PASS bar\r\n')
self.assertEqual(ftp.recv(1024),
- status_msgs['LOGIN_SUCCESS'] +'\r\n')
+ status_messages['LOGIN_SUCCESS'] +'\r\n')
return ftp
@@ -158,40 +163,83 @@
def testABOR(self):
self.assertEqual(self.execute('ABOR', 1),
- status_msgs['TRANSFER_ABORTED'])
+ status_messages['TRANSFER_ABORTED'])
+
+
+ def testAPPE(self):
+ conn = ftplib.FTP()
+ try:
+ conn.connect(LOCALHOST, self.port)
+ conn.login('foo', 'bar')
+ fp = StringIO('Charity never faileth')
+ # Successful write
+ conn.storbinary('APPE /test/existing', fp)
+ self.assertEqual(self.__fs.files['test']['existing'].data,
+ 'test initial dataCharity never faileth')
+ finally:
+ conn.close()
+ # Make sure no garbage was left behind.
+ self.testNOOP()
+
+ def testAPPE_errors(self):
+ conn = ftplib.FTP()
+ try:
+ conn.connect(LOCALHOST, self.port)
+ conn.login('foo', 'bar')
+
+ fp = StringIO('Speak softly')
+ # Can't overwrite directory
+ self.assertRaises(
+ ftplib.error_perm, conn.storbinary, 'APPE /test', fp)
+
+ # No such file
+ self.assertRaises(
+ ftplib.error_perm, conn.storbinary, 'APPE /nosush', fp)
+
+ # No such dir
+ self.assertRaises(
+ ftplib.error_perm, conn.storbinary, 'APPE /nosush/f', fp)
+
+ # Not allowed
+ self.assertRaises(
+ ftplib.error_perm, conn.storbinary, 'APPE /existing', fp)
+
+ finally:
+ conn.close()
+ # Make sure no garbage was left behind.
+ self.testNOOP()
def testCDUP(self):
self.assertEqual(self.execute(('CWD test', 'CDUP'), 1),
- status_msgs['SUCCESS_250'] %'CDUP')
+ status_messages['SUCCESS_250'] %'CDUP')
self.assertEqual(self.execute('CDUP', 1),
- status_msgs['SUCCESS_250'] %'CDUP')
+ status_messages['SUCCESS_250'] %'CDUP')
def testCWD(self):
self.assertEqual(self.execute('CWD test', 1),
- status_msgs['SUCCESS_250'] %'CWD')
+ status_messages['SUCCESS_250'] %'CWD')
self.assertEqual(self.execute('CWD foo', 1),
- status_msgs['ERR_NO_DIR'] %'/foo')
+ status_messages['ERR_NO_DIR'] %'/foo')
def testDELE(self):
- open(os.path.join(self.root_dir, 'foo'), 'w').write('blah')
- self.assertEqual(self.execute('DELE foo', 1),
- status_msgs['SUCCESS_250'] %'DELE')
+ self.assertEqual(self.execute('DELE test/existing', 1),
+ status_messages['SUCCESS_250'] %'DELE')
res = self.execute('DELE bar', 1).split()[0]
self.assertEqual(res, '550')
self.assertEqual(self.execute('DELE', 1),
- status_msgs['ERR_ARGS'])
+ status_messages['ERR_ARGS'])
def XXXtestHELP(self):
# XXX This test doesn't work. I think it is because execute()
# doesn't read the whole reply. The execeute() helper
# function should be fixed, but that's for another day.
- result = status_msgs['HELP_START'] + '\r\n'
+ result = status_messages['HELP_START'] + '\r\n'
result += 'Help goes here somewhen.\r\n'
- result += status_msgs['HELP_END']
+ result += status_messages['HELP_END']
self.assertEqual(self.execute('HELP', 1), result)
@@ -200,7 +248,7 @@
conn = ftplib.FTP()
try:
conn.connect(LOCALHOST, self.port)
- conn.login('foo', 'bar')
+ conn.login('anonymous', 'bar')
self.assertRaises(ftplib.Error, retrlines, conn, 'LIST /foo')
listing = retrlines(conn, 'LIST')
self.assert_(len(listing) > 0)
@@ -212,16 +260,16 @@
self.testNOOP()
def testMKDLIST(self):
- self.execute(['MKD f1', 'MKD f2'], 1)
+ self.execute(['MKD test/f1', 'MKD test/f2'], 1)
conn = ftplib.FTP()
try:
conn.connect(LOCALHOST, self.port)
conn.login('foo', 'bar')
listing = []
- conn.retrlines('LIST', listing.append)
+ conn.retrlines('LIST /test', listing.append)
self.assert_(len(listing) > 2)
listing = []
- conn.retrlines('LIST -lad f1', listing.append)
+ conn.retrlines('LIST -lad test/f1', listing.append)
self.assertEqual(len(listing), 1)
self.assertEqual(listing[0][0], 'd')
finally:
@@ -232,23 +280,23 @@
def testNOOP(self):
self.assertEqual(self.execute('NOOP', 0),
- status_msgs['SUCCESS_200'] %'NOOP')
+ status_messages['SUCCESS_200'] %'NOOP')
self.assertEqual(self.execute('NOOP', 1),
- status_msgs['SUCCESS_200'] %'NOOP')
+ status_messages['SUCCESS_200'] %'NOOP')
def testPASS(self):
self.assertEqual(self.execute('PASS', 0),
- status_msgs['LOGIN_MISMATCH'])
+ status_messages['LOGIN_MISMATCH'])
self.assertEqual(self.execute(('USER blah', 'PASS bar'), 0),
- status_msgs['LOGIN_MISMATCH'])
+ status_messages['LOGIN_MISMATCH'])
def testQUIT(self):
self.assertEqual(self.execute('QUIT', 0),
- status_msgs['GOODBYE'])
+ status_messages['GOODBYE'])
self.assertEqual(self.execute('QUIT', 1),
- status_msgs['GOODBYE'])
+ status_messages['GOODBYE'])
def testSTOR(self):
@@ -262,7 +310,24 @@
ftplib.error_perm, conn.storbinary, 'STOR /test', fp)
fp = StringIO('Charity never faileth')
# Successful write
- conn.storbinary('STOR /stuff', fp)
+ conn.storbinary('STOR /test/stuff', fp)
+ self.assertEqual(self.__fs.files['test']['stuff'].data,
+ 'Charity never faileth')
+ finally:
+ conn.close()
+ # Make sure no garbage was left behind.
+ self.testNOOP()
+
+
+ def testSTOR_over(self):
+ conn = ftplib.FTP()
+ try:
+ conn.connect(LOCALHOST, self.port)
+ conn.login('foo', 'bar')
+ fp = StringIO('Charity never faileth')
+ conn.storbinary('STOR /test/existing', fp)
+ self.assertEqual(self.__fs.files['test']['existing'].data,
+ 'Charity never faileth')
finally:
conn.close()
# Make sure no garbage was left behind.
@@ -271,9 +336,9 @@
def testUSER(self):
self.assertEqual(self.execute('USER foo', 0),
- status_msgs['PASS_REQUIRED'])
+ status_messages['PASS_REQUIRED'])
self.assertEqual(self.execute('USER', 0),
- status_msgs['ERR_ARGS'])
+ status_messages['ERR_ARGS'])
=== Removed File Zope3/src/zope/server/ftp/tests/test_osemulators.py ===