[Zope3-checkins] SVN: Zope3/trunk/src/zope/app/tests/functional.py
- refactor cookie handling in the functional test framework
Fred L. Drake, Jr.
fdrake at gmail.com
Tue Jan 11 12:24:49 EST 2005
Log message for revision 28779:
- refactor cookie handling in the functional test framework
- add support for cookies in functional doctests
Changed:
U Zope3/trunk/src/zope/app/tests/functional.py
-=-
Modified: Zope3/trunk/src/zope/app/tests/functional.py
===================================================================
--- Zope3/trunk/src/zope/app/tests/functional.py 2005-01-11 14:00:30 UTC (rev 28778)
+++ Zope3/trunk/src/zope/app/tests/functional.py 2005-01-11 17:24:49 UTC (rev 28779)
@@ -143,7 +143,7 @@
# strange FunctionalTestSetup is. Later, when we
# have time, we should clean up this (perhaps with an
# event) and clean up FunctionalTestSetup.
- response = http(grant_request, handle_errors=False)
+ response = HTTPCaller()(grant_request, handle_errors=False)
FunctionalTestSetup().connection = None
elif config_file and config_file != self._config_file:
@@ -205,17 +205,38 @@
def abort(self):
abort()
-class BrowserTestCase(FunctionalTestCase):
- """Functional test case for Browser requests."""
- def setUp(self):
- super(BrowserTestCase, self).setUp()
+class CookieHandler(object):
+
+ def __init__(self, *args, **kw):
# Somewhere to store cookies between consecutive requests
self.cookies = SimpleCookie()
+ super(CookieHandler, self).__init__(*args, **kw)
+ def httpCookie(self, path):
+ """Return self.cookies as an HTTP_COOKIE environment value."""
+ l = [m.OutputString() for m in self.cookies.values()
+ if path.startswith(m['path'])]
+ return '; '.join(l)
+
+ def loadCookies(self, envstring):
+ self.cookies.load(envstring)
+
+ def saveCookies(self, response):
+ """Save cookies from the response."""
+ # Urgh - need to play with the response's privates to extract
+ # cookies that have been set
+ for k,v in response._cookies.items():
+ k = k.encode('utf8')
+ self.cookies[k] = v['value'].encode('utf8')
+ if self.cookies[k].has_key('Path'):
+ self.cookies[k]['Path'] = v['Path']
+
+
+class BrowserTestCase(CookieHandler, FunctionalTestCase):
+ """Functional test case for Browser requests."""
+
def tearDown(self):
- del self.cookies
-
self.setSite(None)
super(BrowserTestCase, self).tearDown()
@@ -246,7 +267,7 @@
outstream = HTTPTaskStub()
environment = {"HTTP_HOST": 'localhost',
"HTTP_REFERER": 'localhost',
- "HTTP_COOKIE": self.__http_cookie(path)}
+ "HTTP_COOKIE": self.httpCookie(path)}
environment.update(env)
app = FunctionalTestSetup().getApplication()
request = app._request(path, '', outstream,
@@ -256,12 +277,6 @@
zope.interface.directlyProvides(request, _getDefaultSkin())
return request
- def __http_cookie(self, path):
- '''Return self.cookies as an HTTP_COOKIE environment format string'''
- l = [m.OutputString() for m in self.cookies.values()
- if path.startswith(m['path'])]
- return '; '.join(l)
-
def publish(self, path, basic=None, form=None, env={},
handle_errors=False):
"""Renders an object at a given location.
@@ -284,22 +299,16 @@
# We pull it apart and reassemble the header to block cookies
# with invalid paths going through, which may or may not be correct
if env.has_key('HTTP_COOKIE'):
- self.cookies.load(env['HTTP_COOKIE'])
+ self.loadCookies(env['HTTP_COOKIE'])
del env['HTTP_COOKIE'] # Added again in makeRequest
request = self.makeRequest(path, basic=basic, form=form, env=env,
outstream=outstream)
response = ResponseWrapper(request.response, outstream, path)
if env.has_key('HTTP_COOKIE'):
- self.cookies.load(env['HTTP_COOKIE'])
+ self.loadCookies(env['HTTP_COOKIE'])
publish(request, handle_errors=handle_errors)
- # Urgh - need to play with the response's privates to extract
- # cookies that have been set
- for k,v in response._cookies.items():
- k = k.encode('utf8')
- self.cookies[k] = v['value'].encode('utf8')
- if self.cookies[k].has_key('Path'):
- self.cookies[k]['Path'] = v['Path']
+ self.saveCookies(response)
self.setSite(old_site)
return response
@@ -474,82 +483,7 @@
def getBody(self):
return self.getOutput()
-def http(request_string, handle_errors=True):
- """Execute an HTTP request string via the publisher
- This is used for HTTP doc tests.
- """
- # Commit work done by previous python code.
- commit()
-
- # Discard leading white space to make call layout simpler
- request_string = request_string.lstrip()
-
- # split off and parse the command line
- l = request_string.find('\n')
- command_line = request_string[:l].rstrip()
- request_string = request_string[l+1:]
- method, path, protocol = command_line.split()
- path = urllib.unquote(path)
-
-
- instream = StringIO(request_string)
- environment = {"HTTP_HOST": 'localhost',
- "HTTP_REFERER": 'localhost',
- "REQUEST_METHOD": method,
- "SERVER_PROTOCOL": protocol,
- }
-
- headers = [split_header(header)
- for header in rfc822.Message(instream).headers]
- for name, value in headers:
- name = ('_'.join(name.upper().split('-')))
- if name not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
- name = 'HTTP_' + name
- environment[name] = value.rstrip()
-
- auth_key = 'HTTP_AUTHORIZATION'
- if environment.has_key(auth_key):
- environment[auth_key] = auth_header(environment[auth_key])
-
- outstream = HTTPTaskStub()
-
-
- old_site = getSite()
- setSite(None)
- app = FunctionalTestSetup().getApplication()
- header_output = HTTPHeaderOutput(
- protocol, ('x-content-type-warning', 'x-powered-by'))
-
- if method in ('GET', 'POST', 'HEAD'):
- if (method == 'POST' and
- environment.get('CONTENT_TYPE', '').startswith('text/xml')
- ):
- request_cls = XMLRPCRequest
- publication_cls = XMLRPCPublication
- else:
- request_cls = type(BrowserRequest.__name__, (BrowserRequest,), {})
- zope.interface.classImplements(request_cls, _getDefaultSkin())
- publication_cls = BrowserPublication
- else:
- request_cls = HTTPRequest
- publication_cls = HTTPPublication
-
- request = app._request(path, instream, outstream,
- environment=environment,
- request=request_cls, publication=publication_cls)
- request.response.setHeaderOutput(header_output)
- response = DocResponseWrapper(request.response, outstream, path,
- header_output)
-
- publish(request, handle_errors=handle_errors)
- setSite(old_site)
-
- # sync Python connection:
- getRootFolder()._p_jar.sync()
-
- return response
-
headerre = re.compile('(\S+): (.+)$')
def split_header(header):
return headerre.match(header).group(1, 2)
@@ -603,9 +537,88 @@
suite.addTest(unittest.makeSuite(SampleFunctionalTest))
return suite
+
+class HTTPCaller(CookieHandler):
+ """Execute an HTTP request string via the publisher"""
+
+ def __call__(self, request_string, handle_errors=True):
+ # Commit work done by previous python code.
+ commit()
+
+ # Discard leading white space to make call layout simpler
+ request_string = request_string.lstrip()
+
+ # split off and parse the command line
+ l = request_string.find('\n')
+ command_line = request_string[:l].rstrip()
+ request_string = request_string[l+1:]
+ method, path, protocol = command_line.split()
+ path = urllib.unquote(path)
+
+ instream = StringIO(request_string)
+ environment = {"HTTP_COOKIE": self.httpCookie(path),
+ "HTTP_HOST": 'localhost',
+ "HTTP_REFERER": 'localhost',
+ "REQUEST_METHOD": method,
+ "SERVER_PROTOCOL": protocol,
+ }
+
+ headers = [split_header(header)
+ for header in rfc822.Message(instream).headers]
+ for name, value in headers:
+ name = ('_'.join(name.upper().split('-')))
+ if name not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
+ name = 'HTTP_' + name
+ environment[name] = value.rstrip()
+
+ auth_key = 'HTTP_AUTHORIZATION'
+ if environment.has_key(auth_key):
+ environment[auth_key] = auth_header(environment[auth_key])
+
+ outstream = HTTPTaskStub()
+
+ old_site = getSite()
+ setSite(None)
+ app = FunctionalTestSetup().getApplication()
+ header_output = HTTPHeaderOutput(
+ protocol, ('x-content-type-warning', 'x-powered-by'))
+
+ if method in ('GET', 'POST', 'HEAD'):
+ if (method == 'POST' and
+ environment.get('CONTENT_TYPE', '').startswith('text/xml')
+ ):
+ request_cls = XMLRPCRequest
+ publication_cls = XMLRPCPublication
+ else:
+ request_cls = type(BrowserRequest.__name__,
+ (BrowserRequest,),
+ {})
+ zope.interface.classImplements(request_cls, _getDefaultSkin())
+ publication_cls = BrowserPublication
+ else:
+ request_cls = HTTPRequest
+ publication_cls = HTTPPublication
+
+ request = app._request(
+ path, instream, outstream,
+ environment=environment,
+ request=request_cls, publication=publication_cls)
+ request.response.setHeaderOutput(header_output)
+ response = DocResponseWrapper(
+ request.response, outstream, path, header_output)
+
+ publish(request, handle_errors=handle_errors)
+ self.saveCookies(response)
+ setSite(old_site)
+
+ # sync Python connection:
+ getRootFolder()._p_jar.sync()
+
+ return response
+
def FunctionalDocFileSuite(*paths, **kw):
globs = kw.setdefault('globs', {})
- globs['http'] = http
+ globs['http'] = HTTPCaller()
globs['getRootFolder'] = getRootFolder
globs['sync'] = sync
More information about the Zope3-Checkins
mailing list