[Zope-CVS] CVS: Products/PluggableAuthService/tests -
test_Caching.py:1.2 test_PluggableAuthService.py:1.14
Jens Vagelpohl
jens at dataflake.org
Mon Nov 8 17:35:45 EST 2004
Update of /cvs-repository/Products/PluggableAuthService/tests
In directory cvs.zope.org:/tmp/cvs-serv5460/tests
Modified Files:
test_PluggableAuthService.py
Added Files:
test_Caching.py
Log Message:
- merge the jens-implement_caching_branch . For some details please
see doc/caching.stx.
=== Products/PluggableAuthService/tests/test_Caching.py 1.1 => 1.2 ===
--- /dev/null Mon Nov 8 17:35:45 2004
+++ Products/PluggableAuthService/tests/test_Caching.py Mon Nov 8 17:35:45 2004
@@ -0,0 +1,205 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
+# Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this
+# distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import unittest
+
+from Acquisition import aq_base
+from OFS.Cache import isCacheable
+
+from Products.StandardCacheManagers.RAMCacheManager import RAMCacheManager
+
+class FauxRequest:
+
+ def __init__( self, steps=(), **kw ):
+
+ self.steps = steps
+ self._dict = {}
+ self._dict.update( kw )
+
+ def get( self, key, default=None ):
+
+ return self._dict.get( key, default )
+
+ def _authUserPW( self ):
+ form = self.get( 'form' )
+ return ( form.get( 'login' ), form.get( 'password' ) )
+
+ def __getitem__( self, key ):
+
+ return self._dict[ key ]
+
+ def __setitem__( self, key, value ):
+
+ self._dict[ key ] = value
+
+class PluggableAuthServiceCachingTests( unittest.TestCase ):
+
+ def tearDown( self ):
+ pass
+
+
+ def _getTargetClass( self ):
+
+ from Products.PluggableAuthService.PluggableAuthService \
+ import PluggableAuthService
+
+ return PluggableAuthService
+
+ def _makeOne( self, plugins=None, *args, **kw ):
+
+ zcuf = self._getTargetClass()( *args, **kw )
+
+ if plugins is not None:
+ zcuf._setObject( 'plugins', plugins )
+
+ rcm = RAMCacheManager('ramcache')
+ zcuf._setObject('ramcache', rcm)
+
+ return zcuf
+
+ def _makePlugins( self, plugin_type_info=None ):
+
+ from Products.PluggableAuthService.PluggableAuthService \
+ import _PLUGIN_TYPE_INFO
+ from Products.PluginRegistry.PluginRegistry import PluginRegistry
+
+ if plugin_type_info is None:
+ plugin_type_info = _PLUGIN_TYPE_INFO
+
+ reg = PluginRegistry( plugin_type_info=plugin_type_info )
+ reg._setId( 'plugins' )
+ reg._plugins = {}
+
+ return reg
+
+ def _makeAndFill(self):
+
+ from Products.PluggableAuthService.plugins import ZODBUserManager
+ from Products.PluggableAuthService.plugins import ZODBRoleManager
+
+ plugin_registry = self._makePlugins()
+ user_source = ZODBUserManager.ZODBUserManager('zodb_users')
+ roles_source = ZODBRoleManager.ZODBRoleManager('zodb_roles')
+ pas_instance = self._makeOne(plugins=plugin_registry)
+ pas_instance._setObject('zodb_users', user_source)
+ pas_instance._setObject('zodb_roles', roles_source)
+
+ return pas_instance
+
+ def test_empty( self ):
+ zcuf = self._makeOne()
+ rcm = getattr(zcuf, 'ramcache')
+
+ # This is needed because some underlying ZCacheable code wants to
+ # use self.REQUEST :/
+ setattr(rcm, 'REQUEST', FauxRequest())
+
+ # Make sure the PAS instance itself is Cacheable
+ self.assert_(isCacheable(zcuf))
+
+ # Make sure the PAS instance is not associated with any cache manager
+ # by default
+ self.assert_(zcuf.ZCacheable_getManager() is None)
+
+ # Make sure the RAMCacheManager is empty
+ self.assert_(len(rcm.getCacheReport()) == 0)
+
+ def test_caching_in_PAS(self):
+ zcuf = self._makeAndFill()
+ rcm = getattr(zcuf, 'ramcache')
+ plugin_registry = getattr(zcuf, 'plugins')
+ user_source = getattr(zcuf, 'zodb_users')
+ roles_source = getattr(zcuf, 'zodb_roles')
+
+ # This is needed because some underlying ZCacheable code wants to
+ # use self.REQUEST :/
+ setattr(zcuf, 'REQUEST', FauxRequest())
+
+ # First, we register the ZODBUserManager as a plugin suitable
+ # for storing and returning user objects and the ZODBRoleManager
+ # for roles. Basic scaffolding to be able to store and retrieve users.
+ from Products.PluggableAuthService.interfaces import plugins
+
+ plugin_registry.activatePlugin( plugins.IUserEnumerationPlugin
+ , user_source.getId()
+ )
+ plugin_registry.activatePlugin( plugins.IUserAdderPlugin
+ , user_source.getId()
+ )
+ plugin_registry.activatePlugin( plugins.IRolesPlugin
+ , roles_source.getId()
+ )
+ plugin_registry.activatePlugin( plugins.IRoleEnumerationPlugin
+ , roles_source.getId()
+ )
+ plugin_registry.activatePlugin( plugins.IRoleAssignerPlugin
+ , roles_source.getId()
+ )
+
+ # Now add a user and make sure it's there
+ zcuf._doAddUser('testlogin', 'secret', ['Member', 'Anonymous'], [])
+ self.failIf(zcuf.getUser('testlogin') is None)
+
+ # Then we activate caching for the PAS instance itself
+ zcuf.ZCacheable_setManagerId(rcm.getId())
+
+ # Make sure the PAS instance is associated with the cache
+ self.failUnless(aq_base(zcuf.ZCacheable_getManager()) is aq_base(rcm))
+
+ # Now we can see if the cache is getting used. Test for emptiness
+ # first, then retrieve a user, and the cache should have content.
+ # Then test again to see if the cache entries are being used.
+ # This is a bit nasty because I am relying on knowing the structure
+ # of the cache report, which is really an internal implementation
+ # detail.
+
+ # First check: The cache must be empty
+ report = rcm.getCacheReport()
+ self.failUnless(len(report) == 0)
+
+ # The user is being requested once. At this point there must be one
+ # entry for the PAS instance. The number of "misses" must be >0 because
+ # the first cache check will have failed. The number of cache hits must
+ # be zero.
+ zcuf.getUser('testlogin')
+ report = rcm.getCacheReport()
+ self.failUnless(len(report) == 1)
+ report_item = report[0]
+ firstpass_misses = report_item.get('misses')
+ firstpass_hits = report_item.get('hits')
+ firstpass_entries = report_item.get('entries')
+ self.failUnless(firstpass_misses > 0)
+ self.failUnless(firstpass_hits == 0)
+
+ # The user is requested again. This request should produce a cache hit,
+ # so the number of "misses" must have stayed the same as after the
+ # first pass, but the number of hits must now be >0. Also, the number
+ # of in-memory entries must have remained the same to prove that we are
+ # reusing the same cache entries.
+ zcuf.getUser('testlogin')
+ report = rcm.getCacheReport()
+ self.failUnless(len(report) == 1)
+ report_item = report[0]
+ self.failIf(report_item.get('misses') != firstpass_misses)
+ self.failUnless(report_item.get('hits') > firstpass_hits)
+ self.failIf(report_item.get('entries') != firstpass_entries)
+
+
+if __name__ == "__main__":
+ unittest.main()
+
+def test_suite():
+ return unittest.TestSuite((
+ unittest.makeSuite( PluggableAuthServiceCachingTests ),
+ ))
=== Products/PluggableAuthService/tests/test_PluggableAuthService.py 1.13 => 1.14 ===
--- Products/PluggableAuthService/tests/test_PluggableAuthService.py:1.13 Sat Oct 16 16:15:46 2004
+++ Products/PluggableAuthService/tests/test_PluggableAuthService.py Mon Nov 8 17:35:45 2004
@@ -632,68 +632,6 @@
self.assertEqual( len( user_ids ), 1 )
self.assertEqual( user_ids[ 0 ][0], 'login__foo' )
- def test__extractUserIds_cache( self ):
-
- from Products.PluggableAuthService.interfaces.plugins \
- import IExtractionPlugin, IAuthenticationPlugin
-
- plugins = self._makePlugins()
- zcuf = self._makeOne( plugins )
-
- login = DummyPlugin()
- directlyProvides( login, ( IExtractionPlugin, IAuthenticationPlugin ) )
- login.extractCredentials = _extractLogin
- login.authenticateCredentials = _authLogin
-
- zcuf._setObject( 'login', login )
-
- extra = DummyPlugin()
- directlyProvides( extra, ( IExtractionPlugin, IAuthenticationPlugin ) )
- extra.extractCredentials = _extractExtra
- extra.authenticateCredentials = _authExtra
-
- zcuf._setObject( 'extra', extra )
-
- plugins = zcuf._getOb( 'plugins' )
-
- plugins.activatePlugin( IExtractionPlugin, 'extra' )
- plugins.activatePlugin( IExtractionPlugin, 'login' )
- plugins.activatePlugin( IAuthenticationPlugin, 'extra' )
- plugins.activatePlugin( IAuthenticationPlugin, 'login' )
-
- cache = {}
- request = FauxRequest( form={ 'login' : 'foo' , 'password' : 'bar' }
- , extra='qux'
- )
-
-
- user_ids = zcuf._extractUserIds( request=request
- , plugins=zcuf.plugins
- , cache=cache
- )
-
- self.assertEqual( len( user_ids ), 2 )
- self.assertEqual( user_ids[ 0 ][0], 'extra__qux' )
- self.assertEqual( user_ids[ 1 ][0], 'login__foo' )
-
- self.assertEqual( len( cache ), 2 )
- self.failUnless( [ ('login__foo', 'foo') ] in cache.values() )
- self.failUnless( [ ('extra__qux', 'qux') ] in cache.values() )
-
- key = [ x[0] for x in cache.items()
- if x[1] == [('login__foo', 'foo')] ][0]
- cache[ key ].append( ('forced__baz', 'baz' ) )
-
- user_ids = zcuf._extractUserIds( request=request
- , plugins=zcuf.plugins
- , cache=cache
- )
-
- self.assertEqual( len( user_ids ), 3, user_ids )
- self.assertEqual( user_ids[ 0 ][0], 'extra__qux' )
- self.assertEqual( user_ids[ 1 ][0], 'login__foo' )
- self.assertEqual( user_ids[ 2 ][0], 'forced__baz' )
-
def test__getObjectContext_no_steps( self ):
zcuf = self._makeOne()
@@ -1026,28 +964,6 @@
self.assertEqual( len( groups ), 2 )
self.failIf( 'bar:group3' in groups )
self.failIf( 'bar:group4' in groups )
-
- def test__findUser_from_cache( self ):
-
- plugins = self._makePlugins()
- zcuf = self._makeOne(plugins)
- faux = FauxUser( 'faux' )
- cache = { 'faux' : faux }
-
- user = zcuf._findUser( plugins, 'faux', 'faux', cache )
-
- self.failUnless( aq_base( user ) is faux )
- self.failUnless( aq_parent( user ) is zcuf )
-
- def test__findUser_loads_cache( self ):
-
- plugins = self._makePlugins()
-
- zcuf = self._makeOne(plugins)
- cache = {}
- user = zcuf._findUser( plugins, 'someone', 'someone', cache )
-
- self.failUnless( aq_base(cache[ 'someone' ]) is aq_base( user ) )
def test__authorizeUser_force_ok( self ):
More information about the Zope-CVS
mailing list