[Zope-CVS] CVS: Products/PluggableAuthService/Extensions -
upgrade.py:1.1.2.1
Lennart Regebro
regebro at nuxeo.com
Tue Aug 31 10:41:43 EDT 2004
Update of /cvs-repository/Products/PluggableAuthService/Extensions
In directory cvs.zope.org:/tmp/cvs-serv26731/Extensions
Added Files:
Tag: regebro-implement_challenge-branch
upgrade.py
Log Message:
Merge from HEAD + new challenge implementation.
=== Added File Products/PluggableAuthService/Extensions/upgrade.py ===
##############################################################################
#
# Copyright (c) 2004 Zope Corporation. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Visible Source
# License, Version 1.0 (ZVSL). A copy of the ZVSL should accompany this
# distribution.
#
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" External method for upgrading existing AccessControl.User.UserFolder
NOTA BENE: Use at your own risk. This external method will replace a
stock User Folder (AccessControl.User.UserFolder) with a
PluggableAuthService consisting of the following:
- ZODBUserManager with a record for each existing User
(AccessControl.User.User)
- ZODBRoleManager with a record for each existing role present
in the __ac_roles__ attribute of the container (minus Anonymous
and Authenticated)
Each migrated user will be assigned the global roles they have in the
previous acl_users record.
$Id: upgrade.py,v 1.1.2.1 2004/08/31 14:41:42 regebro Exp $
"""
from zLOG import LOG, INFO
def _write(response, tool, message, level=INFO):
    """Log *message* and, when a response is given, echo it there too.

    ``tool`` and ``level`` are handed to ``LOG`` unchanged.  Passing
    ``response=None`` means "log only".
    """
    LOG(tool, level, message)

    if response is None:
        return

    # NOTE(review): the text goes to response.write() verbatim.  Callers
    # in this module interpolate user names and principal ids into it,
    # so if the response is served as HTML those values are not escaped
    # -- confirm the content type, or escape before calling.
    response.write(message)
def _replaceUserFolder(self, RESPONSE=None):
    """Swap the container's user folder for a PluggableAuthService.

    When ``self`` carries an ``__allow_groups__`` user folder that is
    not already a PluggableAuthService, a new service is built under
    the temporary id ``new_acl_users``, given a ``users``
    ZODBUserManager and a ``roles`` ZODBRoleManager, filled with every
    user the old folder reports, and then moved into place as
    ``acl_users``.  Progress is reported through ``_write``.

    If the folder has already been replaced the function reports that
    and returns without committing.
    """
    from Acquisition import aq_base
    from Products.PluggableAuthService.PluggableAuthService \
        import PluggableAuthService, _PLUGIN_TYPE_INFO
    from Products.PluginRegistry.PluginRegistry import PluginRegistry
    from Products.PluggableAuthService.plugins.ZODBUserManager \
        import ZODBUserManager
    from Products.PluggableAuthService.plugins.ZODBRoleManager \
        import ZODBRoleManager
    from Products.PluggableAuthService.interfaces.plugins \
        import IAuthenticationPlugin, IUserEnumerationPlugin
    from Products.PluggableAuthService.interfaces.plugins \
        import IRolesPlugin, IRoleEnumerationPlugin, IRoleAssignerPlugin

    # aq_base() strips the acquisition wrapper, so only a user folder
    # set on this very object counts.
    if getattr(aq_base(self), '__allow_groups__', None):

        if self.__allow_groups__.__class__ is PluggableAuthService:
            _write(RESPONSE,
                   'replaceUserFolder',
                   'Already replaced this user folder\n')
            return

        legacy_folder = self.__allow_groups__

        # Build the new service next to the old one; the old folder
        # keeps the 'acl_users' id until every account has been copied.
        pas = PluggableAuthService()
        registry = PluginRegistry(_PLUGIN_TYPE_INFO)
        registry._setId('plugins')
        pas._setObject('plugins', registry)
        self._setObject('new_acl_users', pas)
        pas = self.new_acl_users

        pas._setObject('users', ZODBUserManager('users'))
        pas._setObject('roles', ZODBRoleManager('roles'))

        activations = (
            (IAuthenticationPlugin, 'users'),
            (IUserEnumerationPlugin, 'users'),
            (IRolesPlugin, 'roles'),
            (IRoleEnumerationPlugin, 'roles'),
            (IRoleAssignerPlugin, 'roles'),
        )
        active_registry = pas.plugins
        for interface, plugin_id in activations:
            active_registry.activatePlugin(interface, plugin_id)

        # These two roles are never stored as explicit assignments.
        implicit_roles = ('Authenticated', 'Anonymous')

        for user_name in legacy_folder.getUserNames():
            legacy_user = legacy_folder.getUser(user_name)

            # Only the user name is reported; the stored password value
            # fetched below must stay out of the log and the response.
            _write(RESPONSE,
                   'replaceRootUserFolder',
                   'Translating user %s\n' % user_name)
            _migrate_user(pas.users, user_name, legacy_user._getPassword())

            # NOTE(review): the lookup result is used without a None
            # check, so a user that did not make it into the new
            # service raises here instead of being skipped -- confirm
            # that failing the whole run is the intended behaviour.
            migrated_user = pas.getUser(user_name)

            # NOTE(review): no role records are created in this
            # function; assignRoleToPrincipal presumably relies on the
            # role manager already knowing each role id -- verify.
            for role_id in legacy_user.getRoles():
                if role_id in implicit_roles:
                    continue
                pas.roles.assignRoleToPrincipal(role_id,
                                                migrated_user.getId())

        # Move the finished service into place.  Between the delete and
        # the re-add the container has no 'acl_users' object at all.
        self._delObject('acl_users')
        self._setObject('acl_users', aq_base(pas))
        self._delObject('new_acl_users')
        self.__allow_groups__ = aq_base(pas)

        _write(RESPONSE,
               'replaceRootUserFolder',
               'Replaced root acl_users with PluggableAuthService\n')

    # get_transaction is not imported in this file; presumably it is
    # the builtin installed by ZODB -- confirm for the target release.
    get_transaction().commit()
def _migrate_user(new_user_folder, login, password):
    """Create *login* in *new_user_folder*, carrying its password over.

    The login doubles as the user id.  A password that
    ``AuthEncoding.is_encrypted`` recognises is stored as it is; any
    other value is handed to ``addUser``.
    """
    from AccessControl import AuthEncoding

    if not AuthEncoding.is_encrypted(password):
        # presumably addUser applies the manager's own password
        # encoding to this value -- confirm in ZODBUserManager.
        new_user_folder.addUser(login, login, password)
        return

    # The stored value is written straight into the manager's private
    # mappings, presumably so that it is not encoded a second time.
    # NOTE(review): this path bypasses addUser, so any checks addUser
    # performs (duplicate ids, for example) do not run here.
    new_user_folder._user_passwords[login] = password
    new_user_folder._login_to_userid[login] = login
    new_user_folder._userid_to_login[login] = login
def _upgradeLocalRoleAssignments(self, RESPONSE=None):
    """Re-key ``__ac_local_roles__`` maps for the PluggableAuthService.

    Walks ``self`` and, recursively, the contents of every object
    flagged ``isPrincipiaFolderish``.  Each non-callable local-roles
    map is rebuilt so that its keys are the ids returned by
    ``searchPrincipals`` on ``self.acl_users``.  The transaction is
    committed every 100 visited objects and once more at the end.

    Nothing is done when ``self._upgraded_acl_users`` is already set.
    """
    from Acquisition import aq_base

    # Physical paths already visited; the values are irrelevant.
    visited = {}

    def descend(user_folder, obj):
        path = obj.getPhysicalPath()
        if path in visited:
            return
        visited[path] = 1

        # Callable __ac_local_roles__ attributes are left untouched.
        has_plain_map = (
            getattr(aq_base(obj), '__ac_local_roles__', None)
            and not callable(obj.__ac_local_roles__))

        if has_plain_map:
            rekeyed = {}
            local_roles = obj.__ac_local_roles__

            for key in local_roles.keys():
                hits = user_folder.searchPrincipals(id=key)

                if not hits:
                    # The grant is kept under its old key.  Such
                    # entries outlive the migration; the log line is
                    # the only record of them, so review them after
                    # the run.
                    _write(RESPONSE,
                           'upgradeLocalRoleAssignmentsFromRoot',
                           ' Ignoring map for unknown principal %s\n'
                           % key)
                    rekeyed[key] = local_roles[key]
                    continue

                # NOTE(review): only the first hit is used and its id
                # is never compared with `key`.  If searchPrincipals
                # can return partial matches or several principals,
                # these roles may end up on a different principal than
                # the one that held them -- confirm the search is
                # exact.  Two old keys resolving to the same id also
                # overwrite one another in `rekeyed`.
                principal_id = hits[0]['id']
                rekeyed[principal_id] = local_roles[key]

                _write(RESPONSE,
                       'upgradeLocalRoleAssignmentsFromRoot',
                       ' Translated %s to %s\n' % (key, principal_id))
                _write(RESPONSE,
                       'upgradeLocalRoleAssignmentsFromRoot',
                       ' Assigned roles %s to %s\n'
                       % (local_roles[key], principal_id))

            obj.__ac_local_roles__ = rekeyed
            _write(RESPONSE,
                   'upgradeLocalRoleAssignmentsFromRoot',
                   'Local Roles map changed for (%s)\n' % '/'.join(path))

        # Periodic commit, once per 100 objects visited.
        if len(visited) % 100 == 0:
            get_transaction().commit()
            _write(RESPONSE,
                   'upgradeLocalRoleAssignmentsFromRoot',
                   " -- committed at object # %d\n" % len(visited))

        if getattr(aq_base(obj), 'isPrincipiaFolderish', 0):
            for child in obj.objectValues():
                descend(user_folder, child)

    if getattr(self, '_upgraded_acl_users', None):
        _write(RESPONSE,
               '_upgradeLocalRoleAssignments',
               'Local role assignments have already been updated.\n')
        return

    descend(self.acl_users, self)
    get_transaction().commit()
# External Method to use
def replace_acl_users(self, RESPONSE=None):
    """Run both migration steps on ``self`` and mark it as upgraded.

    Replaces the user folder, then re-keys the local role
    assignments, then sets ``self._upgraded_acl_users``.

    The closing message is written unconditionally: neither helper
    returns a status, so it also appears when the user folder had
    already been replaced or there was none to replace.
    """
    # NOTE(review): this call swaps the authentication source of the
    # container it runs on; confirm that the External Method wrapping
    # it can only be invoked by a manager.
    _replaceUserFolder(self, RESPONSE)
    _upgradeLocalRoleAssignments(self, RESPONSE)

    # Flag read by _upgradeLocalRoleAssignments to skip a second run.
    self._upgraded_acl_users = 1

    summary = ('Root acl_users has been replaced,'
               ' and local role assignments updated.\n')
    _write(RESPONSE, 'replace_acl_users', summary)