[Zope3-checkins] CVS: Zope3/src/zope/security/examples - __init__.py:1.1.2.1 sandbox.py:1.1.2.1 sandbox_security.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:33:18 -0500


Update of /cvs-repository/Zope3/src/zope/security/examples
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/security/examples

Added Files:
      Tag: NameGeddon-branch
	__init__.py sandbox.py sandbox_security.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zope/security/examples/__init__.py ===
#
# This file is necessary to make this directory a package.


=== Added File Zope3/src/zope/security/examples/sandbox.py ===
import time, whrandom

from zope.interface import Interface

class IAgent(Interface):
    """
    represents an autonomous unit, that lives in
    various homes/sandboxes and accesses services
    present at the sandboxes. agents are imbued with
    a sense of wanderlust and attempt to find new homes
    after a few turns of the time generator
    (think turn based games).
    """
     
    def action():
        " agent performs their action "
    def setHome(self, home):
        " agent moves from home to home"
    def getHome():
        " where does this agent live "
    def getAuthenticationToken(self):
        " by what authority should the agent perform actions "
        
class IService(Interface):
    """
    marker interface. services are available from sandboxes,
    examples include time service, agent discovery, and sandbox
    discovery.
    """
 
class ISandbox(Interface):
    """
    a container for agents and services.
    """
    def getService(service_id):
        " retrieve a service offered by this sandbox "
    def getAgents():
        " what agents live in this sandbox "
    def addAgent(agent):
        " add an agent to this sandbox "
    def transportAgent(agent, destination):
        " move an agent to the destination sandbox "

class Identity:
    """
    mixin for pretty printing and identity method
    """
    def __init__(self, id, *args, **kw):
        self.id = id
    def getId(self):
        return self.id
    def __str__ (self):
        return "<%s> %s"%(str(self.__class__.__name__), str(self.id))
    __repr__ = __str__

class Agent(Identity):
    """
    see IAgent doc
    """
    __implements__ = IAgent
    
    def __init__(self, id, home, auth_token, action):
        self.id = id
        self.auth_token = auth_token
        self.home = home
        self._action = action
        
    def action(self):
        self._action(self, self.getHome())
        
    def setHome(self, home):
        self.home = home
        
    def getHome(self):
        return self.home
    
    def getAuthenticationToken(self):
        return self.auth_token

class SandboxError(Exception): pass
        
class Sandbox(Identity):
    """
    see ISandbox doc
    """
    __implements__ = ISandbox
    
    def __init__(self, id, service_factories):
        self.id = id
        self._services = {}
        self._agents = {}

        for sf in service_factories:
            self.addService(sf())

    def getAgentIds(self):
        return self._agents.keys()
    def getAgents(self):
        return self._agents.values()
    def getServiceIds(self):
        return self._services.keys()
    def getService(self, sid):
        return self._services.get(sid)
    def getHome(self):
        return self


    def addAgent(self, agent):
        if not self._agents.has_key(agent.getId()) \
           and IAgent.isImplementedBy(agent):
            self._agents[agent.getId()]=agent
            agent.setHome(self)
        else:
            raise SandboxError("couldn't add agent %s"%agent)
        
    def addService(self, service):
        
        if not self._services.has_key(service.getId()) \
           and IService.isImplementedBy(service):
            self._services[service.getId()]=service
            service.setHome(self)
        else:
            raise SandboxError("couldn't add service %s"%service)
        
    def transportAgent(self, agent, destination):
        if self._agents.has_key(agent.getId()) \
            and destination is not self \
            and ISandbox.isImplementedBy(destination):            
            destination.addAgent(agent)
            del self._agents[agent.getId()]
        else:
            raise SandboxError("couldn't transport agent %s to %s"%(
                agent, destination)
                               )

class Service:
    __implements__ = IService
    def getId(self):
        return self.__class__.__name__
    def setHome(self, home):
        self._home = home
    def getHome(self):
        return getattr(self, '_home')
    
class HomeDiscoveryService(Service):
    """
    returns the ids of available agent homes
    """
    def getAvailableHomes(self):
        return _homes.keys()

class AgentDiscoveryService(Service):
    """
    returns the agents available at a given home
    """
    def getLocalAgents(self, home):
        return home.getAgents()

class TimeService(Service):
    """
    returns the local time
    """
    def getTime(self):
        return time.time()

default_service_factories = (
    HomeDiscoveryService,
    AgentDiscoveryService,
    TimeService
    )
    
def action_find_homes(agent, home):
    home_service = home.getService('HomeDiscoveryService')
    return home_service.getAvailableHomes()

def action_find_neighbors(agent, home):
    agent_service = home.getService('AgentDiscoveryService')
    return agent_service.getLocalAgents(home)

def action_find_time(agent, home):
    time_service = home.getService('TimeService')
    return time_service.getTime()

class TimeGenerator:
    """
    Represents the passage of time in the agent simulation.

    each turn represents some discrete unit of time, during
    which all agents attempt to perform their action. Additionally,
    all agents are checked to see if they have a desire to move,
    and if so are transported to a new random home.
    """

    def setupAgent(self, agent):
        pass

    def turn(self):

        global _homes
        
        for h in _homes.values():
            agents = h.getAgents()
            for a in agents:
                self.setupAgent(a)
                try:
                    a.action()
                except Exception, e:
                    print a, h, e

            agents = filter(WanderLust, agents)

            for a in agents:
                try:
                    self.setupAgent(a)
                    home = a.getHome()            
                    new_home = GreenerPastures(a)
                    home.transportAgent(a, new_home)
                except Exception, e:
                    print 'moving', a, h, new_home, e                    
                
def WanderLust(agent):
    """ is agent ready to move """
    if int(whrandom.random()*100) <= 30:
        return 1

def GreenerPastures(agent):
    """ where do they want to go today """
    global _homes
    possible_homes = _homes.keys()
    possible_homes.remove(agent.getHome().getId())
    return _homes.get(whrandom.choice(possible_homes))


# boot strap initial setup.

# global list of homes
_homes = {}

all_homes = (
    Sandbox('jail', default_service_factories),
    Sandbox('origin', default_service_factories),
    Sandbox('valhalla', default_service_factories)
)

origin = all_homes[1]

for h in all_homes:
    _homes[h.getId()]=h


agents = [
    Agent('odin', None, 'norse legend', action_find_time),
    Agent('loki', None, 'norse legend', action_find_neighbors),
    Agent('thor', None, 'norse legend', action_find_homes),
    Agent('thucydides', None, 'greek men', action_find_time),
    Agent('archimedes', None, 'greek men', action_find_neighbors),
    Agent('prometheus', None, 'greek men', action_find_homes),    
    ]

for a in agents:
    origin.addAgent(a)


def main():    
    world = TimeGenerator()

    for x in range(5):
        print 'world turning'
        world.turn()

    for h in _homes.values():
        print h.getId(), h.getAgentIds()

if __name__ == '__main__':
    main()


=== Added File Zope3/src/zope/security/examples/sandbox_security.py ===
import zope.security.examples.sandbox
from Zope.Security import ISecurityPolicy, Checker, IChecker

#################################
# 1. map permissions to actions
# 2. map authentication tokens/principals onto permissions
# 3. implement checker and security policies that affect 1,2
# 4. bind checkers to classes/instances
# 5. proxy wrap as nesc.
#################################

#################################
# permissions
NotAllowed = 'Not Allowed'
Public = Checker.CheckerPublic
TransportAgent = 'Transport Agent'
AccessServices = 'Access Services'
AccessAgents = 'Access Agents'
AccessTimeService = 'Access Time Services'
AccessAgentService = 'Access Agent Service'
AccessHomeService = 'Access Home Service'

AddAgent = 'Add Agent'
ALL='All'

NoSetAttr = lambda name: NotAllowed

#################################
# location -> auth token -> permission mapping 

class SimulationSecurityDatabase:
    
    origin = {
        'any':[ALL]
        }
    
    jail = {
        'norse legend':[TransportAgent,
                         AccessServices,
                         AccessAgentService,
                         AccessHomeService,
                         TransportAgent,
                         AccessAgents,],
                         
        'any':[AccessTimeService, AddAgent],
        
        }

    valhalla = {
        'norse legend':[AddAgent],
        'any': [AccessServices,
               AccessTimeService,
               AccessAgentService,
               AccessHomeService,
               TransportAgent,
               AccessAgents,]
        }

                              
class SimulationSecurityPolicy:

    __implements__ = ISecurityPolicy
    
    def checkPermission(self, permission, object, context):
        token = context.user.getAuthenticationToken()
        home = object.getHome()
        db = getattr(SimulationSecurityDatabase, home.getId(), None)
        
        if db is None:
            return False

        allowed = db.get('any', ())
        if permission in allowed or ALL in allowed:
            return True

        allowed = db.get(token, ())
        if permission in allowed:
            return True
        
        return False
        


def PermissionMapChecker(permissions_map={}, setattr_permission_func=NoSetAttr):
    res = {}
    for k,v in permissions_map.items():
        for iv in v:
            res[iv]=k
    return Checker.Checker(res.get, setattr_permission_func)


#################################
# sandbox security settings
sandbox_security = {AccessServices:['getService', 'addService', 'getServiceIds'],
                    AccessAgents:['getAgentsIds', 'getAgents'],
                    AddAgent:['addAgent'],
                    TransportAgent:['transportAgent'],
                    Public:['getId','getHome']
                    }
sandbox_checker = PermissionMapChecker(sandbox_security)

#################################
# service security settings

# time service
tservice_security = { AccessTimeService:['getTime'] }
time_service_checker = PermissionMapChecker(tservice_security)

# home service
hservice_security = { AccessHomeService:['getAvailableHomes'] }
home_service_checker = PermissionMapChecker(hservice_security)

# agent service
aservice_security = { AccessAgentService:['getLocalAgents'] }
agent_service_checker = PermissionMapChecker(aservice_security)

                                      
def wire_security():

    from Zope.Security import SecurityManagement
    SecurityManagement.setSecurityPolicy(SimulationSecurityPolicy())
    
    import zope.security.examples.sandbox

    Checker.defineChecker(sandbox.Sandbox, sandbox_checker)    
    Checker.defineChecker(sandbox.TimeService, time_service_checker)
    Checker.defineChecker(sandbox.AgentDiscoveryService, agent_service_checker)
    Checker.defineChecker(sandbox.HomeDiscoveryService, home_service_checker)
    
    def addAgent(self, agent):
        if not self._agents.has_key(agent.getId()) \
           and sandbox.IAgent.isImplementedBy(agent):
            self._agents[agent.getId()]=agent
            checker = Checker.selectChecker(self)
            wrapped_home = checker.proxy(self)
            agent.setHome(wrapped_home)
        else:
            raise SandboxError("couldn't add agent %s"%agent)        

    sandbox.Sandbox.addAgent = addAgent
    
    def setupAgent(self, agent):
        SecurityManagement.newSecurityManager(agent)

    sandbox.TimeGenerator.setupAgent = setupAgent
        
    def GreenerPastures(agent):
        """ where do they want to go today """
        import whrandom
        _homes = sandbox._homes
        possible_homes = _homes.keys()
        possible_homes.remove(agent.getHome().getId())
        new_home =  _homes.get(whrandom.choice(possible_homes))
        return Checker.selectChecker(new_home).proxy(new_home)

    sandbox.GreenerPastures = GreenerPastures


if __name__ == '__main__':
    wire_security()    
    sandbox.main()