[Zope-Checkins] SVN: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/ Coverage for ZPublisher.WSGIPublisher.publish_module.
Tres Seaver
tseaver at palladion.com
Mon May 31 15:41:13 EDT 2010
Log message for revision 112876:
Coverage for ZPublisher.WSGIPublisher.publish_module.
Changed:
U Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py
U Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py
-=-
Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py
===================================================================
--- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py 2010-05-31 18:26:04 UTC (rev 112875)
+++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py 2010-05-31 19:41:12 UTC (rev 112876)
@@ -15,17 +15,15 @@
from cStringIO import StringIO
import time
+import transaction
from zExceptions import Redirect
from zExceptions import Unauthorized
-from ZServer.medusa.http_date import build_http_date
from zope.event import notify
-
-from ZPublisher.HTTPResponse import HTTPResponse
-from ZPublisher.HTTPRequest import HTTPRequest
-
-from zope.publisher.interfaces import ISkinnable
from zope.publisher.skinnable import setDefaultSkin
+from ZServer.medusa.http_date import build_http_date
+from ZPublisher.HTTPRequest import HTTPRequest
+from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.mapply import mapply
from ZPublisher.pubevents import PubBeforeStreaming
from ZPublisher.Publish import call_object
@@ -148,11 +146,8 @@
raise NotImplementedError
def publish(request, module_name,
- _get_module_info=None, # only for testing
+ _get_module_info=get_module_info, # only for testing
):
- if _get_module_info is None:
- _get_module_info = get_module_info
-
(bobo_before,
bobo_after,
object,
@@ -204,23 +199,26 @@
return response
-def publish_module(environ, start_response):
+def publish_module(environ, start_response,
+ _publish=publish, # only for testing
+ _response_factory=WSGIResponse, # only for testing
+ _request_factory=HTTPRequest, # only for testing
+ ):
status = 200
stdout = StringIO()
stderr = StringIO()
- response = WSGIResponse(stdout=stdout, stderr=stderr)
+ response = _response_factory(stdout=stdout, stderr=stderr)
response._http_version = environ['SERVER_PROTOCOL'].split('/')[1]
response._http_connection = environ.get('CONNECTION_TYPE', 'close')
response._server_version = environ.get('SERVER_SOFTWARE')
- request = HTTPRequest(environ['wsgi.input'], environ, response)
- if ISkinnable.providedBy(request):
- setDefaultSkin(request)
+ request = _request_factory(environ['wsgi.input'], environ, response)
+ setDefaultSkin(request)
try:
- response = publish(request, 'Zope2')
+ response = _publish(request, 'Zope2')
except Unauthorized, v:
- pass
+ response._unauthorized()
except Redirect, v:
response.redirect(v)
@@ -234,10 +232,9 @@
# If somebody used response.write, that data will be in the
# stdout StringIO, so we put that before the body.
# XXX This still needs verification that it really works.
- result=(stdout.getvalue(), response.body)
+ result = (stdout.getvalue(), response.body)
if 'repoze.tm.active' in environ:
- import transaction
txn = transaction.get()
txn.addAfterCommitHook(lambda ok: request.close())
else:
Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py
===================================================================
--- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py 2010-05-31 18:26:04 UTC (rev 112875)
+++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py 2010-05-31 19:41:12 UTC (rev 112876)
@@ -219,6 +219,7 @@
self._callFUT(request, 'okmodule', _gmi)
self.assertEqual(response.realm, None)
+
class Test_publish_module(unittest.TestCase):
def setUp(self):
@@ -230,8 +231,21 @@
from zope.testing.cleanup import cleanUp
cleanUp()
- def _callFUT(self, environ, start_response):
+ def _callFUT(self, environ, start_response,
+ _publish=None, _response_factory=None, _request_factory=None):
from ZPublisher.WSGIPublisher import publish_module
+ if _publish is not None:
+ if _response_factory is not None:
+ if _request_factory is not None:
+ return publish_module(environ, start_response, _publish,
+ _response_factory, _request_factory)
+ return publish_module(environ, start_response, _publish,
+ _response_factory)
+ else:
+ if _request_factory is not None:
+ return publish_module(environ, start_response, _publish,
+ _request_factory=_request_factory)
+ return publish_module(environ, start_response, _publish)
return publish_module(environ, start_response)
def _registerView(self, factory, name, provides=None):
@@ -264,7 +278,7 @@
environ.update(kw)
return environ
- def test_publish_module_uses_setDefaultSkin(self):
+ def test_calls_setDefaultSkin(self):
from zope.traversing.interfaces import ITraversable
from zope.traversing.namespace import view
@@ -286,8 +300,113 @@
environ = self._makeEnviron(PATH_INFO='/@@testing')
self.assertEqual(self._callFUT(environ, noopStartResponse),
('', 'foobar'))
-
+ def test_publish_can_return_new_response(self):
+ from ZPublisher.HTTPRequest import HTTPRequest
+ _response = DummyResponse()
+ _response.body = 'BODY'
+ _after1 = DummyCallable()
+ _after2 = DummyCallable()
+ _response.after_list = (_after1, _after2)
+ environ = self._makeEnviron()
+ start_response = DummyCallable()
+ _publish = DummyCallable()
+ _publish._result = _response
+ app_iter = self._callFUT(environ, start_response, _publish)
+ self.assertEqual(app_iter, ('', 'BODY'))
+ (status, headers), kw = start_response._called_with
+ self.assertEqual(status, '204 No Content')
+ self.assertEqual(headers, [('Content-Length', '0')])
+ self.assertEqual(kw, {})
+ (request, module), kw = _publish._called_with
+ self.failUnless(isinstance(request, HTTPRequest))
+ self.assertEqual(module, 'Zope2')
+ self.assertEqual(kw, {})
+ self.failUnless(_response._finalized)
+ self.assertEqual(_after1._called_with, ((), {}))
+ self.assertEqual(_after2._called_with, ((), {}))
+
+ def test_swallows_Unauthorized(self):
+ from zExceptions import Unauthorized
+ environ = self._makeEnviron()
+ start_response = DummyCallable()
+ _publish = DummyCallable()
+ _publish._raise = Unauthorized('TESTING')
+ app_iter = self._callFUT(environ, start_response, _publish)
+ self.assertEqual(app_iter, ('', ''))
+ (status, headers), kw = start_response._called_with
+ self.assertEqual(status, '401 Unauthorized')
+ self.failUnless(('Content-Length', '0') in headers)
+ self.assertEqual(kw, {})
+
+ def test_swallows_Redirect(self):
+ from zExceptions import Redirect
+ environ = self._makeEnviron()
+ start_response = DummyCallable()
+ _publish = DummyCallable()
+ _publish._raise = Redirect('/redirect_to')
+ app_iter = self._callFUT(environ, start_response, _publish)
+ self.assertEqual(app_iter, ('', ''))
+ (status, headers), kw = start_response._called_with
+ self.assertEqual(status, '302 Moved Temporarily')
+ self.failUnless(('Location', '/redirect_to') in headers)
+ self.failUnless(('Content-Length', '0') in headers)
+ self.assertEqual(kw, {})
+
+ def test_response_body_is_file(self):
+ class DummyFile(file):
+ def __init__(self):
+ pass
+ def read(self, *args, **kw):
+ raise NotImplementedError()
+ _response = DummyResponse()
+ _response._status = '200 OK'
+ _response._headers = [('Content-Length', '4')]
+ body = _response.body = DummyFile()
+ environ = self._makeEnviron()
+ start_response = DummyCallable()
+ _publish = DummyCallable()
+ _publish._result = _response
+ app_iter = self._callFUT(environ, start_response, _publish)
+ self.failUnless(app_iter is body)
+
+ def test_request_closed_when_tm_middleware_not_active(self):
+ environ = self._makeEnviron()
+ start_response = DummyCallable()
+ _request = DummyRequest()
+ _request._closed = False
+ def _close():
+ _request._closed = True
+ _request.close = _close
+ def _request_factory(stdin, environ, response):
+ return _request
+ _publish = DummyCallable()
+ _publish._result = DummyResponse()
+ app_iter = self._callFUT(environ, start_response, _publish,
+ _request_factory=_request_factory)
+ self.failUnless(_request._closed)
+
+ def test_request_not_closed_when_tm_middleware_active(self):
+ import transaction
+ environ = self._makeEnviron()
+ environ['repoze.tm.active'] = 1
+ start_response = DummyCallable()
+ _request = DummyRequest()
+ _request._closed = False
+ def _close():
+ _request._closed = True
+ _request.close = _close
+ def _request_factory(stdin, environ, response):
+ return _request
+ _publish = DummyCallable()
+ _publish._result = DummyResponse()
+ app_iter = self._callFUT(environ, start_response, _publish,
+ _request_factory=_request_factory)
+ self.failIf(_request._closed)
+ txn = transaction.get()
+ self.failUnless(list(txn.getAfterCommitHooks()))
+
+
class DummyRequest(dict):
_processedInputs = False
_traversed = None
@@ -306,10 +425,19 @@
after_list = ()
realm = None
_body = None
+ _finalized = False
+ _status = '204 No Content'
+ _headers = [('Content-Length', '0')]
+ def finalize(self):
+ self._finalized = True
+ return self._status, self._headers
+
def setBody(self, body):
self._body = body
+ body = property(lambda self: self._body, setBody)
+
class DummyCallable(object):
_called_with = _raise = _result = None
More information about the Zope-Checkins mailing list