[Zope3-checkins]
SVN: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/
moved all the FTP tests in under the FTP directory has I got
very confused over the tests setup
Michael Kerrin
michael.kerrin at openapp.biz
Thu Oct 6 16:22:37 EDT 2005
Log message for revision 38826:
moved all the FTP tests in under the FTP directory has I got very confused over the tests setup
Changed:
A Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/demofs.py
A Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/fstests.py
A Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_demofs.py
U Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_ftpserver.py
A Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_publisher.py
D Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/demofs.py
D Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/fstests.py
D Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_demofs.py
D Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_ftp.py
D Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_publisher.py
-=-
Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/demofs.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/demofs.py 2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/demofs.py 2005-10-06 20:22:37 UTC (rev 38826)
@@ -0,0 +1,310 @@
+##############################################################################
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+##############################################################################
+"""Demo file-system implementation, for testing
+
+$Id: demofs.py 27459 2004-09-07 01:45:52Z shane $
+"""
+import posixpath
+from zope.security.interfaces import Unauthorized
+from zope.app.twisted.interfaces import IFileSystem
+## from zope.server.interfaces.ftp import IFileSystemAccess
+from zope.interface import implements
+
+execute = 1
+read = 2
+write = 4
+
+class File(object):
+ type = 'f'
+ modified=None
+
+ def __init__(self):
+ self.access = {'anonymous': read}
+
+ def accessable(self, user, access=read):
+ return (user == 'root'
+ or (self.access.get(user, 0) & access)
+ or (self.access.get('anonymous', 0) & access)
+ )
+
+ def grant(self, user, access):
+ self.access[user] = self.access.get(user, 0) | access
+
+ def revoke(self, user, access):
+ self.access[user] = self.access.get(user, 0) ^ access
+
+class Directory(File):
+
+ type = 'd'
+
+ def __init__(self):
+ super(Directory, self).__init__()
+ self.files = {}
+
+ def get(self, name, default=None):
+ return self.files.get(name, default)
+
+ def __getitem__(self, name):
+ return self.files[name]
+
+ def __setitem__(self, name, v):
+ self.files[name] = v
+
+ def __delitem__(self, name):
+ del self.files[name]
+
+ def __contains__(self, name):
+ return name in self.files
+
+ def __iter__(self):
+ return iter(self.files)
+
+class DemoFileSystem(object):
+ __doc__ = IFileSystem.__doc__
+
+ implements(IFileSystem)
+
+ File = File
+ Directory = Directory
+
+ def __init__(self, files, user=''):
+ self.files = files
+ self.user = user
+
+ def get(self, path, default=None):
+
+ while path.startswith('/'):
+ path = path[1:]
+
+ d = self.files
+ if path:
+ for name in path.split('/'):
+ if d.type is not 'd':
+ return default
+ if not d.accessable(self.user):
+ raise Unauthorized
+ d = d.get(name)
+ if d is None:
+ break
+
+ return d
+
+ def getany(self, path):
+ d = self.get(path)
+ if d is None:
+ raise OSError("No such file or directory:", path)
+ return d
+
+ def getdir(self, path):
+ d = self.getany(path)
+ if d.type != 'd':
+ raise OSError("Not a directory:", path)
+ return d
+
+ def getfile(self, path):
+ d = self.getany(path)
+ if d.type != 'f':
+ raise OSError("Not a file:", path)
+ return d
+
+ def getwdir(self, path):
+ d = self.getdir(path)
+ if not d.accessable(self.user, write):
+ raise OSError("Permission denied")
+ return d
+
+ def type(self, path):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ f = self.get(path)
+ return getattr(f, 'type', None)
+
+ def names(self, path, filter=None):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ f = list(self.getdir(path))
+ if filter is not None:
+ f = [name for name in f if filter(name)]
+
+ return f
+
+ def _lsinfo(self, name, file):
+ info = {
+ 'type': file.type,
+ 'name': name,
+ 'group_read': file.accessable(self.user, read),
+ 'group_write': file.accessable(self.user, write),
+ }
+ if file.type == 'f':
+ info['size'] = len(file.data)
+ if file.modified is not None:
+ info['mtime'] = file.modified
+
+ return info
+
+ def ls(self, path, filter=None):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ f = self.getdir(path)
+ if filter is None:
+ return [self._lsinfo(name, f.files[name])
+ for name in f
+ ]
+
+ return [self._lsinfo(name, f.files[name])
+ for name in f
+ if filter(name)]
+
+ def readfile(self, path, outstream, start=0, end=None):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ f = self.getfile(path)
+
+ data = f.data
+ if end is not None:
+ data = data[:end]
+ if start:
+ data = data[start:]
+
+ outstream.write(data)
+
+ def lsinfo(self, path):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ f = self.getany(path)
+ return self._lsinfo(posixpath.split(path)[1], f)
+
+ def mtime(self, path):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ f = self.getany(path)
+ return f.modified
+
+ def size(self, path):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ f = self.getany(path)
+ return len(getattr(f, 'data', ''))
+
+ def mkdir(self, path):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ path, name = posixpath.split(path)
+ d = self.getwdir(path)
+ if name in d.files:
+ raise OSError("Already exists:", name)
+ newdir = self.Directory()
+ newdir.grant(self.user, read | write)
+ d.files[name] = newdir
+
+ def remove(self, path):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ path, name = posixpath.split(path)
+ d = self.getwdir(path)
+ if name not in d.files:
+ raise OSError("Not exists:", name)
+ f = d.files[name]
+ if f.type == 'd':
+ raise OSError('Is a directory:', name)
+ del d.files[name]
+
+ def rmdir(self, path):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ path, name = posixpath.split(path)
+ d = self.getwdir(path)
+ if name not in d.files:
+ raise OSError("Not exists:", name)
+ f = d.files[name]
+ if f.type != 'd':
+ raise OSError('Is not a directory:', name)
+ del d.files[name]
+
+ def rename(self, old, new):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ oldpath, oldname = posixpath.split(old)
+ newpath, newname = posixpath.split(new)
+
+ olddir = self.getwdir(oldpath)
+ newdir = self.getwdir(newpath)
+
+ if oldname not in olddir.files:
+ raise OSError("Not exists:", oldname)
+ if newname in newdir.files:
+ raise OSError("Already exists:", newname)
+
+ newdir.files[newname] = olddir.files[oldname]
+ del olddir.files[oldname]
+
+ def writefile(self, path, instream, start=None, end=None, append=False):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ path, name = posixpath.split(path)
+ d = self.getdir(path)
+ f = d.files.get(name)
+ if f is None:
+ d = self.getwdir(path)
+ f = d.files[name] = self.File()
+ f.grant(self.user, read | write)
+ elif f.type != 'f':
+ raise OSError("Can't overwrite a directory")
+
+ if not f.accessable(self.user, write):
+ raise OSError("Permission denied")
+
+ if append:
+ f.data += instream.read()
+ else:
+
+ if start:
+ if start < 0:
+ raise ValueError("Negative starting file position")
+ prefix = f.data[:start]
+ if len(prefix) < start:
+ prefix += '\0' * (start - len(prefix))
+ else:
+ prefix = ''
+ start=0
+
+ if end:
+ if end < 0:
+ raise ValueError("Negative ending file position")
+ l = end - start
+ newdata = instream.read(l)
+
+ f.data = prefix+newdata+f.data[start+len(newdata):]
+ else:
+ f.data = prefix + instream.read()
+
+ def writable(self, path):
+ "See zope.server.interfaces.ftp.IFileSystem"
+ path, name = posixpath.split(path)
+ try:
+ d = self.getdir(path)
+ except OSError:
+ return False
+ if name not in d:
+ return d.accessable(self.user, write)
+ f = d[name]
+ return f.type == 'f' and f.accessable(self.user, write)
+
+## class DemoFileSystemAccess(object):
+## __doc__ = IFileSystemAccess.__doc__
+
+## implements(IFileSystemAccess)
+
+## def __init__(self, files, users):
+## self.files = files
+## self.users = users
+
+## def authenticate(self, credentials):
+## "See zope.server.interfaces.ftp.IFileSystemAccess"
+## user, password = credentials
+## if user != 'anonymous':
+## if self.users.get(user) != password:
+## raise Unauthorized
+## return user
+
+## def open(self, credentials):
+## "See zope.server.interfaces.ftp.IFileSystemAccess"
+## user = self.authenticate(credentials)
+## return DemoFileSystem(self.files, user)
Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/demofs.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/fstests.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/fstests.py 2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/fstests.py 2005-10-06 20:22:37 UTC (rev 38826)
@@ -0,0 +1,156 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Abstract file-system tests
+
+$Id: fstests.py 26559 2004-07-15 21:22:32Z srichter $
+"""
+from StringIO import StringIO
+from zope.interface.verify import verifyObject
+from zope.app.twisted.interfaces import IFileSystem
+
+class FileSystemTests(object):
+ """Tests of a readable filesystem
+ """
+
+ filesystem = None
+ dir_name = '/dir'
+ file_name = '/dir/file.txt'
+ unwritable_filename = '/dir/protected.txt'
+ dir_contents = ['file.txt', 'protected.txt']
+ file_contents = 'Lengthen your stride'
+
+ def test_type(self):
+ self.assertEqual(self.filesystem.type(self.dir_name), 'd')
+ self.assertEqual(self.filesystem.type(self.file_name), 'f')
+
+
+ def test_names(self):
+ lst = self.filesystem.names(self.dir_name)
+ lst.sort()
+ self.assertEqual(lst, self.dir_contents)
+
+ def test_readfile(self):
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, s)
+ self.assertEqual(s.getvalue(), self.file_contents)
+
+
+ def testReadPartOfFile(self):
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, s, 2)
+ self.assertEqual(s.getvalue(), self.file_contents[2:])
+
+
+ def testReadPartOfFile2(self):
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, s, 1, 5)
+ self.assertEqual(s.getvalue(), self.file_contents[1:5])
+
+ def test_IFileSystemInterface(self):
+ verifyObject(IFileSystem, self.filesystem)
+
+ def testRemove(self):
+ self.filesystem.remove(self.file_name)
+ self.failIf(self.filesystem.type(self.file_name))
+
+
+ def testMkdir(self):
+ path = self.dir_name + '/x'
+ self.filesystem.mkdir(path)
+ self.assertEqual(self.filesystem.type(path), 'd')
+
+ def testRmdir(self):
+ self.filesystem.remove(self.file_name)
+ self.filesystem.rmdir(self.dir_name)
+ self.failIf(self.filesystem.type(self.dir_name))
+
+
+ def testRename(self):
+ self.filesystem.rename(self.file_name, self.file_name + '.bak')
+ self.assertEqual(self.filesystem.type(self.file_name), None)
+ self.assertEqual(self.filesystem.type(self.file_name + '.bak'), 'f')
+
+
+ def testWriteFile(self):
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, s)
+ self.assertEqual(s.getvalue(), self.file_contents)
+
+ data = 'Always ' + self.file_contents
+ s = StringIO(data)
+ self.filesystem.writefile(self.file_name, s)
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, s)
+ self.assertEqual(s.getvalue(), data)
+
+
+ def testAppendToFile(self):
+ data = ' again'
+ s = StringIO(data)
+ self.filesystem.writefile(self.file_name, s, append=True)
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, s)
+ self.assertEqual(s.getvalue(), self.file_contents + data)
+
+ def testWritePartOfFile(self):
+ data = '123'
+ s = StringIO(data)
+ self.filesystem.writefile(self.file_name, s, 3, 6)
+
+ expect = self.file_contents[:3] + data + self.file_contents[6:]
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, s)
+ self.assertEqual(s.getvalue(), expect)
+
+ def testWritePartOfFile_and_truncate(self):
+ data = '123'
+ s = StringIO(data)
+ self.filesystem.writefile(self.file_name, s, 3)
+
+ expect = self.file_contents[:3] + data
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, s)
+ self.assertEqual(s.getvalue(), expect)
+
+ def testWriteBeyondEndOfFile(self):
+ partlen = len(self.file_contents) - 6
+ data = 'daylight savings'
+ s = StringIO(data)
+ self.filesystem.writefile(self.file_name, s, partlen)
+
+ expect = self.file_contents[:partlen] + data
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, s)
+ self.assertEqual(s.getvalue(), expect)
+
+
+ def testWriteNewFile(self):
+ s = StringIO(self.file_contents)
+ self.filesystem.writefile(self.file_name + '.new', s)
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, s)
+ self.assertEqual(s.getvalue(), self.file_contents)
+
+
+ def test_writable(self):
+ self.failIf(self.filesystem.writable(self.dir_name))
+ self.failIf(self.filesystem.writable(self.unwritable_filename))
+ self.failUnless(self.filesystem.writable(self.file_name))
+ self.failUnless(self.filesystem.writable(self.file_name+'1'))
Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/fstests.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_demofs.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_demofs.py 2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_demofs.py 2005-10-06 20:22:37 UTC (rev 38826)
@@ -0,0 +1,40 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test the Demo Filesystem implementation.
+
+$Id: test_demofs.py 26476 2004-07-13 16:59:11Z srichter $
+"""
+import demofs
+from unittest import TestCase, TestSuite, main, makeSuite
+from fstests import FileSystemTests
+from StringIO import StringIO
+
+class Test(FileSystemTests, TestCase):
+
+ def setUp(self):
+ root = demofs.Directory()
+ root.grant('bob', demofs.write)
+ fs = self.filesystem = demofs.DemoFileSystem(root, 'bob')
+ fs.mkdir(self.dir_name)
+ fs.writefile(self.file_name, StringIO(self.file_contents))
+ fs.writefile(self.unwritable_filename, StringIO("save this"))
+ fs.get(self.unwritable_filename).revoke('bob', demofs.write)
+
+def test_suite():
+ return TestSuite((
+ makeSuite(Test),
+ ))
+
+if __name__=='__main__':
+ main(defaultTest='test_suite')
Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_demofs.py
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_ftpserver.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_ftpserver.py 2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_ftpserver.py 2005-10-06 20:22:37 UTC (rev 38826)
@@ -26,8 +26,8 @@
from zope.app.twisted.ftp.server import FTPFactory
-from zope.app.twisted.tests.test_publisher import RequestFactory
-from zope.app.twisted.tests import demofs
+from test_publisher import RequestFactory
+import demofs
class TestServerSetup(TestCase):
Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_publisher.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_publisher.py 2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_publisher.py 2005-10-06 20:22:37 UTC (rev 38826)
@@ -0,0 +1,123 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test the FTP publisher.
+
+$Id: test_publisher.py 26559 2004-07-15 21:22:32Z srichter $
+"""
+import demofs
+from unittest import TestCase, TestSuite, main, makeSuite
+from fstests import FileSystemTests
+from StringIO import StringIO
+from zope.publisher.publish import mapply
+from zope.app.twisted.ftp.utils import PublisherFileSystem
+
+class DemoFileSystem(demofs.DemoFileSystem):
+
+ def rename(self, path, old, new):
+ return demofs.DemoFileSystem.rename(
+ self, "%s/%s" % (path, old), "%s/%s" % (path, new))
+
+class Publication(object):
+
+ def __init__(self, root):
+ self.root = root
+
+ def beforeTraversal(self, request):
+ pass
+
+ def getApplication(self, request):
+ return self.root
+
+ def afterTraversal(self, request, ob):
+ pass
+
+ def callObject(self, request, ob):
+ command = getattr(ob, request.env['command'])
+ if 'name' in request.env:
+ request.env['path'] += "/" + request.env['name']
+ return mapply(command, request = request.env)
+
+ def afterCall(self, request, ob):
+ pass
+
+ def endRequest(self, request, ob):
+ pass
+
+ def handleException(self, object, request, info, retry_allowed=True):
+ request.response._exc = info[:2]
+
+
+class Request(object):
+
+ def __init__(self, input, env):
+ self.env = env
+ self.response = Response()
+ self.user = env['credentials']
+ del env['credentials']
+
+ def processInputs(self):
+ pass
+
+ def traverse(self, root):
+ root.user = self.user
+ return root
+
+ def close(self):
+ pass
+
+class Response(object):
+
+ _exc = _body = None
+
+ def setResult(self, result):
+ self._body = result
+
+ def outputBody(self):
+ pass
+
+ def getResult(self):
+ if self._exc:
+ raise self._exc[0], self._exc[1]
+ return self._body
+
+class RequestFactory(object):
+
+ def __init__(self, root):
+ self.pub = Publication(root)
+
+ def __call__(self, input, env):
+ r = Request(input, env)
+ r.publication = self.pub
+ return r
+
+class TestPublisherFileSystem(FileSystemTests, TestCase):
+
+ def setUp(self):
+ root = demofs.Directory()
+ root.grant('bob', demofs.write)
+ fs = DemoFileSystem(root, 'bob')
+ fs.mkdir(self.dir_name)
+ fs.writefile(self.file_name, StringIO(self.file_contents))
+ fs.writefile(self.unwritable_filename, StringIO("save this"))
+ fs.get(self.unwritable_filename).revoke('bob', demofs.write)
+
+ self.filesystem = PublisherFileSystem('bob', RequestFactory(fs))
+
+def test_suite():
+ return TestSuite((
+ makeSuite(TestPublisherFileSystem),
+ ))
+
+if __name__=='__main__':
+ main(defaultTest='test_suite')
Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_publisher.py
___________________________________________________________________
Name: svn:eol-style
+ native
Deleted: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/demofs.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/demofs.py 2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/demofs.py 2005-10-06 20:22:37 UTC (rev 38826)
@@ -1,310 +0,0 @@
-##############################################################################
-# Copyright (c) 2003 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-##############################################################################
-"""Demo file-system implementation, for testing
-
-$Id: demofs.py 27459 2004-09-07 01:45:52Z shane $
-"""
-import posixpath
-from zope.security.interfaces import Unauthorized
-from zope.app.twisted.interfaces import IFileSystem
-## from zope.server.interfaces.ftp import IFileSystemAccess
-from zope.interface import implements
-
-execute = 1
-read = 2
-write = 4
-
-class File(object):
- type = 'f'
- modified=None
-
- def __init__(self):
- self.access = {'anonymous': read}
-
- def accessable(self, user, access=read):
- return (user == 'root'
- or (self.access.get(user, 0) & access)
- or (self.access.get('anonymous', 0) & access)
- )
-
- def grant(self, user, access):
- self.access[user] = self.access.get(user, 0) | access
-
- def revoke(self, user, access):
- self.access[user] = self.access.get(user, 0) ^ access
-
-class Directory(File):
-
- type = 'd'
-
- def __init__(self):
- super(Directory, self).__init__()
- self.files = {}
-
- def get(self, name, default=None):
- return self.files.get(name, default)
-
- def __getitem__(self, name):
- return self.files[name]
-
- def __setitem__(self, name, v):
- self.files[name] = v
-
- def __delitem__(self, name):
- del self.files[name]
-
- def __contains__(self, name):
- return name in self.files
-
- def __iter__(self):
- return iter(self.files)
-
-class DemoFileSystem(object):
- __doc__ = IFileSystem.__doc__
-
- implements(IFileSystem)
-
- File = File
- Directory = Directory
-
- def __init__(self, files, user=''):
- self.files = files
- self.user = user
-
- def get(self, path, default=None):
-
- while path.startswith('/'):
- path = path[1:]
-
- d = self.files
- if path:
- for name in path.split('/'):
- if d.type is not 'd':
- return default
- if not d.accessable(self.user):
- raise Unauthorized
- d = d.get(name)
- if d is None:
- break
-
- return d
-
- def getany(self, path):
- d = self.get(path)
- if d is None:
- raise OSError("No such file or directory:", path)
- return d
-
- def getdir(self, path):
- d = self.getany(path)
- if d.type != 'd':
- raise OSError("Not a directory:", path)
- return d
-
- def getfile(self, path):
- d = self.getany(path)
- if d.type != 'f':
- raise OSError("Not a file:", path)
- return d
-
- def getwdir(self, path):
- d = self.getdir(path)
- if not d.accessable(self.user, write):
- raise OSError("Permission denied")
- return d
-
- def type(self, path):
- "See zope.server.interfaces.ftp.IFileSystem"
- f = self.get(path)
- return getattr(f, 'type', None)
-
- def names(self, path, filter=None):
- "See zope.server.interfaces.ftp.IFileSystem"
- f = list(self.getdir(path))
- if filter is not None:
- f = [name for name in f if filter(name)]
-
- return f
-
- def _lsinfo(self, name, file):
- info = {
- 'type': file.type,
- 'name': name,
- 'group_read': file.accessable(self.user, read),
- 'group_write': file.accessable(self.user, write),
- }
- if file.type == 'f':
- info['size'] = len(file.data)
- if file.modified is not None:
- info['mtime'] = file.modified
-
- return info
-
- def ls(self, path, filter=None):
- "See zope.server.interfaces.ftp.IFileSystem"
- f = self.getdir(path)
- if filter is None:
- return [self._lsinfo(name, f.files[name])
- for name in f
- ]
-
- return [self._lsinfo(name, f.files[name])
- for name in f
- if filter(name)]
-
- def readfile(self, path, outstream, start=0, end=None):
- "See zope.server.interfaces.ftp.IFileSystem"
- f = self.getfile(path)
-
- data = f.data
- if end is not None:
- data = data[:end]
- if start:
- data = data[start:]
-
- outstream.write(data)
-
- def lsinfo(self, path):
- "See zope.server.interfaces.ftp.IFileSystem"
- f = self.getany(path)
- return self._lsinfo(posixpath.split(path)[1], f)
-
- def mtime(self, path):
- "See zope.server.interfaces.ftp.IFileSystem"
- f = self.getany(path)
- return f.modified
-
- def size(self, path):
- "See zope.server.interfaces.ftp.IFileSystem"
- f = self.getany(path)
- return len(getattr(f, 'data', ''))
-
- def mkdir(self, path):
- "See zope.server.interfaces.ftp.IFileSystem"
- path, name = posixpath.split(path)
- d = self.getwdir(path)
- if name in d.files:
- raise OSError("Already exists:", name)
- newdir = self.Directory()
- newdir.grant(self.user, read | write)
- d.files[name] = newdir
-
- def remove(self, path):
- "See zope.server.interfaces.ftp.IFileSystem"
- path, name = posixpath.split(path)
- d = self.getwdir(path)
- if name not in d.files:
- raise OSError("Not exists:", name)
- f = d.files[name]
- if f.type == 'd':
- raise OSError('Is a directory:', name)
- del d.files[name]
-
- def rmdir(self, path):
- "See zope.server.interfaces.ftp.IFileSystem"
- path, name = posixpath.split(path)
- d = self.getwdir(path)
- if name not in d.files:
- raise OSError("Not exists:", name)
- f = d.files[name]
- if f.type != 'd':
- raise OSError('Is not a directory:', name)
- del d.files[name]
-
- def rename(self, old, new):
- "See zope.server.interfaces.ftp.IFileSystem"
- oldpath, oldname = posixpath.split(old)
- newpath, newname = posixpath.split(new)
-
- olddir = self.getwdir(oldpath)
- newdir = self.getwdir(newpath)
-
- if oldname not in olddir.files:
- raise OSError("Not exists:", oldname)
- if newname in newdir.files:
- raise OSError("Already exists:", newname)
-
- newdir.files[newname] = olddir.files[oldname]
- del olddir.files[oldname]
-
- def writefile(self, path, instream, start=None, end=None, append=False):
- "See zope.server.interfaces.ftp.IFileSystem"
- path, name = posixpath.split(path)
- d = self.getdir(path)
- f = d.files.get(name)
- if f is None:
- d = self.getwdir(path)
- f = d.files[name] = self.File()
- f.grant(self.user, read | write)
- elif f.type != 'f':
- raise OSError("Can't overwrite a directory")
-
- if not f.accessable(self.user, write):
- raise OSError("Permission denied")
-
- if append:
- f.data += instream.read()
- else:
-
- if start:
- if start < 0:
- raise ValueError("Negative starting file position")
- prefix = f.data[:start]
- if len(prefix) < start:
- prefix += '\0' * (start - len(prefix))
- else:
- prefix = ''
- start=0
-
- if end:
- if end < 0:
- raise ValueError("Negative ending file position")
- l = end - start
- newdata = instream.read(l)
-
- f.data = prefix+newdata+f.data[start+len(newdata):]
- else:
- f.data = prefix + instream.read()
-
- def writable(self, path):
- "See zope.server.interfaces.ftp.IFileSystem"
- path, name = posixpath.split(path)
- try:
- d = self.getdir(path)
- except OSError:
- return False
- if name not in d:
- return d.accessable(self.user, write)
- f = d[name]
- return f.type == 'f' and f.accessable(self.user, write)
-
-## class DemoFileSystemAccess(object):
-## __doc__ = IFileSystemAccess.__doc__
-
-## implements(IFileSystemAccess)
-
-## def __init__(self, files, users):
-## self.files = files
-## self.users = users
-
-## def authenticate(self, credentials):
-## "See zope.server.interfaces.ftp.IFileSystemAccess"
-## user, password = credentials
-## if user != 'anonymous':
-## if self.users.get(user) != password:
-## raise Unauthorized
-## return user
-
-## def open(self, credentials):
-## "See zope.server.interfaces.ftp.IFileSystemAccess"
-## user = self.authenticate(credentials)
-## return DemoFileSystem(self.files, user)
Deleted: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/fstests.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/fstests.py 2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/fstests.py 2005-10-06 20:22:37 UTC (rev 38826)
@@ -1,156 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Abstract file-system tests
-
-$Id: fstests.py 26559 2004-07-15 21:22:32Z srichter $
-"""
-from StringIO import StringIO
-from zope.interface.verify import verifyObject
-from zope.app.twisted.interfaces import IFileSystem
-
-class FileSystemTests(object):
- """Tests of a readable filesystem
- """
-
- filesystem = None
- dir_name = '/dir'
- file_name = '/dir/file.txt'
- unwritable_filename = '/dir/protected.txt'
- dir_contents = ['file.txt', 'protected.txt']
- file_contents = 'Lengthen your stride'
-
- def test_type(self):
- self.assertEqual(self.filesystem.type(self.dir_name), 'd')
- self.assertEqual(self.filesystem.type(self.file_name), 'f')
-
-
- def test_names(self):
- lst = self.filesystem.names(self.dir_name)
- lst.sort()
- self.assertEqual(lst, self.dir_contents)
-
- def test_readfile(self):
- s = StringIO()
- self.filesystem.readfile(self.file_name, s)
- self.assertEqual(s.getvalue(), self.file_contents)
-
-
- def testReadPartOfFile(self):
- s = StringIO()
- self.filesystem.readfile(self.file_name, s, 2)
- self.assertEqual(s.getvalue(), self.file_contents[2:])
-
-
- def testReadPartOfFile2(self):
- s = StringIO()
- self.filesystem.readfile(self.file_name, s, 1, 5)
- self.assertEqual(s.getvalue(), self.file_contents[1:5])
-
- def test_IFileSystemInterface(self):
- verifyObject(IFileSystem, self.filesystem)
-
- def testRemove(self):
- self.filesystem.remove(self.file_name)
- self.failIf(self.filesystem.type(self.file_name))
-
-
- def testMkdir(self):
- path = self.dir_name + '/x'
- self.filesystem.mkdir(path)
- self.assertEqual(self.filesystem.type(path), 'd')
-
- def testRmdir(self):
- self.filesystem.remove(self.file_name)
- self.filesystem.rmdir(self.dir_name)
- self.failIf(self.filesystem.type(self.dir_name))
-
-
- def testRename(self):
- self.filesystem.rename(self.file_name, self.file_name + '.bak')
- self.assertEqual(self.filesystem.type(self.file_name), None)
- self.assertEqual(self.filesystem.type(self.file_name + '.bak'), 'f')
-
-
- def testWriteFile(self):
- s = StringIO()
- self.filesystem.readfile(self.file_name, s)
- self.assertEqual(s.getvalue(), self.file_contents)
-
- data = 'Always ' + self.file_contents
- s = StringIO(data)
- self.filesystem.writefile(self.file_name, s)
-
- s = StringIO()
- self.filesystem.readfile(self.file_name, s)
- self.assertEqual(s.getvalue(), data)
-
-
- def testAppendToFile(self):
- data = ' again'
- s = StringIO(data)
- self.filesystem.writefile(self.file_name, s, append=True)
-
- s = StringIO()
- self.filesystem.readfile(self.file_name, s)
- self.assertEqual(s.getvalue(), self.file_contents + data)
-
- def testWritePartOfFile(self):
- data = '123'
- s = StringIO(data)
- self.filesystem.writefile(self.file_name, s, 3, 6)
-
- expect = self.file_contents[:3] + data + self.file_contents[6:]
-
- s = StringIO()
- self.filesystem.readfile(self.file_name, s)
- self.assertEqual(s.getvalue(), expect)
-
- def testWritePartOfFile_and_truncate(self):
- data = '123'
- s = StringIO(data)
- self.filesystem.writefile(self.file_name, s, 3)
-
- expect = self.file_contents[:3] + data
-
- s = StringIO()
- self.filesystem.readfile(self.file_name, s)
- self.assertEqual(s.getvalue(), expect)
-
- def testWriteBeyondEndOfFile(self):
- partlen = len(self.file_contents) - 6
- data = 'daylight savings'
- s = StringIO(data)
- self.filesystem.writefile(self.file_name, s, partlen)
-
- expect = self.file_contents[:partlen] + data
-
- s = StringIO()
- self.filesystem.readfile(self.file_name, s)
- self.assertEqual(s.getvalue(), expect)
-
-
- def testWriteNewFile(self):
- s = StringIO(self.file_contents)
- self.filesystem.writefile(self.file_name + '.new', s)
-
- s = StringIO()
- self.filesystem.readfile(self.file_name, s)
- self.assertEqual(s.getvalue(), self.file_contents)
-
-
- def test_writable(self):
- self.failIf(self.filesystem.writable(self.dir_name))
- self.failIf(self.filesystem.writable(self.unwritable_filename))
- self.failUnless(self.filesystem.writable(self.file_name))
- self.failUnless(self.filesystem.writable(self.file_name+'1'))
Deleted: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_demofs.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_demofs.py 2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_demofs.py 2005-10-06 20:22:37 UTC (rev 38826)
@@ -1,40 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2003 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Test the Demo Filesystem implementation.
-
-$Id: test_demofs.py 26476 2004-07-13 16:59:11Z srichter $
-"""
-import demofs
-from unittest import TestCase, TestSuite, main, makeSuite
-from fstests import FileSystemTests
-from StringIO import StringIO
-
-class Test(FileSystemTests, TestCase):
-
- def setUp(self):
- root = demofs.Directory()
- root.grant('bob', demofs.write)
- fs = self.filesystem = demofs.DemoFileSystem(root, 'bob')
- fs.mkdir(self.dir_name)
- fs.writefile(self.file_name, StringIO(self.file_contents))
- fs.writefile(self.unwritable_filename, StringIO("save this"))
- fs.get(self.unwritable_filename).revoke('bob', demofs.write)
-
-def test_suite():
- return TestSuite((
- makeSuite(Test),
- ))
-
-if __name__=='__main__':
- main(defaultTest='test_suite')
Deleted: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_ftp.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_ftp.py 2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_ftp.py 2005-10-06 20:22:37 UTC (rev 38826)
@@ -1,28 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2004 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Doc tests for the FTP server.
-
-$Id$
-"""
-import unittest
-from zope.testing.doctestunit import DocTestSuite
-
-def test_suite():
- pass
- #return unittest.TestSuite((
- # DocTestSuite('zope.app.twisted.ftp'),
- # ))
-
-if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')
Deleted: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_publisher.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_publisher.py 2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_publisher.py 2005-10-06 20:22:37 UTC (rev 38826)
@@ -1,123 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2003 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Test the FTP publisher.
-
-$Id: test_publisher.py 26559 2004-07-15 21:22:32Z srichter $
-"""
-import demofs
-from unittest import TestCase, TestSuite, main, makeSuite
-from fstests import FileSystemTests
-from StringIO import StringIO
-from zope.publisher.publish import mapply
-from zope.app.twisted.ftp.utils import PublisherFileSystem
-
-class DemoFileSystem(demofs.DemoFileSystem):
-
- def rename(self, path, old, new):
- return demofs.DemoFileSystem.rename(
- self, "%s/%s" % (path, old), "%s/%s" % (path, new))
-
-class Publication(object):
-
- def __init__(self, root):
- self.root = root
-
- def beforeTraversal(self, request):
- pass
-
- def getApplication(self, request):
- return self.root
-
- def afterTraversal(self, request, ob):
- pass
-
- def callObject(self, request, ob):
- command = getattr(ob, request.env['command'])
- if 'name' in request.env:
- request.env['path'] += "/" + request.env['name']
- return mapply(command, request = request.env)
-
- def afterCall(self, request, ob):
- pass
-
- def endRequest(self, request, ob):
- pass
-
- def handleException(self, object, request, info, retry_allowed=True):
- request.response._exc = info[:2]
-
-
-class Request(object):
-
- def __init__(self, input, env):
- self.env = env
- self.response = Response()
- self.user = env['credentials']
- del env['credentials']
-
- def processInputs(self):
- pass
-
- def traverse(self, root):
- root.user = self.user
- return root
-
- def close(self):
- pass
-
-class Response(object):
-
- _exc = _body = None
-
- def setResult(self, result):
- self._body = result
-
- def outputBody(self):
- pass
-
- def getResult(self):
- if self._exc:
- raise self._exc[0], self._exc[1]
- return self._body
-
-class RequestFactory(object):
-
- def __init__(self, root):
- self.pub = Publication(root)
-
- def __call__(self, input, env):
- r = Request(input, env)
- r.publication = self.pub
- return r
-
-class TestPublisherFileSystem(FileSystemTests, TestCase):
-
- def setUp(self):
- root = demofs.Directory()
- root.grant('bob', demofs.write)
- fs = DemoFileSystem(root, 'bob')
- fs.mkdir(self.dir_name)
- fs.writefile(self.file_name, StringIO(self.file_contents))
- fs.writefile(self.unwritable_filename, StringIO("save this"))
- fs.get(self.unwritable_filename).revoke('bob', demofs.write)
-
- self.filesystem = PublisherFileSystem('bob', RequestFactory(fs))
-
-def test_suite():
- return TestSuite((
- makeSuite(TestPublisherFileSystem),
- ))
-
-if __name__=='__main__':
- main(defaultTest='test_suite')
More information about the Zope3-Checkins
mailing list