[Zope-Checkins] CVS: Zope3/lib/python/Zope/Publisher/Browser - cgi_names.py:1.1.2.1 BrowserRequest.py:1.1.2.4 BrowserResponse.py:1.1.2.2 IBrowserApplicationRequest.py:1.1.2.1.2.2 IBrowserPublication.py:1.1.2.2 IBrowserRequest.py:1.1.2.2
Jim Fulton
jim@zope.com
Tue, 26 Mar 2002 11:55:08 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Publisher/Browser
In directory cvs.zope.org:/tmp/cvs-serv4886/Zope/Publisher/Browser
Modified Files:
Tag: Zope3-publisher-refactor-branch
BrowserRequest.py BrowserResponse.py
IBrowserApplicationRequest.py IBrowserPublication.py
IBrowserRequest.py
Added Files:
Tag: Zope3-publisher-refactor-branch
cgi_names.py
Log Message:
Got the BrowserPublisher to pass tests.
The remaining task (aside from writing more tests) is to get the
new server setup working. This should also get the server tests to pass.
=== Added File Zope3/lib/python/Zope/Publisher/Browser/cgi_names.py ===
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#cgi hotfix:
import cgi
if not hasattr(cgi, 'valid_boundary'):
try: import cgi_hotfix
except ImportError: pass
isCGI_NAME = {
# These fields are placed in request.environ instead of request.form.
'SERVER_SOFTWARE' : 1,
'SERVER_NAME' : 1,
'GATEWAY_INTERFACE' : 1,
'SERVER_PROTOCOL' : 1,
'SERVER_PORT' : 1,
'REQUEST_METHOD' : 1,
'PATH_INFO' : 1,
'PATH_TRANSLATED' : 1,
'SCRIPT_NAME' : 1,
'QUERY_STRING' : 1,
'REMOTE_HOST' : 1,
'REMOTE_ADDR' : 1,
'AUTH_TYPE' : 1,
'REMOTE_USER' : 1,
'REMOTE_IDENT' : 1,
'CONTENT_TYPE' : 1,
'CONTENT_LENGTH' : 1,
'SERVER_URL': 1,
}.has_key
hide_key={
'HTTP_AUTHORIZATION':1,
'HTTP_CGI_AUTHORIZATION': 1,
}.has_key
=== Zope3/lib/python/Zope/Publisher/Browser/BrowserRequest.py 1.1.2.3 => 1.1.2.4 ===
"""
+import re
+
+from cgi import FieldStorage
from urllib import quote, unquote, splittype, splitport
from cgi_names import isCGI_NAME, hide_key
from Zope.Publisher.Converters import get_converter
from Zope.Publisher.HTTP.HTTPRequest import HTTPRequest
+from IBrowserPublisher import IBrowserPublisher
from IBrowserRequest import IBrowserRequest
+from IBrowserPublication import IBrowserPublication
+from IBrowserApplicationRequest import IBrowserApplicationRequest
+from BrowserResponse import BrowserResponse
# Flas Constants
-SEQUENCE = 1,
-DEFAULT = 2,
-RECORD = 4,
-RECORDS = 8,
-REC = 12, # RECORD|RECORDS
-EMPTY = 16,
-CONVERTED = 32,
+SEQUENCE = 1
+DEFAULT = 2
+RECORD = 4
+RECORDS = 8
+REC = 12 # RECORD|RECORDS
+EMPTY = 16
+CONVERTED = 32
search_type = re.compile('(:[a-zA-Z][a-zA-Z0-9_]+|\\.[xy])$').search
class BrowserRequest(HTTPRequest):
- __implements__ = IBrowserRequest
-
-
- __computed_urls = () # Names of computed URLx variables
- __script = () # SERVER_URL + __script + quoted_steps == full URL
- __server_script = '' # __server_script + quoted_steps = full URL
- __server_url = '' # Possible munged server_url
+ __implements__ = (HTTPRequest.__implements__,
+ IBrowserRequest, IBrowserPublication,
+ IBrowserApplicationRequest,
+ )
+
+ __slots__ = (
+ '_form', # Form data
+ )
+ use_redirect = 0 # Set this to 1 in a subclass to redirect GET
+ # requests when the effective and actual URLs differ.
# _viewtype is overridden from the BaseRequest
# to implement IBrowserPublisher
_viewtype = IBrowserPublisher
-
+
def __init__(self, body_instream, outstream, environ):
-
+ self._form = {}
super(BrowserRequest, self).__init__(body_instream, outstream, environ)
- self.__form = {}
-
- ############################################################
- # Implementation methods for interface
- # Zope.Publisher.Browser.IBrowserRequest.IBrowserRequest
-
- def setServerURL(self, protocol=None, hostname=None, port=None):
- 'See Zope.Publisher.Browser.IBrowserRequest.IBrowserRequest'
- server_url = self.getServerURL()
- if protocol is None and hostname is None and port is None:
- return server_url
- # XXX: Where did the
- oldprotocol, oldhost = splittype(server_url)
- oldhostname, oldport = splitport(oldhost[2:])
- if protocol is None:
- protocol = oldprotocol
- if hostname is None:
- hostname = oldhostname
- if port is None: port = oldport
-
- if (not port or DEFAULT_PORTS.get(protocol, 80) == port):
- host = hostname
- else:
- host = hostname + ':' + port
- server_url = self.__server_url = '%s://%s' % (protocol, host)
- self._resetURLs()
- return server_url
-
-
- def getServerURL(self, defaut=None):
- 'See Zope.Publisher.Browser.IBrowserRequest.IBrowserRequest'
- return self.__server_url
-
-
- def computeURLn(self, key, n, pathonly):
- 'See Zope.Publisher.Browser.IBrowserRequest.IBrowserRequest'
- path = self._script + self.quoted_steps
- n = len(path) - n
- if n < 0:
- raise KeyError, key
- if pathonly:
- path = [''] + path[:n]
- else:
- path = [self.getServerURL()] + path[:n]
- self.other[key] = res = '/'.join(path)
- self._computed_urls = self._computed_urls + (key,)
- return res
-
-
- def computeBASEn(self, key, n, pathonly):
- 'See Zope.Publisher.Browser.IBrowserRequest.IBrowserRequest'
- path = self.quoted_steps
- if n:
- n = n - 1
- if len(path) < n:
- raise KeyError, key
- v = self._script + path[:n]
- else:
- v = self._script[:-1]
- if pathonly:
- v.insert(0, '')
- else:
- v.insert(0, self.getServerURL())
- self.other[key] = res = '/'.join(v)
- self._computed_urls = self._computed_urls + (key,)
- return res
-
-
- ######################################
- # from: Zope.Publisher.Browser.IVirtualHostRequest.IVirtualHostRequest
-
-
- def setVirtualRoot(self, self, path, hard=0):
- 'See Zope.Publisher.Browser.IVirtualHostRequest.IVirtualHostRequest'
- if isinstance(path, StringType):
- path = filter(None, path.split('/'))
- self._script[:] = map(quote, path)
- del self.quoted_steps[:]
- traversed = self.traversed
- if hard:
- del traversed[:-1]
- self.other['VirtualRootPhysicalPath'] = traversed[-1].getPhysicalPath()
- self._resetURLs()
-
-
- def convertPhysicalPathToURL(self, path, relative=0):
- 'See Zope.Publisher.Browser.IVirtualHostRequest.IVirtualHostRequest'
- path = self._script + map(quote, self.physicalPathToVirtualPath(path))
- if relative:
- path.insert(0, '')
- else:
- path.insert(0, self.getServerURL())
- return '/'.join(path)
-
-
- def convertPhysicalPathToVirtualPath(self, path):
- 'See Zope.Publisher.Browser.IVirtualHostRequest.IVirtualHostRequest'
- if isinstance(path, StringType):
- path = path.split('/')
- rpp = self.other.get('VirtualRootPhysicalPath', ('',))
- i = 0
- for name in rpp[:len(path)]:
- if path[i] == name:
- i = i + 1
- else:
- break
- return path[i:]
-
-
- def getPhysicalPathFromURL(self, self, URL):
- 'See Zope.Publisher.Browser.IVirtualHostRequest.IVirtualHostRequest'
- bad_server_url = 0
- path = filter(None, URL.split('/'))
-
- if URL.find('://') >= 0:
- path = path[2:]
-
- # Check the path against BASEPATH1
- vhbase = self._script
- vhbl = len(vhbase)
- bad_basepath = 0
- if path[:vhbl] == vhbase:
- path = path[vhbl:]
- else:
- raise ValueError, (
- 'Url does not match virtual hosting context'
- )
- vrpp = self.other.get('VirtualRootPhysicalPath', ('',))
- return list(vrpp) + map(unquote, path)
- def getEffectiveURL(self):
- 'See Zope.Publisher.Browser.IVirtualHostRequest.IVirtualHostRequest'
- return self.effective_url or self.URL
+ def _createResponse(self, outstream):
+ # Should be overridden by subclasses
+ return BrowserResponse(outstream)
+
######################################
# from: Zope.Publisher.IPublisherRequest.IPublisherRequest
@@ -198,13 +80,12 @@
'See Zope.Publisher.IPublisherRequest.IPublisherRequest'
environ = self._environ
- other = self._other
- form = self.__form
+ form = self._form
method = environ.get('REQUEST_METHOD', 'GET')
if method != 'GET':
# Process form if not a GET request.
- fp = request.__body_instream
+ fp = self._body_instream
else:
fp = None
@@ -526,66 +407,63 @@
item = tuple(form[key])
form[key] = item
- other.update(form)
if meth:
self.setPathSuffix((meth,))
-
-
-
######################################
# from: Zope.Publisher.IPublisherRequest.IPublisherRequest
def traverse(self, object):
'See Zope.Publisher.IPublisherRequest.IPublisherRequest'
ob = super(BrowserRequest, self).traverse(object)
+ method = self._environ.get('REQUEST_METHOD', 'GET').upper()
+
+ base_needed = 0
if self._path_suffix:
# We had a :method variable, so we need to set the base,
# but we don't look for default documents any more.
base_needed = 1
- elif self._environ.get('REQUEST_METHOD', 'GET').upper() == 'GET':
+ redirect = 0
+ elif method == 'GET':
# We need to check for default documents
publication = self.getPublication()
+
+ nsteps = 0
ob, add_steps = publication.getDefaultTraversal(self, ob)
while add_steps:
- base_needed = 1
- ob = super(BrowserRequest, self).traverse(object)
+ nsteps += len(add_steps)
+ add_steps = list(add_steps)
+ add_steps.reverse()
+ self.setTraversalStack(add_steps)
+ ob = super(BrowserRequest, self).traverse(ob)
ob, add_steps = publication.getDefaultTraversal(self, ob)
- else:
- base_needed = 0
-
+ if nsteps > self._endswithslash:
+ base_needed = 1
+ redirect = self.use_redirect
+
+
+ if base_needed:
+ url = self.getURL()
+ response = self.getResponse()
+ if redirect:
+ response.redirect(url)
+ return ''
+ elif not response.getBase():
+ response.setBase(url)
+
+ return ob
+
######################################
# from: Interface.Common.Mapping.IEnumerableMapping
def keys(self):
'See Interface.Common.Mapping.IEnumerableMapping'
- keys = {'URL':1, 'SERVER_URL':1}
- keys.update(self._common)
- keys.update(self._other)
- keys.update(self._environ)
-
- for key in self.environ.keys():
- if (isCGI_NAME(key) or key.startswith('HTTP_')) and \
- (not hide_key(key)):
- keys[key] = 1
-
- n=0
- while 1:
- n=n+1
- key = "URL%s" % n
- if not self.has_key(key): break
-
- n=0
- while 1:
- n=n+1
- key = "BASE%s" % n
- if not self.has_key(key): break
-
- keys=keys.keys()
- keys.sort()
-
- return keys
+ d = {}
+ d.update(self._environ)
+ d.update(self._cookies)
+ d.update(self._form)
+ return d.keys()
######################################
@@ -594,56 +472,47 @@
def get(self, key, default=None):
'See Interface.Common.Mapping.IReadMapping'
+ result = self._form.get(key, self)
+ if result is not self: return result
- if key.startswith('U'):
- match = URLmatch(key)
- if match is not None:
- pathonly, n = match.groups()
- return self.computeURLn(key, int(n), pathonly)
-
- if key.startswith('B'):
- match = BASEmatch(key)
- if match is not None:
- pathonly, n = match.groups()
- return self.computeBASEn(key, int(n), pathonly)
-
- if isCGI_NAME(key) or key.startswith('HTTP_'):
- environ=self.environ
- if environ.has_key(key) and (not hide_key(key)):
- return environ[key]
- return ''
-
- handler = self._key_handlers.get(key, None)
- if handler is not None:
- result = handler(self, self)
- if result is not self:
- return result
-
- result = self._other.get(key, self)
- if result is not self: return result
-
- result = self._common.get(key, self)
- if result is not self: return result
-
+ result = self._cookies.get(key, self)
+ if result is not self: return result
+
result = self._environ.get(key, self)
if result is not self: return result
return default
- get.__permission__ = 'Zope.Public'
-
#
############################################################
+class RedirectingBrowserRequest(BrowserRequest):
+ """Browser requests that redirect when the actual and effective URLs differ
+ """
+
+ use_redirect = 1
+
+class TestRequest(BrowserRequest):
+ def __init__(self, body_instream=None, outstream=None, environ=None, **kw):
- def _resetURLs(self):
- self.URL = '/'.join(
- [self.getServerURL()] + self._script + self.quoted_steps)
- for x in self._computed_urls:
- del self.other[x]
- self._computed_urls = ()
+ _testEnv = {
+ 'SERVER_URL': 'http://127.0.0.1',
+ 'HTTP_HOST': '127.0.0.1',
+ 'CONTENT_LENGTH': '0',
+ 'GATEWAY_INTERFACE': 'TestFooInterface/1.0',
+ }
+
+ if environ:
+ _testEnv.update(environ)
+ if kw:
+ _testEnv.update(kw)
+ if body_instream is None:
+ from StringIO import StringIO
+ body_instream = StringIO('')
+ if outstream is None:
+ outstream = StringIO()
+ super(TestRequest, self).__init__(body_instream, outstream, _testEnv)
- _key_handlers = BaseRequest._key_handlers.copy()
=== Zope3/lib/python/Zope/Publisher/Browser/BrowserResponse.py 1.1.2.1 => 1.1.2.2 ===
class BrowserResponse(HTTPResponse):
- """ """
+ """Browser response
+ """
+
+ __slots__ = (
+ '_base', # The base href
+ )
def setBody(self, body):
- """
- Sets the body of the response
+ """Sets the body of the response
Sets the return body equal to the (string) argument "body". Also
updates the "content-length" return header.
@@ -42,11 +46,11 @@
"""
body = str(body)
- if not response.headers.has_key('content-type'):
+ if not self._headers.has_key('content-type'):
c = (self.__isHTML(body) and 'text/html' or 'text/plain')
- response.setHeader('content-type', c)
+ self.setHeader('content-type', c)
- content_type = response.headers['content-type']
+ content_type = self._headers['content-type']
if is_text_html(content_type):
# Some browsers interpret certain characters in Latin 1 as html
# special characters. These cannot be removed by html_quote,
@@ -54,33 +58,9 @@
body = body.replace('\213', '<')
body = body.replace('\233', '>')
- body = self.__insertBase(response, body)
+ body = self.__insertBase(body)
self._body = body
self._updateContentLength()
- return response
-
-
- def handleException(self, exc_info):
- """
- Calls response.setBody() with an error response.
- """
- t, v = exc_info[:2]
- if isinstance(t, ClassType):
- title = tname = t.__name__
- if issubclass(t, Redirect):
- self.redirect(v.getLocation())
- return
- else:
- title = tname = str(t)
-
- # Throwing non-protocol-specific exceptions is a good way
- # for apps to control the status code.
- self.setStatus(tname)
-
- tb = escape(traceback_string(t, v, exc_info[2]))
- body = self.__wrapInHTML(title, "<pre>\n%s\n</pre>" % tb)
- self.setBody(body)
-
def __isHTML(self, str):
s = str.strip().lower()
@@ -104,7 +84,7 @@
if content_type and content_type != 'text/html':
return body
- if self.base:
+ if getattr(self, '_base', ''):
if body:
match = start_of_header_search(body)
if match is not None:
@@ -112,5 +92,22 @@
ibase = base_re_search(body)
if ibase is None:
body = ('%s\n<base href="%s" />\n%s' %
- (body[:index], self.base, body[index:]))
+ (body[:index], self._base, body[index:]))
return body
+
+ def getBase(self):
+ return getattr(self, '_base', '')
+
+ def setBase(self, base):
+ self._base = base
+
+
+
+latin1_alias_match = re.compile(
+ r'text/html(\s*;\s*charset=((latin)|(latin[-_]?1)|'
+ r'(cp1252)|(cp819)|(csISOLatin1)|(IBM819)|(iso-ir-100)|'
+ r'(iso[-_]8859[-_]1(:1987)?)))?$',re.I).match
+
+def is_text_html(content_type):
+ return (content_type == 'text/html' or
+ latin1_alias_match(content_type) is not None)
=== Zope3/lib/python/Zope/Publisher/Browser/IBrowserApplicationRequest.py 1.1.2.1.2.1 => 1.1.2.1.2.2 ===
from Interface.Attribute import Attribute
-class IHTTPApplicationRequest(IHTTPApplicationRequest):
+class IBrowserApplicationRequest(IHTTPApplicationRequest):
"""Browser-specific requests
"""
=== Zope3/lib/python/Zope/Publisher/Browser/IBrowserPublication.py 1.1.2.1 => 1.1.2.2 ===
from Zope.Publisher.IPublication import IPublication
-class IBrowserGetPublication (IPublication):
+class IBrowserPublication (IPublication):
"""
Object publication framework.
"""
=== Zope3/lib/python/Zope/Publisher/Browser/IBrowserRequest.py 1.1.2.1 => 1.1.2.2 ===
the Request object to the end-developer.
"""
-
- def computeURLn(key, n, pathonly):
- """Compute the nth URL of the path.
- """
-
-
- def computeBASEn(key, n, pathonly):
- """Compute the nth BASE of the path.
- """
-
-
- def setServerURL(protocol=None, hostname=None, port=None):
- """Set the parts of generated URLs.
- """
-
-
- def getServerURL(self, defaut=None):
- """Get the Server URL string.
- """