[Zope-Checkins] SVN: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/ Coverage for ZPublisher.WSGIPublisher.publish.
Tres Seaver
tseaver at palladion.com
Mon May 31 14:26:04 EDT 2010
Log message for revision 112875:
Coverage for ZPublisher.WSGIPublisher.publish.
Changed:
U Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py
U Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py
-=-
Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py
===================================================================
--- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py 2010-05-31 17:27:45 UTC (rev 112874)
+++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py 2010-05-31 18:26:04 UTC (rev 112875)
@@ -56,6 +56,9 @@
# HTTP/1.1 should use chunked encoding
http_chunk = 0
+ # Append any "cleanup" functions to this list.
+ after_list = ()
+
def finalize(self):
headers = self.headers
@@ -144,7 +147,12 @@
# return ''
raise NotImplementedError
-def publish(request, module_name, after_list, debug=0):
+def publish(request, module_name,
+ _get_module_info=None, # only for testing
+ ):
+ if _get_module_info is None:
+ _get_module_info = get_module_info
+
(bobo_before,
bobo_after,
object,
@@ -153,12 +161,13 @@
err_hook,
validated_hook,
transactions_manager,
- )= get_module_info(module_name)
+ )= _get_module_info(module_name)
request.processInputs()
response = request.response
- after_list[0] = bobo_after
+ if bobo_after is not None:
+ response.after_list += (bobo_after,)
if debug_mode:
response.debug_mode = debug_mode
@@ -197,7 +206,6 @@
def publish_module(environ, start_response):
status = 200
- after_list = [None]
stdout = StringIO()
stderr = StringIO()
response = WSGIResponse(stdout=stdout, stderr=stderr)
@@ -208,13 +216,9 @@
request = HTTPRequest(environ['wsgi.input'], environ, response)
if ISkinnable.providedBy(request):
setDefaultSkin(request)
-
- # Let's support post-mortem debugging
- handle_errors = environ.get('wsgi.handleErrors', True)
try:
- response = publish(request, 'Zope2', after_list=[None],
- debug=handle_errors)
+ response = publish(request, 'Zope2')
except Unauthorized, v:
pass
except Redirect, v:
@@ -241,8 +245,8 @@
stdout.close()
- if after_list[0] is not None:
- after_list[0]()
+ for callable in response.after_list:
+ callable()
# Return the result body iterable.
return result
Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py
===================================================================
--- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py 2010-05-31 17:27:45 UTC (rev 112874)
+++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py 2010-05-31 18:26:04 UTC (rev 112875)
@@ -154,6 +154,71 @@
self.assertRaises(NotImplementedError, lambda: str(response))
+class Test_publish(unittest.TestCase):
+
+ def _callFUT(self, request, module_name, _get_module_info=None):
+ from ZPublisher.WSGIPublisher import publish
+ if _get_module_info is None:
+ return publish(request, module_name)
+
+ return publish(request, module_name, _get_module_info)
+
+ def test_invalid_module_doesnt_catch_error(self):
+ _gmi = DummyCallable()
+ _gmi._raise = ImportError('testing')
+ self.assertRaises(ImportError, self._callFUT, None, 'nonesuch', _gmi)
+ self.assertEqual(_gmi._called_with, (('nonesuch',), {}))
+
+ def test_wo_REMOTE_USER(self):
+ request = DummyRequest(PATH_INFO='/')
+ response = request.response = DummyResponse()
+ _before = DummyCallable()
+ _after = object()
+ _object = DummyCallable()
+ _object._result = 'RESULT'
+ request._traverse_to = _object
+ _realm = 'TESTING'
+ _debug_mode = True
+ _err_hook = DummyCallable()
+ _validated_hook = object()
+ _tm = DummyTM()
+ _gmi = DummyCallable()
+ _gmi._result = (_before, _after, _object, _realm, _debug_mode,
+ _err_hook, _validated_hook, _tm)
+ returned = self._callFUT(request, 'okmodule', _gmi)
+ self.failUnless(returned is response)
+ self.assertEqual(_gmi._called_with, (('okmodule',), {}))
+ self.failUnless(request._processedInputs)
+ self.assertEqual(response.after_list, (_after,))
+ self.failUnless(response.debug_mode)
+ self.assertEqual(response.realm, 'TESTING')
+ self.assertEqual(_before._called_with, ((), {}))
+ self.assertEqual(request['PARENTS'], [_object])
+ self.assertEqual(request._traversed, ('/', None, _validated_hook))
+ self.assertEqual(_tm._recorded, (_object, request))
+ self.assertEqual(_object._called_with, ((), {}))
+ self.assertEqual(response._body, 'RESULT')
+ self.assertEqual(_err_hook._called_with, None)
+
+ def test_w_REMOTE_USER(self):
+ request = DummyRequest(PATH_INFO='/', REMOTE_USER='phred')
+ response = request.response = DummyResponse()
+ _before = DummyCallable()
+ _after = object()
+ _object = DummyCallable()
+ _object._result = 'RESULT'
+ request._traverse_to = _object
+ _realm = 'TESTING'
+ _debug_mode = True
+ _err_hook = DummyCallable()
+ _validated_hook = object()
+ _tm = DummyTM()
+ _gmi = DummyCallable()
+ _gmi._result = (_before, _after, _object, _realm, _debug_mode,
+ _err_hook, _validated_hook, _tm)
+ self._callFUT(request, 'okmodule', _gmi)
+ self.assertEqual(response.realm, None)
+
class Test_publish_module(unittest.TestCase):
def setUp(self):
@@ -223,12 +288,50 @@
('', 'foobar'))
+class DummyRequest(dict):
+ _processedInputs = False
+ _traversed = None
+ _traverse_to = None
+ args = ()
+
+ def processInputs(self):
+ self._processedInputs = True
+
+ def traverse(self, path, response=None, validated_hook=None):
+ self._traversed = (path, response, validated_hook)
+ return self._traverse_to
+
+class DummyResponse(object):
+ debug_mode = False
+ after_list = ()
+ realm = None
+ _body = None
+
+ def setBody(self, body):
+ self._body = body
+
+class DummyCallable(object):
+ _called_with = _raise = _result = None
+
+ def __call__(self, *args, **kw):
+ self._called_with = (args, kw)
+ if self._raise:
+ raise self._raise
+ return self._result
+
+class DummyTM(object):
+ _recorded = _raise = _result = None
+
+ def recordMetaData(self, *args):
+ self._recorded = args
+
def noopStartResponse(status, headers):
pass
def test_suite():
return unittest.TestSuite((
- unittest.makeSuite(WSGIResponseTests, 'test'),
- unittest.makeSuite(Test_publish_module, 'test'),
+ unittest.makeSuite(WSGIResponseTests),
+ unittest.makeSuite(Test_publish),
+ unittest.makeSuite(Test_publish_module),
))
More information about the Zope-Checkins mailing list