[Zope-Checkins] CVS: Zope3/lib/python/Zope/App/ZopePublication - PublicationTraverse.py:1.1.2.1 ZopePublication.py:1.1.2.27
Jim Fulton
jim@zope.com
Tue, 12 Feb 2002 20:01:20 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/App/ZopePublication
In directory cvs.zope.org:/tmp/cvs-serv18111
Modified Files:
Tag: Zope-3x-branch
ZopePublication.py
Added Files:
Tag: Zope-3x-branch
PublicationTraverse.py
Log Message:
Fixed bug in handling name traversers. Wrapping and security checks
weren't being done.
Added support for etc namespace, which, for now just has "Services".
I think later we need some sort of service for these.
Factored out traversal logic so we could use it when checking
ZMI tab access.
=== Added File Zope3/lib/python/Zope/App/ZopePublication/PublicationTraverse.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
Revision information: $Id: PublicationTraverse.py,v 1.1.2.1 2002/02/13 01:01:19 jim Exp $
"""
from Zope.Publisher.Browser.IBrowserPublisher import IBrowserPublisher
from Zope.ComponentArchitecture import getRequestView
from Zope.App.Security.SecurityManagement import getSecurityManager
from Zope.Publisher.Exceptions import NotFound
from types import StringType, UnicodeType
from Zope.ContextWrapper import wrapper
class DuplicateNamespaces(Exception):
"""More than one namespave was specified in a request"""
class UnknownNamespace(Exception):
"""A parameter specified an unknown namespace"""
class ExcessiveWrapping(NotFound):
"""Too many levels of acquisition wrapping. We don't beleive them."""
class PublicationTraverse:
def traverseName(self, request, ob, name):
nm = name # the name to look up the object with
if name.find(';'):
# Process URI segment parameters. It makes sense to centralize
# this here. Later it may be abstracted and distributed again,
# but, if so it will be distributed to various path
# traversers, rather than to traversal adapters/views.
ns = ''
parts = name.split(';')
nm = parts[:1]
for param in parts[1:]:
l = param.find('=')
if l >= 0:
pname = param[:l]
pval = param[l+1:]
if pname == 'ns':
if ns:
raise DuplicateNamespaces(name)
ns = pval
else:
pset = getattr(self, "_parameterSet%s" % pname,
self # marker
)
if pset is self:
# We don't know about this one, so leave it in the
# name
nm.append(param)
else:
pset(pname, pval, request)
else:
if ns:
raise DuplicateNamespaces(name)
ns = param
nm = ';'.join(nm)
if ns:
traverse = getattr(self, "_traverse%s" % ns,
self # marker
)
if traverse is self:
raise UnknownNamespace(ns, name)
ob2 = traverse(request, ob, nm)
return self._wrap(ob2, ob, name, nm)
elif not nm:
# Just set params, so skip
return ob
if nm == '.':
return ob
if IBrowserPublisher.isImplementedBy(ob):
ob2 = ob.browser_traverse(request, nm)
else:
adapter = getRequestView(ob, '_traverse', request, self # marker
)
if adapter is not self:
ob2 = adapter.browser_traverse(request, nm)
else:
raise NotFound(ob, name, request)
return self._wrap(ob2, ob, name, nm)
def _wrap(self, ob, parent, name, nm):
wrapped = wrapper.Wrapper(ob, parent, name=name)
getSecurityManager().validate(nm, wrapped)
return wrapped
def _traverseview(self, request, ob, name):
# use self as marker
r = getRequestView(ob, name, request, self)
if r is self:
raise NotFound(ob, name, request)
return r
def _traverseetc(self, request, ob, name):
# XXX
# This is here now to allow us to get service managers from a
# separate namespace from the content. We add and etc
# namespace to allow us to handle misc objects. We'll apply
# YAGNI for now and hard code this. We'll want something more
# general later. We were thinking of just calling "get"
# methods, but this is probably too magic. In particular, we
# will treat returned objects as sub-objects wrt security and
# not all get methods may satisfy this assumption. It might be
# best to introduce some sort of etc registry.
if name != 'Services':
raise NotFound(ob, name, request)
method_name = "getServiceManager"
method = getattr(ob, method_name, self)
if method is self:
raise NotFound(ob, name, request)
# Check access
self._wrap(method, ob, name, name)
return method()
def _traverseacquire(self, request, ob, name):
i = 0
while i < 200:
i = i + 1
r = getattr(ob, name, self)
if r is not self:
return r
r = getcontext(ob)
if r is None:
raise NotFound(ob, name, request)
raise ExcessiveWrapping(ob, name, request)
class PublicationTraverser(PublicationTraverse):
def traversePath(self, request, ob, path):
if type(path) in (StringType, UnicodeType):
path = path.split('/')
if len(path) > 1 and not path[-1]:
# Remove trailing slash
path.pop()
else:
path = list(path)
# Remove dingle dots
path = [x for x in path if x != '.']
path.reverse()
# Remove double dots
while '..' in path:
l = path.index('..')
if l < 0 or l+2 > len(path):
break
del path[l:l+2]
pop = path.pop
while path:
name = pop()
ob = self.traverseName(request, ob, name)
return ob
=== Zope3/lib/python/Zope/App/ZopePublication/ZopePublication.py 1.1.2.26 => 1.1.2.27 ===
from zLOG import LOG, ERROR, INFO
-from Zope.Publisher.Exceptions import NotFound, DebugError
from Zope.ComponentArchitecture import getRequestView
from Zope.Publisher.DefaultPublication import DefaultPublication
from Zope.Publisher.mapply import mapply
@@ -24,6 +23,9 @@
from ZODB.POSException import ConflictError
from Zope.App.OFS.Folder.RootFolder import RootFolder
from Zope.ContextWrapper import wrapper
+from PublicationTraverse import PublicationTraverse
+
+from Zope.Publisher.Browser.IBrowserPublisher import IBrowserPublisher
class RequestContainer:
# TODO: add security assertion declaring access to REQUEST
@@ -108,145 +110,53 @@
get_transaction().commit()
def handleException(self, request, exc_info, retry_allowed=1):
- # Abort the transaction.
- get_transaction().abort()
-
- # XXX This does not leak exc_info, but the reason it doesn't
- # is not easy to explain and potentially brittle,
- # so maybe it's not a good idea to not pass exc_info
- # to this method. Maybe we should render the traceback first.
- v = exc_info[1]
-
- # Delegate Unauthorized errors to the authentication service
- # XXX Is this the right way to handle Unauthorized? We need
- # to understand this better.
- if isinstance(v, Unauthorized):
- sm = getSecurityManager()
- id = sm.getPrincipal()
- prin_reg.unauthorized(id, request) # May issue challenge
- return
-
- # Look for a component to handle the exception.
- traversed = request.traversed
- if traversed:
- context = traversed[-1]
- #handler = getExceptionHandler(context, t, IBrowserPublisher)
- handler = None # no getExceptionHandler() exists yet.
- if handler is not None:
- handler(request, exc_info)
+ try:
+ # Abort the transaction.
+ get_transaction().abort()
+
+ # Delegate Unauthorized errors to the authentication service
+ # XXX Is this the right way to handle Unauthorized? We need
+ # to understand this better.
+ if isinstance(exc_info[1], Unauthorized):
+ sm = getSecurityManager()
+ id = sm.getPrincipal()
+ prin_reg.unauthorized(id, request) # May issue challenge
+ request.response.handleException(exc_info)
return
- # Convert ConflictErrors to Retry exceptions.
- if retry_allowed and isinstance(v, ConflictError):
- LOG('Zope Publication', INFO,
- 'Competing writes/reads at %s'
- % request.get('PATH_INFO', '???'),
- error=sys.exc_info())
- raise Retry
-
- # Let the response handle it as best it can.
- response = request.response
- response.handleException(exc_info)
- return
-
-
-from Zope.Publisher.Browser.IBrowserPublisher import IBrowserPublisher
+ # Look for a component to handle the exception.
+ traversed = request.traversed
+ if traversed:
+ context = traversed[-1]
+ #handler = getExceptionHandler(context, t, IBrowserPublisher)
+ handler = None # no getExceptionHandler() exists yet.
+ if handler is not None:
+ handler(request, exc_info)
+ return
+
+ # Convert ConflictErrors to Retry exceptions.
+ if retry_allowed and isinstance(exc_info[1], ConflictError):
+ LOG('Zope Publication', INFO,
+ 'Competing writes/reads at %s'
+ % request.get('PATH_INFO', '???'),
+ error=sys.exc_info())
+ raise Retry
+
+ # Let the response handle it as best it can.
+ response = request.response
+ response.handleException(exc_info)
+ return
+ finally:
+ # Avoid leaking
+ exc_info = 0
-class DuplicateNamespaces(Exception):
- """More than one namespave was specified in a request"""
-
-class UnknownNamespace(Exception):
- """A parameter specified an unknown namespace"""
-class BrowserPublication(ZopePublication):
+class BrowserPublication(PublicationTraverse, ZopePublication):
"""Web browser (HTTP) publication handling."""
-
- def traverseName(self, request, ob, name, check_auth=1):
-
- nm = name # the name to look up the object with
-
- if name.find(';'):
- # Process URI segment parameters. It makes sense to centralize
- # this here. Later it may be abstracted and distributed again,
- # but, if so it will be distributed to various path
- # traversers, rather than to traversal adapters/views.
- ns = ''
- parts = name.split(';')
- nm = parts[:1]
- for param in parts[1:]:
- l = param.find('=')
- if l >= 0:
- pname = param[:l]
- pval = param[l+1:]
- if pname == 'ns':
- if ns:
- raise DuplicateNamespaces(name)
- ns = pval
- else:
- pset = getattr(self, "_parameterSet%s" % pname,
- self # marker
- )
- if pset is self:
- # We don't know about this one, so leave it in the
- # name
- nm.append(param)
- else:
- pset(pname, pval, request)
- else:
- if ns:
- raise DuplicateNamespaces(name)
- ns = param
-
- nm = ';'.join(nm)
- if ns:
- traverse = getattr(self, "_traverse%s" % ns,
- self # marker
- )
- if traverse is self:
- raise UnknownNamespace(ns, name)
-
- return traverse(request, ob, nm)
- elif not nm:
- # Just set params, so skip
- return ob
-
-
- if IBrowserPublisher.isImplementedBy(ob):
- ob2 = ob.browser_traverse(request, nm)
- else:
- adapter = getRequestView(ob, '_traverse', request, self # marker
- )
-
- if adapter is not self:
- ob2 = adapter.browser_traverse(request, nm)
- else:
- raise NotFound(ob, name, request)
-
- wrapped = wrapper.Wrapper(ob2, ob, name=name)
- getSecurityManager().validate(nm, wrapped)
- return wrapped
+
def _parameterSetskin(self, pname, pval, request):
request.setViewSkin(pval)
-
- def _traverseview(self, request, ob, name):
- # use self as marker
- r = getRequestView(ob, name, request, self)
- if r is self:
- raise NotFound(ob, name, request)
- return r
-
- def _traverseacquire(self, request, ob, name):
- i = 0
- while i < 200:
- i = i + 1
- r = getattr(ob, name, self)
- if r is not self:
- return r
- r = getcontext(ob)
- if r is None:
- raise NotFound(ob, name, request)
- raise ExcessiveWrapping(ob, name, request)
def getDefaultTraversal(self, request, ob):
@@ -267,8 +177,3 @@
wrapped = wrapper.Wrapper(r[0], ob, name=None)
getSecurityManager().validate(None, wrapped)
return (wrapped, r[1])
-
-
-class ExcessiveWrapping(NotFound):
- """Too many levels of acquisition wrapping. We don't beleive them."""
-