[Zope3-checkins] CVS: Zope3/src/zope/app/publication/tests - test_zopepublication.py:1.29

Philipp von Weitershausen philikon at philikon.de
Sat Mar 20 08:37:27 EST 2004


Update of /cvs-repository/Zope3/src/zope/app/publication/tests
In directory cvs.zope.org:/tmp/cvs-serv594/src/zope/app/publication/tests

Modified Files:
	test_zopepublication.py 
Log Message:


Provide some tests that should have been provided much earlier.
Add a test for transaction annotation.
See http://dev.zope.org/Zope3/SimplifyUndoModel.




=== Zope3/src/zope/app/publication/tests/test_zopepublication.py 1.28 => 1.29 ===
--- Zope3/src/zope/app/publication/tests/test_zopepublication.py:1.28	Wed Mar 17 13:24:26 2004
+++ Zope3/src/zope/app/publication/tests/test_zopepublication.py	Sat Mar 20 08:37:27 2004
@@ -19,15 +19,19 @@
 import sys
 from cStringIO import StringIO
 
-from ZODB.tests.util import DB
+from persistent import Persistent
+from ZODB.DB import DB
+from ZODB.DemoStorage import DemoStorage
 
 from zope.interface.verify import verifyClass
 from zope.interface import implements, classImplements, implementedBy
 from zope.i18n.interfaces import IUserPreferredCharsets
+from zope.component import getServiceManager
 from zope.component.interfaces import IServiceService
-from zope.publisher.base import TestPublication
+from zope.publisher.base import TestPublication, TestRequest
 from zope.publisher.http import IHTTPRequest, HTTPCharsets
 from zope.publisher.interfaces import IRequest, IPublishTraverse
+from zope.publisher.browser import BrowserResponse
 from zope.security import simplepolicies
 from zope.security.management import setSecurityPolicy, getSecurityManager
 
@@ -35,51 +39,13 @@
 from zope.app.tests.placelesssetup import PlacelessSetup
 from zope.app.tests import ztapi
 
-from zope.app.servicenames import Authentication
+from zope.app.errorservice.interfaces import IErrorReportingService
+from zope.app.servicenames import ErrorLogging, Authentication
+from zope.app.location.interfaces import ILocation
 from zope.app.security.principalregistry import principalRegistry
 from zope.app.security.interfaces import IUnauthenticatedPrincipal, IPrincipal
 from zope.app.publication.zopepublication import ZopePublication
 from zope.app.folder import Folder, rootFolder
-from zope.publisher.base import TestRequest
-from zope.publisher.browser import BrowserResponse
-
-class BasePublicationTests(PlacelessSetup, unittest.TestCase):
-
-    def setUp(self):
-        super(BasePublicationTests, self).setUp()
-        ztapi.provideAdapter(IHTTPRequest, IUserPreferredCharsets,
-                             HTTPCharsets)
-        self.policy = setSecurityPolicy(
-            simplepolicies.PermissiveSecurityPolicy()
-            )
-        self.db = DB()
-
-        connection = self.db.open()
-        root = connection.root()
-        app = getattr(root, ZopePublication.root_name, None)
-
-        if app is None:
-            from zope.app.folder import rootFolder
-            app = rootFolder()
-            root[ZopePublication.root_name] = app
-
-            get_transaction().commit()
-
-        connection.close()
-
-        from zope.app.traversing.namespace import provideNamespaceHandler
-        from zope.app.traversing.namespace import view, resource, etc
-        provideNamespaceHandler('view', view)
-        provideNamespaceHandler('resource', resource)
-        provideNamespaceHandler('etc', etc)
-
-    def tearDown(self):
-        setSecurityPolicy(self.policy) # XXX still needed?
-        PlacelessSetup.tearDown(self)
-
-    def testInterfacesVerify(self):
-        for interface in implementedBy(ZopePublication):
-            verifyClass(interface, TestPublication)
 
 class Principal:
     implements(IPrincipal)
@@ -91,7 +57,6 @@
 class UnauthenticatedPrincipal(Principal):
     implements(IUnauthenticatedPrincipal)
 
-
 class AuthService1:
 
     def authenticate(self, request):
@@ -106,7 +71,6 @@
     def getPrincipal(self, id):
         return UnauthenticatedPrincipal(id)
 
-
 class AuthService2(AuthService1):
 
     def authenticate(self, request):
@@ -115,9 +79,16 @@
     def getPrincipal(self, id):
         return Principal(id)
 
+class ErrorLoggingService:
+    implements(IErrorReportingService)
 
-class ServiceManager:
+    def __init__(self):
+        self.exceptions = []
+
+    def raising(self, info, request=None):
+        self.exceptions.append([info, request])
 
+class ServiceManager:
     implements(IServiceService) # a dirty lie
 
     def __init__(self, auth):
@@ -129,18 +100,61 @@
         else:
             return default
 
+class LocatableObject(Persistent):
+    implements(ILocation)
+    __parent__ = None
+    __name__ = None
 
-class ZopePublicationErrorHandling(BasePublicationTests):
+class BasePublicationTests(PlacelessSetup, unittest.TestCase):
 
     def setUp(self):
-        BasePublicationTests.setUp(self)
+        super(BasePublicationTests, self).setUp()
+        ztapi.provideAdapter(IHTTPRequest, IUserPreferredCharsets,
+                             HTTPCharsets)
+        self.policy = setSecurityPolicy(
+            simplepolicies.PermissiveSecurityPolicy()
+            )
+        self.storage = DemoStorage('test_storage')
+        self.db = db = DB(self.storage)
+
+        connection = db.open()
+        root = connection.root()
+        app = getattr(root, ZopePublication.root_name, None)
+
+        if app is None:
+            from zope.app.folder import rootFolder
+            app = rootFolder()
+            root[ZopePublication.root_name] = app
+            get_transaction().commit()
+
+        connection.close()
+        self.app = app
+
+        from zope.app.traversing.namespace import provideNamespaceHandler
+        from zope.app.traversing.namespace import view, resource, etc
+        provideNamespaceHandler('view', view)
+        provideNamespaceHandler('resource', resource)
+        provideNamespaceHandler('etc', etc)
+
         self.out = StringIO()
         self.request = TestRequest('/f1/f2', outstream=self.out)
+        self.user = Principal('test.principal')
+        self.request.setUser(self.user)
         from zope.interface import Interface
         self.presentation_type = Interface
         self.request._presentation_type = self.presentation_type
+        self.object = object()
         self.publication = ZopePublication(self.db)
-        self.object = object()  # doesn't matter what it is
+
+    def tearDown(self):
+        setSecurityPolicy(self.policy) # XXX still needed?
+        super(BasePublicationTests, self).tearDown()
+
+    def testInterfacesVerify(self):
+        for interface in implementedBy(ZopePublication):
+            verifyClass(interface, TestPublication)
+
+class ZopePublicationErrorHandling(BasePublicationTests):
 
     def testRetryAllowed(self):
         from ZODB.POSException import ConflictError
@@ -263,9 +277,71 @@
         self.publication.handleException(
             self.object, self.request, sys.exc_info(), retry_allowed=False)
         self.request.response.outputBody()
-        self.assertEqual(self.request.response.getHeader('Content-Type'), 'text/html')
+        self.assertEqual(self.request.response.getHeader('Content-Type'),
+                         'text/html')
         self.assertEqual(self.request.response._cookies, {})
 
+    def testAbortOrCommitTransaction(self):
+        txn = get_transaction()
+        try:
+            raise Exception
+        except:
+            pass
+        self.publication.handleException(
+            self.object, self.request, sys.exc_info(), retry_allowed=False)
+        # assert that we get a new transaction
+        self.assert_(txn is not get_transaction())
+
+    def testCommitTransactionNoErrorLoggingService(self):
+        # XXX This test is disabled because it would fail.
+
+        # Though the publication commits a transaction with a note in
+        # case the error logging service is absent, the transaction is
+        # not logged in the undo log because no persistent objects are
+        # modified in it.
+        return
+
+        root = self.db.open().root()
+        last_txn_info = self.db.undoInfo()[0]
+
+        try:
+            raise Exception
+        except:
+            pass
+        self.publication.handleException(
+            self.object, self.request, sys.exc_info(), retry_allowed=False)
+
+        new_txn_info = self.db.undoInfo()[0]
+        self.failIfEqual(last_txn_info, new_txn_info)
+
+    def testAbortTransactionWithErrorLoggingService(self):
+        # provide our fake error logging service
+        sm = getServiceManager(None)
+        sm.defineService(ErrorLogging, IErrorReportingService)
+        sm.provideService(ErrorLogging, ErrorLoggingService())
+
+        class FooError(Exception):
+            pass
+
+        last_txn_info = self.db.undoInfo()[0]
+        try:
+            raise FooError
+        except:
+            pass
+        self.publication.handleException(
+            self.object, self.request, sys.exc_info(), retry_allowed=False)
+
+        # assert that the last transaction is NOT our transaction
+        new_txn_info = self.db.undoInfo()[0]
+        self.assertEqual(last_txn_info, new_txn_info)
+
+        # instead, we expect a message in our logging service
+        error_log = sm.getService(ErrorLogging)
+        self.assertEqual(len(error_log.exceptions), 1)
+        error_info, request = error_log.exceptions[0]
+        self.assertEqual(error_info[0], FooError)
+        self.assert_(isinstance(error_info[1], FooError))
+        self.assert_(request is self.request)
 
 class ZopePublicationTests(BasePublicationTests):
 
@@ -282,8 +358,6 @@
         f2.setSiteManager(ServiceManager(AuthService2()))
         get_transaction().commit()
 
-        request = TestRequest('/f1/f2')
-
         from zope.app.container.interfaces import ISimpleReadContainer
         from zope.app.container.traversal import ContainerTraverser
 
@@ -295,24 +369,63 @@
         from zope.security.checker import defineChecker, InterfaceChecker
         defineChecker(Folder, InterfaceChecker(IFolder))
 
-        publication = ZopePublication(self.db)
-
-        publication.beforeTraversal(request)
+        self.publication.beforeTraversal(self.request)
         user = getSecurityManager().getPrincipal()
-        self.assertEqual(user, request.user)
-        self.assertEqual(request.user.id, 'anonymous')
-        root = publication.getApplication(request)
-        publication.callTraversalHooks(request, root)
-        self.assertEqual(request.user.id, 'anonymous')
-        ob = publication.traverseName(request, root, 'f1')
-        publication.callTraversalHooks(request, ob)
-        self.assertEqual(request.user.id, 'test.anonymous')
-        ob = publication.traverseName(request, ob, 'f2')
-        publication.afterTraversal(request, ob)
-        self.assertEqual(request.user.id, 'test.bob')
+        self.assertEqual(user, self.request.user)
+        self.assertEqual(self.request.user.id, 'anonymous')
+        root = self.publication.getApplication(self.request)
+        self.publication.callTraversalHooks(self.request, root)
+        self.assertEqual(self.request.user.id, 'anonymous')
+        ob = self.publication.traverseName(self.request, root, 'f1')
+        self.publication.callTraversalHooks(self.request, ob)
+        self.assertEqual(self.request.user.id, 'test.anonymous')
+        ob = self.publication.traverseName(self.request, ob, 'f2')
+        self.publication.afterTraversal(self.request, ob)
+        self.assertEqual(self.request.user.id, 'test.bob')
         user = getSecurityManager().getPrincipal()
-        self.assertEqual(user, request.user)
-        
+        self.assertEqual(user, self.request.user)
+
+    def testTransactionCommitAfterCall(self):
+        root = self.db.open().root()
+        txn = get_transaction()
+        # we just need a change in the database to make the
+        # transaction notable in the undo log
+        root['foo'] = object()
+        last_txn_info = self.db.undoInfo()[0]
+        self.publication.afterCall(self.request, self.object)
+        self.assert_(txn is not get_transaction())
+        new_txn_info = self.db.undoInfo()[0]
+        self.failIfEqual(last_txn_info, new_txn_info)
+
+    def testTransactionAnnotation(self):
+        from zope.interface import directlyProvides
+        from zope.app.location import LocationPhysicallyLocatable
+        from zope.app.location.interfaces import ILocation
+        from zope.app.traversing.interfaces import IPhysicallyLocatable
+        from zope.app.traversing.interfaces import IContainmentRoot
+        ztapi.provideAdapter(ILocation, IPhysicallyLocatable,
+                             LocationPhysicallyLocatable)
+
+        root = self.db.open().root()
+        root['foo'] = foo = LocatableObject()
+        root['bar'] = bar = LocatableObject()
+        bar.__name__ = 'bar'
+        foo.__name__ = 'foo'
+        bar.__parent__ = foo
+        foo.__parent__ = root
+        directlyProvides(root, IContainmentRoot)
+
+        self.publication.afterCall(self.request, bar)
+
+        from zope.publisher.interfaces import IRequest
+        expected_path = "/foo/bar"
+        expected_user = "/ " + self.user.id
+        expected_request = IRequest.__module__ + '.' + IRequest.getName()
+
+        txn_info = self.db.undoInfo()[0]
+        self.assertEqual(txn_info['location'], expected_path)
+        self.assertEqual(txn_info['user_name'], expected_user)
+        self.assertEqual(txn_info['request_type'], expected_request)
 
 def test_suite():
     return unittest.TestSuite((




More information about the Zope3-Checkins mailing list