[Zope3-checkins] CVS: Zope3/src/zope/server/vfs - __init__.py:1.1.2.1 osfilesystem.py:1.1.2.1 publisherfilesystem.py:1.1.2.1 testfilesystemaccess.py:1.1.2.1 usernamepassword.py:1.1.2.1
Jim Fulton
jim@zope.com
Mon, 23 Dec 2002 14:33:27 -0500
Update of /cvs-repository/Zope3/src/zope/server/vfs
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/server/vfs
Added Files:
Tag: NameGeddon-branch
__init__.py osfilesystem.py publisherfilesystem.py
testfilesystemaccess.py usernamepassword.py
Log Message:
Initial renaming before debugging
=== Added File Zope3/src/zope/server/vfs/__init__.py ===
#
# This file is necessary to make this directory a package.
=== Added File Zope3/src/zope/server/vfs/osfilesystem.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Filesystem implementation for a real (unix-like) OS filesystem.
$Id: osfilesystem.py,v 1.1.2.1 2002/12/23 19:33:25 jim Exp $
"""
import os
import re
import stat
import datetime
import fnmatch
fromts = datetime.datetime.fromtimestamp
from zope.server.interfaces.vfs import IPosixFileSystem
class OSFileSystem(object):
    """Generic OS FileSystem implementation.

    The root of this file system is a string describing the path
    to the directory used as root.  Public methods pass the paths they
    receive through translate(), which normalizes them and joins them
    onto that root, before handing them to the ``os`` module.
    """

    __implements__ = IPosixFileSystem

    # Maximum number of bytes moved per read() call by readfile()/writefile().
    copy_bytes = 65536

    def __init__(self, root):
        self.root = root

    ############################################################
    # Implementation methods for interface
    # Zope.Server.VFS.IPosixFileSystem.IPosixFileSystem

    def chmod(self, path, mode):
        """Change the permission bits of `path` (see os.chmod)."""
        return os.chmod(self.translate(path), mode)

    def chown(self, path, uid, gid):
        """Change owner and group of `path` (see os.chown)."""
        return os.chown(self.translate(path), uid, gid)

    def link(self, src, dst):
        """Create a hard link `dst` pointing at `src`; both are translated."""
        src = self.translate(src)
        dst = self.translate(dst)
        return os.link(src, dst)

    def mkfifo(self, path, mode=6*2**6):
        """Create a FIFO at `path`; the default mode 6*2**6 is octal 600."""
        # The path is translated like in every other method so that the
        # FIFO is created below self.root rather than at the raw path
        # supplied by the caller.
        return os.mkfifo(self.translate(path), mode)

    def symlink(self, src, dst):
        """Create a symbolic link `dst` pointing at `src`; both are translated."""
        src = self.translate(src)
        dst = self.translate(dst)
        return os.symlink(src, dst)

    ######################################
    # from: Zope.Server.VFS.IReadFileSystem.IReadFileSystem

    def exists(self, path):
        """Return a true value if the translated `path` exists."""
        return os.path.exists(self.translate(path))

    def isdir(self, path):
        """Return a true value if the translated `path` is a directory."""
        return os.path.isdir(self.translate(path))

    def isfile(self, path):
        """Return a true value if the translated `path` is a regular file."""
        return os.path.isfile(self.translate(path))

    def listdir(self, path, with_stats=0, pattern='*'):
        """List the entries of directory `path`, sorted alphabetically.

        Only names matching the fnmatch-style `pattern` are returned.
        When `with_stats` is false the result is a list of names;
        otherwise it is a list of ``(name, stat_info)`` pairs where
        ``stat_info`` comes from safe_stat().  Entries that cannot be
        stat'ed are left out of the pair list.
        """
        p = self.translate(path)
        # A list comprehension (rather than filter()) guarantees a real
        # list, so the in-place sort below is always available.
        names = [name for name in os.listdir(p)
                 if fnmatch.fnmatch(name, pattern)]
        names.sort()
        if not with_stats:
            return names
        result = []
        for name in names:
            info = safe_stat(os.path.join(p, name))
            if info is not None:
                result.append((name, info))
        return result

    def readfile(self, path, mode, outstream, start=0, end=-1):
        """Copy the contents of `path` to `outstream`.

        The file is opened with `mode`.  Copying begins at offset `start`
        and stops at offset `end` (exclusive); a negative `end` means
        "until end of file".  Data is moved in chunks of at most
        ``copy_bytes`` bytes.
        """
        p = self.translate(path)
        instream = open(p, mode)
        # try/finally makes sure the file handle is released even if the
        # read or the write to outstream fails.
        try:
            if start:
                instream.seek(start)
            pos = start
            while end < 0 or pos < end:
                toread = self.copy_bytes
                if end >= 0:
                    toread = min(toread, end - pos)
                data = instream.read(toread)
                if not data:
                    break
                pos += len(data)
                outstream.write(data)
        finally:
            instream.close()

    def stat(self, path):
        """Return stat information for `path`.

        The result is a 4-tuple: the first six os.stat() fields as a
        tuple, followed by fields 7, 8 and 9 converted to datetime
        objects.

        NOTE(review): this shape differs from the module-level
        safe_stat(), which returns one flat tuple that also keeps
        field 6 -- confirm which shape callers expect before changing it.
        """
        p = self.translate(path)
        st = os.stat(p)
        return st[0:6], fromts(st[7]), fromts(st[8]), fromts(st[9])

    ######################################
    # from: Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem

    def mkdir(self, path, mode=6*2**6):
        """Create directory `path`; the default mode 6*2**6 is octal 600."""
        return os.mkdir(self.translate(path), mode)

    def remove(self, path):
        """Delete the file at `path`."""
        return os.remove(self.translate(path))

    def rmdir(self, path):
        """Delete the directory at `path`."""
        return os.rmdir(self.translate(path))

    def rename(self, old, new):
        """Rename `old` to `new`; both are translated."""
        old = self.translate(old)
        new = self.translate(new)
        return os.rename(old, new)

    def writefile(self, path, mode, instream, start=0):
        """Copy everything readable from `instream` into the file `path`.

        The file is opened with `mode`; when `start` is non-zero the
        file position is moved there before writing.  Data is moved in
        chunks of at most ``copy_bytes`` bytes.
        """
        p = self.translate(path)
        outstream = open(p, mode)
        # Closing in a finally clause releases the handle and flushes
        # buffered data even when reading from instream fails.
        try:
            if start:
                outstream.seek(start)
            while 1:
                data = instream.read(self.copy_bytes)
                if not data:
                    break
                outstream.write(data)
        finally:
            outstream.close()

    def check_writable(self, path):
        """Probe whether `path` can be opened for writing.

        The file is opened in append mode and closed again; any error
        raised by open() propagates to the caller.  If the file did not
        exist before the probe, the file created by the probe is removed.
        """
        p = self.translate(path)
        # Remember whether the probe itself is what creates the file.
        remove = not os.path.exists(p)
        f = open(p, 'a')  # append mode
        f.close()
        if remove:
            os.remove(p)

    #
    ############################################################

    # utility methods

    def normalize(self, path):
        """Collapse `path` and refuse paths that climb above the root.

        Runs of slashes are squeezed to one and the result is passed
        through os.path.normpath().  A result that starts with '..' is
        replaced by os.sep.
        """
        # watch for the ever-sneaky '/+' path element
        # XXX It is unclear why "/+" is dangerous. It is definitely
        # unexpected.
        path = re.sub('/+', '/', path)
        path = os.path.normpath(path)
        if path.startswith('..'):
            # Someone is trying to get lower than the permitted root.
            # We just ignore it.
            path = os.sep
        return path

    def translate(self, path):
        """Return the real OS path for the virtual `path`.

        We need to join the real root and the requested path safely,
        <real_root>/<path>, using the operating system's path separator.

        We need to be extremely careful to cover the cases where a
        hacker could attempt to reach a directory outside of the root!
        """
        # Normalize the directory
        path = self.normalize(path)
        # Prepare for joining with root: os.path.join() would discard
        # the root if the second component were absolute.
        while path.startswith(os.sep):
            path = path[1:]
        # Join path with root
        return os.path.join(self.root, path)

    def __repr__(self):
        return '<OSFileSystem, root=%s>' % self.root
def safe_stat(path):
    """Return stat information for `path`, or None if an OSError occurs.

    The result is a flat tuple: the first seven os.stat() fields
    unchanged, followed by fields 7, 8 and 9 converted with `fromts`.
    """
    try:
        info = os.stat(path)
        # The conversions stay inside the try block so that an OSError
        # raised by them is answered with None as well.
        times = (fromts(info[7]), fromts(info[8]), fromts(info[9]))
        return info[:7] + times
    except OSError:
        return None
=== Added File Zope3/src/zope/server/vfs/publisherfilesystem.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: publisherfilesystem.py,v 1.1.2.1 2002/12/23 19:33:25 jim Exp $
"""
import re
import stat
import time
import posixpath
from cStringIO import StringIO
from zope.server.interfaces.vfs import IReadFileSystem
from zope.server.interfaces.vfs import IWriteFileSystem
from zope.publisher.publish import publish
class NoOutput:
    """An output stream lookalike that warns you if you try to
    dump anything into it.

    write() always raises RuntimeError; flush() and close() do nothing.
    """

    def write(self, data):
        """Reject the write: this stream never accepts data."""
        # Call-style raise works on every Python version, unlike the
        # legacy ``raise Class, "message"`` statement form.
        raise RuntimeError("Not a writable stream")

    def flush(self):
        """Do nothing; there is never buffered data."""
        pass

    # Closing is the same no-op as flushing.
    close = flush
class PublisherFileSystem:
    """Generic Publisher FileSystem implementation.

    Every file system operation is turned into a request (built by
    `request_factory`) and run through publish(); the value returned to
    the caller is the published response's result.
    """

    __implements__ = IReadFileSystem, IWriteFileSystem

    def __init__ (self, credentials, request_factory):
        self.credentials = credentials
        self.request_factory = request_factory

    def _execute(self, path, command, env=None):
        """Publish `command` for `path` and return the response result.

        `env` is an optional dict of extra request values; the keys
        'command', 'path' and 'credentials' are set on it here.
        """
        if env is None:
            env = {}
        env.update({'command': command,
                    'path': path,
                    'credentials': self.credentials})

        # NoOutput avoids creating a black hole.
        request = self.request_factory(StringIO(''), NoOutput(), env)

        # Note that publish() calls close() on request, which deletes the
        # response from the request, so that we need to keep track of it.
        response = request.response
        publish(request)
        return response.getResult()

    def _execute_in_parent(self, translated, command, env=None):
        """Run `command` on the parent of the already translated path.

        The last path segment is handed over as env['name'].
        """
        parent, name = posixpath.split(translated)
        if env is None:
            env = {}
        env['name'] = name
        return self._execute(parent, command, env)

    ############################################################
    # Implementation methods for interface
    # Zope.Server.VFS.IReadFileSystem.

    def exists(self, path):
        """Return whether `path` exists; the root '/' always does."""
        translated = self.translate(path)
        if translated == '/':
            return 1
        return self._execute_in_parent(translated, 'exists')

    def isdir(self, path):
        """Publish the 'isdir' command for `path`."""
        return self._execute(self.translate(path), 'isdir')

    def isfile(self, path):
        """Publish the 'isfile' command for `path`."""
        return self._execute(self.translate(path), 'isfile')

    def listdir(self, path, with_stats=0, pattern='*'):
        """Publish the 'listdir' command, forwarding both options."""
        options = {'with_stats' : with_stats,
                   'pattern' : pattern}
        return self._execute(self.translate(path), 'listdir', options)

    def readfile(self, path, mode, outstream, start=0, end=-1):
        """Publish the 'read' command, forwarding all arguments."""
        options = {'mode' : mode,
                   'outstream' : outstream,
                   'start' : start,
                   'end' : end}
        return self._execute(self.translate(path), 'read', options)

    def stat(self, path):
        """Publish the 'stat' command for `path`."""
        return self._execute(self.translate(path), 'stat')

    #
    ############################################################

    ############################################################
    # Implementation methods for interface
    # Zope.Server.VFS.IWriteFileSystem.

    def mkdir(self, path, mode=777):
        """Publish 'mkdir' on the parent of `path`.

        `mode` is accepted but not forwarded to the publisher.
        NOTE(review): the default 777 is a decimal literal, not octal --
        harmless while the value is unused.
        """
        return self._execute_in_parent(self.translate(path), 'mkdir')

    def remove(self, path):
        """Publish 'remove' on the parent of `path`."""
        return self._execute_in_parent(self.translate(path), 'remove')

    def rmdir(self, path):
        """Publish 'rmdir' on the parent of `path`."""
        return self._execute_in_parent(self.translate(path), 'rmdir')

    def rename(self, old, new):
        """Publish 'rename' on the directory shared by `old` and `new`.

        Both paths must have the same parent directory; this is
        checked with an assert.
        """
        old_full = self.translate(old)
        new_full = self.translate(new)
        old_parent, old_name = posixpath.split(old_full)
        new_parent, new_name = posixpath.split(new_full)
        assert old_parent == new_parent
        names = {'old' : old_name,
                 'new' : new_name}
        return self._execute(old_parent, 'rename', names)

    def writefile(self, path, mode, instream, start=0):
        """Publish 'writefile' on the parent of `path`."""
        options = {'mode' : mode,
                   'instream' : instream,
                   'start' : start}
        return self._execute_in_parent(self.translate(path), 'writefile',
                                       options)

    def check_writable(self, path):
        """Publish 'check_writable' on the parent of `path`."""
        return self._execute_in_parent(self.translate(path), 'check_writable')

    #
    ############################################################

    # utility methods

    def translate (self, path):
        """Normalize `path` with posixpath.normpath().

        A normalized path that starts with '..' is replaced by '/'.
        """
        normalized = posixpath.normpath(path)
        if not normalized.startswith('..'):
            return normalized
        # Someone is trying to get lower than the permitted root.
        # We just ignore it.
        return '/'
=== Added File Zope3/src/zope/server/vfs/testfilesystemaccess.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Implementation of IFilesystemAccess intended only for testing.
$Id: testfilesystemaccess.py,v 1.1.2.1 2002/12/23 19:33:25 jim Exp $
"""
from zope.server.interfaces.vfs import IFilesystemAccess
from zope.server.interfaces.vfs import IUsernamePassword
from zope.exceptions import Unauthorized
class TestFilesystemAccess:
    """Filesystem access object that checks a fixed password table.

    open() hands out the file system given to the constructor once the
    supplied credentials pass authenticate().
    """

    __implements__ = IFilesystemAccess

    # User name -> password table consulted by authenticate().
    passwords = {'foo': 'bar'}

    def __init__(self, fs):
        self.fs = fs

    def authenticate(self, credentials):
        """Raise Unauthorized unless `credentials` match the table.

        The credentials must implement IUsernamePassword, name a user
        present in `passwords`, and carry that user's password.
        """
        if IUsernamePassword.isImplementedBy(credentials):
            user = credentials.getUserName()
            if user in self.passwords:
                supplied = credentials.getPassword()
                if supplied != self.passwords[user]:
                    raise Unauthorized
                return
        raise Unauthorized

    def open(self, credentials):
        """Authenticate `credentials` and return the wrapped file system."""
        self.authenticate(credentials)
        return self.fs
=== Added File Zope3/src/zope/server/vfs/usernamepassword.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: usernamepassword.py,v 1.1.2.1 2002/12/23 19:33:26 jim Exp $
"""
from zope.server.interfaces.vfs import IUsernamePassword
class UsernamePassword:
    """Credentials object holding a user name and a password."""

    __implements__ = IUsernamePassword

    def __init__(self, username, password):
        self.username, self.password = username, password

    def getUserName(self):
        """Return the user name given to the constructor."""
        return self.username

    def getPassword(self):
        """Return the password given to the constructor."""
        return self.password