[Zope-CVS] CVS: Products/AdaptableStorage/serial_ofs - SecurityAttributes.py:1.1

Shane Hathaway shane@zope.com
Sat, 1 Mar 2003 10:25:34 -0500


Update of /cvs-repository/Products/AdaptableStorage/serial_ofs
In directory cvs.zope.org:/tmp/cvs-serv20325/serial_ofs

Added Files:
	SecurityAttributes.py 
Log Message:
Added an untested module for (de)serializing Zope 2 security attributes.


=== Added File Products/AdaptableStorage/serial_ofs/SecurityAttributes.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Aspect for loading/storing Zope 2 security attributes.

$Id: SecurityAttributes.py,v 1.1 2003/03/01 15:25:33 shane Exp $
"""

from types import TupleType

from AccessControl.Permission import pname
import Products

from mapper_public import IAspectSerializer, RowSequenceSchema


_permission_dict_cache = None

def getPermissionDict():
    """Returns a dictionary mapping permission attribute name to permission.

    Does not discover permissions defined in ZClass products, since that
    would require access to the Zope application in the database.
    """
    if _permission_dict_cache is not None:
        return _permission_dict_cache
    res = {}
    for item in Products.__ac_permissions__:
        p = item[0]
        attr = pname(p)
        res[attr] = p
    _permission_dict_cache = res
    return res


## Declaration types:
##
## executable owner
##   "executable-owner", "", "", path/to/userfolder/username
## local roles
##   "local-role", role_name, "", username
## user-defined roles
##   "define-role", role_name, "", ""
## proxy roles
##   "proxy-role", role_name, "", ""
## permission mapping
##   "permission-role", role_name, permission_name, ""
##   "permission-no-acquire", "", permission_name, ""


class SecurityAttributes:

    __implements__ = IAspectSerializer

    schema = RowSequenceSchema()
    schema.addField('declaration_type', 'string')
    schema.addField('role', 'string')
    schema.addField('permission', 'string')
    schema.addField('username', 'string')

    def getSchema(self):
        return self.schema

    def canSerialize(self, obj):
        return 1

    def serialize(self, obj, event):
        res = []

        eo = getattr(obj, '_owner', None)
        if eo is not None:
            event.ignoreAttribute('_owner')
            path, username = eo
            if '/' in username:
                raise ValueError, '/ not allowed in user names'
            s = '%s/%s' % ('/'.join(path), username)
            res.append(('executable-owner', '', '', s))

        roles = getattr(obj, '__ac_roles__', None)
        if roles is not None:
            event.ignoreAttribute('__ac_roles__')
            for role in roles:
                res.append(('define-role', role, '', ''))

        local_roles = getattr(obj, '__ac_local_roles__', None)
        if local_roles is not None:
            event.ignoreAttribute('__ac_local_roles__')
            for username, roles in local_roles.items():
                for role in roles:
                    res.append(('local-role', role, '', username))

        proxy_roles = getattr(obj, '_proxy_roles', None)
        if proxy_roles is not None:
            event.ignoreAttribute('_proxy_roles')
            for role in proxy_roles:
                res.append(('proxy-role', role, '', ''))

        pdict = None
        for key, value in obj.__dict__.items():
            if key.endswith('_Permission') and key.startswith('_'):
                if p_dict is None:
                    p_dict = getPermissionDict()
                p = p_dict.get(attr)
                if attr is not None:
                    event.ignoreAttribute(attr)
                    for role in value:
                        res.append(('permission-role', role, p, ''))
                    # List means acquired, tuple means not acquired.
                    if isinstance(role, TupleType):
                        res.append(('permission-no-acquire', '', p, ''))

        return res
        

    def deserialize(self, obj, event, state):
        local_roles = {}       # { username -> [role,] }
        defined_roles = []     # [role,]
        proxy_roles = []       # [role,]
        permission_roles = {}  # { permission -> [role,] }
        permission_acquired = {}  # { permission -> 0 or 1 }

        for decl_type, role, permission, username in state:
            if decl_type == 'executable-owner':
                assert not role
                assert not permission
                assert username
                pos = username.rfind('/')
                if pos < 0:
                    # Default to the root folder
                    ufolder = ['acl_users']
                    uname = username
                else:
                    ufolder = list(username[:pos].split('/'))
                    uname = username[pos + 1:]
                assert ufolder
                assert uname
                obj._owner = (ufolder, uname)

            elif decl_type == 'local-role':
                assert role
                assert not permission
                assert username
                r = local_roles.get(username)
                if r is None:
                    r = []
                    local_roles[username] = r
                r.append(role)

            elif decl_type == 'define-role':
                assert role
                assert not permission
                assert not username
                defined_roles.append(role)

            elif decl_type == 'proxy-role':
                assert role
                assert not permission
                assert not username
                proxy_roles.append(role)

            elif decl_type == 'permission-role':
                assert role
                assert permission
                assert not username
                r = permission_roles.get(permission)
                if r is None:
                    r = []
                    permission_roles[permission] = r
                r.append(role)
                if not permission_acquired.has_key(permission):
                    permission_acquired[permission] = 1

            elif decl_type == 'permission-no-acquire':
                assert not role
                assert permission
                assert not username
                permission_acquired[permission] = 0

            else:
                raise ValueError, (
                    'declaration_type %s unknown' % repr(decl_type))

        if local_roles:
            obj.__ac_local_roles__ = local_roles
        if defined_roles:
            obj.__ac_roles = defined_roles
        if proxy_roles:
            obj._proxy_roles = proxy_roles
        
        p, acquired in permission_acquired.items():
            roles = permission_roles.get(p)
            if not acquired:
                roles = tuple(roles)
            setattr(obj, pname(p), roles)