[Zope-Checkins] SVN: Zope/trunk/src/ZPublisher/ Merge r107029 from the tseaver-fix_wsgi branch.
Tres Seaver
tseaver at palladion.com
Tue Jun 1 17:22:15 EDT 2010
Log message for revision 112893:
Merge r107029 from the tseaver-fix_wsgi branch.
- Start test coverage for WSGIPublisher.
Changed:
U Zope/trunk/src/ZPublisher/WSGIPublisher.py
A Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py
-=-
Modified: Zope/trunk/src/ZPublisher/WSGIPublisher.py
===================================================================
--- Zope/trunk/src/ZPublisher/WSGIPublisher.py 2010-06-01 21:08:52 UTC (rev 112892)
+++ Zope/trunk/src/ZPublisher/WSGIPublisher.py 2010-06-01 21:22:15 UTC (rev 112893)
@@ -13,7 +13,6 @@
""" Python Object Publisher -- Publish Python objects on web servers
"""
from cStringIO import StringIO
-import re
import sys
import time
@@ -25,6 +24,12 @@
from ZPublisher.maybe_lock import allocate_lock
from ZPublisher.mapply import mapply
+_NOW = None # overwrite for testing
+def _now():
+ if _NOW is not None:
+ return _NOW
+ return time.time()
+
class WSGIResponse(HTTPResponse):
"""A response object for WSGI
@@ -34,57 +39,53 @@
Most significantly, streaming is not (yet) supported.
"""
_streaming = 0
+ _http_version = None
+ _server_version = None
+ _http_connection = None
- def __str__(self,
- html_search=re.compile('<html>',re.I).search,
- ):
+ def __str__(self):
+
if self._wrote:
if self._chunking:
return '0\r\n\r\n'
else:
return ''
- headers=self.headers
- body=self.body
+ headers = self.headers
+ body = self.body
# set 204 (no content) status if 200 and response is empty
# and not streaming
- if not headers.has_key('content-type') and \
- not headers.has_key('content-length') and \
- not self._streaming and \
- self.status == 200:
+ if ('content-type' not in headers and
+ 'content-length' not in headers and
+ not self._streaming and self.status == 200):
self.setStatus('nocontent')
# add content length if not streaming
- if not headers.has_key('content-length') and \
- not self._streaming:
- self.setHeader('content-length',len(body))
+ content_length = headers.get('content-length')
+ if content_length is None and not self._streaming:
+ self.setHeader('content-length', len(body))
- content_length= headers.get('content-length', None)
- if content_length>0 :
- self.setHeader('content-length', content_length)
+ chunks = []
+ append = chunks.append
- headersl=[]
- append=headersl.append
-
- status=headers.get('status', '200 OK')
-
# status header must come first.
- append("HTTP/%s %s" % (self._http_version or '1.0' , status))
- if headers.has_key('status'):
- del headers['status']
+ version = self._http_version or '1.0'
+ append("HTTP/%s %d %s" % (version, self.status, self.errmsg))
# add zserver headers
- append('Server: %s' % self._server_version)
- append('Date: %s' % build_http_date(time.time()))
+ if self._server_version is not None:
+ append('Server: %s' % self._server_version)
- if self._http_version=='1.0':
- if self._http_connection=='keep-alive' and \
- self.headers.has_key('content-length'):
- self.setHeader('Connection','Keep-Alive')
+ append('Date: %s' % build_http_date(_now()))
+
+ if self._http_version == '1.0':
+ if (self._http_connection == 'keep-alive' and
+ 'content-length' in self.headers):
+ self.setHeader('Connection', 'Keep-Alive')
else:
- self.setHeader('Connection','close')
+ self.setHeader('Connection', 'close')
# Close the connection if we have been asked to.
# Use chunking if streaming output.
@@ -109,11 +110,18 @@
start=l+1
l=key.find('-',start)
append("%s: %s" % (key, val))
+
if self.cookies:
- headersl=headersl+self._cookie_list()
- headersl[len(headersl):]=[self.accumulated_headers, body]
- return "\r\n".join(headersl)
+ chunks.extend(self._cookie_list())
+ for key, value in self.accumulated_headers:
+ append("%s: %s" % (key, value))
+
+ append('') # RFC 2616 mandates empty line between headers and payload
+ append(body)
+
+ return "\r\n".join(chunks)
+
class Retry(Exception):
"""Raise this to retry a request
Added: Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py
===================================================================
--- Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py (rev 0)
+++ Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py 2010-06-01 21:22:15 UTC (rev 112893)
@@ -0,0 +1,135 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import unittest
+
+class WSGIResponseTests(unittest.TestCase):
+
+ _old_NOW = None
+
+ def tearDown(self):
+ if self._old_NOW is not None:
+ self._setNOW(self._old_NOW)
+
+ def _getTargetClass(self):
+ from ZPublisher.WSGIPublisher import WSGIResponse
+ return WSGIResponse
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def _setNOW(self, value):
+ from ZPublisher import WSGIPublisher
+ WSGIPublisher._NOW, self._old_NOW = value, WSGIPublisher._NOW
+
+ def test___str__already_wrote_not_chunking(self):
+ response = self._makeOne()
+ response._wrote = True
+ response._chunking = False
+ self.assertEqual(str(response), '')
+
+ def test___str__already_wrote_w_chunking(self):
+ response = self._makeOne()
+ response._wrote = True
+ response._chunking = True
+ self.assertEqual(str(response), '0\r\n\r\n')
+
+ def test___str__sets_204_on_empty_not_streaming(self):
+ response = self._makeOne()
+ str(response) # not checking value
+ self.assertEqual(response.status, 204)
+
+ def test___str__sets_204_on_empty_not_streaming_ignores_non_200(self):
+ response = self._makeOne()
+ response.setStatus(302)
+ str(response) # not checking value
+ self.assertEqual(response.status, 302)
+
+ def test___str___sets_content_length_if_missing(self):
+ response = self._makeOne()
+ response.setBody('TESTING')
+ str(response) # not checking value
+ self.assertEqual(response.getHeader('Content-Length'),
+ str(len('TESTING')))
+
+ def test___str___skips_setting_content_length_if_missing_w_streaming(self):
+ response = self._makeOne()
+ response._streaming = True
+ response.body = 'TESTING'
+ str(response) # not checking value
+ self.failIf(response.getHeader('Content-Length'))
+
+ def test___str___w_default_http_version(self):
+ response = self._makeOne()
+ response.setBody('TESTING')
+ result = str(response).splitlines()
+ self.assertEqual(result[0], 'HTTP/1.0 200 OK')
+
+ def test___str___w_explicit_http_version(self):
+ response = self._makeOne()
+ response.setBody('TESTING')
+ response._http_version = '1.1'
+ result = str(response).splitlines()
+ self.assertEqual(result[0], 'HTTP/1.1 200 OK')
+
+ def test___str___skips_Server_header_wo_server_version_set(self):
+ response = self._makeOne()
+ response.setBody('TESTING')
+ result = str(response).splitlines()
+ sv = [x for x in result if x.lower().startswith('server-version')]
+ self.failIf(sv)
+
+ def test___str___includes_Server_header_w_server_version_set(self):
+ response = self._makeOne()
+ response._server_version = 'TESTME'
+ response.setBody('TESTING')
+ result = str(response).splitlines()
+ self.assertEqual(result[1], 'Server: TESTME')
+
+ def test___str___includes_Date_header(self):
+ import time
+ WHEN = time.localtime()
+ self._setNOW(time.mktime(WHEN))
+ response = self._makeOne()
+ response.setBody('TESTING')
+ result = str(response).splitlines()
+ self.assertEqual(result[1], 'Date: %s' %
+ time.strftime('%a, %d %b %Y %H:%M:%S GMT',
+ time.gmtime(time.mktime(WHEN))))
+
+ def test___str___HTTP_1_0_keep_alive_w_content_length(self):
+ response = self._makeOne()
+ response._http_version = '1.0'
+ response._http_connection = 'keep-alive'
+ response.setBody('TESTING')
+ str(response) # not checking value
+ self.assertEqual(response.getHeader('Connection'), 'Keep-Alive')
+
+ def test___str___HTTP_1_0_keep_alive_wo_content_length_streaming(self):
+ response = self._makeOne()
+ response._http_version = '1.0'
+ response._http_connection = 'keep-alive'
+ response._streaming = True
+ str(response) # not checking value
+ self.assertEqual(response.getHeader('Connection'), 'close')
+
+ def test___str___HTTP_1_0_not_keep_alive_w_content_length(self):
+ response = self._makeOne()
+ response._http_version = '1.0'
+ response.setBody('TESTING')
+ str(response) # not checking value
+ self.assertEqual(response.getHeader('Connection'), 'close')
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(WSGIResponseTests))
+ return suite
More information about the Zope-Checkins
mailing list