[Zope-CVS] CVS: Products/PluggableAuthService/tests - test_Caching.py:1.1.2.1

Jens Vagelpohl jens at dataflake.org
Sat Nov 6 06:38:04 EST 2004


Update of /cvs-repository/Products/PluggableAuthService/tests
In directory cvs.zope.org:/tmp/cvs-serv5131/tests

Added Files:
      Tag: jens-implement_caching_branch
	test_Caching.py 
Log Message:
- tests for caching in the PAS instance


=== Added File Products/PluggableAuthService/tests/test_Caching.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import unittest

from Acquisition import Implicit, aq_base, aq_parent
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManager import setSecurityPolicy
from OFS.ObjectManager import ObjectManager
from OFS.Folder import Folder
from OFS.Cache import isCacheable
from zExceptions import Unauthorized, Redirect

from Products.PluggableAuthService.utils import directlyProvides
from Products.PluggableAuthService.PASCache import PASRAMCacheManager

class FauxRequest:

    def __init__( self, steps=(), **kw ):

        self.steps = steps
        self._dict = {}
        self._dict.update( kw )

    def get( self, key, default=None ):

        return self._dict.get( key, default )

    def _authUserPW( self ):
        form = self.get( 'form' )
        return ( form.get( 'login' ), form.get( 'password' ) )

    def __getitem__( self, key ):

        return self._dict[ key ]

    def __setitem__( self, key, value ):

        self._dict[ key ] = value

class PluggableAuthServiceCachingTests( unittest.TestCase ):

    def tearDown( self ):
        pass


    def _getTargetClass( self ):

        from Products.PluggableAuthService.PluggableAuthService \
            import PluggableAuthService

        return PluggableAuthService

    def _makeOne( self, plugins=None, *args, **kw ):

        zcuf = self._getTargetClass()( *args, **kw )

        if plugins is not None:
            zcuf._setObject( 'plugins', plugins )

        rcm = PASRAMCacheManager('ramcache')
        zcuf._setObject('ramcache', rcm)

        return zcuf

    def _makePlugins( self, plugin_type_info=None ):

        from Products.PluggableAuthService.PluggableAuthService \
            import _PLUGIN_TYPE_INFO
        from Products.PluginRegistry.PluginRegistry import PluginRegistry

        if plugin_type_info is None:
            plugin_type_info = _PLUGIN_TYPE_INFO

        reg = PluginRegistry( plugin_type_info=plugin_type_info )
        reg._setId( 'plugins' )
        reg._plugins = {}

        return reg

    def _makeTree( self ):

        rc = FauxObject( 'rc' )
        root = FauxRoot( 'root' ).__of__( rc )
        folder = FauxContainer( 'folder' ).__of__( root )
        object = FauxObject( 'object' ).__of__( folder )

        return rc, root, folder, object

    def _makeAndFill(self):

        from Products.PluggableAuthService.plugins import ZODBUserManager
        from Products.PluggableAuthService.plugins import ZODBRoleManager

        plugin_registry = self._makePlugins()
        user_source = ZODBUserManager.ZODBUserManager('zodb_users')
        roles_source = ZODBRoleManager.ZODBRoleManager('zodb_roles')
        pas_instance = self._makeOne(plugins=plugin_registry)
        pas_instance._setObject('zodb_users', user_source)
        pas_instance._setObject('zodb_roles', roles_source)

        return pas_instance

    def test_empty( self ):
        zcuf = self._makeOne()
        rcm = getattr(zcuf, 'ramcache')

        # This is needed because some underlying ZCacheable code wants to
        # use self.REQUEST :/
        setattr(rcm, 'REQUEST', FauxRequest())

        mt_info = zcuf.all_meta_types()
        mt_types = [x['name'] for x in mt_info]
        
        # Make sure that the PASRAMCacheManager can be instantiated through the ZMI
        self.assert_(PASRAMCacheManager.meta_type in mt_types)

        # Make sure the PAS instance itself is Cacheable
        self.assert_(isCacheable(zcuf))

        # Make sure the PAS instance is not associated with any cache manager
        # by default
        self.assert_(zcuf.ZCacheable_getManager() is None)

        # Make sure the PASRAMCacheManager is empty
        self.assert_(len(rcm.getCacheReport()) == 0)

    def test_caching_in_PAS(self):
        zcuf = self._makeAndFill()
        rcm = getattr(zcuf, 'ramcache')
        plugin_registry = getattr(zcuf, 'plugins')
        user_source = getattr(zcuf, 'zodb_users')
        roles_source = getattr(zcuf, 'zodb_roles')

        # This is needed because some underlying ZCacheable code wants to
        # use self.REQUEST :/
        setattr(zcuf, 'REQUEST', FauxRequest())

        # First, we register the ZODBUserManager as a plugin suitable
        # for storing and returning user objects and the ZODBRoleManager
        # for roles. Basic scaffolding to be able to store and retrieve users.
        from Products.PluggableAuthService.interfaces import plugins

        plugin_registry.activatePlugin( plugins.IUserEnumerationPlugin
                                      , user_source.getId()
                                      )
        plugin_registry.activatePlugin( plugins.IUserAdderPlugin
                                      , user_source.getId()
                                      )
        plugin_registry.activatePlugin( plugins.IRolesPlugin
                                      , roles_source.getId()
                                      )
        plugin_registry.activatePlugin( plugins.IRoleEnumerationPlugin
                                      , roles_source.getId()
                                      )
        plugin_registry.activatePlugin( plugins.IRoleAssignerPlugin
                                      , roles_source.getId()
                                      )

        # Now add a user and make sure it's there
        zcuf._doAddUser('testlogin', 'secret', ['Member', 'Anonymous'], [])
        self.failIf(zcuf.getUser('testlogin') is None)

        # Then we activate caching for the PAS instance itself
        zcuf.ZCacheable_setManagerId(rcm.getId())

        # Make sure the PAS instance is associated with the cache
        self.failUnless(aq_base(zcuf.ZCacheable_getManager()) is aq_base(rcm))

        # Now we can see if the cache is getting used. Test for emptiness
        # first, then retrieve a user, and the cache should have content.
        # Then test again to see if the cache entries are being used.
        # This is a bit nasty because I am relying on knowing the structure
        # of the cache report, which is really an internal implementation
        # detail.

        # First check: The cache must be empty
        report = rcm.getCacheReport()
        self.failUnless(len(report) == 0)

        # The user is being requested once. At this point there must be one
        # entry for the PAS instance. The number of "misses" must be >0 because
        # the first cache check will have failed. The number of cache hits must
        # be zero.
        zcuf.getUser('testlogin')
        report = rcm.getCacheReport()
        self.failUnless(len(report) == 1)
        report_item = report[0]
        firstpass_misses = report_item.get('misses')
        firstpass_hits = report_item.get('hits')
        firstpass_entries = report_item.get('entries')
        self.failUnless(firstpass_misses > 0)
        self.failUnless(firstpass_hits == 0)

        # The user is requested again. This request should produce a cache hit,
        # so the number of "misses" must have stayed the same as after the
        # first pass, but the number of hits must now be >0. Also, the number
        # of in-memory entries must have remained the same to prove that we are
        # reusing the same cache entries.
        zcuf.getUser('testlogin')
        report = rcm.getCacheReport()
        self.failUnless(len(report) == 1)
        report_item = report[0]
        self.failIf(report_item.get('misses') != firstpass_misses)
        self.failUnless(report_item.get('hits') > firstpass_hits)
        self.failIf(report_item.get('entries') != firstpass_entries)


if __name__ == "__main__":
    unittest.main()

def test_suite():
    return unittest.TestSuite((
        unittest.makeSuite( PluggableAuthServiceCachingTests ),
        ))



More information about the Zope-CVS mailing list