[Zope3-checkins] CVS: Zope3/src/zope/security/examples - __init__.py:1.2 sandbox.py:1.2 sandbox_security.py:1.2
Jim Fulton
jim@zope.com
Wed, 25 Dec 2002 09:15:53 -0500
Update of /cvs-repository/Zope3/src/zope/security/examples
In directory cvs.zope.org:/tmp/cvs-serv20790/src/zope/security/examples
Added Files:
__init__.py sandbox.py sandbox_security.py
Log Message:
Grand renaming:
- Renamed most files (especially python modules) to lower case.
- Moved views and interfaces into separate hierarchies within each
project, where each top-level directory under the zope package
is a separate project.
- Moved everything to src from lib/python.
lib/python will eventually go away. I need access to the cvs
repository to make this happen, however.
There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.
=== Zope3/src/zope/security/examples/__init__.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:53 2002
+++ Zope3/src/zope/security/examples/__init__.py Wed Dec 25 09:15:22 2002
@@ -0,0 +1,2 @@
+#
+# This file is necessary to make this directory a package.
=== Zope3/src/zope/security/examples/sandbox.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:53 2002
+++ Zope3/src/zope/security/examples/sandbox.py Wed Dec 25 09:15:22 2002
@@ -0,0 +1,275 @@
+import time, whrandom
+
+from zope.interface import Interface
+
+class IAgent(Interface):
+ """
+ represents an autonomous unit, that lives in
+ various homes/sandboxes and accesses services
+ present at the sandboxes. agents are imbued with
+ a sense of wanderlust and attempt to find new homes
+ after a few turns of the time generator
+ (think turn based games).
+ """
+
+ def action():
+ " agent performs their action "
+ def setHome(self, home):
+ " agent moves from home to home"
+ def getHome():
+ " where does this agent live "
+ def getAuthenticationToken(self):
+ " by what authority should the agent perform actions "
+
+class IService(Interface):  # declares no methods of its own
+ """
+ marker interface. services are available from sandboxes,
+ examples include time service, agent discovery, and sandbox
+ discovery.
+ """
+
+class ISandbox(Interface):
+ """
+ a container for agents and services.
+ """
+ def getService(service_id):  # like every method here, declared without self
+ " retrieve a service offered by this sandbox "
+ def getAgents():
+ " what agents live in this sandbox "
+ def addAgent(agent):
+ " add an agent to this sandbox "
+ def transportAgent(agent, destination):
+ " move an agent to the destination sandbox "
+
+class Identity:
+ """
+ mixin for pretty printing and identity method
+ """
+ def __init__(self, id, *args, **kw):  # extra positional/keyword arguments are accepted and ignored
+ self.id = id
+ def getId(self):
+ return self.id
+ def __str__ (self):
+ return "<%s> %s"%(str(self.__class__.__name__), str(self.id))  # "<ClassName> id"
+ __repr__ = __str__  # repr() and str() produce the same text
+
+class Agent(Identity):
+ """
+ see IAgent doc
+ """
+ __implements__ = IAgent
+
+ def __init__(self, id, home, auth_token, action):  # sets id itself; Identity.__init__ is not called
+ self.id = id
+ self.auth_token = auth_token
+ self.home = home
+ self._action = action  # callable invoked as action(agent, home)
+
+ def action(self):
+ self._action(self, self.getHome())  # the callable's return value is discarded
+
+ def setHome(self, home):
+ self.home = home
+
+ def getHome(self):
+ return self.home
+
+ def getAuthenticationToken(self):
+ return self.auth_token
+
+class SandboxError(Exception): pass  # raised by Sandbox.addAgent, addService and transportAgent on refusal
+
+class Sandbox(Identity):
+ """
+ see ISandbox doc
+ """
+ __implements__ = ISandbox
+
+ def __init__(self, id, service_factories):
+ self.id = id
+ self._services = {}  # service id -> service instance
+ self._agents = {}  # agent id -> agent
+
+ for sf in service_factories:
+ self.addService(sf())  # each factory is called with no arguments
+
+ def getAgentIds(self):
+ return self._agents.keys()
+ def getAgents(self):
+ return self._agents.values()
+ def getServiceIds(self):
+ return self._services.keys()
+ def getService(self, sid):
+ return self._services.get(sid)  # None when no service is registered under sid
+ def getHome(self):
+ return self  # a sandbox is its own home
+
+
+ def addAgent(self, agent):
+ if not self._agents.has_key(agent.getId()) \
+ and IAgent.isImplementedBy(agent):
+ self._agents[agent.getId()]=agent
+ agent.setHome(self)  # the agent now reports this sandbox as its home
+ else:
+ raise SandboxError("couldn't add agent %s"%agent)  # id already present, or not an IAgent
+
+ def addService(self, service):
+
+ if not self._services.has_key(service.getId()) \
+ and IService.isImplementedBy(service):
+ self._services[service.getId()]=service
+ service.setHome(self)
+ else:
+ raise SandboxError("couldn't add service %s"%service)  # id already present, or not an IService
+
+ def transportAgent(self, agent, destination):
+ if self._agents.has_key(agent.getId()) \
+ and destination is not self \
+ and ISandbox.isImplementedBy(destination):
+ destination.addAgent(agent)  # may raise SandboxError; the agent then stays registered here
+ del self._agents[agent.getId()]
+ else:
+ raise SandboxError("couldn't transport agent %s to %s"%(
+ agent, destination)
+ )
+
+class Service:
+ __implements__ = IService
+ def getId(self):
+ return self.__class__.__name__  # the class name doubles as the service id
+ def setHome(self, home):
+ self._home = home
+ def getHome(self):
+ return getattr(self, '_home')  # no default given: AttributeError if setHome was never called
+
+class HomeDiscoveryService(Service):
+ """
+ returns the ids of available agent homes
+ """
+ def getAvailableHomes(self):
+ return _homes.keys()  # keys of the module-level _homes registry
+
+class AgentDiscoveryService(Service):
+ """
+ returns the agents available at a given home
+ """
+ def getLocalAgents(self, home):
+ return home.getAgents()  # queries the home passed in, not this service's own home
+
+class TimeService(Service):
+ """
+ returns the local time
+ """
+ def getTime(self):
+ return time.time()  # value of time.time() at the moment of the call
+
+default_service_factories = (  # service classes; Sandbox.__init__ calls each with no arguments
+ HomeDiscoveryService,
+ AgentDiscoveryService,
+ TimeService
+ )
+
+def action_find_homes(agent, home):  # agent is unused; the signature matches how Agent.action calls it
+ home_service = home.getService('HomeDiscoveryService')  # services are registered under their class name
+ return home_service.getAvailableHomes()
+
+def action_find_neighbors(agent, home):  # agent is unused; the signature matches how Agent.action calls it
+ agent_service = home.getService('AgentDiscoveryService')  # services are registered under their class name
+ return agent_service.getLocalAgents(home)
+
+def action_find_time(agent, home):  # agent is unused; the signature matches how Agent.action calls it
+ time_service = home.getService('TimeService')  # services are registered under their class name
+ return time_service.getTime()
+
+class TimeGenerator:
+ """
+ Represents the passage of time in the agent simulation.
+
+ each turn represents some discrete unit of time, during
+ which all agents attempt to perform their action. Additionally,
+ all agents are checked to see if they have a desire to move,
+ and if so are transported to a new random home.
+ """
+
+ def setupAgent(self, agent):
+ pass
+
+ def turn(self):
+
+ global _homes
+
+ for h in _homes.values():
+ agents = h.getAgents()
+ for a in agents:
+ self.setupAgent(a)
+ try:
+ a.action()
+ except Exception, e:
+ print a, h, e
+
+ agents = filter(WanderLust, agents)
+
+ for a in agents:
+ try:
+ self.setupAgent(a)
+ home = a.getHome()
+ new_home = GreenerPastures(a)
+ home.transportAgent(a, new_home)
+ except Exception, e:
+ print 'moving', a, h, new_home, e
+
+def WanderLust(agent):  # used as the predicate for filter() in TimeGenerator.turn; agent is unused
+ """ is agent ready to move """
+ if int(whrandom.random()*100) <= 30:
+ return 1  # otherwise falls off the end and returns None, which filter() treats as false
+
+def GreenerPastures(agent):
+ """ where do they want to go today """
+ global _homes  # only read here, never rebound
+ possible_homes = _homes.keys()
+ possible_homes.remove(agent.getHome().getId())  # never pick the agent's current home
+ return _homes.get(whrandom.choice(possible_homes))  # NOTE(review): presumably fails if no other home exists - confirm
+
+
+# bootstrap: initial setup, executed when the module is imported.
+
+# global registry of homes, keyed by sandbox id
+_homes = {}
+
+all_homes = (
+ Sandbox('jail', default_service_factories),
+ Sandbox('origin', default_service_factories),
+ Sandbox('valhalla', default_service_factories)
+)
+
+origin = all_homes[1]  # the 'origin' sandbox
+
+for h in all_homes:
+ _homes[h.getId()]=h
+
+
+agents = [
+ Agent('odin', None, 'norse legend', action_find_time),  # home is None until the agent is added to a sandbox
+ Agent('loki', None, 'norse legend', action_find_neighbors),
+ Agent('thor', None, 'norse legend', action_find_homes),
+ Agent('thucydides', None, 'greek men', action_find_time),
+ Agent('archimedes', None, 'greek men', action_find_neighbors),
+ Agent('prometheus', None, 'greek men', action_find_homes),
+ ]
+
+for a in agents:
+ origin.addAgent(a)  # every agent starts out in 'origin'
+
+
+def main():  # runs five turns of the simulation, printing each home's agent ids after every turn
+ world = TimeGenerator()
+
+ for x in range(5):
+ print 'world turning'
+ world.turn()
+
+ for h in _homes.values():
+ print h.getId(), h.getAgentIds()
+
+if __name__ == '__main__':  # run the unsecured simulation when executed as a script
+ main()
=== Zope3/src/zope/security/examples/sandbox_security.py 1.1 => 1.2 ===
--- /dev/null Wed Dec 25 09:15:53 2002
+++ Zope3/src/zope/security/examples/sandbox_security.py Wed Dec 25 09:15:22 2002
@@ -0,0 +1,163 @@
+import zope.security.examples.sandbox
+
+# XXX These imports are wrong and were wrong before the renaming
+from zope.security import ISecurityPolicy, Checker, IChecker
+
+#################################
+# 1. map permissions to actions
+# 2. map authentication tokens/principals onto permissions
+# 3. implement checker and security policies that affect 1,2
+# 4. bind checkers to classes/instances
+# 5. proxy wrap as necessary.
+#################################
+
+#################################
+# permissions
+NotAllowed = 'Not Allowed'
+Public = Checker.CheckerPublic
+TransportAgent = 'Transport Agent'
+AccessServices = 'Access Services'
+AccessAgents = 'Access Agents'
+AccessTimeService = 'Access Time Services'
+AccessAgentService = 'Access Agent Service'
+AccessHomeService = 'Access Home Service'
+
+AddAgent = 'Add Agent'
+ALL='All'  # wildcard; SimulationSecurityPolicy honours it only in a table's 'any' entry
+
+NoSetAttr = lambda name: NotAllowed  # setattr permission lookup: NotAllowed for every attribute name
+
+#################################
+# location -> auth token -> permission mapping
+
+class SimulationSecurityDatabase:
+
+ origin = {
+ 'any':[ALL]
+ }
+
+ jail = {
+ 'norse legend':[TransportAgent,
+ AccessServices,
+ AccessAgentService,
+ AccessHomeService,
+ TransportAgent,
+ AccessAgents,],
+
+ 'any':[AccessTimeService, AddAgent],
+
+ }
+
+ valhalla = {
+ 'norse legend':[AddAgent],
+ 'any': [AccessServices,
+ AccessTimeService,
+ AccessAgentService,
+ AccessHomeService,
+ TransportAgent,
+ AccessAgents,]
+ }
+
+
+class SimulationSecurityPolicy:
+
+ __implements__ = ISecurityPolicy
+
+ def checkPermission(self, permission, object, context):  # True if granted, False otherwise
+ token = context.user.getAuthenticationToken()
+ home = object.getHome()
+ db = getattr(SimulationSecurityDatabase, home.getId(), None)  # table is looked up by sandbox id
+
+ if db is None:
+ return False  # no table for this sandbox: deny
+
+ allowed = db.get('any', ())  # grants that apply to every token
+ if permission in allowed or ALL in allowed:
+ return True
+
+ allowed = db.get(token, ())  # grants specific to the caller's token
+ if permission in allowed:
+ return True
+
+ return False
+
+
+
+def PermissionMapChecker(permissions_map={}, setattr_permission_func=NoSetAttr):
+ res = {}
+ for k,v in permissions_map.items():
+ for iv in v:
+ res[iv]=k
+ return Checker.Checker(res.get, setattr_permission_func)
+
+
+#################################
+# sandbox security settings
+sandbox_security = {AccessServices:['getService', 'addService', 'getServiceIds'],
+ AccessAgents:['getAgentsIds', 'getAgents'],
+ AddAgent:['addAgent'],
+ TransportAgent:['transportAgent'],
+ Public:['getId','getHome']
+ }
+sandbox_checker = PermissionMapChecker(sandbox_security)
+
+#################################
+# service security settings
+
+# time service: getTime requires AccessTimeService
+tservice_security = { AccessTimeService:['getTime'] }
+time_service_checker = PermissionMapChecker(tservice_security)
+
+# home service: getAvailableHomes requires AccessHomeService
+hservice_security = { AccessHomeService:['getAvailableHomes'] }
+home_service_checker = PermissionMapChecker(hservice_security)
+
+# agent service: getLocalAgents requires AccessAgentService
+aservice_security = { AccessAgentService:['getLocalAgents'] }
+agent_service_checker = PermissionMapChecker(aservice_security)
+
+
+def wire_security():
+
+ from zope.security import securitymanagement
+ securitymanagement.setSecurityPolicy(SimulationSecurityPolicy())
+
+ import zope.security.examples.sandbox
+
+ Checker.defineChecker(sandbox.Sandbox, sandbox_checker)
+ Checker.defineChecker(sandbox.TimeService, time_service_checker)
+ Checker.defineChecker(sandbox.AgentDiscoveryService, agent_service_checker)
+ Checker.defineChecker(sandbox.HomeDiscoveryService, home_service_checker)
+
+ def addAgent(self, agent):
+ if not self._agents.has_key(agent.getId()) \
+ and sandbox.IAgent.isImplementedBy(agent):
+ self._agents[agent.getId()]=agent
+ checker = Checker.selectChecker(self)
+ wrapped_home = checker.proxy(self)
+ agent.setHome(wrapped_home)
+ else:
+ raise SandboxError("couldn't add agent %s"%agent)
+
+ sandbox.Sandbox.addAgent = addAgent
+
+ def setupAgent(self, agent):
+ SecurityManagement.newSecurityManager(agent)
+
+ sandbox.TimeGenerator.setupAgent = setupAgent
+
+ def GreenerPastures(agent):
+ """ where do they want to go today """
+ import whrandom
+ _homes = sandbox._homes
+ possible_homes = _homes.keys()
+ possible_homes.remove(agent.getHome().getId())
+ new_home = _homes.get(whrandom.choice(possible_homes))
+ return Checker.selectChecker(new_home).proxy(new_home)
+
+ sandbox.GreenerPastures = GreenerPastures
+
+
+if __name__ == '__main__':
+ wire_security()
+ sandbox.main()