[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/VFS/tests - PosixFilesystemTests.py:1.2 ReadFilesystemTests.py:1.2 WriteFilesystemTests.py:1.2 __init__.py:1.2 testOSFileSystem.py:1.2 testPublisherFilesystem.py:1.2
Jim Fulton
jim@zope.com
Mon, 10 Jun 2002 19:30:08 -0400
Update of /cvs-repository/Zope3/lib/python/Zope/Server/VFS/tests
In directory cvs.zope.org:/tmp/cvs-serv20468/lib/python/Zope/Server/VFS/tests
Added Files:
PosixFilesystemTests.py ReadFilesystemTests.py
WriteFilesystemTests.py __init__.py testOSFileSystem.py
testPublisherFilesystem.py
Log Message:
Merged Zope-3x-branch into newly forked Zope3 CVS Tree.
=== Zope3/lib/python/Zope/Server/VFS/tests/PosixFilesystemTests.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+
+import stat
+
+from Interface.Verify import verifyClass
+from Zope.Server.VFS.IPosixFileSystem import IPosixFileSystem
+
+from WriteFilesystemTests import WriteFilesystemTests
+
+
+class PosixFilesystemTests (WriteFilesystemTests):
+ """Tests of a writable and readable POSIX-compliant filesystem
+ """
+
+ def testChmod(self):
+ old_mode = self.filesystem.stat(self.file_name)[stat.ST_MODE]
+ new_mode = old_mode ^ 0444
+ self.filesystem.chmod(self.file_name, new_mode)
+ check_mode = self.filesystem.stat(self.file_name)[stat.ST_MODE]
+ self.assertEqual(check_mode, new_mode)
+
+
+ def testChown(self):
+ self.filesystem.chown(self.file_name, 500, 500)
+ s = self.filesystem.stat(self.file_name)
+ self.assertEqual(s[stat.ST_UID], 500)
+ self.assertEqual(s[stat.ST_GID], 500)
+
+
+ def testMakeLink(self):
+ self.filesystem.link(self.file_name, self.file_name + '.linked')
+ self.failUnless(self.filesystem.exists(self.file_name + '.linked'))
+ # Another test should test whether writing to one file
+ # changes the other.
+
+
+ def testMakeFifo(self):
+ path = self.dir_name + '/fifo'
+ self.filesystem.mkfifo(path)
+ self.failUnless(self.filesystem.exists(path))
+ # Another test should test the behavior of the fifo.
+
+
+ def testMakeSymlink(self):
+ self.filesystem.symlink(self.file_name, self.file_name + '.symlink')
+ self.failUnless(self.filesystem.exists(self.file_name + '.symlink'))
+ # Another test should test whether writing to one file
+ # changes the other.
+
+
+ def testPosixInterface(self):
+ class_ = self.filesystem.__class__
+ self.failUnless(
+ IPosixFileSystem.isImplementedByInstancesOf(class_))
+ self.failUnless(verifyClass(IPosixFileSystem, class_))
+
+
+def test_suite():
+ loader = unittest.TestLoader()
+ return loader.loadTestsFromTestCase(OSFileSystemTest)
+
+if __name__=='__main__':
+ unittest.TextTestRunner().run( test_suite() )
=== Zope3/lib/python/Zope/Server/VFS/tests/ReadFilesystemTests.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+
+import stat
+from StringIO import StringIO
+
+from Interface.Verify import verifyClass
+from Zope.Server.VFS.IReadFileSystem import IReadFileSystem
+
+
+class ReadFilesystemTests:
+ """Tests of a readable filesystem
+ """
+
+ filesystem = None
+ dir_name = '/dir'
+ file_name = '/dir/file.txt'
+ dir_contents = ['file.txt']
+ file_contents = 'Lengthen your stride'
+
+ check_exceptions = 1
+
+
+ def testExists(self):
+ self.failUnless(self.filesystem.exists(self.dir_name))
+ self.failUnless(self.filesystem.exists(self.file_name))
+
+
+ def testIsDir(self):
+ self.failUnless(self.filesystem.isdir(self.dir_name))
+ self.failUnless(not self.filesystem.isdir(self.file_name))
+
+
+ def testIsFile(self):
+ self.failUnless(self.filesystem.isfile(self.file_name))
+ self.failUnless(not self.filesystem.isfile(self.dir_name))
+
+
+ def testListDir(self):
+ lst = self.filesystem.listdir(self.dir_name, 0)
+ lst.sort()
+ self.assertEqual(lst, self.dir_contents)
+
+
+ def testReadFile(self):
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, 'rb', s)
+ self.assertEqual(s.getvalue(), self.file_contents)
+
+
+ def testReadPartOfFile(self):
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, 'rb', s, 2)
+ self.assertEqual(s.getvalue(), self.file_contents[2:])
+
+
+ def testReadPartOfFile2(self):
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, 'rb', s, 1, 5)
+ self.assertEqual(s.getvalue(), self.file_contents[1:5])
+
+
+ def testReadInterface(self):
+ class_ = self.filesystem.__class__
+ self.failUnless(
+ IReadFileSystem.isImplementedByInstancesOf(class_))
+ self.failUnless(verifyClass(IReadFileSystem, class_))
+
=== Zope3/lib/python/Zope/Server/VFS/tests/WriteFilesystemTests.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+
+from StringIO import StringIO
+
+from Interface.Verify import verifyClass
+from Zope.Server.VFS.IWriteFileSystem import IWriteFileSystem
+
+from ReadFilesystemTests import ReadFilesystemTests
+
+
+class WriteFilesystemTests (ReadFilesystemTests):
+ """Tests of a writable and readable filesystem
+ """
+
+ def testRemove(self):
+ self.failIf(not self.filesystem.exists(self.file_name))
+ self.filesystem.remove(self.file_name)
+ self.failIf(self.filesystem.exists(self.file_name))
+
+
+ def testMkdir(self):
+ path = self.dir_name + '/x'
+ self.filesystem.mkdir(path)
+ self.failUnless(self.filesystem.exists(path))
+ self.failUnless(self.filesystem.isdir(path))
+
+
+ def testRmdir(self):
+ self.failIf(not self.filesystem.exists(self.dir_name))
+ self.filesystem.remove(self.file_name)
+ self.filesystem.rmdir(self.dir_name)
+ self.failIf(self.filesystem.exists(self.dir_name))
+
+
+ def testRename(self):
+ self.filesystem.rename(self.file_name, self.file_name + '.bak')
+ self.failIf(self.filesystem.exists(self.file_name))
+ self.failIf(not self.filesystem.exists(self.file_name + '.bak'))
+
+
+ def testWriteFile(self):
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, 'rb', s)
+ self.assertEqual(s.getvalue(), self.file_contents)
+
+ data = 'Always ' + self.file_contents
+ s = StringIO(data)
+ self.filesystem.writefile(self.file_name, 'wb', s)
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, 'rb', s)
+ self.assertEqual(s.getvalue(), data)
+
+
+ def testAppendToFile(self):
+ data = ' again'
+ s = StringIO(data)
+ self.filesystem.writefile(self.file_name, 'ab', s)
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, 'rb', s)
+ self.assertEqual(s.getvalue(), self.file_contents + data)
+
+
+ def testWritePartOfFile(self):
+ data = '123'
+ s = StringIO(data)
+ self.filesystem.writefile(self.file_name, 'r+b', s, 3)
+
+ expect = self.file_contents[:3] + data + self.file_contents[6:]
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, 'rb', s)
+ self.assertEqual(s.getvalue(), expect)
+
+
+ def testWriteBeyondEndOfFile(self):
+ partlen = len(self.file_contents) - 6
+ data = 'daylight savings'
+ s = StringIO(data)
+ self.filesystem.writefile(self.file_name, 'r+b', s, partlen)
+
+ expect = self.file_contents[:partlen] + data
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, 'rb', s)
+ self.assertEqual(s.getvalue(), expect)
+
+
+ def testWriteNewFile(self):
+ s = StringIO(self.file_contents)
+ self.filesystem.writefile(self.file_name + '.new', 'wb', s)
+
+ s = StringIO()
+ self.filesystem.readfile(self.file_name, 'rb', s)
+ self.assertEqual(s.getvalue(), self.file_contents)
+
+
+ def testCheckWritable(self):
+ if self.check_exceptions:
+ # Can't overwrite a directory.
+ self.assertRaises(
+ IOError, self.filesystem.check_writable, self.dir_name)
+ # Can overwrite a file.
+ try:
+ self.filesystem.check_writable(self.file_name)
+ except IOError, v:
+ self.fail('%s should be writable. (%s)' % (self.file_name, v))
+
+
+ def testWriteInterface(self):
+ class_ = self.filesystem.__class__
+ self.failUnless(
+ IWriteFileSystem.isImplementedByInstancesOf(class_))
+ self.failUnless(verifyClass(IWriteFileSystem, class_))
+
+
+def test_suite():
+ loader = unittest.TestLoader()
+ return loader.loadTestsFromTestCase(OSFileSystemTest)
+
+if __name__=='__main__':
+ unittest.TextTestRunner().run( test_suite() )
=== Zope3/lib/python/Zope/Server/VFS/tests/__init__.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
=== Zope3/lib/python/Zope/Server/VFS/tests/testOSFileSystem.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+
+import unittest
+import os
+import shutil
+import tempfile
+from StringIO import StringIO
+
+from Zope.Server.VFS.OSFileSystem import OSFileSystem
+
+from WriteFilesystemTests import WriteFilesystemTests
+
+
+def joinToRoot(root, name):
+ if name.startswith('/'):
+ name = name[1:]
+ return os.path.join(root, os.path.normpath(name))
+
+
+class OSFileSystemTests(unittest.TestCase, WriteFilesystemTests):
+ """This test is constructed in a way that it builds up a directory
+ structure, whcih is removed at the end.
+ """
+
+ filesystem_class = OSFileSystem
+ root = None
+
+ def setUp(self):
+ if self.root is None:
+ self.root = tempfile.mktemp()
+ self.filesystem = self.filesystem_class(self.root)
+
+ os.mkdir(self.root)
+ os.mkdir(joinToRoot(self.root, self.dir_name))
+ f = open(joinToRoot(self.root, self.file_name), 'w')
+ f.write(self.file_contents)
+ f.close()
+
+
+ def tearDown(self):
+
+ shutil.rmtree(self.root)
+
+
+ def testNormalize(self):
+
+ def norm(p):
+ return self.filesystem.normalize(p).replace(os.sep, '/')
+
+ self.assertEqual(norm('/foo/bar//'), '/foo/bar')
+ self.assertEqual(norm('/foo//bar'), '/foo/bar')
+ self.assertEqual(norm('///foo/bar'), '/foo/bar')
+ self.assertEqual(norm('///foo//bar////'), '/foo/bar')
+
+ self.assertEqual(norm('../foo/bar'), '/')
+ self.assertEqual(norm('..'), '/')
+ self.assertEqual(norm('/..'), '/')
+ self.assertEqual(norm('/foo/..'), '/')
+ self.assertEqual(norm('/foo/../bar'), '/bar')
+ self.assertEqual(norm('../../'), '/')
+
+ self.assertEqual(norm('///../foo/bar'), '/foo/bar')
+ self.assertEqual(norm('/foo/..///'), '/')
+ self.assertEqual(norm('///foo/..//bar'), '/bar')
+ self.assertEqual(norm('..///../'), '/')
+
+
+ def testTranslate(self):
+
+ self.assertEqual(self.filesystem.root, self.root)
+
+ self.assertEqual(self.filesystem.translate('/foo/'),
+ os.path.join(self.root, 'foo'))
+ self.assertEqual(self.filesystem.translate('/foo/bar'),
+ os.path.join(self.root, 'foo', 'bar'))
+ self.assertEqual(self.filesystem.translate('foo/bar'),
+ os.path.join(self.root, 'foo', 'bar'))
+
+ def testStat(self):
+ stat = os.stat(joinToRoot(self.root, self.file_name))
+ self.assertEqual(self.filesystem.stat(self.file_name), stat)
+
+
+
+
+# NOTE: the leading ``0 and`` makes this condition always false, so the
+# POSIX variant below is currently never defined.
+if 0 and os.name == 'posix':
+
+ from PosixFilesystemTests import PosixFilesystemTests
+
+ # Adds the POSIX-specific tests on top of the OSFileSystem tests.
+ class OSPosixFilesystemTests(OSFileSystemTests, PosixFilesystemTests):
+
+ def testChown(self):
+ # Disable this test, since it won't work unless you're root.
+ return
+
+ # Rebind the module-level name that test_suite() loads, so the POSIX
+ # variant would replace the plain one.
+ OSFileSystemTests = OSPosixFilesystemTests
+
+
+
+def test_suite():
+ loader = unittest.TestLoader()
+ return loader.loadTestsFromTestCase(OSFileSystemTests)
+
+if __name__=='__main__':
+ unittest.TextTestRunner().run( test_suite() )
=== Zope3/lib/python/Zope/Server/VFS/tests/testPublisherFilesystem.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+
+import unittest
+from StringIO import StringIO
+
+from Zope.Server.VFS.PublisherFileSystem import PublisherFileSystem
+from Zope.Publisher.VFS.VFSRequest import VFSRequest
+from Zope.Publisher.DefaultPublication import DefaultPublication
+from Zope.Publisher.VFS.IVFSFilePublisher import IVFSFilePublisher
+from Zope.Publisher.VFS.IVFSDirectoryPublisher import IVFSDirectoryPublisher
+from Zope.Publisher.mapply import mapply
+
+from WriteFilesystemTests import WriteFilesystemTests
+
+
+class VFSPublication (DefaultPublication):
+ # This class will not be needed if we move callObject().
+
+ def callObject(self, request, ob):
+ method = getattr(ob, request.method)
+ return mapply(method, request.getPositionalArguments(), request)
+
+ def traverseName(self, request, ob, name):
+ return ob.publishTraverse(request, name)
+
+
+class TestFile:
+
+ __implements__ = IVFSFilePublisher
+
+ def __init__(self, data=''):
+ self.data = data
+
+ def publishTraverse(self, request, name):
+ """See IVFSPublisher."""
+ raise OSError, 'Cannot traverse TestFiles'
+
+ def isdir(self):
+ """See IVFSObjectPublisher."""
+ return 0
+
+ def isfile(self):
+ """See IVFSObjectPublisher."""
+ return 1
+
+ def stat(self):
+ """See IVFSObjectPublisher."""
+ raise NotImplementedError
+
+ def read(self, mode, outstream, start=0, end=-1):
+ """See IVFSFilePublisher."""
+ if end >= 0:
+ s = self.data[start:end]
+ else:
+ s = self.data[start:]
+ outstream.write(s)
+
+ def write(self, mode, instream, start=0):
+ """See IVFSFilePublisher."""
+ s = instream.read()
+ if 'a' in mode:
+ self.data = self.data + s
+ else:
+ self.data = self.data[:start] + s + self.data[start + len(s):]
+
+
+
+
+class TestDirectory:
+
+ __implements__ = IVFSDirectoryPublisher
+
+ def __init__(self, items={}):
+ self.items = items.copy()
+
+ def publishTraverse(self, request, name):
+ """See IVFSPublisher."""
+ return self.items[name]
+
+ def isdir(self):
+ """See IVFSObjectPublisher."""
+ return 1
+
+ def isfile(self):
+ """See IVFSObjectPublisher."""
+ return 0
+
+ def stat(self):
+ """See IVFSObjectPublisher."""
+ raise NotImplementedError
+
+ def exists(self, name):
+ """See IVFSDirectoryPublisher."""
+ return name in self.items
+
+ def listdir(self, with_stats=0, pattern='*'):
+ """See IVFSDirectoryPublisher."""
+ if with_stats or pattern != '*':
+ raise NotImplementedError
+ return self.items.keys()
+
+ def mkdir(self, name, mode=0777):
+ """See IVFSDirectoryPublisher."""
+ self.items[name] = TestDirectory()
+
+ def remove(self, name):
+ """See IVFSDirectoryPublisher."""
+ del self.items[name]
+
+ def rmdir(self, name):
+ """See IVFSDirectoryPublisher."""
+ del self.items[name]
+
+ def rename(self, old, new):
+ """See IVFSDirectoryPublisher."""
+ if new in self.items:
+ raise OSError, 'Name conflict'
+ self.items[new] = self.items[old]
+ del self.items[old]
+
+ def writefile(self, name, mode, instream, start=0):
+ """See IVFSDirectoryPublisher."""
+ if not (name in self.items):
+ self.items[name] = TestFile()
+ self.items[name].write(mode, instream, start)
+
+ def check_writable(self, name):
+ """See IVFSDirectoryPublisher."""
+ if name in self.items:
+ if not self.items[name].isfile():
+ raise IOError, 'Is not a file'
+
+
+class PublisherFileSystemTests(unittest.TestCase, WriteFilesystemTests):
+ """This test is constructed in a way that it builds up a directory
+ structure, whcih is removed at the end.
+ """
+
+ filesystem_class = PublisherFileSystem
+
+ check_exceptions = 1
+
+ def setUp(self):
+
+ app = TestDirectory()
+
+ pub = VFSPublication(app)
+
+ def request_factory(input_stream, output_steam, env):
+ request = VFSRequest(input_stream, output_steam, env)
+ request.setPublication(pub)
+ return request
+
+ self.filesystem = PublisherFileSystem(None, request_factory)
+ self.filesystem.mkdir(self.dir_name)
+ s = StringIO(self.file_contents)
+ self.filesystem.writefile(self.file_name, 'w', s)
+
+ def tearDown(self):
+ pass
+
+
+
+def test_suite():
+ loader = unittest.TestLoader()
+ return loader.loadTestsFromTestCase(PublisherFileSystemTests)
+
+if __name__=='__main__':
+ unittest.TextTestRunner().run( test_suite() )