[Zope3-checkins] CVS: Zope3/src/zope/publisher - __init__.py:1.1.2.1 base.py:1.1.2.1 browser.py:1.1.2.1 configure.zcml:1.1.2.1 http.py:1.1.2.1 maybe_lock.py:1.1.2.1 meta.zcml:1.1.2.1 normal.clb:1.1.2.1 publish.py:1.1.2.1 vfs.py:1.1.2.1 xmlrpc.py:1.1.2.1
Jim Fulton
jim@zope.com
Mon, 23 Dec 2002 14:33:12 -0500
Update of /cvs-repository/Zope3/src/zope/publisher
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/publisher
Added Files:
Tag: NameGeddon-branch
__init__.py base.py browser.py configure.zcml http.py
maybe_lock.py meta.zcml normal.clb publish.py vfs.py xmlrpc.py
Log Message:
Initial renaming before debugging
=== Added File Zope3/src/zope/publisher/__init__.py ===
#
# This file is necessary to make this directory a package.
=== Added File Zope3/src/zope/publisher/base.py === (457/557 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
'''Response Output formatter
$Id: base.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
'''
import traceback
from zope.interfaces.publisher import IPublisherResponse
from zope.interfaces.publisher import IApplicationResponse
class IResponse(IPublisherResponse, IApplicationResponse):
"""The basic response contract
"""
class BaseResponse(object):
"""Base Response Class
"""
__slots__ = (
'_body', # The response body
'_outstream', # The output stream
)
__implements__ = IResponse
def __init__(self, outstream):
self._body = ''
self._outstream = outstream
############################################################
# Implementation methods for interface
# Zope.Publisher.BaseResponse.IResponse
######################################
[-=- -=- -=- 457 lines omitted -=- -=- -=-]
def getApplication(self, request):
return self.app
def callTraversalHooks(self, request, ob):
pass
def traverseName(self, request, ob, name, check_auth=1):
if name.startswith('_'):
raise Unauthorized("Name %s begins with an underscore" % `name`)
if hasattr(ob, name):
subob = getattr(ob, name)
else:
try:
subob = ob[name]
except (KeyError, IndexError,
TypeError, AttributeError):
raise NotFound(ob, name, request)
if self.require_docstrings and not getattr(subob, '__doc__', None):
raise DebugError(subob, 'Missing or empty doc string')
return subob
def getDefaultTraversal(self, request, ob):
return ob, ()
def afterTraversal(self, request, ob):
pass
def callObject(self, request, ob):
return mapply(ob, request.getPositionalArguments(), request)
def afterCall(self, request):
pass
def handleException(self, object, request, exc_info, retry_allowed=1):
# Let the response handle it as best it can.
request.response.handleException(exc_info)
class TestPublication(DefaultPublication):
def traverseName(self, request, ob, name, check_auth=1):
if hasattr(ob, name):
subob = getattr(ob, name)
else:
try:
subob = ob[name]
except (KeyError, IndexError,
TypeError, AttributeError):
raise NotFound(ob, name, request)
return subob
=== Added File Zope3/src/zope/publisher/browser.py === (803/903 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
__version__='$Revision: 1.1.2.1 $'[11:-2]
import re
from types import ListType, TupleType
__ArrayTypes = (ListType, TupleType)
def field2string(v):
if hasattr(v,'read'): v=v.read()
else: v=str(v)
return v
def field2text(v, nl=re.compile('\r\n|\n\r').search):
if hasattr(v,'read'): v=v.read()
else: v=str(v)
mo = nl(v)
if mo is None: return v
l = mo.start(0)
r=[]
s=0
while l >= s:
r.append(v[s:l])
s=l+2
mo=nl(v,s)
if mo is None: l=-1
else: l=mo.start(0)
r.append(v[s:])
return '\n'.join(r)
def field2required(v):
if hasattr(v,'read'): v=v.read()
else: v=str(v)
if v.strip(): return v
raise ValueError, 'No input for required field<p>'
[-=- -=- -=- 803 lines omitted -=- -=- -=-]
def __wrapInHTML(self, title, content):
t = escape(title)
return (
"<html><head><title>%s</title></head>\n"
"<body><h2>%s</h2>\n"
"%s\n"
"</body></html>\n" %
(t, t, content)
)
def __insertBase(self, body):
# Only insert a base tag if content appears to be html.
content_type = self.getHeader('content-type', '')
if content_type and not is_text_html(content_type):
return body
if getattr(self, '_base', ''):
if body:
match = start_of_header_search(body)
if match is not None:
index = match.start(0) + len(match.group(0))
ibase = base_re_search(body)
if ibase is None:
body = ('%s\n<base href="%s" />\n%s' %
(body[:index], self._base, body[index:]))
return body
def getBase(self):
return getattr(self, '_base', '')
def setBase(self, base):
self._base = base
def redirect(self, location, status=302):
base = getattr(self, '_base', '')
if base and isRelative(str(location)):
l = base.rfind('/')
if l >= 0:
base = base[:l+1]
else:
base += '/'
location = base + location
super(BrowserResponse, self).redirect(location, status)
def is_text_html(content_type):
return content_type.startswith('text/html')
=== Added File Zope3/src/zope/publisher/configure.zcml ===
<zopeConfigure
xmlns='http://namespaces.zope.org/zope'
>
<include package=".HTTP" />
</zopeConfigure>
<zopeConfigure
xmlns='http://namespaces.zope.org/zope'
>
<content class="zope.publisher.http.HTTPRequest">
<require
permission="Zope.View"
interface="zope.publisher.interfaces.http.IHTTPApplicationRequest"/>
</content>
<content class="zope.publisher.http.URLGetter">
<require
permission="Zope.View"
attributes="get __getitem__ __str__" />
</content>
</zopeConfigure>
=== Added File Zope3/src/zope/publisher/http.py === (993/1093 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: http.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
"""
import re, time, random
from urllib import quote, splitport
from types import StringType
from zope.publisher.base import BaseRequest
from zope.exceptions import NotFoundError
from zope.publisher.interfaces.http import IHTTPCredentials
from zope.publisher.interfaces.http import IHTTPRequest
from zope.publisher.interfaces.http import IHTTPApplicationRequest
from zope.publisher.base \
import RequestDataProperty, RequestDataMapper, RequestDataGetter
# Default Encoding
ENCODING = 'UTF-8'
class CookieMapper(RequestDataMapper):
_mapname = '_cookies'
class HeaderGetter(RequestDataGetter):
_gettrname = 'getHeader'
_marker = object()
class URLGetter:
def __init__(self, request):
self.__request = request
def __str__(self):
[-=- -=- -=- 993 lines omitted -=- -=- -=-]
class HTTPCharsets:
__implements__ = IUserPreferredCharsets
def __init__(self, request):
self.request = request
############################################################
# Implementation methods for interface
# Zope.I18n.IUserPreferredCharsets.
def getPreferredCharsets(self):
'''See interface IUserPreferredCharsets'''
charsets = []
sawstar = sawiso88591 = 0
for charset in self.request.get('HTTP_ACCEPT_CHARSET', '').split(','):
charset = charset.strip().lower()
if charset:
if ';' in charset:
charset, quality = charset.split(';')
if not quality.startswith('q='):
# not a quality parameter
quality = 1.0
else:
try:
quality = float(quality[2:])
except ValueError:
continue
else:
quality = 1.0
if quality == 0.0:
continue
if charset == '*':
sawstar = 1
if charset == 'iso-8859-1':
sawiso88591 = 1
charsets.append((quality, charset))
# Quoting RFC 2616, $14.2: If no "*" is present in an Accept-Charset
# field, then all character sets not explicitly mentioned get a
# quality value of 0, except for ISO-8859-1, which gets a quality
# value of 1 if not explicitly mentioned.
if not sawstar and not sawiso88591:
charsets.append((1.0, 'iso-8859-1'))
# UTF-8 is **always** preferred over anything else.
# XXX Please give more details as to why!
charsets.sort(sort_charsets)
return [c[1] for c in charsets]
#
############################################################
=== Added File Zope3/src/zope/publisher/maybe_lock.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
__version__='$Revision: 1.1.2.1 $'[11:-2]
# Waaaa, I wish I didn't have to work this hard.
try: from thread import allocate_lock
except:
class allocate_lock:
def acquire(*args): pass
def release(*args): pass
=== Added File Zope3/src/zope/publisher/meta.zcml ===
<zopeConfigure
xmlns='http://namespaces.zope.org/zope'
xmlns:browser='http://namespaces.zope.org/browser'
>
<include package=".VFS" file="meta.zcml" />
</zopeConfigure>
=== Added File Zope3/src/zope/publisher/normal.clb ===
Publisher framework base collaboration
Participants:
publisher:IPublisher
request:IPublisherRequest
response:IPublicationResponse = request.getResponse()
publication:IPublication = request.getPublication()
Values:
root
"the top-level object"
foo
"an object obtaines by traversing the root with 'foo'"
bar
"an object obtaines by traversing the root with 'bar'"
result
"The result of calling bar"
Scenario: Normal, path = foo/bar
"Show normal publishing of a simple path without errors"
publisher.publish(request)
request.processInputs()
publication.beforeTraversal(request)
publication.getApplication(request)
"Get the root object to be traversed"
request.traverse(root)
publication.callTraversalHooks(request, root)
'''Call any "before traversal step" hooks. These hooks
should be called before traversing an object for the
first time. If the same object is traversed more than
once, the hook will still only be called the first
time.
The last constraint is probably important to get
virtual host semantics rigfht. :)
'''
publication.traverseName(request, root, 'foo')
publication.callTraversalHooks(request, foo)
publication.traverseName(request, foo, 'bar')
return bar
publication.afterTraversal(request, bar)
publication.callObject(request, bar)
return result
response.setBody(result)
publication.afterCall(request)
response.outputBody()
request.close()
Scenario: Error during application call, path = foo
"Show what heppens when the main application code raises an error"
publisher.publish(request)
request.processInputs()
publication.beforeTraversal(request)
publication.getApplication(request)
request.traverse(root)
publication.callTraversalHooks(request, root)
publication.traverseName(request, root, 'foo')
return foo
publication.afterTraversal(request, foo)
publication.callObject(request, foo)
raise AttributeError, 'spam'
publication.handleException()
response.outputBody()
request.close()
=== Added File Zope3/src/zope/publisher/publish.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Provide an apply-like facility that works with any mapping object
"""
from zope.proxy.introspection import removeAllProxies
_marker = [] # Create a new marker object.
def unwrapMethod(object):
""" object -> ( unwrapped, wrapperCount )
Unwrap 'object' until we get to a real function, counting the
number of unwrappings.
Bail if we find a class or something we can't
idendify as callable.
"""
wrapperCount = 0
unwrapped = object
for i in range(10):
bases = getattr(unwrapped, '__bases__', None)
if bases is not None:
raise TypeError, "mapply() can not call class constructors"
im_func = getattr(unwrapped, 'im_func', None)
if im_func is not None:
unwrapped = im_func
wrapperCount += 1
continue
func_code = getattr(unwrapped, 'func_code', None)
if func_code is not None:
break
__call__ = getattr(unwrapped, '__call__' , None)
if __call__ is not None:
unwrapped = unwrapped.__call__
else:
raise TypeError, "mapply() can not call %s" % `object`
else:
raise TypeError(
"couldn't find callable metadata, mapply() error on %s"%`object`
)
return unwrapped, wrapperCount
def mapply(object, positional=(), request={}):
__traceback_info__ = object
# we need deep access for intrspection. Waaa.
unwrapped = removeAllProxies(object)
unwrapped, wrapperCount = unwrapMethod(unwrapped)
code = unwrapped.func_code
defaults = unwrapped.func_defaults
names = code.co_varnames[wrapperCount:code.co_argcount]
nargs = len(names)
if positional:
args = list(positional)
if len(args) > nargs:
given = len(args)
if wrapperCount:
given = given + wrapperCount
raise TypeError, (
'%s() takes at most %d argument%s(%d given)' % (
getattr(unwrapped, '__name__', repr(object)), code.co_argcount,
(code.co_argcount > 1 and 's ' or ' '), given))
else:
args = []
get = request.get
if defaults:
nrequired = len(names) - (len(defaults))
else:
nrequired = len(names)
for index in range(len(args), nargs):
name = names[index]
v = get(name, _marker)
if v is _marker:
if name == 'REQUEST':
v = request
elif index < nrequired:
raise TypeError, 'Missing argument to %s(): %s' % (
getattr(unwrapped, '__name__', repr(object)), name)
else:
v = defaults[index-nrequired]
args.append(v)
args = tuple(args)
return object(*args)
"""
Python Object Publisher -- Publish Python objects on web servers
$Id: publish.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
"""
import sys, os
from zope.interfaces.publisher import Retry
def publish(request, handle_errors=1):
try: # finally to clean up to_raise and close request
to_raise = None
while 1:
publication = request.publication
try:
try:
object = None
try:
request.processInputs()
publication.beforeTraversal(request)
object = publication.getApplication(request)
object = request.traverse(object)
publication.afterTraversal(request, object)
result = publication.callObject(request, object)
response = request.response
if result is not response:
response.setBody(result)
publication.afterCall(request)
except:
publication.handleException(object, request, sys.exc_info(), 1)
if not handle_errors:
raise
break # Successful.
except Retry, retryException:
if request.supportsRetry():
# Create a copy of the request and use it.
newrequest = request.retry()
request.close()
request = newrequest
elif handle_errors:
# Output the original exception.
publication = request.publication
publication.handleException(
object, request, retryException.getOriginalException(), 0)
break
else:
raise
except:
# Bad exception handler or retry method.
# Re-raise after outputting the response.
if handle_errors:
request.response.internalError()
to_raise = sys.exc_info()
break
else:
raise
response = request.response
response.outputBody()
if to_raise is not None:
raise to_raise[0], to_raise[1], to_raise[2]
finally:
to_raise = None # Avoid circ. ref.
request.close() # Close database connections, etc.
=== Added File Zope3/src/zope/publisher/vfs.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
$Id: vfs.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
"""
from zope.publisher.base import BaseResponse
class VFSResponse(BaseResponse):
"""VFS response
"""
__slots__ = (
'_exc',
)
def setBody(self, body):
"""Sets the body of the response
It is very important to note that in this case the body may
not be just a astring, but any Python object.
"""
self._body = body
def outputBody(self):
'See Zope.Publisher.IPublisherResponse.IPublisherResponse'
pass
def getResult(self):
if getattr(self, '_exc', None) is not None:
raise self._exc[0], self._exc[1]
return self._getBody()
def handleException(self, exc_info):
self._exc = exc_info[:2]
# import traceback
# traceback.print_exc()
"""
$Id: vfs.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
"""
__metaclass__ = type # All classes are new style when run with Python 2.2+
from zope.publisher.interfaces.vfs import IVFSView
class VFSView:
__implements__ = IVFSView
def __init__(self, context, request):
self.context = context
self.request = request
"""
$Id: vfs.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
"""
from zope.publisher.base import BaseRequest
from zope.publisher.interfaces.vfs import IVFSView
from zope.publisher.interfaces.vfs import IVFSCredentials
class VFSRequest(BaseRequest):
__implements__ = BaseRequest.__implements__, IVFSCredentials
# _presentation_type is overridden from the BaseRequest
# to implement IVFSView
_presentation_type = IVFSView
def __init__(self, body_instream, outstream, environ, response=None):
""" """
super(VFSRequest, self).__init__(
body_instream, outstream, environ, response)
self._environ = environ
self.method = ''
self.__setupPath()
def _createResponse(self, outstream):
"""Create a specific XML-RPC response object."""
return VFSResponse(outstream)
############################################################
# Implementation methods for interface
# Zope.Publisher.VFS.IVFSCredentials.
def _authUserPW(self):
'See Zope.Publisher.VFS.IVFSCredentials.IVFSCredentials'
# XXX This is wrong. Instead of _authUserPW() there should
# be a method of all requests called getCredentials() which
# returns an ICredentials instance.
credentials = self._environ['credentials']
return credentials.getUserName(), credentials.getPassword()
def unauthorized(self, challenge):
'See Zope.Publisher.VFS.IVFSCredentials.IVFSCredentials'
pass
#
############################################################
######################################
# from: Zope.Publisher.IPublisherRequest.IPublisherRequest
def processInputs(self):
'See Zope.Publisher.IPublisherRequest.IPublisherRequest'
if 'command' in self._environ:
self.method = self._environ['command']
#
############################################################
def __setupPath(self):
self._setupPath_helper("path")
def __repr__(self):
# Returns a *short* string.
return '<%s instance at 0x%x, path=%s>' % (
str(self.__class__), id(self), '/'.join(self._traversal_stack))
"""VFS-View for IFile
VFS-view implementation for a generic file.
$Id: vfs.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
"""
import datetime
zerotime = datetime.datetime.fromtimestamp(0)
from zope.component import queryAdapter
from zope.publisher.interfaces.vfs import IVFSFilePublisher
from zope.app.interfaces.dublincore import IZopeDublinCore
class VFSFileView(VFSView):
"""Abstract class providing the infrastructure for a basic VFS view
for basic file-like content object."""
__implements__ = IVFSFilePublisher, VFSView.__implements__
# These methods need to be implmented by the real view class
def _setData(self, data):
raise NotImplemented
def _getData(self):
raise NotImplemented
def _getSize(self):
return len(self._getData())
############################################################
# Implementation methods for interface
# Zope.Publisher.VFS.IVFSFilePublisher.
def read(self, mode, outstream, start = 0, end = -1):
"""See Zope.Publisher.VFS.IVFSFilePublisher.IVFSFilePublisher"""
data = self._getData()
try:
if end != -1: data = data[:end]
if start != 0: data = data[start:]
except TypeError:
pass
outstream.write(data)
def write(self, mode, instream, start = 0):
"""See Zope.Publisher.VFS.IVFSFilePublisher.IVFSFilePublisher"""
try:
instream.seek(start)
except:
pass
self._setData(instream.read())
def check_writable(self, mode):
"""See Zope.Publisher.VFS.IVFSFilePublisher.IVFSFilePublisher"""
return 1
######################################
# from: Zope.Publisher.VFS.IVFSObjectPublisher.IVFSObjectPublisher
def isdir(self):
"""See Zope.Publisher.VFS.IVFSObjectPublisher.IVFSObjectPublisher"""
return 0
def isfile(self):
"""See Zope.Publisher.VFS.IVFSObjectPublisher.IVFSObjectPublisher"""
return 1
def stat(self):
"""See Zope.Publisher.VFS.IVFSObjectPublisher.IVFSObjectPublisher"""
dc = queryAdapter(self, IZopeDublinCore)
if dc is not None:
modified = dc.modified
created = dc.created
else:
created = zerotime
modified = zerotime
if created is None:
created = zerotime
if modified is None:
modified = created
size = self._getSize()
uid = "nouser"
gid = "nogroup"
return (504, 0, 0, 0, uid, gid, size, modified, modified, created)
######################################
# from: Zope.Publisher.VFS.IVFSPublisher.IVFSPublisher
def publishTraverse(self, request, name):
"""See Zope.Publisher.VFS.IVFSPublisher.IVFSPublisher"""
# Traversing always stops here.
return None
#
############################################################
=== Added File Zope3/src/zope/publisher/xmlrpc.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: xmlrpc.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
"""
from zope.publisher.interfaces.xmlrpc import IXMLRPCPublisher
from zope.publisher.http import DefaultPublisher
class MethodPublisher(DefaultPublisher):
"""Simple XML-RPC publisher that is identical to the HTTP Default Publisher
except that it implements the IXMLRPCPublisher interface.
"""
__implements__ = IXMLRPCPublisher
"""
$Id: xmlrpc.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
"""
import xmlrpclib
from cgi import FieldStorage
from zope.publisher.http import HTTPRequest
from zope.publisher.interfaces.xmlrpc import IXMLRPCPublisher
from zope.publisher.interfaces.xmlrpc import IXMLRPCPublication
from zope.publisher.interfaces.xmlrpc import IXMLRPCPresentation
class XMLRPCRequest(HTTPRequest):
__implements__ = HTTPRequest.__implements__, IXMLRPCPublication
# _presentation_type is overridden from the BaseRequest
# to implement IXMLRPCPublisher
_presentation_type = IXMLRPCPresentation
_args = ()
def _createResponse(self, outstream):
"""Create a specific XML-RPC response object."""
return XMLRPCResponse(outstream)
###################################################################
# from: Zope.Publisher.IPublisherRequest.IPublisherRequest
def processInputs(self):
'See Zope.Publisher.IPublisherRequest.IPublisherRequest'
# Parse the request XML structure
self._args, function = xmlrpclib.loads(self._body_instream.read())
# Translate '.' to '/' in function to represent object traversal.
function = function.replace('.', '/')
if function:
self.setPathSuffix((function,))
#
###################################################################
class TestRequest(XMLRPCRequest):
def __init__(self, body_instream=None, outstream=None, environ=None,
response=None, **kw):
_testEnv = {
'SERVER_URL': 'http://127.0.0.1',
'HTTP_HOST': '127.0.0.1',
'CONTENT_LENGTH': '0',
'GATEWAY_INTERFACE': 'TestFooInterface/1.0',
}
if environ:
_testEnv.update(environ)
if kw:
_testEnv.update(kw)
if body_instream is None:
from StringIO import StringIO
body_instream = StringIO('')
if outstream is None:
outstream = StringIO()
super(TestRequest, self).__init__(
body_instream, outstream, _testEnv, response)
"""
$Id: xmlrpc.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
"""
import xmlrpclib
from zope.publisher.http import HTTPResponse
from zope.proxy.introspection import removeAllProxies
class XMLRPCResponse(HTTPResponse):
"""XMLRPC response
"""
__implements__ = HTTPResponse.__implements__
def setBody(self, body):
"""Sets the body of the response
Sets the return body equal to the (string) argument "body". Also
updates the "content-length" return header.
If the body is a 2-element tuple, then it will be treated
as (title,body)
If is_error is true then the HTML will be formatted as a Zope error
message instead of a generic HTML page.
"""
body = removeAllProxies(body)
if isinstance(body, xmlrpclib.Fault):
# Convert Fault object to XML-RPC response.
body = xmlrpclib.dumps(body, methodresponse=1)
else:
# Marshall our body as an XML-RPC response. Strings will be sent
# strings, integers as integers, etc. We do *not* convert
# everything to a string first.
if body is None:
body = xmlrpclib.False # Argh, XML-RPC doesn't handle null
try:
body = xmlrpclib.dumps((body,), methodresponse=1)
except Exception, e:
self.handleException(e)
return
# Set our body to the XML-RPC message, and fix our MIME type.
self.setHeader('content-type', 'text/xml')
self._body = body
self._updateContentLength()
if not self._status_set:
self.setStatus(200)
def handleException(self, exc_info):
"""Handle Errors during publsihing and wrap it in XML-RPC XML"""
t, value = exc_info[:2]
import traceback
traceback.print_tb(exc_info[2])
print t
print value
# Create an appropriate Fault object. Unfortunately, we throw away
# most of the debugging information. More useful error reporting is
# left as an exercise for the reader.
Fault = xmlrpclib.Fault
fault_text = None
try:
if isinstance(value, Fault):
fault_text = value
elif isinstance(value, Exception):
fault_text = Fault(-1, "Unexpected Zope exception: " +
str(value))
else:
fault_text = Fault(-2, "Unexpected Zope error value: " +
str(value))
except:
fault_text = Fault(-3, "Unknown Zope fault type")
# Do the damage.
self.setBody(fault_text)
self.setStatus(200)
"""
$Id: xmlrpc.py,v 1.1.2.1 2002/12/23 19:33:08 jim Exp $
"""
__metaclass__ = type # All classes are new style when run with Python 2.2+
from zope.publisher.interfaces.xmlrpc import IXMLRPCView
class XMLRPCView:
__implements__ = IXMLRPCView
def __init__(self, context, request):
self.context = context
self.request = request