[Zope3-checkins] SVN: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ Moved all the FTP tests under the FTP directory, as I got very confused by the test setup

Michael Kerrin michael.kerrin at openapp.biz
Thu Oct 6 16:22:37 EDT 2005


Log message for revision 38826:
  Moved all the FTP tests under the FTP directory, as I got very confused by the test setup.

Changed:
  A   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/demofs.py
  A   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/fstests.py
  A   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_demofs.py
  U   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_ftpserver.py
  A   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_publisher.py
  D   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/demofs.py
  D   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/fstests.py
  D   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_demofs.py
  D   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_ftp.py
  D   Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_publisher.py

-=-
Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/demofs.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/demofs.py	2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/demofs.py	2005-10-06 20:22:37 UTC (rev 38826)
@@ -0,0 +1,310 @@
+##############################################################################
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+##############################################################################
+"""Demo file-system implementation, for testing
+
+$Id: demofs.py 27459 2004-09-07 01:45:52Z shane $
+"""
+import posixpath
+from zope.security.interfaces import Unauthorized
+from zope.app.twisted.interfaces import IFileSystem
+## from zope.server.interfaces.ftp import IFileSystemAccess
+from zope.interface import implements
+
+execute = 1
+read = 2
+write = 4
+
+class File(object):
+    type = 'f'
+    modified=None
+
+    def __init__(self):
+        self.access = {'anonymous': read}
+
+    def accessable(self, user, access=read):
+        return (user == 'root'
+                or (self.access.get(user, 0) & access)
+                or (self.access.get('anonymous', 0) & access)
+                )
+
+    def grant(self, user, access):
+        self.access[user] = self.access.get(user, 0) | access
+
+    def revoke(self, user, access):
+        self.access[user] = self.access.get(user, 0) ^ access
+
+class Directory(File):
+
+    type = 'd'
+
+    def __init__(self):
+        super(Directory, self).__init__()
+        self.files = {}
+
+    def get(self, name, default=None):
+        return self.files.get(name, default)
+
+    def __getitem__(self, name):
+        return self.files[name]
+
+    def __setitem__(self, name, v):
+        self.files[name] = v
+
+    def __delitem__(self, name):
+        del self.files[name]
+
+    def __contains__(self, name):
+        return name in self.files
+
+    def __iter__(self):
+        return iter(self.files)
+
+class DemoFileSystem(object):
+    __doc__ = IFileSystem.__doc__
+
+    implements(IFileSystem)
+
+    File = File
+    Directory = Directory
+
+    def __init__(self, files, user=''):
+        self.files = files
+        self.user = user
+
+    def get(self, path, default=None):
+
+        while path.startswith('/'):
+            path = path[1:]
+
+        d = self.files
+        if path:
+            for name in path.split('/'):
+                if d.type is not 'd':
+                    return default
+                if not d.accessable(self.user):
+                    raise Unauthorized
+                d = d.get(name)
+                if d is None:
+                    break
+
+        return d
+
+    def getany(self, path):
+        d = self.get(path)
+        if d is None:
+            raise OSError("No such file or directory:", path)
+        return d
+
+    def getdir(self, path):
+        d = self.getany(path)
+        if d.type != 'd':
+            raise OSError("Not a directory:", path)
+        return d
+
+    def getfile(self, path):
+        d = self.getany(path)
+        if d.type != 'f':
+            raise OSError("Not a file:", path)
+        return d
+
+    def getwdir(self, path):
+        d = self.getdir(path)
+        if not d.accessable(self.user, write):
+            raise OSError("Permission denied")
+        return d
+
+    def type(self, path):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        f = self.get(path)
+        return getattr(f, 'type', None)
+
+    def names(self, path, filter=None):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        f = list(self.getdir(path))
+        if filter is not None:
+            f = [name for name in f if filter(name)]
+
+        return f
+
+    def _lsinfo(self, name, file):
+        info = {
+            'type': file.type,
+            'name': name,
+            'group_read': file.accessable(self.user, read),
+            'group_write': file.accessable(self.user, write),
+            }
+        if file.type == 'f':
+            info['size'] = len(file.data)
+        if file.modified is not None:
+            info['mtime'] = file.modified
+
+        return info
+
+    def ls(self, path, filter=None):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        f = self.getdir(path)
+        if filter is None:
+            return [self._lsinfo(name, f.files[name])
+                    for name in f
+                    ]
+
+        return [self._lsinfo(name, f.files[name])
+                for name in f
+                if filter(name)]
+
+    def readfile(self, path, outstream, start=0, end=None):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        f = self.getfile(path)
+
+        data = f.data
+        if end is not None:
+            data = data[:end]
+        if start:
+            data = data[start:]
+
+        outstream.write(data)
+
+    def lsinfo(self, path):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        f = self.getany(path)
+        return self._lsinfo(posixpath.split(path)[1], f)
+
+    def mtime(self, path):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        f = self.getany(path)
+        return f.modified
+
+    def size(self, path):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        f = self.getany(path)
+        return len(getattr(f, 'data', ''))
+
+    def mkdir(self, path):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        path, name = posixpath.split(path)
+        d = self.getwdir(path)
+        if name in d.files:
+            raise OSError("Already exists:", name)
+        newdir = self.Directory()
+        newdir.grant(self.user, read | write)
+        d.files[name] = newdir
+
+    def remove(self, path):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        path, name = posixpath.split(path)
+        d = self.getwdir(path)
+        if name not in d.files:
+            raise OSError("Not exists:", name)
+        f = d.files[name]
+        if f.type == 'd':
+            raise OSError('Is a directory:', name)
+        del d.files[name]
+
+    def rmdir(self, path):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        path, name = posixpath.split(path)
+        d = self.getwdir(path)
+        if name not in d.files:
+            raise OSError("Not exists:", name)
+        f = d.files[name]
+        if f.type != 'd':
+            raise OSError('Is not a directory:', name)
+        del d.files[name]
+
+    def rename(self, old, new):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        oldpath, oldname = posixpath.split(old)
+        newpath, newname = posixpath.split(new)
+
+        olddir = self.getwdir(oldpath)
+        newdir = self.getwdir(newpath)
+
+        if oldname not in olddir.files:
+            raise OSError("Not exists:", oldname)
+        if newname in newdir.files:
+            raise OSError("Already exists:", newname)
+
+        newdir.files[newname] = olddir.files[oldname]
+        del olddir.files[oldname]
+
+    def writefile(self, path, instream, start=None, end=None, append=False):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        path, name = posixpath.split(path)
+        d = self.getdir(path)
+        f = d.files.get(name)
+        if f is None:
+            d = self.getwdir(path)
+            f = d.files[name] = self.File()
+            f.grant(self.user, read | write)
+        elif f.type != 'f':
+            raise OSError("Can't overwrite a directory")
+
+        if not f.accessable(self.user, write):
+            raise OSError("Permission denied")
+
+        if append:
+            f.data += instream.read()
+        else:
+
+            if start:
+                if start < 0:
+                    raise ValueError("Negative starting file position")
+                prefix = f.data[:start]
+                if len(prefix) < start:
+                    prefix += '\0' * (start - len(prefix))
+            else:
+                prefix = ''
+                start=0
+
+            if end:
+                if end < 0:
+                    raise ValueError("Negative ending file position")
+                l = end - start
+                newdata = instream.read(l)
+
+                f.data = prefix+newdata+f.data[start+len(newdata):]
+            else:
+                f.data = prefix + instream.read()
+
+    def writable(self, path):
+        "See zope.server.interfaces.ftp.IFileSystem"
+        path, name = posixpath.split(path)
+        try:
+            d = self.getdir(path)
+        except OSError:
+            return False
+        if name not in d:
+            return d.accessable(self.user, write)
+        f = d[name]
+        return f.type == 'f' and f.accessable(self.user, write)
+
+## class DemoFileSystemAccess(object):
+##     __doc__ = IFileSystemAccess.__doc__
+
+##     implements(IFileSystemAccess)
+
+##     def __init__(self, files, users):
+##         self.files = files
+##         self.users = users
+
+##     def authenticate(self, credentials):
+##         "See zope.server.interfaces.ftp.IFileSystemAccess"
+##         user, password = credentials
+##         if user != 'anonymous':
+##             if self.users.get(user) != password:
+##                 raise Unauthorized
+##         return user
+
+##     def open(self, credentials):
+##         "See zope.server.interfaces.ftp.IFileSystemAccess"
+##         user = self.authenticate(credentials)
+##         return DemoFileSystem(self.files, user)


Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/demofs.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/fstests.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/fstests.py	2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/fstests.py	2005-10-06 20:22:37 UTC (rev 38826)
@@ -0,0 +1,156 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Abstract file-system tests
+
+$Id: fstests.py 26559 2004-07-15 21:22:32Z srichter $
+"""
+from StringIO import StringIO
+from zope.interface.verify import verifyObject
+from zope.app.twisted.interfaces import IFileSystem
+
+class FileSystemTests(object):
+    """Tests of a readable filesystem
+    """
+
+    filesystem = None
+    dir_name  = '/dir'
+    file_name = '/dir/file.txt'
+    unwritable_filename = '/dir/protected.txt'
+    dir_contents = ['file.txt', 'protected.txt']
+    file_contents = 'Lengthen your stride'
+
+    def test_type(self):
+        self.assertEqual(self.filesystem.type(self.dir_name), 'd')
+        self.assertEqual(self.filesystem.type(self.file_name), 'f')
+
+
+    def test_names(self):
+        lst = self.filesystem.names(self.dir_name)
+        lst.sort()
+        self.assertEqual(lst, self.dir_contents)
+
+    def test_readfile(self):
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, s)
+        self.assertEqual(s.getvalue(), self.file_contents)
+
+
+    def testReadPartOfFile(self):
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, s, 2)
+        self.assertEqual(s.getvalue(), self.file_contents[2:])
+
+
+    def testReadPartOfFile2(self):
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, s, 1, 5)
+        self.assertEqual(s.getvalue(), self.file_contents[1:5])
+
+    def test_IFileSystemInterface(self):
+        verifyObject(IFileSystem, self.filesystem)
+
+    def testRemove(self):
+        self.filesystem.remove(self.file_name)
+        self.failIf(self.filesystem.type(self.file_name))
+
+
+    def testMkdir(self):
+        path = self.dir_name + '/x'
+        self.filesystem.mkdir(path)
+        self.assertEqual(self.filesystem.type(path), 'd')
+
+    def testRmdir(self):
+        self.filesystem.remove(self.file_name)
+        self.filesystem.rmdir(self.dir_name)
+        self.failIf(self.filesystem.type(self.dir_name))
+
+
+    def testRename(self):
+        self.filesystem.rename(self.file_name, self.file_name + '.bak')
+        self.assertEqual(self.filesystem.type(self.file_name), None)
+        self.assertEqual(self.filesystem.type(self.file_name + '.bak'), 'f')
+
+
+    def testWriteFile(self):
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, s)
+        self.assertEqual(s.getvalue(), self.file_contents)
+
+        data = 'Always ' + self.file_contents
+        s = StringIO(data)
+        self.filesystem.writefile(self.file_name, s)
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, s)
+        self.assertEqual(s.getvalue(), data)
+
+
+    def testAppendToFile(self):
+        data = ' again'
+        s = StringIO(data)
+        self.filesystem.writefile(self.file_name, s, append=True)
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, s)
+        self.assertEqual(s.getvalue(), self.file_contents + data)
+
+    def testWritePartOfFile(self):
+        data = '123'
+        s = StringIO(data)
+        self.filesystem.writefile(self.file_name, s, 3, 6)
+
+        expect = self.file_contents[:3] + data + self.file_contents[6:]
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, s)
+        self.assertEqual(s.getvalue(), expect)
+
+    def testWritePartOfFile_and_truncate(self):
+        data = '123'
+        s = StringIO(data)
+        self.filesystem.writefile(self.file_name, s, 3)
+
+        expect = self.file_contents[:3] + data
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, s)
+        self.assertEqual(s.getvalue(), expect)
+
+    def testWriteBeyondEndOfFile(self):
+        partlen = len(self.file_contents) - 6
+        data = 'daylight savings'
+        s = StringIO(data)
+        self.filesystem.writefile(self.file_name, s, partlen)
+
+        expect = self.file_contents[:partlen] + data
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, s)
+        self.assertEqual(s.getvalue(), expect)
+
+
+    def testWriteNewFile(self):
+        s = StringIO(self.file_contents)
+        self.filesystem.writefile(self.file_name + '.new', s)
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, s)
+        self.assertEqual(s.getvalue(), self.file_contents)
+
+
+    def test_writable(self):
+        self.failIf(self.filesystem.writable(self.dir_name))
+        self.failIf(self.filesystem.writable(self.unwritable_filename))
+        self.failUnless(self.filesystem.writable(self.file_name))
+        self.failUnless(self.filesystem.writable(self.file_name+'1'))


Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/fstests.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_demofs.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_demofs.py	2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_demofs.py	2005-10-06 20:22:37 UTC (rev 38826)
@@ -0,0 +1,40 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test the Demo Filesystem implementation.
+
+$Id: test_demofs.py 26476 2004-07-13 16:59:11Z srichter $
+"""
+import demofs
+from unittest import TestCase, TestSuite, main, makeSuite
+from fstests import FileSystemTests
+from StringIO import StringIO
+
+class Test(FileSystemTests, TestCase):
+
+    def setUp(self):
+        root = demofs.Directory()
+        root.grant('bob', demofs.write)
+        fs = self.filesystem = demofs.DemoFileSystem(root, 'bob')
+        fs.mkdir(self.dir_name)
+        fs.writefile(self.file_name, StringIO(self.file_contents))
+        fs.writefile(self.unwritable_filename, StringIO("save this"))
+        fs.get(self.unwritable_filename).revoke('bob', demofs.write)
+
+def test_suite():
+    return TestSuite((
+        makeSuite(Test),
+        ))
+
+if __name__=='__main__':
+    main(defaultTest='test_suite')


Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_demofs.py
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_ftpserver.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_ftpserver.py	2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_ftpserver.py	2005-10-06 20:22:37 UTC (rev 38826)
@@ -26,8 +26,8 @@
 
 from zope.app.twisted.ftp.server import FTPFactory
 
-from zope.app.twisted.tests.test_publisher import RequestFactory
-from zope.app.twisted.tests import demofs
+from test_publisher import RequestFactory
+import demofs
 
 class TestServerSetup(TestCase):
 

Added: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_publisher.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_publisher.py	2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_publisher.py	2005-10-06 20:22:37 UTC (rev 38826)
@@ -0,0 +1,123 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test the FTP publisher.
+
+$Id: test_publisher.py 26559 2004-07-15 21:22:32Z srichter $
+"""
+import demofs
+from unittest import TestCase, TestSuite, main, makeSuite
+from fstests import FileSystemTests
+from StringIO import StringIO
+from zope.publisher.publish import mapply
+from zope.app.twisted.ftp.utils import PublisherFileSystem
+
+class DemoFileSystem(demofs.DemoFileSystem):
+
+    def rename(self, path, old, new):
+        return demofs.DemoFileSystem.rename(
+            self, "%s/%s" % (path, old), "%s/%s" % (path, new))
+
+class Publication(object):
+
+    def __init__(self, root):
+        self.root = root
+
+    def beforeTraversal(self, request):
+        pass
+
+    def getApplication(self, request):
+        return self.root
+
+    def afterTraversal(self, request, ob):
+        pass
+
+    def callObject(self, request, ob):
+        command = getattr(ob, request.env['command'])
+        if 'name' in request.env:
+            request.env['path'] += "/" + request.env['name']
+        return mapply(command, request = request.env)
+
+    def afterCall(self, request, ob):
+        pass
+
+    def endRequest(self, request, ob):
+        pass
+
+    def handleException(self, object, request, info, retry_allowed=True):
+        request.response._exc = info[:2]
+
+
+class Request(object):
+
+    def __init__(self, input, env):
+        self.env = env
+        self.response = Response()
+        self.user = env['credentials']
+        del env['credentials']
+
+    def processInputs(self):
+        pass
+
+    def traverse(self, root):
+        root.user = self.user
+        return root
+
+    def close(self):
+        pass
+
+class Response(object):
+
+    _exc = _body = None
+
+    def setResult(self, result):
+        self._body = result
+
+    def outputBody(self):
+        pass
+
+    def getResult(self):
+        if self._exc:
+            raise self._exc[0], self._exc[1]
+        return self._body
+
+class RequestFactory(object):
+
+    def __init__(self, root):
+        self.pub = Publication(root)
+
+    def __call__(self, input, env):
+        r = Request(input, env)
+        r.publication = self.pub
+        return r
+
+class TestPublisherFileSystem(FileSystemTests, TestCase):
+
+    def setUp(self):
+        root = demofs.Directory()
+        root.grant('bob', demofs.write)
+        fs = DemoFileSystem(root, 'bob')
+        fs.mkdir(self.dir_name)
+        fs.writefile(self.file_name, StringIO(self.file_contents))
+        fs.writefile(self.unwritable_filename, StringIO("save this"))
+        fs.get(self.unwritable_filename).revoke('bob', demofs.write)
+
+        self.filesystem = PublisherFileSystem('bob', RequestFactory(fs))
+
+def test_suite():
+    return TestSuite((
+        makeSuite(TestPublisherFileSystem),
+        ))
+
+if __name__=='__main__':
+    main(defaultTest='test_suite')


Property changes on: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/ftp/tests/test_publisher.py
___________________________________________________________________
Name: svn:eol-style
   + native

Deleted: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/demofs.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/demofs.py	2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/demofs.py	2005-10-06 20:22:37 UTC (rev 38826)
@@ -1,310 +0,0 @@
-##############################################################################
-# Copyright (c) 2003 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-##############################################################################
-"""Demo file-system implementation, for testing
-
-$Id: demofs.py 27459 2004-09-07 01:45:52Z shane $
-"""
-import posixpath
-from zope.security.interfaces import Unauthorized
-from zope.app.twisted.interfaces import IFileSystem
-## from zope.server.interfaces.ftp import IFileSystemAccess
-from zope.interface import implements
-
-execute = 1
-read = 2
-write = 4
-
-class File(object):
-    type = 'f'
-    modified=None
-
-    def __init__(self):
-        self.access = {'anonymous': read}
-
-    def accessable(self, user, access=read):
-        return (user == 'root'
-                or (self.access.get(user, 0) & access)
-                or (self.access.get('anonymous', 0) & access)
-                )
-
-    def grant(self, user, access):
-        self.access[user] = self.access.get(user, 0) | access
-
-    def revoke(self, user, access):
-        self.access[user] = self.access.get(user, 0) ^ access
-
-class Directory(File):
-
-    type = 'd'
-
-    def __init__(self):
-        super(Directory, self).__init__()
-        self.files = {}
-
-    def get(self, name, default=None):
-        return self.files.get(name, default)
-
-    def __getitem__(self, name):
-        return self.files[name]
-
-    def __setitem__(self, name, v):
-        self.files[name] = v
-
-    def __delitem__(self, name):
-        del self.files[name]
-
-    def __contains__(self, name):
-        return name in self.files
-
-    def __iter__(self):
-        return iter(self.files)
-
-class DemoFileSystem(object):
-    __doc__ = IFileSystem.__doc__
-
-    implements(IFileSystem)
-
-    File = File
-    Directory = Directory
-
-    def __init__(self, files, user=''):
-        self.files = files
-        self.user = user
-
-    def get(self, path, default=None):
-
-        while path.startswith('/'):
-            path = path[1:]
-
-        d = self.files
-        if path:
-            for name in path.split('/'):
-                if d.type is not 'd':
-                    return default
-                if not d.accessable(self.user):
-                    raise Unauthorized
-                d = d.get(name)
-                if d is None:
-                    break
-
-        return d
-
-    def getany(self, path):
-        d = self.get(path)
-        if d is None:
-            raise OSError("No such file or directory:", path)
-        return d
-
-    def getdir(self, path):
-        d = self.getany(path)
-        if d.type != 'd':
-            raise OSError("Not a directory:", path)
-        return d
-
-    def getfile(self, path):
-        d = self.getany(path)
-        if d.type != 'f':
-            raise OSError("Not a file:", path)
-        return d
-
-    def getwdir(self, path):
-        d = self.getdir(path)
-        if not d.accessable(self.user, write):
-            raise OSError("Permission denied")
-        return d
-
-    def type(self, path):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        f = self.get(path)
-        return getattr(f, 'type', None)
-
-    def names(self, path, filter=None):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        f = list(self.getdir(path))
-        if filter is not None:
-            f = [name for name in f if filter(name)]
-
-        return f
-
-    def _lsinfo(self, name, file):
-        info = {
-            'type': file.type,
-            'name': name,
-            'group_read': file.accessable(self.user, read),
-            'group_write': file.accessable(self.user, write),
-            }
-        if file.type == 'f':
-            info['size'] = len(file.data)
-        if file.modified is not None:
-            info['mtime'] = file.modified
-
-        return info
-
-    def ls(self, path, filter=None):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        f = self.getdir(path)
-        if filter is None:
-            return [self._lsinfo(name, f.files[name])
-                    for name in f
-                    ]
-
-        return [self._lsinfo(name, f.files[name])
-                for name in f
-                if filter(name)]
-
-    def readfile(self, path, outstream, start=0, end=None):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        f = self.getfile(path)
-
-        data = f.data
-        if end is not None:
-            data = data[:end]
-        if start:
-            data = data[start:]
-
-        outstream.write(data)
-
-    def lsinfo(self, path):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        f = self.getany(path)
-        return self._lsinfo(posixpath.split(path)[1], f)
-
-    def mtime(self, path):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        f = self.getany(path)
-        return f.modified
-
-    def size(self, path):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        f = self.getany(path)
-        return len(getattr(f, 'data', ''))
-
-    def mkdir(self, path):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        path, name = posixpath.split(path)
-        d = self.getwdir(path)
-        if name in d.files:
-            raise OSError("Already exists:", name)
-        newdir = self.Directory()
-        newdir.grant(self.user, read | write)
-        d.files[name] = newdir
-
-    def remove(self, path):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        path, name = posixpath.split(path)
-        d = self.getwdir(path)
-        if name not in d.files:
-            raise OSError("Not exists:", name)
-        f = d.files[name]
-        if f.type == 'd':
-            raise OSError('Is a directory:', name)
-        del d.files[name]
-
-    def rmdir(self, path):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        path, name = posixpath.split(path)
-        d = self.getwdir(path)
-        if name not in d.files:
-            raise OSError("Not exists:", name)
-        f = d.files[name]
-        if f.type != 'd':
-            raise OSError('Is not a directory:', name)
-        del d.files[name]
-
-    def rename(self, old, new):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        oldpath, oldname = posixpath.split(old)
-        newpath, newname = posixpath.split(new)
-
-        olddir = self.getwdir(oldpath)
-        newdir = self.getwdir(newpath)
-
-        if oldname not in olddir.files:
-            raise OSError("Not exists:", oldname)
-        if newname in newdir.files:
-            raise OSError("Already exists:", newname)
-
-        newdir.files[newname] = olddir.files[oldname]
-        del olddir.files[oldname]
-
-    def writefile(self, path, instream, start=None, end=None, append=False):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        path, name = posixpath.split(path)
-        d = self.getdir(path)
-        f = d.files.get(name)
-        if f is None:
-            d = self.getwdir(path)
-            f = d.files[name] = self.File()
-            f.grant(self.user, read | write)
-        elif f.type != 'f':
-            raise OSError("Can't overwrite a directory")
-
-        if not f.accessable(self.user, write):
-            raise OSError("Permission denied")
-
-        if append:
-            f.data += instream.read()
-        else:
-
-            if start:
-                if start < 0:
-                    raise ValueError("Negative starting file position")
-                prefix = f.data[:start]
-                if len(prefix) < start:
-                    prefix += '\0' * (start - len(prefix))
-            else:
-                prefix = ''
-                start=0
-
-            if end:
-                if end < 0:
-                    raise ValueError("Negative ending file position")
-                l = end - start
-                newdata = instream.read(l)
-
-                f.data = prefix+newdata+f.data[start+len(newdata):]
-            else:
-                f.data = prefix + instream.read()
-
-    def writable(self, path):
-        "See zope.server.interfaces.ftp.IFileSystem"
-        path, name = posixpath.split(path)
-        try:
-            d = self.getdir(path)
-        except OSError:
-            return False
-        if name not in d:
-            return d.accessable(self.user, write)
-        f = d[name]
-        return f.type == 'f' and f.accessable(self.user, write)
-
-## class DemoFileSystemAccess(object):
-##     __doc__ = IFileSystemAccess.__doc__
-
-##     implements(IFileSystemAccess)
-
-##     def __init__(self, files, users):
-##         self.files = files
-##         self.users = users
-
-##     def authenticate(self, credentials):
-##         "See zope.server.interfaces.ftp.IFileSystemAccess"
-##         user, password = credentials
-##         if user != 'anonymous':
-##             if self.users.get(user) != password:
-##                 raise Unauthorized
-##         return user
-
-##     def open(self, credentials):
-##         "See zope.server.interfaces.ftp.IFileSystemAccess"
-##         user = self.authenticate(credentials)
-##         return DemoFileSystem(self.files, user)

Deleted: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/fstests.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/fstests.py	2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/fstests.py	2005-10-06 20:22:37 UTC (rev 38826)
@@ -1,156 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Abstract file-system tests
-
-$Id: fstests.py 26559 2004-07-15 21:22:32Z srichter $
-"""
-from StringIO import StringIO
-from zope.interface.verify import verifyObject
-from zope.app.twisted.interfaces import IFileSystem
-
-class FileSystemTests(object):
-    """Tests of a readable filesystem
-    """
-
-    filesystem = None
-    dir_name  = '/dir'
-    file_name = '/dir/file.txt'
-    unwritable_filename = '/dir/protected.txt'
-    dir_contents = ['file.txt', 'protected.txt']
-    file_contents = 'Lengthen your stride'
-
-    def test_type(self):
-        self.assertEqual(self.filesystem.type(self.dir_name), 'd')
-        self.assertEqual(self.filesystem.type(self.file_name), 'f')
-
-
-    def test_names(self):
-        lst = self.filesystem.names(self.dir_name)
-        lst.sort()
-        self.assertEqual(lst, self.dir_contents)
-
-    def test_readfile(self):
-        s = StringIO()
-        self.filesystem.readfile(self.file_name, s)
-        self.assertEqual(s.getvalue(), self.file_contents)
-
-
-    def testReadPartOfFile(self):
-        s = StringIO()
-        self.filesystem.readfile(self.file_name, s, 2)
-        self.assertEqual(s.getvalue(), self.file_contents[2:])
-
-
-    def testReadPartOfFile2(self):
-        s = StringIO()
-        self.filesystem.readfile(self.file_name, s, 1, 5)
-        self.assertEqual(s.getvalue(), self.file_contents[1:5])
-
-    def test_IFileSystemInterface(self):
-        verifyObject(IFileSystem, self.filesystem)
-
-    def testRemove(self):
-        self.filesystem.remove(self.file_name)
-        self.failIf(self.filesystem.type(self.file_name))
-
-
-    def testMkdir(self):
-        path = self.dir_name + '/x'
-        self.filesystem.mkdir(path)
-        self.assertEqual(self.filesystem.type(path), 'd')
-
-    def testRmdir(self):
-        self.filesystem.remove(self.file_name)
-        self.filesystem.rmdir(self.dir_name)
-        self.failIf(self.filesystem.type(self.dir_name))
-
-
-    def testRename(self):
-        self.filesystem.rename(self.file_name, self.file_name + '.bak')
-        self.assertEqual(self.filesystem.type(self.file_name), None)
-        self.assertEqual(self.filesystem.type(self.file_name + '.bak'), 'f')
-
-
-    def testWriteFile(self):
-        s = StringIO()
-        self.filesystem.readfile(self.file_name, s)
-        self.assertEqual(s.getvalue(), self.file_contents)
-
-        data = 'Always ' + self.file_contents
-        s = StringIO(data)
-        self.filesystem.writefile(self.file_name, s)
-
-        s = StringIO()
-        self.filesystem.readfile(self.file_name, s)
-        self.assertEqual(s.getvalue(), data)
-
-
-    def testAppendToFile(self):
-        data = ' again'
-        s = StringIO(data)
-        self.filesystem.writefile(self.file_name, s, append=True)
-
-        s = StringIO()
-        self.filesystem.readfile(self.file_name, s)
-        self.assertEqual(s.getvalue(), self.file_contents + data)
-
-    def testWritePartOfFile(self):
-        data = '123'
-        s = StringIO(data)
-        self.filesystem.writefile(self.file_name, s, 3, 6)
-
-        expect = self.file_contents[:3] + data + self.file_contents[6:]
-
-        s = StringIO()
-        self.filesystem.readfile(self.file_name, s)
-        self.assertEqual(s.getvalue(), expect)
-
-    def testWritePartOfFile_and_truncate(self):
-        data = '123'
-        s = StringIO(data)
-        self.filesystem.writefile(self.file_name, s, 3)
-
-        expect = self.file_contents[:3] + data
-
-        s = StringIO()
-        self.filesystem.readfile(self.file_name, s)
-        self.assertEqual(s.getvalue(), expect)
-
-    def testWriteBeyondEndOfFile(self):
-        partlen = len(self.file_contents) - 6
-        data = 'daylight savings'
-        s = StringIO(data)
-        self.filesystem.writefile(self.file_name, s, partlen)
-
-        expect = self.file_contents[:partlen] + data
-
-        s = StringIO()
-        self.filesystem.readfile(self.file_name, s)
-        self.assertEqual(s.getvalue(), expect)
-
-
-    def testWriteNewFile(self):
-        s = StringIO(self.file_contents)
-        self.filesystem.writefile(self.file_name + '.new', s)
-
-        s = StringIO()
-        self.filesystem.readfile(self.file_name, s)
-        self.assertEqual(s.getvalue(), self.file_contents)
-
-
-    def test_writable(self):
-        self.failIf(self.filesystem.writable(self.dir_name))
-        self.failIf(self.filesystem.writable(self.unwritable_filename))
-        self.failUnless(self.filesystem.writable(self.file_name))
-        self.failUnless(self.filesystem.writable(self.file_name+'1'))

Deleted: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_demofs.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_demofs.py	2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_demofs.py	2005-10-06 20:22:37 UTC (rev 38826)
@@ -1,40 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2003 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Test the Demo Filesystem implementation.
-
-$Id: test_demofs.py 26476 2004-07-13 16:59:11Z srichter $
-"""
-import demofs
-from unittest import TestCase, TestSuite, main, makeSuite
-from fstests import FileSystemTests
-from StringIO import StringIO
-
-class Test(FileSystemTests, TestCase):
-
-    def setUp(self):
-        root = demofs.Directory()
-        root.grant('bob', demofs.write)
-        fs = self.filesystem = demofs.DemoFileSystem(root, 'bob')
-        fs.mkdir(self.dir_name)
-        fs.writefile(self.file_name, StringIO(self.file_contents))
-        fs.writefile(self.unwritable_filename, StringIO("save this"))
-        fs.get(self.unwritable_filename).revoke('bob', demofs.write)
-
-def test_suite():
-    return TestSuite((
-        makeSuite(Test),
-        ))
-
-if __name__=='__main__':
-    main(defaultTest='test_suite')

Deleted: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_ftp.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_ftp.py	2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_ftp.py	2005-10-06 20:22:37 UTC (rev 38826)
@@ -1,28 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2004 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Doc tests for the FTP server.
-
-$Id$
-"""
-import unittest
-from zope.testing.doctestunit import DocTestSuite
-
-def test_suite():
-    pass
-    #return unittest.TestSuite((
-    #    DocTestSuite('zope.app.twisted.ftp'),
-    #    ))
-
-if __name__ == '__main__':
-    unittest.main(defaultTest='test_suite')

Deleted: Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_publisher.py
===================================================================
--- Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_publisher.py	2005-10-06 20:08:54 UTC (rev 38825)
+++ Zope3/branches/srichter-twisted-integration2/src/zope/app/twisted/tests/test_publisher.py	2005-10-06 20:22:37 UTC (rev 38826)
@@ -1,123 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2003 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Test the FTP publisher.
-
-$Id: test_publisher.py 26559 2004-07-15 21:22:32Z srichter $
-"""
-import demofs
-from unittest import TestCase, TestSuite, main, makeSuite
-from fstests import FileSystemTests
-from StringIO import StringIO
-from zope.publisher.publish import mapply
-from zope.app.twisted.ftp.utils import PublisherFileSystem
-
-class DemoFileSystem(demofs.DemoFileSystem):
-
-    def rename(self, path, old, new):
-        return demofs.DemoFileSystem.rename(
-            self, "%s/%s" % (path, old), "%s/%s" % (path, new))
-
-class Publication(object):
-
-    def __init__(self, root):
-        self.root = root
-
-    def beforeTraversal(self, request):
-        pass
-
-    def getApplication(self, request):
-        return self.root
-
-    def afterTraversal(self, request, ob):
-        pass
-
-    def callObject(self, request, ob):
-        command = getattr(ob, request.env['command'])
-        if 'name' in request.env:
-            request.env['path'] += "/" + request.env['name']
-        return mapply(command, request = request.env)
-
-    def afterCall(self, request, ob):
-        pass
-
-    def endRequest(self, request, ob):
-        pass
-
-    def handleException(self, object, request, info, retry_allowed=True):
-        request.response._exc = info[:2]
-
-
-class Request(object):
-
-    def __init__(self, input, env):
-        self.env = env
-        self.response = Response()
-        self.user = env['credentials']
-        del env['credentials']
-
-    def processInputs(self):
-        pass
-
-    def traverse(self, root):
-        root.user = self.user
-        return root
-
-    def close(self):
-        pass
-
-class Response(object):
-
-    _exc = _body = None
-
-    def setResult(self, result):
-        self._body = result
-
-    def outputBody(self):
-        pass
-
-    def getResult(self):
-        if self._exc:
-            raise self._exc[0], self._exc[1]
-        return self._body
-
-class RequestFactory(object):
-
-    def __init__(self, root):
-        self.pub = Publication(root)
-
-    def __call__(self, input, env):
-        r = Request(input, env)
-        r.publication = self.pub
-        return r
-
-class TestPublisherFileSystem(FileSystemTests, TestCase):
-
-    def setUp(self):
-        root = demofs.Directory()
-        root.grant('bob', demofs.write)
-        fs = DemoFileSystem(root, 'bob')
-        fs.mkdir(self.dir_name)
-        fs.writefile(self.file_name, StringIO(self.file_contents))
-        fs.writefile(self.unwritable_filename, StringIO("save this"))
-        fs.get(self.unwritable_filename).revoke('bob', demofs.write)
-
-        self.filesystem = PublisherFileSystem('bob', RequestFactory(fs))
-
-def test_suite():
-    return TestSuite((
-        makeSuite(TestPublisherFileSystem),
-        ))
-
-if __name__=='__main__':
-    main(defaultTest='test_suite')



More information about the Zope3-Checkins mailing list