[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/VFS/tests - PosixFilesystemTests.py:1.2 ReadFilesystemTests.py:1.2 WriteFilesystemTests.py:1.2 __init__.py:1.2 testOSFileSystem.py:1.2 testPublisherFilesystem.py:1.2

Jim Fulton jim@zope.com
Mon, 10 Jun 2002 19:30:08 -0400


Update of /cvs-repository/Zope3/lib/python/Zope/Server/VFS/tests
In directory cvs.zope.org:/tmp/cvs-serv20468/lib/python/Zope/Server/VFS/tests

Added Files:
	PosixFilesystemTests.py ReadFilesystemTests.py 
	WriteFilesystemTests.py __init__.py testOSFileSystem.py 
	testPublisherFilesystem.py 
Log Message:
Merged Zope-3x-branch into newly forked Zope3 CVS Tree.

=== Zope3/lib/python/Zope/Server/VFS/tests/PosixFilesystemTests.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+
+import stat
+
+from Interface.Verify import verifyClass
+from Zope.Server.VFS.IPosixFileSystem import IPosixFileSystem
+
+from WriteFilesystemTests import WriteFilesystemTests
+
+
+class PosixFilesystemTests (WriteFilesystemTests):
+    """Tests of a writable and readable POSIX-compliant filesystem
+    """
+
+    def testChmod(self):
+        old_mode = self.filesystem.stat(self.file_name)[stat.ST_MODE]
+        new_mode = old_mode ^ 0444
+        self.filesystem.chmod(self.file_name, new_mode)
+        check_mode = self.filesystem.stat(self.file_name)[stat.ST_MODE]
+        self.assertEqual(check_mode, new_mode)
+
+
+    def testChown(self):
+        self.filesystem.chown(self.file_name, 500, 500)
+        s = self.filesystem.stat(self.file_name)
+        self.assertEqual(s[stat.ST_UID], 500)
+        self.assertEqual(s[stat.ST_GID], 500)
+
+
+    def testMakeLink(self):
+        self.filesystem.link(self.file_name, self.file_name + '.linked')
+        self.failUnless(self.filesystem.exists(self.file_name + '.linked'))
+        # Another test should test whether writing to one file
+        # changes the other.
+
+
+    def testMakeFifo(self):
+        path = self.dir_name + '/fifo'
+        self.filesystem.mkfifo(path)
+        self.failUnless(self.filesystem.exists(path))
+        # Another test should test the behavior of the fifo.
+
+
+    def testMakeSymlink(self):
+        self.filesystem.symlink(self.file_name, self.file_name + '.symlink')
+        self.failUnless(self.filesystem.exists(self.file_name + '.symlink'))
+        # Another test should test whether writing to one file
+        # changes the other.
+
+
+    def testPosixInterface(self):
+        class_ = self.filesystem.__class__
+        self.failUnless(
+            IPosixFileSystem.isImplementedByInstancesOf(class_))
+        self.failUnless(verifyClass(IPosixFileSystem, class_))
+
+
+def test_suite():
+    loader = unittest.TestLoader()
+    return loader.loadTestsFromTestCase(OSFileSystemTest)
+
+if __name__=='__main__':
+    unittest.TextTestRunner().run( test_suite() )


=== Zope3/lib/python/Zope/Server/VFS/tests/ReadFilesystemTests.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+
+import stat
+from StringIO import StringIO
+
+from Interface.Verify import verifyClass
+from Zope.Server.VFS.IReadFileSystem import IReadFileSystem
+
+
+class ReadFilesystemTests:
+    """Tests of a readable filesystem
+    """
+
+    filesystem = None
+    dir_name  = '/dir'
+    file_name = '/dir/file.txt'
+    dir_contents = ['file.txt']
+    file_contents = 'Lengthen your stride'
+
+    check_exceptions = 1
+
+
+    def testExists(self):
+        self.failUnless(self.filesystem.exists(self.dir_name))
+        self.failUnless(self.filesystem.exists(self.file_name))
+
+
+    def testIsDir(self):
+        self.failUnless(self.filesystem.isdir(self.dir_name))
+        self.failUnless(not self.filesystem.isdir(self.file_name))
+
+
+    def testIsFile(self):
+        self.failUnless(self.filesystem.isfile(self.file_name))
+        self.failUnless(not self.filesystem.isfile(self.dir_name))
+
+
+    def testListDir(self):
+        lst = self.filesystem.listdir(self.dir_name, 0)
+        lst.sort()
+        self.assertEqual(lst, self.dir_contents)
+
+
+    def testReadFile(self):
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, 'rb', s)
+        self.assertEqual(s.getvalue(), self.file_contents)
+
+
+    def testReadPartOfFile(self):
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, 'rb', s, 2)
+        self.assertEqual(s.getvalue(), self.file_contents[2:])
+
+
+    def testReadPartOfFile2(self):
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, 'rb', s, 1, 5)
+        self.assertEqual(s.getvalue(), self.file_contents[1:5])
+
+
+    def testReadInterface(self):
+        class_ = self.filesystem.__class__
+        self.failUnless(
+            IReadFileSystem.isImplementedByInstancesOf(class_))
+        self.failUnless(verifyClass(IReadFileSystem, class_))
+


=== Zope3/lib/python/Zope/Server/VFS/tests/WriteFilesystemTests.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+
+from StringIO import StringIO
+
+from Interface.Verify import verifyClass
+from Zope.Server.VFS.IWriteFileSystem import IWriteFileSystem
+
+from ReadFilesystemTests import ReadFilesystemTests
+
+
+class WriteFilesystemTests (ReadFilesystemTests):
+    """Tests of a writable and readable filesystem
+    """
+
+    def testRemove(self):
+        self.failIf(not self.filesystem.exists(self.file_name))
+        self.filesystem.remove(self.file_name)
+        self.failIf(self.filesystem.exists(self.file_name))
+
+
+    def testMkdir(self):
+        path = self.dir_name + '/x'
+        self.filesystem.mkdir(path)
+        self.failUnless(self.filesystem.exists(path))
+        self.failUnless(self.filesystem.isdir(path))
+
+
+    def testRmdir(self):
+        self.failIf(not self.filesystem.exists(self.dir_name))
+        self.filesystem.remove(self.file_name)
+        self.filesystem.rmdir(self.dir_name)
+        self.failIf(self.filesystem.exists(self.dir_name))
+
+
+    def testRename(self):
+        self.filesystem.rename(self.file_name, self.file_name + '.bak')
+        self.failIf(self.filesystem.exists(self.file_name))
+        self.failIf(not self.filesystem.exists(self.file_name + '.bak'))
+
+
+    def testWriteFile(self):
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, 'rb', s)
+        self.assertEqual(s.getvalue(), self.file_contents)
+
+        data = 'Always ' + self.file_contents
+        s = StringIO(data)
+        self.filesystem.writefile(self.file_name, 'wb', s)
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, 'rb', s)
+        self.assertEqual(s.getvalue(), data)
+
+
+    def testAppendToFile(self):
+        data = ' again'
+        s = StringIO(data)
+        self.filesystem.writefile(self.file_name, 'ab', s)
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, 'rb', s)
+        self.assertEqual(s.getvalue(), self.file_contents + data)
+        
+
+    def testWritePartOfFile(self):
+        data = '123'
+        s = StringIO(data)
+        self.filesystem.writefile(self.file_name, 'r+b', s, 3)
+
+        expect = self.file_contents[:3] + data + self.file_contents[6:]
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, 'rb', s)
+        self.assertEqual(s.getvalue(), expect)
+
+
+    def testWriteBeyondEndOfFile(self):
+        partlen = len(self.file_contents) - 6
+        data = 'daylight savings'
+        s = StringIO(data)
+        self.filesystem.writefile(self.file_name, 'r+b', s, partlen)
+
+        expect = self.file_contents[:partlen] + data
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, 'rb', s)
+        self.assertEqual(s.getvalue(), expect)
+
+
+    def testWriteNewFile(self):
+        s = StringIO(self.file_contents)
+        self.filesystem.writefile(self.file_name + '.new', 'wb', s)
+
+        s = StringIO()
+        self.filesystem.readfile(self.file_name, 'rb', s)
+        self.assertEqual(s.getvalue(), self.file_contents)
+
+
+    def testCheckWritable(self):
+        if self.check_exceptions:
+            # Can't overwrite a directory.
+            self.assertRaises(
+                IOError, self.filesystem.check_writable, self.dir_name)
+        # Can overwrite a file.
+        try:
+            self.filesystem.check_writable(self.file_name)
+        except IOError, v:
+            self.fail('%s should be writable. (%s)' % (self.file_name, v))
+
+
+    def testWriteInterface(self):
+        class_ = self.filesystem.__class__
+        self.failUnless(
+            IWriteFileSystem.isImplementedByInstancesOf(class_))
+        self.failUnless(verifyClass(IWriteFileSystem, class_))
+
+
+def test_suite():
+    loader = unittest.TestLoader()
+    return loader.loadTestsFromTestCase(OSFileSystemTest)
+
+if __name__=='__main__':
+    unittest.TextTestRunner().run( test_suite() )


=== Zope3/lib/python/Zope/Server/VFS/tests/__init__.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+


=== Zope3/lib/python/Zope/Server/VFS/tests/testOSFileSystem.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+
+import unittest
+import os
+import shutil
+import tempfile
+from StringIO import StringIO
+
+from Zope.Server.VFS.OSFileSystem import OSFileSystem
+
+from WriteFilesystemTests import WriteFilesystemTests
+
+
+def joinToRoot(root, name):
+    if name.startswith('/'):
+        name = name[1:]
+    return os.path.join(root, os.path.normpath(name))
+
+
+class OSFileSystemTests(unittest.TestCase, WriteFilesystemTests):
+    """This test is constructed in a way that it builds up a directory
+       structure, whcih is removed at the end.
+    """
+
+    filesystem_class = OSFileSystem
+    root = None
+
+    def setUp(self):
+        if self.root is None:
+            self.root = tempfile.mktemp()
+            self.filesystem = self.filesystem_class(self.root)
+
+        os.mkdir(self.root)
+        os.mkdir(joinToRoot(self.root, self.dir_name))
+        f = open(joinToRoot(self.root, self.file_name), 'w')
+        f.write(self.file_contents)
+        f.close()
+
+
+    def tearDown(self):
+
+        shutil.rmtree(self.root)
+
+
+    def testNormalize(self):
+
+        def norm(p):
+            return self.filesystem.normalize(p).replace(os.sep, '/')
+
+        self.assertEqual(norm('/foo/bar//'), '/foo/bar')
+        self.assertEqual(norm('/foo//bar'), '/foo/bar')
+        self.assertEqual(norm('///foo/bar'), '/foo/bar')
+        self.assertEqual(norm('///foo//bar////'), '/foo/bar')
+
+        self.assertEqual(norm('../foo/bar'), '/')
+        self.assertEqual(norm('..'), '/')
+        self.assertEqual(norm('/..'), '/')
+        self.assertEqual(norm('/foo/..'), '/')
+        self.assertEqual(norm('/foo/../bar'), '/bar')
+        self.assertEqual(norm('../../'), '/')
+
+        self.assertEqual(norm('///../foo/bar'), '/foo/bar')
+        self.assertEqual(norm('/foo/..///'), '/')
+        self.assertEqual(norm('///foo/..//bar'), '/bar')
+        self.assertEqual(norm('..///../'), '/')
+
+
+    def testTranslate(self):
+
+        self.assertEqual(self.filesystem.root, self.root)
+
+        self.assertEqual(self.filesystem.translate('/foo/'),
+                         os.path.join(self.root, 'foo'))
+        self.assertEqual(self.filesystem.translate('/foo/bar'),
+                         os.path.join(self.root, 'foo', 'bar'))
+        self.assertEqual(self.filesystem.translate('foo/bar'),
+                         os.path.join(self.root, 'foo', 'bar'))
+
+    def testStat(self):
+        stat = os.stat(joinToRoot(self.root, self.file_name))
+        self.assertEqual(self.filesystem.stat(self.file_name), stat)
+
+
+
+
+# NOTE(review): the leading '0 and' makes this condition always false, so
+# the POSIX tests below are currently never defined or run.  The reason
+# they were switched off is not recorded here.
+if 0 and os.name == 'posix':
+
+    from PosixFilesystemTests import PosixFilesystemTests
+
+    class OSPosixFilesystemTests(OSFileSystemTests, PosixFilesystemTests):
+        # Combines the on-disk fixture of OSFileSystemTests with the
+        # POSIX-only tests of PosixFilesystemTests.
+
+        def testChown(self):
+            # Disable this test, since it won't work unless you're root.
+            return
+
+    # Rebind the module-level name so that test_suite() loads the
+    # extended class instead of the plain one.
+    OSFileSystemTests = OSPosixFilesystemTests
+
+
+
+def test_suite():
+    loader = unittest.TestLoader()
+    return loader.loadTestsFromTestCase(OSFileSystemTests)
+
+if __name__=='__main__':
+    unittest.TextTestRunner().run( test_suite() )


=== Zope3/lib/python/Zope/Server/VFS/tests/testPublisherFilesystem.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+
+import unittest
+from StringIO import StringIO
+
+from Zope.Server.VFS.PublisherFileSystem import PublisherFileSystem
+from Zope.Publisher.VFS.VFSRequest import VFSRequest
+from Zope.Publisher.DefaultPublication import DefaultPublication
+from Zope.Publisher.VFS.IVFSFilePublisher import IVFSFilePublisher
+from Zope.Publisher.VFS.IVFSDirectoryPublisher import IVFSDirectoryPublisher
+from Zope.Publisher.mapply import mapply
+
+from WriteFilesystemTests import WriteFilesystemTests
+
+
+class VFSPublication (DefaultPublication):
+    # This class will not be needed if we move callObject().
+
+    def callObject(self, request, ob):
+        method = getattr(ob, request.method)
+        return mapply(method, request.getPositionalArguments(), request)
+    
+    def traverseName(self, request, ob, name):
+        return ob.publishTraverse(request, name)
+
+
+class TestFile:
+
+    __implements__ = IVFSFilePublisher
+
+    def __init__(self, data=''):
+        self.data = data
+
+    def publishTraverse(self, request, name):
+        """See IVFSPublisher."""
+        raise OSError, 'Cannot traverse TestFiles'
+
+    def isdir(self):
+        """See IVFSObjectPublisher."""
+        return 0
+
+    def isfile(self):
+        """See IVFSObjectPublisher."""
+        return 1
+
+    def stat(self):
+        """See IVFSObjectPublisher."""
+        raise NotImplementedError
+
+    def read(self, mode, outstream, start=0, end=-1):
+        """See IVFSFilePublisher."""
+        if end >= 0:
+            s = self.data[start:end]
+        else:
+            s = self.data[start:]
+        outstream.write(s)
+        
+    def write(self, mode, instream, start=0):
+        """See IVFSFilePublisher."""
+        s = instream.read()
+        if 'a' in mode:
+            self.data = self.data + s
+        else:
+            self.data = self.data[:start] + s + self.data[start + len(s):]
+
+
+
+
+class TestDirectory:
+
+    __implements__ = IVFSDirectoryPublisher
+
+    def __init__(self, items={}):
+        self.items = items.copy()
+
+    def publishTraverse(self, request, name):
+        """See IVFSPublisher."""
+        return self.items[name]
+
+    def isdir(self):
+        """See IVFSObjectPublisher."""
+        return 1
+
+    def isfile(self):
+        """See IVFSObjectPublisher."""
+        return 0
+
+    def stat(self):
+        """See IVFSObjectPublisher."""
+        raise NotImplementedError
+
+    def exists(self, name):
+        """See IVFSDirectoryPublisher."""
+        return name in self.items
+
+    def listdir(self, with_stats=0, pattern='*'):
+        """See IVFSDirectoryPublisher."""
+        if with_stats or pattern != '*':
+            raise NotImplementedError
+        return self.items.keys()
+
+    def mkdir(self, name, mode=0777):
+        """See IVFSDirectoryPublisher."""
+        self.items[name] = TestDirectory()
+
+    def remove(self, name):
+        """See IVFSDirectoryPublisher."""
+        del self.items[name]
+
+    def rmdir(self, name):
+        """See IVFSDirectoryPublisher."""
+        del self.items[name]
+
+    def rename(self, old, new):
+        """See IVFSDirectoryPublisher."""
+        if new in self.items:
+            raise OSError, 'Name conflict'
+        self.items[new] = self.items[old]
+        del self.items[old]
+
+    def writefile(self, name, mode, instream, start=0):
+        """See IVFSDirectoryPublisher."""
+        if not (name in self.items):
+            self.items[name] = TestFile()
+        self.items[name].write(mode, instream, start)
+
+    def check_writable(self, name):
+        """See IVFSDirectoryPublisher."""
+        if name in self.items:
+            if not self.items[name].isfile():
+                raise IOError, 'Is not a file'
+
+
+class PublisherFileSystemTests(unittest.TestCase, WriteFilesystemTests):
+    """This test is constructed in a way that it builds up a directory
+       structure, whcih is removed at the end.
+    """
+
+    filesystem_class = PublisherFileSystem
+
+    check_exceptions = 1
+
+    def setUp(self):
+
+        app = TestDirectory()
+        
+        pub = VFSPublication(app)
+
+        def request_factory(input_stream, output_steam, env):
+            request = VFSRequest(input_stream, output_steam, env)
+            request.setPublication(pub)
+            return request
+
+        self.filesystem = PublisherFileSystem(None, request_factory)
+        self.filesystem.mkdir(self.dir_name)
+        s = StringIO(self.file_contents)
+        self.filesystem.writefile(self.file_name, 'w', s)
+
+    def tearDown(self):
+        pass
+
+
+
+def test_suite():
+    loader = unittest.TestLoader()
+    return loader.loadTestsFromTestCase(PublisherFileSystemTests)
+
+if __name__=='__main__':
+    unittest.TextTestRunner().run( test_suite() )