[Zope3-checkins] CVS: Zope3/src/zope/security - .cvsignore:1.1.2.1 __init__.py:1.1.2.1 _proxy.c:1.1.2.1 checker.py:1.1.2.1 interfaces.py:1.1.2.1 proxy.py:1.1.2.1 readme.txt:1.1.2.1 restrictedbuiltins.py:1.1.2.1 restrictedinterpreter.py:1.1.2.1 securitycontext.py:1.1.2.1 securitymanagement.py:1.1.2.1 securitymanager.py:1.1.2.1 setup.py:1.1.2.1 simplesecuritypolicies.py:1.1.2.1
Jim Fulton
jim@zope.com
Mon, 23 Dec 2002 14:33:17 -0500
Update of /cvs-repository/Zope3/src/zope/security
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/security
Added Files:
Tag: NameGeddon-branch
.cvsignore __init__.py _proxy.c checker.py interfaces.py
proxy.py readme.txt restrictedbuiltins.py
restrictedinterpreter.py securitycontext.py
securitymanagement.py securitymanager.py setup.py
simplesecuritypolicies.py
Log Message:
Initial renaming before debugging
=== Added File Zope3/src/zope/security/.cvsignore ===
build
=== Added File Zope3/src/zope/security/__init__.py ===
#
# This file is necessary to make this directory a package.
=== Added File Zope3/src/zope/security/_proxy.c === (824/924 lines abridged)
/*
* Security proxy.
*/
#include <Python.h>
#include "Zope/Proxy/proxy.h"
/* Python string objects for frequently used attribute names.  They start
 * out as NULL and are created once by init_Proxy(). */
static PyObject *__class__str = 0, *__name__str = 0, *__module__str = 0;
/* Instance layout of a security proxy: the generic proxy structure from
 * proxy.h followed by the checker that guards access to the proxied
 * object. */
typedef struct {
ProxyObject proxy;
PyObject *proxy_checker;
} SecurityProxy;
/* Override Proxy_Check from proxy.h so that it tests for a security proxy
 * (SecurityProxyType or a subtype of it). */
#undef Proxy_Check
#define Proxy_Check(proxy) \
PyObject_TypeCheck(proxy, &SecurityProxyType)
/* Fetch the checker of a security proxy.  No type check is performed and
 * the reference is borrowed. */
#define Proxy_GetChecker(proxy) \
(((SecurityProxy *)proxy)->proxy_checker)
/* Replace the "safe" version from the proxy.h API with a faster version. */
/* The replacement reads the field directly, without any type check. */
#undef Proxy_GetObject
#define Proxy_GetObject(o) \
(((SecurityProxy *)o)->proxy.proxy_object)
/* Forward declaration; the macros above refer to the type object, and
 * init_Proxy() finishes setting it up. */
static PyTypeObject SecurityProxyType;
/*
* Machinery to call the checker.
*/
typedef PyObject *(*function1)(PyObject *);
static int
check(PyObject *checker, char *opname, PyObject *object)
{
    /* Ask the checker whether operation `opname` is allowed on `object`
     * by calling checker.check(object, opname).
     *
     * Returns 1 if the call succeeded (its result is discarded) and 0 if
     * it failed; in the failure case the Python exception set by the call
     * is left in place for the caller to propagate. */
    PyObject *outcome = PyObject_CallMethod(checker, "check", "(Os)",
                                            object, opname);

    if (outcome != NULL) {
        Py_DECREF(outcome);
        return 1;
    }
    return 0;
}
static int
[-=- -=- -=- 824 lines omitted -=- -=- -=-]
PyErr_SetString(PyExc_TypeError,
"getChecker argument must be a _Proxy");
return NULL;
}
result = Proxy_GetChecker(arg);
Py_INCREF(result);
return result;
}
/* Module-level functions exported to Python.  Both take exactly one
 * argument (METH_O). */
static PyMethodDef
module_functions[] = {
{"getObject", module_getObject, METH_O, "get object from proxy"},
{"getChecker", module_getChecker, METH_O, "get checker from proxy"},
{NULL}
};
/* Docstring installed on the module by init_Proxy(). */
static char
module___doc__[] = "Security proxy implementation.";
/*
 * Module initialisation function.
 *
 * Imports the base proxy API, creates the cached attribute-name strings,
 * finishes the SecurityProxyType type object and publishes it in the new
 * module under the name "_Proxy".  On any failure the function simply
 * returns, leaving whatever Python exception was set.
 *
 * NOTE(review): the module is created as "_Proxy" and the init function is
 * init_Proxy, while the Python code in this change imports
 * zope.security._proxy -- presumably the init function and module name
 * need to follow the rename; confirm against setup.py.
 */
void
init_Proxy(void)
{
PyObject *m;
/* Load the C API of the base proxy module (provides ProxyType). */
if (Proxy_Import() < 0)
return;
/* Create the attribute-name strings once, at import time. */
__class__str = PyString_FromString("__class__");
if (! __class__str) return;
__name__str = PyString_FromString("__name__");
if (! __name__str) return;
__module__str = PyString_FromString("__module__");
if (! __module__str) return;
/* These slots are filled in at run time, before PyType_Ready() is
 * called on the type. */
SecurityProxyType.ob_type = &PyType_Type;
SecurityProxyType.tp_alloc = PyType_GenericAlloc;
SecurityProxyType.tp_free = _PyObject_GC_Del;
SecurityProxyType.tp_base = ProxyType;
if (PyType_Ready(&SecurityProxyType) < 0)
return;
m = Py_InitModule3("_Proxy", module_functions, module___doc__);
if (m == NULL)
return;
/* Take an extra reference for the module attribute added below (per the
 * C API documentation, PyModule_AddObject steals a reference). */
Py_INCREF(&SecurityProxyType);
PyModule_AddObject(m, "_Proxy", (PyObject *)&SecurityProxyType);
}
=== Added File Zope3/src/zope/security/checker.py ===
from zope.security.interfaces import IChecker
from zope.exceptions \
import Unauthorized, ForbiddenAttribute, Forbidden, DuplicationError
from zope.interface.interfaces import IInterface
from zope.interface import Interface
from zope.security._proxy import _Proxy as Proxy
from zope.security.interfaces import ISecurityProxyFactory
from zope.security.securitymanagement import getSecurityManager
import sys, os, types
import datetime
# Checker tracing is off unless the ZOPE_WATCH_CHECKERS environment
# variable is set to a non-empty value; when on, every checker decision
# is reported on stderr.
WATCH_CHECKERS = 0
if os.environ.get('ZOPE_WATCH_CHECKERS'):
    WATCH_CHECKERS = 1

# Unique marker used in place of a permission id for names that are
# public, i.e. accessible without any permission check.
CheckerPublic = object()
def ProxyFactory(object, checker=None):
    """Return a security proxy for an object.

    If no checker is passed, the object's '__Security_checker__'
    attribute is tried first and the checker registry (selectChecker)
    second.  When no checker can be found at all, the object is returned
    unproxied.  An object that already is a security proxy is returned
    unchanged whenever a checker was passed or found on the object.
    """
    if checker is None:
        checker = getattr(object, '__Security_checker__', None)

    if checker is not None:
        # Maybe someone passed us a proxy and a checker
        if type(object) is Proxy:
            # XXX should we keep the existing proxy or create a new one.
            return object
        return Proxy(object, checker)

    # Nothing explicit: fall back to the checker registered for the type.
    checker = selectChecker(object)
    if checker is None:
        return object
    return Proxy(object, checker)

ProxyFactory.__implements__ = ISecurityProxyFactory
class Checker:
    """Checker that decides access by mapping names to permission ids.

    One name -> permission function is used for attribute access and
    other operations, a second one for attribute assignment.
    """

    __implements__ = IChecker

    def __init__(self, permission_func,
                 setattr_permission_func=lambda name: None
                 ):
        """Create a checker

        A dictionary or a callable must be provided for computing
        permissions for names. The callable will be called with
        attribute names and must return a permission id, None, or the
        special marker, CheckerPublic. If None is returned, then
        access to the name is forbidden. If CheckerPublic is returned,
        then access will be granted without checking a permission.

        An optional setattr permission function or dictionary may be
        provided for checking set attribute access. The default
        returns None for every name, so no attribute may be set.
        """
        # Dictionaries are used through their get method, which returns
        # None (forbidden) for unknown names.
        if type(permission_func) is dict:
            permission_func = permission_func.get
        self._permission_func = permission_func

        if type(setattr_permission_func) is dict:
            setattr_permission_func = setattr_permission_func.get
        self._setattr_permission_func = setattr_permission_func

    def getPermission_func(self):
        """Return the function mapping names to access permissions."""
        return self._permission_func

    def getSetattrPermission_func(self):
        """Return the function mapping names to setattr permissions."""
        return self._setattr_permission_func

    def permission_id(self, name):
        """Return the result of calling the permission func
        """
        return self._permission_func(name)

    def setattr_permission_id(self, name):
        """Return the result of calling the setattr permission func
        """
        return self._setattr_permission_func(name)

    ############################################################
    # Implementation methods for interface
    # zope.security.interfaces.IChecker

    def check_getattr(self, object, name):
        'See zope.security.interfaces.IChecker'
        self.check(object, name)

    def check_setattr(self, object, name):
        'See zope.security.interfaces.IChecker'
        # No name is implicitly settable, hence the empty sequence.
        self._check_permission(object, name,
                               self._setattr_permission_func, ())

    def check(self, object, name):
        'See zope.security.interfaces.IChecker'
        self._check_permission(object, name,
                               self._permission_func, _always_available)

    def _check_permission(self, object, name, permission_func,
                          always_available):
        """Shared implementation of check() and check_setattr().

        permission_func is called with name to get the permission.
        Returns None when access is allowed: the permission is
        CheckerPublic, the security manager grants the permission on
        object, or there is no permission but name is listed in
        always_available.

        Raises Unauthorized when the security manager refuses the
        permission and ForbiddenAttribute when there is no permission
        for a name that is not always available.
        """
        if WATCH_CHECKERS:
            # Python 2 print statement; the trailing comma keeps the
            # verdict printed below on the same line.
            print >> sys.stderr, ('Checking %r.%s:' % (object, name)),

        # We have the information we need already
        permission = permission_func(name)
        if permission:
            if permission is CheckerPublic:
                if WATCH_CHECKERS:
                    print >> sys.stderr, 'Public.'
                return # Public
            manager = getSecurityManager()
            if manager.checkPermission(permission, object):
                if WATCH_CHECKERS:
                    print >> sys.stderr, 'Granted.'
                return
            if WATCH_CHECKERS:
                print >> sys.stderr, 'Unauthorized.'
            raise Unauthorized(name=name)

        if name in always_available:
            if WATCH_CHECKERS:
                print >> sys.stderr, 'Always available.'
            return

        if WATCH_CHECKERS:
            print >> sys.stderr, 'Forbidden.'
        raise ForbiddenAttribute(name)

    def proxy(self, value):
        'See zope.security.interfaces.IChecker'
        # Now we need to create a proxy
        checker = getattr(value, '__Security_checker__', None)
        if checker is None:
            checker = selectChecker(value)
            if checker is None:
                # No checker applies: the value is handed out unproxied.
                return value
        return Proxy(value, checker)

    #
    ############################################################
def NamesChecker(names=(), permission_id=CheckerPublic, **__kw__):
    """Return a checker that grants access to a set of names.

    The first argument is a sequence of names; the optional second
    argument, permission_id, is the permission required to access them
    (public by default).  Further names can be given as keyword
    arguments whose values are their permission ids.

    DuplicationError is raised for a name in the sequence that was also
    given as a keyword argument with a different permission id.
    """
    permissions = __kw__.copy()
    for attr in names:
        already = permissions.get(attr, permission_id)
        if already is not permission_id:
            raise DuplicationError(attr)
        permissions[attr] = permission_id
    return Checker(permissions.get)
def InterfaceChecker(interface, permission_id=CheckerPublic):
    """Return a checker for the names declared by an interface.

    The names are taken from interface.names(1) (presumably this
    includes names inherited from base interfaces -- confirm against
    the interface package) and are all protected by permission_id,
    which defaults to public.
    """
    declared = interface.names(1)
    return NamesChecker(declared, permission_id)
def MultiChecker(specs):
    """Create a checker from a sequence of specifications

    A specification is either:

    - A two-tuple with:

      o a sequence of names or an interface

      o a permission id

      All the names in the sequence of names or the interface are
      protected by the permission.

    - A dictionary-like object (having an items method), with items
      that are name/permission-id pairs.

    DuplicationError is raised when a name is given more than once with
    different permission ids.
    """
    data = {}

    def register(name, permission_id):
        # Repeating a name is fine only with the very same permission.
        if data.get(name, permission_id) is not permission_id:
            raise DuplicationError(name)
        data[name] = permission_id

    for spec in specs:
        if type(spec) is not tuple:
            for name, permission_id in spec.items():
                register(name, permission_id)
            continue

        names, permission_id = spec
        if IInterface.isImplementedBy(names):
            # An interface stands for all of its names.
            names = names.names(1)
        for name in names:
            register(name, permission_id)

    return Checker(data.get)
def NonPrivateChecker(permission_id = CheckerPublic):
    """Return a checker for every name not starting with an underscore.

    Such names require permission_id (public by default); names that
    start with '_' get no permission and are therefore forbidden.
    """
    def check(name, permission_id=permission_id):
        if not name.startswith('_'):
            return permission_id
        return None

    return Checker(check)
def selectChecker(object):
    """Get a checker for the given object

    The appropriate checker is returned or None is returned. If the
    return value is None, then object should not be wrapped in a proxy.
    """
    # We need to be careful here. We might have a proxy, in which case
    # we can't use the type. OTOH, we might not be able to use the
    # __class__ either, since not everything has one.

    # XXX we really need formal proxy introspection

    if type(object) is Proxy:
        # Is this already a security proxy?
        return None

    kind = getattr(object, '__class__', type(object))
    checker = _getChecker(kind, _defaultChecker)

    # Explicitly unproxied types, and exceptions without a checker of
    # their own, are handed out bare.
    if checker is NoProxy or (
        checker is _defaultChecker and isinstance(object, Exception)):
        return None

    # A registry entry may be a factory rather than a Checker; keep
    # calling it with the object until a Checker (or a refusal) comes out.
    while 1:
        if isinstance(checker, Checker):
            return checker
        checker = checker(object)
        if checker is None or checker is NoProxy:
            return None
def getCheckerForInstancesOf(class_):
    """Return the registry entry for class_, or None if it has none."""
    registered = _checkers.get(class_, None)
    return registered
def defineChecker(type_, checker):
    """Define a checker for a given type of object

    The checker can be a Checker, or a function that, when called with
    an object, returns a Checker.

    Raises TypeError if type_ is not a type, class or module, and
    DuplicationError if a checker is already registered for type_.
    """
    if not isinstance(type_, (type, types.ClassType, types.ModuleType)):
        # The 1-tuple makes sure a tuple argument is reported in the
        # message instead of being unpacked as the format arguments.
        raise TypeError(
            'type_ must be a type, class or module, not a %s' % (type_,))
    if type_ in _checkers:
        raise DuplicationError(type_)
    _checkers[type_] = checker
# Marker registered in place of a checker for types whose instances are
# never wrapped in a security proxy.
NoProxy = object()
# _checkers is a mapping.
#
# - Keys are types
#
# - Values are
#
# o None => rock
# o a Checker
# o a function returning None or a Checker
#
# NOTE(review): the "None => rock" entry looks stale.  selectChecker only
# treats NoProxy as "do not proxy" for a value found directly in this
# mapping; a None value would be called like a checker factory.  None is
# only handled when it is *returned* by such a factory.
_checkers = {}
# Bound lookup method of the registry, used by selectChecker.
_getChecker = _checkers.get
# Fallback checker for types without a registry entry.  Its permission
# function is the get method of an empty dict, so it has no permission
# for any name.
_defaultChecker = Checker({}.get)
def _instanceChecker(inst):
    """Return the registry entry for the class of a classic instance.

    Instances of unregistered classes get _defaultChecker, except that
    exception instances get NoProxy.
    """
    found = _checkers.get(inst.__class__, _defaultChecker)
    if found is not _defaultChecker:
        return found
    if isinstance(inst, Exception):
        return NoProxy # XXX we should be more careful
    return found
def _classChecker(class_):
    """Return the registry entry for a classic class itself.

    Unregistered classes get _typeChecker, except that exception classes
    get NoProxy.
    """
    found = _checkers.get(class_, _typeChecker)
    if found is not _typeChecker:
        return found
    if issubclass(class_, Exception):
        return NoProxy # XXX we should be more careful
    return found
def _moduleChecker(module):
    """Return the registry entry for a module, or _typeChecker if none."""
    found = _checkers.get(module, _typeChecker)
    return found
# Names that Checker.check allows even when the checker has no permission
# for them (Checker.check_setattr does not consult this list).
_always_available = ['__lt__', '__le__', '__eq__',
'__gt__', '__ge__', '__ne__',
'__hash__', '__nonzero__',
'__class__', '__implements__',
]
# Shared public-names checkers used by the default registry below.
# Registered for function, method and builtin function/method types.
_callableChecker = NamesChecker(['__str__', '__repr__', '__name__',
'__call__'])
# Registered for 'type'; also the fallback used by _classChecker and
# _moduleChecker.
_typeChecker = NamesChecker(['__str__', '__repr__', '__name__', '__module__',
'__bases__'])
# Registered for type(Interface).
_interfaceChecker = NamesChecker(['__str__', '__repr__', '__name__',
'__module__', '__bases__', 'getBases',
'isImplementedBy', 'extends'])
# Registered for the list and tuple iterator types.
_iteratorChecker = NamesChecker(['next'])
# Types whose instances are never wrapped in a security proxy: each maps
# to NoProxy, for which selectChecker returns None.  _clear() merges this
# table into the registry after _default_checkers.
BasicTypes = {
object: NoProxy,
int: NoProxy,
float: NoProxy,
long: NoProxy,
complex: NoProxy,
types.NoneType: NoProxy,
str: NoProxy,
unicode: NoProxy,
type(not 1): NoProxy, # Boolean, if available :)
}
class _Sequence(object):
def __len__(self): return 0
def __getitem__(self, i): raise IndexError
# Default registry contents: maps a type either to a Checker, to a function
# that is called with the object and returns a checker, or to NoProxy.
# _clear() copies this table into _checkers.
#
# The name lists are a security whitelist: only the listed names are made
# public on proxied instances of the corresponding type.  For the builtin
# containers the lists below contain no mutating methods.
_default_checkers = {
dict: NamesChecker(['__getitem__', '__len__', '__iter__',
'get', 'has_key', '__copy__', '__str__', '__repr__',
'keys', 'values', 'items',
'iterkeys', 'iteritems', 'itervalues', '__contains__',
]),
list: NamesChecker(['__getitem__', '__getslice__', '__len__', '__iter__',
'__contains__', 'index', 'count', '__str__',
'__repr__']),
# YAGNI: () a rock
tuple: NamesChecker(['__getitem__', '__getslice__', '__add__',
'__contains__', '__len__', '__iter__', '__iadd__',
'__str__', '__repr__']),
# Classic instances and classes are looked up by their class via the
# helper functions.
types.InstanceType: _instanceChecker,
Proxy: NoProxy,
types.ClassType: _classChecker,
types.FunctionType: _callableChecker,
types.MethodType: _callableChecker,
types.BuiltinFunctionType: _callableChecker,
types.BuiltinMethodType: _callableChecker,
type: _typeChecker,
types.ModuleType: _moduleChecker,
type(iter([])): _iteratorChecker, # same types in Python 2.2.1,
type(iter(())): _iteratorChecker, # different in Python 2.3
type(iter(_Sequence())): NamesChecker(['next']),
type(Interface): _interfaceChecker,
# NOTE(review): datetime.datetimetz and datetime.timetz (below) are not
# part of the released standard-library datetime module -- presumably
# they come from the pre-release datetime this code was written against;
# confirm before running on another Python version.
datetime.timedelta: NamesChecker(['__repr__', '__str__', '__add__',
'__radd__', '__sub__', '__rsub__',
'__neg__', '__pos__', '__abs__',
'__mul__', '__rmul__', '__div__',
'__floordiv__', '__cmp__', 'days',
'seconds', 'microseconds']),
datetime.date: NamesChecker(['__repr__', '__str__', 'year', 'month', 'day',
'timetuple', 'toordinal', '__cmp__',
'__hash__', 'ctime', 'strftime', '__add__',
'__radd__', '__sub__', '__rsub__', 'weekday',
'isoweekday', 'isocalendar', 'isoformat',
'min', 'max', 'resolution']),
datetime.datetime: NamesChecker(['__repr__', '__str__', 'year', 'month',
'day', 'hour', 'minute', 'second',
'microsecond', 'timetuple',
'toordinal', '__cmp__',
'__hash__', 'ctime', 'strftime',
'__add__', '__radd__', '__sub__',
'__rsub__', 'weekday', 'isoweekday',
'isocalendar', 'isoformat', 'min', 'max',
'resolution']),
datetime.datetimetz: NamesChecker(['__repr__', '__str__', 'year', 'month',
'day', 'hour', 'minute', 'second',
'microsecond', 'tzinfo', 'timetuple',
'utctimetuple', 'toordinal', '__cmp__',
'__hash__', 'ctime', 'strftime',
'__add__', '__radd__', '__sub__',
'__rsub__', 'weekday', 'isoweekday',
'isocalendar', 'isoformat', 'min',
'max', 'resolution', 'utcoffset',
'tzname', 'dst']),
datetime.time: NamesChecker(['hour', 'minute', 'second', 'microsecond',
'__cmp__', '__hash__', '__repr__', '__str__',
'isoformat', 'strftime', 'min', 'max',
'resolution']),
datetime.timetz: NamesChecker(['hour', 'minute', 'second', 'microsecond',
'__cmp__', '__hash__', '__repr__',
'__str__', 'isoformat', 'strftime', 'min',
'max', 'resolution', 'tzinfo', 'utcoffset',
'tzname', 'dst'])
}
def _clear():
    """Reset the checker registry to its built-in contents.

    Everything added with defineChecker is dropped.
    """
    _checkers.clear()
    # Order matters: BasicTypes is applied last and so wins for any type
    # present in both tables.
    for defaults in (_default_checkers, BasicTypes):
        _checkers.update(defaults)

# Fill the registry when the module is imported.
_clear()

# Register _clear with zope.testing's clean-up hook (presumably run by
# the test framework between tests -- confirm in zope.testing.cleanup).
from zope.testing.cleanup import addCleanUp
addCleanUp(_clear)
=== Added File Zope3/src/zope/security/interfaces.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from zope.interface import Interface
class ISecurityManagementSetup(Interface):
    """Set-up hooks for the security manager.

    Infrastructure code (including tests, etc.) calls these to tweak
    the security manager.
    """

    def newSecurityManager(user):
        """Install a new SecurityManager that uses 'user'.

        Return the old SecurityManager, if any, or None.
        """

    def replaceSecurityManager(old_manager):
        """Replace the SecurityManager with 'old_manager'.

        'old_manager' must implement ISecurityManager.
        """

    def noSecurityManager():
        """Clear any existing SecurityManager."""
class ISecurityManagement(Interface):
    """The "public" security management API."""

    def getSecurityManager():
        """Return a SecurityManager, creating one if needed."""

    def setSecurityPolicy(aSecurityPolicy):
        """Set the system default security policy.

        Only system start-up code should call this method.  It should
        never be called, for example, during a web request.
        """
"""
$Id: interfaces.py,v 1.1.2.1 2002/12/23 19:33:15 jim Exp $
"""
from zope.interface import Interface
class ISecurityProxyFactory(Interface):
    """Callable that wraps objects in security proxies."""

    def __call__(object, checker=None):
        """Create a security proxy

        The given checker is used if there is one; otherwise the
        factory tries to figure out a checker itself.

        An object that is already a security proxy is returned as is.
        """
from zope.interface import Interface
# XXX This interface has too much Zope application dependence. This
# needs to be refactored somehow.
class ISecurityManager(Interface):
    """Checks access and manages executable context and policies."""

    def getPrincipal():
        """Return the authenticated principal.

        This is equivalent to something like::

          REQUEST['AUTHENTICATED_USER']

        but is a bit cleaner, especially if 'REQUEST' isn't handy.
        """

    def checkPermission(permission, object):
        """Return a boolean telling whether the security context allows
        the given permission on the given object.

        Arguments:

        permission -- A permission name

        object -- The object being accessed according to the permission
        """

    def pushExecutable(anExecutableObject):
        """Push an ExecutableObject onto the manager's stack.

        Its custom security policy, if any, is activated.
        """

    def popExecutable(anExecutableObject):
        """Pop the topmost ExecutableObject from the stack.

        Any custom security policy it might have installed is
        deactivated.
        """

    def calledByExecutable():
        """Return a boolean indicating whether the current request has
        invoked any IExecutableObjects.

        This can be used to determine if an object was called
        (more or less) directly from a URL, or if it was called by
        through-the-web provided code.
        """
"""
$Id: interfaces.py,v 1.1.2.1 2002/12/23 19:33:15 jim Exp $
"""
from zope.interface import Interface
class IChecker(Interface):
    """Security-proxy plugin objects that implement low-level checks

    A checker is also responsible for creating the proxies for the
    return values of operations, via its proxy method.

    getattr and setattr are checked with check_getattr() and
    check_setattr(); all other operations are checked with check().

    The check methods return no value; they may raise errors.

    Example (for __getitem__):

        checker.check(ob, "__getitem__")
        return checker.proxy(ob[key])
    """

    def check_getattr(ob, name):
        """Check whether attribute access is allowed."""

    def check_setattr(ob, name):
        """Check whether attribute assignment is allowed."""

    def check(ob, operation):
        """Check whether operation is allowed.

        The operation name is the Python special method name,
        e.g. "__getitem__".
        """

    def proxy(value):
        """Return a security proxy for the value."""
from zope.interface import Interface
class ISecurityPolicy(Interface):
    """Policy that decides whether a permission is granted."""

    def checkPermission(permission, object, context):
        """Return a boolean telling whether the security context allows
        the given permission on the given object.

        Arguments:

        permission -- A permission name

        object -- The object being accessed according to the permission

        context -- A SecurityContext, which provides access to
          information such as the context stack and AUTHENTICATED_USER.
        """
from zope.interface import Interface
from zope.interface.element import Attribute
class ISecurityContext(Interface):
    """Capture transient request-specific security information."""

    # The Attribute objects must be bound to names in the class body;
    # as bare expressions they were created and thrown away, leaving the
    # interface without any declared attributes.  Each string fragment
    # ends with a space so the concatenated description reads correctly.
    stack = Attribute(
        'stack',
        'A stack of elements, each either be an ExecutableObject '
        'or a tuple consisting of an ExecutableObject and a '
        'custom SecurityPolicy.'
        )

    user = Attribute(
        'user',
        'The AUTHENTICATED_USER for the request.'
        )
=== Added File Zope3/src/zope/security/proxy.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: proxy.py,v 1.1.2.1 2002/12/23 19:33:15 jim Exp $
"""
from zope.security._proxy import getObject, getChecker
from zope.security._proxy import _Proxy as Proxy
from zope.security.checker import ProxyFactory, Checker as _trustedChecker
def trustedRemoveSecurityProxy(object):
    """Return the object behind a security proxy with a trusted checker.

    The proxied object is returned only if the argument is a security
    proxy whose checker is an instance of zope.security.checker.Checker;
    anything else is returned unchanged.
    """
    if type(object) is not Proxy:
        return object
    if not isinstance(getChecker(object), _trustedChecker):
        return object
    return getObject(object)
def getTestProxyItems(proxy):
    """Try to get checker names and permissions for testing

    If this succeeds, a sorted sequence of items is returned,
    otherwise, None is returned.
    """
    checker = getChecker(proxy)
    permission_func = checker.getPermission_func()
    # When the permission function is the bound get method of a dict,
    # its __self__ is that dict; otherwise there is nothing to list.
    mapping = getattr(permission_func, '__self__', None)
    if mapping is not None:
        pairs = mapping.items()
        pairs.sort()
        return pairs
    return None
=== Added File Zope3/src/zope/security/readme.txt ===
Zope3 Security
Introduction
The Security framework provides a generic mechanism to implement
security policies on Python objects. This introduction provides a
tutorial of the framework explaining concepts, design, and going
through sample usage from the perspective of a Python programmer
using the framework outside of Zope.
Definitions
Principal
A generalization of a concept of a user. A principal may be
associated with different roles and permissions.
Permission
A kind of access, i.e. permission to READ vs. permission to
WRITE. Fundamentally the whole security framework is organized
around checking permissions on objects.
Roles
Represents a responsibility of a user in the context of an
object. Roles are associated with the permissions necessary to
fulfill the user's responsibility.
Purpose
The security framework's primary purpose is to guard and check
access to Python objects. It does this by providing mechanisms
for explicit and implicit security checks on attribute access for
objects. Attribute names are mapped onto permission names when
checking access and the implementation of the security check is
defined by the security policy, which receives the object, the
permission name, and a context.
Security contexts are containers of transient information such as
the current principal and the context stack.
To explain the concept and usage of the context stack, a little
background into the design influences of the default Zope policy
is needed, namely the Java language security model. Within the
base language, code is associated with identifiers. I.e. this code
came from "Joe Schmoe", and another code archive comes signed from
Verisign. When executing restricted code, it's important that access
is checked not only for the code currently executing but for the
entire call/context stack (unless explicitly short-circuited).
I.e. if Joe Schmoe's code doesn't have access to the filesystem,
but the Verisign code does, Joe's code could circumvent the
security policy by accessing the filesystem via the Verisign code.
It's important to keep in mind that the policy provided is just a
default, and it can be substituted with one which doesn't care
about principals or context stacks at all.
Framework Components
Low Level Components
These components provide the infrastructure for guarding
attribute access and providing hooks into the higher level
security framework.
Checkers
A checker is associated with an object kind, and provides the
hooks that map attribute checks onto permissions deferring to
the security manager (which in turn defers to the policy) to
perform the check.
Additionally, checkers provide for creating proxies of objects
associated with the checker.
There are several implementation variants of checkers, such as
checkers that grant access based on attribute names.
Proxies
Wrappers around Python objects that implicitly guard access to
their wrapped contents by delegating to their associated
checker. Proxies are also viral in nature, in that values
returned by proxies are also proxied.
High Level Components
Security Management
Provides accessors for setting up security manager and global
security policy.
Security Context
Stores transient information on the current principal and the
context stack.
Security Manager
Manages security context (execution stack) and delegates
permission checks to security policy.
Security Policy
Provides a single method that accepts the object, the
permission, and the context of the access being checked and is
used to implement the application logic for the security
framework.
Narrative (agent sandbox)
As an example we take a look at constructing a multi-agent
distributed system, and then adding a security layer using the
Zope security model onto it.
Scenario
Our agent simulation consists of autonomous agents that live in
various agent homes/sandboxes and perform actions that access
services available at their current home. Agents carry around
authentication tokens which signify their level of access within
any given home. Additionally agents attempt to migrate from
home to home randomly.
The agent simulation was constructed separately from any
security aspects. Now we want to define and integrate a
security model into the simulation. The full code for the
simulation and the security model is available separately; we
present only relevant code snippets here for illustration as we
go through the implementation process.
For the agent simulation we want to add a security model such
that we group agents into two authentication groups, "norse
legends", including the principals thor, odin, and loki, and
"greek men", including prometheus, archimedes, and thucydides.
We associate permissions with access to services and homes. We
differentiate the homes such that certain authentication groups
only have access to services or the home itself based on the
local settings of the home in which they reside.
We define the homes/sandboxes
- origin - all agents start here, and have access to all
services here.
- valhalla - only agents in the authentication group 'norse
legend' can reside here.
- jail - all agents can come here, but only 'norse legend's
can leave or access services.
Process
Loosely we define a process for implementing this security model
- mapping permissions onto actions
- mapping authentication tokens onto permissions
- implementing checkers and security policies that use our
authentication tokens and permissions.
- binding checkers to our simulation classes
- inserting the hooks into the original simulation code to add
proxy wrappers to automatically check security.
- inserting hooks into the original simulation to register the
agents as the active principal within a security manager's
context....
Defining Permission Model
We define the following permissions::
NotAllowed = 'Not Allowed'
Public = Checker.CheckerPublic
TransportAgent = 'Transport Agent'
AccessServices = 'Access Services'
AccessAgents = 'Access Agents'
AccessTimeService = 'Access Time Services'
AccessAgentService = 'Access Agent Service'
AccessHomeService = 'Access Home Service'
and create a dictionary database mapping homes to authentication
groups which are linked to associated permissions.
Defining and Binding Checkers
Checkers are the foundational unit for the security framework.
They define what attributes can be accessed or set on a given
instance. They can be used implicitly via Proxy objects, to
guard all attribute access automatically or explicitly to check a
given access for an operation.
Checker construction expects two functions or dictionaries, one
is used to map attribute names to permissions for attribute
access and another to do the same for setting attributes.
We use the following checker factory function::
def PermissionMapChecker(permissions_map={},
setattr_permission_func=NoSetAttr):
res = {}
for k,v in permissions_map.items():
for iv in v:
res[iv]=k
return Checker.Checker(res.get, setattr_permission_func)
time_service_checker = PermissionMapChecker(
# permission : [methods]
{'AccessTimeService':['getTime']}
)
with the NoSetAttr function defined as a lambda which always
returns the permission NotAllowed.
To bind the checkers to the simulation classes we register our
checkers with the security model's global checker registry::
import sandbox_simulation
from Zope.Security.Checker import defineChecker
defineChecker(sandbox_simulation.TimeService, time_service_checker)
Defining a Security Policy
We implement our security policy such that it checks the current
agent's authentication token against the given permission in the
home of the object being accessed::
class SimulationSecurityPolicy:
__implements__ = ISecurityPolicy
def checkPermission(self, permission, object, context):
token = context.user.getAuthenticationToken()
home = object.getHome()
db = getattr(SimulationSecurityDatabase, home.getId(), None)
if db is None:
return False
allowed = db.get('any', ())
if permission in allowed or ALL in allowed:
return True
allowed = db.get(token, ())
if permission in allowed:
return True
return False
There is some additional code present to allow for shortcuts in
defining the permission database when defining permissions for
all auth groups and all permissions.
Integration
At this point we have implemented our security model, and we
need to integrate it with our simulation model. We do so in
three separate steps.
First we make it such that agents only access homes that are
wrapped in a security proxy. By doing this all access to homes
and services (proxies have proxied return values for their
methods) is implicitly guarded by our security policy.
The second step is that we want to associate the active agent
with the security context so the security policy will know which
agent's authentication token to validate against.
The third step is to set our security policy as the default
policy for the Zope security framework. It is possible to
create custom security policies at a finer granularity than global,
but such is left as an exercise for the reader.
Security Manager Access
The *default* implementation of the security management
interfaces defines security managers on a per thread basis with
a function for an accessor. This model is not appropriate for
all systems, as it restricts one to a single active user per
thread at any given moment. Reimplementing the manager access
methods though is easily doable and is noted here for
completeness.
Perspectives
It's important to keep in mind that there is a lot more that is
possible using the security framework than what's been presented
here. All of the interactions are interface based, such that if
you need to re-implement the semantics to suit your application,
a new implementation of the interface will be sufficient.
Additional possibilities range from restricted interpreters and
dynamic loading of untrusted code to non Zope web application
security systems. Insert imagination here ;-).
Zope Perspective
A Zope3 programmer will rarely need to interact with the
low level security framework. Zope3 defines a second security
package on top of the low level framework that implements concepts
of roles, and authentication sources and checkers are handled
via zcml registration. Still those developing Zope3 will
hopefully find this useful as an introduction into the
underpinnings of the security framework.
Code
The complete code for this example is available.
sandbox.py - the agent framework
sandbox_security.py - the security implementation and binding to
the agent framework.
Author
Kapil Thangavelu <hazmat at objectrealms.net>
Guido Wesdorp <guido at infrae.com>
=== Added File Zope3/src/zope/security/restrictedbuiltins.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
Revision information:
$Id: restrictedbuiltins.py,v 1.1.2.1 2002/12/23 19:33:15 jim Exp $
"""
import sys
def RestrictedBuiltins():
    """
    Build the builtins mapping that is handed to restricted code.

    Each whitelisted name is looked up on ``__builtin__`` and wrapped with
    ProxyFactory; values that are types are wrapped with a NamesChecker
    built from a short list of introspection names.  ``__import__`` is
    replaced by a lookup-only version that never loads a module.

    Returns a dict mapping builtin names to their proxied values.
    """
    from zope.security.proxy import ProxyFactory
    from zope.security.checker import NamesChecker

    # It is better to say what is safe than to say what is not safe.
    #
    # NOTE(review): 'SystemExit' and 'quit' are whitelisted here although
    # the notes below say restricted code should not be able to exit --
    # confirm whether that is intended.
    safe_names = [
        'ArithmeticError', 'AssertionError', 'AttributeError',
        'DeprecationWarning', 'EOFError', 'Ellipsis', 'EnvironmentError',
        'Exception', 'FloatingPointError', 'IOError', 'ImportError',
        'IndentationError', 'IndexError', 'KeyError', 'KeyboardInterrupt',
        'LookupError', 'MemoryError', 'NameError', 'None', 'NotImplemented',
        'NotImplementedError', 'OSError', 'OverflowError', 'OverflowWarning',
        'ReferenceError', 'RuntimeError', 'RuntimeWarning', 'StandardError',
        'StopIteration', 'SyntaxError', 'SyntaxWarning', 'SystemError',
        'SystemExit', 'TabError', 'TypeError', 'UnboundLocalError',
        'UnicodeError', 'UserWarning', 'ValueError', 'Warning',
        'ZeroDivisionError',
        '__debug__', '__doc__', '__name__', 'abs', 'apply', 'buffer',
        'callable', 'chr', 'classmethod', 'cmp', 'coerce', 'compile',
        'complex', 'copyright', 'credits', 'delattr', 'dict',
        'divmod', 'eval', 'filter', 'float', 'getattr', 'globals',
        'hasattr', 'hash', 'hex', 'id', 'int', 'isinstance',
        'issubclass', 'iter', 'len', 'license', 'list', 'locals',
        'long', 'map', 'max', 'min', 'object', 'oct', 'ord', 'pow',
        'property', 'quit', 'range', 'reduce', 'repr', 'round',
        'setattr', 'slice', 'staticmethod', 'str', 'super', 'tuple',
        'type', 'unichr', 'unicode', 'vars', 'xrange', 'zip',
        ]

    # Deliberately left out:
    #
    # XXX dir -- it segfaults because of a bad tuple check in
    #     merge_class_dict in object.c.  The assert macro seems to be
    #     doing the wrong thing.  Basically, if an object has bases, then
    #     bases is assumed to be a tuple.
    #
    # Anything that accesses an external file is a no-no:
    #     'open', 'execfile', 'file'
    #
    # We don't want restricted code to call exit: 'SystemExit', 'exit'
    #
    # Other no-nos:
    #     help       prints
    #     input      does I/O
    #     raw_input  does I/O
    #     intern     its effect is too global
    #     reload     does import, XXX doesn't it use __import__?

    # Checker used for builtins that are types (the exact semantics of
    # NamesChecker are defined in zope.security.checker, not shown here).
    type_checker = NamesChecker(
        ['__str__', '__repr__', '__name__', '__module__',
         '__bases__', '__call__'])

    import __builtin__

    restricted = {}
    for name in safe_names:
        value = getattr(__builtin__, name)
        if isinstance(value, type):
            restricted[name] = ProxyFactory(value, type_checker)
        else:
            restricted[name] = ProxyFactory(value)

    def __import__(name, globals=None, locals=None, fromlist=()):
        # Waaa, we have to emulate __import__'s weird semantics: with a
        # from-list (or an undotted name) the named module is returned,
        # otherwise the top-level package of the dotted name is returned.
        # Only modules already present in sys.modules are ever returned.
        try:
            module = sys.modules[name]
            dot = name.find('.')
            if fromlist or dot < 0:
                return module
            return sys.modules[name[:dot]]
        except KeyError:
            raise ImportError(name)

    restricted['__import__'] = ProxyFactory(__import__)

    return restricted

# Replace the builder by its result: from here on the module-level name
# refers to the dict of proxied builtins, not to the function.
RestrictedBuiltins = RestrictedBuiltins()
=== Added File Zope3/src/zope/security/restrictedinterpreter.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
Revision information:
$Id: restrictedinterpreter.py,v 1.1.2.1 2002/12/23 19:33:15 jim Exp $
"""
import sys
from zope.security.proxy import ProxyFactory
from zope.security.restrictedbuiltins import RestrictedBuiltins
class RestrictedInterpreter:
    """
    Execute code in a namespace whose ``__builtins__`` is RestrictedBuiltins.

    The namespace is kept on ``self.globals`` and is shared by every
    ``ri_exec`` call made on the same interpreter instance.
    """

    def __init__(self):
        namespace = {}
        namespace['__builtins__'] = RestrictedBuiltins
        self.globals = namespace

    def ri_exec(self, code):
        """
        Execute 'code' using ``self.globals`` as the global namespace.

        Names bound by the executed code end up in ``self.globals``.
        """
        # what is the type of code?  (open question from the author; the
        # value is handed to exec unchanged)
        exec(code, self.globals)
=== Added File Zope3/src/zope/security/securitycontext.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Default ISecurityContext impl """
from zope.security.interfaces import ISecurityContext
class SecurityContext:
    """
    Hold the transient, request-specific security information.

    Attributes set by the constructor:

      stack -- A stack (list, initially empty) of elements, each either
               an ExecutableObject or a tuple consisting of an
               ExecutableObject and a custom SecurityPolicy.

      user -- The AUTHENTICATED_USER for the request.

      objectCache -- A dict, initially empty.
    """

    def __init__(self, user):
        self.user = user
        self.stack = []
        self.objectCache = {}
=== Added File Zope3/src/zope/security/securitymanagement.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Default ISecurityManagement implementation """
# NOTE(review): this is defined ahead of the imports below, presumably on
# purpose.  zope.security.simplesecuritypolicies imports ``system_user``
# from this module, while this module imports securitymanager, which in
# turn imports simplesecuritypolicies -- so the name has to exist before
# that import chain runs.
system_user = object() # Special system user that has all permissions
from zope.security.interfaces import ISecurityManagement, ISecurityManagementSetup
from zope.security.securitymanager import SecurityManager
from zope.security.securitymanager import setSecurityPolicy as _setSecurityPolicy
from zope.security.securitycontext import SecurityContext
# Module-level interface declaration: the functions below are grouped into
# an ISecurityManagementSetup section and an ISecurityManagement section.
__implements__ = ( ISecurityManagement, ISecurityManagementSetup )
# Pick the function used to key security managers by thread.  When the
# ``thread`` module cannot be imported, fall back to a constant so that all
# callers share the single key 0.  Only ImportError is caught: a bare
# ``except:`` would also hide unrelated failures such as KeyboardInterrupt.
try:
    import thread
except ImportError:
    def get_ident():
        """Return the constant key 0 (no thread support available)."""
        return 0
else:
    get_ident = thread.get_ident
# Registry of installed security managers, keyed by the value returned by
# get_ident().
_managers={}
# Register the registry's clear() with zope.testing.cleanup (presumably so
# that test clean-up empties it; addCleanUp is defined outside this file).
from zope.testing.cleanup import addCleanUp
addCleanUp(_managers.clear)
#
# ISecurityManagementSetup implementation
#
def newSecurityManager(user):
    """
    Create a SecurityManager around a fresh SecurityContext for 'user'
    and install it for the calling thread.

    Returns the SecurityManager that was installed before, or None if
    there was none.
    """
    context = SecurityContext(user)
    manager = SecurityManager(context)
    return replaceSecurityManager(manager)
def replaceSecurityManager(old_manager):
    """
    Install 'old_manager' (which must implement ISecurityManager) as the
    SecurityManager for the calling thread.

    Returns the manager that was installed before, or None if there was
    none.
    """
    key = get_ident()
    previous = _managers.get(key)
    _managers[key] = old_manager
    return previous
def noSecurityManager():
    """
    Remove the calling thread's SecurityManager, if one is installed.

    Does nothing when no manager is registered for this thread.
    """
    key = get_ident()
    try:
        del _managers[key]
    except KeyError:
        # Nothing was installed for this thread; that is fine.
        pass
#
# ISecurityManagement implementation
#
def getSecurityManager():
    """
    Return the calling thread's SecurityManager.

    If none is installed yet, a new one is created for the user None via
    newSecurityManager() and then returned.
    """
    key = get_ident()
    existing = _managers.get(key)
    if existing is not None:
        return existing
    newSecurityManager(None)
    # Re-read the registry: newSecurityManager returns the *previous*
    # manager, not the one it just installed.
    return _managers.get(key)
def setSecurityPolicy(aSecurityPolicy):
    """
    Make 'aSecurityPolicy' the system default security policy and return
    the policy it replaces.

    Delegates to zope.security.securitymanager.setSecurityPolicy.

    This method should only be called by system startup code.
    It should never, for example, be called during a web request.
    """
    previous = _setSecurityPolicy(aSecurityPolicy)
    return previous
=== Added File Zope3/src/zope/security/securitymanager.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Default ISecurityManager implementation """
import os
from zope.security.simplesecuritypolicies import ParanoidSecurityPolicy
# Maximum number of entries SecurityManager.pushExecutable allows on the
# context stack before it raises SystemError.
MAX_STACK_SIZE = 100
# System-wide fallback policy, used by SecurityManager when no custom
# policy is active; replaced through setSecurityPolicy().
_defaultPolicy = ParanoidSecurityPolicy()
def _clear():
    """Reset the module-wide default policy to a new ParanoidSecurityPolicy."""
    global _defaultPolicy
    fresh_policy = ParanoidSecurityPolicy()
    _defaultPolicy = fresh_policy
# Register _clear with zope.testing.cleanup (presumably so that test
# clean-up restores the default policy; addCleanUp is defined elsewhere).
from zope.testing.cleanup import addCleanUp
addCleanUp(_clear)
def setSecurityPolicy(aSecurityPolicy):
    """
    Set the system default security policy and return the policy that
    was the default before the call.

    This method should only be called by system startup code.  It should
    never, for example, be called during a web request.
    """
    global _defaultPolicy
    previous = _defaultPolicy
    _defaultPolicy = aSecurityPolicy
    return previous
from zope.security.interfaces import ISecurityManager
class SecurityManager:
    """
    A security manager provides methods for checking access and managing
    executable context and policies.

    It wraps a security context (read for its 'user' and 'stack'
    attributes) and tracks the custom policy, if any, of the executable
    currently on top of that stack.
    """
    __implements__ = ISecurityManager

    def __init__(self, context):
        self._context = context
        # No custom policy active yet; _getPolicy() falls back to the
        # module-wide default.
        self._policy = None

    def _getPolicy(self):
        """
        Return the active custom policy, or the module default policy
        when no custom policy is active.
        """
        current = self._policy
        if current is not None:
            return current
        return _defaultPolicy

    #
    # ISecurityManager implementation
    #
    def getPrincipal(self):
        """
        Return the authenticated user.

        This is equivalent to something like::

          REQUEST['AUTHENTICATED_USER']

        but is a bit cleaner, especially if 'REQUEST' isn't handy.
        """
        context = self._context
        return context.user

    def checkPermission(self, permission, object):
        """
        Check whether the security context allows the given
        permission on the given object.  Return a boolean value
        (whatever the active policy's checkPermission returns).

        Arguments:

        permission -- A permission name

        object -- The object being accessed according to the permission
        """
        policy = self._getPolicy()
        return policy.checkPermission(permission, object, self._context)

    def pushExecutable(self, anExecutableObject):
        """
        Push an ExecutableObject onto the manager's stack, and
        activate its custom security policy, if any.

        Raises SystemError when the stack already holds MAX_STACK_SIZE
        entries.
        """
        frames = self._context.stack
        if len(frames) >= MAX_STACK_SIZE:
            raise SystemError('Excessive recursion')
        frames.append(anExecutableObject)

        # '_customSecurityPolicy', when present, is called to obtain the
        # policy; without it the manager reverts to the default policy.
        factory = getattr(anExecutableObject, '_customSecurityPolicy', None)
        if factory is None:
            self._policy = None
        else:
            self._policy = factory()

    def popExecutable(self, anExecutableObject):
        """
        Pop the topmost ExecutableObject from the stack, deactivating
        any custom security policy it might have installed.

        If 'anExecutableObject' is not on top, the stack is unwound down
        to and including its topmost occurrence.  If it is not on the
        stack at all (or the stack is empty) nothing changes.
        """
        frames = self._context.stack
        if not frames:
            return

        if frames[-1] is anExecutableObject:
            del frames[-1]
        else:
            # Search from the top of the stack downwards.
            for position in range(len(frames) - 1, -1, -1):
                if frames[position] is anExecutableObject:
                    del frames[position:]
                    break
            else:
                # Not found: leave stack and policy untouched.
                return

        if not frames:
            self._policy = None
            return

        # Re-activate the policy of whatever is on top now.
        policy = getattr(frames[-1], '_customSecurityPolicy', None)
        if policy is not None:
            policy = policy()
        self._policy = policy

    def calledByExecutable(self):
        """
        Return a boolean indicating whether the current request has
        invoked any IExecutableObjects.  (The value returned is the
        current stack depth, so it is zero when nothing was invoked.)

        This can be used to determine if an object was called
        (more or less) directly from a URL, or if it was called by
        through-the-web provided code.
        """
        frames = self._context.stack
        return len(frames)
=== Added File Zope3/src/zope/security/setup.py ===
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from distutils.core import setup, Extension
# Build the C extension with distutils.
# NOTE(review): the source is listed as "_Proxy.c", but this check-in adds
# the file as "_proxy.c" -- on a case-sensitive file system the build will
# not find it.  Confirm the intended file/module name after the renaming.
setup(name="_Proxy", version = "0.1",
ext_modules=[Extension("_Proxy", ["_Proxy.c"])])
=== Added File Zope3/src/zope/security/simplesecuritypolicies.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Simple ISecurityPolicy implementations."""
from zope.security.interfaces import ISecurityPolicy
from zope.exceptions import Unauthorized
from zope.security.securitymanagement import system_user
class ParanoidSecurityPolicy:
    """
    Deny all access, except to the system user outside untrusted code.

    Access is granted only when the context's user is the special
    'system_user' object and the context's executable stack is empty;
    every other combination is denied.
    """
    __implements__ = ISecurityPolicy

    def checkPermission(self, permission, object, context):
        """
        Return 1 if access is allowed, 0 otherwise.

        Arguments:

        permission -- A permission name (not consulted by this policy)

        object -- The object being accessed (not consulted by this policy)

        context -- The security context; its 'user' and 'stack'
                   attributes are read
        """
        if (context.user is system_user   # no user
            and not context.stack         # no untrusted code
            ):
            return 1 # Nobody not to trust!

        return 0
class PermissiveSecurityPolicy:
    """
    Security policy that allows all access.
    """
    __implements__ = ISecurityPolicy

    def checkPermission(self, permission, object, context):
        """Return 1 unconditionally; none of the arguments is consulted."""
        granted = 1
        return granted