[Zope-Checkins] SVN: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/ PEP 8, import cleanups, etc.
Tres Seaver
tseaver at palladion.com
Mon Dec 21 15:13:15 EST 2009
Log message for revision 106830:
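PEP 8, import cleanups, etc.
Also changes 'accumulated_headers' from a class-level preformatted string to a per-instance list of (name, value) tuples, removes the module-level 'accumulate_header' helper, and updates the tests to match.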
Changed:
U Zope/branches/tseaver-fix_wsgi/src/ZPublisher/HTTPResponse.py
U Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/testHTTPResponse.py
-=-
Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/HTTPResponse.py
===================================================================
--- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/HTTPResponse.py 2009-12-21 19:01:51 UTC (rev 106829)
+++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/HTTPResponse.py 2009-12-21 20:13:09 UTC (rev 106830)
@@ -1,6 +1,6 @@
#############################################################################
#
-# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
+# Copyright (c) 2001-2009 Zope Foundation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
@@ -10,23 +10,29 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
-'''CGI Response Output formatter
-
-$Id$'''
-__version__ = '$Revision: 1.81 $'[11:-2]
-
-import types, os, sys, re
-import zlib, struct
-from string import translate, maketrans
-from BaseResponse import BaseResponse
-from zExceptions import Unauthorized, Redirect
-from zExceptions.ExceptionFormatter import format_exception
-from ZPublisher import BadRequest, InternalError, NotFound
+""" CGI Response Output formatter
+"""
from cgi import escape
+import os
+import re
+from string import maketrans
+from string import translate
+import struct
+import sys
+import types
from urllib import quote
+import zlib
-nl2sp = maketrans('\n',' ')
+from zExceptions import Redirect
+from zExceptions import Unauthorized
+from zExceptions.ExceptionFormatter import format_exception
+from ZPublisher import BadRequest
+from ZPublisher import InternalError
+from ZPublisher import NotFound
+from ZPublisher.BaseResponse import BaseResponse
+nl2sp = maketrans('\n', ' ')
+
# This may get overwritten during configuration
default_encoding = 'iso-8859-15'
@@ -103,9 +109,6 @@
start_of_header_search = re.compile('(<head[^>]*>)', re.IGNORECASE).search
-accumulate_header = {'set-cookie': 1}.has_key
-
-
_gzip_header = ("\037\213" # magic
"\010" # compression method
"\000" # flags
@@ -129,8 +132,7 @@
return ''.join(_CRLF.split(str(name))), ''.join(_CRLF.split(str(value)))
class HTTPResponse(BaseResponse):
- """\
- An object representation of an HTTP response.
+ """ An object representation of an HTTP response.
The Response type encapsulates all possible responses to HTTP
requests. Responses are normally created by the object publisher.
@@ -149,7 +151,6 @@
passed into the object must be used.
""" #'
- accumulated_headers = ''
body = ''
realm = 'Zope'
_error_format = 'text/html'
@@ -162,16 +163,14 @@
# 2 - ignore accept-encoding (i.e. force)
use_HTTP_content_compression = 0
- def __init__(self,body='',status=200,headers=None,
+ def __init__(self, body='', status=200, headers=None,
stdout=sys.stdout, stderr=sys.stderr,):
- '''\
- Creates a new response. In effect, the constructor calls
- "self.setBody(body); self.setStatus(status); for name in
- headers.keys(): self.setHeader(name, headers[name])"
- '''
+ """ Creates a new response using the given values.
+ """
if headers is None:
headers = {}
self.headers = headers
+ self.accumulated_headers = []
if status == 200:
self.status = 200
@@ -187,36 +186,34 @@
self.stderr = stderr
def retry(self):
- """Return a response object to be used in a retry attempt
+ """ Return a cloned response object to be used in a retry attempt.
"""
-
# This implementation is a bit lame, because it assumes that
# only stdout stderr were passed to the constructor. OTOH, I
# think that that's all that is ever passed.
-
return self.__class__(stdout=self.stdout, stderr=self.stderr)
_shutdown_flag = None
def _requestShutdown(self, exitCode=0):
- """Request that the server shut down with exitCode after fulfilling
- the current request."""
+ """ Request that the server shut down with exitCode after fulfilling
+ the current request.
+ """
import ZServer
ZServer.exit_code = exitCode
self._shutdown_flag = 1
def _shutdownRequested(self):
- """Returns true if this request requested a server shutdown."""
+ """ Returns true if this request requested a server shutdown.
+ """
return self._shutdown_flag is not None
def setStatus(self, status, reason=None, lock=None):
- '''\
- Sets the HTTP status code of the response; the argument may
- either be an integer or a string from { OK, Created, Accepted,
- NoContent, MovedPermanently, MovedTemporarily,
- NotModified, BadRequest, Unauthorized, Forbidden,
- NotFound, InternalError, NotImplemented, BadGateway,
- ServiceUnavailable } that will be converted to the correct
- integer value. '''
+ """ Set the HTTP status code of the response
+
+ o The argument may either be an integer or a string from the
+ 'status_reasons' dict values: status messages will be converted
+ to the correct integer value.
+ """
if self._locked_status:
# Don't change the response status.
# It has already been determined.
@@ -241,6 +238,7 @@
reason = status_reasons[status]
else:
reason = 'Unknown'
+
self.setHeader('Status', "%d %s" % (status,str(reason)))
self.errmsg = reason
# lock the status if we're told to
@@ -256,12 +254,11 @@
if not scrubbed:
name, value = _scrubHeader(name, value)
key = name.lower()
- if accumulate_header(key):
- self.accumulated_headers = (
- "%s%s: %s\r\n" % (self.accumulated_headers, name, value))
- return
- name = literal and name or key
- self.headers[name] = value
+ if key == 'set-cookie':
+ self.accumulated_headers.append((name, value))
+ else:
+ name = literal and name or key
+ self.headers[name] = value
def getHeader(self, name, literal=0):
'''\
@@ -280,8 +277,7 @@
Set a new HTTP return header with the given value, while retaining
any previously set headers with the same name.'''
name, value = _scrubHeader(name, value)
- self.accumulated_headers = (
- "%s%s: %s\r\n" % (self.accumulated_headers, name, value))
+ self.accumulated_headers.append((name, value))
__setitem__ = setHeader
@@ -805,7 +801,8 @@
if fatal and t is SystemExit and v.code == 0:
body = self.setBody(
(str(t),
- 'Zope has exited normally.<p>' + self._traceback(t, v, tb) + '</p>'),
+ 'Zope has exited normally.<p>'
+ + self._traceback(t, v, tb) + '</p>'),
is_error=1)
else:
try:
@@ -881,15 +878,15 @@
not headers.has_key('transfer-encoding'):
self.setHeader('content-length',len(body))
- headersl = []
- append = headersl.append
+ chunks = []
+ append = chunks.append
# status header must come first.
append("Status: %s" % headers.get('status', '200 OK'))
append("X-Powered-By: Zope (www.zope.org), Python (www.python.org)")
if headers.has_key('status'):
del headers['status']
- for key, val in headers.items():
+ for key, value in headers.items():
if key.lower() == key:
# only change non-literal header names
key = "%s%s" % (key[:1].upper(), key[1:])
@@ -899,11 +896,12 @@
key = "%s-%s%s" % (key[:l],key[l+1:l+2].upper(),key[l+2:])
start = l + 1
l = key.find('-', start)
- append("%s: %s" % (key, val))
- if self.cookies:
- headersl = headersl+self._cookie_list()
- headersl[len(headersl):] = [self.accumulated_headers, body]
- return '\r\n'.join(headersl)
+ append("%s: %s" % (key, value))
+ chunks.extend(self._cookie_list())
+ for key, value in self.accumulated_headers:
+ append("%s: %s" % (key, value))
+ append(body) # WTF?
+ return '\r\n'.join(chunks)
def write(self,data):
"""\
Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/testHTTPResponse.py
===================================================================
--- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/testHTTPResponse.py 2009-12-21 19:01:51 UTC (rev 106829)
+++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/testHTTPResponse.py 2009-12-21 20:13:09 UTC (rev 106830)
@@ -202,37 +202,37 @@
response.setStatus(ResourceLockedError)
self.assertEqual(response.status, 423)
- def test_charset_no_header(self):
+ def test_ctor_charset_no_header(self):
response = self._makeOne(body='foo')
self.assertEqual(response.headers.get('content-type'),
'text/plain; charset=iso-8859-15')
- def test_charset_text_header(self):
+ def test_ctor_charset_text_header(self):
response = self._makeOne(body='foo',
headers={'content-type': 'text/plain'})
self.assertEqual(response.headers.get('content-type'),
'text/plain; charset=iso-8859-15')
- def test_charset_application_header_no_header(self):
+ def test_ctor_charset_application_header_no_header(self):
response = self._makeOne(body='foo',
headers={'content-type': 'application/foo'})
self.assertEqual(response.headers.get('content-type'),
'application/foo')
- def test_charset_application_header_with_header(self):
- response = self._makeOne(body='foo',
- headers={'content-type': 'application/foo; charset: something'})
+ def test_ctor_charset_application_header_with_header(self):
+ response = self._makeOne(body='foo', headers={'content-type':
+ 'application/foo; charset: something'})
self.assertEqual(response.headers.get('content-type'),
'application/foo; charset: something')
- def test_charset_application_header_unicode(self):
+ def test_ctor_charset_application_header_unicode(self):
response = self._makeOne(body=unicode('ärger', 'iso-8859-15'),
headers={'content-type': 'application/foo'})
self.assertEqual(response.headers.get('content-type'),
'application/foo; charset=iso-8859-15')
self.assertEqual(response.body, 'ärger')
- def test_charset_application_header_unicode_1(self):
+ def test_ctor_charset_application_header_unicode_1(self):
response = self._makeOne(body=unicode('ärger', 'iso-8859-15'),
headers={'content-type': 'application/foo; charset=utf-8'})
self.assertEqual(response.headers.get('content-type'),
@@ -240,20 +240,27 @@
self.assertEqual(response.body, unicode('ärger',
'iso-8859-15').encode('utf-8'))
- def test_XMLEncodingRecoding(self):
- xml = u'<?xml version="1.0" encoding="iso-8859-15" ?>\n<foo><bar/></foo>'
- response = self._makeOne(body=xml, headers={'content-type': 'text/xml; charset=utf-8'})
- self.assertEqual(xml.replace('iso-8859-15', 'utf-8')==response.body, True)
- response = self._makeOne(body=xml, headers={'content-type': 'text/xml; charset=iso-8859-15'})
- self.assertEqual(xml==response.body, True)
+ def test_ctor_body_recodes_to_match_content_type_charset(self):
+ xml = (u'<?xml version="1.0" encoding="iso-8859-15" ?>\n'
+ '<foo><bar/></foo>')
+ response = self._makeOne(body=xml, headers={'content-type':
+ 'text/xml; charset=utf-8'})
+ self.assertEqual(response.body, xml.replace('iso-8859-15', 'utf-8'))
+ def test_ctor_body_already_matches_charset_unchanged(self):
+ xml = (u'<?xml version="1.0" encoding="iso-8859-15" ?>\n'
+ '<foo><bar/></foo>')
+ response = self._makeOne(body=xml, headers={'content-type':
+ 'text/xml; charset=iso-8859-15'})
+ self.assertEqual(response.body, xml)
+
def test_addHeader_drops_CRLF(self):
# RFC2616 disallows CRLF in a header value.
response = self._makeOne()
response.addHeader('Location',
'http://www.ietf.org/rfc/\r\nrfc2616.txt')
self.assertEqual(response.accumulated_headers,
- 'Location: http://www.ietf.org/rfc/rfc2616.txt\r\n')
+ [('Location', 'http://www.ietf.org/rfc/rfc2616.txt')])
def test_appendHeader_drops_CRLF(self):
# RFC2616 disallows CRLF in a header value.
@@ -278,9 +285,9 @@
response.setHeader('Set-Cookie',
'violation="http://www.ietf.org/rfc/\r\nrfc2616.txt"')
self.assertEqual(response.accumulated_headers,
- 'Set-Cookie: allowed="OK"\r\n' +
- 'Set-Cookie: '
- 'violation="http://www.ietf.org/rfc/rfc2616.txt"\r\n')
+ [('Set-Cookie', 'allowed="OK"'),
+ ('Set-Cookie',
+ 'violation="http://www.ietf.org/rfc/rfc2616.txt"')])
def test_setBody_compression_vary(self):
# Vary header should be added here
@@ -301,6 +308,3 @@
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(HTTPResponseTests, 'test'))
return suite
-
-if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')
More information about the Zope-Checkins
mailing list