[Zope3-checkins] CVS: Zope3/lib/python/Zope/Security/examples - sandbox.py:1.1 sandbox_security.py:1.1
Kapil Thangavelu
kvthan@wm.edu
Fri, 6 Dec 2002 07:11:36 -0500
Update of /cvs-repository/Zope3/lib/python/Zope/Security/examples
In directory cvs.zope.org:/tmp/cvs-serv21232/examples
Added Files:
sandbox.py sandbox_security.py
Log Message:
adding in a readme and example showing use of security package outside
of zope.
=== Added File Zope3/lib/python/Zope/Security/examples/sandbox.py ===
import time, whrandom
from Interface import Interface
class IAgent(Interface):
"""
represents an autonomous unit, that lives in
various homes/sandboxes and accesses services
present at the sandboxes. agents are imbued with
a sense of wanderlust and attempt to find new homes
after a few turns of the time generator
(think turn based games).
"""
def action():
" agent performs their action "
def setHome(self, home):
" agent moves from home to home"
def getHome():
" where does this agent live "
def getAuthenticationToken(self):
" by what authority should the agent perform actions "
class IService(Interface):
"""
marker interface. services are available from sandboxes,
examples include time service, agent discovery, and sandbox
discovery.
"""
class ISandbox(Interface):
"""
a container for agents and services.
"""
def getService(service_id):
" retrieve a service offered by this sandbox "
def getAgents():
" what agents live in this sandbox "
def addAgent(agent):
" add an agent to this sandbox "
def transportAgent(agent, destination):
" move an agent to the destination sandbox "
class Identity:
"""
mixin for pretty printing and identity method
"""
def __init__(self, id, *args, **kw):
self.id = id
def getId(self):
return self.id
def __str__ (self):
return "<%s> %s"%(str(self.__class__.__name__), str(self.id))
__repr__ = __str__
class Agent(Identity):
"""
see IAgent doc
"""
__implements__ = IAgent
def __init__(self, id, home, auth_token, action):
self.id = id
self.auth_token = auth_token
self.home = home
self._action = action
def action(self):
self._action(self, self.getHome())
def setHome(self, home):
self.home = home
def getHome(self):
return self.home
def getAuthenticationToken(self):
return self.auth_token
class SandboxError(Exception): pass
class Sandbox(Identity):
"""
see ISandbox doc
"""
__implements__ = ISandbox
def __init__(self, id, service_factories):
self.id = id
self._services = {}
self._agents = {}
for sf in service_factories:
self.addService(sf())
def getAgentIds(self):
return self._agents.keys()
def getAgents(self):
return self._agents.values()
def getServiceIds(self):
return self._services.keys()
def getService(self, sid):
return self._services.get(sid)
def getHome(self):
return self
def addAgent(self, agent):
if not self._agents.has_key(agent.getId()) \
and IAgent.isImplementedBy(agent):
self._agents[agent.getId()]=agent
agent.setHome(self)
else:
raise SandboxError("couldn't add agent %s"%agent)
def addService(self, service):
if not self._services.has_key(service.getId()) \
and IService.isImplementedBy(service):
self._services[service.getId()]=service
service.setHome(self)
else:
raise SandboxError("couldn't add service %s"%service)
def transportAgent(self, agent, destination):
if self._agents.has_key(agent.getId()) \
and destination is not self \
and ISandbox.isImplementedBy(destination):
destination.addAgent(agent)
del self._agents[agent.getId()]
else:
raise SandboxError("couldn't transport agent %s to %s"%(
agent, destination)
)
class Service:
__implements__ = IService
def getId(self):
return self.__class__.__name__
def setHome(self, home):
self._home = home
def getHome(self):
return getattr(self, '_home')
class HomeDiscoveryService(Service):
"""
returns the ids of available agent homes
"""
def getAvailableHomes(self):
return _homes.keys()
class AgentDiscoveryService(Service):
"""
returns the agents available at a given home
"""
def getLocalAgents(self, home):
return home.getAgents()
class TimeService(Service):
"""
returns the local time
"""
def getTime(self):
return time.time()
default_service_factories = (
HomeDiscoveryService,
AgentDiscoveryService,
TimeService
)
def action_find_homes(agent, home):
home_service = home.getService('HomeDiscoveryService')
return home_service.getAvailableHomes()
def action_find_neighbors(agent, home):
agent_service = home.getService('AgentDiscoveryService')
return agent_service.getLocalAgents(home)
def action_find_time(agent, home):
time_service = home.getService('TimeService')
return time_service.getTime()
class TimeGenerator:
"""
Represents the passage of time in the agent simulation.
each turn represents some discrete unit of time, during
which all agents attempt to perform their action. Additionally,
all agents are checked to see if they have a desire to move,
and if so are transported to a new random home.
"""
def setupAgent(self, agent):
pass
def turn(self):
global _homes
for h in _homes.values():
agents = h.getAgents()
for a in agents:
self.setupAgent(a)
try:
a.action()
except Exception, e:
print a, h, e
agents = filter(WanderLust, agents)
for a in agents:
try:
self.setupAgent(a)
home = a.getHome()
new_home = GreenerPastures(a)
home.transportAgent(a, new_home)
except Exception, e:
print 'moving', a, h, new_home, e
def WanderLust(agent):
""" is agent ready to move """
if int(whrandom.random()*100) <= 30:
return 1
def GreenerPastures(agent):
""" where do they want to go today """
global _homes
possible_homes = _homes.keys()
possible_homes.remove(agent.getHome().getId())
return _homes.get(whrandom.choice(possible_homes))
# boot strap initial setup.
# global list of homes
_homes = {}
all_homes = (
Sandbox('jail', default_service_factories),
Sandbox('origin', default_service_factories),
Sandbox('valhalla', default_service_factories)
)
origin = all_homes[1]
for h in all_homes:
_homes[h.getId()]=h
agents = [
Agent('odin', None, 'norse legend', action_find_time),
Agent('loki', None, 'norse legend', action_find_neighbors),
Agent('thor', None, 'norse legend', action_find_homes),
Agent('thucydides', None, 'greek men', action_find_time),
Agent('archimedes', None, 'greek men', action_find_neighbors),
Agent('prometheus', None, 'greek men', action_find_homes),
]
for a in agents:
origin.addAgent(a)
def main():
world = TimeGenerator()
for x in range(5):
print 'world turning'
world.turn()
for h in _homes.values():
print h.getId(), h.getAgentIds()
if __name__ == '__main__':
main()
=== Added File Zope3/lib/python/Zope/Security/examples/sandbox_security.py ===
import sandbox
from Zope.Security import ISecurityPolicy, Checker, IChecker
#################################
# 1. map permissions to actions
# 2. map authentication tokens/principals onto permissions
# 3. implement checker and security policies that affect 1,2
# 4. bind checkers to classes/instances
# 5. proxy wrap as nesc.
#################################
#################################
# permissions
NotAllowed = 'Not Allowed'
Public = Checker.CheckerPublic
TransportAgent = 'Transport Agent'
AccessServices = 'Access Services'
AccessAgents = 'Access Agents'
AccessTimeService = 'Access Time Services'
AccessAgentService = 'Access Agent Service'
AccessHomeService = 'Access Home Service'
AddAgent = 'Add Agent'
ALL='All'
NoSetAttr = lambda name: NotAllowed
#################################
# location -> auth token -> permission mapping
class SimulationSecurityDatabase:
origin = {
'any':[ALL]
}
jail = {
'norse legend':[TransportAgent,
AccessServices,
AccessAgentService,
AccessHomeService,
TransportAgent,
AccessAgents,],
'any':[AccessTimeService, AddAgent],
}
valhalla = {
'norse legend':[AddAgent],
'any': [AccessServices,
AccessTimeService,
AccessAgentService,
AccessHomeService,
TransportAgent,
AccessAgents,]
}
class SimulationSecurityPolicy:
__implements__ = ISecurityPolicy
def checkPermission(self, permission, object, context):
token = context.user.getAuthenticationToken()
home = object.getHome()
db = getattr(SimulationSecurityDatabase, home.getId(), None)
if db is None:
return False
allowed = db.get('any', ())
if permission in allowed or ALL in allowed:
return True
allowed = db.get(token, ())
if permission in allowed:
return True
return False
def PermissionMapChecker(permissions_map={}, setattr_permission_func=NoSetAttr):
res = {}
for k,v in permissions_map.items():
for iv in v:
res[iv]=k
return Checker.Checker(res.get, setattr_permission_func)
#################################
# sandbox security settings
sandbox_security = {AccessServices:['getService', 'addService', 'getServiceIds'],
AccessAgents:['getAgentsIds', 'getAgents'],
AddAgent:['addAgent'],
TransportAgent:['transportAgent'],
Public:['getId','getHome']
}
sandbox_checker = PermissionMapChecker(sandbox_security)
#################################
# service security settings
# time service
tservice_security = { AccessTimeService:['getTime'] }
time_service_checker = PermissionMapChecker(tservice_security)
# home service
hservice_security = { AccessHomeService:['getAvailableHomes'] }
home_service_checker = PermissionMapChecker(hservice_security)
# agent service
aservice_security = { AccessAgentService:['getLocalAgents'] }
agent_service_checker = PermissionMapChecker(aservice_security)
def wire_security():
from Zope.Security import SecurityManagement
SecurityManagement.setSecurityPolicy(SimulationSecurityPolicy())
import sandbox
Checker.defineChecker(sandbox.Sandbox, sandbox_checker)
Checker.defineChecker(sandbox.TimeService, time_service_checker)
Checker.defineChecker(sandbox.AgentDiscoveryService, agent_service_checker)
Checker.defineChecker(sandbox.HomeDiscoveryService, home_service_checker)
def addAgent(self, agent):
if not self._agents.has_key(agent.getId()) \
and sandbox.IAgent.isImplementedBy(agent):
self._agents[agent.getId()]=agent
checker = Checker.selectChecker(self)
wrapped_home = checker.proxy(self)
agent.setHome(wrapped_home)
else:
raise SandboxError("couldn't add agent %s"%agent)
sandbox.Sandbox.addAgent = addAgent
def setupAgent(self, agent):
SecurityManagement.newSecurityManager(agent)
sandbox.TimeGenerator.setupAgent = setupAgent
def GreenerPastures(agent):
""" where do they want to go today """
import whrandom
_homes = sandbox._homes
possible_homes = _homes.keys()
possible_homes.remove(agent.getHome().getId())
new_home = _homes.get(whrandom.choice(possible_homes))
return Checker.selectChecker(new_home).proxy(new_home)
sandbox.GreenerPastures = GreenerPastures
if __name__ == '__main__':
wire_security()
sandbox.main()