[Zope-Checkins] SVN: Zope/trunk/src/ Merge -r 106827-106988 from the tseaver-fix_wsgi branch.
Tres Seaver
tseaver at palladion.com
Tue Jun 1 17:08:53 EDT 2010
Log message for revision 112892:
Merge -r 106827-106988 from the tseaver-fix_wsgi branch.
- 100% coverage for 'ZPublisher.HTTPResponse'.
- Stop dancing the status / errmsg into / out of the headers list -- they
aren't "headers" in any practical sense.
- Conform to PEP 8.
- Normalize imports, avoiding BBB import names.
Changed:
U Zope/trunk/src/Testing/ZopeTestCase/zopedoctest/functional.py
U Zope/trunk/src/ZPublisher/HTTPResponse.py
U Zope/trunk/src/ZPublisher/WSGIPublisher.py
U Zope/trunk/src/ZPublisher/tests/testHTTPResponse.py
U Zope/trunk/src/ZServer/HTTPResponse.py
U Zope/trunk/src/ZServer/tests/test_responses.py
A Zope/trunk/src/Zope2/utilities/skel/bin/zope2.wsgi.in
-=-
Modified: Zope/trunk/src/Testing/ZopeTestCase/zopedoctest/functional.py
===================================================================
--- Zope/trunk/src/Testing/ZopeTestCase/zopedoctest/functional.py 2010-06-01 18:39:30 UTC (rev 112891)
+++ Zope/trunk/src/Testing/ZopeTestCase/zopedoctest/functional.py 2010-06-01 21:08:52 UTC (rev 112892)
@@ -185,7 +185,7 @@
header_output.setResponseStatus(response.getStatus(), response.errmsg)
header_output.setResponseHeaders(response.headers)
header_output.appendResponseHeaders(response._cookie_list())
- header_output.appendResponseHeaders(response.accumulated_headers.splitlines())
+ header_output.appendResponseHeaders(response.accumulated_headers)
sync()
Modified: Zope/trunk/src/ZPublisher/HTTPResponse.py
===================================================================
--- Zope/trunk/src/ZPublisher/HTTPResponse.py 2010-06-01 18:39:30 UTC (rev 112891)
+++ Zope/trunk/src/ZPublisher/HTTPResponse.py 2010-06-01 21:08:52 UTC (rev 112892)
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2001 Zope Foundation and Contributors.
+# Copyright (c) 2001-2009 Zope Foundation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
@@ -10,23 +10,30 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-'''CGI Response Output formatter
+""" CGI Response Output formatter
+"""
+from cgi import escape
+import os
+import re
+from string import maketrans
+from string import translate
+import struct
+import sys
+import types
+from urllib import quote
+import zlib
-$Id$'''
-
-import types, os, sys, re
-import zlib, struct
-from string import translate, maketrans
from zope.event import notify
-from BaseResponse import BaseResponse
-from zExceptions import Unauthorized, Redirect
+from zExceptions import Redirect
+from zExceptions import Unauthorized
from zExceptions.ExceptionFormatter import format_exception
-from ZPublisher import BadRequest, InternalError, NotFound
+from ZPublisher import BadRequest
+from ZPublisher import InternalError
+from ZPublisher import NotFound
+from ZPublisher.BaseResponse import BaseResponse
from ZPublisher.pubevents import PubBeforeStreaming
-from cgi import escape
-from urllib import quote
-nl2sp = maketrans('\n',' ')
+nl2sp = maketrans('\n', ' ')
# This may get overwritten during configuration
default_encoding = 'iso-8859-15'
@@ -104,9 +111,6 @@
start_of_header_search = re.compile('(<head[^>]*>)', re.IGNORECASE).search
-accumulate_header = {'set-cookie': 1}.has_key
-
-
_gzip_header = ("\037\213" # magic
"\010" # compression method
"\000" # flags
@@ -130,8 +134,7 @@
return ''.join(_CRLF.split(str(name))), ''.join(_CRLF.split(str(value)))
class HTTPResponse(BaseResponse):
- """\
- An object representation of an HTTP response.
+ """ An object representation of an HTTP response.
The Response type encapsulates all possible responses to HTTP
requests. Responses are normally created by the object publisher.
@@ -150,8 +153,8 @@
passed into the object must be used.
""" #'
- accumulated_headers = ''
body = ''
+ base = ''
realm = 'Zope'
_error_format = 'text/html'
_locked_status = 0
@@ -163,61 +166,48 @@
# 2 - ignore accept-encoding (i.e. force)
use_HTTP_content_compression = 0
- def __init__(self,body='',status=200,headers=None,
- stdout=sys.stdout, stderr=sys.stderr,):
- '''\
- Creates a new response. In effect, the constructor calls
- "self.setBody(body); self.setStatus(status); for name in
- headers.keys(): self.setHeader(name, headers[name])"
- '''
+ def __init__(self,
+ body='',
+ status=200,
+ headers=None,
+ stdout=sys.stdout,
+ stderr=sys.stderr,
+ ):
+ """ Create a new response using the given values.
+ """
if headers is None:
headers = {}
self.headers = headers
+ self.accumulated_headers = []
if status == 200:
self.status = 200
self.errmsg = 'OK'
- headers['status'] = "200 OK"
else:
self.setStatus(status)
- self.base = ''
+
if body:
self.setBody(body)
+
self.cookies = {}
self.stdout = stdout
self.stderr = stderr
def retry(self):
- """Return a response object to be used in a retry attempt
+ """ Return a cloned response object to be used in a retry attempt.
"""
-
# This implementation is a bit lame, because it assumes that
# only stdout stderr were passed to the constructor. OTOH, I
# think that that's all that is ever passed.
-
return self.__class__(stdout=self.stdout, stderr=self.stderr)
- _shutdown_flag = None
- def _requestShutdown(self, exitCode=0):
- """Request that the server shut down with exitCode after fulfilling
- the current request."""
- import ZServer
- ZServer.exit_code = exitCode
- self._shutdown_flag = 1
-
- def _shutdownRequested(self):
- """Returns true if this request requested a server shutdown."""
- return self._shutdown_flag is not None
-
def setStatus(self, status, reason=None, lock=None):
- '''\
- Sets the HTTP status code of the response; the argument may
- either be an integer or a string from { OK, Created, Accepted,
- NoContent, MovedPermanently, MovedTemporarily,
- NotModified, BadRequest, Unauthorized, Forbidden,
- NotFound, InternalError, NotImplemented, BadGateway,
- ServiceUnavailable } that will be converted to the correct
- integer value. '''
+ """ Set the HTTP status code of the response
+
+ o The argument may either be an integer or a string from the
+ 'status_reasons' dict values: status messages will be converted
+ to the correct integer value.
+ """
if self._locked_status:
# Don't change the response status.
# It has already been determined.
@@ -242,50 +232,191 @@
reason = status_reasons[status]
else:
reason = 'Unknown'
- self.setHeader('Status', "%d %s" % (status,str(reason)))
+
self.errmsg = reason
# lock the status if we're told to
if lock:
self._locked_status = 1
+ def setCookie(self, name, value, quoted=True, **kw):
+ """ Set an HTTP cookie.
+
+ The response will include an HTTP header that sets a cookie on
+ cookie-enabled browsers with a key "name" and value
+ "value".
+
+ This value overwrites any previously set value for the
+ cookie in the Response object.
+ """
+ name = str(name)
+ value = str(value)
+
+ cookies = self.cookies
+ if cookies.has_key(name):
+ cookie = cookies[name]
+ else:
+ cookie = cookies[name] = {}
+ for k, v in kw.items():
+ cookie[k] = v
+ cookie['value'] = value
+ cookie['quoted'] = quoted
+
+ def appendCookie(self, name, value):
+ """ Set an HTTP cookie.
+
+ Returns an HTTP header that sets a cookie on cookie-enabled
+ browsers with a key "name" and value "value". If a value for the
+ cookie has previously been set in the response object, the new
+ value is appended to the old one separated by a colon.
+ """
+ name = str(name)
+ value = str(value)
+
+ cookies = self.cookies
+ if cookies.has_key(name):
+ cookie = cookies[name]
+ else:
+ cookie = cookies[name] = {}
+ if cookie.has_key('value'):
+ cookie['value'] = '%s:%s' % (cookie['value'], value)
+ else:
+ cookie['value'] = value
+
+ def expireCookie(self, name, **kw):
+ """ Clear an HTTP cookie.
+
+ The response will include an HTTP header that will remove the cookie
+ corresponding to "name" on the client, if one exists. This is
+ accomplished by sending a new cookie with an expiration date
+ that has already passed. Note that some clients require a path
+ to be specified - this path must exactly match the path given
+ when creating the cookie. The path can be specified as a keyword
+ argument.
+ """
+ name = str(name)
+
+ d = kw.copy()
+ if 'value' in d:
+ d.pop('value')
+ d['max_age'] = 0
+ d['expires'] = 'Wed, 31-Dec-97 23:59:59 GMT'
+
+ self.setCookie(name, value='deleted', **d)
+
+ def getHeader(self, name, literal=0):
+ """ Get a previously set header value.
+
+ Return the value associated with a HTTP return header, or
+ None if no such header has been set in the response
+ yet.
+
+ If the 'literal' flag is true, preserve the case of the header name;
+ otherwise lower-case the header name before looking up the value.
+ """
+ key = literal and name or name.lower()
+ return self.headers.get(key, None)
+
def setHeader(self, name, value, literal=0, scrubbed=False):
- '''\
- Sets an HTTP return header "name" with value "value", clearing
- the previous value set for the header, if one exists. If the
- literal flag is true, the case of the header name is preserved,
- otherwise the header name will be lowercased.'''
+ """ Set an HTTP return header on the response.
+
+ Replay any existing value set for the header.
+
+ If the 'literal' flag is true, preserve the case of the header name;
+ otherwise the header name will be lowercased.
+
+ 'scrubbed' is for internal use, to indicate that another API has
+ already removed any CRLF from the name and value.
+ """
if not scrubbed:
name, value = _scrubHeader(name, value)
key = name.lower()
- if accumulate_header(key):
- self.accumulated_headers = (
- "%s%s: %s\r\n" % (self.accumulated_headers, name, value))
- return
- name = literal and name or key
- self.headers[name] = value
+ # The following is crazy, given that we have APIs for cookies.
+ # Special behavior will go away in Zope 2.13
+ if key == 'set-cookie':
+ self.accumulated_headers.append((name, value))
+ else:
+ name = literal and name or key
+ self.headers[name] = value
- def getHeader(self, name, literal=0):
- '''\
- Get a header value
+ def appendHeader(self, name, value, delimiter=","):
+ """ Append a value to an HTTP return header.
- Returns the value associated with a HTTP return header, or
- "None" if no such header has been set in the response
- yet. If the literal flag is true, the case of the header name is
- preserved, otherwise the header name will be lowercased.'''
- key = name.lower()
- name = literal and name or key
- return self.headers.get(name, None)
+ Set an HTTP return header "name" with value "value",
+ appending it following a comma if there was a previous value
+ set for the header.
+ 'name' is always lowercased before use.
+ """
+ name, value = _scrubHeader(name, value)
+ name = name.lower()
+
+ headers = self.headers
+ if headers.has_key(name):
+ h = headers[name]
+ h = "%s%s\r\n\t%s" % (h, delimiter, value)
+ else:
+ h = value
+ self.setHeader(name,h, scrubbed=True)
+
def addHeader(self, name, value):
- '''\
- Set a new HTTP return header with the given value, while retaining
- any previously set headers with the same name.'''
+ """ Set a new HTTP return header with the given value,
+
+ Retain any previously set headers with the same name.
+
+ Note that this API appneds to the 'accumulated_headers' attribute;
+ it does not update the 'headers' mapping.
+ """
name, value = _scrubHeader(name, value)
- self.accumulated_headers = (
- "%s%s: %s\r\n" % (self.accumulated_headers, name, value))
+ self.accumulated_headers.append((name, value))
__setitem__ = setHeader
+ def setBase(self, base):
+ """Set the base URL for the returned document.
+
+ If base is None, set to the empty string.
+
+ If base is not None, ensure that it has a trailing slach.
+ """
+ if base is None:
+ base = ''
+ elif not base.endswith('/'):
+ base = base + '/'
+
+ self.base = str(base)
+
+ def insertBase(self,
+ base_re_search=re.compile('(<base.*?>)',re.I).search
+ ):
+
+ # Only insert a base tag if content appears to be html.
+ content_type = self.headers.get('content-type', '').split(';')[0]
+ if content_type and (content_type != 'text/html'):
+ return
+
+ if self.base:
+ body = self.body
+ if body:
+ match = start_of_header_search(body)
+ if match is not None:
+ index = match.start(0) + len(match.group(0))
+ ibase = base_re_search(body)
+ if ibase is None:
+ self.body = ('%s\n<base href="%s" />\n%s' %
+ (body[:index], escape(self.base, 1),
+ body[index:]))
+ self.setHeader('content-length', len(self.body))
+
+ def isHTML(self, s):
+ s = s.lstrip()
+ # Note that the string can be big, so s.lower().startswith() is more
+ # expensive than s[:n].lower().
+ if (s[:6].lower() == '<html>' or s[:14].lower() == '<!doctype html'):
+ return 1
+ if s.find('</') > 0:
+ return 1
+ return 0
+
def setBody(self, body, title='', is_error=0,
bogus_str_search=re.compile(" [a-fA-F0-9]+>$").search,
latin1_alias_match=re.compile(
@@ -294,21 +425,31 @@
r'(iso[-_]8859[-_]1(:1987)?)))?$',re.I).match,
lock=None
):
- '''\
- Set the body of the response
+ """ Set the body of the response
Sets the return body equal to the (string) argument "body". Also
updates the "content-length" return header.
+ If the body is already locked via a previous call, do nothing and
+ return None.
+
You can also specify a title, in which case the title and body
will be wrapped up in html, head, title, and body tags.
If the body is a 2-element tuple, then it will be treated
as (title,body)
- If is_error is true then the HTML will be formatted as a Zope error
- message instead of a generic HTML page.
- '''
+ If body is unicode, encode it.
+
+ If body is not a string or unicode, but has an 'asHTML' method, use
+ the result of that method as the body; otherwise, use the 'str'
+ of body.
+
+ If is_error is true, format the HTML as a Zope error message instead
+ of a generic HTML page.
+
+ Return 'self' (XXX as a true value?).
+ """
# allow locking of the body in the same way as the status
if self._locked_body:
return
@@ -340,7 +481,7 @@
bogus_str_search(body) is not None):
self.notFoundError(body[1:-1])
else:
- if(title):
+ if title:
title = str(title)
if not is_error:
self.body = self._html(title, body)
@@ -350,30 +491,34 @@
self.body = body
- isHTML = self.isHTML(self.body)
- if not self.headers.has_key('content-type'):
- if isHTML:
- c = 'text/html; charset=%s' % default_encoding
- else:
- c = 'text/plain; charset=%s' % default_encoding
- self.setHeader('content-type', c)
- else:
- c = self.headers['content-type']
- if c.startswith('text/') and not 'charset=' in c:
- c = '%s; charset=%s' % (c, default_encoding)
- self.setHeader('content-type', c)
+ content_type = self.headers.get('content-type')
# Some browsers interpret certain characters in Latin 1 as html
# special characters. These cannot be removed by html_quote,
# because this is not the case for all encodings.
- content_type = self.headers['content-type']
- if content_type == 'text/html' or latin1_alias_match(
- content_type) is not None:
+ if (content_type == 'text/html' or
+ content_type and latin1_alias_match(content_type) is not None):
body = '<'.join(body.split('\213'))
body = '>'.join(body.split('\233'))
+ self.body = body
+ if content_type is None:
+ if self.isHTML(self.body):
+ content_type = 'text/html; charset=%s' % default_encoding
+ else:
+ content_type = 'text/plain; charset=%s' % default_encoding
+ self.setHeader('content-type', content_type)
+ else:
+ if (content_type.startswith('text/') and
+ 'charset=' not in content_type):
+ content_type = '%s; charset=%s' % (content_type,
+ default_encoding)
+ self.setHeader('content-type', content_type)
+
self.setHeader('content-length', len(self.body))
+
self.insertBase()
+
if self.use_HTTP_content_compression and \
self.headers.get('content-encoding', 'gzip') == 'gzip':
# use HTTP content encoding to compress body contents unless
@@ -402,10 +547,10 @@
# respect Accept-Encoding client header
vary = self.getHeader('Vary')
if vary is None or 'Accept-Encoding' not in vary:
- self.appendHeader('Vary','Accept-Encoding')
+ self.appendHeader('Vary', 'Accept-Encoding')
return self
- def enableHTTPCompression(self,REQUEST={},force=0,disable=0,query=0):
+ def enableHTTPCompression(self, REQUEST={}, force=0, disable=0, query=0):
"""Enable HTTP Content Encoding with gzip compression if possible
REQUEST -- used to check if client can accept compression
@@ -458,11 +603,35 @@
return self.use_HTTP_content_compression
+ def redirect(self, location, status=302, lock=0):
+ """Cause a redirection without raising an error"""
+ self.setStatus(status, lock=lock)
+ self.setHeader('Location', location)
+
+ return str(location)
+
+ # The following two methods are part of a private protocol with the
+ # publisher for handling fatal import errors and TTW shutdown requests.
+ _shutdown_flag = None
+ def _requestShutdown(self, exitCode=0):
+ """ Request that the server shut down with exitCode after fulfilling
+ the current request.
+ """
+ import ZServer
+ ZServer.exit_code = exitCode
+ self._shutdown_flag = 1
+
+ def _shutdownRequested(self):
+ """ Returns true if this request requested a server shutdown.
+ """
+ return self._shutdown_flag is not None
+
+
def _encode_unicode(self,body,
- charset_re=re.compile(r'(?:application|text)/[-+0-9a-z]+\s*;\s*' +
- r'charset=([-_0-9a-z]+' +
- r')(?:(?:\s*;)|\Z)',
- re.IGNORECASE)):
+ charset_re=re.compile(
+ r'(?:application|text)/[-+0-9a-z]+\s*;\s*' +
+ r'charset=([-_0-9a-z]+' +
+ r')(?:(?:\s*;)|\Z)', re.IGNORECASE)):
def fix_xml_preamble(body, encoding):
""" fixes the encoding in the XML preamble according
@@ -471,7 +640,8 @@
if body.startswith('<?xml'):
pos_right = body.find('?>') # right end of the XML preamble
- body = ('<?xml version="1.0" encoding="%s" ?>' % encoding) + body[pos_right+2:]
+ body = ('<?xml version="1.0" encoding="%s" ?>'
+ % encoding) + body[pos_right+2:]
return body
# Encode the Unicode data as requested
@@ -486,133 +656,14 @@
return body
else:
if ct.startswith('text/') or ct.startswith('application/'):
- self.headers['content-type'] = '%s; charset=%s' % (ct, default_encoding)
+ self.headers['content-type'] = '%s; charset=%s' % (ct,
+ default_encoding)
# Use the default character encoding
body = body.encode(default_encoding, 'replace')
body = fix_xml_preamble(body, default_encoding)
return body
- def setBase(self,base):
- """Set the base URL for the returned document.
- If base is None, or the document already has a base, do nothing."""
- if base is None:
- base = ''
- elif not base.endswith('/'):
- base = base+'/'
- self.base = str(base)
-
- def insertBase(self,
- base_re_search=re.compile('(<base.*?>)',re.I).search
- ):
-
- # Only insert a base tag if content appears to be html.
- content_type = self.headers.get('content-type', '').split(';')[0]
- if content_type and (content_type != 'text/html'):
- return
-
- if self.base:
- body = self.body
- if body:
- match = start_of_header_search(body)
- if match is not None:
- index = match.start(0) + len(match.group(0))
- ibase = base_re_search(body)
- if ibase is None:
- self.body = ('%s\n<base href="%s" />\n%s' %
- (body[:index], escape(self.base, 1),
- body[index:]))
- self.setHeader('content-length', len(self.body))
-
- def appendCookie(self, name, value):
- '''\
- Returns an HTTP header that sets a cookie on cookie-enabled
- browsers with a key "name" and value "value". If a value for the
- cookie has previously been set in the response object, the new
- value is appended to the old one separated by a colon. '''
-
- name = str(name)
- value = str(value)
-
- cookies = self.cookies
- if cookies.has_key(name):
- cookie = cookies[name]
- else:
- cookie = cookies[name] = {}
- if cookie.has_key('value'):
- cookie['value'] = '%s:%s' % (cookie['value'], value)
- else:
- cookie['value'] = value
-
- def expireCookie(self, name, **kw):
- '''\
- Cause an HTTP cookie to be removed from the browser
-
- The response will include an HTTP header that will remove the cookie
- corresponding to "name" on the client, if one exists. This is
- accomplished by sending a new cookie with an expiration date
- that has already passed. Note that some clients require a path
- to be specified - this path must exactly match the path given
- when creating the cookie. The path can be specified as a keyword
- argument.
- '''
- name = str(name)
-
- d = kw.copy()
- d['max_age'] = 0
- d['expires'] = 'Wed, 31-Dec-97 23:59:59 GMT'
- apply(HTTPResponse.setCookie, (self, name, 'deleted'), d)
-
- def setCookie(self, name, value, quoted=True, **kw):
- '''\
- Set an HTTP cookie on the browser
-
- The response will include an HTTP header that sets a cookie on
- cookie-enabled browsers with a key "name" and value
- "value". This overwrites any previously set value for the
- cookie in the Response object.
- '''
- name = str(name)
- value = str(value)
-
- cookies = self.cookies
- if cookies.has_key(name):
- cookie = cookies[name]
- else:
- cookie = cookies[name] = {}
- for k, v in kw.items():
- cookie[k] = v
- cookie['value'] = value
- cookie['quoted'] = quoted
-
- def appendHeader(self, name, value, delimiter=","):
- '''\
- Append a value to a header.
-
- Sets an HTTP return header "name" with value "value",
- appending it following a comma if there was a previous value
- set for the header. '''
- name, value = _scrubHeader(name, value)
- name = name.lower()
-
- headers = self.headers
- if headers.has_key(name):
- h = headers[name]
- h = "%s%s\r\n\t%s" % (h,delimiter,value)
- else:
- h = value
- self.setHeader(name,h, scrubbed=True)
-
- def isHTML(self, s):
- s = s.lstrip()
- # Note that the string can be big, so s.lower().startswith() is more
- # expensive than s[:n].lower().
- if (s[:6].lower() == '<html>' or s[:14].lower() == '<!doctype html'):
- return 1
- if s.find('</') > 0:
- return 1
- return 0
-
# deprecated
def quoteHTML(self, text):
return escape(text, 1)
@@ -621,21 +672,7 @@
tb = format_exception(t, v, tb, as_html=as_html)
return '\n'.join(tb)
- def redirect(self, location, status=302, lock=0):
- """Cause a redirection without raising an error"""
- self.setStatus(status)
- self.setHeader('Location', location)
- location = str(location)
-
- if lock:
- # Don't let anything change the status code.
- # The "lock" argument needs to be set when redirecting
- # from a standard_error_message page.
- self._locked_status = 1
- return location
-
-
def _html(self,title,body):
return ("<html>\n"
"<head>\n<title>%s</title>\n</head>\n"
@@ -806,7 +843,8 @@
if fatal and t is SystemExit and v.code == 0:
body = self.setBody(
(str(t),
- 'Zope has exited normally.<p>' + self._traceback(t, v, tb) + '</p>'),
+ 'Zope has exited normally.<p>'
+ + self._traceback(t, v, tb) + '</p>'),
is_error=1)
else:
try:
@@ -885,23 +923,23 @@
not headers.has_key('transfer-encoding'):
self.setHeader('content-length',len(body))
- headersl = []
- append = headersl.append
+ chunks = []
+ append = chunks.append
# status header must come first.
- append("Status: %s" % headers.get('status', '200 OK'))
+ append("Status: %d %s" % (self.status, self.errmsg))
append("X-Powered-By: Zope (www.zope.org), Python (www.python.org)")
- if headers.has_key('status'):
- del headers['status']
- for key, val in headers.items():
+ for key, value in headers.items():
if key.lower() == key:
# only change non-literal header names
key = '-'.join([x.capitalize() for x in key.split('-')])
- append("%s: %s" % (key, val))
- if self.cookies:
- headersl = headersl+self._cookie_list()
- headersl[len(headersl):] = [self.accumulated_headers, body]
- return '\r\n'.join(headersl)
+ append("%s: %s" % (key, value))
+ chunks.extend(self._cookie_list())
+ for key, value in self.accumulated_headers:
+ append("%s: %s" % (key, value))
+ append('') # RFC 2616 mandates empty line between headers and payload
+ append(body)
+ return '\r\n'.join(chunks)
def write(self,data):
"""\
Modified: Zope/trunk/src/ZPublisher/WSGIPublisher.py
===================================================================
--- Zope/trunk/src/ZPublisher/WSGIPublisher.py 2010-06-01 18:39:30 UTC (rev 112891)
+++ Zope/trunk/src/ZPublisher/WSGIPublisher.py 2010-06-01 21:08:52 UTC (rev 112892)
@@ -10,28 +10,29 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
-__doc__="""Python Object Publisher -- Publish Python objects on web servers
+""" Python Object Publisher -- Publish Python objects on web servers
+"""
+from cStringIO import StringIO
+import re
+import sys
+import time
-$Id: Publish.py 67721 2006-04-28 14:57:35Z regebro $"""
-
-import sys, os, re, time
import transaction
-from Response import Response
-from Request import Request
-from maybe_lock import allocate_lock
-from mapply import mapply
from zExceptions import Redirect
-from cStringIO import StringIO
from ZServer.medusa.http_date import build_http_date
+from ZPublisher.HTTPResponse import HTTPResponse
+from ZPublisher.HTTPRequest import HTTPRequest
+from ZPublisher.maybe_lock import allocate_lock
+from ZPublisher.mapply import mapply
-class WSGIResponse(Response):
+class WSGIResponse(HTTPResponse):
"""A response object for WSGI
This Response object knows nothing about ZServer, but tries to be
compatible with the ZServerHTTPResponse.
- Most significantly, streaming is not (yet) supported."""
-
+ Most significantly, streaming is not (yet) supported.
+ """
_streaming = 0
def __str__(self,
@@ -271,7 +272,7 @@
response._http_connection = environ.get('CONNECTION_TYPE', 'close')
response._server_version = environ['SERVER_SOFTWARE']
- request = Request(environ['wsgi.input'], environ, response)
+ request = HTTPRequest(environ['wsgi.input'], environ, response)
# Let's support post-mortem debugging
handle_errors = environ.get('wsgi.handleErrors', True)
Modified: Zope/trunk/src/ZPublisher/tests/testHTTPResponse.py
===================================================================
--- Zope/trunk/src/ZPublisher/tests/testHTTPResponse.py 2010-06-01 18:39:30 UTC (rev 112891)
+++ Zope/trunk/src/ZPublisher/tests/testHTTPResponse.py 2010-06-01 21:08:52 UTC (rev 112892)
@@ -4,6 +4,17 @@
class HTTPResponseTests(unittest.TestCase):
+ _old_default_encoding = None
+
+ def tearDown(self):
+ if self._old_default_encoding is not None:
+ self._setDefaultEncoding(self._old_default_encoding)
+
+ def _setDefaultEncoding(self, value):
+ from ZPublisher import HTTPResponse as module
+ (module.default_encoding,
+ self._old_default_encoding) = (value, module.default_encoding)
+
def _getTargetClass(self):
from ZPublisher.HTTPResponse import HTTPResponse
@@ -13,23 +24,181 @@
return self._getTargetClass()(*args, **kw)
- def test_setStatus_with_exceptions(self):
+ def test_ctor_defaults(self):
+ import sys
+ response = self._makeOne()
+ self.assertEqual(response.accumulated_headers, [])
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.errmsg, 'OK')
+ self.assertEqual(response.base, '')
+ self.assertEqual(response.body, '')
+ self.assertEqual(response.cookies, {})
+ self.failUnless(response.stdout is sys.stdout)
+ self.failUnless(response.stderr is sys.stderr)
+ def test_ctor_w_body(self):
+ response = self._makeOne(body='ABC')
+ self.assertEqual(response.body, 'ABC')
+
+ def test_ctor_w_headers(self):
+ response = self._makeOne(headers={'foo': 'bar'})
+ self.assertEqual(response.headers, {'foo': 'bar',
+ })
+
+ def test_ctor_w_status_code(self):
+ response = self._makeOne(status=401)
+ self.assertEqual(response.status, 401)
+ self.assertEqual(response.errmsg, 'Unauthorized')
+ self.assertEqual(response.headers, {})
+
+ def test_ctor_w_status_errmsg(self):
+ response = self._makeOne(status='Unauthorized')
+ self.assertEqual(response.status, 401)
+ self.assertEqual(response.errmsg, 'Unauthorized')
+ self.assertEqual(response.headers, {})
+
+ def test_ctor_w_status_exception(self):
from zExceptions import Unauthorized
+ response = self._makeOne(status=Unauthorized)
+ self.assertEqual(response.status, 401)
+ self.assertEqual(response.errmsg, 'Unauthorized')
+ self.assertEqual(response.headers, {})
+
+ def test_ctor_charset_no_content_type_header(self):
+ response = self._makeOne(body='foo')
+ self.assertEqual(response.headers.get('content-type'),
+ 'text/plain; charset=iso-8859-15')
+
+ def test_ctor_charset_text_header_no_charset_defaults_latin1(self):
+ response = self._makeOne(body='foo',
+ headers={'content-type': 'text/plain'})
+ self.assertEqual(response.headers.get('content-type'),
+ 'text/plain; charset=iso-8859-15')
+
+ def test_ctor_charset_application_header_no_header(self):
+ response = self._makeOne(body='foo',
+ headers={'content-type': 'application/foo'})
+ self.assertEqual(response.headers.get('content-type'),
+ 'application/foo')
+
+ def test_ctor_charset_application_header_with_header(self):
+ response = self._makeOne(body='foo',
+ headers={'content-type':
+ 'application/foo; charset: something'})
+ self.assertEqual(response.headers.get('content-type'),
+ 'application/foo; charset: something')
+
+ def test_ctor_charset_unicode_body_application_header(self):
+ BODY = unicode('ärger', 'iso-8859-15')
+ response = self._makeOne(body=BODY,
+ headers={'content-type': 'application/foo'})
+ self.assertEqual(response.headers.get('content-type'),
+ 'application/foo; charset=iso-8859-15')
+ self.assertEqual(response.body, 'ärger')
+
+ def test_ctor_charset_unicode_body_application_header_diff_encoding(self):
+ BODY = unicode('ärger', 'iso-8859-15')
+ response = self._makeOne(body=BODY,
+ headers={'content-type':
+ 'application/foo; charset=utf-8'})
+ self.assertEqual(response.headers.get('content-type'),
+ 'application/foo; charset=utf-8')
+ # Body is re-encoded to match the header
+ self.assertEqual(response.body, BODY.encode('utf-8'))
+
+ def test_ctor_body_recodes_to_match_content_type_charset(self):
+ xml = (u'<?xml version="1.0" encoding="iso-8859-15" ?>\n'
+ '<foo><bar/></foo>')
+ response = self._makeOne(body=xml, headers={'content-type':
+ 'text/xml; charset=utf-8'})
+ self.assertEqual(response.body, xml.replace('iso-8859-15', 'utf-8'))
+
+ def test_ctor_body_already_matches_charset_unchanged(self):
+ xml = (u'<?xml version="1.0" encoding="iso-8859-15" ?>\n'
+ '<foo><bar/></foo>')
+ response = self._makeOne(body=xml, headers={'content-type':
+ 'text/xml; charset=iso-8859-15'})
+ self.assertEqual(response.body, xml)
+
+ def test_retry(self):
+ STDOUT, STDERR = object(), object()
+ response = self._makeOne(stdout=STDOUT, stderr=STDERR)
+ cloned = response.retry()
+ self.failUnless(isinstance(cloned, self._getTargetClass()))
+ self.failUnless(cloned.stdout is STDOUT)
+ self.failUnless(cloned.stderr is STDERR)
+
+ def test_setStatus_code(self):
+ response = self._makeOne()
+ response.setStatus(400)
+ self.assertEqual(response.status, 400)
+ self.assertEqual(response.errmsg, 'Bad Request')
+
+ def test_setStatus_errmsg(self):
+ response = self._makeOne()
+ response.setStatus('Bad Request')
+ self.assertEqual(response.status, 400)
+ self.assertEqual(response.errmsg, 'Bad Request')
+
+ def test_setStatus_BadRequest(self):
+ from zExceptions import BadRequest
+ response = self._makeOne()
+ response.setStatus(BadRequest)
+ self.assertEqual(response.status, 400)
+ self.assertEqual(response.errmsg, 'Bad Request')
+
+ def test_setStatus_Unauthorized_exception(self):
+ from zExceptions import Unauthorized
+ response = self._makeOne()
+ response.setStatus(Unauthorized)
+ self.assertEqual(response.status, 401)
+ self.assertEqual(response.errmsg, 'Unauthorized')
+
+ def test_setStatus_Forbidden_exception(self):
from zExceptions import Forbidden
+ response = self._makeOne()
+ response.setStatus(Forbidden)
+ self.assertEqual(response.status, 403)
+ self.assertEqual(response.errmsg, 'Forbidden')
+
+ def test_setStatus_NotFound_exception(self):
from zExceptions import NotFound
- from zExceptions import BadRequest
+ response = self._makeOne()
+ response.setStatus(NotFound)
+ self.assertEqual(response.status, 404)
+ self.assertEqual(response.errmsg, 'Not Found')
+
+ def test_setStatus_ResourceLockedError_exception(self):
+ response = self._makeOne()
+ from webdav.Lockable import ResourceLockedError
+ response.setStatus(ResourceLockedError)
+ self.assertEqual(response.status, 423)
+ self.assertEqual(response.errmsg, 'Locked')
+
+ def test_setStatus_InternalError_exception(self):
from zExceptions import InternalError
+ response = self._makeOne()
+ response.setStatus(InternalError)
+ self.assertEqual(response.status, 500)
+ self.assertEqual(response.errmsg, 'Internal Server Error')
- for exc_type, code in ((Unauthorized, 401),
- (Forbidden, 403),
- (NotFound, 404),
- (BadRequest, 400),
- (InternalError, 500)):
- response = self._makeOne()
- response.setStatus(exc_type)
- self.assertEqual(response.status, code)
+ def test_setCookie_no_existing(self):
+ response = self._makeOne()
+ response.setCookie('foo', 'bar')
+ cookie = response.cookies.get('foo', None)
+ self.assertEqual(len(cookie), 2)
+ self.assertEqual(cookie.get('value'), 'bar')
+ self.assertEqual(cookie.get('quoted'), True)
+ def test_setCookie_w_existing(self):
+ response = self._makeOne()
+ response.setCookie('foo', 'bar')
+ response.setCookie('foo', 'baz')
+ cookie = response.cookies.get('foo', None)
+ self.assertEqual(len(cookie), 2)
+ self.assertEqual(cookie.get('value'), 'baz')
+ self.assertEqual(cookie.get('quoted'), True)
+
def test_setCookie_no_attrs(self):
response = self._makeOne()
response.setCookie('foo', 'bar')
@@ -37,7 +206,6 @@
self.assertEqual(len(cookie), 2)
self.assertEqual(cookie.get('value'), 'bar')
self.assertEqual(cookie.get('quoted'), True)
-
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"')
@@ -123,15 +291,6 @@
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"')
- def test_expireCookie(self):
- response = self._makeOne()
- response.expireCookie('foo', path='/')
- cookie = response.cookies.get('foo', None)
- self.failUnless(cookie)
- self.assertEqual(cookie.get('expires'), 'Wed, 31-Dec-97 23:59:59 GMT')
- self.assertEqual(cookie.get('max_age'), 0)
- self.assertEqual(cookie.get('path'), '/')
-
def test_setCookie_w_httponly_true_value(self):
response = self._makeOne()
response.setCookie('foo', 'bar', http_only=True)
@@ -170,6 +329,31 @@
self.assertEqual(len(cookie_list), 1)
self.assertEqual(cookie_list[0], 'Set-Cookie: foo=bar')
+ def test_appendCookie_w_existing(self):
+ response = self._makeOne()
+ response.setCookie('foo', 'bar', path='/')
+ response.appendCookie('foo', 'baz')
+ cookie = response.cookies.get('foo', None)
+ self.failUnless(cookie)
+ self.assertEqual(cookie.get('value'), 'bar:baz')
+ self.assertEqual(cookie.get('path'), '/')
+
+ def test_appendCookie_no_existing(self):
+ response = self._makeOne()
+ response.appendCookie('foo', 'baz')
+ cookie = response.cookies.get('foo', None)
+ self.failUnless(cookie)
+ self.assertEqual(cookie.get('value'), 'baz')
+
+ def test_expireCookie(self):
+ response = self._makeOne()
+ response.expireCookie('foo', path='/')
+ cookie = response.cookies.get('foo', None)
+ self.failUnless(cookie)
+ self.assertEqual(cookie.get('expires'), 'Wed, 31-Dec-97 23:59:59 GMT')
+ self.assertEqual(cookie.get('max_age'), 0)
+ self.assertEqual(cookie.get('path'), '/')
+
def test_expireCookie1160(self):
# Verify that the cookie is expired even if an expires kw arg is passed
# http://zope.org/Collectors/Zope/1160
@@ -182,24 +366,22 @@
self.assertEqual(cookie.get('max_age'), 0)
self.assertEqual(cookie.get('path'), '/')
- def test_appendCookie(self):
+ def test_getHeader_nonesuch(self):
response = self._makeOne()
- response.setCookie('foo', 'bar', path='/')
- response.appendCookie('foo', 'baz')
- cookie = response.cookies.get('foo', None)
- self.failUnless(cookie)
- self.assertEqual(cookie.get('value'), 'bar:baz')
- self.assertEqual(cookie.get('path'), '/')
+ self.assertEqual(response.getHeader('nonesuch'), None)
- def test_appendHeader(self):
- response = self._makeOne()
- response.setHeader('foo', 'bar')
- response.appendHeader('foo', 'foo')
- self.assertEqual(response.headers.get('foo'), 'bar,\r\n\tfoo')
- response.setHeader('xxx', 'bar')
- response.appendHeader('XXX', 'foo')
- self.assertEqual(response.headers.get('xxx'), 'bar,\r\n\tfoo')
+ def test_getHeader_existing(self):
+ response = self._makeOne(headers={'foo': 'bar'})
+ self.assertEqual(response.getHeader('foo'), 'bar')
+ def test_getHeader_existing_not_literal(self):
+ response = self._makeOne(headers={'foo': 'bar'})
+ self.assertEqual(response.getHeader('Foo'), 'bar')
+
+ def test_getHeader_existing_w_literal(self):
+ response = self._makeOne(headers={'Foo': 'Bar'})
+ self.assertEqual(response.getHeader('Foo', literal=True), 'Bar')
+
def test_setHeader(self):
response = self._makeOne()
response.setHeader('foo', 'bar')
@@ -217,64 +399,57 @@
self.assertEqual(response.getHeader('SPAM', literal=True), 'eggs')
self.assertEqual(response.getHeader('spam'), None)
- def test_setStatus_ResourceLockedError(self):
+ def test_setHeader_drops_CRLF(self):
+ # RFC2616 disallows CRLF in a header value.
response = self._makeOne()
- from webdav.Lockable import ResourceLockedError
- response.setStatus(ResourceLockedError)
- self.assertEqual(response.status, 423)
+ response.setHeader('Location',
+ 'http://www.ietf.org/rfc/\r\nrfc2616.txt')
+ self.assertEqual(response.headers['location'],
+ 'http://www.ietf.org/rfc/rfc2616.txt')
- def test_charset_no_header(self):
- response = self._makeOne(body='foo')
- self.assertEqual(response.headers.get('content-type'),
- 'text/plain; charset=iso-8859-15')
+ def test_setHeader_Set_Cookie_special_case(self):
+ # This is crazy, given that we have APIs for cookies. Special
+ # behavior will go away in Zope 2.13
+ response = self._makeOne()
+ response.setHeader('Set-Cookie', 'foo="bar"')
+ self.assertEqual(response.getHeader('Set-Cookie'), None)
+ self.assertEqual(response.accumulated_headers,
+ [('Set-Cookie', 'foo="bar"')])
- def test_charset_text_header(self):
- response = self._makeOne(body='foo',
- headers={'content-type': 'text/plain'})
- self.assertEqual(response.headers.get('content-type'),
- 'text/plain; charset=iso-8859-15')
+ def test_setHeader_drops_CRLF_when_accumulating(self):
+ # RFC2616 disallows CRLF in a header value.
+ # This is crazy, given that we have APIs for cookies. Special
+ # behavior will go away in Zope 2.13
+ response = self._makeOne()
+ response.setHeader('Set-Cookie', 'allowed="OK"')
+ response.setHeader('Set-Cookie',
+ 'violation="http://www.ietf.org/rfc/\r\nrfc2616.txt"')
+ self.assertEqual(response.accumulated_headers,
+ [('Set-Cookie', 'allowed="OK"'),
+ ('Set-Cookie',
+ 'violation="http://www.ietf.org/rfc/rfc2616.txt"')])
- def test_charset_application_header_no_header(self):
- response = self._makeOne(body='foo',
- headers={'content-type': 'application/foo'})
- self.assertEqual(response.headers.get('content-type'),
- 'application/foo')
+ def test_appendHeader_no_existing(self):
+ response = self._makeOne()
+ response.appendHeader('foo', 'foo')
+ self.assertEqual(response.headers.get('foo'), 'foo')
- def test_charset_application_header_with_header(self):
- response = self._makeOne(body='foo',
- headers={'content-type': 'application/foo; charset: something'})
- self.assertEqual(response.headers.get('content-type'),
- 'application/foo; charset: something')
-
- def test_charset_application_header_unicode(self):
- response = self._makeOne(body=unicode('ärger', 'iso-8859-15'),
- headers={'content-type': 'application/foo'})
- self.assertEqual(response.headers.get('content-type'),
- 'application/foo; charset=iso-8859-15')
- self.assertEqual(response.body, 'ärger')
+ def test_appendHeader_no_existing_case_insensative(self):
+ response = self._makeOne()
+ response.appendHeader('Foo', 'foo')
+ self.assertEqual(response.headers.get('foo'), 'foo')
- def test_charset_application_header_unicode_1(self):
- response = self._makeOne(body=unicode('ärger', 'iso-8859-15'),
- headers={'content-type': 'application/foo; charset=utf-8'})
- self.assertEqual(response.headers.get('content-type'),
- 'application/foo; charset=utf-8')
- self.assertEqual(response.body, unicode('ärger',
- 'iso-8859-15').encode('utf-8'))
+ def test_appendHeader_w_existing(self):
+ response = self._makeOne()
+ response.setHeader('foo', 'bar')
+ response.appendHeader('foo', 'foo')
+ self.assertEqual(response.headers.get('foo'), 'bar,\r\n\tfoo')
- def test_XMLEncodingRecoding(self):
- xml = u'<?xml version="1.0" encoding="iso-8859-15" ?>\n<foo><bar/></foo>'
- response = self._makeOne(body=xml, headers={'content-type': 'text/xml; charset=utf-8'})
- self.assertEqual(xml.replace('iso-8859-15', 'utf-8')==response.body, True)
- response = self._makeOne(body=xml, headers={'content-type': 'text/xml; charset=iso-8859-15'})
- self.assertEqual(xml==response.body, True)
-
- def test_addHeader_drops_CRLF(self):
- # RFC2616 disallows CRLF in a header value.
+ def test_appendHeader_w_existing_case_insenstative(self):
response = self._makeOne()
- response.addHeader('Location',
- 'http://www.ietf.org/rfc/\r\nrfc2616.txt')
- self.assertEqual(response.accumulated_headers,
- 'Location: http://www.ietf.org/rfc/rfc2616.txt\r\n')
+ response.setHeader('xxx', 'bar')
+ response.appendHeader('XXX', 'foo')
+ self.assertEqual(response.headers.get('xxx'), 'bar,\r\n\tfoo')
def test_appendHeader_drops_CRLF(self):
# RFC2616 disallows CRLF in a header value.
@@ -284,44 +459,659 @@
self.assertEqual(response.headers['location'],
'http://www.ietf.org/rfc/rfc2616.txt')
- def test_setHeader_drops_CRLF(self):
- # RFC2616 disallows CRLF in a header value.
+ def test_addHeader_is_case_sensitive(self):
response = self._makeOne()
- response.setHeader('Location',
- 'http://www.ietf.org/rfc/\r\nrfc2616.txt')
- self.assertEqual(response.headers['location'],
- 'http://www.ietf.org/rfc/rfc2616.txt')
+ response.addHeader('Location', 'http://www.ietf.org/rfc/rfc2616.txt')
+ self.assertEqual(response.accumulated_headers,
+ [('Location', 'http://www.ietf.org/rfc/rfc2616.txt')])
- def test_setHeader_drops_CRLF_when_accumulating(self):
+ def test_addHeader_drops_CRLF(self):
# RFC2616 disallows CRLF in a header value.
response = self._makeOne()
- response.setHeader('Set-Cookie', 'allowed="OK"')
- response.setHeader('Set-Cookie',
- 'violation="http://www.ietf.org/rfc/\r\nrfc2616.txt"')
+ response.addHeader('Location',
+ 'http://www.ietf.org/rfc/\r\nrfc2616.txt')
self.assertEqual(response.accumulated_headers,
- 'Set-Cookie: allowed="OK"\r\n' +
- 'Set-Cookie: '
- 'violation="http://www.ietf.org/rfc/rfc2616.txt"\r\n')
+ [('Location', 'http://www.ietf.org/rfc/rfc2616.txt')])
- def test_setBody_compression_vary(self):
+ def test_setBase_None(self):
+ response = self._makeOne()
+ response.base = 'BEFORE'
+ response.setBase(None)
+ self.assertEqual(response.base, '')
+
+ def test_setBase_no_trailing_path(self):
+ response = self._makeOne()
+ response.setBase('foo')
+ self.assertEqual(response.base, 'foo/')
+
+ def test_setBase_w_trailing_path(self):
+ response = self._makeOne()
+ response.setBase('foo/')
+ self.assertEqual(response.base, 'foo/')
+
+ def test_insertBase_not_HTML_no_change(self):
+ response = self._makeOne()
+ response.setHeader('Content-Type', 'application/pdf')
+ response.setHeader('Content-Length', 8)
+ response.body = 'BLAHBLAH'
+ response.insertBase()
+ self.assertEqual(response.body, 'BLAHBLAH')
+ self.assertEqual(response.getHeader('Content-Length'), '8')
+
+ def test_insertBase_HTML_no_base_w_head_not_munged(self):
+ HTML = '<html><head></head><body></body></html>'
+ response = self._makeOne()
+ response.setHeader('Content-Type', 'text/html')
+ response.setHeader('Content-Length', len(HTML))
+ response.body = HTML
+ response.insertBase()
+ self.assertEqual(response.body, HTML)
+ self.assertEqual(response.getHeader('Content-Length'), str(len(HTML)))
+
+ def test_insertBase_HTML_w_base_no_head_not_munged(self):
+ HTML = '<html><body></body></html>'
+ response = self._makeOne()
+ response.setHeader('Content-Type', 'text/html')
+ response.setHeader('Content-Length', len(HTML))
+ response.body = HTML
+ response.insertBase()
+ self.assertEqual(response.body, HTML)
+ self.assertEqual(response.getHeader('Content-Length'), str(len(HTML)))
+
+ def test_insertBase_HTML_w_base_w_head_munged(self):
+ HTML = '<html><head></head><body></body></html>'
+ MUNGED = ('<html><head>\n'
+ '<base href="http://example.com/base/" />\n'
+ '</head><body></body></html>')
+ response = self._makeOne()
+ response.setHeader('Content-Type', 'text/html')
+ response.setHeader('Content-Length', 8)
+ response.body = HTML
+ response.setBase('http://example.com/base/')
+ response.insertBase()
+ self.assertEqual(response.body, MUNGED)
+ self.assertEqual(response.getHeader('Content-Length'),
+ str(len(MUNGED)))
+
+ def test_setBody_w_locking(self):
+ response = self._makeOne()
+ response.setBody('BEFORE', lock=True)
+ result = response.setBody('AFTER')
+ self.failIf(result)
+ self.assertEqual(response.body, 'BEFORE')
+
+ def test_setBody_empty_unchanged(self):
+ response = self._makeOne()
+ response.body = 'BEFORE'
+ result = response.setBody('')
+ self.failUnless(result)
+ self.assertEqual(response.body, 'BEFORE')
+ self.assertEqual(response.getHeader('Content-Type'), None)
+ self.assertEqual(response.getHeader('Content-Length'), None)
+
+ def test_setBody_2_tuple_wo_is_error_converted_to_HTML(self):
+ EXPECTED = ("<html>\n"
+ "<head>\n<title>TITLE</title>\n</head>\n"
+ "<body>\nBODY\n</body>\n"
+ "</html>\n")
+ response = self._makeOne()
+ response.body = 'BEFORE'
+ result = response.setBody(('TITLE', 'BODY'))
+ self.failUnless(result)
+ self.assertEqual(response.body, EXPECTED)
+ self.assertEqual(response.getHeader('Content-Type'),
+ 'text/html; charset=iso-8859-15')
+ self.assertEqual(response.getHeader('Content-Length'),
+ str(len(EXPECTED)))
+
+ def test_setBody_2_tuple_w_is_error_converted_to_Site_Error(self):
+ response = self._makeOne()
+ response.body = 'BEFORE'
+ result = response.setBody(('TITLE', 'BODY'), is_error=True)
+ self.failUnless(result)
+ self.failIf('BEFORE' in response.body)
+ self.failUnless('<h2>Site Error</h2>' in response.body)
+ self.failUnless('TITLE' in response.body)
+ self.failUnless('BODY' in response.body)
+ self.assertEqual(response.getHeader('Content-Type'),
+ 'text/html; charset=iso-8859-15')
+
+ def test_setBody_string_not_HTML(self):
+ response = self._makeOne()
+ result = response.setBody('BODY')
+ self.failUnless(result)
+ self.assertEqual(response.body, 'BODY')
+ self.assertEqual(response.getHeader('Content-Type'),
+ 'text/plain; charset=iso-8859-15')
+ self.assertEqual(response.getHeader('Content-Length'), '4')
+
+ def test_setBody_string_HTML(self):
+ HTML = '<html><head></head><body></body></html>'
+ response = self._makeOne()
+ result = response.setBody(HTML)
+ self.failUnless(result)
+ self.assertEqual(response.body, HTML)
+ self.assertEqual(response.getHeader('Content-Type'),
+ 'text/html; charset=iso-8859-15')
+ self.assertEqual(response.getHeader('Content-Length'), str(len(HTML)))
+
+ def test_setBody_object_with_asHTML(self):
+ HTML = '<html><head></head><body></body></html>'
+ class Dummy:
+ def asHTML(self):
+ return HTML
+ response = self._makeOne()
+ result = response.setBody(Dummy())
+ self.failUnless(result)
+ self.assertEqual(response.body, HTML)
+ self.assertEqual(response.getHeader('Content-Type'),
+ 'text/html; charset=iso-8859-15')
+ self.assertEqual(response.getHeader('Content-Length'), str(len(HTML)))
+
+ def test_setBody_object_with_unicode(self):
+ HTML = u'<html><head></head><body><h1>Tr\u0039s Bien</h1></body></html>'
+ ENCODED = HTML.encode('iso-8859-15')
+ response = self._makeOne()
+ result = response.setBody(HTML)
+ self.failUnless(result)
+ self.assertEqual(response.body, ENCODED)
+ self.assertEqual(response.getHeader('Content-Type'),
+ 'text/html; charset=iso-8859-15')
+ self.assertEqual(response.getHeader('Content-Length'),
+ str(len(ENCODED)))
+
+ def test_setBody_w_bogus_pseudo_HTML(self):
+ # The 2001 checkin message which added the path-under-test says:
+ # (r19315): "merged content type on error fixes from 2.3
+ # If the str of the object returs a Python "pointer" looking mess,
+ # don't let it get treated as HTML.
+ from ZPublisher import NotFound
+ BOGUS = '<Bogus a39d53d>'
+ response = self._makeOne()
+ self.assertRaises(NotFound, response.setBody, BOGUS)
+
+ def test_setBody_html_no_charset_escapes_latin1_gt_lt(self):
+ response = self._makeOne()
+ BEFORE = ('<html><head></head><body><p>LT: \213</p>'
+ '<p>GT: \233</p></body></html>')
+ AFTER = ('<html><head></head><body><p>LT: <</p>'
+ '<p>GT: ></p></body></html>')
+ response.setHeader('Content-Type', 'text/html')
+ result = response.setBody(BEFORE)
+ self.failUnless(result)
+ self.assertEqual(response.body, AFTER)
+ self.assertEqual(response.getHeader('Content-Length'), str(len(AFTER)))
+
+ def test_setBody_latin_alias_escapes_latin1_gt_lt(self):
+ response = self._makeOne()
+ BEFORE = ('<html><head></head><body><p>LT: \213</p>'
+ '<p>GT: \233</p></body></html>')
+ AFTER = ('<html><head></head><body><p>LT: <</p>'
+ '<p>GT: ></p></body></html>')
+ response.setHeader('Content-Type', 'text/html; charset=latin1')
+ result = response.setBody(BEFORE)
+ self.failUnless(result)
+ self.assertEqual(response.body, AFTER)
+ self.assertEqual(response.getHeader('Content-Length'), str(len(AFTER)))
+
+ def test_setBody_calls_insertBase(self):
+ response = self._makeOne()
+ lamb = {}
+ def _insertBase():
+ lamb['flavor'] = 'CURRY'
+ response.insertBase = _insertBase
+ response.setBody('Garlic Naan')
+ self.assertEqual(lamb['flavor'], 'CURRY')
+
+ #def test_setBody_w_HTTP_content_compression(self):
+
+ def test_setBody_compression_uncompressible_mimetype(self):
+ BEFORE = 'foo' * 100 # body must get smaller on compression
+ response = self._makeOne()
+ response.setHeader('Content-Type', 'image/jpeg')
+ response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
+ response.setBody(BEFORE)
+ self.failIf(response.getHeader('Content-Encoding'))
+ self.assertEqual(response.body, BEFORE)
+
+ def test_setBody_compression_existing_encoding(self):
+ BEFORE = 'foo' * 100 # body must get smaller on compression
+ response = self._makeOne()
+ response.setHeader('Content-Encoding', 'piglatin')
+ response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
+ response.setBody(BEFORE)
+ self.assertEqual(response.getHeader('Content-Encoding'), 'piglatin')
+ self.assertEqual(response.body, BEFORE)
+
+ def test_setBody_compression_too_short_to_gzip(self):
+ BEFORE = 'foo' # body must get smaller on compression
+ response = self._makeOne()
+ response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
+ response.setBody(BEFORE)
+ self.failIf(response.getHeader('Content-Encoding'))
+ self.assertEqual(response.body, BEFORE)
+
+ def test_setBody_compression_no_prior_vary_header(self):
# Vary header should be added here
response = self._makeOne()
- response.enableHTTPCompression(REQUEST={'HTTP_ACCEPT_ENCODING': 'gzip'})
- response.setBody('foo'*100) # body must get smaller on compression
- self.assertEqual('Accept-Encoding' in response.getHeader('Vary'), True)
- # But here it would be unnecessary
+ response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
+ response.setBody('foo' * 100) # body must get smaller on compression
+ self.failUnless('Accept-Encoding' in response.getHeader('Vary'))
+
+ def test_setBody_compression_w_prior_vary_header_wo_encoding(self):
+ # Vary header should be added here
response = self._makeOne()
- response.enableHTTPCompression(REQUEST={'HTTP_ACCEPT_ENCODING': 'gzip'})
- response.setHeader('Vary', 'Accept-Encoding,Accept-Language')
- before = response.getHeader('Vary')
+ response.setHeader('Vary', 'Cookie')
+ response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
+ response.setBody('foo' * 100) # body must get smaller on compression
+ self.failUnless('Accept-Encoding' in response.getHeader('Vary'))
+
+ def test_setBody_compression_w_prior_vary_header_incl_encoding(self):
+ # Vary header already had Accept-Ecoding', do'nt munge
+ PRIOR = 'Accept-Encoding,Accept-Language'
+ response = self._makeOne()
+ response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
+ response.setHeader('Vary', PRIOR)
response.setBody('foo'*100)
- self.assertEqual(before, response.getHeader('Vary'))
+ self.assertEqual(response.getHeader('Vary'), PRIOR)
+ def test_setBody_compression_no_prior_vary_header_but_forced(self):
+ # Compression forced, don't add a Vary entry for compression.
+ response = self._makeOne()
+ response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'},
+ force=True)
+ response.setBody('foo' * 100) # body must get smaller on compression
+ self.assertEqual(response.getHeader('Vary'), None)
+ def test_redirect_defaults(self):
+ URL = 'http://example.com'
+ response = self._makeOne()
+ result = response.redirect(URL)
+ self.assertEqual(result, URL)
+ self.assertEqual(response.status, 302)
+ self.assertEqual(response.getHeader('Location'), URL)
+ self.failIf(response._locked_status)
+
+ def test_redirect_explicit_status(self):
+ URL = 'http://example.com'
+ response = self._makeOne()
+ result = response.redirect(URL, status=307)
+ self.assertEqual(response.status, 307)
+ self.assertEqual(response.getHeader('Location'), URL)
+ self.failIf(response._locked_status)
+
+ def test_redirect_w_lock(self):
+ URL = 'http://example.com'
+ response = self._makeOne()
+ result = response.redirect(URL, lock=True)
+ self.assertEqual(response.status, 302)
+ self.assertEqual(response.getHeader('Location'), URL)
+ self.failUnless(response._locked_status)
+
+ def test__encode_unicode_no_content_type_uses_default_encoding(self):
+ self._setDefaultEncoding('UTF8')
+ UNICODE = u'<h1>Tr\u0039s Bien</h1>'
+ response = self._makeOne()
+ self.assertEqual(response._encode_unicode(UNICODE),
+ UNICODE.encode('UTF8'))
+
+ def test__encode_unicode_w_content_type_no_charset_updates_charset(self):
+ self._setDefaultEncoding('UTF8')
+ UNICODE = u'<h1>Tr\u0039s Bien</h1>'
+ response = self._makeOne()
+ response.setHeader('Content-Type', 'text/html')
+ self.assertEqual(response._encode_unicode(UNICODE),
+ UNICODE.encode('UTF8'))
+ response.getHeader('Content-Type', 'text/html; charset=UTF8')
+
+ def test__encode_unicode_w_content_type_w_charset(self):
+ self._setDefaultEncoding('UTF8')
+ UNICODE = u'<h1>Tr\u0039s Bien</h1>'
+ response = self._makeOne()
+ response.setHeader('Content-Type', 'text/html; charset=latin1')
+ self.assertEqual(response._encode_unicode(UNICODE),
+ UNICODE.encode('latin1'))
+ response.getHeader('Content-Type', 'text/html; charset=latin1')
+
+ def test__encode_unicode_w_content_type_w_charset_xml_preamble(self):
+ self._setDefaultEncoding('UTF8')
+ PREAMBLE = u'<?xml version="1.0" ?>'
+ ELEMENT = u'<element>Tr\u0039s Bien</element>'
+ UNICODE = u'\n'.join([PREAMBLE, ELEMENT])
+ response = self._makeOne()
+ response.setHeader('Content-Type', 'text/html; charset=latin1')
+ self.assertEqual(response._encode_unicode(UNICODE),
+ '<?xml version="1.0" encoding="latin1" ?>\n'
+ + ELEMENT.encode('latin1'))
+ response.getHeader('Content-Type', 'text/html; charset=latin1')
+
+ def test_quoteHTML(self):
+ BEFORE = '<p>This is a story about a boy named "Sue"</p>'
+ AFTER = ('<p>This is a story about a boy named '
+ '"Sue"</p>')
+ response = self._makeOne()
+ self.assertEqual(response.quoteHTML(BEFORE), AFTER)
+
+ def test_notFoundError(self):
+ from ZPublisher import NotFound
+ response = self._makeOne()
+ try:
+ response.notFoundError()
+ except NotFound, raised:
+ self.assertEqual(response.status, 404)
+ self.failUnless("<p><b>Resource:</b> Unknown</p>" in str(raised))
+ else:
+ self.fail("Didn't raise NotFound")
+
+ def test_notFoundError_w_entry(self):
+ from ZPublisher import NotFound
+ response = self._makeOne()
+ try:
+ response.notFoundError('ENTRY')
+ except NotFound, raised:
+ self.assertEqual(response.status, 404)
+ self.failUnless("<p><b>Resource:</b> ENTRY</p>" in str(raised))
+ else:
+ self.fail("Didn't raise NotFound")
+
+ def test_forbiddenError(self):
+ from ZPublisher import NotFound
+ response = self._makeOne()
+ try:
+ response.forbiddenError()
+ except NotFound, raised:
+ self.assertEqual(response.status, 404)
+ self.failUnless("<p><b>Resource:</b> Unknown</p>" in str(raised))
+ else:
+ self.fail("Didn't raise NotFound")
+
+ def test_forbiddenError_w_entry(self):
+ from ZPublisher import NotFound
+ response = self._makeOne()
+ try:
+ response.forbiddenError('ENTRY')
+ except NotFound, raised:
+ self.assertEqual(response.status, 404)
+ self.failUnless("<p><b>Resource:</b> ENTRY</p>" in str(raised))
+ else:
+ self.fail("Didn't raise NotFound")
+
+ def test_debugError(self):
+ from ZPublisher import NotFound
+ response = self._makeOne()
+ try:
+ response.debugError('testing')
+ except NotFound, raised:
+ self.assertEqual(response.status, 200)
+ self.failUnless("Zope has encountered a problem publishing "
+ "your object.<p>\ntesting</p>" in str(raised))
+ else:
+ self.fail("Didn't raise NotFound")
+
+ def test_badRequestError_valid_parameter_name(self):
+ from ZPublisher import BadRequest
+ response = self._makeOne()
+ try:
+ response.badRequestError('some_parameter')
+ except BadRequest, raised:
+ self.assertEqual(response.status, 400)
+ self.failUnless("The parameter, <em>some_parameter</em>, "
+ "was omitted from the request." in str(raised))
+ else:
+ self.fail("Didn't raise BadRequest")
+
+ def test_badRequestError_invalid_parameter_name(self):
+ from ZPublisher import InternalError
+ response = self._makeOne()
+ try:
+ response.badRequestError('URL1')
+ except InternalError, raised:
+ self.assertEqual(response.status, 400)
+ self.failUnless("Sorry, an internal error occurred in this "
+ "resource." in str(raised))
+ else:
+ self.fail("Didn't raise InternalError")
+
+ def test__unauthorized_no_realm(self):
+ response = self._makeOne()
+ response.realm = ''
+ response._unauthorized()
+ self.failIf('WWW-Authenticate' in response.headers)
+
+ def test__unauthorized_w_default_realm(self):
+ response = self._makeOne()
+ response._unauthorized()
+ self.failUnless('WWW-Authenticate' in response.headers) #literal
+ self.assertEqual(response.headers['WWW-Authenticate'],
+ 'basic realm="Zope"')
+
+ def test__unauthorized_w_realm(self):
+ response = self._makeOne()
+ response.realm = 'Folly'
+ response._unauthorized()
+ self.failUnless('WWW-Authenticate' in response.headers) #literal
+ self.assertEqual(response.headers['WWW-Authenticate'],
+ 'basic realm="Folly"')
+
+ def test_unauthorized_no_debug_mode(self):
+ from zExceptions import Unauthorized
+ response = self._makeOne()
+ try:
+ response.unauthorized()
+ except Unauthorized, raised:
+ self.assertEqual(response.status, 200) # publisher sets 401 later
+ self.failUnless("<strong>You are not authorized "
+ "to access this resource.</strong>" in str(raised))
+ else:
+ self.fail("Didn't raise Unauthorized")
+
+ def test_unauthorized_w_debug_mode_no_credentials(self):
+ from zExceptions import Unauthorized
+ response = self._makeOne()
+ response.debug_mode = True
+ try:
+ response.unauthorized()
+ except Unauthorized, raised:
+ self.failUnless("<p>\nNo Authorization header found.</p>"
+ in str(raised))
+ else:
+ self.fail("Didn't raise Unauthorized")
+
+ def test_unauthorized_w_debug_mode_w_credentials(self):
+ from zExceptions import Unauthorized
+ response = self._makeOne()
+ response.debug_mode = True
+ response._auth = 'bogus'
+ try:
+ response.unauthorized()
+ except Unauthorized, raised:
+ self.failUnless("<p>\nUsername and password are not correct.</p>"
+ in str(raised))
+ else:
+ self.fail("Didn't raise Unauthorized")
+
+ def test___str__already_wrote(self):
+ response = self._makeOne()
+ response._wrote = True
+ self.assertEqual(str(response), '')
+
+ def test___str__empty(self):
+ response = self._makeOne()
+ result = str(response)
+ lines = result.split('\r\n')
+ self.assertEqual(len(lines), 5)
+ self.assertEqual(lines[0], 'Status: 200 OK')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Content-Length: 0')
+ self.assertEqual(lines[3], '')
+ self.assertEqual(lines[4], '')
+
+ def test___str__existing_content_length(self):
+ # The application can break clients by setting a bogus length; we
+ # don't do anything to stop that.
+ response = self._makeOne()
+ response.setHeader('Content-Length', 42)
+ result = str(response)
+ lines = result.split('\r\n')
+ self.assertEqual(len(lines), 5)
+ self.assertEqual(lines[0], 'Status: 200 OK')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Content-Length: 42')
+ self.assertEqual(lines[3], '')
+ self.assertEqual(lines[4], '')
+
+ def test___str__existing_transfer_encoding(self):
+ # If 'Transfer-Encoding' is set, don't force 'Content-Length'.
+ response = self._makeOne()
+ response.setHeader('Transfer-Encoding', 'slurry')
+ result = str(response)
+ lines = result.split('\r\n')
+ self.assertEqual(len(lines), 5)
+ self.assertEqual(lines[0], 'Status: 200 OK')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Transfer-Encoding: slurry')
+ self.assertEqual(lines[3], '')
+ self.assertEqual(lines[4], '')
+
+ def test___str__after_setHeader(self):
+ response = self._makeOne()
+ response.setHeader('x-consistency', 'Foolish')
+ result = str(response)
+ lines = result.split('\r\n')
+ self.assertEqual(len(lines), 6)
+ self.assertEqual(lines[0], 'Status: 200 OK')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Content-Length: 0')
+ self.assertEqual(lines[3], 'X-Consistency: Foolish')
+ self.assertEqual(lines[4], '')
+ self.assertEqual(lines[5], '')
+
+ def test___str__after_setHeader_literal(self):
+ response = self._makeOne()
+ response.setHeader('X-consistency', 'Foolish', literal=True)
+ result = str(response)
+ lines = result.split('\r\n')
+ self.assertEqual(len(lines), 6)
+ self.assertEqual(lines[0], 'Status: 200 OK')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Content-Length: 0')
+ self.assertEqual(lines[3], 'X-consistency: Foolish')
+ self.assertEqual(lines[4], '')
+ self.assertEqual(lines[5], '')
+
+ def test___str__after_redirect(self):
+ response = self._makeOne()
+ response.redirect('http://example.com/')
+ result = str(response)
+ lines = result.split('\r\n')
+ self.assertEqual(len(lines), 6)
+ self.assertEqual(lines[0], 'Status: 302 Moved Temporarily')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Content-Length: 0')
+ self.assertEqual(lines[3], 'Location: http://example.com/')
+ self.assertEqual(lines[4], '')
+ self.assertEqual(lines[5], '')
+
+ def test___str__after_setCookie_appendCookie(self):
+ response = self._makeOne()
+ response.setCookie('foo', 'bar', path='/')
+ response.appendCookie('foo', 'baz')
+ result = str(response)
+ lines = result.split('\r\n')
+ self.assertEqual(len(lines), 6)
+ self.assertEqual(lines[0], 'Status: 200 OK')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Content-Length: 0')
+ self.assertEqual(lines[3], 'Set-Cookie: foo="bar%3Abaz"; '
+ 'Path=/')
+ self.assertEqual(lines[4], '')
+ self.assertEqual(lines[5], '')
+
+ def test___str__after_expireCookie(self):
+ response = self._makeOne()
+ response.expireCookie('qux', path='/')
+ result = str(response)
+ lines = result.split('\r\n')
+ self.assertEqual(len(lines), 6)
+ self.assertEqual(lines[0], 'Status: 200 OK')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Content-Length: 0')
+ self.assertEqual(lines[3], 'Set-Cookie: qux="deleted"; '
+ 'Path=/; '
+ 'Expires=Wed, 31-Dec-97 23:59:59 GMT; '
+ 'Max-Age=0')
+ self.assertEqual(lines[4], '')
+ self.assertEqual(lines[5], '')
+
+ def test___str__after_addHeader(self):
+ response = self._makeOne()
+ response.addHeader('X-Consistency', 'Foolish')
+ response.addHeader('X-Consistency', 'Oatmeal')
+ result = str(response)
+ lines = result.split('\r\n')
+ self.assertEqual(len(lines), 7)
+ self.assertEqual(lines[0], 'Status: 200 OK')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Content-Length: 0')
+ self.assertEqual(lines[3], 'X-Consistency: Foolish')
+ self.assertEqual(lines[4], 'X-Consistency: Oatmeal')
+ self.assertEqual(lines[5], '')
+ self.assertEqual(lines[6], '')
+
+ def test___str__w_body(self):
+ response = self._makeOne()
+ response.setBody('BLAH')
+ result = str(response)
+ lines = result.split('\r\n')
+ self.assertEqual(len(lines), 6)
+ self.assertEqual(lines[0], 'Status: 200 OK')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Content-Length: 4')
+ self.assertEqual(lines[3],
+ 'Content-Type: text/plain; charset=iso-8859-15')
+ self.assertEqual(lines[4], '')
+ self.assertEqual(lines[5], 'BLAH')
+
+ def test_write_already_wrote(self):
+ from StringIO import StringIO
+ stdout = StringIO()
+ response = self._makeOne(stdout=stdout)
+ response.write('Kilroy was here!')
+ self.failUnless(response._wrote)
+ lines = stdout.getvalue().split('\r\n')
+ self.assertEqual(len(lines), 5)
+ self.assertEqual(lines[0], 'Status: 200 OK')
+ self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
+ 'Python (www.python.org)')
+ self.assertEqual(lines[2], 'Content-Length: 0')
+ self.assertEqual(lines[3], '')
+ self.assertEqual(lines[4], 'Kilroy was here!')
+
+ def test_write_not_already_wrote(self):
+ from StringIO import StringIO
+ stdout = StringIO()
+ response = self._makeOne(stdout=stdout)
+ response._wrote = True
+ response.write('Kilroy was here!')
+ lines = stdout.getvalue().split('\r\n')
+ self.assertEqual(len(lines), 1)
+ self.assertEqual(lines[0], 'Kilroy was here!')
+
+ #TODO
+ # def test_exception_* WAAAAAA!
+
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(HTTPResponseTests, 'test'))
return suite
-
-if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')
Modified: Zope/trunk/src/ZServer/HTTPResponse.py
===================================================================
--- Zope/trunk/src/ZServer/HTTPResponse.py 2010-06-01 18:39:30 UTC (rev 112891)
+++ Zope/trunk/src/ZServer/HTTPResponse.py 2010-06-01 21:08:52 UTC (rev 112892)
@@ -17,60 +17,66 @@
and logging duties.
"""
-import time, re, sys, tempfile
+import asyncore
from cStringIO import StringIO
+import re
+import tempfile
import thread
+import time
+
from zope.event import notify
-from ZPublisher.HTTPResponse import HTTPResponse
-from ZPublisher.Iterators import IStreamIterator
-from ZPublisher.pubevents import PubBeforeStreaming
-from medusa.http_date import build_http_date
-from PubCore.ZEvent import Wakeup
-from medusa.producers import hooked_producer
-from medusa import http_server
-import asyncore
-from Producers import ShutdownProducer, LoggingProducer, CallbackProducer, \
- file_part_producer, file_close_producer, iterator_producer
-import DebugLogger
+from ZPublisher.HTTPResponse import HTTPResponse # XXX WTF?
+from ZPublisher.Iterators import IStreamIterator # XXX WTF?
+from ZPublisher.pubevents import PubBeforeStreaming # XXX WTF?
+from ZServer.medusa.http_date import build_http_date
+from ZServer.PubCore.ZEvent import Wakeup
+from ZServer.medusa import http_server
+
+from ZServer.Producers import ShutdownProducer
+from ZServer.Producers import LoggingProducer
+from ZServer.Producers import CallbackProducer
+from ZServer.Producers import file_part_producer
+from ZServer.Producers import file_close_producer
+from ZServer.Producers import iterator_producer
+from ZServer.DebugLogger import log
+
+
class ZServerHTTPResponse(HTTPResponse):
"Used to push data into a channel's producer fifo"
# Set this value to 1 if streaming output in
# HTTP/1.1 should use chunked encoding
- http_chunk=1
- http_chunk_size=1024
+ http_chunk = 1
+ http_chunk_size = 1024
# defaults
- _http_version='1.0'
- _http_connection='close'
- _server_version='Zope/2.0 ZServer/2.0'
+ _http_version = '1.0'
+ _http_connection = 'close'
+ _server_version = 'Zope/2.0 ZServer/2.0'
# using streaming response
- _streaming=0
+ _streaming = 0
# using chunking transfer-encoding
- _chunking=0
+ _chunking = 0
_bodyproducer = None
- def __str__(self,
- html_search=re.compile('<html>',re.I).search,
- ):
+ def __str__(self):
if self._wrote:
if self._chunking:
return '0\r\n\r\n'
else:
return ''
- headers=self.headers
- body=self.body
+ headers = self.headers
+ body = self.body
# set 204 (no content) status if 200 and response is empty
# and not streaming
- if not headers.has_key('content-type') and \
- not headers.has_key('content-length') and \
- not self._streaming and \
- self.status == 200:
+ if ('content-type' not in headers and
+ 'content-length' not in headers and
+ not self._streaming and self.status == 200):
self.setStatus('nocontent')
if self.status in (100, 101, 102, 204, 304):
@@ -84,65 +90,65 @@
elif not headers.has_key('content-length') and not self._streaming:
self.setHeader('content-length', len(body))
- headersl=[]
- append=headersl.append
+ chunks = []
+ append = chunks.append
- status=headers.get('status', '200 OK')
# status header must come first.
- append("HTTP/%s %s" % (self._http_version or '1.0' , status))
- if headers.has_key('status'):
- del headers['status']
+ append("HTTP/%s %d %s" % (self._http_version or '1.0',
+ self.status, self.errmsg))
# add zserver headers
append('Server: %s' % self._server_version)
append('Date: %s' % build_http_date(time.time()))
- if self._http_version=='1.0':
- if self._http_connection=='keep-alive':
+ if self._http_version == '1.0':
+ if self._http_connection == 'keep-alive':
self.setHeader('Connection','Keep-Alive')
else:
self.setHeader('Connection','close')
# Close the connection if we have been asked to.
# Use chunking if streaming output.
- if self._http_version=='1.1':
- if self._http_connection=='close':
+ if self._http_version == '1.1':
+ if self._http_connection == 'close':
self.setHeader('Connection','close')
elif (not self.headers.has_key('content-length') and
self.http_chunk and self._streaming):
self.setHeader('Transfer-Encoding','chunked')
- self._chunking=1
+ self._chunking = 1
headers = headers.items()
- for line in self.accumulated_headers.splitlines():
+ for line in self.accumulated_headers:
if line[0] == '\t':
headers[-1][1] += '\n' + line
continue
headers.append(line.split(': ', 1))
for key, val in headers:
- if key.lower()==key:
+ if key.lower() == key:
# only change non-literal header names
- key="%s%s" % (key[:1].upper(), key[1:])
- start=0
- l=key.find('-',start)
+ key = "%s%s" % (key[:1].upper(), key[1:])
+ start = 0
+ l = key.find('-',start)
while l >= start:
- key="%s-%s%s" % (key[:l],key[l+1:l+2].upper(),key[l+2:])
- start=l+1
- l=key.find('-',start)
+ key = "%s-%s%s" % (key[:l],
+ key[l+1:l+2].upper(),
+ key[l+2:])
+ start = l + 1
+ l = key.find('-', start)
val = val.replace('\n\t', '\r\n\t')
append("%s: %s" % (key, val))
if self.cookies:
- headersl.extend(self._cookie_list())
+ chunks.extend(self._cookie_list())
append('')
append(body)
- return "\r\n".join(headersl)
+ return "\r\n".join(chunks)
- _tempfile=None
- _templock=None
- _tempstart=0
+ _tempfile = None
+ _templock = None
+ _tempstart = 0
def write(self,data):
"""\
@@ -164,45 +170,44 @@
if type(data) != type(''):
raise TypeError('Value must be a string')
- stdout=self.stdout
+ stdout = self.stdout
if not self._wrote:
-
notify(PubBeforeStreaming(self))
-
- l=self.headers.get('content-length', None)
+
+ l = self.headers.get('content-length', None)
if l is not None:
try:
- if type(l) is type(''): l=int(l)
+ if type(l) is type(''): l = int(l)
if l > 128000:
- self._tempfile=tempfile.TemporaryFile()
- self._templock=thread.allocate_lock()
+ self._tempfile = tempfile.TemporaryFile()
+ self._templock = thread.allocate_lock()
except: pass
- self._streaming=1
+ self._streaming = 1
stdout.write(str(self))
- self._wrote=1
+ self._wrote = 1
if not data: return
if self._chunking:
data = '%x\r\n%s\r\n' % (len(data),data)
- l=len(data)
+ l = len(data)
- t=self._tempfile
+ t = self._tempfile
if t is None or l<200:
stdout.write(data)
else:
- b=self._tempstart
- e=b+l
+ b = self._tempstart
+ e = b + l
self._templock.acquire()
try:
t.seek(b)
t.write(data)
finally:
self._templock.release()
- self._tempstart=e
+ self._tempstart = e
stdout.write(file_part_producer(t,self._templock,b,e), l)
_retried_response = None
@@ -214,18 +219,18 @@
finally:
self._retried_response = None
return
- stdout=self.stdout
+ stdout = self.stdout
- t=self._tempfile
+ t = self._tempfile
if t is not None:
stdout.write(file_close_producer(t), 0)
- self._tempfile=None
+ self._tempfile = None
stdout.finish(self)
stdout.close()
- self.stdout=None # need to break cycle?
- self._request=None
+ self.stdout = None # need to break cycle?
+ self._request = None
def retry(self):
"""Return a request object to be used in a retry attempt
@@ -234,11 +239,11 @@
# only stdout stderr were passed to the constructor. OTOH, I
# think that that's all that is ever passed.
- response=self.__class__(stdout=self.stdout, stderr=self.stderr)
- response.headers=self.headers
- response._http_version=self._http_version
- response._http_connection=self._http_connection
- response._server_version=self._server_version
+ response = self.__class__(stdout=self.stdout, stderr=self.stderr)
+ response.headers = self.headers
+ response._http_version = self._http_version
+ response._http_connection = self._http_connection
+ response._server_version = self._server_version
self._retried_response = response
return response
@@ -272,28 +277,28 @@
restrict access to channel to the push method only."""
def __init__(self, request):
- self._channel=request.channel
- self._request=request
- self._shutdown=0
- self._close=0
- self._bytes=0
+ self._channel = request.channel
+ self._request = request
+ self._shutdown = 0
+ self._close = 0
+ self._bytes = 0
def write(self, text, l=None):
if self._channel.closed:
return
- if l is None: l=len(text)
- self._bytes=self._bytes + l
+ if l is None: l = len(text)
+ self._bytes = self._bytes + l
self._channel.push(text,0)
Wakeup()
def close(self):
- DebugLogger.log('A', id(self._request),
+ log('A', id(self._request),
'%s %s' % (self._request.reply_code, self._bytes))
if not self._channel.closed:
self._channel.push(LoggingProducer(self._request, self._bytes), 0)
self._channel.push(CallbackProducer(self._channel.done), 0)
self._channel.push(CallbackProducer(
- lambda t=('E', id(self._request)): apply(DebugLogger.log, t)), 0)
+ lambda t=('E', id(self._request)): apply(log, t)), 0)
if self._shutdown:
self._channel.push(ShutdownProducer(), 0)
Wakeup()
@@ -304,15 +309,15 @@
# channel closed too soon
self._request.log(self._bytes)
- DebugLogger.log('E', id(self._request))
+ log('E', id(self._request))
if self._shutdown:
Wakeup(lambda: asyncore.close_all())
else:
Wakeup()
- self._channel=None #need to break cycles?
- self._request=None
+ self._channel = None #need to break cycles?
+ self._request = None
def flush(self): pass # yeah, whatever
@@ -321,8 +326,8 @@
self._shutdown = 1
if response.headers.get('connection','') == 'close' or \
response.headers.get('Connection','') == 'close':
- self._close=1
- self._request.reply_code=response.status
+ self._close = 1
+ self._request.reply_code = response.status
def start_response(self, status, headers, exc_info=None):
# Used for WSGI
@@ -342,9 +347,10 @@
"Simple http response factory"
# should this be integrated into the HTTPResponse constructor?
- response=ZServerHTTPResponse(stdout=ChannelPipe(request), stderr=StringIO())
- response._http_version=request.version
- if request.version=='1.0' and is_proxying_match(request.request):
+ response = ZServerHTTPResponse(stdout=ChannelPipe(request),
+ stderr=StringIO())
+ response._http_version = request.version
+ if request.version == '1.0' and is_proxying_match(request.request):
# a request that was made as if this zope was an http 1.0 proxy.
# that means we have to use some slightly different http
# headers to manage persistent connections.
@@ -354,5 +360,5 @@
connection_re = http_server.CONNECTION
response._http_connection = http_server.get_header(connection_re,
request.header).lower()
- response._server_version=request.channel.server.SERVER_IDENT
+ response._server_version = request.channel.server.SERVER_IDENT
return response
Modified: Zope/trunk/src/ZServer/tests/test_responses.py
===================================================================
--- Zope/trunk/src/ZServer/tests/test_responses.py 2010-06-01 18:39:30 UTC (rev 112891)
+++ Zope/trunk/src/ZServer/tests/test_responses.py 2010-06-01 21:08:52 UTC (rev 112892)
@@ -111,7 +111,9 @@
'Title-Cased': 'bar',
'mixed-CasED': 'spam',
'multilined': 'eggs\n\tham'}
- response.accumulated_headers = 'foo-bar: bar\n\tbaz\nFoo-bar: monty\n'
+ response.accumulated_headers = ['foo-bar: bar',
+ '\tbaz',
+ 'Foo-bar: monty']
response.cookies = dict(foo=dict(value='bar'))
response.body = 'A body\nwith multiple lines\n'
Added: Zope/trunk/src/Zope2/utilities/skel/bin/zope2.wsgi.in
===================================================================
--- Zope/trunk/src/Zope2/utilities/skel/bin/zope2.wsgi.in (rev 0)
+++ Zope/trunk/src/Zope2/utilities/skel/bin/zope2.wsgi.in 2010-06-01 21:08:52 UTC (rev 112892)
@@ -0,0 +1,6 @@
+from Zope2.Startup.run import configure
+from Zope2 import startup
+configure('<<INSTANCE_HOME>>/etc/zope.conf')
+startup()
+# mod_wsgi looks for the special name 'application'.
+from ZPublisher.WSGIPublisher import publish_module as application
More information about the Zope-Checkins
mailing list