[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/VFS - PublisherFileSystem.py:1.1.2.1 PublisherVFSTask.py:1.1.2.1 ZODBFileSystem.py:NONE
Stephan Richter
srichter@cbu.edu
Thu, 4 Apr 2002 06:44:54 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Server/VFS
In directory cvs.zope.org:/tmp/cvs-serv21650/lib/python/Zope/Server/VFS
Added Files:
Tag: Zope3-Server-Branch
PublisherFileSystem.py PublisherVFSTask.py
Removed Files:
Tag: Zope3-Server-Branch
ZODBFileSystem.py
Log Message:
Worked some more on FTP, especially in regards with the Publisher.
- Finally we can get rid of the medusa base. I think I have extracted all
of the interesting code.
- Started on a PublisherFileSystem. A few methods might work, but most of
the FS methods are not hooked up yet.
- Started writing VFS Publisher hooks.
- Added the FTPServer to the startup registry. It comes up, but do not
expect it to work, since no views have been written for VFS yet.
=== Added File Zope3/lib/python/Zope/Server/VFS/PublisherFileSystem.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: PublisherFileSystem.py,v 1.1.2.1 2002/04/04 11:44:53 srichter Exp $
"""
import os, re
import stat
import time
from IReadFileSystem import IReadFileSystem
from IWriteFileSystem import IWriteFileSystem
from Zope.Publisher.Publish import publish
from ListProducer import ListProducer
class PublisherFileSystem:
"""Generic Publisher FileSystem implementation.
"""
__implements__ = IReadFileSystem, IWriteFileSystem
request_factory = None
def __init__ (self, root):
self.root = root
def _execute(self, path, command, env=None):
if env is None:
env = {}
env['command'] = command
env['path'] = path
request = self.request_factory(StringIO(''), StringIO(''), env)
publish(request)
return request.getResponse().getResult()
############################################################
# Implementation methods for interface
# Zope.Server.VFS.IReadFileSystem.IReadFileSystem
def exists(self, path):
'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
path = self.translate(path)
return self._execute(path, 'exists')
def isdir(self, path):
'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
path = self.translate(path)
return self._execute(path, 'isdir')
def isfile(self, path):
'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
path = self.translate(path)
return self._execute(path, 'isfile')
def listdir(self, path, long=0):
'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
path = self.translate(path)
# Returns a list of Wrapper objects representing the objects
# Also, the Wrapper object should contain *all* stat information
ld = self._execute(path, 'listdir')
ld.sort()
if not long:
return ListProducer(ld, 0, None)
else:
return ListProducer(result, 1, self.longify)
def longify(self, (path, stat_info)):
'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
return unix_longify (path, stat_info)
def open(self, path, mode):
'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
path = self.translate(path)
return self._execute(path, 'open', {'mode': mode})
def stat(self, path):
'See Zope.Server.VFS.IReadFileSystem.IReadFileSystem'
path = self.translate(path)
return self._execute(path, 'stat')
#
############################################################
############################################################
# Implementation methods for interface
# Zope.Server.VFS.IWriteFileSystem.
def chmod(self, path, mode):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
p = self.translate (path)
return os.chmod(p, mode)
def chown(self, path, uid, gid):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
p = self.translate (path)
raise NotImplementedError('This function is not implemented.')
def link(self, src, dst):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
src = self.translate(src)
dst = self.translate(dst)
raise NotImplementedError('This function is not implemented.')
def mkdir(self, path, mode=6*2**6):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
p = self.translate(path)
def mkfifo(self, path, mode=6*2**6):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
raise NotImplementedError('This function is not implemented.')
def remove(self, path):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
p = self.translate (path)
def rmdir(self, path):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
p = self.translate (path)
def rename(self, old, new):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
old = self.translate(old)
new = self.translate(new)
def symlink(self, src, dst):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
src = self.translate(src)
dst = self.translate(dst)
raise NotImplementedError('This function is not implemented.')
def unlink(self, path):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
pass
def write(self, fd, data):
'See Zope.Server.VFS.IWriteFileSystem.IWriteFileSystem'
pass
#
############################################################
# utility methods
def normalize (self, path):
# watch for the ever-sneaky '/+' path element
path = re.sub('/+', '/', path)
# Someone is trying to get lower than the permitted root.
# We just ignore it.
path = os.path.normpath(path)
if path.startswith('..'):
path = '/'
return path
def translate (self, path):
"""We need to join together three separate path components,
and do it safely. <real_root>/<path>
use the operating system's path separator.
We need to be extremly careful to include the cases where a hacker
could attempt to a directory below root!
"""
# Normalize the directory
path = os.sep.join(path.split('/'))
path = self.normalize(self.path_module.join(path))
# Prepare for joining with root
if path[0] == '/':
path = path[1:]
# Join path with root
path = self.path_module.join(self.root, path)
return path
def __repr__ (self):
return '<Publisher-FileSystem Root:%s>' % self.root
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
mode_table = {
'0':'---',
'1':'--x',
'2':'-w-',
'3':'-wx',
'4':'r--',
'5':'r-x',
'6':'rw-',
'7':'rwx'
}
def unix_longify (file, stat_info):
# for now, only pay attention to the lower bits
import pwd, grp
try: username = pwd.getpwuid(int(stat_info[stat.ST_UID]))[0]
except: username = stat_info[stat.ST_UID]
try: grpname = grp.getgrgid(int(stat_info[stat.ST_GID]))[0]
except: grpname = stat_info[stat.ST_GID]
mode = ('%o' % stat_info[stat.ST_MODE])[-3:]
mode = ''.join(map (lambda x: mode_table[x], mode))
if stat.S_ISDIR (stat_info[stat.ST_MODE]):
dirchar = 'd'
else:
dirchar = '-'
date = ls_date (long(time.time()), stat_info[stat.ST_MTIME])
return '%s%s %3d %-8s %-8s %8d %s %s' % (
dirchar,
mode,
stat_info[stat.ST_NLINK],
username,
grpname,
stat_info[stat.ST_SIZE],
date,
file
)
def ls_date (now, t):
"""Emulate the unix 'ls' command's date field. it has two formats
- if the date is more than 180 days in the past, then it's like
this: Oct 19 1995 otherwise, it looks like this: Oct 19 17:33
"""
try:
info = time.gmtime(t)
except:
info = time.gmtime(0)
# 15,600,000 == 86,400 * 180
if (now - t) > 15600000:
return '%s %2d %d' % (
months[info[1]-1],
info[2],
info[0]
)
else:
return '%s %2d %02d:%02d' % (
months[info[1]-1],
info[2],
info[3],
info[4]
)
def safe_stat (path):
try:
return os.stat(path)
except:
return None
=== Added File Zope3/lib/python/Zope/Server/VFS/PublisherVFSTask.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: PublisherVFSTask.py,v 1.1.2.1 2002/04/04 11:44:53 srichter Exp $
"""
import socket
from Zope.Server.ITask import ITask
class FTPTask:
"""
"""
__implements__ = ITask
def __init__(self, channel, command, m_name):
self.channel = channel
self.m_name = m_name
self.args = command.args
############################################################
# Implementation methods for interface
# Zope.Server.ITask
def service(self):
"""Called to execute the task.
"""
try:
try:
self.start()
getattr(self, self.m_name)(self.args)
self.finish()
except socket.error:
self.close_on_finish = 1
if self.channel.adj.log_socket_errors:
raise
finally:
self.channel.end_task(self.close_on_finish)
def cancel(self):
'See Zope.Server.ITask.ITask'
self.channel.close_when_done()
def defer(self):
'See Zope.Server.ITask.ITask'
pass
#
############################################################
def start(self):
now = time.time()
self.start_time = now
def finish(self):
hit_log = self.channel.server.hit_log
if hit_log is not None:
hit_log.log(self)
=== Removed File Zope3/lib/python/Zope/Server/VFS/ZODBFileSystem.py ===