[Zope3-checkins] CVS: Zope3/src/zope/server/vfs/tests - __init__.py:1.1.2.1 posixfilesystemtests.py:1.1.2.1 readfilesystemtests.py:1.1.2.1 test_osfilesystem.py:1.1.2.1 test_publisherfilesystem.py:1.1.2.1 writefilesystemtests.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:33:27 -0500


Update of /cvs-repository/Zope3/src/zope/server/vfs/tests
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/server/vfs/tests

Added Files:
      Tag: NameGeddon-branch
	__init__.py posixfilesystemtests.py readfilesystemtests.py 
	test_osfilesystem.py test_publisherfilesystem.py 
	writefilesystemtests.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zope/server/vfs/tests/__init__.py ===
#
# This file is necessary to make this directory a package.


=== Added File Zope3/src/zope/server/vfs/tests/posixfilesystemtests.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: posixfilesystemtests.py,v 1.1.2.1 2002/12/23 19:33:26 jim Exp $
"""


import stat

from zope.interface.verify import verifyClass
from zope.server.interfaces.vfs import IPosixFileSystem

from zope.server.vfs.tests.writefilesystemtests import WriteFilesystemTests


class PosixFilesystemTests(WriteFilesystemTests):
    """Mixin checks for a filesystem offering POSIX-style operations.

    Adds chmod, chown, link, mkfifo and symlink checks on top of the
    inherited read/write checks.  The concrete test case is expected to
    provide ``self.filesystem``.
    """

    def testChmod(self):
        # Flip the three read bits (octal 444) and expect stat() to report
        # exactly the mode that was requested.
        read_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        before = self.filesystem.stat(self.file_name)[stat.ST_MODE]
        wanted = before ^ read_bits
        self.filesystem.chmod(self.file_name, wanted)
        after = self.filesystem.stat(self.file_name)[stat.ST_MODE]
        self.assertEqual(after, wanted)

    def testChown(self):
        # The same id is used for both the owner and the group.
        owner = group = 500
        self.filesystem.chown(self.file_name, owner, group)
        info = self.filesystem.stat(self.file_name)
        self.assertEqual(info[stat.ST_UID], owner)
        self.assertEqual(info[stat.ST_GID], group)

    def testMakeLink(self):
        link_name = self.file_name + '.linked'
        self.filesystem.link(self.file_name, link_name)
        self.failUnless(self.filesystem.exists(link_name))
        # Another test should test whether writing to one file
        # changes the other.

    def testMakeFifo(self):
        fifo_name = self.dir_name + '/fifo'
        self.filesystem.mkfifo(fifo_name)
        self.failUnless(self.filesystem.exists(fifo_name))
        # Another test should test the behavior of the fifo.

    def testMakeSymlink(self):
        symlink_name = self.file_name + '.symlink'
        self.filesystem.symlink(self.file_name, symlink_name)
        self.failUnless(self.filesystem.exists(symlink_name))
        # Another test should test whether writing to one file
        # changes the other.

    def testPosixInterface(self):
        # The class must both declare IPosixFileSystem and pass
        # verifyClass() for it.
        fs_class = self.filesystem.__class__
        declared = IPosixFileSystem.isImplementedByInstancesOf(fs_class)
        self.failUnless(declared)
        self.failUnless(verifyClass(IPosixFileSystem, fs_class))



=== Added File Zope3/src/zope/server/vfs/tests/readfilesystemtests.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: readfilesystemtests.py,v 1.1.2.1 2002/12/23 19:33:26 jim Exp $
"""


import stat
from StringIO import StringIO

from zope.interface.verify import verifyClass
from zope.server.interfaces.vfs import IReadFileSystem


class ReadFilesystemTests:
    """Mixin checks for the read-only operations of a filesystem.

    A concrete test case sets ``filesystem`` to an object that already
    holds the fixture described by the class attributes below.
    """

    # Filesystem under test; None until a concrete test case provides one.
    filesystem = None
    # Fixture layout: one directory containing one file.
    dir_name = '/dir'
    file_name = '/dir/file.txt'
    dir_contents = ['file.txt']
    file_contents = 'Lengthen your stride'

    # When true, tests also assert that the expected exceptions are raised.
    check_exceptions = 1

    def testExists(self):
        for path in (self.dir_name, self.file_name):
            self.failUnless(self.filesystem.exists(path))

    def testIsDir(self):
        self.failUnless(self.filesystem.isdir(self.dir_name))
        self.failIf(self.filesystem.isdir(self.file_name))

    def testIsFile(self):
        self.failUnless(self.filesystem.isfile(self.file_name))
        self.failIf(self.filesystem.isfile(self.dir_name))

    def testListDir(self):
        # Sort first so the comparison does not depend on listing order.
        names = self.filesystem.listdir(self.dir_name, 0)
        names.sort()
        self.assertEqual(names, self.dir_contents)

    def testReadFile(self):
        out = StringIO()
        self.filesystem.readfile(self.file_name, 'rb', out)
        self.assertEqual(out.getvalue(), self.file_contents)

    def testReadPartOfFile(self):
        # Only a start offset: expect everything from offset 2 onwards.
        out = StringIO()
        self.filesystem.readfile(self.file_name, 'rb', out, 2)
        self.assertEqual(out.getvalue(), self.file_contents[2:])

    def testReadPartOfFile2(self):
        # Start and end offsets: expect the slice [1:5].
        out = StringIO()
        self.filesystem.readfile(self.file_name, 'rb', out, 1, 5)
        self.assertEqual(out.getvalue(), self.file_contents[1:5])

    def testReadInterface(self):
        # The class must both declare IReadFileSystem and pass
        # verifyClass() for it.
        fs_class = self.filesystem.__class__
        declared = IReadFileSystem.isImplementedByInstancesOf(fs_class)
        self.failUnless(declared)
        self.failUnless(verifyClass(IReadFileSystem, fs_class))



=== Added File Zope3/src/zope/server/vfs/tests/test_osfilesystem.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: test_osfilesystem.py,v 1.1.2.1 2002/12/23 19:33:26 jim Exp $
"""
import unittest
import os
import shutil
import tempfile
import datetime
# Short alias; testStat uses it to turn os.stat() timestamps into datetimes.
fromts = datetime.datetime.fromtimestamp

from StringIO import StringIO

from zope.server.vfs.osfilesystem import OSFileSystem

from zope.server.vfs.tests.writefilesystemtests import WriteFilesystemTests


def joinToRoot(root, name):
    """Return the on-disk path for the virtual path `name` under `root`.

    One leading '/' is dropped from `name` so that os.path.join() treats
    it as relative to `root`; the remainder is normalized before joining.
    """
    if name[:1] == '/':
        name = name[1:]
    relative = os.path.normpath(name)
    return os.path.join(root, relative)


class OSFileSystemTests(unittest.TestCase, WriteFilesystemTests):
    """Run the shared filesystem checks against a real temporary directory.

    setUp() creates the fixture directory and file on disk below a fresh
    root, and tearDown() removes that whole tree again.
    """

    filesystem_class = OSFileSystem
    root = None

    def setUp(self):
        # Pick the root path and wrap it in the filesystem under test;
        # the directory itself is only created further down.
        if self.root is None:
            self.root = tempfile.mktemp()
            self.filesystem = self.filesystem_class(self.root)

        fixture_dir = joinToRoot(self.root, self.dir_name)
        fixture_file = joinToRoot(self.root, self.file_name)

        os.mkdir(self.root)
        os.mkdir(fixture_dir)
        out = open(fixture_file, 'w')
        out.write(self.file_contents)
        out.close()

    def tearDown(self):
        shutil.rmtree(self.root)

    def testNormalize(self):

        def norm(path):
            # Compare with forward slashes regardless of the platform.
            normalized = self.filesystem.normalize(path)
            return normalized.replace(os.sep, '/')

        expectations = [
            # Repeated and trailing slashes.
            ('/foo/bar//', '/foo/bar'),
            ('/foo//bar', '/foo/bar'),
            ('///foo/bar', '/foo/bar'),
            ('///foo//bar////', '/foo/bar'),
            # Parent references.
            ('../foo/bar', '/'),
            ('..', '/'),
            ('/..', '/'),
            ('/foo/..', '/'),
            ('/foo/../bar', '/bar'),
            ('../../', '/'),
            # Both combined.
            ('///../foo/bar', '/foo/bar'),
            ('/foo/..///', '/'),
            ('///foo/..//bar', '/bar'),
            ('..///../', '/'),
        ]
        for raw, expected in expectations:
            self.assertEqual(norm(raw), expected)

    def testTranslate(self):
        self.assertEqual(self.filesystem.root, self.root)

        # Virtual path -> path components expected below the root.
        expectations = [
            ('/foo/', ('foo',)),
            ('/foo/bar', ('foo', 'bar')),
            ('foo/bar', ('foo', 'bar')),
        ]
        for virtual, parts in expectations:
            self.assertEqual(self.filesystem.translate(virtual),
                             os.path.join(self.root, *parts))

    def testStat(self):
        raw = os.stat(joinToRoot(self.root, self.file_name))
        # Expected shape: the first six raw stat fields as one tuple,
        # followed by fields 7, 8 and 9 converted to datetimes.
        expected = (raw[0:6], fromts(raw[7]), fromts(raw[8]), fromts(raw[9]))
        self.assertEqual(self.filesystem.stat(self.file_name), expected)




# The leading 0 short-circuits the condition, so this POSIX variant is
# currently switched off on every platform.
if 0 and os.name == 'posix':

    from zope.server.vfs.tests.posixfilesystemtests import PosixFilesystemTests

    class OSPosixFilesystemTests(OSFileSystemTests, PosixFilesystemTests):
        # Adds the POSIX-only checks to the OS filesystem test case.

        def testChown(self):
            # Disable this test, since it won't work unless you're root.
            pass

    # Rebind the name so test_suite() would load the extended case.
    OSFileSystemTests = OSPosixFilesystemTests



def test_suite():
    """Return a suite holding the tests of OSFileSystemTests."""
    return unittest.TestLoader().loadTestsFromTestCase(OSFileSystemTests)

# Allow running this test module directly as a script.
if __name__ == '__main__':
    runner = unittest.TextTestRunner()
    runner.run(test_suite())


=== Added File Zope3/src/zope/server/vfs/tests/test_publisherfilesystem.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: test_publisherfilesystem.py,v 1.1.2.1 2002/12/23 19:33:26 jim Exp $
"""


import unittest
from StringIO import StringIO

from zope.server.vfs.publisherfilesystem import PublisherFileSystem
from zope.publisher.vfs import VFSRequest
from zope.publisher.base import DefaultPublication
from zope.publisher.interfaces.vfs import IVFSFilePublisher
from zope.publisher.interfaces.vfs import IVFSDirectoryPublisher
from zope.publisher.publish import mapply

from zope.server.vfs.tests.writefilesystemtests import WriteFilesystemTests


class VFSPublication(DefaultPublication):
    # This class will not be needed if we move callObject().

    def callObject(self, request, ob):
        # Look up the method named by the request on the object and call
        # it through mapply() with the request's positional arguments.
        handler = getattr(ob, request.method)
        positional = request.getPositionalArguments()
        return mapply(handler, positional, request)

    def traverseName(self, request, ob, name):
        # Each traversal step is delegated to the object itself.
        return ob.publishTraverse(request, name)


class TestFile:
    # In-memory stand-in for a file: its whole content is the string
    # held in ``data``.

    __implements__ = IVFSFilePublisher

    def __init__(self, data=''):
        self.data = data

    def publishTraverse(self, request, name):
        """See IVFSPublisher."""
        # A file has nothing below it, so traversal always fails.
        raise OSError('Cannot traverse TestFiles')

    def isdir(self):
        """See IVFSObjectPublisher."""
        return 0

    def isfile(self):
        """See IVFSObjectPublisher."""
        return 1

    def stat(self):
        """See IVFSObjectPublisher."""
        raise NotImplementedError

    def read(self, mode, outstream, start=0, end=-1):
        """See IVFSFilePublisher."""
        # A negative ``end`` means "through to the end of the data".
        if end < 0:
            chunk = self.data[start:]
        else:
            chunk = self.data[start:end]
        outstream.write(chunk)

    def write(self, mode, instream, start=0):
        """See IVFSFilePublisher."""
        incoming = instream.read()
        if 'a' not in mode:
            # Overwrite in place from ``start``; whatever lies beyond the
            # written span is kept.
            kept_head = self.data[:start]
            kept_tail = self.data[start + len(incoming):]
            self.data = kept_head + incoming + kept_tail
        else:
            # Append mode ignores ``start``.
            self.data = self.data + incoming




class TestDirectory:

    __implements__ = IVFSDirectoryPublisher

    def __init__(self, items={}):
        self.items = items.copy()

    def publishTraverse(self, request, name):
        """See IVFSPublisher."""
        return self.items[name]

    def isdir(self):
        """See IVFSObjectPublisher."""
        return 1

    def isfile(self):
        """See IVFSObjectPublisher."""
        return 0

    def stat(self):
        """See IVFSObjectPublisher."""
        raise NotImplementedError

    def exists(self, name):
        """See IVFSDirectoryPublisher."""
        return name in self.items

    def listdir(self, with_stats=0, pattern='*'):
        """See IVFSDirectoryPublisher."""
        if with_stats or pattern != '*':
            raise NotImplementedError
        return self.items.keys()

    def mkdir(self, name, mode=0777):
        """See IVFSDirectoryPublisher."""
        self.items[name] = TestDirectory()

    def remove(self, name):
        """See IVFSDirectoryPublisher."""
        del self.items[name]

    def rmdir(self, name):
        """See IVFSDirectoryPublisher."""
        del self.items[name]

    def rename(self, old, new):
        """See IVFSDirectoryPublisher."""
        if new in self.items:
            raise OSError, 'Name conflict'
        self.items[new] = self.items[old]
        del self.items[old]

    def writefile(self, name, mode, instream, start=0):
        """See IVFSDirectoryPublisher."""
        if not (name in self.items):
            self.items[name] = TestFile()
        self.items[name].write(mode, instream, start)

    def check_writable(self, name):
        """See IVFSDirectoryPublisher."""
        if name in self.items:
            if not self.items[name].isfile():
                raise IOError, 'Is not a file'


class PublisherFileSystemTests(unittest.TestCase, WriteFilesystemTests):
    """Run the shared filesystem checks through PublisherFileSystem.

    setUp() builds the fixture directory and file inside an in-memory
    TestDirectory tree; tearDown() does nothing.
    """

    filesystem_class = PublisherFileSystem

    check_exceptions = 1

    def setUp(self):
        # A fresh root directory and the publication that serves it.
        publication = VFSPublication(TestDirectory())

        def request_factory(input_stream, output_steam, env):
            # Every request created here is bound to the publication above.
            vfs_request = VFSRequest(input_stream, output_steam, env)
            vfs_request.setPublication(publication)
            return vfs_request

        self.filesystem = PublisherFileSystem(None, request_factory)
        self.filesystem.mkdir(self.dir_name)
        contents = StringIO(self.file_contents)
        self.filesystem.writefile(self.file_name, 'w', contents)

    def tearDown(self):
        pass



def test_suite():
    """Return a suite holding the tests of PublisherFileSystemTests."""
    return unittest.TestLoader().loadTestsFromTestCase(
        PublisherFileSystemTests)

# Allow running this test module directly as a script.
if __name__ == '__main__':
    runner = unittest.TextTestRunner()
    runner.run(test_suite())


=== Added File Zope3/src/zope/server/vfs/tests/writefilesystemtests.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: writefilesystemtests.py,v 1.1.2.1 2002/12/23 19:33:26 jim Exp $
"""
from cStringIO import StringIO

from zope.interface.verify import verifyClass
from zope.server.interfaces.vfs import IWriteFileSystem

from zope.server.vfs.tests.readfilesystemtests import ReadFilesystemTests


class WriteFilesystemTests (ReadFilesystemTests):
    """Tests of a writable and readable filesystem
    """

    def testRemove(self):
        self.failIf(not self.filesystem.exists(self.file_name))
        self.filesystem.remove(self.file_name)
        self.failIf(self.filesystem.exists(self.file_name))


    def testMkdir(self):
        path = self.dir_name + '/x'
        self.filesystem.mkdir(path)
        self.failUnless(self.filesystem.exists(path))
        self.failUnless(self.filesystem.isdir(path))


    def testRmdir(self):
        self.failIf(not self.filesystem.exists(self.dir_name))
        self.filesystem.remove(self.file_name)
        self.filesystem.rmdir(self.dir_name)
        self.failIf(self.filesystem.exists(self.dir_name))


    def testRename(self):
        self.filesystem.rename(self.file_name, self.file_name + '.bak')
        self.failIf(self.filesystem.exists(self.file_name))
        self.failIf(not self.filesystem.exists(self.file_name + '.bak'))


    def testWriteFile(self):
        s = StringIO()
        self.filesystem.readfile(self.file_name, 'rb', s)
        self.assertEqual(s.getvalue(), self.file_contents)

        data = 'Always ' + self.file_contents
        s = StringIO(data)
        self.filesystem.writefile(self.file_name, 'wb', s)

        s = StringIO()
        self.filesystem.readfile(self.file_name, 'rb', s)
        self.assertEqual(s.getvalue(), data)


    def testAppendToFile(self):
        data = ' again'
        s = StringIO(data)
        self.filesystem.writefile(self.file_name, 'ab', s)

        s = StringIO()
        self.filesystem.readfile(self.file_name, 'rb', s)
        self.assertEqual(s.getvalue(), self.file_contents + data)
        

    def testWritePartOfFile(self):
        data = '123'
        s = StringIO(data)
        self.filesystem.writefile(self.file_name, 'r+b', s, 3)

        expect = self.file_contents[:3] + data + self.file_contents[6:]

        s = StringIO()
        self.filesystem.readfile(self.file_name, 'rb', s)
        self.assertEqual(s.getvalue(), expect)


    def testWriteBeyondEndOfFile(self):
        partlen = len(self.file_contents) - 6
        data = 'daylight savings'
        s = StringIO(data)
        self.filesystem.writefile(self.file_name, 'r+b', s, partlen)

        expect = self.file_contents[:partlen] + data

        s = StringIO()
        self.filesystem.readfile(self.file_name, 'rb', s)
        self.assertEqual(s.getvalue(), expect)


    def testWriteNewFile(self):
        s = StringIO(self.file_contents)
        self.filesystem.writefile(self.file_name + '.new', 'wb', s)

        s = StringIO()
        self.filesystem.readfile(self.file_name, 'rb', s)
        self.assertEqual(s.getvalue(), self.file_contents)


    def testCheckWritable(self):
        if self.check_exceptions:
            # Can't overwrite a directory.
            self.assertRaises(
                IOError, self.filesystem.check_writable, self.dir_name)
        # Can overwrite a file.
        try:
            self.filesystem.check_writable(self.file_name)
        except IOError, v:
            self.fail('%s should be writable. (%s)' % (self.file_name, v))


    def testWriteInterface(self):
        class_ = self.filesystem.__class__
        self.failUnless(
            IWriteFileSystem.isImplementedByInstancesOf(class_))
        self.failUnless(verifyClass(IWriteFileSystem, class_))