[Zope3-checkins] CVS: Zope3/src/zope/server/vfs - __init__.py:1.1.2.1 osfilesystem.py:1.1.2.1 publisherfilesystem.py:1.1.2.1 testfilesystemaccess.py:1.1.2.1 usernamepassword.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:33:27 -0500


Update of /cvs-repository/Zope3/src/zope/server/vfs
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/server/vfs

Added Files:
      Tag: NameGeddon-branch
	__init__.py osfilesystem.py publisherfilesystem.py 
	testfilesystemaccess.py usernamepassword.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zope/server/vfs/__init__.py ===
#
# This file is necessary to make this directory a package.


=== Added File Zope3/src/zope/server/vfs/osfilesystem.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Filesystem implementation for a real (unix-like) OS filesystem.

$Id: osfilesystem.py,v 1.1.2.1 2002/12/23 19:33:25 jim Exp $
"""
import os
import re
import stat
import datetime
import fnmatch
fromts = datetime.datetime.fromtimestamp

from zope.server.interfaces.vfs import IPosixFileSystem


class OSFileSystem(object):
    """Generic OS FileSystem implementation.

       The root of this file system is a string describing the path
       to the directory used as root.
    """

    __implements__ = IPosixFileSystem

    copy_bytes = 65536


    def __init__ (self, root):
        self.root = root

    ############################################################
    # Implementation methods for interface
    # Zope.Server.VFS.IPosixFileSystem.IPosixFileSystem

    def chmod(self, path, mode):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        p = self.translate (path)
        return os.chmod(p, mode)


    def chown(self, path, uid, gid):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        p = self.translate (path)
        return os.chown(p, uid, gid)


    def link(self, src, dst):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        src = self.translate(src)
        dst = self.translate(dst)
        return os.link(src, dst)


    def mkfifo(self, path, mode=6*2**6):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        return os.mkfifo(path, mode)


    def symlink(self, src, dst):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        src = self.translate(src)
        dst = self.translate(dst)
        return os.symlink(src, dst)


    ######################################
    # from: Zope.Server.VFS.IReadFileSystem.IReadFileSystem

    def exists(self, path):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        p = self.translate(path)
        return os.path.exists(p)


    def isdir(self, path):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        p = self.translate(path)
        return os.path.isdir(p)


    def isfile(self, path):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        p = self.translate(path)
        return os.path.isfile(p)


    def listdir(self, path, with_stats=0, pattern='*'):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        p = self.translate(path)
        # list the directory's files
        ld = os.listdir(p)
        # filter them using the pattern
        ld = filter(lambda f, p=pattern, fnm=fnmatch.fnmatch: fnm(f, p), ld)
        # sort them alphabetically
        ld.sort()
        if not with_stats:
            result = ld
        else:
            result = []
            for file in ld:
                path = os.path.join(p, file)
                stat = safe_stat(path)
                if stat is not None:
                    result.append((file, stat))
        return result


    def readfile(self, path, mode, outstream, start=0, end=-1):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        p = self.translate(path)
        instream = open(p, mode)
        if start:
            instream.seek(start)
        pos = start
        while end < 0 or pos < end:
            toread = self.copy_bytes
            if end >= 0:
                toread = min(toread, end - pos)
            data = instream.read(toread)
            if not data:
                break
            pos += len(data)
            outstream.write(data)


    def stat(self, path):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        p = self.translate(path)
        stat = os.stat(p)
        return stat[0:6], fromts(stat[7]), fromts(stat[8]), fromts(stat[9])


    ######################################
    # from: Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem

    def mkdir(self, path, mode=6*2**6):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        p = self.translate(path)
        return os.mkdir(p, mode)


    def remove(self, path):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        p = self.translate (path)
        return os.remove(p)


    def rmdir(self, path):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        p = self.translate (path)
        return os.rmdir(p)


    def rename(self, old, new):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        old = self.translate(old)
        new = self.translate(new)
        return os.rename(old, new)


    def writefile(self, path, mode, instream, start=0):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        p = self.translate(path)
        outstream = open(p, mode)
        if start:
            outstream.seek(start)
        while 1:
            data = instream.read(self.copy_bytes)
            if not data:
                break
            outstream.write(data)

    def check_writable(self, path):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        p = self.translate(path)
        if os.path.exists(p):
            remove = 0
        else:
            remove = 1
        f = open(p, 'a')  # append mode
        f.close()
        if remove:
            os.remove(p)

    #
    ############################################################


    # utility methods

    def normalize(self, path):
        # watch for the ever-sneaky '/+' path element
        # XXX It is unclear why "/+" is dangerous.  It is definitely
        # unexpected.
        path = re.sub('/+', '/', path)
        path = os.path.normpath(path)
        if path.startswith('..'):
            # Someone is trying to get lower than the permitted root.
            # We just ignore it.
            path = os.sep
        return path


    def translate(self, path):
        """We need to join together three separate path components,
           and do it safely.  <real_root>/<path>
           use the operating system's path separator.

           We need to be extremly careful to include the cases where a hacker
           could attempt to a directory below root!
        """
        # Normalize the directory
        path = self.normalize(path)
        # Prepare for joining with root
        while path.startswith(os.sep):
            path = path[1:]
        # Join path with root
        return os.path.join(self.root, path)


    def __repr__(self):
        return '<OSFileSystem, root=%s>' % self.root



def safe_stat(path):
    try:
        stat = os.stat(path)
        return stat[:7] + (fromts(stat[7]), fromts(stat[8]), fromts(stat[9]))
    except OSError:
        return None


=== Added File Zope3/src/zope/server/vfs/publisherfilesystem.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: publisherfilesystem.py,v 1.1.2.1 2002/12/23 19:33:25 jim Exp $
"""

import re
import stat
import time
import posixpath

from cStringIO import StringIO

from zope.server.interfaces.vfs import IReadFileSystem
from zope.server.interfaces.vfs import IWriteFileSystem

from zope.publisher.publish import publish



class NoOutput:
    """An output stream lookalike that warns you if you try to
    dump anything into it."""

    def write(self, data):
        raise RuntimeError, "Not a writable stream"

    def flush(self):
        pass

    close = flush



class PublisherFileSystem:
    """Generic Publisher FileSystem implementation.
    """

    __implements__ = IReadFileSystem, IWriteFileSystem

    def __init__ (self, credentials, request_factory):
        self.credentials = credentials
        self.request_factory = request_factory


    def _execute(self, path, command, env=None):
        if env is None:
            env = {}

        env['command'] = command
        env['path'] = path
        env['credentials'] = self.credentials
        # NoOutput avoids creating a black hole.
        request = self.request_factory(StringIO(''), NoOutput(), env)
        # Note that publish() calls close() on request, which deletes the
        # response from the request, so that we need to keep track of it.
        response = request.response
        publish(request)
        return response.getResult()


    ############################################################
    # Implementation methods for interface
    # Zope.Server.VFS.IReadFileSystem.

    def exists(self, path):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        path = self.translate(path)
        if path == '/':
            return 1
        path, file = posixpath.split(path)
        env = {'name': file}
        return self._execute(path, 'exists', env)


    def isdir(self, path):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        path = self.translate(path)
        return self._execute(path, 'isdir')


    def isfile(self, path):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        path = self.translate(path)
        return self._execute(path, 'isfile')


    def listdir(self, path, with_stats=0, pattern='*'):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        path = self.translate(path)
        env = {'with_stats' : with_stats,
               'pattern' : pattern}
        return self._execute(path, 'listdir', env)


    def readfile(self, path, mode, outstream, start=0, end=-1):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        path = self.translate(path)
        env = {'mode'      : mode,
               'outstream' : outstream,
               'start'     : start,
               'end'       : end}
        return self._execute(path, 'read', env)


    def stat(self, path):
        'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
        path = self.translate(path)
        return self._execute(path, 'stat')

    #
    ############################################################

    ############################################################
    # Implementation methods for interface
    # Zope.Server.VFS.IWriteFileSystem.

    def mkdir(self, path, mode=777):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        path = self.translate(path)
        path, dir = posixpath.split(path)
        env = {'name': dir}
        return self._execute(path, 'mkdir', env)


    def remove(self, path):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        path = self.translate(path)
        path, name = posixpath.split(path)
        env = {'name': name}
        return self._execute(path, 'remove', env)


    def rmdir(self, path):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        path = self.translate(path)
        path, dir = posixpath.split(path)
        env = {'name': dir}
        return self._execute(path, 'rmdir', env)


    def rename(self, old, new):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        old = self.translate(old)
        new = self.translate(new)
        path0, old = posixpath.split(old)
        path1, new = posixpath.split(new)
        assert path0 == path1
        env = {'old' : old,
               'new' : new}
        return self._execute(path0, 'rename', env)

    def writefile(self, path, mode, instream, start=0):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        path = self.translate(path)
        path, name = posixpath.split(path)
        env = {'name'      : name,
               'mode'      : mode,
               'instream'  : instream,
               'start'     : start}
        return self._execute(path, 'writefile', env)


    def check_writable(self, path):
        'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
        path = self.translate(path)
        path, name = posixpath.split(path)
        env = {'name'      : name}
        return self._execute(path, 'check_writable', env)

    #
    ############################################################

    # utility methods

    def translate (self, path):
        # Normalize
        path = posixpath.normpath(path)
        if path.startswith('..'):
            # Someone is trying to get lower than the permitted root.
            # We just ignore it.
            path = '/'
        return path



=== Added File Zope3/src/zope/server/vfs/testfilesystemaccess.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Implementation of IFilesystemAccess intended only for testing.

$Id: testfilesystemaccess.py,v 1.1.2.1 2002/12/23 19:33:25 jim Exp $
"""

from zope.server.interfaces.vfs import IFilesystemAccess
from zope.server.interfaces.vfs import IUsernamePassword
from zope.exceptions import Unauthorized


class TestFilesystemAccess:

    __implements__ = IFilesystemAccess

    passwords = {'foo': 'bar'}

    def __init__(self, fs):
        self.fs = fs

    def authenticate(self, credentials):
        if not IUsernamePassword.isImplementedBy(credentials):
            raise Unauthorized
        name = credentials.getUserName()
        if not (name in self.passwords):
            raise Unauthorized
        if credentials.getPassword() != self.passwords[name]:
            raise Unauthorized

    def open(self, credentials):
        self.authenticate(credentials)
        return self.fs



=== Added File Zope3/src/zope/server/vfs/usernamepassword.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: usernamepassword.py,v 1.1.2.1 2002/12/23 19:33:26 jim Exp $
"""

from zope.server.interfaces.vfs import IUsernamePassword


class UsernamePassword:

    __implements__ = IUsernamePassword

    def __init__(self, username, password):
        self.username = username
        self.password = password

    def getUserName(self):
        return self.username

    def getPassword(self):
        return self.password