[Zope-Checkins] CVS: Zope3/lib/python/Zope/App/ZopePublication - ZopePublication.py: __init__.py:
Shane Hathaway
Thu, 15 Nov 2001 18:11:49 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/App/ZopePublication
In directory cvs.zope.org:/tmp/cvs-serv16980/ZopePublication
Added Files:
Tag: Zope-3x-branch
ZopePublication.py __init__.py
Log Message:
Rough ZopePublication implementation
=== Added File Zope3/lib/python/Zope/App/ZopePublication/ZopePublication.py ===
import sys
from ZODB.POSException import ConflictError
from zLOG import LOG, ERROR, INFO
from Zope.Publisher.DefaultPublication import DefaultPublication
from Zope.Publisher.mapply import mapply
from Zope.Publisher.Exceptions import Retry
class RequestContainer:
    """Minimal holder that exposes a request as its ``REQUEST`` attribute.

    ``ZopePublication.getApplication`` passes an instance of this class to
    the application root's ``__of__`` method.
    """
    # TODO: add security assertion declaring access to REQUEST

    def __init__(self, request):
        """Store *request* on the instance as ``REQUEST``."""
        self.REQUEST = request
class ZopePublication (DefaultPublication):
    """Zope publication specification.

    Publishes objects stored in a ZODB database: opens a connection per
    request, traverses names starting from the application root, calls the
    published object through ``mapply`` and maps exceptions to a response.

    NOTE(review): ``Cleanup``, ``Unauthorized``, ``NotFound``,
    ``DebugError``, ``getExceptionHandler`` and ``ClassType`` are used below
    but are not imported in the visible part of this module -- confirm they
    are provided elsewhere before relying on this class.
    """

    # Request key whose value names the ZODB version to open.
    version_cookie = 'Zope-Version'
    # Key in the connection root under which the application object lives.
    root_name = 'Application'

    def __init__(self, db, ptype):
        """Remember the database and the publishing type.

        db is a ZODB.DB.DB object.
        ptype is a subinterface of IPublish.
        """
        self.db = db
        self.ptype = ptype

    def beforeTraversal(self, request):
        """Hook run before traversal starts; currently does nothing."""
        # NOTE(review): no body is visible for this hook in this copy of the
        # file -- confirm whether setup work belongs here.
        pass

    def openedConnection(self, conn):
        """Hook for auto-refresh; currently does nothing."""
        # NOTE(review): nothing visible in this class invokes this hook --
        # confirm whether getApplication() is meant to call it.
        pass

    def getApplication(self, request):
        """Open a database connection and return the application root.

        The ZODB version is taken from ``request.get(version_cookie, '')``.
        The root object is looked up under ``root_name`` and, if it has an
        ``__of__`` method, wrapped around a ``RequestContainer`` holding
        *request*.
        """
        version = request.get(self.version_cookie, '')
        conn = self.db.open(version)
        cleanup = Cleanup(conn.close)
        request._hold(cleanup)  # Close the connection on request.close()
        conn.setDebugInfo(getattr(request, 'environ', None), request.other)
        v = conn.root()[self.root_name]
        if hasattr(v, '__of__'):
            v = v.__of__(RequestContainer(request))
        return v

    def invokeHooks(self, request, ob):
        """Call __before_publishing_traverse__ hooks (not implemented)."""
        pass

    def traverseName(self, request, ob, name, check_auth=1):
        """Return the sub-object of *ob* called *name*.

        Attribute access is tried first, then item access.

        Raises Unauthorized if *name* starts with an underscore, NotFound
        if neither lookup succeeds, and DebugError if the resulting object
        has a missing or empty ``__doc__``.  *check_auth* is accepted but
        not used by this implementation.
        """
        if name[:1] == '_':
            # repr() is the exact equivalent of the Python 2 backtick form.
            raise Unauthorized(
                "Name %s begins with an underscore" % repr(name))
        if hasattr(ob, name):
            subob = getattr(ob, name)
        else:
            try:
                subob = ob[name]
            except (KeyError, IndexError,
                    TypeError, AttributeError):
                raise NotFound(ob, name, request.getURL())
        if not getattr(subob, '__doc__', None):
            # NOTE(review): the format argument was truncated in this copy;
            # request.getURL() matches the NotFound call above -- confirm.
            raise DebugError(subob, 'Missing or empty doc string at: %s' %
                             request.getURL())
        return subob

    def getDefaultTraversal(self, request, ob):
        """Return ``(ob, None)``: no default traversal step is added."""
        return ob, None

    def afterTraversal(self, request, ob):
        """Hook run after traversal completes; currently does nothing."""
        #recordMetaData(object, request)
        pass

    def callObject(self, request, ob):
        """Call *ob* via ``mapply`` with ``request.args`` and *request*."""
        return mapply(ob, request.args, request)

    def afterCall(self, request):
        """Hook run after the published object is called; does nothing."""
        # NOTE(review): no body is visible for this hook in this copy of the
        # file -- confirm whether transaction handling belongs here.
        pass

    def handleException(self, request, exc_info, retry_allowed=1):
        """Turn an exception raised during publishing into a response.

        If anything was traversed, an exception handler is looked up for
        the last traversed object; when one exists it is called and the
        request's response is returned.  Otherwise a ConflictError (while
        *retry_allowed* is true) is logged and converted to ``Retry``.  In
        every other case the request's response is returned.
        """
        # Abort the transaction.
        # NOTE(review): the statement that performs the abort is not present
        # in this copy of the file -- restore it from the repository.
        t, v = exc_info[:2]

        # Look for a component to handle the exception.
        traversed = request.traversed
        if traversed:
            context = traversed[-1]
            handler = getExceptionHandler(context, t, self.ptype)
            if handler is not None:
                handler(request, exc_info)
                return request.response

        # Handle special exception types.
        if isinstance(t, ClassType):
            if retry_allowed and issubclass(t, ConflictError):
                # NOTE(review): the original call appears to have had a
                # further argument after the summary that was lost here.
                LOG('Zope Publication', INFO,
                    'Competing writes at %s' % request.get('PATH_INFO', '???'),
                    )
                raise Retry

        # Let the response handle it as best it can.
        # NOTE(review): no call on the response is visible between fetching
        # and returning it -- confirm whether one is missing.
        response = request.response
        return response
=== Added File Zope3/lib/python/Zope/App/ZopePublication/__init__.py ===
"""Zope publication package."""