[Zope-Checkins] SVN: Zope/trunk/s Merge r112823-112876 from the tseaver-fix_wsgi branch.
Tres Seaver
tseaver at palladion.com
Tue Jun 1 17:43:37 EDT 2010
Log message for revision 112894:
Merge r112823-112876 from the tseaver-fix_wsgi branch.
- Full test coverage for ZPublisher.WSGIPublisher.
- Add 'add_user' script and finder help, borrowed from 'repoze.zope2'.
- Add call to 'setDefaultSkin(request)' to fix view lookups.
- Override the 'write' method in 'WSGIHTTPReponse' to avoid inappropriate
stringification, allowing things like the Plone resource registry to work
properly.
- Defer closing the request until the transaction commits, if and only if
we know that middleware is handling the transaction.
- Make the WSGI publish function deal with three special cases:
- App returns a file-like object as the response body (keep the app from
stringifying it).
- App raises an Unauthorized exception (just set the response status, to
let middleware handle issuing a challenge).
- App raises a Redirect exception (just call redirect on the response).
- Adapt test code to the new signature of 'HTTPResponse._cookie_list',
which now returns a list of two-tuples, rather than rendered strings.
- Get quickstart page rendering under plain paste config.
- Make WSGIResponse.__str__ raise and exception, preventing an
'attractive nuisance.'
The real logic is in finalize and listHeaders now, and the publish*
functions call those directly.
- Move finalization logic out of HTTPResponse.listHeaders.
- Refactor WSGIHTTPResponse to avoid the need to use str() and parse.
o Instead, compute status and headers directly.
- Chop out error and transaction handling from the 'publish*' functions:
the point of doing WSGI is to move that stuff out of the application,
and out into middleware.
- One backward incompatibility: the special "shutdown" behavior is gone
here. It should be replaced by something in view code.
- Factor out computation of the list of response headers from stringifying
them, allowing WSGIHTTPResponse do reuse them as tuples.
- Chop out copy-and-paste fossils irrelevant to WSGI publication.
- Replace contorted logic for case-normalizing response headers with
idiomatic Python.
- More PEP8 conformance.
Changed:
U Zope/trunk/setup.py
U Zope/trunk/src/Testing/ZopeTestCase/zopedoctest/functional.py
U Zope/trunk/src/Testing/testbrowser.py
U Zope/trunk/src/ZPublisher/HTTPResponse.py
U Zope/trunk/src/ZPublisher/WSGIPublisher.py
U Zope/trunk/src/ZPublisher/tests/testHTTPResponse.py
U Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py
U Zope/trunk/src/ZServer/HTTPResponse.py
U Zope/trunk/src/Zope2/Startup/run.py
A Zope/trunk/src/Zope2/utilities/adduser.py
A Zope/trunk/src/Zope2/utilities/finder.py
-=-
Modified: Zope/trunk/setup.py
===================================================================
--- Zope/trunk/setup.py 2010-06-01 21:22:15 UTC (rev 112893)
+++ Zope/trunk/setup.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -116,11 +116,15 @@
include_package_data=True,
zip_safe=False,
entry_points={
+ 'paste.app_factory': [
+ 'main=Zope2.Startup.run:make_wsgi_app',
+ ],
'console_scripts': [
'mkzopeinstance=Zope2.utilities.mkzopeinstance:main',
'runzope=Zope2.Startup.run:run',
'zopectl=Zope2.Startup.zopectl:run',
'zpasswd=Zope2.utilities.zpasswd:main',
+ 'addzope2user=Zope2.utilities.adduser:main'
],
},
)
Modified: Zope/trunk/src/Testing/ZopeTestCase/zopedoctest/functional.py
===================================================================
--- Zope/trunk/src/Testing/ZopeTestCase/zopedoctest/functional.py 2010-06-01 21:22:15 UTC (rev 112893)
+++ Zope/trunk/src/Testing/ZopeTestCase/zopedoctest/functional.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -184,7 +184,7 @@
)
header_output.setResponseStatus(response.getStatus(), response.errmsg)
header_output.setResponseHeaders(response.headers)
- header_output.appendResponseHeaders(response._cookie_list())
+ header_output.headersl.extend(response._cookie_list())
header_output.appendResponseHeaders(response.accumulated_headers)
sync()
Modified: Zope/trunk/src/Testing/testbrowser.py
===================================================================
--- Zope/trunk/src/Testing/testbrowser.py 2010-06-01 21:22:15 UTC (rev 112893)
+++ Zope/trunk/src/Testing/testbrowser.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -59,7 +59,7 @@
l = key.find('-', start)
headers.append((key, val))
# get the cookies, breaking them into tuples for sorting
- cookies = [(c[:10], c[12:]) for c in real_response._cookie_list()]
+ cookies = real_response._cookie_list()
headers.extend(cookies)
headers.sort()
headers.insert(0, ('Status', "%s %s" % (status, reason)))
Modified: Zope/trunk/src/ZPublisher/HTTPResponse.py
===================================================================
--- Zope/trunk/src/ZPublisher/HTTPResponse.py 2010-06-01 21:22:15 UTC (rev 112893)
+++ Zope/trunk/src/ZPublisher/HTTPResponse.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -883,9 +883,9 @@
# of name=value pairs may be quoted.
if attrs.get('quoted', True):
- cookie = 'Set-Cookie: %s="%s"' % (name, quote(attrs['value']))
+ cookie = '%s="%s"' % (name, quote(attrs['value']))
else:
- cookie = 'Set-Cookie: %s=%s' % (name, quote(attrs['value']))
+ cookie = '%s=%s' % (name, quote(attrs['value']))
for name, v in attrs.items():
name = name.lower()
if name == 'expires':
@@ -904,41 +904,60 @@
# and block read/write access via JavaScript
elif name == 'http_only' and v:
cookie = '%s; HTTPOnly' % cookie
- cookie_list.append(cookie)
+ cookie_list.append(('Set-Cookie', cookie))
# Should really check size of cookies here!
return cookie_list
+ def finalize(self):
+ """ Set headers required by various parts of protocol.
+ """
+ body = self.body
+ if (not 'content-length' in self.headers and
+ not 'transfer-encoding' in self.headers):
+ self.setHeader('content-length', len(body))
+ return "%d %s" % (self.status, self.errmsg), self.listHeaders()
+
+ def listHeaders(self):
+ """ Return a list of (key, value) pairs for our headers.
+
+ o Do appropriate case normalization.
+ """
+
+ result = [
+ ('X-Powered-By', 'Zope (www.zope.org), Python (www.python.org)')
+ ]
+
+ for key, value in self.headers.items():
+ if key.lower() == key:
+ # only change non-literal header names
+ key = '-'.join([x.capitalize() for x in key.split('-')])
+ result.append((key, value))
+
+ result.extend(self._cookie_list())
+ result.extend(self.accumulated_headers)
+ return result
+
def __str__(self,
html_search=re.compile('<html>',re.I).search,
):
if self._wrote:
return '' # Streaming output was used.
- headers = self.headers
+ status, headers = self.finalize()
body = self.body
- if not headers.has_key('content-length') and \
- not headers.has_key('transfer-encoding'):
- self.setHeader('content-length',len(body))
-
chunks = []
- append = chunks.append
# status header must come first.
- append("Status: %d %s" % (self.status, self.errmsg))
- append("X-Powered-By: Zope (www.zope.org), Python (www.python.org)")
- for key, value in headers.items():
- if key.lower() == key:
- # only change non-literal header names
- key = '-'.join([x.capitalize() for x in key.split('-')])
- append("%s: %s" % (key, value))
- chunks.extend(self._cookie_list())
- for key, value in self.accumulated_headers:
- append("%s: %s" % (key, value))
- append('') # RFC 2616 mandates empty line between headers and payload
- append(body)
+ chunks.append("Status: %s" % status)
+
+ for key, value in headers:
+ chunks.append("%s: %s" % (key, value))
+ # RFC 2616 mandates empty line between headers and payload
+ chunks.append('')
+ chunks.append(body)
return '\r\n'.join(chunks)
def write(self,data):
Modified: Zope/trunk/src/ZPublisher/WSGIPublisher.py
===================================================================
--- Zope/trunk/src/ZPublisher/WSGIPublisher.py 2010-06-01 21:22:15 UTC (rev 112893)
+++ Zope/trunk/src/ZPublisher/WSGIPublisher.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -13,16 +13,23 @@
""" Python Object Publisher -- Publish Python objects on web servers
"""
from cStringIO import StringIO
-import sys
import time
import transaction
from zExceptions import Redirect
+from zExceptions import Unauthorized
+from zope.event import notify
+from zope.publisher.skinnable import setDefaultSkin
from ZServer.medusa.http_date import build_http_date
-from ZPublisher.HTTPResponse import HTTPResponse
+
from ZPublisher.HTTPRequest import HTTPRequest
-from ZPublisher.maybe_lock import allocate_lock
+from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.mapply import mapply
+from ZPublisher.pubevents import PubBeforeStreaming
+from ZPublisher.Publish import call_object
+from ZPublisher.Publish import dont_publish_class
+from ZPublisher.Publish import get_module_info
+from ZPublisher.Publish import missing_name
_NOW = None # overwrite for testing
def _now():
@@ -34,30 +41,31 @@
"""A response object for WSGI
This Response object knows nothing about ZServer, but tries to be
- compatible with the ZServerHTTPResponse.
-
+ compatible with the ZServerHTTPResponse.
+
Most significantly, streaming is not (yet) supported.
"""
- _streaming = 0
+ _streaming = _chunking = 0
_http_version = None
_server_version = None
_http_connection = None
-
- def __str__(self):
- if self._wrote:
- if self._chunking:
- return '0\r\n\r\n'
- else:
- return ''
+ # Set this value to 1 if streaming output in
+ # HTTP/1.1 should use chunked encoding
+ http_chunk = 0
+ # Append any "cleanup" functions to this list.
+ after_list = ()
+
+ def finalize(self):
+
headers = self.headers
body = self.body
# set 204 (no content) status if 200 and response is empty
# and not streaming
- if ('content-type' not in headers and
- 'content-length' not in headers and
+ if ('content-type' not in headers and
+ 'content-length' not in headers and
not self._streaming and self.status == 200):
self.setStatus('nocontent')
@@ -67,21 +75,8 @@
if content_length is None and not self._streaming:
self.setHeader('content-length', len(body))
- chunks = []
- append = chunks.append
-
- # status header must come first.
- version = self._http_version or '1.0'
- append("HTTP/%s %d %s" % (version, self.status, self.errmsg))
-
- # add zserver headers
- if self._server_version is not None:
- append('Server: %s' % self._server_version)
-
- append('Date: %s' % build_http_date(_now()))
-
if self._http_version == '1.0':
- if (self._http_connection == 'keep-alive' and
+ if (self._http_connection == 'keep-alive' and
'content-length' in self.headers):
self.setHeader('Connection', 'Keep-Alive')
else:
@@ -89,398 +84,166 @@
# Close the connection if we have been asked to.
# Use chunking if streaming output.
- if self._http_version=='1.1':
- if self._http_connection=='close':
- self.setHeader('Connection','close')
+ if self._http_version == '1.1':
+ if self._http_connection == 'close':
+ self.setHeader('Connection', 'close')
elif not self.headers.has_key('content-length'):
if self.http_chunk and self._streaming:
- self.setHeader('Transfer-Encoding','chunked')
- self._chunking=1
+ self.setHeader('Transfer-Encoding', 'chunked')
+ self._chunking = 1
else:
self.setHeader('Connection','close')
- for key, val in headers.items():
- if key.lower()==key:
- # only change non-literal header names
- key="%s%s" % (key[:1].upper(), key[1:])
- start=0
- l=key.find('-',start)
- while l >= start:
- key="%s-%s%s" % (key[:l],key[l+1:l+2].upper(),key[l+2:])
- start=l+1
- l=key.find('-',start)
- append("%s: %s" % (key, val))
+ return '%s %s' % (self.status, self.errmsg), self.listHeaders()
- if self.cookies:
- chunks.extend(self._cookie_list())
+ def listHeaders(self):
+ result = []
+ if self._server_version:
+ result.append(('Server', self._server_version))
- for key, value in self.accumulated_headers:
- append("%s: %s" % (key, value))
+ result.append(('Date', build_http_date(_now())))
+ result.extend(HTTPResponse.listHeaders(self))
+ return result
- append('') # RFC 2616 mandates empty line between headers and payload
- append(body)
+ def _unauthorized(self):
+ self.setStatus(401)
- return "\r\n".join(chunks)
+ def write(self,data):
+ """ Add data to our output stream.
-
-class Retry(Exception):
- """Raise this to retry a request
- """
+ HTML data may be returned using a stream-oriented interface.
+ This allows the browser to display partial results while
+ computation of a response to proceed.
+ """
+ if not self._streaming:
+
+ notify(PubBeforeStreaming(self))
+
+ self._streaming = 1
+ self.stdout.flush()
- def __init__(self, t=None, v=None, tb=None):
- self._args=t, v, tb
+ self.stdout.write(data)
- def reraise(self):
- t, v, tb = self._args
- if t is None: t=Retry
- if tb is None: raise t, v
- try: raise t, v, tb
- finally: tb=None
+ def setBody(self, body, title='', is_error=0):
+ if isinstance(body, file):
+ body.seek(0, 2)
+ length = body.tell()
+ body.seek(0)
+ self.setHeader('Content-Length', '%d' % length)
+ self.body = body
+ else:
+ HTTPResponse.setBody(self, body, title, is_error)
-def call_object(object, args, request):
- result=apply(object,args) # Type s<cr> to step into published object.
- return result
+ def __str__(self):
-def missing_name(name, request):
- if name=='self': return request['PARENTS'][0]
- request.response.badRequestError(name)
+ # XXX Consider how we are to handle the cases this logic was trying
+ # to cover
+ #if self._wrote:
+ # if self._chunking:
+ # return '0\r\n\r\n'
+ # else:
+ # return ''
+ raise NotImplementedError
-def dont_publish_class(klass, request):
- request.response.forbiddenError("class %s" % klass.__name__)
+def publish(request, module_name,
+ _get_module_info=get_module_info, # only for testing
+ ):
+ (bobo_before,
+ bobo_after,
+ object,
+ realm,
+ debug_mode,
+ err_hook,
+ validated_hook,
+ transactions_manager,
+ )= _get_module_info(module_name)
-_default_debug_mode = False
-_default_realm = None
+ request.processInputs()
+ response = request.response
-def set_default_debug_mode(debug_mode):
- global _default_debug_mode
- _default_debug_mode = debug_mode
+ if bobo_after is not None:
+ response.after_list += (bobo_after,)
-def set_default_authentication_realm(realm):
- global _default_realm
- _default_realm = realm
+ if debug_mode:
+ response.debug_mode = debug_mode
-def publish(request, module_name, after_list, debug=0,
- # Optimize:
- call_object=call_object,
- missing_name=missing_name,
- dont_publish_class=dont_publish_class,
- mapply=mapply,
- ):
+ if realm and not request.get('REMOTE_USER', None):
+ response.realm = realm
- (bobo_before, bobo_after, object, realm, debug_mode, err_hook,
- validated_hook, transactions_manager)= get_module_info(module_name)
+ if bobo_before is not None:
+ bobo_before()
- parents=None
- response=None
- try:
- request.processInputs()
+ # Get the path list.
+ # According to RFC1738 a trailing space in the path is valid.
+ path = request.get('PATH_INFO')
- request_get=request.get
- response=request.response
+ request['PARENTS'] = parents = [object]
+ object = request.traverse(path, validated_hook=validated_hook)
- # First check for "cancel" redirect:
- if request_get('SUBMIT','').strip().lower()=='cancel':
- cancel=request_get('CANCEL_ACTION','')
- if cancel:
- raise Redirect, cancel
+ if transactions_manager:
+ transactions_manager.recordMetaData(object, request)
- after_list[0]=bobo_after
- if debug_mode:
- response.debug_mode=debug_mode
- if realm and not request.get('REMOTE_USER',None):
- response.realm=realm
+ result = mapply(object,
+ request.args,
+ request,
+ call_object,
+ 1,
+ missing_name,
+ dont_publish_class,
+ request,
+ bind=1,
+ )
- if bobo_before is not None:
- bobo_before()
+ if result is not response:
+ response.setBody(result)
- # Get the path list.
- # According to RFC1738 a trailing space in the path is valid.
- path=request_get('PATH_INFO')
+ return response
- request['PARENTS']=parents=[object]
-
- if transactions_manager:
- transactions_manager.begin()
-
- object=request.traverse(path, validated_hook=validated_hook)
-
- if transactions_manager:
- transactions_manager.recordMetaData(object, request)
-
- result=mapply(object, request.args, request,
- call_object,1,
- missing_name,
- dont_publish_class,
- request, bind=1)
-
- if result is not response:
- response.setBody(result)
-
- if transactions_manager:
- transactions_manager.commit()
-
- return response
- except:
-
- # DM: provide nicer error message for FTP
- sm = None
- if response is not None:
- sm = getattr(response, "setMessage", None)
-
- if sm is not None:
- from asyncore import compact_traceback
- cl,val= sys.exc_info()[:2]
- sm('%s: %s %s' % (
- getattr(cl,'__name__',cl), val,
- debug_mode and compact_traceback()[-1] or ''))
-
- if err_hook is not None:
- if parents:
- parents=parents[0]
- try:
- try:
- return err_hook(parents, request,
- sys.exc_info()[0],
- sys.exc_info()[1],
- sys.exc_info()[2],
- )
- except Retry:
- if not request.supports_retry():
- return err_hook(parents, request,
- sys.exc_info()[0],
- sys.exc_info()[1],
- sys.exc_info()[2],
- )
- finally:
- if transactions_manager:
- transactions_manager.abort()
-
- # Only reachable if Retry is raised and request supports retry.
- newrequest=request.retry()
- request.close() # Free resources held by the request.
- try:
- return publish(newrequest, module_name, after_list, debug)
- finally:
- newrequest.close()
-
- else:
- if transactions_manager:
- transactions_manager.abort()
- raise
-
-def publish_module_standard(environ, start_response):
-
- must_die=0
- status=200
- after_list=[None]
+def publish_module(environ, start_response,
+ _publish=publish, # only for testing
+ _response_factory=WSGIResponse, # only for testing
+ _request_factory=HTTPRequest, # only for testing
+ ):
+ status = 200
stdout = StringIO()
stderr = StringIO()
- response = WSGIResponse(stdout=stdout, stderr=stderr)
+ response = _response_factory(stdout=stdout, stderr=stderr)
response._http_version = environ['SERVER_PROTOCOL'].split('/')[1]
response._http_connection = environ.get('CONNECTION_TYPE', 'close')
- response._server_version = environ['SERVER_SOFTWARE']
+ response._server_version = environ.get('SERVER_SOFTWARE')
- request = HTTPRequest(environ['wsgi.input'], environ, response)
+ request = _request_factory(environ['wsgi.input'], environ, response)
+ setDefaultSkin(request)
- # Let's support post-mortem debugging
- handle_errors = environ.get('wsgi.handleErrors', True)
-
try:
- response = publish(request, 'Zope2', after_list=[None],
- debug=handle_errors)
- except SystemExit, v:
- must_die=sys.exc_info()
- request.response.exception(must_die)
- except ImportError, v:
- if isinstance(v, tuple) and len(v)==3: must_die=v
- elif hasattr(sys, 'exc_info'): must_die=sys.exc_info()
- else: must_die = SystemExit, v, sys.exc_info()[2]
- request.response.exception(1, v)
- except:
- request.response.exception()
- status=response.getStatus()
+ response = _publish(request, 'Zope2')
+ except Unauthorized, v:
+ response._unauthorized()
+ except Redirect, v:
+ response.redirect(v)
- if response:
- # Start the WSGI server response
- status = response.getHeader('status')
- # ZServerHTTPResponse calculates all headers and things when you
- # call it's __str__, so we need to get it, and then munge out
- # the headers from it. It's a bit backwards, and we might optimize
- # this by not using ZServerHTTPResponse at all, and making the
- # HTTPResponses more WSGI friendly. But this works.
- result = str(response)
- headers, body = result.split('\r\n\r\n',1)
- headers = [tuple(n.split(': ',1)) for n in headers.split('\r\n')[1:]]
- start_response(status, headers)
+ # Start the WSGI server response
+ status, headers = response.finalize()
+ start_response(status, headers)
+
+ if isinstance(response.body, file):
+ result = response.body
+ else:
# If somebody used response.write, that data will be in the
# stdout StringIO, so we put that before the body.
# XXX This still needs verification that it really works.
- result=(stdout.getvalue(), body)
- request.close()
+ result = (stdout.getvalue(), response.body)
+
+ if 'repoze.tm.active' in environ:
+ txn = transaction.get()
+ txn.addAfterCommitHook(lambda ok: request.close())
+ else:
+ request.close() # this aborts the transation!
+
stdout.close()
- if after_list[0] is not None: after_list[0]()
-
- if must_die:
- # Try to turn exception value into an exit code.
- try:
- if hasattr(must_die[1], 'code'):
- code = must_die[1].code
- else: code = int(must_die[1])
- except:
- code = must_die[1] and 1 or 0
- if hasattr(request.response, '_requestShutdown'):
- request.response._requestShutdown(code)
+ for callable in response.after_list:
+ callable()
- try: raise must_die[0], must_die[1], must_die[2]
- finally: must_die=None
-
# Return the result body iterable.
return result
-
-
-_l=allocate_lock()
-def get_module_info(module_name, modules={},
- acquire=_l.acquire,
- release=_l.release,
- ):
-
- if modules.has_key(module_name): return modules[module_name]
-
- if module_name[-4:]=='.cgi': module_name=module_name[:-4]
-
- acquire()
- tb=None
- g = globals()
- try:
- try:
- module=__import__(module_name, g, g, ('__doc__',))
-
- # Let the app specify a realm
- if hasattr(module,'__bobo_realm__'):
- realm=module.__bobo_realm__
- elif _default_realm is not None:
- realm=_default_realm
- else:
- realm=module_name
-
- # Check for debug mode
- debug_mode=None
- if hasattr(module,'__bobo_debug_mode__'):
- debug_mode=not not module.__bobo_debug_mode__
- else:
- debug_mode = _default_debug_mode
-
- bobo_before = getattr(module, "__bobo_before__", None)
- bobo_after = getattr(module, "__bobo_after__", None)
-
- if hasattr(module,'bobo_application'):
- object=module.bobo_application
- elif hasattr(module,'web_objects'):
- object=module.web_objects
- else: object=module
-
- error_hook=getattr(module,'zpublisher_exception_hook', None)
- validated_hook=getattr(module,'zpublisher_validated_hook', None)
-
- transactions_manager=getattr(
- module,'zpublisher_transactions_manager', None)
- if not transactions_manager:
- # Create a default transactions manager for use
- # by software that uses ZPublisher and ZODB but
- # not the rest of Zope.
- transactions_manager = DefaultTransactionsManager()
-
- info= (bobo_before, bobo_after, object, realm, debug_mode,
- error_hook, validated_hook, transactions_manager)
-
- modules[module_name]=modules[module_name+'.cgi']=info
-
- return info
- except:
- t,v,tb=sys.exc_info()
- v=str(v)
- raise ImportError, (t, v), tb
- finally:
- tb=None
- release()
-
-
-class DefaultTransactionsManager:
- def begin(self):
- transaction.begin()
- def commit(self):
- transaction.commit()
- def abort(self):
- transaction.abort()
- def recordMetaData(self, object, request):
- # Is this code needed?
- request_get = request.get
- T= transaction.get()
- T.note(request_get('PATH_INFO'))
- auth_user=request_get('AUTHENTICATED_USER',None)
- if auth_user is not None:
- T.setUser(auth_user, request_get('AUTHENTICATION_PATH'))
-
-# profiling support
-
-_pfile = None # profiling filename
-_plock=allocate_lock() # profiling lock
-_pfunc=publish_module_standard
-_pstat=None
-
-def install_profiling(filename):
- global _pfile
- _pfile = filename
-
-def pm(environ, start_response):
- try:
- r=_pfunc(environ, start_response)
- except: r=None
- sys._pr_=r
-
-def publish_module_profiled(environ, start_response):
- import profile, pstats
- global _pstat
- _plock.acquire()
- try:
- if request is not None:
- path_info=request.get('PATH_INFO')
- else: path_info=environ.get('PATH_INFO')
- if path_info[-14:]=='manage_profile':
- return _pfunc(environ, start_response)
- pobj=profile.Profile()
- pobj.runcall(pm, menviron, start_response)
- result=sys._pr_
- pobj.create_stats()
- if _pstat is None:
- _pstat=sys._ps_=pstats.Stats(pobj)
- else: _pstat.add(pobj)
- finally:
- _plock.release()
-
- if result is None:
- try:
- error=sys.exc_info()
- file=open(_pfile, 'w')
- file.write(
- "See the url "
- "http://www.python.org/doc/current/lib/module-profile.html"
- "\n for information on interpreting profiler statistics.\n\n"
- )
- sys.stdout=file
- _pstat.strip_dirs().sort_stats('cumulative').print_stats(250)
- _pstat.strip_dirs().sort_stats('time').print_stats(250)
- file.flush()
- file.close()
- except: pass
- raise error[0], error[1], error[2]
- return result
-
-def publish_module(environ, start_response):
- """ publish a Python module, with or without profiling enabled """
- if _pfile: # profiling is enabled
- return publish_module_profiled(environ, start_response)
- else:
- return publish_module_standard(environ, start_response)
-
Modified: Zope/trunk/src/ZPublisher/tests/testHTTPResponse.py
===================================================================
--- Zope/trunk/src/ZPublisher/tests/testHTTPResponse.py 2010-06-01 21:22:15 UTC (rev 112893)
+++ Zope/trunk/src/ZPublisher/tests/testHTTPResponse.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -208,7 +208,7 @@
self.assertEqual(cookie.get('quoted'), True)
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
- self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"')
+ self.assertEqual(cookies[0], ('Set-Cookie', 'foo="bar"'))
def test_setCookie_w_expires(self):
EXPIRES = 'Wed, 31-Dec-97 23:59:59 GMT'
@@ -223,7 +223,7 @@
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0],
- 'Set-Cookie: foo="bar"; Expires=%s' % EXPIRES)
+ ('Set-Cookie', 'foo="bar"; Expires=%s' % EXPIRES))
def test_setCookie_w_domain(self):
response = self._makeOne()
@@ -237,7 +237,7 @@
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0],
- 'Set-Cookie: foo="bar"; Domain=example.com')
+ ('Set-Cookie', 'foo="bar"; Domain=example.com'))
def test_setCookie_w_path(self):
response = self._makeOne()
@@ -250,7 +250,7 @@
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
- self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"; Path=/')
+ self.assertEqual(cookies[0], ('Set-Cookie', 'foo="bar"; Path=/'))
def test_setCookie_w_comment(self):
response = self._makeOne()
@@ -263,7 +263,8 @@
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
- self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"; Comment=COMMENT')
+ self.assertEqual(cookies[0],
+ ('Set-Cookie', 'foo="bar"; Comment=COMMENT'))
def test_setCookie_w_secure_true_value(self):
response = self._makeOne()
@@ -276,7 +277,7 @@
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
- self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"; Secure')
+ self.assertEqual(cookies[0], ('Set-Cookie','foo="bar"; Secure'))
def test_setCookie_w_secure_false_value(self):
response = self._makeOne()
@@ -289,7 +290,7 @@
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
- self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"')
+ self.assertEqual(cookies[0], ('Set-Cookie', 'foo="bar"'))
def test_setCookie_w_httponly_true_value(self):
response = self._makeOne()
@@ -302,7 +303,7 @@
cookie_list = response._cookie_list()
self.assertEqual(len(cookie_list), 1)
- self.assertEqual(cookie_list[0], 'Set-Cookie: foo="bar"; HTTPOnly')
+ self.assertEqual(cookie_list[0], ('Set-Cookie', 'foo="bar"; HTTPOnly'))
def test_setCookie_w_httponly_false_value(self):
response = self._makeOne()
@@ -315,7 +316,7 @@
cookie_list = response._cookie_list()
self.assertEqual(len(cookie_list), 1)
- self.assertEqual(cookie_list[0], 'Set-Cookie: foo="bar"')
+ self.assertEqual(cookie_list[0], ('Set-Cookie', 'foo="bar"'))
def test_setCookie_unquoted(self):
response = self._makeOne()
@@ -327,7 +328,7 @@
cookie_list = response._cookie_list()
self.assertEqual(len(cookie_list), 1)
- self.assertEqual(cookie_list[0], 'Set-Cookie: foo=bar')
+ self.assertEqual(cookie_list[0], ('Set-Cookie', 'foo=bar'))
def test_appendCookie_w_existing(self):
response = self._makeOne()
@@ -930,6 +931,177 @@
else:
self.fail("Didn't raise Unauthorized")
+ def test_finalize_empty(self):
+ response = self._makeOne()
+ status, headers = response.finalize()
+ self.assertEqual(status, '200 OK')
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Content-Length', '0'),
+ ])
+
+ def test_finalize_w_body(self):
+ response = self._makeOne()
+ response.body = 'TEST'
+ status, headers = response.finalize()
+ self.assertEqual(status, '200 OK')
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Content-Length', '4'),
+ ])
+
+ def test_finalize_w_existing_content_length(self):
+ response = self._makeOne()
+ response.setHeader('Content-Length', '42')
+ status, headers = response.finalize()
+ self.assertEqual(status, '200 OK')
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Content-Length', '42'),
+ ])
+
+ def test_finalize_w_transfer_encoding(self):
+ response = self._makeOne()
+ response.setHeader('Transfer-Encoding', 'slurry')
+ status, headers = response.finalize()
+ self.assertEqual(status, '200 OK')
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Transfer-Encoding', 'slurry'),
+ ])
+
+ def test_finalize_after_redirect(self):
+ response = self._makeOne()
+ response.redirect('http://example.com/')
+ status, headers = response.finalize()
+ self.assertEqual(status, '302 Moved Temporarily')
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Content-Length', '0'),
+ ('Location', 'http://example.com/'),
+ ])
+
+ def test_listHeaders_empty(self):
+ response = self._makeOne()
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ])
+
+ def test_listHeaders_already_wrote(self):
+ # listHeaders doesn't do the short-circuit on _wrote.
+ response = self._makeOne()
+ response._wrote = True
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ])
+
+ def test_listHeaders_existing_content_length(self):
+ response = self._makeOne()
+ response.setHeader('Content-Length', 42)
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Content-Length', '42'),
+ ])
+
+ def test_listHeaders_existing_transfer_encoding(self):
+ # If 'Transfer-Encoding' is set, don't force 'Content-Length'.
+ response = self._makeOne()
+ response.setHeader('Transfer-Encoding', 'slurry')
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Transfer-Encoding', 'slurry'),
+ ])
+
+ def test_listHeaders_after_setHeader(self):
+ response = self._makeOne()
+ response.setHeader('x-consistency', 'Foolish')
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('X-Consistency', 'Foolish'),
+ ])
+
+ def test_listHeaders_after_setHeader_literal(self):
+ response = self._makeOne()
+ response.setHeader('X-consistency', 'Foolish', literal=True)
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('X-consistency', 'Foolish'),
+ ])
+
+ def test_listHeaders_after_redirect(self):
+ response = self._makeOne()
+ response.redirect('http://example.com/')
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Location', 'http://example.com/'),
+ ])
+
+ def test_listHeaders_after_setCookie_appendCookie(self):
+ response = self._makeOne()
+ response.setCookie('foo', 'bar', path='/')
+ response.appendCookie('foo', 'baz')
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Set-Cookie', 'foo="bar%3Abaz"; Path=/'),
+ ])
+
+ def test_listHeaders_after_expireCookie(self):
+ response = self._makeOne()
+ response.expireCookie('qux', path='/')
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Set-Cookie', 'qux="deleted"; '
+ 'Path=/; '
+ 'Expires=Wed, 31-Dec-97 23:59:59 GMT; '
+ 'Max-Age=0'),
+ ])
+
+ def test_listHeaders_after_addHeader(self):
+ response = self._makeOne()
+ response.addHeader('X-Consistency', 'Foolish')
+ response.addHeader('X-Consistency', 'Oatmeal')
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('X-Consistency', 'Foolish'),
+ ('X-Consistency', 'Oatmeal'),
+ ])
+
+ def test_listHeaders_w_body(self):
+ response = self._makeOne()
+ response.setBody('BLAH')
+ headers = response.listHeaders()
+ self.assertEqual(headers,
+ [('X-Powered-By', 'Zope (www.zope.org), '
+ 'Python (www.python.org)'),
+ ('Content-Length', '4'),
+ ('Content-Type', 'text/plain; charset=iso-8859-15'),
+ ])
+
def test___str__already_wrote(self):
response = self._makeOne()
response._wrote = True
Modified: Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py
===================================================================
--- Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py 2010-06-01 21:22:15 UTC (rev 112893)
+++ Zope/trunk/src/ZPublisher/tests/test_WSGIPublisher.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -31,105 +31,435 @@
from ZPublisher import WSGIPublisher
WSGIPublisher._NOW, self._old_NOW = value, WSGIPublisher._NOW
- def test___str__already_wrote_not_chunking(self):
+ def test_finalize_sets_204_on_empty_not_streaming(self):
response = self._makeOne()
- response._wrote = True
- response._chunking = False
- self.assertEqual(str(response), '')
-
- def test___str__already_wrote_w_chunking(self):
- response = self._makeOne()
- response._wrote = True
- response._chunking = True
- self.assertEqual(str(response), '0\r\n\r\n')
-
- def test___str__sets_204_on_empty_not_streaming(self):
- response = self._makeOne()
- str(response) # not checking value
+ response.finalize()
self.assertEqual(response.status, 204)
- def test___str__sets_204_on_empty_not_streaming_ignores_non_200(self):
+ def test_finalize_sets_204_on_empty_not_streaming_ignores_non_200(self):
response = self._makeOne()
response.setStatus(302)
- str(response) # not checking value
+ response.finalize()
self.assertEqual(response.status, 302)
- def test___str___sets_content_length_if_missing(self):
+ def test_finalize_sets_content_length_if_missing(self):
response = self._makeOne()
response.setBody('TESTING')
- str(response) # not checking value
- self.assertEqual(response.getHeader('Content-Length'),
- str(len('TESTING')))
+ response.finalize()
+ self.assertEqual(response.getHeader('Content-Length'), '7')
- def test___str___skips_setting_content_length_if_missing_w_streaming(self):
+ def test_finalize_skips_setting_content_length_if_missing_w_streaming(self):
response = self._makeOne()
response._streaming = True
response.body = 'TESTING'
- str(response) # not checking value
+ response.finalize()
self.failIf(response.getHeader('Content-Length'))
- def test___str___w_default_http_version(self):
+ def test_finalize_HTTP_1_0_keep_alive_w_content_length(self):
response = self._makeOne()
+ response._http_version = '1.0'
+ response._http_connection = 'keep-alive'
response.setBody('TESTING')
- result = str(response).splitlines()
- self.assertEqual(result[0], 'HTTP/1.0 200 OK')
+ response.finalize()
+ self.assertEqual(response.getHeader('Connection'), 'Keep-Alive')
- def test___str___w_explicit_http_version(self):
+ def test_finalize_HTTP_1_0_keep_alive_wo_content_length_streaming(self):
response = self._makeOne()
+ response._http_version = '1.0'
+ response._http_connection = 'keep-alive'
+ response._streaming = True
+ response.finalize()
+ self.assertEqual(response.getHeader('Connection'), 'close')
+
+ def test_finalize_HTTP_1_0_not_keep_alive_w_content_length(self):
+ response = self._makeOne()
+ response._http_version = '1.0'
response.setBody('TESTING')
+ response.finalize()
+ self.assertEqual(response.getHeader('Connection'), 'close')
+
+ def test_finalize_HTTP_1_1_connection_close(self):
+ response = self._makeOne()
response._http_version = '1.1'
- result = str(response).splitlines()
- self.assertEqual(result[0], 'HTTP/1.1 200 OK')
+ response._http_connection = 'close'
+ response.finalize()
+ self.assertEqual(response.getHeader('Connection'), 'close')
- def test___str___skips_Server_header_wo_server_version_set(self):
+ def test_finalize_HTTP_1_1_wo_content_length_streaming_wo_http_chunk(self):
response = self._makeOne()
+ response._http_version = '1.1'
+ response._streaming = True
+ response.http_chunk = 0
+ response.finalize()
+ self.assertEqual(response.getHeader('Connection'), 'close')
+ self.assertEqual(response.getHeader('Transfer-Encoding'), None)
+ self.failIf(response._chunking)
+
+ def test_finalize_HTTP_1_1_wo_content_length_streaming_w_http_chunk(self):
+ response = self._makeOne()
+ response._http_version = '1.1'
+ response._streaming = True
+ response.http_chunk = 1
+ response.finalize()
+ self.assertEqual(response.getHeader('Connection'), None)
+
+ def test_finalize_HTTP_1_1_w_content_length_wo_chunk_wo_streaming(self):
+ response = self._makeOne()
+ response._http_version = '1.1'
response.setBody('TESTING')
- result = str(response).splitlines()
- sv = [x for x in result if x.lower().startswith('server-version')]
+ response.finalize()
+ self.assertEqual(response.getHeader('Connection'), None)
+
+ def test_listHeaders_skips_Server_header_wo_server_version_set(self):
+ response = self._makeOne()
+ response.setBody('TESTING')
+ headers = response.listHeaders()
+ sv = [x for x in headers if x[0] == 'Server']
self.failIf(sv)
- def test___str___includes_Server_header_w_server_version_set(self):
+ def test_listHeaders_includes_Server_header_w_server_version_set(self):
response = self._makeOne()
response._server_version = 'TESTME'
response.setBody('TESTING')
- result = str(response).splitlines()
- self.assertEqual(result[1], 'Server: TESTME')
+ headers = response.listHeaders()
+ sv = [x for x in headers if x[0] == 'Server']
+ self.failUnless(('Server', 'TESTME') in sv)
- def test___str___includes_Date_header(self):
+ def test_listHeaders_includes_Date_header(self):
import time
WHEN = time.localtime()
self._setNOW(time.mktime(WHEN))
response = self._makeOne()
response.setBody('TESTING')
- result = str(response).splitlines()
- self.assertEqual(result[1], 'Date: %s' %
- time.strftime('%a, %d %b %Y %H:%M:%S GMT',
- time.gmtime(time.mktime(WHEN))))
+ headers = response.listHeaders()
+ whenstr = time.strftime('%a, %d %b %Y %H:%M:%S GMT',
+ time.gmtime(time.mktime(WHEN)))
+ self.failUnless(('Date', whenstr) in headers)
- def test___str___HTTP_1_0_keep_alive_w_content_length(self):
- response = self._makeOne()
- response._http_version = '1.0'
- response._http_connection = 'keep-alive'
- response.setBody('TESTING')
- str(response) # not checking value
- self.assertEqual(response.getHeader('Connection'), 'Keep-Alive')
+ #def test___str__already_wrote_not_chunking(self):
+ # response = self._makeOne()
+ # response._wrote = True
+ # response._chunking = False
+ # self.assertEqual(str(response), '')
- def test___str___HTTP_1_0_keep_alive_wo_content_length_streaming(self):
- response = self._makeOne()
- response._http_version = '1.0'
- response._http_connection = 'keep-alive'
- response._streaming = True
- str(response) # not checking value
- self.assertEqual(response.getHeader('Connection'), 'close')
+ #def test___str__already_wrote_w_chunking(self):
+ # response = self._makeOne()
+ # response._wrote = True
+ # response._chunking = True
+ # self.assertEqual(str(response), '0\r\n\r\n')
- def test___str___HTTP_1_0_not_keep_alive_w_content_length(self):
+ def test___str___raises(self):
response = self._makeOne()
- response._http_version = '1.0'
response.setBody('TESTING')
- str(response) # not checking value
- self.assertEqual(response.getHeader('Connection'), 'close')
+ self.assertRaises(NotImplementedError, lambda: str(response))
+
+class Test_publish(unittest.TestCase):
+
+ def _callFUT(self, request, module_name, _get_module_info=None):
+ from ZPublisher.WSGIPublisher import publish
+ if _get_module_info is None:
+ return publish(request, module_name)
+
+ return publish(request, module_name, _get_module_info)
+
+ def test_invalid_module_doesnt_catch_error(self):
+ _gmi = DummyCallable()
+ _gmi._raise = ImportError('testing')
+ self.assertRaises(ImportError, self._callFUT, None, 'nonesuch', _gmi)
+ self.assertEqual(_gmi._called_with, (('nonesuch',), {}))
+
+ def test_wo_REMOTE_USER(self):
+ request = DummyRequest(PATH_INFO='/')
+ response = request.response = DummyResponse()
+ _before = DummyCallable()
+ _after = object()
+ _object = DummyCallable()
+ _object._result = 'RESULT'
+ request._traverse_to = _object
+ _realm = 'TESTING'
+ _debug_mode = True
+ _err_hook = DummyCallable()
+ _validated_hook = object()
+ _tm = DummyTM()
+ _gmi = DummyCallable()
+ _gmi._result = (_before, _after, _object, _realm, _debug_mode,
+ _err_hook, _validated_hook, _tm)
+ returned = self._callFUT(request, 'okmodule', _gmi)
+ self.failUnless(returned is response)
+ self.assertEqual(_gmi._called_with, (('okmodule',), {}))
+ self.failUnless(request._processedInputs)
+ self.assertEqual(response.after_list, (_after,))
+ self.failUnless(response.debug_mode)
+ self.assertEqual(response.realm, 'TESTING')
+ self.assertEqual(_before._called_with, ((), {}))
+ self.assertEqual(request['PARENTS'], [_object])
+ self.assertEqual(request._traversed, ('/', None, _validated_hook))
+ self.assertEqual(_tm._recorded, (_object, request))
+ self.assertEqual(_object._called_with, ((), {}))
+ self.assertEqual(response._body, 'RESULT')
+ self.assertEqual(_err_hook._called_with, None)
+
+ def test_w_REMOTE_USER(self):
+ request = DummyRequest(PATH_INFO='/', REMOTE_USER='phred')
+ response = request.response = DummyResponse()
+ _before = DummyCallable()
+ _after = object()
+ _object = DummyCallable()
+ _object._result = 'RESULT'
+ request._traverse_to = _object
+ _realm = 'TESTING'
+ _debug_mode = True
+ _err_hook = DummyCallable()
+ _validated_hook = object()
+ _tm = DummyTM()
+ _gmi = DummyCallable()
+ _gmi._result = (_before, _after, _object, _realm, _debug_mode,
+ _err_hook, _validated_hook, _tm)
+ self._callFUT(request, 'okmodule', _gmi)
+ self.assertEqual(response.realm, None)
+
+
+class Test_publish_module(unittest.TestCase):
+
+ def setUp(self):
+ from zope.testing.cleanup import cleanUp
+
+ cleanUp()
+
+ def tearDown(self):
+ from zope.testing.cleanup import cleanUp
+ cleanUp()
+
+ def _callFUT(self, environ, start_response,
+ _publish=None, _response_factory=None, _request_factory=None):
+ from ZPublisher.WSGIPublisher import publish_module
+ if _publish is not None:
+ if _response_factory is not None:
+ if _request_factory is not None:
+ return publish_module(environ, start_response, _publish,
+ _response_factory, _request_factory)
+ return publish_module(environ, start_response, _publish,
+ _response_factory)
+ else:
+ if _request_factory is not None:
+ return publish_module(environ, start_response, _publish,
+ _request_factory=_request_factory)
+ return publish_module(environ, start_response, _publish)
+ return publish_module(environ, start_response)
+
+ def _registerView(self, factory, name, provides=None):
+ from zope.component import provideAdapter
+ from zope.interface import Interface
+ from zope.publisher.browser import IDefaultBrowserLayer
+ from OFS.interfaces import IApplication
+ if provides is None:
+ provides = Interface
+ requires = (IApplication, IDefaultBrowserLayer)
+ provideAdapter(factory, requires, provides, name)
+
+ def _makeEnviron(self, **kw):
+ from StringIO import StringIO
+ environ = {
+ 'SCRIPT_NAME' : '',
+ 'REQUEST_METHOD' : 'GET',
+ 'QUERY_STRING' : '',
+ 'SERVER_NAME' : '127.0.0.1',
+ 'REMOTE_ADDR': '127.0.0.1',
+ 'wsgi.url_scheme': 'http',
+ 'SERVER_PORT': '8080',
+ 'HTTP_HOST': '127.0.0.1:8080',
+ 'SERVER_PROTOCOL' : 'HTTP/1.1',
+ 'wsgi.input' : StringIO(''),
+ 'CONTENT_LENGTH': '0',
+ 'HTTP_CONNECTION': 'keep-alive',
+ 'CONTENT_TYPE': ''
+ }
+ environ.update(kw)
+ return environ
+
+ def test_calls_setDefaultSkin(self):
+ from zope.traversing.interfaces import ITraversable
+ from zope.traversing.namespace import view
+
+ class TestView:
+ __name__ = 'testing'
+
+ def __init__(self, context, request):
+ pass
+
+ def __call__(self):
+ return 'foobar'
+
+ # Define the views
+ self._registerView(TestView, 'testing')
+
+ # Bind the 'view' namespace (for @@ traversal)
+ self._registerView(view, 'view', ITraversable)
+
+ environ = self._makeEnviron(PATH_INFO='/@@testing')
+ self.assertEqual(self._callFUT(environ, noopStartResponse),
+ ('', 'foobar'))
+
+ def test_publish_can_return_new_response(self):
+ from ZPublisher.HTTPRequest import HTTPRequest
+ _response = DummyResponse()
+ _response.body = 'BODY'
+ _after1 = DummyCallable()
+ _after2 = DummyCallable()
+ _response.after_list = (_after1, _after2)
+ environ = self._makeEnviron()
+ start_response = DummyCallable()
+ _publish = DummyCallable()
+ _publish._result = _response
+ app_iter = self._callFUT(environ, start_response, _publish)
+ self.assertEqual(app_iter, ('', 'BODY'))
+ (status, headers), kw = start_response._called_with
+ self.assertEqual(status, '204 No Content')
+ self.assertEqual(headers, [('Content-Length', '0')])
+ self.assertEqual(kw, {})
+ (request, module), kw = _publish._called_with
+ self.failUnless(isinstance(request, HTTPRequest))
+ self.assertEqual(module, 'Zope2')
+ self.assertEqual(kw, {})
+ self.failUnless(_response._finalized)
+ self.assertEqual(_after1._called_with, ((), {}))
+ self.assertEqual(_after2._called_with, ((), {}))
+
+ def test_swallows_Unauthorized(self):
+ from zExceptions import Unauthorized
+ environ = self._makeEnviron()
+ start_response = DummyCallable()
+ _publish = DummyCallable()
+ _publish._raise = Unauthorized('TESTING')
+ app_iter = self._callFUT(environ, start_response, _publish)
+ self.assertEqual(app_iter, ('', ''))
+ (status, headers), kw = start_response._called_with
+ self.assertEqual(status, '401 Unauthorized')
+ self.failUnless(('Content-Length', '0') in headers)
+ self.assertEqual(kw, {})
+
+ def test_swallows_Redirect(self):
+ from zExceptions import Redirect
+ environ = self._makeEnviron()
+ start_response = DummyCallable()
+ _publish = DummyCallable()
+ _publish._raise = Redirect('/redirect_to')
+ app_iter = self._callFUT(environ, start_response, _publish)
+ self.assertEqual(app_iter, ('', ''))
+ (status, headers), kw = start_response._called_with
+ self.assertEqual(status, '302 Moved Temporarily')
+ self.failUnless(('Location', '/redirect_to') in headers)
+ self.failUnless(('Content-Length', '0') in headers)
+ self.assertEqual(kw, {})
+
+ def test_response_body_is_file(self):
+ class DummyFile(file):
+ def __init__(self):
+ pass
+ def read(self, *args, **kw):
+ raise NotImplementedError()
+ _response = DummyResponse()
+ _response._status = '200 OK'
+ _response._headers = [('Content-Length', '4')]
+ body = _response.body = DummyFile()
+ environ = self._makeEnviron()
+ start_response = DummyCallable()
+ _publish = DummyCallable()
+ _publish._result = _response
+ app_iter = self._callFUT(environ, start_response, _publish)
+ self.failUnless(app_iter is body)
+
+ def test_request_closed_when_tm_middleware_not_active(self):
+ environ = self._makeEnviron()
+ start_response = DummyCallable()
+ _request = DummyRequest()
+ _request._closed = False
+ def _close():
+ _request._closed = True
+ _request.close = _close
+ def _request_factory(stdin, environ, response):
+ return _request
+ _publish = DummyCallable()
+ _publish._result = DummyResponse()
+ app_iter = self._callFUT(environ, start_response, _publish,
+ _request_factory=_request_factory)
+ self.failUnless(_request._closed)
+
+ def test_request_not_closed_when_tm_middleware_active(self):
+ import transaction
+ environ = self._makeEnviron()
+ environ['repoze.tm.active'] = 1
+ start_response = DummyCallable()
+ _request = DummyRequest()
+ _request._closed = False
+ def _close():
+ _request._closed = True
+ _request.close = _close
+ def _request_factory(stdin, environ, response):
+ return _request
+ _publish = DummyCallable()
+ _publish._result = DummyResponse()
+ app_iter = self._callFUT(environ, start_response, _publish,
+ _request_factory=_request_factory)
+ self.failIf(_request._closed)
+ txn = transaction.get()
+ self.failUnless(list(txn.getAfterCommitHooks()))
+
+
+class DummyRequest(dict):
+ _processedInputs = False
+ _traversed = None
+ _traverse_to = None
+ args = ()
+
+ def processInputs(self):
+ self._processedInputs = True
+
+ def traverse(self, path, response=None, validated_hook=None):
+ self._traversed = (path, response, validated_hook)
+ return self._traverse_to
+
+class DummyResponse(object):
+ debug_mode = False
+ after_list = ()
+ realm = None
+ _body = None
+ _finalized = False
+ _status = '204 No Content'
+ _headers = [('Content-Length', '0')]
+
+ def finalize(self):
+ self._finalized = True
+ return self._status, self._headers
+
+ def setBody(self, body):
+ self._body = body
+
+ body = property(lambda self: self._body, setBody)
+
+class DummyCallable(object):
+ _called_with = _raise = _result = None
+
+ def __call__(self, *args, **kw):
+ self._called_with = (args, kw)
+ if self._raise:
+ raise self._raise
+ return self._result
+
+class DummyTM(object):
+ _recorded = _raise = _result = None
+
+ def recordMetaData(self, *args):
+ self._recorded = args
+
+def noopStartResponse(status, headers):
+ pass
+
+
def test_suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(WSGIResponseTests))
- return suite
+ return unittest.TestSuite((
+ unittest.makeSuite(WSGIResponseTests),
+ unittest.makeSuite(Test_publish),
+ unittest.makeSuite(Test_publish_module),
+ ))
Modified: Zope/trunk/src/ZServer/HTTPResponse.py
===================================================================
--- Zope/trunk/src/ZServer/HTTPResponse.py 2010-06-01 21:22:15 UTC (rev 112893)
+++ Zope/trunk/src/ZServer/HTTPResponse.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -140,7 +140,7 @@
val = val.replace('\n\t', '\r\n\t')
append("%s: %s" % (key, val))
if self.cookies:
- chunks.extend(self._cookie_list())
+ chunks.extend(['%s: %s' % x for x in self._cookie_list()])
append('')
append(body)
Modified: Zope/trunk/src/Zope2/Startup/run.py
===================================================================
--- Zope/trunk/src/Zope2/Startup/run.py 2010-06-01 21:22:15 UTC (rev 112893)
+++ Zope/trunk/src/Zope2/Startup/run.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -52,6 +52,22 @@
App.config.setConfiguration(opts.configroot)
return opts
+def make_wsgi_app(global_config, zope_conf):
+ from App.config import setConfiguration
+ from Zope2.Startup import get_starter
+ from Zope2.Startup.handlers import handleConfig
+ from Zope2.Startup.options import ZopeOptions
+ from ZPublisher.WSGIPublisher import publish_module
+ starter = get_starter()
+ opts = ZopeOptions()
+ opts.configfile = zope_conf
+ opts.realize(args=(), progname='Zope2WSGI', raise_getopt_errs=False)
+ handleConfig(opts.configroot, opts.confighandlers)
+ setConfiguration(opts.configroot)
+ starter.setConfiguration(opts.configroot)
+ starter.prepare()
+ return publish_module
+
if __name__ == '__main__':
run()
Added: Zope/trunk/src/Zope2/utilities/adduser.py
===================================================================
--- Zope/trunk/src/Zope2/utilities/adduser.py (rev 0)
+++ Zope/trunk/src/Zope2/utilities/adduser.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -0,0 +1,32 @@
+##############################################################################
+#
+# This was yanked out of repoze.zope2
+#
+##############################################################################
+
+""" Add a Zope management user to the root Zope user folder """
+
+import sys
+from Zope2.utilities.finder import ZopeFinder
+
+def adduser(app, user, pwd):
+ import transaction
+ result = app.acl_users._doAddUser(user, pwd, ['Manager'], [])
+ transaction.commit()
+ return result
+
+def main(argv=sys.argv):
+ import sys
+ try:
+ user, pwd = argv[1], argv[2]
+ except IndexError:
+ print "%s <username> <password>" % argv[0]
+ sys.exit(255)
+ finder = ZopeFinder(argv)
+ finder.filter_warnings()
+ app = finder.get_app()
+ adduser(app, user, pwd)
+
+if __name__ == '__main__':
+ main()
+
Added: Zope/trunk/src/Zope2/utilities/finder.py
===================================================================
--- Zope/trunk/src/Zope2/utilities/finder.py (rev 0)
+++ Zope/trunk/src/Zope2/utilities/finder.py 2010-06-01 21:43:36 UTC (rev 112894)
@@ -0,0 +1,41 @@
+##############################################################################
+#
+# yanked from repoze.zope2
+#
+##############################################################################
+
+import os
+
+class ZopeFinder:
+ def __init__(self, argv):
+ self.cmd = argv[0]
+
+ def filter_warnings(self):
+ import warnings
+ warnings.simplefilter('ignore', Warning, append=True)
+
+ def get_app(self, config_file=None):
+ # given a config file, return a Zope application object
+ if config_file is None:
+ config_file = self.get_zope_conf()
+ from Zope2.Startup import options, handlers
+ import App.config
+ import Zope2
+ opts = options.ZopeOptions()
+ opts.configfile = config_file
+ opts.realize(args=[], doc="", raise_getopt_errs=0)
+ handlers.handleConfig(opts.configroot, opts.confighandlers)
+ App.config.setConfiguration(opts.configroot)
+ app = Zope2.app()
+ return app
+
+ def get_zope_conf(self):
+ # the default config file path is assumed to live in
+ # $instance_home/etc/zope.conf, and the console scripts that use this
+ # are assumed to live in $instance_home/bin; override if the
+ # environ contains "ZOPE_CONF".
+ ihome = os.path.dirname(os.path.abspath(os.path.dirname(self.cmd)))
+ default_config_file = os.path.join(ihome, 'etc', 'zope.conf')
+ zope_conf = os.environ.get('ZOPE_CONF', default_config_file)
+ return zope_conf
+
More information about the Zope-Checkins
mailing list