[Zope-CVS] CVS: Products/AdaptableStorage/serial_ofs - SecurityAttributes.py:1.1
Shane Hathaway
shane@zope.com
Sat, 1 Mar 2003 10:25:34 -0500
Update of /cvs-repository/Products/AdaptableStorage/serial_ofs
In directory cvs.zope.org:/tmp/cvs-serv20325/serial_ofs
Added Files:
SecurityAttributes.py
Log Message:
Added an untested module for (de)serializing Zope 2 security attributes.
=== Added File Products/AdaptableStorage/serial_ofs/SecurityAttributes.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Aspect for loading/storing Zope 2 security attributes.
$Id: SecurityAttributes.py,v 1.1 2003/03/01 15:25:33 shane Exp $
"""
from types import TupleType
from AccessControl.Permission import pname
import Products
from mapper_public import IAspectSerializer, RowSequenceSchema
_permission_dict_cache = None
def getPermissionDict():
"""Returns a dictionary mapping permission attribute name to permission.
Does not discover permissions defined in ZClass products, since that
would require access to the Zope application in the database.
"""
if _permission_dict_cache is not None:
return _permission_dict_cache
res = {}
for item in Products.__ac_permissions__:
p = item[0]
attr = pname(p)
res[attr] = p
_permission_dict_cache = res
return res
## Declaration types:
##
## executable owner
## "executable-owner", "", "", path/to/userfolder/username
## local roles
## "local-role", role_name, "", username
## user-defined roles
## "define-role", role_name, "", ""
## proxy roles
## "proxy-role", role_name, "", ""
## permission mapping
## "permission-role", role_name, permission_name, ""
## "permission-no-acquire", "", permission_name, ""
class SecurityAttributes:
__implements__ = IAspectSerializer
schema = RowSequenceSchema()
schema.addField('declaration_type', 'string')
schema.addField('role', 'string')
schema.addField('permission', 'string')
schema.addField('username', 'string')
def getSchema(self):
return self.schema
def canSerialize(self, obj):
return 1
def serialize(self, obj, event):
res = []
eo = getattr(obj, '_owner', None)
if eo is not None:
event.ignoreAttribute('_owner')
path, username = eo
if '/' in username:
raise ValueError, '/ not allowed in user names'
s = '%s/%s' % ('/'.join(path), username)
res.append(('executable-owner', '', '', s))
roles = getattr(obj, '__ac_roles__', None)
if roles is not None:
event.ignoreAttribute('__ac_roles__')
for role in roles:
res.append(('define-role', role, '', ''))
local_roles = getattr(obj, '__ac_local_roles__', None)
if local_roles is not None:
event.ignoreAttribute('__ac_local_roles__')
for username, roles in local_roles.items():
for role in roles:
res.append(('local-role', role, '', username))
proxy_roles = getattr(obj, '_proxy_roles', None)
if proxy_roles is not None:
event.ignoreAttribute('_proxy_roles')
for role in proxy_roles:
res.append(('proxy-role', role, '', ''))
pdict = None
for key, value in obj.__dict__.items():
if key.endswith('_Permission') and key.startswith('_'):
if p_dict is None:
p_dict = getPermissionDict()
p = p_dict.get(attr)
if attr is not None:
event.ignoreAttribute(attr)
for role in value:
res.append(('permission-role', role, p, ''))
# List means acquired, tuple means not acquired.
if isinstance(role, TupleType):
res.append(('permission-no-acquire', '', p, ''))
return res
def deserialize(self, obj, event, state):
local_roles = {} # { username -> [role,] }
defined_roles = [] # [role,]
proxy_roles = [] # [role,]
permission_roles = {} # { permission -> [role,] }
permission_acquired = {} # { permission -> 0 or 1 }
for decl_type, role, permission, username in state:
if decl_type == 'executable-owner':
assert not role
assert not permission
assert username
pos = username.rfind('/')
if pos < 0:
# Default to the root folder
ufolder = ['acl_users']
uname = username
else:
ufolder = list(username[:pos].split('/'))
uname = username[pos + 1:]
assert ufolder
assert uname
obj._owner = (ufolder, uname)
elif decl_type == 'local-role':
assert role
assert not permission
assert username
r = local_roles.get(username)
if r is None:
r = []
local_roles[username] = r
r.append(role)
elif decl_type == 'define-role':
assert role
assert not permission
assert not username
defined_roles.append(role)
elif decl_type == 'proxy-role':
assert role
assert not permission
assert not username
proxy_roles.append(role)
elif decl_type == 'permission-role':
assert role
assert permission
assert not username
r = permission_roles.get(permission)
if r is None:
r = []
permission_roles[permission] = r
r.append(role)
if not permission_acquired.has_key(permission):
permission_acquired[permission] = 1
elif decl_type == 'permission-no-acquire':
assert not role
assert permission
assert not username
permission_acquired[permission] = 0
else:
raise ValueError, (
'declaration_type %s unknown' % repr(decl_type))
if local_roles:
obj.__ac_local_roles__ = local_roles
if defined_roles:
obj.__ac_roles = defined_roles
if proxy_roles:
obj._proxy_roles = proxy_roles
p, acquired in permission_acquired.items():
roles = permission_roles.get(p)
if not acquired:
roles = tuple(roles)
setattr(obj, pname(p), roles)