[Zope3-checkins]
SVN: Zope3/branches/srichter-blow-services/src/zope/app/testing/
All test support code is now in zope.app.testing. Code from
any tests
Stephan Richter
srichter at cosmos.phy.tufts.edu
Tue Dec 21 11:55:28 EST 2004
Log message for revision 28669:
All test support code is now in zope.app.testing. Code from any tests
module or package should *never* be reused.
Changed:
A Zope3/branches/srichter-blow-services/src/zope/app/testing/
A Zope3/branches/srichter-blow-services/src/zope/app/testing/__init__.py
A Zope3/branches/srichter-blow-services/src/zope/app/testing/dochttp.py
A Zope3/branches/srichter-blow-services/src/zope/app/testing/dochttp.txt
A Zope3/branches/srichter-blow-services/src/zope/app/testing/functional.py
A Zope3/branches/srichter-blow-services/src/zope/app/testing/placelesssetup.py
A Zope3/branches/srichter-blow-services/src/zope/app/testing/recorded/
A Zope3/branches/srichter-blow-services/src/zope/app/testing/setup.py
A Zope3/branches/srichter-blow-services/src/zope/app/testing/test.py
A Zope3/branches/srichter-blow-services/src/zope/app/testing/tests.py
A Zope3/branches/srichter-blow-services/src/zope/app/testing/ztapi.py
-=-
Added: Zope3/branches/srichter-blow-services/src/zope/app/testing/__init__.py
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/testing/__init__.py 2004-12-21 16:39:25 UTC (rev 28668)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/__init__.py 2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1 @@
+# Make directory a Python package.
Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/dochttp.py (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/dochttp.py)
Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/dochttp.txt (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/dochttp.txt)
Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/functional.py (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/functional.py)
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/tests/functional.py 2004-12-17 21:36:22 UTC (rev 28644)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/functional.py 2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1,634 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Functional testing framework for Zope 3.
+
+There should be a file 'ftesting.zcml' in the current directory.
+
+$Id$
+"""
+import logging
+import re
+import rfc822
+import sys
+import traceback
+import unittest
+import urllib
+
+from StringIO import StringIO
+from Cookie import SimpleCookie
+
+from transaction import abort, commit
+from ZODB.DB import DB
+from ZODB.DemoStorage import DemoStorage
+import zope.interface
+from zope.publisher.browser import BrowserRequest
+from zope.publisher.http import HTTPRequest
+from zope.publisher.publish import publish
+from zope.publisher.xmlrpc import XMLRPCRequest
+from zope.security.interfaces import Forbidden, Unauthorized
+from zope.security.management import endInteraction
+import zope.publisher.interfaces.http
+from zope.testing import doctest
+
+import zope.app.pluggableauth
+import zope.app.testing.setup
+
+from zope.app import zapi
+from zope.app.debug import Debugger
+from zope.app.publication.http import HTTPPublication
+from zope.app.publication.browser import BrowserPublication
+from zope.app.publication.xmlrpc import XMLRPCPublication
+from zope.app.publication.zopepublication import ZopePublication
+from zope.app.publication.http import HTTPPublication
+from zope.publisher.interfaces.browser import IDefaultBrowserLayer
+from zope.publisher.interfaces.browser import IDefaultSkin
+from zope.publisher.interfaces.browser import IBrowserRequest
+from zope.app.component.hooks import setSite, getSite
+
+HTTPTaskStub = StringIO
+
+
+class ResponseWrapper(object):
+ """A wrapper that adds several introspective methods to a response."""
+
+ def __init__(self, response, outstream, path):
+ self._response = response
+ self._outstream = outstream
+ self._path = path
+
+ def getOutput(self):
+ """Returns the full HTTP output (headers + body)"""
+ return self._outstream.getvalue()
+
+ def getBody(self):
+ """Returns the response body"""
+ output = self._outstream.getvalue()
+ idx = output.find('\r\n\r\n')
+ if idx == -1:
+ return None
+ else:
+ return output[idx+4:]
+
+ def getPath(self):
+ """Returns the path of the request"""
+ return self._path
+
+ def __getattr__(self, attr):
+ return getattr(self._response, attr)
+
+
+def _getDefaultSkin():
+ """Returns the current default skin as an interface."""
+ adapters = zapi.getService(zapi.servicenames.Adapters)
+ skin = adapters.lookup((IBrowserRequest,), IDefaultSkin, '')
+ return skin or IDefaultBrowserLayer
+
+
+grant_request = (r"""
+POST /@@grant.html HTTP/1.1
+Authorization: Basic Z2xvYmFsbWdyOmdsb2JhbG1ncnB3
+Content-Length: 5796
+Content-Type: application/x-www-form-urlencoded
+
+field.principal=em9wZS5tZ3I_"""
+"""&field.principal.displayed=y"""
+"""&GRANT_SUBMIT=Change"""
+"""&field.em9wZS5tZ3I_.role.zope.Manager=allow"""
+"""&field.em9wZS5tZ3I_.role.zope.Manager-empty-marker=1""")
+
+class FunctionalTestSetup(object):
+ """Keeps shared state across several functional test cases."""
+
+ __shared_state = { '_init': False }
+
+ def __init__(self, config_file=None):
+ """Initializes Zope 3 framework.
+
+ Creates a volatile memory storage. Parses Zope3 configuration files.
+ """
+ self.__dict__ = self.__shared_state
+
+ if not self._init:
+
+ # Make sure unit tests are cleaned up
+ zope.app.tests.setup.placefulSetUp()
+ zope.app.tests.setup.placefulTearDown()
+
+ if not config_file:
+ config_file = 'ftesting.zcml'
+ self.log = StringIO()
+ # Make it silent but keep the log available for debugging
+ logging.root.addHandler(logging.StreamHandler(self.log))
+ self.base_storage = DemoStorage("Memory Storage")
+ self.db = DB(self.base_storage)
+ self.app = Debugger(self.db, config_file)
+ self.connection = None
+ self._config_file = config_file
+ self._init = True
+
+ # Make a local grant for the test user
+ # TODO, find a better way to make this grant happen.
+ # The way I did this is way too messy, given how
+ # strang FunctionalTestSetup is. Later, when we
+ # have time, we should clean up this (perhaps with an
+ # event) and clean up FunctionalTestSetup.
+ response = http(grant_request, handle_errors=False)
+ FunctionalTestSetup().connection = None
+
+ elif config_file and config_file != self._config_file:
+ # Running different tests with different configurations is not
+ # supported at the moment
+ raise NotImplementedError('Already configured'
+ ' with a different config file')
+
+ def setUp(self):
+ """Prepares for a functional test case."""
+ # Tear down the old demo storage (if any) and create a fresh one
+ abort()
+ self.db.close()
+ storage = DemoStorage("Demo Storage", self.base_storage)
+ self.db = self.app.db = DB(storage)
+ self.connection = None
+
+ def tearDown(self):
+ """Cleans up after a functional test case."""
+ abort()
+ if self.connection:
+ self.connection.close()
+ self.connection = None
+ self.db.close()
+
+ def getRootFolder(self):
+ """Returns the Zope root folder."""
+ if not self.connection:
+ self.connection = self.db.open()
+ root = self.connection.root()
+ return root[ZopePublication.root_name]
+
+ def getApplication(self):
+ """Returns the Zope application instance."""
+ return self.app
+
+
+class FunctionalTestCase(unittest.TestCase):
+ """Functional test case."""
+
+ def setUp(self):
+ """Prepares for a functional test case."""
+ super(FunctionalTestCase, self).setUp()
+ FunctionalTestSetup().setUp()
+
+ def tearDown(self):
+ """Cleans up after a functional test case."""
+
+ FunctionalTestSetup().tearDown()
+ super(FunctionalTestCase, self).tearDown()
+
+ def getRootFolder(self):
+ """Returns the Zope root folder."""
+ return FunctionalTestSetup().getRootFolder()
+
+ def commit(self):
+ commit()
+
+ def abort(self):
+ abort()
+
+class BrowserTestCase(FunctionalTestCase):
+ """Functional test case for Browser requests."""
+
+ def setUp(self):
+ super(BrowserTestCase, self).setUp()
+ # Somewhere to store cookies between consecutive requests
+ self.cookies = SimpleCookie()
+
+ def tearDown(self):
+ del self.cookies
+
+ self.setSite(None)
+ super(BrowserTestCase, self).tearDown()
+
+ def setSite(self, site):
+ """Set the site which will be used to look up local services"""
+ setSite(site)
+
+ def getSite(self):
+ """Returns the site which is used to look up local services"""
+ return getSite()
+
+ def makeRequest(self, path='', basic=None, form=None, env={},
+ outstream=None):
+ """Creates a new request object.
+
+ Arguments:
+ path -- the path to be traversed (e.g. "/folder1/index.html")
+ basic -- basic HTTP authentication credentials ("user:password")
+ form -- a dictionary emulating a form submission
+ (Note that field values should be Unicode strings)
+ env -- a dictionary of additional environment variables
+ (You can emulate HTTP request header
+ X-Header: foo
+ by adding 'HTTP_X_HEADER': 'foo' to env)
+ outstream -- a stream where the HTTP response will be written
+ """
+ if outstream is None:
+ outstream = HTTPTaskStub()
+ environment = {"HTTP_HOST": 'localhost',
+ "HTTP_REFERER": 'localhost',
+ "HTTP_COOKIE": self.__http_cookie(path)}
+ environment.update(env)
+ app = FunctionalTestSetup().getApplication()
+ request = app._request(path, '', outstream,
+ environment=environment,
+ basic=basic, form=form,
+ request=BrowserRequest)
+ zope.interface.directlyProvides(request, _getDefaultSkin())
+ return request
+
+ def __http_cookie(self, path):
+ '''Return self.cookies as an HTTP_COOKIE environment format string'''
+ l = [m.OutputString() for m in self.cookies.values()
+ if path.startswith(m['path'])]
+ return '; '.join(l)
+
+ def publish(self, path, basic=None, form=None, env={},
+ handle_errors=False):
+ """Renders an object at a given location.
+
+ Arguments are the same as in makeRequest with the following exception:
+ handle_errors -- if False (default), exceptions will not be caught
+ if True, exceptions will return a formatted error
+ page.
+
+ Returns the response object enhanced with the following methods:
+ getOutput() -- returns the full HTTP output as a string
+ getBody() -- returns the full response body as a string
+ getPath() -- returns the path used in the request
+ """
+ outstream = HTTPTaskStub()
+ old_site = self.getSite()
+ self.setSite(None)
+ # A cookie header has been sent - ensure that future requests
+ # in this test also send the cookie, as this is what browsers do.
+ # We pull it apart and reassemble the header to block cookies
+ # with invalid paths going through, which may or may not be correct
+ if env.has_key('HTTP_COOKIE'):
+ self.cookies.load(env['HTTP_COOKIE'])
+ del env['HTTP_COOKIE'] # Added again in makeRequest
+
+ request = self.makeRequest(path, basic=basic, form=form, env=env,
+ outstream=outstream)
+ response = ResponseWrapper(request.response, outstream, path)
+ if env.has_key('HTTP_COOKIE'):
+ self.cookies.load(env['HTTP_COOKIE'])
+ publish(request, handle_errors=handle_errors)
+ # Urgh - need to play with the response's privates to extract
+ # cookies that have been set
+ for k,v in response._cookies.items():
+ k = k.encode('utf8')
+ self.cookies[k] = v['value'].encode('utf8')
+ if self.cookies[k].has_key('Path'):
+ self.cookies[k]['Path'] = v['Path']
+ self.setSite(old_site)
+ return response
+
+ def checkForBrokenLinks(self, body, path, basic=None):
+ """Looks for broken links in a page by trying to traverse relative
+ URIs.
+ """
+ if not body: return
+
+ old_site = self.getSite()
+ self.setSite(None)
+
+ from htmllib import HTMLParser
+ from formatter import NullFormatter
+ class SimpleHTMLParser(HTMLParser):
+ def __init__(self, fmt, base):
+ HTMLParser.__init__(self, fmt)
+ self.base = base
+ def do_base(self, attrs):
+ self.base = dict(attrs).get('href', self.base)
+
+ parser = SimpleHTMLParser(NullFormatter(), path)
+ parser.feed(body)
+ parser.close()
+ base = parser.base
+ while not base.endswith('/'):
+ base = base[:-1]
+ if base.startswith('http://localhost/'):
+ base = base[len('http://localhost/') - 1:]
+
+ errors = []
+ for a in parser.anchorlist:
+ if a.startswith('http://localhost/'):
+ a = a[len('http://localhost/') - 1:]
+ elif a.find(':') != -1:
+ # Assume it is an external link
+ continue
+ elif not a.startswith('/'):
+ a = base + a
+ if a.find('#') != -1:
+ a = a[:a.index('#') - 1]
+ # XXX what about queries (/path/to/foo?bar=baz&etc)?
+ request = None
+ try:
+ try:
+ request = self.makeRequest(a, basic=basic)
+ publication = request.publication
+ request.processInputs()
+ publication.beforeTraversal(request)
+ object = publication.getApplication(request)
+ object = request.traverse(object)
+ publication.afterTraversal(request, object)
+ except (KeyError, NameError, AttributeError, Unauthorized, Forbidden):
+ e = traceback.format_exception_only(*sys.exc_info()[:2])[-1]
+ errors.append((a, e.strip()))
+ finally:
+ publication.endRequest(request, object)
+ self.setSite(old_site)
+ # Bad Things(TM) related to garbage collection and special
+ # __del__ methods happen if request.close() is not called here
+ if request:
+ request.close()
+ if errors:
+ self.fail("%s contains broken links:\n" % path
+ + "\n".join([" %s:\t%s" % (a, e) for a, e in errors]))
+
+
+class HTTPTestCase(FunctionalTestCase):
+ """Functional test case for HTTP requests."""
+
+ def makeRequest(self, path='', basic=None, form=None, env={},
+ instream=None, outstream=None):
+ """Creates a new request object.
+
+ Arguments:
+ path -- the path to be traversed (e.g. "/folder1/index.html")
+ basic -- basic HTTP authentication credentials ("user:password")
+ form -- a dictionary emulating a form submission
+ (Note that field values should be Unicode strings)
+ env -- a dictionary of additional environment variables
+ (You can emulate HTTP request header
+ X-Header: foo
+ by adding 'HTTP_X_HEADER': 'foo' to env)
+ instream -- a stream from where the HTTP request will be read
+ outstream -- a stream where the HTTP response will be written
+ """
+ if outstream is None:
+ outstream = HTTPTaskStub()
+ if instream is None:
+ instream = ''
+ environment = {"HTTP_HOST": 'localhost',
+ "HTTP_REFERER": 'localhost'}
+ environment.update(env)
+ app = FunctionalTestSetup().getApplication()
+ request = app._request(path, instream, outstream,
+ environment=environment,
+ basic=basic, form=form,
+ request=HTTPRequest, publication=HTTPPublication)
+ return request
+
+ def publish(self, path, basic=None, form=None, env={},
+ handle_errors=False, request_body=''):
+ """Renders an object at a given location.
+
+ Arguments are the same as in makeRequest with the following exception:
+ handle_errors -- if False (default), exceptions will not be caught
+ if True, exceptions will return a formatted error
+ page.
+
+ Returns the response object enhanced with the following methods:
+ getOutput() -- returns the full HTTP output as a string
+ getBody() -- returns the full response body as a string
+ getPath() -- returns the path used in the request
+ """
+ outstream = HTTPTaskStub()
+ request = self.makeRequest(path, basic=basic, form=form, env=env,
+ instream=request_body, outstream=outstream)
+ response = ResponseWrapper(request.response, outstream, path)
+ publish(request, handle_errors=handle_errors)
+ return response
+
+
+class HTTPHeaderOutput:
+
+ zope.interface.implements(zope.publisher.interfaces.http.IHeaderOutput)
+
+ def __init__(self, protocol, omit):
+ self.headers = {}
+ self.headersl = []
+ self.protocol = protocol
+ self.omit = omit
+
+ def setResponseStatus(self, status, reason):
+ self.status, self.reason = status, reason
+
+ def setResponseHeaders(self, mapping):
+ self.headers.update(dict(
+ [('-'.join([s.capitalize() for s in name.split('-')]), v)
+ for name, v in mapping.items()
+ if name.lower() not in self.omit]
+ ))
+
+ def appendResponseHeaders(self, lst):
+ headers = [split_header(header) for header in lst]
+ self.headersl.extend(
+ [('-'.join([s.capitalize() for s in name.split('-')]), v)
+ for name, v in headers
+ if name.lower() not in self.omit]
+ )
+
+ def __str__(self):
+ out = ["%s: %s" % header for header in self.headers.items()]
+ out.extend(["%s: %s" % header for header in self.headersl])
+ out.sort()
+ out.insert(0, "%s %s %s" % (self.protocol, self.status, self.reason))
+ return '\n'.join(out)
+
+class DocResponseWrapper(ResponseWrapper):
+ """Response Wrapper for use in doc tests
+ """
+
+ def __init__(self, response, outstream, path, header_output):
+ ResponseWrapper.__init__(self, response, outstream, path)
+ self.header_output = header_output
+
+ def __str__(self):
+ body = self.getOutput()
+ if body:
+ return "%s\n\n%s" % (self.header_output, body)
+ return "%s\n" % (self.header_output)
+
+ def getBody(self):
+ return self.getOutput()
+
+def http(request_string, handle_errors=True):
+ """Execute an HTTP request string via the publisher
+
+ This is used for HTTP doc tests.
+ """
+ # Commit work done by previous python code.
+ commit()
+
+ # Discard leading white space to make call layout simpler
+ request_string = request_string.lstrip()
+
+ # split off and parse the command line
+ l = request_string.find('\n')
+ command_line = request_string[:l].rstrip()
+ request_string = request_string[l+1:]
+ method, path, protocol = command_line.split()
+ path = urllib.unquote(path)
+
+
+ instream = StringIO(request_string)
+ environment = {"HTTP_HOST": 'localhost',
+ "HTTP_REFERER": 'localhost',
+ "REQUEST_METHOD": method,
+ "SERVER_PROTOCOL": protocol,
+ }
+
+ headers = [split_header(header)
+ for header in rfc822.Message(instream).headers]
+ for name, value in headers:
+ name = ('_'.join(name.upper().split('-')))
+ if name not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
+ name = 'HTTP_' + name
+ environment[name] = value.rstrip()
+
+ auth_key = 'HTTP_AUTHORIZATION'
+ if environment.has_key(auth_key):
+ environment[auth_key] = auth_header(environment[auth_key])
+
+ outstream = HTTPTaskStub()
+
+
+ old_site = getSite()
+ setSite(None)
+ app = FunctionalTestSetup().getApplication()
+ header_output = HTTPHeaderOutput(
+ protocol, ('x-content-type-warning', 'x-powered-by'))
+
+ if method in ('GET', 'POST', 'HEAD'):
+ if (method == 'POST' and
+ environment.get('CONTENT_TYPE', '').startswith('text/xml')
+ ):
+ request_cls = XMLRPCRequest
+ publication_cls = XMLRPCPublication
+ else:
+ request_cls = type(BrowserRequest.__name__, (BrowserRequest,), {})
+ zope.interface.classImplements(request_cls, _getDefaultSkin())
+ publication_cls = BrowserPublication
+ else:
+ request_cls = HTTPRequest
+ publication_cls = HTTPPublication
+
+ request = app._request(path, instream, outstream,
+ environment=environment,
+ request=request_cls, publication=publication_cls)
+ request.response.setHeaderOutput(header_output)
+ response = DocResponseWrapper(request.response, outstream, path,
+ header_output)
+
+ publish(request, handle_errors=handle_errors)
+ setSite(old_site)
+
+ # sync Python connection:
+ getRootFolder()._p_jar.sync()
+
+ return response
+
+headerre = re.compile('(\S+): (.+)$')
+def split_header(header):
+ return headerre.match(header).group(1, 2)
+
+basicre = re.compile('Basic (.+)?:(.+)?$')
+def auth_header(header):
+ match = basicre.match(header)
+ if match:
+ import base64
+ u, p = match.group(1, 2)
+ if u is None:
+ u = ''
+ if p is None:
+ p = ''
+ auth = base64.encodestring('%s:%s' % (u, p))
+ return 'Basic %s' % auth[:-1]
+ return header
+
+def getRootFolder():
+ return FunctionalTestSetup().getRootFolder()
+
+def sync():
+ getRootFolder()._p_jar.sync()
+
+#
+# Sample functional test case
+#
+
+class SampleFunctionalTest(BrowserTestCase):
+
+ def testRootPage(self):
+ response = self.publish('/')
+ self.assertEquals(response.getStatus(), 200)
+
+ def testRootPage_preferred_languages(self):
+ response = self.publish('/', env={'HTTP_ACCEPT_LANGUAGE': 'en'})
+ self.assertEquals(response.getStatus(), 200)
+
+ def testNotExisting(self):
+ response = self.publish('/nosuchthing', handle_errors=True)
+ self.assertEquals(response.getStatus(), 404)
+
+ def testLinks(self):
+ response = self.publish('/')
+ self.assertEquals(response.getStatus(), 200)
+ self.checkForBrokenLinks(response.getBody(), response.getPath())
+
+
+def sample_test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(SampleFunctionalTest))
+ return suite
+
+def FunctionalDocFileSuite(*paths, **kw):
+ globs = kw.setdefault('globs', {})
+ globs['http'] = http
+ globs['getRootFolder'] = getRootFolder
+ globs['sync'] = sync
+
+ kw['package'] = doctest._normalize_module(kw.get('package'))
+
+ kwsetUp = kw.get('setUp')
+ def setUp(test):
+ FunctionalTestSetup().setUp()
+
+ if kwsetUp is not None:
+ kwsetUp(test)
+ kw['setUp'] = setUp
+
+ kwtearDown = kw.get('tearDown')
+ def tearDown(test):
+ if kwtearDown is not None:
+ kwtearDown(test)
+ FunctionalTestSetup().tearDown()
+ kw['tearDown'] = tearDown
+
+ kw['optionflags'] = doctest.ELLIPSIS | doctest.REPORT_NDIFF
+
+ return doctest.DocFileSuite(*paths, **kw)
+
+if __name__ == '__main__':
+ unittest.main()
Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/placelesssetup.py (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/placelesssetup.py)
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/tests/placelesssetup.py 2004-12-17 21:36:22 UTC (rev 28644)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/placelesssetup.py 2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1,69 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Unit test logic for setting up and tearing down basic infrastructure
+
+$Id$
+"""
+from zope.app.testing import ztapi
+from zope.schema.vocabulary import setVocabularyRegistry
+from zope.component.tests.placelesssetup \
+ import PlacelessSetup as CAPlacelessSetup
+from zope.app.event.tests.placelesssetup \
+ import PlacelessSetup as EventPlacelessSetup
+from zope.app.i18n.tests.placelesssetup \
+ import PlacelessSetup as I18nPlacelessSetup
+from zope.app.container.tests.placelesssetup \
+ import PlacelessSetup as ContainerPlacelessSetup
+from zope.app.security._protections import protect
+from zope.app.traversing.browser.interfaces import IAbsoluteURL
+from zope.app.traversing.browser.absoluteurl import AbsoluteURL
+
+class PlacelessSetup(CAPlacelessSetup,
+ EventPlacelessSetup,
+ I18nPlacelessSetup,
+ ContainerPlacelessSetup
+ ):
+
+ def setUp(self, doctesttest=None):
+ CAPlacelessSetup.setUp(self)
+ ContainerPlacelessSetup.setUp(self)
+ EventPlacelessSetup.setUp(self)
+ I18nPlacelessSetup.setUp(self)
+ # Register app-specific security declarations
+ protect()
+
+ ztapi.browserView(None, 'absolute_url', AbsoluteURL)
+ ztapi.browserViewProviding(None, AbsoluteURL, IAbsoluteURL)
+
+ from zope.app.security.tests import addCheckerPublic
+ addCheckerPublic()
+
+ from zope.security.management import newInteraction
+ newInteraction()
+
+ setVocabularyRegistry(None)
+
+
+ps = PlacelessSetup()
+setUp = ps.setUp
+
+def tearDown():
+ tearDown_ = ps.tearDown
+ def tearDown(doctesttest=None):
+ tearDown_()
+ return tearDown
+
+tearDown = tearDown()
+
+del ps
Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/recorded (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/recorded)
Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/setup.py (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/setup.py)
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/tests/setup.py 2004-12-17 21:36:22 UTC (rev 28644)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/setup.py 2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1,207 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Setting up an environment for testing context-dependent objects
+
+$Id$
+"""
+
+import zope.component
+import zope.interface
+from zope.app import zapi
+from zope.app.testing import ztapi
+from zope.interface import classImplements
+
+#------------------------------------------------------------------------
+# Annotations
+from zope.app.annotation.attribute import AttributeAnnotations
+from zope.app.annotation.interfaces import IAnnotations
+from zope.app.annotation.interfaces import IAttributeAnnotatable
+def setUpAnnotations():
+ ztapi.provideAdapter(IAttributeAnnotatable, IAnnotations,
+ AttributeAnnotations)
+
+#------------------------------------------------------------------------
+# Dependencies
+from zope.app.dependable import Dependable
+from zope.app.dependable.interfaces import IDependable
+def setUpDependable():
+ ztapi.provideAdapter(IAttributeAnnotatable, IDependable,
+ Dependable)
+
+#------------------------------------------------------------------------
+# Traversal
+from zope.app.traversing.browser.interfaces import IAbsoluteURL
+from zope.app.container.traversal import ContainerTraversable
+from zope.app.container.interfaces import ISimpleReadContainer
+from zope.app.traversing.interfaces import IContainmentRoot
+from zope.app.traversing.interfaces import IPhysicallyLocatable
+from zope.app.traversing.interfaces import ITraverser, ITraversable
+from zope.app.traversing.adapters import DefaultTraversable
+from zope.app.traversing.adapters import Traverser, RootPhysicallyLocatable
+from zope.app.location.traversing import LocationPhysicallyLocatable
+from zope.app.traversing.namespace import etc
+
+def setUpTraversal():
+ from zope.app.traversing.browser import SiteAbsoluteURL, AbsoluteURL
+
+ ztapi.provideAdapter(None, ITraverser, Traverser)
+ ztapi.provideAdapter(None, ITraversable, DefaultTraversable)
+
+ ztapi.provideAdapter(
+ ISimpleReadContainer, ITraversable, ContainerTraversable)
+ ztapi.provideAdapter(
+ None, IPhysicallyLocatable, LocationPhysicallyLocatable)
+ ztapi.provideAdapter(
+ IContainmentRoot, IPhysicallyLocatable, RootPhysicallyLocatable)
+
+ # set up etc namespace
+ ztapi.provideAdapter(None, ITraversable, etc, name="etc")
+ ztapi.provideView(None, None, ITraversable, "etc", etc)
+
+ ztapi.browserView(None, "absolute_url", AbsoluteURL)
+ ztapi.browserView(IContainmentRoot, "absolute_url", SiteAbsoluteURL)
+
+ ztapi.browserView(None, '', AbsoluteURL, providing=IAbsoluteURL)
+ ztapi.browserView(IContainmentRoot, '', SiteAbsoluteURL,
+ providing=IAbsoluteURL)
+
+
+#------------------------------------------------------------------------
+# Use registration
+from zope.app.registration.interfaces import IAttributeRegisterable
+from zope.app.registration.interfaces import IRegistered
+from zope.app.registration.registration import Registered
+def setUpRegistered():
+ ztapi.provideAdapter(IAttributeRegisterable, IRegistered,
+ Registered)
+
+#------------------------------------------------------------------------
+# Service service lookup
+from zope.app.component.localservice import serviceServiceAdapter
+from zope.app.registration.interfaces import IRegistrationActivatedEvent
+from zope.app.registration.interfaces import IRegistrationDeactivatedEvent
+from zope.app.site.service import handleActivated, handleDeactivated
+from zope.component.interfaces import IServiceService
+from zope.interface import Interface
+def setUpServiceService():
+ ztapi.subscribe((IRegistrationActivatedEvent,), None, handleActivated)
+ ztapi.subscribe((IRegistrationDeactivatedEvent,), None, handleDeactivated)
+ ztapi.provideAdapter(Interface, IServiceService, serviceServiceAdapter)
+
+#------------------------------------------------------------------------
+# Placeful setup
+import zope.app.component.hooks
+from zope.app.testing.placelesssetup import setUp as placelessSetUp
+from zope.app.testing.placelesssetup import tearDown as placelessTearDown
+def placefulSetUp(site=False):
+ placelessSetUp()
+ zope.app.component.hooks.setHooks()
+ setUpAnnotations()
+ setUpDependable()
+ setUpTraversal()
+ setUpRegistered()
+ setUpServiceService()
+
+ if site:
+ site = rootFolder()
+ createServiceManager(site, setsite=True)
+ return site
+
+from zope.app.component.hooks import setSite
+def placefulTearDown():
+ placelessTearDown()
+ zope.app.component.hooks.resetHooks()
+ setSite()
+
+
+from zope.app.folder import Folder, rootFolder
+
+def buildSampleFolderTree():
+ # set up a reasonably complex folder structure
+ #
+ # ____________ rootFolder ____________
+ # / \
+ # folder1 __________________ folder2
+ # | \ |
+ # folder1_1 ____ folder1_2 folder2_1
+ # | \ | |
+ # folder1_1_1 folder1_1_2 folder1_2_1 folder2_1_1
+
+ root = rootFolder()
+ root['folder1'] = Folder()
+ root['folder1']['folder1_1'] = Folder()
+ root['folder1']['folder1_1']['folder1_1_1'] = Folder()
+ root['folder1']['folder1_1']['folder1_1_2'] = Folder()
+ root['folder1']['folder1_2'] = Folder()
+ root['folder1']['folder1_2']['folder1_2_1'] = Folder()
+ root['folder2'] = Folder()
+ root['folder2']['folder2_1'] = Folder()
+ root['folder2']['folder2_1']['folder2_1_1'] = Folder()
+
+ return root
+
+
+from zope.app.site.service import ServiceManager
+from zope.app.site.interfaces import ISite
+def createServiceManager(folder, setsite=False):
+ if not ISite.providedBy(folder):
+ folder.setSiteManager(ServiceManager(folder))
+ if setsite:
+ setSite(folder)
+ return zapi.traverse(folder, "++etc++site")
+
+from zope.app.site.service import ServiceRegistration
+from zope.app.site.interfaces import ISimpleService
+from zope.app.registration.interfaces import ActiveStatus
+
+def addService(servicemanager, name, service, suffix=''):
+ """Add a service to a service manager
+
+ This utility is useful for tests that need to set up services.
+ """
+ # Most local services implement ISimpleService in ZCML; therefore make
+ # sure we got it here as well.
+ zope.interface.directlyProvides(service, ISimpleService)
+
+ default = zapi.traverse(servicemanager, 'default')
+ default[name+suffix] = service
+ registration = ServiceRegistration(name, service, default)
+ key = default.getRegistrationManager().addRegistration(registration)
+ zapi.traverse(default.getRegistrationManager(), key).status = ActiveStatus
+ return default[name+suffix]
+
+from zope.app.utility import UtilityRegistration
+
+def addUtility(servicemanager, name, iface, utility, suffix=''):
+ """Add a utility to a service manager
+
+ This utility is useful for tests that need to set up utilities.
+ """
+
+ folder_name = (name or (iface.__name__ + 'Utility')) + suffix
+ default = zapi.traverse(servicemanager, 'default')
+ default[folder_name] = utility
+ registration = UtilityRegistration(name, iface, default[folder_name])
+ key = default.getRegistrationManager().addRegistration(registration)
+ zapi.traverse(default.getRegistrationManager(), key).status = ActiveStatus
+ return default[folder_name]
+
+def createStandardServices(folder):
+ '''Create a bunch of standard placeful services
+
+ Well, uh, 0
+ '''
+ sm = createServiceManager(folder)
+
+
Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/test.py (from rev 28645, Zope3/branches/srichter-blow-services/src/zope/app/tests/test.py)
Added: Zope3/branches/srichter-blow-services/src/zope/app/testing/tests.py
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/testing/tests.py 2004-12-21 16:39:25 UTC (rev 28668)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/tests.py 2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1,205 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Test tcpdoc
+
+$Id$
+"""
+import os
+import unittest
+import StringIO
+
+from zope.testing.doctestunit import DocTestSuite
+
+import zope.app.testing
+from zope.app.testing import functional
+from zope.app.testing.dochttp import dochttp
+
+HEADERS = """\
+HTTP/1.1 200 Ok
+Content-Type: text/plain
+"""
+
+BODY = """\
+This is the response body.
+"""
+
+directory = os.path.join(os.path.split(zope.app.testing.__file__)[0],
+ 'recorded')
+
+expected = r'''
+
+ >>> print http(r"""
+ ... GET /@@contents.html HTTP/1.1
+ ... """)
+ HTTP/1.1 401 Unauthorized
+ Content-Length: 89
+ Content-Type: text/html;charset=utf-8
+ Www-Authenticate: basic realm=zope
+ <BLANKLINE>
+ <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+ lang="en">
+ <BLANKLINE>
+ ...
+ <BLANKLINE>
+ </html>
+ <BLANKLINE>
+ <BLANKLINE>
+
+
+ >>> print http(r"""
+ ... GET /@@contents.html HTTP/1.1
+ ... Authorization: Basic bWdyOm1ncnB3
+ ... """)
+ HTTP/1.1 200 Ok
+ Content-Length: 89
+ Content-Type: text/html;charset=utf-8
+ <BLANKLINE>
+ <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+ lang="en">
+ <BLANKLINE>
+ ...
+ <BLANKLINE>
+ </html>
+ <BLANKLINE>
+ <BLANKLINE>
+
+
+ >>> print http(r"""
+ ... GET /++etc++site/@@manage HTTP/1.1
+ ... Authorization: Basic bWdyOm1ncnB3
+ ... Referer: http://localhost:8081/
+ ... """)
+ HTTP/1.1 303 See Other
+ Content-Length: 0
+ Content-Type: text/plain;charset=utf-8
+ Location: @@tasks.html
+ <BLANKLINE>
+
+
+ >>> print http(r"""
+ ... GET / HTTP/1.1
+ ... Authorization: Basic bWdyOm1ncnB3
+ ... """)
+ HTTP/1.1 200 Ok
+ Content-Length: 89
+ Content-Type: text/html;charset=utf-8
+ <BLANKLINE>
+ <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+ lang="en">
+ <BLANKLINE>
+ ...
+ <BLANKLINE>
+ </html>
+ <BLANKLINE>
+ <BLANKLINE>
+
+
+ >>> print http(r"""
+ ... GET /++etc++site/@@tasks.html HTTP/1.1
+ ... Authorization: Basic bWdyOm1ncnB3
+ ... Referer: http://localhost:8081/
+ ... """)
+ HTTP/1.1 200 Ok
+ Content-Length: 89
+ Content-Type: text/html;charset=utf-8
+ <BLANKLINE>
+ <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
+ lang="en">
+ <BLANKLINE>
+ ...
+ <BLANKLINE>
+ </html>
+ <BLANKLINE>
+ <BLANKLINE>
+'''
+
+class FunctionalHTTPDocTest(unittest.TestCase):
+
+ def test_dochttp(self):
+ import sys, StringIO
+ old = sys.stdout
+ sys.stdout = StringIO.StringIO()
+ dochttp(['-p', 'test', directory])
+ got = sys.stdout.getvalue()
+ sys.stdout = old
+ self.assert_(got == expected)
+
+
+class DocResponseWrapperTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.body_output = StringIO.StringIO()
+ self.path = "/foo/bar/"
+ self.response = object()
+
+ self.wrapper = functional.DocResponseWrapper(
+ self.response, self.body_output, self.path, HEADERS)
+
+ def test__str__(self):
+ self.assertEqual(str(self.wrapper),
+ HEADERS + "\n")
+ self.body_output.write(BODY)
+ self.assertEqual(str(self.wrapper),
+ "%s\n\n%s" % (HEADERS, BODY))
+
+ def test_getBody(self):
+ self.assertEqual(self.wrapper.getBody(), "")
+ self.body_output.write(BODY)
+ self.assertEqual(self.wrapper.getBody(), BODY)
+
+ def test_getOutput(self):
+ self.assertEqual(self.wrapper.getOutput(), "")
+ self.body_output.write(BODY)
+ self.assertEqual(self.wrapper.getOutput(), BODY)
+
+
+class AuthHeaderTestCase(unittest.TestCase):
+
+ def test_auth_encoded(self):
+ auth_header = functional.auth_header
+ header = 'Basic Z2xvYmFsbWdyOmdsb2JhbG1ncnB3'
+ self.assertEquals(auth_header(header), header)
+
+ def test_auth_non_encoded(self):
+ auth_header = functional.auth_header
+ header = 'Basic globalmgr:globalmgrpw'
+ expected = 'Basic Z2xvYmFsbWdyOmdsb2JhbG1ncnB3'
+ self.assertEquals(auth_header(header), expected)
+
+ def test_auth_non_encoded_empty(self):
+ auth_header = functional.auth_header
+ header = 'Basic globalmgr:'
+ expected = 'Basic Z2xvYmFsbWdyOg=='
+ self.assertEquals(auth_header(header), expected)
+ header = 'Basic :pass'
+ expected = 'Basic OnBhc3M='
+ self.assertEquals(auth_header(header), expected)
+
+ def test_auth_non_encoded_colon(self):
+ auth_header = zope.app.testing.functional.auth_header
+ header = 'Basic globalmgr:pass:pass'
+ expected = 'Basic Z2xvYmFsbWdyOnBhc3M6cGFzcw=='
+ self.assertEquals(auth_header(header), expected)
+
+
+def test_suite():
+ return unittest.TestSuite((
+ unittest.makeSuite(FunctionalHTTPDocTest),
+ unittest.makeSuite(DocResponseWrapperTestCase),
+ unittest.makeSuite(AuthHeaderTestCase)
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
+
Copied: Zope3/branches/srichter-blow-services/src/zope/app/testing/ztapi.py (from rev 28644, Zope3/branches/srichter-blow-services/src/zope/app/tests/ztapi.py)
===================================================================
--- Zope3/branches/srichter-blow-services/src/zope/app/tests/ztapi.py 2004-12-17 21:36:22 UTC (rev 28644)
+++ Zope3/branches/srichter-blow-services/src/zope/app/testing/ztapi.py 2004-12-21 16:55:26 UTC (rev 28669)
@@ -0,0 +1,101 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Testing helper functions
+
+$Id$
+"""
+import zope.interface
+from zope.component.interfaces import IDefaultViewName
+from zope.publisher.browser import IBrowserRequest
+from zope.publisher.interfaces.browser import IDefaultBrowserLayer
+from zope.app import zapi
+from zope.app.traversing.interfaces import ITraversable
+
+def provideView(for_, type, providing, name, factory, layer=None):
+ if layer is None:
+ layer = type
+ provideAdapter(for_, providing, factory, name, (layer,))
+
+def provideMultiView(for_, type, providing, name, factory, layer=None):
+ if layer is None:
+ layer = type
+ provideAdapter(for_[0], providing, factory, name, tuple(for_[1:])+(layer,))
+
+def browserView(for_, name, factory, layer=IDefaultBrowserLayer,
+ providing=zope.interface.Interface):
+ """Define a global browser view
+ """
+ if isinstance(factory, (list, tuple)):
+ raise ValueError("Factory cannot be a list or tuple")
+ provideAdapter(for_, providing, factory, name, (layer,))
+
+def browserViewProviding(for_, factory, providing, layer=IDefaultBrowserLayer):
+ """Define a view providing a particular interface."""
+ if isinstance(factory, (list, tuple)):
+ raise ValueError("Factory cannot be a list or tuple")
+ return browserView(for_, '', factory, layer, providing)
+
+def browserResource(name, factory, layer=IDefaultBrowserLayer,
+ providing=zope.interface.Interface):
+ """Define a global browser view
+ """
+ if isinstance(factory, (list, tuple)):
+ raise ValueError("Factory cannot be a list or tuple")
+ provideAdapter((layer,), providing, factory, name)
+
+def setDefaultViewName(for_, name, layer=IDefaultBrowserLayer,
+ type=IBrowserRequest):
+ if layer is None:
+ layer = type
+ gsm = zapi.getGlobalSiteManager()
+ gsm.provideAdapter((for_, layer), IDefaultViewName, '', name)
+
+stypes = list, tuple
+def provideAdapter(required, provided, factory, name='', with=()):
+ if isinstance(factory, (list, tuple)):
+ raise ValueError("Factory cannot be a list or tuple")
+ gsm = zapi.getGlobalSiteManager()
+
+ if with:
+ required = (required, ) + tuple(with)
+ elif not isinstance(required, stypes):
+ required = (required,)
+
+ gsm.provideAdapter(required, provided, name, factory)
+
+def subscribe(required, provided, factory):
+ gsm = zapi.getGlobalSiteManager()
+ gsm.subscribe(required, provided, factory)
+
+def handle(required, handler):
+ subscribe(required, None, handler)
+
+def provideUtility(provided, component, name=''):
+ gsm = zapi.getGlobalSiteManager()
+ gsm.provideUtility(provided, component, name)
+
+def unprovideUtility(provided, name=''):
+ gsm = zapi.getGlobalSiteManager()
+ gsm.provideAdapter((), provided, name, None)
+
+def provideNamespaceHandler(name, handler):
+ provideAdapter(None, ITraversable, handler, name=name)
+ provideView(None, None, ITraversable, name, handler)
+
+def provideService(name, service, interface=None):
+ services = zapi.getGlobalServices()
+ if interface is not None:
+ services.defineService(name, interface)
+ services.provideService(name, service)
+
More information about the Zope3-Checkins
mailing list