[Zope3-checkins] CVS: Zope3/src/zope/app/services - __init__.py:1.1.2.1 adapter.py:1.1.2.1 auth.py:1.1.2.1 auth.txt:1.1.2.1 cache.py:1.1.2.1 configuration.py:1.1.2.1 configurationmanager.py:1.1.2.1 configure.zcml:1.1.2.1 connection.py:1.1.2.1 error.gif:1.1.2.1 error_service.gif:1.1.2.1 errorr.py:1.1.2.1 event.py:1.1.2.1 event_service.gif:1.1.2.1 field.py:1.1.2.1 hub.gif:1.1.2.1 hub.py:1.1.2.1 hubcollaborations.txt:1.1.2.1 module.py:1.1.2.1 package.py:1.1.2.1 principalannotation.py:1.1.2.1 role.gif:1.1.2.1 role.py:1.1.2.1 role_service.gif:1.1.2.1 service.gif:1.1.2.1 service.py:1.1.2.1 session.py:1.1.2.1 session.txt:1.1.2.1 test_cookiesessionservice.py:1.1.2.1 view.py:1.1.2.1 viewpackage.py:1.1.2.1 zpt.py:1.1.2.1
Jim Fulton
jim@zope.com
Mon, 23 Dec 2002 14:32:25 -0500
Update of /cvs-repository/Zope3/src/zope/app/services
In directory cvs.zope.org:/tmp/cvs-serv19908/zope/app/services
Added Files:
Tag: NameGeddon-branch
__init__.py adapter.py auth.py auth.txt cache.py
configuration.py configurationmanager.py configure.zcml
connection.py error.gif error_service.gif errorr.py event.py
event_service.gif field.py hub.gif hub.py
hubcollaborations.txt module.py package.py
principalannotation.py role.gif role.py role_service.gif
service.gif service.py session.py session.txt
test_cookiesessionservice.py view.py viewpackage.py zpt.py
Log Message:
Initial renaming before debugging
=== Added File Zope3/src/zope/app/services/__init__.py ===
#
# This file is necessary to make this directory a package.
=== Added File Zope3/src/zope/app/services/adapter.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Adapter Service
$Id: adapter.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
__metaclass__ = type
from zope.interface.adapter import AdapterRegistry
from persistence import Persistent
from persistence.dict import PersistentDict
from zope.component.interfaces import IAdapterService
from zope.component.exceptions import ComponentLookupError
from zope.component import getServiceManager
from zope.app.interfaces.services.configuration import IConfigurable
from zope.app.services.configuration import ConfigurationRegistry
from zope.app.services.configuration import SimpleConfiguration
from zope.proxy.context.context import ContextWrapper
from zope.proxy.context import ContextMethod
from zope.app.services.configuration import ConfigurationStatusProperty
from zope.app.component.nextservice import getNextService
from zope.app.interfaces.services.interfaces import IAdapterConfiguration
class PersistentAdapterRegistry(Persistent, AdapterRegistry):
    """An AdapterRegistry that is itself persistent.

    The base registry is initialised with a fresh PersistentDict
    (presumably used as its backing store -- confirm against
    AdapterRegistry).
    """

    def __init__(self):
        storage = PersistentDict()
        AdapterRegistry.__init__(self, storage)
class AdapterService(Persistent):
    """Persistent adapter service built on per-name adapter registries.

    ``_byName`` maps an adapter name to a ``PersistentAdapterRegistry``.
    Each entry of such a registry, keyed by the (for-interface,
    provided-interface) pair, holds a ``ConfigurationRegistry``.  The
    ``*For`` helpers and ``getRegisteredMatching`` only ever use the empty
    name ('') until named adapter support is added.
    """

    __implements__ = IAdapterService, IConfigurable

    def __init__(self):
        self._byName = PersistentDict()

    def queryConfigurationsFor(self, configuration, default=None):
        """Return the registry matching ``configuration``, or ``default``.

        The lookup uses the configuration's ``forInterface`` and
        ``providedInterface`` together with the empty adapter name.
        """
        # XXX Need to add named adapter support
        return self.queryConfigurations(
            configuration.forInterface, configuration.providedInterface, '',
            default)

    queryConfigurationsFor = ContextMethod(queryConfigurationsFor)

    def queryConfigurations(self,
                            forInterface, providedInterface, name,
                            default=None):
        """Return the context-wrapped registry for the given key.

        ``default`` is returned (unwrapped) when nothing is stored under
        ``name`` or when no registry is registered for the interface pair.
        """
        adapters = self._byName.get(name)
        if adapters is None:
            return default

        registry = adapters.getRegistered(forInterface, providedInterface)
        if registry is None:
            return default

        return ContextWrapper(registry, self)

    queryConfigurations = ContextMethod(queryConfigurations)

    def createConfigurationsFor(self, configuration):
        """Return the registry matching ``configuration``, creating it if
        necessary (empty adapter name only).
        """
        # XXX Need to add named adapter support
        return self.createConfigurations(
            configuration.forInterface, configuration.providedInterface, '')

    createConfigurationsFor = ContextMethod(createConfigurationsFor)

    def createConfigurations(self, forInterface, providedInterface, name):
        """Return the context-wrapped registry for the key, creating the
        per-name adapter registry and the configuration registry on demand.
        """
        adapters = self._byName.get(name)
        if adapters is None:
            adapters = PersistentAdapterRegistry()
            self._byName[name] = adapters

        registry = adapters.getRegistered(forInterface, providedInterface)
        if registry is None:
            registry = ConfigurationRegistry()
            adapters.register(forInterface, providedInterface, registry)

        return ContextWrapper(registry, self)

    createConfigurations = ContextMethod(createConfigurations)

    def getAdapter(self, object, interface, name=''):
        """Return an adapter of ``object`` to ``interface``.

        Raises ComponentLookupError(object, interface) when
        ``queryAdapter`` yields None.
        """
        adapter = self.queryAdapter(object, interface, None, name)
        if adapter is None:
            raise ComponentLookupError(object, interface)
        return adapter

    getAdapter = ContextMethod(getAdapter)

    def queryAdapter(self, object, interface, default=None, name=''):
        """Return an adapter of ``object`` to ``interface`` or ``default``.

        For an unnamed lookup the object itself is returned when it already
        implements ``interface``.  Otherwise the locally stored registries
        for ``name`` are searched, considering only registries that have an
        active configuration.  If nothing is found locally the query is
        passed on to the next 'Adapters' service.
        """
        if not name and interface.isImplementedBy(object):
            return object

        adapters = self._byName.get(name)
        if adapters:
            registry = adapters.getForObject(
                object, interface,
                # Skip registries without an active configuration.
                filter=lambda registry:
                    ContextWrapper(registry, self).active(),
                )

            if registry is not None:
                registry = ContextWrapper(registry, self)
                adapter = registry.active().getAdapter(object)
                return adapter

        adapters = getNextService(self, 'Adapters')
        # Forward the name too; dropping it would silently turn a named
        # lookup into an unnamed one in the services further up the chain.
        return adapters.queryAdapter(object, interface, default, name)

    queryAdapter = ContextMethod(queryAdapter)

    # XXX need to add name support
    def getRegisteredMatching(self,
                              for_interfaces=None,
                              provided_interfaces=None):
        """Delegate to the unnamed ('') adapter registry.

        Returns an empty tuple when no unnamed registry exists yet.
        """
        adapters = self._byName.get('')
        if adapters is None:
            return ()

        return adapters.getRegisteredMatching(for_interfaces,
                                              provided_interfaces)
class AdapterConfiguration(SimpleConfiguration):
    """Configuration describing an adapter by interface pair and factory name."""

    __implements__ = IAdapterConfiguration

    status = ConfigurationStatusProperty('Adapters')

    # XXX These should be positional arguments, except that forInterface
    #     isn't passed in if it is omitted. To fix this, we need a
    #     required=False,explicitly_unrequired=True in the schema field
    #     so None will get passed in.
    def __init__(self, forInterface=None, providedInterface=None,
                 factoryName=None):
        """Store the three settings.

        Raises TypeError unless both ``providedInterface`` and
        ``factoryName`` are given; ``forInterface`` may stay None.
        """
        required = (providedInterface, factoryName)
        if None in required:
            raise TypeError(
                "Must provide 'providedInterface' and 'factoryName'")

        self.factoryName = factoryName
        self.providedInterface = providedInterface
        self.forInterface = forInterface

    def getAdapter(self, object):
        """Resolve ``factoryName`` via the service manager and call the
        resulting factory on ``object``.
        """
        factory = getServiceManager(self).resolve(self.factoryName)
        return factory(object)

    getAdapter = ContextMethod(getAdapter)
=== Added File Zope3/src/zope/app/services/auth.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: auth.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from types import TupleType
from zope.exceptions import NotFoundError
from zope.component import getAdapter, queryAdapter
from zope.app.interfaces.container import IContainer
from zope.app.container.btree import BTreeContainer
from zope.app.interfaces.security import ILoginPassword
from zope.app.interfaces.security import IAuthenticationService
from zope.app.interfaces.services.auth import IUser
class DuplicateLogin(Exception):
    """Exception type for a login clash.

    NOTE(review): nothing in the visible code raises this -- confirm usage.
    """
class DuplicateId(Exception):
    """Exception type for an id clash.

    NOTE(review): nothing in the visible code raises this -- confirm usage.
    """
class ILocalAuthenticationService(IAuthenticationService, IContainer):
    """TTW manageable authentication service.

    Combines the authentication service interface with the container
    interface and adds a way to enumerate all users.
    """

    def getAllUsers():
        """Get all users of the Service."""
class AuthenticationService(BTreeContainer):
    """Authentication service whose principals are its contained items."""

    __implements__ = ILocalAuthenticationService

    def __init__(self):
        super(AuthenticationService, self).__init__()

    def getPrincipalByLogin(self, login):
        """Return the first contained principal whose ``getLogin()`` equals
        ``login``, or None when there is no such principal.
        """
        for principal in self.values():
            if principal.getLogin() == login:
                return principal
        return None

    def getAllUsers(self):
        """Return all contained values."""
        return self.values()

    ############################################################
    # Implementation methods for interface
    # IAuthenticationService

    def authenticate(self, request):
        """Return the principal identified by the request, or None.

        The request is adapted to ILoginPassword; a principal is returned
        only if one with the supplied login exists and its ``validate``
        accepts the supplied password.
        """
        credentials = queryAdapter(request, ILoginPassword, None)
        if credentials is None:
            return None

        login = credentials.getLogin()
        if login is None:
            return None

        principal = self.getPrincipalByLogin(login)
        if principal is None:
            return None

        if principal.validate(credentials.getPassword()):
            return principal
        return None

    def unauthenticatedPrincipal(self):
        """This service defines no unauthenticated principal; return None."""
        return None

    def unauthorized(self, id, request):
        """Ask for a login (realm "zope") when no principal id is given."""
        # XXX This is a mess. request has no place here!
        if id is not None:
            return
        getAdapter(request, ILoginPassword).needLogin(realm="zope")

    def getPrincipal(self, id):
        """Return the contained item stored under ``id`` (via ``get``)."""
        return self.get(id)

    def getPrincipals(self, name):
        """Return the principals whose title or login starts with ``name``.

        The comparison is case-insensitive (both sides are lower-cased).
        """
        prefix = name.lower()
        found = []
        for principal in self.values():
            if (principal.getTitle().lower().startswith(prefix) or
                principal.getLogin().lower().startswith(prefix)):
                found.append(principal)
        return found
"""A persistent implementation of the IPrincipal interface
$Id: auth.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from persistence import Persistent
from zope.proxy.introspection import removeAllProxies
from zope.app.interfaces.annotation import IAttributeAnnotatable
from zope.app.attributeannotations import AttributeAnnotations
from zope.app.interfaces.services.auth import IUser
from zope.app.security.grants.global.principalrolemanager import \
principalRoleManager
class User(Persistent):
    """A persistent implementation of the IUser interface.

    Id, title, description, login and password are kept in private
    (name-mangled) attributes; roles are kept in the object's attribute
    annotations under the key 'roles'.
    """

    __implements__ = IUser, IAttributeAnnotatable

    def __init__(self, id, title, description, login, pw):
        self.__id = id
        self.__title = title
        self.__description = description
        self.__login = login
        self.__pw = pw

    ############################################################
    # Implementation methods for interface IUser
    ######################################
    # IReadUser

    def getLogin(self):
        """Return the login name."""
        return self.__login

    def getRoles(self):
        """Return the roles stored in the annotations ([] if none are set).

        Any proxies around the stored value are removed before returning.
        """
        annotations = AttributeAnnotations(self)
        roles = annotations.get('roles', [])
        roles = removeAllProxies(roles)
        return roles

    def validate(self, pw):
        """Return whether ``pw`` equals the stored password."""
        # NOTE: the password is stored and compared as given (no hashing).
        return pw == self.__pw

    ######################################
    # IPrincipal

    def getId(self):
        """Return the principal id."""
        return self.__id

    def getTitle(self):
        """Return the title."""
        return self.__title

    def getDescription(self):
        """Return the description."""
        return self.__description

    ######################################
    # IWriteUser

    def setTitle(self, title):
        """Replace the title."""
        self.__title = title

    def setDescription(self, description):
        """Replace the description."""
        self.__description = description

    def setLogin(self, login):
        """Replace the login name."""
        # This setter previously had no body, so the new login was
        # silently discarded.
        self.__login = login

    def setPassword(self, password):
        """Replace the password."""
        self.__pw = password

    def setRoles(self, roles):
        """Store ``roles`` in the annotations under the key 'roles'."""
        annotations = AttributeAnnotations(self)
        annotations['roles'] = roles

    #
    ############################################################
=== Added File Zope3/src/zope/app/services/auth.txt ===
$Id: auth.txt,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
The current implementation will be replaced. The following is the design
I came up with together with Jim Fulton.
-- itamar
Design notes for new AuthenticationService
==========================================
The service contains a list of user sources. They implement interfaces,
starting with:
class IUserPassUserSource:
"""Authenticate using username and password."""
def authenticate(username, password):
"Returns boolean saying if such username/password pair exists"
class IDigestSupportingUserSource(IUserPassUserSource):
"""Allow fetching password, which is required by digest auth methods"""
def getPassword(username):
"Return password for username"
etc. There will probably be others as well, for dealing with certificate
authentication and so on. We will probably need to expand the above
interfaces to deal with principal titles, descriptions, and so on.
A login method (cookie auth, HTTP basic auth, digest auth, FTP auth)
is registered as a view on one of the above interfaces.
class ILoginMethodView:
def authenticate():
"""Return principal for request, or None."""
def unauthorized():
"""Tell request that a login is required."""
The authentication service is then implemented something like this:
class AuthenticationService:
def authenticate(self, request):
for us in self.userSources:
loginView = getView(self, us, "login", request)
principal = loginView.authenticate()
if principal is not None:
return principal
def unauthorized(self, request):
loginView = getView(self, self.userSources[0], "login", request)
loginView.unauthorized()
=== Added File Zope3/src/zope/app/services/cache.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Caching service.
$Id: cache.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
__metaclass__ = type
from persistence import Persistent
from zope.app.interfaces.cache.cache import ICachingService
from zope.app.component.nextservice import queryNextService
from zope.app.interfaces.services.configuration \
import INameComponentConfigurable
from zope.app.services.configuration import NameComponentConfigurable
from zope.app.services.event \
import ProtoServiceEventChannel
from zope.proxy.context import ContextMethod
from zope.interfaces.event import IEventChannel
from zope.app.interfaces.event import IObjectModifiedEvent
class ILocalCachingService(ICachingService, IEventChannel,
                           INameComponentConfigurable):
    """TTW manageable caching service.

    Combines the caching service, event channel and name-component
    configurable interfaces; it declares no members of its own.
    """
class CachingService(ProtoServiceEventChannel, NameComponentConfigurable):
    """Caching service that resolves cache names to active components.

    Names are looked up among the locally configured components first and
    then in the next "Caching" service, if there is one.
    """

    __implements__ = (ILocalCachingService,
                      ProtoServiceEventChannel.__implements__)

    _subscribeToServiceInterface = IObjectModifiedEvent

    def __init__(self):
        # XXX If we know that all the superclasses do the right thing in
        #     __init__ with respect to calling
        #     super(ClassName, self).__init__(*args, **kw), then we can
        #     replace the following with just a call to super.
        Persistent.__init__(self)
        ProtoServiceEventChannel.__init__(self)
        NameComponentConfigurable.__init__(self)

    def getCache(wrapped_self, name):
        """Return the cache registered as ``name``.

        Falls back to the next "Caching" service when there is no active
        local component.  Raises KeyError(name) when the name cannot be
        resolved anywhere.
        """
        cache = wrapped_self.queryActiveComponent(name)
        # Compare against None rather than testing truthiness: an active
        # cache object that happens to be falsy (for example an empty one
        # defining __len__) must not be skipped.
        if cache is not None:
            return cache
        service = queryNextService(wrapped_self, "Caching")
        if service is not None:
            return service.getCache(name)
        raise KeyError(name)

    getCache = ContextMethod(getCache)

    def queryCache(wrapped_self, name, default=None):
        """Like ``getCache`` but return ``default`` instead of raising
        KeyError.
        """
        try:
            return wrapped_self.getCache(name)
        except KeyError:
            return default

    queryCache = ContextMethod(queryCache)

    def getAvailableCaches(wrapped_self):
        """Return the names of all available caches, without duplicates.

        Includes local names whose registry has an active configuration
        plus everything reported by the next "Caching" service.
        """
        # A dict is used as a set so each name is reported once.
        caches = {}
        for name in wrapped_self.listConfigurationNames():
            registry = wrapped_self.queryConfigurations(name)
            if registry.active() is not None:
                caches[name] = 0
        service = queryNextService(wrapped_self, "Caching")
        if service is not None:
            for name in service.getAvailableCaches():
                caches[name] = 0
        return caches.keys()

    getAvailableCaches = ContextMethod(getAvailableCaches)
"""A configuration for a cache.
$Id: cache.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from zope.app.interfaces.services.cache import ICacheConfiguration
from zope.app.services.configuration import NamedComponentConfiguration
from zope.app.services.configuration import ConfigurationStatusProperty
from zope.component import getService
from zope.app.interfaces.event import IObjectModifiedEvent
from zope.proxy.context import ContextMethod
class CacheConfiguration(NamedComponentConfiguration):
    # The class docstring is taken from the interface it implements.
    __doc__ = ICacheConfiguration.__doc__

    __implements__ = (ICacheConfiguration,
                      NamedComponentConfiguration.__implements__)

    status = ConfigurationStatusProperty('Caching')

    label = "Cache"

    def __init__(self, *args, **kw):
        super(CacheConfiguration, self).__init__(*args, **kw)

    def activated(wrapped_self):
        # Subscribe the configured cache to object-modified events
        # published through the 'Caching' service.
        component = wrapped_self.getComponent()
        caching = getService(wrapped_self, 'Caching')
        caching.subscribe(component, IObjectModifiedEvent)

    activated = ContextMethod(activated)

    def deactivated(wrapped_self):
        # Undo the subscription made on activation, then drop everything
        # the cache currently holds.
        component = wrapped_self.getComponent()
        caching = getService(wrapped_self, 'Caching')
        caching.unsubscribe(component, IObjectModifiedEvent)
        component.invalidateAll()

    deactivated = ContextMethod(deactivated)
=== Added File Zope3/src/zope/app/services/configuration.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Component registration support for services
$Id: configuration.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
__metaclass__ = type
from persistence import Persistent
from zope.app.interfaces.services.configuration import IConfigurationRegistry, IConfiguration
from zope.app.interfaces.services.configuration import INamedConfiguration
from zope.app.interfaces.services.configuration import INamedComponentConfiguration
from zope.app.interfaces.services.configuration import INameConfigurable
from zope.app.interfaces.services.configuration import INameComponentConfigurable
from zope.component import getService, queryService
from zope.component import getServiceManager
from zope.component import getAdapter
from zope.proxy.context import ContextMethod
from zope.proxy.context.context import ContextWrapper
from zope.proxy.introspection import removeAllProxies
from zope.security.proxy import Proxy
from zope.security.checker import InterfaceChecker
from zope.app.interfaces.container import IAddNotifiable
from zope.app.interfaces.container import IDeleteNotifiable
from zope.app.interfaces.dependable import IDependable
from zope.app.interfaces.dependable import DependencyError
from Zope.App.Traversing import getPhysicalPathString, traverse
from Zope.App.Traversing import getPhysicalRoot
from zope.app.interfaces.services.configuration \
import Unregistered, Registered, Active
class ConfigurationStatusProperty:
    """Descriptor exposing a configuration's status within a service.

    Reading yields Active, Registered or Unregistered, derived from the
    configuration registry the named service holds for the configuration.
    Writing one of these values performs the matching registry operation.
    """

    __Zope_ContextWrapper_contextful_get__ = True
    __Zope_ContextWrapper_contextful_set__ = True

    def __init__(self, service):
        # Name of the service to consult, e.g. 'Adapters' or 'Caching'.
        self.service = service

    def __get__(self, inst, klass):
        if inst is None:
            # Accessed on the class: hand back the descriptor itself.
            return self

        configuration = inst
        service = queryService(configuration, self.service)
        registry = service
        if service:
            registry = service.queryConfigurationsFor(configuration)

        if registry:
            if registry.active() == configuration:
                return Active
            if registry.registered(configuration):
                return Registered

        return Unregistered

    def __set__(self, inst, value):
        configuration = inst
        service = queryService(configuration, self.service)
        registry = service
        if service:
            registry = service.queryConfigurationsFor(configuration)

        if value == Unregistered:
            if registry:
                registry.unregister(configuration)
            return

        if not service:
            # raise an error
            service = getService(configuration, self.service)

        if registry is None:
            registry = service.createConfigurationsFor(configuration)

        if value == Registered:
            # Going to "registered" means stepping down if currently
            # active, otherwise just making sure it is registered.
            if registry.active() == configuration:
                registry.deactivate(configuration)
            else:
                registry.register(configuration)
        elif value == Active:
            if not registry.registered(configuration):
                registry.register(configuration)
            registry.activate(configuration)
class ConfigurationRegistry(Persistent):
    """Ordered registry of configurations with at most one active entry.

    ``_data`` is a tuple of configuration ids (paths relative to the
    service manager's ``Packages`` folder).  When it is non-empty, the
    first element is either the id of the active configuration or None,
    meaning nothing is active.
    """

    __implements__ = IConfigurationRegistry

    _data = ()

    def _id(self, ob):
        """Return the path of ``ob`` relative to the Packages folder.

        Raises ValueError when the object's physical path does not contain
        the packages prefix, or when the relative part is empty or
        contains a ".." segment.
        """
        # Get and check relative path
        prefix = "/++etc++Services/Packages/"
        path = getPhysicalPathString(ob)
        start = path.rfind(prefix)
        if start < 0:
            raise ValueError("Configuration object is in an invalid location",
                             path)

        relative = path[start + len(prefix):]
        if not relative or (".." in relative.split("/")):
            raise ValueError("Configuration object is in an invalid location",
                             path)

        return relative

    def register(wrapped_self, configuration):
        """Append the configuration's id unless it is already present."""
        cid = wrapped_self._id(configuration)
        data = wrapped_self._data

        if cid in data:
            return  # already registered

        if not data:
            # Nothing registered.  Need to stick None in front so that
            # nothing is active.
            data = (None, )

        wrapped_self._data = data + (cid, )

    register = ContextMethod(register)

    def unregister(wrapped_self, configuration):
        """Remove the configuration's id, deactivating it first if active."""
        cid = wrapped_self._id(configuration)
        data = wrapped_self._data

        if data:
            if data[0] == cid:
                # It's active, we need to switch in None
                data = (None, ) + data[1:]
                # we need to notify it that it's inactive.
                configuration.deactivated()
            else:
                data = tuple([entry for entry in data if entry != cid])

        # A lone "nothing active" marker means the registry is empty.
        if len(data) == 1 and data[0] is None:
            data = ()

        wrapped_self._data = data

    unregister = ContextMethod(unregister)

    def registered(wrapped_self, configuration):
        """Return whether the configuration's id is in the registry."""
        return wrapped_self._id(configuration) in wrapped_self._data

    registered = ContextMethod(registered)

    def activate(wrapped_self, configuration):
        """Make the (already registered) configuration the active one.

        Any previously active configuration is notified via
        ``deactivated()``; the new one via ``activated()``.  Raises
        ValueError when the configuration is not registered.
        """
        cid = wrapped_self._id(configuration)
        data = wrapped_self._data

        if cid not in data:
            raise ValueError(
                "Configuration to be activated is not registered",
                configuration)

        if data[0] == cid:
            return  # already active

        if data[0] is None:
            # Remove leading None marker
            data = data[1:]
        else:
            # We need to deactivate the currently active component
            sm = getServiceManager(wrapped_self)
            previous = traverse(sm, 'Packages/' + data[0])
            previous.deactivated()

        others = tuple([entry for entry in data if entry != cid])
        wrapped_self._data = (cid, ) + others

        configuration.activated()

    activate = ContextMethod(activate)

    def deactivate(wrapped_self, configuration):
        """Make the configuration inactive while keeping it registered.

        Raises ValueError when the configuration is not registered.
        """
        cid = wrapped_self._id(configuration)
        data = wrapped_self._data

        if cid not in data:
            raise ValueError(
                "Configuration to be deactivated is not registered",
                configuration)

        if data[0] != cid:
            return  # already inactive

        # Just stick None on the front
        wrapped_self._data = (None, ) + data

        configuration.deactivated()

    deactivate = ContextMethod(deactivate)

    def active(wrapped_self):
        """Return the active configuration object, or None."""
        data = wrapped_self._data
        if not data:
            return None

        path = data[0]
        if path is None:
            return None

        # Make sure we can traverse to it.
        sm = getServiceManager(wrapped_self)
        return traverse(sm, 'Packages/' + path)

    active = ContextMethod(active)

    def __nonzero__(self):
        # The registry is true exactly when it holds any data.
        return bool(self._data)

    def info(wrapped_self):
        """Return a list of dicts with keys 'id', 'active' and
        'configuration', one per registered configuration, in registry
        order.  Only the first entry can be flagged active; a leading
        "nothing active" marker is left out of the result.
        """
        sm = getServiceManager(wrapped_self)

        result = []
        for path in wrapped_self._data:
            target = path
            if path:
                target = traverse(sm, 'Packages/' + path)
            result.append({'id': path,
                           'active': False,
                           'configuration': target,
                           })

        if result:
            if result[0]['configuration'] is None:
                del result[0]
            else:
                result[0]['active'] = True

        return result

    info = ContextMethod(info)
class SimpleConfiguration(Persistent):
    """Configuration objects that just contain configuration data
    """

    __implements__ = IConfiguration, IDeleteNotifiable

    title = description = u''

    def activated(self):
        # Hook called on activation; nothing to do by default.
        pass

    def deactivated(self):
        # Hook called on deactivation; nothing to do by default.
        pass

    def manage_beforeDelete(self, configuration, container):
        """Refuse deletion of an active configuration.

        Raises DependencyError when the configuration's status is Active.
        A merely registered configuration is unregistered instead.
        """
        current = configuration.status

        if current == Active:
            # Best effort: fall back to str() if the path is unavailable.
            try:
                where = getPhysicalPathString(configuration)
            except:
                where = str(configuration)
            raise DependencyError("Can't delete active configuration (%s)"
                                  % where)

        if current == Registered:
            configuration.status = Unregistered
class NamedConfiguration(SimpleConfiguration):
    """Named configuration

    Stores ``name`` and passes any remaining arguments up the
    constructor chain.
    """

    __implements__ = INamedConfiguration, SimpleConfiguration.__implements__

    def __init__(self, name, *args, **kw):
        self.name = name
        super(NamedConfiguration, self).__init__(*args, **kw)
class NamedComponentConfiguration(NamedConfiguration):
    """Named component configuration

    Subclasses should define a getInterface() method returning the interface
    of the component.
    """

    # NamedConfiguration.__implements__ includes IDeleteNotifiable
    __implements__ = (INamedComponentConfiguration,
                      NamedConfiguration.__implements__, IAddNotifiable)

    # XXX is all this '*args, **kw' business the right way to use super?

    def __init__(self, name, component_path, permission=None, *args, **kw):
        self.componentPath = component_path
        # NOTE(review): CheckerPublic is not imported in the visible part
        # of this module -- confirm where it should come from.
        if permission == 'Zope.Public':
            permission = CheckerPublic
        self.permission = permission
        super(NamedComponentConfiguration, self).__init__(name, *args, **kw)

    def getComponent(wrapped_self):
        """Return the configured component.

        The component is found by traversing ``componentPath`` from the
        unproxied physical root.  When a permission is set, the component
        is returned inside a security Proxy whose checker is built from
        ``getInterface()`` and that permission.
        """
        service_manager = getServiceManager(wrapped_self)

        # We have to be clever here. We need to do an honest to
        # god unrestricted traveral, which means we have to
        # traverse from an unproxied object. But, it's not enough
        # for the service manager to be unproxied, because the
        # path is an absolute path. When absolute paths are
        # traversed, the traverser finds the physical root and
        # traverses from there, so we need to make sure the
        # physical root isn't proxied.

        # get the root and unproxy it.
        root = removeAllProxies(getPhysicalRoot(service_manager))
        component = traverse(root, wrapped_self.componentPath)

        if not wrapped_self.permission:
            return component

        if type(component) is Proxy:
            # There should be at most one security Proxy around
            # an object. So, if we're going to add a new security proxy,
            # we need to remove any existing one.
            # NOTE(review): removeSecurityProxy is not imported in the
            # visible part of this module -- confirm where it should
            # come from.
            component = removeSecurityProxy(component)

        interface = wrapped_self.getInterface()
        checker = InterfaceChecker(interface, wrapped_self.permission)
        return Proxy(component, checker)

    getComponent = ContextMethod(getComponent)

    def manage_afterAdd(self, configuration, container):
        """Record the configuration's path as a dependent of its component."""
        component = configuration.getComponent()
        dependents = getAdapter(component, IDependable)
        dependents.addDependent(getPhysicalPathString(configuration))

    def manage_beforeDelete(self, configuration, container):
        """Run the inherited checks, then remove the configuration's path
        from its component's dependents.
        """
        super(NamedComponentConfiguration, self
              ).manage_beforeDelete(configuration, container)
        component = configuration.getComponent()
        dependents = getAdapter(component, IDependable)
        dependents.removeDependent(getPhysicalPathString(configuration))
class NameConfigurable:
    """Mixin for implementing INameConfigurable

    ``_bindings`` is a plain dict mapping a name to its
    ConfigurationRegistry.
    """

    __implements__ = INameConfigurable

    def __init__(self, *args, **kw):
        self._bindings = {}
        super(NameConfigurable, self).__init__(*args, **kw)

    def queryConfigurationsFor(wrapped_self, cfg, default=None):
        """Look up the registry by the configuration's ``name``."""
        return wrapped_self.queryConfigurations(cfg.name, default)

    queryConfigurationsFor = ContextMethod(queryConfigurationsFor)

    def queryConfigurations(wrapped_self, name, default=None):
        """Return the context-wrapped registry bound to ``name``.

        When nothing is bound, ``default`` is what gets passed to
        ContextWrapper.
        """
        found = wrapped_self._bindings.get(name, default)
        return ContextWrapper(found, wrapped_self)

    queryConfigurations = ContextMethod(queryConfigurations)

    def createConfigurationsFor(wrapped_self, cfg):
        """Get or create the registry for the configuration's ``name``."""
        return wrapped_self.createConfigurations(cfg.name)

    createConfigurationsFor = ContextMethod(createConfigurationsFor)

    def createConfigurations(wrapped_self, name):
        """Return the context-wrapped registry bound to ``name``, creating
        and binding a new ConfigurationRegistry first if needed.
        """
        bindings = wrapped_self._bindings
        if name not in bindings:
            bindings[name] = ConfigurationRegistry()
            # The plain dict was mutated in place, so flag the change
            # explicitly.
            wrapped_self._p_changed = 1
        return ContextWrapper(bindings[name], wrapped_self)

    createConfigurations = ContextMethod(createConfigurations)

    def listConfigurationNames(wrapped_self):
        """Return the names whose bound registry is true (non-empty)."""
        bindings = wrapped_self._bindings
        return [name for name in bindings.keys() if bindings.get(name)]
class NameComponentConfigurable(NameConfigurable):
    """Mixin for implementing INameComponentConfigurable
    """

    __implements__ = INameComponentConfigurable

    def queryActiveComponent(wrapped_self, name, default=None):
        """Return the component of the active configuration for ``name``.

        ``default`` is returned when there is no (non-empty) registry for
        the name or when the registry has no active configuration.
        """
        registry = wrapped_self.queryConfigurations(name)
        if not registry:
            return default

        configuration = registry.active()
        if configuration is None:
            return default

        return configuration.getComponent()

    queryActiveComponent = ContextMethod(queryActiveComponent)
=== Added File Zope3/src/zope/app/services/configurationmanager.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: configurationmanager.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
__metaclass__ = type
from persistence import Persistent
from zope.app.interfaces.services.configurationmanager import IConfigurationManager
from zope.app.interfaces.container import IDeleteNotifiable
from zope.app.interfaces.container import IZopeWriteContainer
from zope.component import getAdapter
class ConfigurationManager(Persistent):
    """Configuration manager

    Manages configurations within a package.
    """

    __implements__ = IConfigurationManager, IDeleteNotifiable

    def __init__(self):
        # Ordered tuple of (key, object) pairs.
        self._data = ()
        # Counter from which new keys are generated.
        self._next = 0

    def __getitem__(self, key):
        """Return the object stored under ``key``; KeyError if absent."""
        found = self.get(key)
        if found is None:
            raise KeyError(key)
        return found

    def get(self, key, default=None):
        """Return the object stored under ``key``, else ``default``."""
        for name, value in self._data:
            if name == key:
                return value
        return default

    def __contains__(self, key):
        """Return whether ``get(key)`` yields something other than None."""
        return self.get(key) is not None

    def keys(self):
        """Return the keys in their current order."""
        return [pair[0] for pair in self._data]

    def values(self):
        """Return the stored objects in their current order."""
        return [pair[1] for pair in self._data]

    def items(self):
        """Return the underlying tuple of (key, object) pairs."""
        return self._data

    def __len__(self):
        """Return the number of stored objects."""
        return len(self._data)

    def setObject(self, key, object):
        """Append ``object`` and return the key it was stored under.

        The ``key`` argument is ignored: the key is always the string
        form of an internally incremented counter.
        """
        self._next += 1
        key = str(self._next)
        self._data = self._data + ((key, object), )
        return key

    def __delitem__(self, key):
        """Remove the entry stored under ``key``; KeyError if absent."""
        if key not in self:
            raise KeyError(key)
        remaining = [pair for pair in self._data if pair[0] != key]
        self._data = tuple(remaining)

    def moveTop(self, names):
        """Move the named entries to the front, keeping relative order."""
        chosen = [pair for pair in self._data if (pair[0] in names)]
        others = [pair for pair in self._data if (pair[0] not in names)]
        self._data = tuple(chosen + others)

    def moveBottom(self, names):
        """Move the named entries to the end, keeping relative order."""
        others = [pair for pair in self._data if (pair[0] not in names)]
        chosen = [pair for pair in self._data if (pair[0] in names)]
        self._data = tuple(others + chosen)

    def _moveUpOrDown(self, names, direction):
        # Move each named item by one position in ``direction`` (-1 is
        # up, +1 is down).  Note that this might require moving some
        # unnamed objects by more than one position.
        slots = {}

        # Give each named item the slot one step away from its current
        # position (never below 0), taking the next free slot on a clash.
        position = -1
        for pair in self._data:
            position += 1
            if pair[0] in names:
                target = max(position + direction, 0)
                while target in slots:
                    target += 1
                slots[target] = pair

        # Fill in the rest where there's room.
        free = 0
        for pair in self._data:
            if pair[0] not in names:
                while free in slots:
                    free += 1
                slots[free] = pair

        order = slots.keys()
        order.sort()
        self._data = tuple([slots[index] for index in order])

    def moveUp(self, names):
        """Move the named entries one position towards the front."""
        self._moveUpOrDown(names, -1)

    def moveDown(self, names):
        """Move the named entries one position towards the end."""
        self._moveUpOrDown(names, 1)

    def manage_beforeDelete(self, object, container):
        """Delete every contained entry through the IZopeWriteContainer
        adapter of this manager before the manager itself is deleted.
        """
        assert object == self
        container = getAdapter(object, IZopeWriteContainer)
        # Iterates the tuple as it was when the loop started; deletions
        # rebind self._data rather than mutating that tuple.
        for pair in self._data:
            del container[pair[0]]
# Module documentation = class docstring followed by the module docstring
# (the $Id$ line above).
__doc__ = ConfigurationManager.__doc__ + __doc__
=== Added File Zope3/src/zope/app/services/configure.zcml === (423/523 lines abridged)
<zopeConfigure
xmlns='http://namespaces.zope.org/zope'
xmlns:browser="http://namespaces.zope.org/browser"
>
<!-- Configuration registries -->
<content class="zope.app.services.configuration.ConfigurationRegistry">
<require permission="Zope.ManageServices"
interface="zope.app.interfaces.services.configuration.IConfigurationRegistry"
/>
</content>
<!-- Adapter Service -->
<content class="zope.app.services.adapter.AdapterService">
<implements interface="zope.app.interfaces.annotation.IAttributeAnnotatable" />
<factory id="Zope.App.OFS.Services.AdapterService"
permission="Zope.ManageServices"
/>
<require permission="Zope.ManageServices"
interface="zope.app.interfaces.services.configuration.IConfigurable"
attributes="getRegisteredMatching"
/>
</content>
<content class="zope.app.services.adapter.AdapterConfiguration">
<require
permission="Zope.ManageServices"
interface="zope.app.interfaces.services.interfaces.IAdapterConfiguration"
set_schema=
"Zope.App.OFS.Services.ConfigurationInterfaces.IConfiguration"
/>
<require
permission="Zope.ManageServices"
interface="zope.app.interfaces.container.IDeleteNotifiable"
/>
</content>
<!-- View Service -->
<content class="zope.app.services.view.ViewService">
<implements interface="zope.app.interfaces.annotation.IAttributeAnnotatable" />
<factory id="Zope.App.OFS.Services.ViewService"
permission="Zope.ManageServices"
/>
<require permission="Zope.ManageServices"
interface="zope.app.interfaces.services.configuration.IConfigurable"
attributes="getRegisteredMatching"
/>
[-=- -=- -=- 423 lines omitted -=- -=- -=-]
<include package=".Views" />
</zopeConfigure>
<zopeConfigure
xmlns='http://namespaces.zope.org/zope'>
<!-- Authentication Service Directives -->
<content class="zope.app.services.auth.AuthenticationService">
<factory id="AuthenticationService" permission="Zope.ManageServices" />
<!-- XXX Should you *really* be able to get all this with just the
view permission? -->
<require
permission="Zope.View"
interface="zope.app.interfaces.security.IAuthenticationService" />
<require
permission="Zope.View"
attributes="getAllUsers" />
<require
permission="Zope.ManageServices"
interface="zope.app.interfaces.container.IContainer" />
<implements interface="zope.app.interfaces.annotation.IAttributeAnnotatable" />
</content>
<!-- User Directives -->
<content class="zope.app.services.auth.User">
<factory id="User" permission="Zope.ManageServices" />
<require
permission="Zope.View"
interface="zope.app.interfaces.services.auth.IReadUser" />
<require
permission="Zope.ManageContent"
interface="zope.app.interfaces.services.auth.IWriteUser" />
<implements interface="zope.app.interfaces.annotation.IAttributeAnnotatable" />
</content>
<!-- Further directives -->
<include package=".Views" />
</zopeConfigure>
=== Added File Zope3/src/zope/app/services/connection.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: connection.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from persistence import Persistent
from zope.proxy.context import ContextMethod
from zope.app.component.nextservice import queryNextService
from zope.app.interfaces.services.configuration \
import INameComponentConfigurable
from zope.app.services.configuration import NameComponentConfigurable
from zope.app.interfaces.rdb import IConnectionService
# Combines the connection-service contract with name-based component
# configuration.  The docstring below is reused verbatim as the docstring
# of ConnectionService.
class ILocalConnectionService(IConnectionService, INameComponentConfigurable):
    """A local (placeful) connection service"""
class ConnectionService(Persistent, NameComponentConfigurable):
    # The class documentation is taken from the interface it implements.
    __doc__ = ILocalConnectionService.__doc__

    __implements__ = ILocalConnectionService

    def __init__(self):
        # Nothing of our own to set up; defer to the base classes.
        super(ConnectionService, self).__init__()

    def getConnection(self, name):
        """Return the connection registered under `name`.

        The locally active component is called to produce the
        connection.  If there is none, the next
        "SQLDatabaseConnections" service is asked.  Raises KeyError when
        neither can supply it.
        """
        factory = self.queryActiveComponent(name)
        if factory is not None:
            return factory()
        parent = queryNextService(self, "SQLDatabaseConnections")
        if parent is None:
            raise KeyError(name)
        return parent.getConnection(name)
    getConnection = ContextMethod(getConnection)

    def queryConnection(self, name, default=None):
        """Like getConnection, but return `default` instead of raising."""
        try:
            found = self.getConnection(name)
        except KeyError:
            found = default
        return found
    queryConnection = ContextMethod(queryConnection)

    def getAvailableConnections(self):
        """Return the names of all connections reachable from here.

        The result merges names that have an active local configuration
        with the names reported by the next "SQLDatabaseConnections"
        service, without duplicates.
        """
        # A dict is used as a set of names.
        seen = {}
        for name in self.listConfigurationNames():
            if self.queryConfigurations(name).active() is not None:
                seen[name] = 0
        parent = queryNextService(self, "SQLDatabaseConnections")
        if parent is not None:
            for name in parent.getAvailableConnections():
                seen[name] = 0
        return seen.keys()
    getAvailableConnections = ContextMethod(getAvailableConnections)
"""A configuration for a database adapter.
$Id: connection.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from zope.app.interfaces.services.connection import IConnectionConfiguration
from zope.app.services.configuration import NamedComponentConfiguration
from zope.app.services.configuration import ConfigurationStatusProperty
class ConnectionConfiguration(NamedComponentConfiguration):
    # The class documentation is taken from the interface it implements.
    __doc__ = IConnectionConfiguration.__doc__
    # Implements its own interface plus everything the base class declares.
    __implements__ = (IConnectionConfiguration,
                      NamedComponentConfiguration.__implements__)
    # NOTE(review): 'SQLDatabaseConnections' is the same service name that
    # ConnectionService passes to queryNextService; presumably this ties
    # the status to that service -- confirm against
    # ConfigurationStatusProperty.
    status = ConfigurationStatusProperty('SQLDatabaseConnections')
    label = "Connection"
    def __init__(self, *args, **kw):
        # Pure pass-through to the base-class constructor.
        super(ConnectionConfiguration, self).__init__(*args, **kw)
=== Added File Zope3/src/zope/app/services/error.gif ===
<Binary-ish file>
=== Added File Zope3/src/zope/app/services/error_service.gif ===
<Binary-ish file>
=== Added File Zope3/src/zope/app/services/errorr.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Revision information:
$Id: errorr.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
import time
from random import random
from thread import allocate_lock
from persistence import Persistent
from types import StringTypes
import logging
from zope.exceptions.exceptionformatter import format_exception
from zope.proxy.context import ContextMethod
from zope.app.interfaces.services.error \
import IErrorReportingService
# Restrict the rate at which errors are sent to the Event Log.
# Maps an exception type name to the time (as returned by time.time())
# after which the next exception of that type may be copied to the log.
_rate_restrict_pool = {}
# The number of seconds that must elapse on average between sending two
# exceptions of the same name into the Event Log: one per minute.
_rate_restrict_period = 60
# The number of exceptions to allow in a burst before the above limit
# kicks in. We allow five exceptions, before limiting them to one per
# minute.
_rate_restrict_burst = 5
# _temp_logs holds the logs.
_temp_logs = {} # { oid -> [ traceback string ] }
# Held by ErrorReportingService.raising() while it trims a log.
cleanup_lock = allocate_lock()
class ErrorReportingService(Persistent):
    """Error Reporting Service

    Records recent exceptions in a per-object list held in the
    module-level ``_temp_logs`` dict (keyed by ``_p_oid``) and can
    optionally copy them to the 'SiteError' logger.
    """
    __implements__ = IErrorReportingService

    # Maximum number of entries retained in the log.
    keep_entries = 20
    # When true, raising() also copies reports to the 'SiteError' logger.
    copy_to_zlog = 0
    # Exceptions that happen all the time, so we don't need
    # to log them.  Eventually this should be configured
    # through-the-web.
    _ignored_exceptions = ('Unauthorized',)

    def _getLog(self):
        """Return the log (a list of entry dicts) for this object.

        Careful, the log is shared between threads.
        """
        # setdefault() fetches or creates the list in a single dict
        # operation instead of a separate get followed by a store.
        return _temp_logs.setdefault(self._p_oid, [])

    def raising(self, info, request=None):
        """Log an exception.

        Called by ZopePublication.handleException method.

        `info` is indexed as (type, value, traceback); the third item
        may already be a formatted string.  `request`, if given,
        supplies the URL, user details and request items stored with the
        entry.  Exceptions whose type name is in `_ignored_exceptions`
        are not recorded.
        """
        now = time.time()
        try:
            tb_text = None
            tb_html = None
            strtype = str(getattr(info[0], '__name__', info[0]))
            if strtype in self._ignored_exceptions:
                return
            if not isinstance(info[2], StringTypes):
                tb_text = ''.join(
                    format_exception(*info, **{'as_html': 0}))
                tb_html = ''.join(
                    format_exception(*info, **{'as_html': 1}))
            else:
                tb_text = info[2]

            url = None
            username = None
            req_html = None
            if request:
                url = request.URL
                try:
                    username = ', '.join((request.user.getLogin(),
                                          request.user.getId(),
                                          request.user.getTitle(),
                                          request.user.getDescription()
                                          ))
                # When there's an unauthorized access, request.user is
                # not set, so we get an AttributeError
                # XXX is this right? Surely request.user should be set!
                except AttributeError:
                    pass

                # NOTE(review): request items are interpolated without
                # HTML escaping -- confirm that whatever renders
                # 'req_html' escapes it.
                req_html = ''.join(['%s : %s<br>' % item
                                    for item in request.items()])
            try:
                strv = str(info[1])
            # A call to str(obj) could raise anything at all.
            # We'll ignore these errors, and print something
            # useful instead, but also log the error.
            except:
                logging.getLogger('SiteError').exception(
                    'Error in ErrorReportingService while getting a str '
                    'representation of an object')
                strv = '<unprintable %s object>' % (
                    str(type(info[1]).__name__)
                    )

            log = self._getLog()
            entry_id = str(now) + str(random()) # Low chance of collision

            log.append({
                'type': strtype,
                'value': strv,
                'time': time.ctime(now),
                'id': entry_id,
                'tb_text': tb_text,
                'tb_html': tb_html,
                'username': username,
                'url': url,
                'req_html': req_html,
                })

            cleanup_lock.acquire()
            try:
                keep = self.keep_entries
                if keep <= 0:
                    # ``log[:-0]`` is an empty slice, so the ordinary
                    # trim would never remove anything and the log would
                    # grow without bound.  Keeping no entries means
                    # emptying the log.
                    del log[:]
                elif len(log) > keep:
                    del log[:-keep]
            finally:
                cleanup_lock.release()

            if self.copy_to_zlog:
                self._do_copy_to_zlog(now, strtype, str(url), info)
        finally:
            # Drop the reference to the exception info (which includes
            # the traceback) before returning.
            info = None
    raising = ContextMethod(raising)

    def _do_copy_to_zlog(self, now, strtype, url, info):
        """Copy the current exception to the 'SiteError' logger.

        Rate-limited per exception type name through
        `_rate_restrict_pool`: nothing is logged until `now` has passed
        the time stored for `strtype`.
        """
        # XXX info is unused; logging.exception() will call sys.exc_info()
        when = _rate_restrict_pool.get(strtype,0)
        if now > when:
            # Allow a burst: the stored time is never further in the
            # past than burst * period seconds.
            next_when = max(when,
                            now - _rate_restrict_burst*_rate_restrict_period)
            next_when += _rate_restrict_period
            _rate_restrict_pool[strtype] = next_when
            logging.getLogger('SiteError').exception(str(url))

    def getProperties(self):
        """Return the current settings as a dict."""
        return {
            'keep_entries': self.keep_entries,
            'copy_to_zlog': self.copy_to_zlog,
            'ignored_exceptions': self._ignored_exceptions,
            }
    getProperties = ContextMethod(getProperties)

    def setProperties(self, keep_entries, copy_to_zlog=0,
                      ignored_exceptions=()):
        """Sets the properties of this site error log.

        `keep_entries` is converted with int(), `copy_to_zlog` with
        bool().  `ignored_exceptions` is stored as a tuple of non-empty
        strings.
        """
        copy_to_zlog = bool(copy_to_zlog)
        self.keep_entries = int(keep_entries)
        self.copy_to_zlog = copy_to_zlog
        self._ignored_exceptions = tuple(
            filter(None, map(str, ignored_exceptions))
            )
    setProperties = ContextMethod(setProperties)

    def getLogEntries(self):
        """Returns the entries in the log, most recent first.

        Makes a copy to prevent changes.
        """
        res = [entry.copy() for entry in self._getLog()]
        res.reverse()
        return res
    getLogEntries = ContextMethod(getLogEntries)

    def getLogEntryById(self, id):
        """Returns the specified log entry.

        Makes a copy to prevent changes.  Returns None if not found.
        """
        for entry in self._getLog():
            if entry['id'] == id:
                return entry.copy()
        return None
    getLogEntryById = ContextMethod(getLogEntryById)
def _cleanup_temp_log():
    """Empty the module-level _temp_logs dict, discarding every log."""
    _temp_logs.clear()
# Alias under which the cleanup function is registered below.
_clear = _cleanup_temp_log
# Register our cleanup with Testing.CleanUp to make writing unit tests simpler.
from zope.testing.cleanup import addCleanUp
addCleanUp(_clear)
# The registration helper is only needed once; remove it from the module
# namespace.
del addCleanUp
=== Added File Zope3/src/zope/app/services/event.py === (549/649 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Revision information:
$Id: event.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from zope.exceptions import NotFoundError
from zope.interfaces.event import ISubscriptionAware
from zope.interfaces.event import IEvent
from zope.proxy.context import ContextMethod
from zope.proxy.introspection import removeAllProxies
from zope.proxy.context.context import ContextWrapper
from zope.app.component.nextservice import getNextService, queryNextService
class LocalServiceSubscribable(LocalSubscribable):
"""a local mix-in for services"""
__implements__ = LocalSubscribable.__implements__
_serviceName = None # should be replaced; usually done in "bound"
# method of a subclass
# uses (and needs) __init__ from Zope.Event.Subscribable (via
# LocalSubscribable)
def unsubscribe(wrapped_self,
subscriber,
event_type=None,
filter=None):
# might be wrapped, might not
subscriber = removeAllProxies(subscriber)
clean_self = removeAllProxies(wrapped_self)
wrapped_subscriber = ContextWrapper(subscriber, wrapped_self)
for subscriber_index in range(len(clean_self._subscribers)):
sub = clean_self._subscribers[subscriber_index]
[-=- -=- -=- 549 lines omitted -=- -=- -=-]
from zope.app.interfaces.traversing.traverser import ITraverser
from zope.interfaces.event import ISubscriptionAware
from zope.proxy.introspection import removeAllProxies
from zope.app.interfaces.services.event import IPathSubscriber
from zope.interface import Attribute
from Zope.App.Traversing import getPhysicalPathString, traverse
from zope.proxy.context import ContextMethod
class AbstractIndirectSubscriber:
    # Forwards subscriber calls to the object returned by
    # _getSubscriber(), which this class does not define itself
    # (PathSubscriber provides it).

    def notify(wrapped_self, event):
        # Look up the real subscriber and pass the event on.
        clean_self = removeAllProxies(wrapped_self)
        target = clean_self._getSubscriber(wrapped_self)
        target.notify(event)
    notify=ContextMethod(notify)

    def subscribedTo(wrapped_self, subscribable, event_type, filter):
        # Only subscribers that implement ISubscriptionAware are told
        # about the subscription.
        clean_self = removeAllProxies(wrapped_self)
        target = clean_self._getSubscriber(wrapped_self)
        bare_target = removeAllProxies(target)
        if not ISubscriptionAware.isImplementedBy(bare_target):
            return
        target.subscribedTo(subscribable, event_type, filter)
    subscribedTo=ContextMethod(subscribedTo)

    def unsubscribedFrom(wrapped_self, subscribable, event_type, filter):
        # Mirror image of subscribedTo.
        clean_self = removeAllProxies(wrapped_self)
        target = clean_self._getSubscriber(wrapped_self)
        bare_target = removeAllProxies(target)
        if not ISubscriptionAware.isImplementedBy(bare_target):
            return
        target.unsubscribedFrom(subscribable, event_type, filter)
    unsubscribedFrom=ContextMethod(unsubscribedFrom)
class PathSubscriber(AbstractIndirectSubscriber):
    """Indirect subscriber that remembers its target by physical path.

    Only the path string is stored; the target is looked up again by
    traversal each time it is needed.
    """

    __implements__ = IPathSubscriber, ISubscriptionAware

    def __init__(self, wrapped_subscriber):
        # Store the path string rather than a reference to the object.
        self.subscriber_path = getPhysicalPathString(wrapped_subscriber)

    def __eq__(self, other):
        """Equal to any IPathSubscriber that has the same path."""
        return (IPathSubscriber.isImplementedBy(other) and
                other.subscriber_path == self.subscriber_path)

    def __ne__(self, other):
        """Exact negation of __eq__.

        Python 2 does not derive ``!=`` from ``__eq__``.  Without this
        method two subscribers with the same path would compare both
        equal and unequal.
        """
        return not self.__eq__(other)

    def _getSubscriber(self, wrapped_self):
        """Return the object at the stored path, traversing from here."""
        return traverse(wrapped_self, self.subscriber_path)
=== Added File Zope3/src/zope/app/services/event_service.gif ===
<Binary-ish file>
=== Added File Zope3/src/zope/app/services/field.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Component location field.
$Id: field.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
# Python 2 idiom: classes declared in this module without an explicit
# base class are created as new-style classes.
__metaclass__ = type
from zope.schema.interfaces import IField
from zope.schema import Field
from zope.schema.interfaces import ValidationError
from Zope.App.Traversing import traverse
from zope.app.component.interfacefield import InterfaceField
from zope.exceptions import NotFoundError
class IComponentLocation(IField):
    """A field containing a component path.
    """
    # Interface that the component found at the path must implement;
    # ComponentLocation._validate checks the component against it.
    type = InterfaceField(
        title = u"An interface that must be implemented by the component.",
        required = True,
        readonly = True,
        )
class ComponentLocation(Field):
    """Field whose value is the absolute path of a component.

    The component found at the path must implement the interface given
    as `type`.
    """

    __implements__ = IComponentLocation

    _type = unicode

    def __init__(self, type, *args, **kw):
        """Remember the required interface; the rest goes to Field."""
        self.type = type
        super(ComponentLocation, self).__init__(*args, **kw)

    def _validate(self, value):
        """Check that `value` is an absolute path to a suitable component.

        Raises ValidationError, always carrying the offending value,
        when the path is not absolute, when nothing can be traversed to
        at that path, or when the object found does not implement
        `self.type`.
        """
        super(ComponentLocation, self)._validate(value)

        if not value.startswith('/'):
            raise ValidationError("Not an absolute path", value)

        try:
            component = traverse(self.context, value)
        except NotFoundError:
            raise ValidationError("Path for non-existent object", value)

        if not self.type.isImplementedBy(component):
            # Pass the value along, as the other two failures do.
            raise ValidationError("Wrong component type", value)
=== Added File Zope3/src/zope/app/services/hub.gif ===
<Binary-ish file>
=== Added File Zope3/src/zope/app/services/hub.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Revision information:
$Id: hub.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
# Python 2 idiom: classes declared in this module without an explicit
# base class are created as new-style classes (needed for the property()
# attributes used by the event classes below).
__metaclass__ = type
from zope.app.interfaces.services.hub import IObjectRegisteredHubEvent
from zope.app.interfaces.services.hub import IObjectUnregisteredHubEvent
from zope.app.interfaces.services.hub import IObjectModifiedHubEvent
from zope.app.interfaces.services.hub import IObjectMovedHubEvent
from zope.app.interfaces.services.hub import IObjectRemovedHubEvent
from Zope.App.Traversing import traverse
from zope.app.interfaces.traversing.traverser import ITraverser
from zope.component import getAdapter
class HubEvent:
    """Convenient mix-in for HubEvents"""

    hub = None
    hubid = None
    # 'object' and 'location' are read-only properties defined below.

    def __init__(self, hub, hubid, location=None, object=None):
        # All four values are kept, to avoid unnecessary lookups and to
        # let the object hub cache objects.  A location or object left
        # as None is fetched from the hub on first access.
        self.hub, self.hubid = hub, hubid
        self.__location = location
        self.__object = object

    def __getObject(self):
        # Ask the hub only if no object is held yet, then remember it.
        if self.__object is None:
            self.__object = self.hub.getObject(self.hubid)
        return self.__object

    object = property(__getObject)

    def __getLocation(self):
        # Same lazy lookup, for the location.
        if self.__location is None:
            self.__location = self.hub.getLocation(self.hubid)
        return self.__location

    location = property(__getLocation)
# Adds no behaviour to HubEvent; only the declared interface differs.
class ObjectRegisteredHubEvent(HubEvent):
    """A hubid has been freshly created and mapped against an object."""
    __implements__ = IObjectRegisteredHubEvent
class ObjectUnregisteredHubEvent:
    """We are no longer interested in this object.
    """

    __implements__ = IObjectUnregisteredHubEvent

    hub = None
    hubid = None
    location = None
    # 'object' is a read-only property defined below.

    def __init__(self, hub, hubid, location, object=None):
        # location *must* be supplied because the object hub cannot be
        # relied upon to translate an unregistered hubid
        self.hub, self.hubid = hub, hubid
        self.location = location
        self.__object = object

    def __getObject(self):
        # With no object supplied, find it by traversing from the hub
        # to the recorded location, then remember it.
        if self.__object is None:
            traverser = getAdapter(self.hub, ITraverser)
            self.__object = traverser.traverse(self.location)
        return self.__object

    object = property(__getObject)
# Adds no behaviour to HubEvent; only the declared interface differs.
class ObjectModifiedHubEvent(HubEvent):
    """An object with a hubid has been modified."""
    __implements__ = IObjectModifiedHubEvent
class ObjectMovedHubEvent(HubEvent):
    """An object with a hubid has had its context changed. Typically, this
    means that it has been moved."""

    __implements__ = IObjectMovedHubEvent

    def __init__(self, hub, hubid, fromLocation, location=None, object=None):
        # The base class handles hub, hubid, the new location and the
        # object; only the previous location is stored here.
        HubEvent.__init__(self, hub, hubid, location, object)
        self.fromLocation = fromLocation
class ObjectRemovedHubEvent(ObjectUnregisteredHubEvent):
    """An object with a hubid has been removed."""

    __implements__ = IObjectRemovedHubEvent
    # ...which is a subclass of IObjectUnregisteredHubEvent

    hub = None
    hubid = None
    # A plain attribute here replaces the 'object' property inherited
    # from the base class, so __init__ can assign it directly.
    object = None
    location = None

    def __init__(self, hub, hubid, location, object):
        # all four *must* be supplied because the object hub cannot be
        # relied upon to translate an unregistered hubid
        self.hub, self.hubid = hub, hubid
        self.location, self.object = location, object
"""
Revision information:
$Id: hub.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from __future__ import generators
from zope.app.services.event \
import ProtoServiceEventChannel
from zope.app.interfaces.services.hub import IObjectHub, ObjectHubError
from zope.exceptions import NotFoundError
from zope.app.interfaces.event import IObjectRemovedEvent, IObjectEvent
from zope.app.interfaces.event import IObjectMovedEvent, IObjectCreatedEvent
from zope.app.interfaces.event import IObjectModifiedEvent
from zodb.btrees.IOBTree import IOBTree
from zodb.btrees.OIBTree import OIBTree
from zope.proxy.context import ContextMethod
from zope.proxy.context.context import isWrapper
from zope.app.interfaces.traversing.traverser import ITraverser
from Zope.App.Traversing import getPhysicalPath
from Zope.App.Traversing import locationAsTuple, locationAsUnicode
from zope.proxy.introspection import removeAllProxies
from zope.proxy.context.context import ContextWrapper
from zope.component import getAdapter
import random
def randid():
    """Return a random non-zero integer in [-2*10**9, 2*10**9]."""
    # Draw the magnitude first, then the sign, each from its own call to
    # the random module.
    magnitude = random.randrange(1, 2000000001)
    if random.random() >= 0.5:
        return magnitude
    return -magnitude
class ObjectHub(ProtoServiceEventChannel):
# this implementation makes the decision to not interact with any
# object hubs above it: it is a world unto itself, as far as it is
# concerned, and if it doesn't know how to do something, it won't
# ask anything else to try. Everything else is YAGNI for now.
__implements__ = (
IObjectHub,
ProtoServiceEventChannel.__implements__)
def __init__(self):
ProtoServiceEventChannel.__init__(self)
# int --> tuple of unicodes
self.__hubid_to_location = IOBTree()
# tuple of unicodes --> int
self.__location_to_hubid = OIBTree()
# XXX this is copied because of some context method problems
# with moving LocalEventChannel.notify to this _notify via a simple
# assignment, i.e. _notify = LocalEventChannel.notify
def _notify(clean_self, wrapped_self, event):
subscriptionses = clean_self.subscriptionsForEvent(event)
# that's a non-interface shortcut for
# subscriptionses = clean_self._registry.getAllForObject(event)
for subscriptions in subscriptionses:
for subscriber, filter in subscriptions:
if filter is not None and not filter(event):
continue
ContextWrapper(subscriber, wrapped_self).notify(event)
def notify(wrapped_self, event):
'''See interface ISubscriber'''
clean_self = removeAllProxies(wrapped_self)
clean_self._notify(wrapped_self, event)
if IObjectEvent.isImplementedBy(event):
# generate NotificationHubEvents only if object is known
# ie registered
if IObjectMovedEvent.isImplementedBy(event):
canonical_location = locationAsTuple(event.fromLocation)
hubid = clean_self.__location_to_hubid.get(canonical_location)
if hubid is not None:
canonical_new_location = locationAsTuple(
event.location)
location_to_hubid = clean_self.__location_to_hubid
if location_to_hubid.has_key(canonical_new_location):
raise ObjectHubError(
'Cannot move to location %s, '
'as there is already something there'
% locationAsUnicode(canonical_new_location))
hubid = location_to_hubid[canonical_location]
del location_to_hubid[canonical_location]
location_to_hubid[canonical_new_location] = hubid
clean_self.__hubid_to_location[hubid] = (
canonical_new_location)
# send out IObjectMovedHubEvent to plugins
event = ObjectMovedHubEvent(
wrapped_self,
hubid,
canonical_location,
canonical_new_location,
event.object)
clean_self._notify(wrapped_self, event)
elif IObjectCreatedEvent.isImplementedBy(event):
# a newly created object that has not been added to a
# container yet has no location. So, we're not interested in
# it.
pass
else:
canonical_location = locationAsTuple(event.location)
hubid = clean_self.__location_to_hubid.get(canonical_location)
if hubid is not None:
if IObjectModifiedEvent.isImplementedBy(event):
# send out IObjectModifiedHubEvent to plugins
event = ObjectModifiedHubEvent(
wrapped_self,
hubid,
canonical_location,
event.object)
clean_self._notify(wrapped_self, event)
elif IObjectRemovedEvent.isImplementedBy(event):
del clean_self.__hubid_to_location[hubid]
del clean_self.__location_to_hubid[canonical_location]
# send out IObjectRemovedHubEvent to plugins
event = ObjectRemovedHubEvent(
event.object,
hubid,
canonical_location,
event.object)
clean_self._notify(wrapped_self, event)
notify = ContextMethod(notify)
def getHubId(self, location):
'''See interface ILocalObjectHub'''
if isWrapper(location):
location = getPhysicalPath(location)
else:
location = locationAsTuple(location)
hubid = self.__location_to_hubid.get(location)
if hubid is None:
raise NotFoundError(locationAsUnicode(location))
else:
return hubid
def getLocation(self, hubid):
'''See interface IObjectHub'''
try:
return self.__hubid_to_location[hubid]
except KeyError:
raise NotFoundError(hubid)
def getObject(wrapped_self, hubid):
'''See interface IObjectHub'''
location = wrapped_self.getLocation(hubid)
adapter = getAdapter(wrapped_self, ITraverser)
return adapter.traverse(location)
getObject = ContextMethod(getObject)
def register(wrapped_self, obj_or_loc):
'''See interface ILocalObjectHub'''
clean_self = removeAllProxies(wrapped_self)
# XXX Need a new unit test for this; previously we tested
# whether it's wrapped, which is wrong because the root
# isn't wrapped (and it certainly makes sense to want to
# register the root).
if isinstance(obj_or_loc, (str, unicode, tuple)):
obj = None
location = obj_or_loc
else:
obj = obj_or_loc
location = getPhysicalPath(obj_or_loc)
canonical_location = locationAsTuple(location)
if not canonical_location[0] == u'':
raise ValueError("Location must be absolute")
# This is here to make sure the 'registrations' method won't
# trip up on using unichar ffff as a sentinel.
for segment in canonical_location:
if segment.startswith(u'\uffff'):
raise ValueError(
"Location contains a segment starting with \\uffff")
location_to_hubid = clean_self.__location_to_hubid
if location_to_hubid.has_key(canonical_location):
# XXX It would be more convenient if register() returned
# a bool indicating whether the object is already
# registered, rather than raising an exception.
# Then a more useful distinction between real errors
# and this (common) condition could be made.
raise ObjectHubError(
'location %s already in object hub' %
locationAsUnicode(canonical_location))
hubid = clean_self._generateHubId(canonical_location)
location_to_hubid[canonical_location] = hubid
# send out IObjectRegisteredHubEvent to plugins
event = ObjectRegisteredHubEvent(
wrapped_self,
hubid,
canonical_location,
obj)
clean_self._notify(wrapped_self, event)
return hubid
register = ContextMethod(register)
def unregister(wrapped_self, location):
'''See interface ILocalObjectHub'''
clean_self = removeAllProxies(wrapped_self)
if isWrapper(location):
location = getPhysicalPath(location) # XXX this branch is
# not exercised: needs unit test
canonical_location = locationAsTuple(location)
elif isinstance(location, int):
canonical_location = clean_self.getLocation(location)
else:
canonical_location = locationAsTuple(location)
location_to_hubid = clean_self.__location_to_hubid
hubid_to_location = clean_self.__hubid_to_location
try:
hubid = location_to_hubid[canonical_location]
except KeyError:
raise NotFoundError('location %s is not in object hub' %
locationAsUnicode(canonical_location))
else:
del hubid_to_location[hubid]
del location_to_hubid[canonical_location]
# send out IObjectUnregisteredHubEvent to plugins
event = ObjectUnregisteredHubEvent(
wrapped_self,
hubid,
canonical_location)
clean_self._notify(wrapped_self, event)
unregister = ContextMethod(unregister)
def numRegistrations(self):
"""See interface IObjectHub"""
# The hubid<-->location mappings should be the same size.
# The IOBTree of hubid-->location might be faster to find the
# size of, as the keys are ints. But, I haven't tested that.
# assert len(self.__hubid_to_location)==len(self.__location_to_hubid)
return len(self.__hubid_to_location)
def getRegistrations(self, location=(u'',)):
"""See interface IObjectHub"""
# Location can be an ascii string a unicode or a tuple of strings
# or unicodes. So, get a canonical location first of all.
location = locationAsTuple(location)
if location == (u'',):
# Optimisation when we're asked for all the registered objects.
# Returns an IOBTreeItems object.
return self.__location_to_hubid.items()
# BTrees only support searches including the min and max.
# So, I need to add to the end of the location a string that will
# be larger than any other. I could also use a type that
# sorts after unicodes.
return self.__location_to_hubid.items(location, location+(u'\uffff',))
def iterObjectRegistrations(wrapped_self):
"""See interface IHubEventChannel"""
traverser = getAdapter(wrapped_self, ITraverser)
for location, hubId in wrapped_self.getRegistrations():
yield (location, hubId, traverser.traverse(location))
iterObjectRegistrations = ContextMethod(iterObjectRegistrations)
############################################################
def _generateHubId(self, location):
    """Store location under a fresh hub id and return that id.

    Ids are handed out sequentially from the cached _v_nextid value;
    whenever the candidate is a multiple of 4000 (which includes the
    initial 0 when nothing is cached) a new starting point is drawn
    from randid().
    """
    candidate = getattr(self, '_v_nextid', 0)
    if not candidate % 4000:
        candidate = randid()
    table = self.__hubid_to_location
    while 1:
        # insert() returning a false value is treated as "id not usable"
        # (presumably already taken); retry with another random id.
        if table.insert(candidate, location):
            break
        candidate = randid()
    # Remember where to continue for the next registration.
    self._v_nextid = candidate + 1
    return candidate
=== Added File Zope3/src/zope/app/services/hubcollaborations.txt ===
Sample Object-Hub collaborations
Participants:
eventService: IEventService
hub: IObjectHub
auto_reg_plugin: ISubscriber
"""An autoregistration plugin
This implements a policy of automatically registering objects
when they are added. It also implements a policy of
automatically removing objects that are moved to (or out of)
special locations.
This plugin is subscribed to the hub for IObjectAddedEvents and
IObjectMovedEvents.
"""
plugin1: ISubscriber
"""Some plugin
This plugin is subscribed to ObjectHubEvents
"""
queue: ISubscriber
"""An event queue plugin.
This plugin is subscribed to ObjectHubEvents.
"""
path_index: ISubscriber
"""An index that supports searching for objects by their paths
This plugin is subscribed to ObjectHubEvents
"""
links: ISubscriber
"""A link tracker
It will sometimes veto removal hub events if removing an
object would violate referential integrity.
"""
creation_view:
"some creation view"
adding: IAdding
folder:
"a folder containing content objects"
some_admin_view:
"A view that allows an unregistered object to be registered"
some_management_view:
"A view for managing the contents of a container"
objectRemovedEvent:IObjectRemovedEvent
"An event computed as: ObjectRemovedEvent(location, object)"
Values:
add_event:IObjectAddedEvent
"Computed as ObjectAddedEvent(newLocation)"
newLocation:
"The location of newObject"
newObject:
"an object added in a scenario"
id:Text
"The given id for the new object"
object:
"An object that exists prior to a scenario"
objectRegisteredHubEvent:IObjectRegisteredHubEvent
"Computed as ObjectRegisteredHubEvent(hub, hid, location)"
location:
"The location of object"
hid:
"The hub-generated hub-id of the object."
Scenario: Object created and added to the hub
creation_view.action()
adding.add(newObject)
folder.setObject(id, newObject)
eventService.publishEvent(AddEvent(location))
hub.notify(addedEvent)
auto_reg_plugin.notify(addedEvent)
hub.registerAdded(location, object)
plugin1.notify(objectAddedHubEvent)
queue.notify(objectAddedHubEvent)
path_index.notify(objectAddedHubEvent)
links.notify(objectAddedHubEvent)
Scenario: Previously created object added to the hub
some_admin_view.action()
hub.register(location, object)
plugin1.notify(objectRegisteredHubEvent)
queue.notify(objectRegisteredHubEvent)
path_index.notify(objectRegisteredHubEvent)
links.notify(objectRegisteredHubEvent)
Scenario: Moved an object that has been registered
some_management_view.action()
eventService.publishEvent(objectMovedEvent)
hub.notify(objectMovedEvent)
auto_reg_plugin.notify(objectMovedEvent)
# It might have decided to unregister the object
# on the basis of the destination
path_index.notify(objectMovedHubEvent)
Scenario: A previously registered object is deleted
some_management_view.delete()
del folder[id]
eventService.publishEvent(objectRemovedEvent)
hub.notify(objectRemovedEvent)
plugin1.notify(objectRemovedHubEvent)
queue.notify(objectRemovedHubEvent)
path_index.notify(objectRemovedHubEvent)
links.notify(objectRemovedHubEvent)
Scenario: A previously registered object is deleted, but would break
references. We assume we have a links plugin that tracks
links between objects.
some_management_view.delete()
eventService.publishEvent(objectRemovedEvent)
hub.notify(objectRemovedEvent)
plugin1.notify(objectRemovedHubEvent)
queue.notify(objectRemovedHubEvent)
path_index.notify(objectRemovedHubEvent)
links.notify(objectRemovedHubEvent)
raise "That would break a link"
=== Added File Zope3/src/zope/app/services/module.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Manager for persistent modules associated with a service manager.
$Id: module.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from persistence import Persistent
from zodb.code.module import PersistentModuleManager
from zodb.code.interfaces import IPersistentModuleManager
from zope.component import getServiceManager
from zope.proxy.context import ContextMethod
class Registry:
    """Module registry that forwards every request to a manager set later.

    The real registry is found via context, which the
    PersistentModuleManager knows nothing about.  Manager therefore
    hands this object to the PersistentModuleManager and, before each
    operation, calls setManager() with the registry found via context;
    the three registry methods simply delegate to it.
    """

    def __init__(self):
        # Nothing to delegate to until setManager() has been called.
        self._v_manager = None

    def setManager(self, ctx):
        """Make ctx the target of all subsequent registry calls."""
        self._v_manager = ctx

    def findModule(self, name):
        """Delegate findModule(name) to the current manager."""
        target = self._v_manager
        return target.findModule(name)

    def setModule(self, name, module):
        """Delegate setModule(name, module) to the current manager."""
        target = self._v_manager
        return target.setModule(name, module)

    def delModule(self, name):
        """Delegate delModule(name) to the current manager."""
        target = self._v_manager
        return target.delModule(name)
class Manager(Persistent):
    """Persistent module manager whose registry is the service manager.

    Wraps a PersistentModuleManager and, before each mutating call,
    points the shared Registry at the service manager found via context.
    """

    __implements__ = IPersistentModuleManager

    # The registry for the manager is the ServiceManager.
    # The association between this manager and the registry
    # is static, but the static association can't be stored
    # explicitly in Zope.

    # XXX There is no locking, but every call to setManager() for a
    # particular instance should have the same manager argument.

    # XXX It would be nice if the lookup via getServiceManager()
    # occurred less often.  Best would be to do it only when the
    # object is unpickled.

    def __init__(self):
        registry = Registry()
        self._registry = registry
        self._manager = PersistentModuleManager(registry)

    def new(self, name, source):
        """Bind the registry to the contextual service manager, then
        delegate new(name, source) to the wrapped manager."""
        registry = self._registry
        registry.setManager(getServiceManager(self))
        self._manager.new(name, source)
    new = ContextMethod(new)

    def update(self, source):
        """Bind the registry to the contextual service manager, then
        delegate update(source) to the wrapped manager."""
        registry = self._registry
        registry.setManager(getServiceManager(self))
        self._manager.update(source)
    update = ContextMethod(update)

    def remove(self, source):
        """Bind the registry to the contextual service manager, then
        delegate remove(source) to the wrapped manager."""
        registry = self._registry
        registry.setManager(getServiceManager(self))
        self._manager.remove(source)
    remove = ContextMethod(remove)

    # Read-only views of the wrapped manager's attributes.
    name = property(lambda inst: inst._manager.name)
    source = property(lambda inst: inst._manager.source)
=== Added File Zope3/src/zope/app/services/package.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""XXX short summary goes here.
XXX longer description goes here.
$Id: package.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
__metaclass__ = type
from zope.app.container.btree import BTreeContainer
from zope.proxy.context import ContextMethod
from zope.proxy.context.context import ContextWrapper
from Zope.App.Traversing import getPhysicalPathString
from zope.app.component.nextservice import getNextServiceManager
from zope.app.interfaces.services.service \
import IServiceManager
from zope.app.interfaces.services.service \
import IComponentManager
from zope.app.interfaces.services.package import IPackages
from zope.app.interfaces.services.package import IPackage
class Packages(BTreeContainer):
    """Container of packages; created with one package named 'default'.

    Only objects implementing IPackage may be added.
    """

    __implements__ = IPackages

    def __init__(self):
        super(Packages, self).__init__()
        self.setObject('default', Package())

    def queryComponent(self, type=None, filter=None, all=0):
        """Return descriptions of the components stored in the packages.

        type   -- if given, only components for which
                  type.isImplementedBy(component) is true are included.
        filter -- if given, only components for which filter(component)
                  is true are included.
        all    -- if true and the next service manager implements
                  IComponentManager, its matching components are
                  appended after the local ones.

        Returns a list of dicts with the keys 'path' (a string of the
        form "<path of self>/<package name>/<component name>") and
        'component' (the component, context-wrapped in its package).
        """
        local = []
        path = getPhysicalPathString(self)
        for package_name in self:
            package = ContextWrapper(self[package_name], self,
                                     name=package_name)
            for name in package:
                component = package[name]
                if type is not None and not type.isImplementedBy(component):
                    continue
                if filter is not None and not filter(component):
                    continue
                local.append({'path': "%s/%s/%s" % (path, package_name, name),
                              'component': ContextWrapper(component, package,
                                                          name=name),
                              })

        if all:
            next_service_manager = getNextServiceManager(self)
            if IComponentManager.isImplementedBy(next_service_manager):
                # Extend with what the next manager found.  The previous
                # code threw that result away and called list() on the
                # ``all`` flag itself, which raises TypeError.
                local += list(
                    next_service_manager.queryComponent(type, filter, all))

        return local
    queryComponent = ContextMethod(queryComponent)

    def setObject(self, name, object):
        """Add object under name; raise TypeError unless it is an IPackage."""
        if not IPackage.isImplementedBy(object):
            raise TypeError("Can only add packages")
        return super(Packages, self).setObject(name, object)
"""XXX short summary goes here.
XXX longer description goes here.
$Id: package.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
__metaclass__ = type
from zope.app.container.btree import BTreeContainer
from zope.app.interfaces.services.package import IPackage
from zope.app.services.configurationmanager import ConfigurationManager
class Package(BTreeContainer):
    """BTree container that starts out holding a ConfigurationManager
    under the name 'configure'."""

    __implements__ = IPackage

    def __init__(self):
        super(Package, self).__init__()
        manager = ConfigurationManager()
        self.setObject('configure', manager)
=== Added File Zope3/src/zope/app/services/principalannotation.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Implementation of IPrincipalAnnotationService."""
# TODO: register service as adapter for IAnnotations on service activation
# this depends on existence of LocalAdapterService, so once that's done
# implement this.
# Zope3 imports
from persistence import Persistent
from zodb.btrees.OOBTree import OOBTree
from zope.app.component.nextservice import getNextService
from zope.proxy.context import ContextMethod
from zope.proxy.context.context import ContextWrapper
from zope.app.interfaces.annotation import IAnnotations
# Sibling imports
from zope.app.interfaces.services.principalannotation import IPrincipalAnnotationService
class PrincipalAnnotationService(Persistent):
    """Keeps one Annotations object per principal id.

    The service ID is 'PrincipalAnnotation'.
    """

    __implements__ = IPrincipalAnnotationService, Persistent.__implements__

    def __init__(self):
        # principal id --> Annotations
        self.annotations = OOBTree()

    # implementation of IPrincipalAnnotationService

    def getAnnotation(self, principalId):
        """Return the annotations object for the given principal id.

        An Annotations object is created and stored first if the
        principal has none yet.  The result is context-wrapped in this
        service under the name principalId.
        """
        store = self.annotations
        if not store.has_key(principalId):
            store[principalId] = Annotations(principalId)
        found = store[principalId]
        return ContextWrapper(found, self, name=principalId)
    getAnnotation = ContextMethod(getAnnotation)

    def hasAnnotation(self, principalId):
        """Tell whether annotations are stored for the given principal id."""
        store = self.annotations
        return store.has_key(principalId)
class Annotations(Persistent):
    """Annotation storage for a single principal."""

    __implements__ = IAnnotations, Persistent.__implements__

    def __init__(self, principalId):
        self.principalId = principalId
        self.data = OOBTree()

    def __getitem__(wrapped_self, key):
        """Return the annotation stored under key.

        When the key is missing locally, the next 'PrincipalAnnotation'
        service (if any) is asked for this principal's annotations and
        the key is looked up there; with no such service the local
        KeyError propagates.
        """
        try:
            return wrapped_self.data[key]
        except KeyError:
            higher = getNextService(wrapped_self, 'PrincipalAnnotation')
            if not higher:
                # Nobody to delegate to: re-raise the local KeyError.
                raise
            return higher.getAnnotation(wrapped_self.principalId)[key]
    __getitem__ = ContextMethod(__getitem__)

    def __setitem__(self, key, value):
        self.data[key] = value

    def __delitem__(self, key):
        del self.data[key]

    def get(self, key, default=None):
        """Return the locally stored annotation for key, or default.

        Unlike __getitem__, this looks only at the local data.
        """
        return self.data.get(key, default)
class AnnotationsForPrincipal(object):
    """Callable adapting a principal to the annotations a service holds.

    Register an *instance* of this class (not the class itself) as the
    adapter; calling the instance with a principal returns
    service.getAnnotation(principal.getId()).
    """

    def __init__(self, service):
        self.service = service

    def __call__(self, principal):
        lookup = self.service.getAnnotation
        return lookup(principal.getId())
=== Added File Zope3/src/zope/app/services/role.gif ===
<Binary-ish file>
=== Added File Zope3/src/zope/app/services/role.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Revision information:
$Id: role.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from zope.app.security.registries.roleregistry import Role
from persistence import Persistent
# Subclass of the Role imported from the role registry that also inherits
# from Persistent.  Note that this definition rebinds the module-level
# name ``Role`` from the imported class to the new subclass.
class Role(Role, Persistent):
    "Persistent Role"
"""
Revision information:
$Id: role.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from zope.app.container.btree import BTreeContainer
from zope.app.interfaces.security import IRoleService
from zope.app.interfaces.container import IContainer
from zope.proxy.context import ContextMethod
from zope.app.component.nextservice import getNextService
# Interface that combines IRoleService with IContainer.
class ILocalRoleService(IRoleService, IContainer):
    """TTW manageable role service"""
class RoleService(BTreeContainer):
    """Role service that stores roles as container items and falls back
    to the next 'Roles' service for anything it does not hold itself."""

    __implements__ = ILocalRoleService

    ############################################################
    # Implementation methods for interface
    # Zope.App.Security.IRoleService.

    def getRole(wrapped_self, rid):
        '''See interface IRoleService'''
        try:
            return wrapped_self[rid]
        except KeyError:
            # Not stored locally: ask the higher-level service, if any.
            parent = getNextService(wrapped_self, 'Roles')
            if not parent:
                raise  # re-raises the original KeyError
            return parent.getRole(rid)
    getRole = ContextMethod(getRole)

    def getRoles(wrapped_self):
        '''See interface IRoleService'''
        collected = list(wrapped_self.values())
        parent = getNextService(wrapped_self, 'Roles')
        if parent:
            # Local roles come first, followed by the inherited ones.
            collected.extend(parent.getRoles())
        return collected
    getRoles = ContextMethod(getRoles)

    #
    ############################################################
=== Added File Zope3/src/zope/app/services/role_service.gif ===
<Binary-ish file>
=== Added File Zope3/src/zope/app/services/service.gif ===
<Binary-ish file>
=== Added File Zope3/src/zope/app/services/service.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Revision information:
$Id: service.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from zope.app.interfaces.services.service import IServiceManagerContainer
from zope.component.interfaces import IServiceService
from zope.component.exceptions import ComponentLookupError
# Unique sentinel object.
# NOTE(review): no use of _marker is visible in the code that follows;
# confirm it is still needed.
_marker = object()
class ServiceManagerContainer:
    """Mix-in that stores at most one service manager on the instance.

    The manager lives in the name-mangled attribute
    _ServiceManagerContainer__sm, which is absent until
    setServiceManager() has been called.
    """

    __implements__ = IServiceManagerContainer

    ############################################################
    # Implementation methods for interface
    # Zope.App.ComponentArchitecture.IServiceManagerContainer.

    def hasServiceManager(self):
        '''See interface IReadServiceManagerContainer'''
        return hasattr(self, '_ServiceManagerContainer__sm')

    def getServiceManager(self):
        '''See interface IReadServiceManagerContainer'''
        try:
            manager = self.__sm
        except AttributeError:
            # Nothing has been set yet.
            raise ComponentLookupError('no service manager defined')
        return manager

    def queryServiceManager(self, default=None):
        '''See interface IReadServiceManagerContainer'''
        return getattr(self, '_ServiceManagerContainer__sm', default)

    def setServiceManager(self, sm):
        '''See interface IWriteServiceManagerContainer'''
        if not IServiceService.isImplementedBy(sm):
            raise ValueError('setServiceManager requires an IServiceService')
        self.__sm = sm

    #
    ############################################################
"""
$Id: service.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
from zope.app.interfaces.services.service import IServiceConfiguration
from zope.app.interfaces.services.service import IBindingAware
from zope.app.services.configuration import ConfigurationStatusProperty
from zope.app.services.configuration import NamedComponentConfiguration
from zope.proxy.context import ContextMethod
from zope.component import getServiceManager
class ServiceConfiguration(NamedComponentConfiguration):

    # The class documentation is taken verbatim from the interface.
    __doc__ = IServiceConfiguration.__doc__

    __implements__ = (IServiceConfiguration,
                      NamedComponentConfiguration.__implements__)

    status = ConfigurationStatusProperty('Services')

    label = "Service"

    def __init__(self, *args, **kw):
        super(ServiceConfiguration, self).__init__(*args, **kw)

    def getInterface(self):
        # The contextual service manager knows which interface belongs
        # to the service name this configuration is for.
        manager = getServiceManager(self)
        return manager.getInterfaceFor(self.name)
    getInterface = ContextMethod(getInterface)

    def activated(self):
        # Tell the configured component it has been bound under
        # self.name, but only if it implements IBindingAware.
        component = self.getComponent()
        if not IBindingAware.isImplementedBy(component):
            return
        component.bound(self.name)
    activated = ContextMethod(activated)

    def deactivated(self):
        # Counterpart of activated(): tell an IBindingAware component it
        # has been unbound from self.name.
        component = self.getComponent()
        if not IBindingAware.isImplementedBy(component):
            return
        component.unbound(self.name)
    deactivated = ContextMethod(deactivated)
# Prefix the module docstring with ServiceConfiguration's documentation
# (which the class takes from IServiceConfiguration).
__doc__ = ServiceConfiguration.__doc__ + __doc__
"""XXX I need a summary line.
In addition, a ServiceManager acts as a registry for persistent
modules. The Zope import hook uses the ServiceManager to search for
modules.
$Id: service.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
import sys
from zope.app.component.nextservice \
import getNextServiceManager, getNextService
from zope.component.exceptions import ComponentLookupError
from zope.app.interfaces.container import ISimpleReadContainer
from zope.proxy.context import ContextMethod
from zope.proxy.context.context import ContextWrapper
from zope.proxy.introspection import removeAllProxies
from zope.app.services.package import Packages
from zope.app.interfaces.services.service import IServiceManager
from zope.app.services.configuration import NameComponentConfigurable
from zodb.code.module import PersistentModuleRegistry
from zodb.code.module import PersistentModule
from zope.app.interfaces.services.service import INameResolver
# Types that ServiceManager.resolve() treats as "a module" when deciding
# whether to descend further into a dotted name.
#
# INameResolver is imported as an interface object (it is listed in
# ServiceManager.__implements__), so type(INameResolver) is the type of
# interfaces, not the module type.  Take the type of a real module, the
# already-imported ``sys``, instead.
ModuleType = type(sys), PersistentModule
class ServiceManager(PersistentModuleRegistry, NameComponentConfigurable):
    """Local service manager.

    Services are looked up among the active configured components and,
    failing that, delegated to the next service manager.  The class also
    acts as a persistent module registry (findModule) and as a name
    resolver (resolve), and exposes a read-only mapping interface over
    'Packages' and the locally active services.
    """

    __implements__ = (IServiceManager, ISimpleReadContainer,
                      PersistentModuleRegistry.__implements__,
                      NameComponentConfigurable.__implements__,
                      INameResolver)

    def __init__(self):
        super(ServiceManager, self).__init__()
        NameComponentConfigurable.__init__(self)
        self.Packages = Packages()

    def getServiceDefinitions(wrapped_self):
        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
        # Get the services defined here and above us, if any (as held
        # in a ServiceInterfaceService, presumably)
        # NOTE(review): only the next service manager is consulted here;
        # nothing defined locally is added to the result.
        sm = getNextServiceManager(wrapped_self)
        if sm is not None:
            serviceDefs = sm.getServiceDefinitions()
        else: serviceDefs = {}

        return serviceDefs
    getServiceDefinitions = ContextMethod(getServiceDefinitions)

    def queryService(wrapped_self, name, default=None):
        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
        # Same as getService(), but a ComponentLookupError becomes default.
        try:
            return wrapped_self.getService(name)
        except ComponentLookupError:
            return default
    queryService = ContextMethod(queryService)

    def getService(wrapped_self, name):
        "See Zope.ComponentArchitecture.IServiceService.IServiceService"

        # This is rather tricky. Normally, getting a service requires
        # the use of other services, like the adapter service.  We
        # need to be careful not to get into an infinite recursion by
        # getting our getService to be called while looking up
        # services, so we'll guard the local lookup with the
        # _v_calling flag below.

        if name == 'Services':
            return wrapped_self # We are the service service

        # While a local lookup is already in progress (_v_calling set),
        # skip the local lookup and go straight to the next service.
        if not getattr(wrapped_self, '_v_calling', 0):

            wrapped_self._v_calling = 1
            try:
                service = wrapped_self.queryActiveComponent(name)
                if service is not None:
                    return service

            finally:
                # Always clear the flag, whether or not a service was found.
                wrapped_self._v_calling = 0

        return getNextService(wrapped_self, name)
    getService = ContextMethod(getService)

    def getInterfaceFor(wrapped_self, service_type):
        "See Zope.ComponentArchitecture.IServiceService.IServiceService"
        # Linear search through the (type, interface) pairs; an unknown
        # service type raises NameError.
        for type, interface in wrapped_self.getServiceDefinitions():
            if type == service_type:
                return interface

        raise NameError(service_type)
    getInterfaceFor = ContextMethod(getInterfaceFor)

    ############################################################
    # Implementation methods for interface
    # Zope.App.OFS.Services.ServiceManager.IComponentManager.

    def queryComponent(wrapped_self, type=None, filter=None, all=0):
        # Delegates to the Packages container, wrapped in this manager's
        # context under the name 'Packages'.
        Packages = ContextWrapper(wrapped_self.Packages, wrapped_self,
                                  name='Packages')
        return Packages.queryComponent(type, filter, all)
    queryComponent = ContextMethod(queryComponent)

    #
    ############################################################

    # We provide a mapping interface for traversal, but we only expose
    # local services through the mapping interface.

    def __getitem__(self, key):
        "See Interface.Common.Mapping.IReadMapping"

        result = self.get(key)
        if result is None:
            raise KeyError(key)

        return result

    def get(wrapped_self, key, default=None):
        "See Interface.Common.Mapping.IReadMapping"

        # 'Packages' is always available; any other key is looked up
        # among the locally active components only.
        if key == 'Packages':
            return wrapped_self.Packages

        service = wrapped_self.queryActiveComponent(key)
        if service is None:
            return default

        return service
    get = ContextMethod(get)

    def __contains__(self, key):
        "See Interface.Common.Mapping.IReadMapping"

        return self.get(key) is not None

    def findModule(wrapped_self, name):
        # override to pass call up to next service manager
        # The inherited lookup is done on the object with all proxies
        # removed.
        mod = super(ServiceManager,
                    removeAllProxies(wrapped_self)).findModule(name)
        if mod is not None:
            return mod

        sm = getNextServiceManager(wrapped_self)
        try:
            findModule = sm.findModule
        except AttributeError:
            # The only service manager that doesn't implement this
            # interface is the global service manager.  There is no
            # direct way to ask if sm is the global service manager.
            return None
        return findModule(name)
    findModule = ContextMethod(findModule)

    def __import(wrapped_self, module_name):
        # Look for the module in the persistent registries first, then in
        # sys.modules; raise ImportError when neither has it.
        mod = wrapped_self.findModule(module_name)
        if mod is None:
            mod = sys.modules.get(module_name)
            if mod is None:
                raise ImportError(module_name)

        return mod
    __import = ContextMethod(__import)

    def resolve(wrapped_self, name):
        # Resolve a dotted name to a module or to an attribute of a module.
        #
        # A trailing '.' or '+' is stripped and turns on ``repeat`` mode:
        # as long as the attribute found is itself a module (an instance
        # of ModuleType), the search continues inside that module for an
        # attribute with the same last name.

        name = name.strip()

        if name.endswith('.') or name.endswith('+'):
            name = name[:-1]
            repeat = 1
        else:
            repeat = 0

        names=name.split('.')
        last=names[-1]
        mod='.'.join(names[:-1])

        if not mod:
            # No dots at all: the name can only be a module.
            return wrapped_self.__import(name)

        while 1:
            m = wrapped_self.__import(mod)
            try:
                a=getattr(m, last)
            except AttributeError:
                if not repeat:
                    # Not an attribute of the parent: try the full
                    # dotted name as a module.
                    return wrapped_self.__import(name)

            else:
                if not repeat or (not isinstance(a, ModuleType)):
                    return a
            # repeat mode: descend one level and look for ``last`` again.
            mod += '.' + last
    resolve = ContextMethod(resolve)
=== Added File Zope3/src/zope/app/services/session.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Simplistic session service implemented using cookies.
This is more of a demonstration than a full implementation, but it should
work.
"""
# System imports
import sha, time, string, random, hmac
# Zope3 imports
from persistence import Persistent
from persistence.dict import PersistentDict
from zope.server.http.http_date import build_http_date
# Sibling imports
from zope.app.interfaces.services.session import ISessionService, IConfigureSessionService
# Translation table that maps "+" to "-" and "/" to "."; digestEncode()
# applies it to base64 output.
cookieSafeTrans = string.maketrans("+/", "-.")
def digestEncode(s):
    """Return s base64-encoded for use in a cookie.

    The last two characters of the base64 text are dropped (for a
    20-byte SHA digest these are presumably the '=' padding and the
    trailing newline), and the result is passed through
    cookieSafeTrans.
    """
    encoded = s.encode("base64")
    trimmed = encoded[:-2]
    return trimmed.translate(cookieSafeTrans)
class CookieSessionService(Persistent):
    """Session service that keeps the session id in a cookie.

    A session id is 54 characters long: a 27-character encoded SHA
    digest followed by the 27-character encoded HMAC of that digest
    computed with this service's secret.
    """

    __implements__ = Persistent.__implements__, ISessionService, IConfigureSessionService

    def __init__(self):
        self.dataManagers = PersistentDict()
        # Cookie name; derived from the creation time of the service.
        self.namespace = "zope3-cs-%x" % (int(time.time()) - 1000000000)
        # NOTE(review): random.random() is not a cryptographically
        # secure source for a signing secret.
        self.secret = "%.20f" % random.random()

    def generateUniqueId(self):
        """Generate a new, random, unique id."""
        seed = "%.20f%.20f%.20f" % (random.random(), time.time(), time.clock())
        token = digestEncode(sha.sha(seed).digest())
        # The HMAC of the random value is stored together with it, so an
        # id cannot be made up without knowing the secret.
        signature = hmac.new(token, self.secret, digestmod=sha).digest()
        return token + digestEncode(signature)

    def getRequestId(self, request):
        """Return the sessionId encoded in request or None if it's non-existent."""
        sid = request.cookies.get(self.namespace)
        if sid is None or len(sid) != 54:
            return None
        s, mac = sid[:27], sid[27:]
        expected = digestEncode(hmac.new(s, self.secret, digestmod=sha).digest())
        # NOTE(review): plain string comparison, not constant-time.
        if expected == mac:
            return sid
        return None

    def setRequestId(self, request, id):
        """Set cookie with id on request."""
        # The cookie expires 1800 seconds from now.
        expiry = build_http_date(time.time() + 1800)
        request.response.setCookie(self.namespace, id, expires=expiry)

    #####################################
    # Implementation of ISessionService #

    def getSessionId(self, request):
        """Return the request's session id, creating one if necessary.

        The cookie is (re)set on every call, not only when a new id was
        generated.
        """
        sid = self.getRequestId(request)
        if sid is None:
            sid = self.generateUniqueId()
        self.setRequestId(request, sid)
        return sid

    def invalidate(self, sessionId):
        """Ask every registered data manager to delete the session's data."""
        for manager in self.dataManagers.values():
            manager.deleteData(sessionId)

    def getDataManager(self, name):
        """Return the data manager registered under name (KeyError if none)."""
        return self.dataManagers[name]

    def registerDataManager(self, name, dataManager):
        """Register dataManager under name; a name can be used only once."""
        managers = self.dataManagers
        if managers.has_key(name):
            raise ValueError("DataManager already registered with name %r" % name)
        managers[name] = dataManager

    def unregisterDataManager(self, name):
        """Remove the data manager registered under name."""
        del self.dataManagers[name]
=== Added File Zope3/src/zope/app/services/session.txt ===
Sessions allow us to fake state over a stateless protocol - HTTP. We do this
by having a unique identifier stored across multiple HTTP requests, be it
a cookie or some id mangled into the URL.
ISessionService provides this unique id. ISessionDataManagers attach data
objects of some sort to a specific session id. This data object may be
a Persistent ZODB object, or an object that stores data in an RDBMS using
the session id as the key.
ISessionDataManagers are registered with ISessionServices using a name,
which ought to be unique for each application. Thus you can have multiple
data objects registered with a single session.
ISessionServices may choose to expire sessions. In this case they
should notify all ISessionDataManagers registered with them and
invalidate the attached data objects. Likewise ISessionDataManagers
probably want to expire data.
--
$Id: session.txt,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
=== Added File Zope3/src/zope/app/services/test_cookiesessionservice.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from unittest import TestCase, TestLoader, TextTestRunner
from zope.app.services.tests.placefulsetup \
import PlacefulSetup
from zope.component import getServiceManager, getService
from zope.server.http.http_date import parse_http_date
from zope.app.interfaces.services.session import \
ISessionService, ISessionDataManager
from zope.app.services.session import \
CookieSessionService
import time
class DummyDataManager:
    """In-memory session data manager for the tests: one dict per
    session id, kept in self.data."""

    __implements__ = ISessionDataManager

    def __init__(self):
        # session id --> data dict
        self.data = {}

    def getDataObject(self, sid):
        """Return the data dict for sid, creating an empty one if needed."""
        store = self.data
        return store.setdefault(sid, {})

    def deleteData(self, sid):
        """Forget the data stored for sid (KeyError if there is none)."""
        store = self.data
        del store[sid]
class FakeRequest:
    """Stand-in for a request that also acts as its own response.

    Cookies set through setCookie() land in the same dict the service
    reads them from, and the number of setCookie() calls is counted in
    self.sets.
    """

    def __init__(self):
        self.sets = 0
        self.cookies = {}
        # request.response.setCookie(...) ends up in setCookie() below.
        self.response = self

    def setCookie(self, k, v, **kw):
        self.sets += 1
        self.cookies[k] = v
        # The expiry must be 1800 seconds from now, give or take less
        # than 3 seconds.
        drift = parse_http_date(kw["expires"]) - int(time.time()) - 1800
        if abs(drift) >= 3:
            raise AssertionError
class SessionServiceTestCaseMixin(PlacefulSetup):
    """Tests shared by session service implementations.

    Subclasses set serviceFactory to a callable that returns the
    service under test.
    """

    serviceFactory = None

    def setUp(self):
        PlacefulSetup.setUp(self)
        self.buildFolders()
        manager = getServiceManager(None)
        service = self.serviceFactory()
        manager.defineService("SessionService", ISessionService)
        manager.provideService("SessionService", service)
        # The tests use the service as found from the root folder.
        self.svc = getService(self.rootFolder, "SessionService")

    def testRegister(self):
        first = DummyDataManager()
        second = DummyDataManager()
        service = self.svc
        service.registerDataManager("foo", first)
        # A name that is already taken must be rejected ...
        self.assertRaises(ValueError, service.registerDataManager, "foo", second)
        self.assertEquals(service.getDataManager("foo"), first)
        # ... and becomes available again once unregistered.
        service.unregisterDataManager("foo")
        self.assertRaises(KeyError, service.getDataManager, "foo")
        service.registerDataManager("foo", second)

    def testCookie(self):
        request = FakeRequest()
        session_id = self.svc.generateUniqueId()
        self.svc.setRequestId(request, session_id)
        self.assertEquals(self.svc.getRequestId(request), session_id)

    def testGetSession(self):
        request = FakeRequest()
        session_id = self.svc.getSessionId(request)
        self.assertEquals(request.sets, 1)
        self.assertEquals(self.svc.getRequestId(request), session_id)
        self.assertEquals(self.svc.getSessionId(request), session_id)
        # make sure cookie was also set during 2nd getSessionId
        self.assertEquals(request.sets, 2)

    def testLookupAndInvalidate(self):
        manager = DummyDataManager()
        service = self.svc
        service.registerDataManager("dm", manager)
        request = FakeRequest()
        # NOTE(review): this import still uses a Zope.App... package path
        # while the module-level imports use zope.app...; confirm it
        # resolves after the renaming.
        from Zope.App.OFS.Services.SessionService import getSessionDataObject
        data = getSessionDataObject(self.rootFolder, request, "dm")
        data["a"] = "b"
        self.assert_(data is manager.getDataObject(service.getSessionId(request)))
        self.assertEquals("b", manager.getDataObject(service.getSessionId(request))["a"])
        service.invalidate(service.getSessionId(request))
        # After invalidation the session starts over with empty data.
        fresh = getSessionDataObject(self.rootFolder, request, "dm")
        self.assertEquals(fresh, {})

    def testForgingCookies(self):
        # Neither a value of the wrong length nor one of the right
        # length with a bad MAC may be accepted as a session id.
        for fakeValue in ["dsada", "2" * 54]:
            request = FakeRequest()
            self.svc.setRequestId(request, fakeValue)
            self.assertEquals(self.svc.getRequestId(request), None)
# Runs the shared session service tests against CookieSessionService.
class CookieServiceTestCase(SessionServiceTestCaseMixin, TestCase):
    serviceFactory = CookieSessionService
def test_suite():
    """Return a suite holding every test of CookieServiceTestCase."""
    return TestLoader().loadTestsFromTestCase(CookieServiceTestCase)
# When executed as a script, run the suite with the text test runner.
if __name__=='__main__':
    TextTestRunner().run(test_suite())
=== Added File Zope3/src/zope/app/services/view.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""View Service
$Id: view.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
__metaclass__ = type
from persistence import Persistent
from persistence.dict import PersistentDict
from zope.component.interfaces import IViewService
from zope.component.exceptions import ComponentLookupError
from zope.component import getServiceManager
from zope.app.interfaces.services.configuration import IConfigurable
from zope.app.services.configuration import ConfigurationRegistry
from zope.app.services.configuration import SimpleConfiguration
from zope.proxy.context.context import ContextWrapper
from zope.proxy.context import ContextMethod
from zope.app.services.configuration import ConfigurationStatusProperty
from zope.app.component.nextservice import getNextService
from zope.component import getSkin
from zope.proxy.introspection import removeAllProxies
from Zope.App.Traversing import getPhysicalRoot, traverse
from zope.exceptions import NotFoundError
from zope.app.interfaces.services.interfaces import IViewConfiguration, IPageConfiguration
from zope.app.services.adapter import PersistentAdapterRegistry
class ViewService(Persistent):
    """Persistent, locally configurable view service.

    Configuration registries are kept in a three-level structure:
    ``self._layers`` maps a layer name to a dict of view names, each view
    name maps to a ``PersistentAdapterRegistry``, and that registry maps a
    (for-interface, presentation-type) pair to a ``ConfigurationRegistry``.
    """

    __implements__ = IViewService, IConfigurable

    def __init__(self):
        # layer name -> {view name -> PersistentAdapterRegistry}
        self._layers = PersistentDict()

    def queryConfigurationsFor(self, configuration, default=None):
        """See zope.app.interfaces.services.configuration.IConfigurable

        Looks up the registry using the configuration's viewName, layer,
        forInterface and presentationType attributes.
        """
        return self.queryConfigurations(
            configuration.viewName, configuration.layer,
            configuration.forInterface, configuration.presentationType,
            default)

    queryConfigurationsFor = ContextMethod(queryConfigurationsFor)

    def queryConfigurations(self, name, layer,
                            forInterface, presentationType, default=None):
        """Return the context-wrapped configuration registry, or `default`.

        `default` is returned when the layer, the view name, or the
        (forInterface, presentationType) registration is unknown.
        """
        names = self._layers.get(layer)
        if names is None:
            return default

        adapter_registry = names.get(name)
        if adapter_registry is None:
            return default

        registry = adapter_registry.getRegistered(
            forInterface, presentationType)
        if registry is None:
            return default

        return ContextWrapper(registry, self)

    queryConfigurations = ContextMethod(queryConfigurations)

    def createConfigurationsFor(self, configuration):
        """See zope.app.interfaces.services.configuration.IConfigurable

        Creates (or finds) the registry using the configuration's viewName,
        layer, forInterface and presentationType attributes.
        """
        return self.createConfigurations(
            configuration.viewName, configuration.layer,
            configuration.forInterface, configuration.presentationType)

    createConfigurationsFor = ContextMethod(createConfigurationsFor)

    def createConfigurations(self,
                             viewName, layer, forInterface, presentationType):
        """Return the context-wrapped configuration registry, creating any
        missing level (layer dict, adapter registry, configuration
        registry) on the way.
        """
        names = self._layers.get(layer)
        if names is None:
            names = PersistentDict()
            self._layers[layer] = names

        adapter_registry = names.get(viewName)
        if adapter_registry is None:
            adapter_registry = PersistentAdapterRegistry()
            names[viewName] = adapter_registry

        registry = adapter_registry.getRegistered(
            forInterface, presentationType)
        if registry is None:
            registry = ConfigurationRegistry()
            adapter_registry.register(forInterface, presentationType, registry)

        return ContextWrapper(registry, self)

    createConfigurations = ContextMethod(createConfigurations)

    def getView(self, object, name, request):
        """Return the view found by queryView.

        Raises ComponentLookupError(object, name) when queryView
        returns None.
        """
        view = self.queryView(object, name, request)
        if view is None:
            raise ComponentLookupError(object, name)
        return view

    getView = ContextMethod(getView)

    def queryView(self, object, name, request, default=None):
        """Return a view named `name` for `object`, or `default`.

        The layers of the request's skin are searched in the order
        getSkin yields them; the first layer holding a registration for
        the object with an active configuration supplies the view.  When
        no local layer matches, the lookup is delegated to the next
        'Views' service.
        """
        # Named presentation_type rather than `type` so the builtin is
        # not shadowed.
        presentation_type = request.getPresentationType()
        skin = request.getPresentationSkin()

        for layername in getSkin(object, skin, presentation_type):
            layer = self._layers.get(layername)
            if not layer:
                continue

            reg = layer.get(name, None)
            if reg is None:
                continue

            # Only registries that currently have an active configuration
            # are acceptable matches.
            registry = reg.getForObject(
                object, presentation_type,
                filter = lambda registry:
                         ContextWrapper(registry, self).active(),
                )
            if registry is None:
                continue

            registry = ContextWrapper(registry, self)
            view = registry.active().getView(object, request)
            return view

        views = getNextService(self, 'Views')
        return views.queryView(object, name, request, default)

    queryView = ContextMethod(queryView)

    def getDefaultViewName(self, object, request):
        """See zope.component.interfaces.IViewService

        Raises NotFoundError when queryDefaultViewName returns None.
        """
        name = self.queryDefaultViewName(object, request)
        if name is None:
            # Wrap the object in a 1-tuple so that tuple-valued objects
            # are formatted as a whole instead of being unpacked by '%'.
            raise NotFoundError(
                'No default view name found for object %s' % (object,))
        return name

    getDefaultViewName = ContextMethod(getDefaultViewName)

    def queryDefaultViewName(self, object, request, default=None):
        """See zope.component.interfaces.IViewService

        Currently always delegates to the next 'Views' service.
        """
        # XXX: need to do our own defaults as well.
        views = getNextService(self, 'Views')
        return views.queryDefaultViewName(object, request, default)

    queryDefaultViewName = ContextMethod(queryDefaultViewName)

    def getRegisteredMatching(self,
                              required_interfaces=None,
                              presentation_type=None,
                              viewName=None,
                              layer=None,
                              ):
        """Return a list of registrations matching the given criteria.

        Each item is a match tuple from the per-view adapter registry's
        getRegisteredMatching with (layer name, view name) appended.
        A `viewName` or `layer` of None means "all view names" or
        "all layers" respectively.
        """
        if layer is None:
            layers = self._layers.keys()
        else:
            layers = (layer, )

        result = []
        # The loop variables deliberately differ from the `layer` and
        # `viewName` parameters: rebinding `viewName` inside the loop would
        # restrict every later layer to the last name seen in an earlier one.
        for layer_name in layers:
            names_dict = self._layers.get(layer_name)
            if names_dict is None:
                continue

            if viewName is None:
                view_names = names_dict.keys()
            else:
                view_names = (viewName, )

            for view_name in view_names:
                registry = names_dict.get(view_name)
                if registry is None:
                    continue

                for match in registry.getRegisteredMatching(
                    required_interfaces,
                    presentation_type):
                    result.append(match + (layer_name, view_name))

        return result
class ViewConfiguration(SimpleConfiguration):
    """Configuration describing a view produced by a named factory.

    Stores the interface the view is for, the view name, the
    presentation type, the factory name and the layer (default
    'default').
    """

    __implements__ = IViewConfiguration

    status = ConfigurationStatusProperty('Views')

    def __init__(self,
                 forInterface, viewName, presentationType,
                 factoryName, layer='default'):
        self.layer = layer
        self.viewName = viewName
        self.forInterface = forInterface
        self.presentationType = presentationType
        self.factoryName = factoryName

    def getView(self, object, request):
        """Resolve the factory name through the service manager and call
        the factory with (object, request)."""
        service_manager = getServiceManager(self)
        return service_manager.resolve(self.factoryName)(object, request)

    getView = ContextMethod(getView)
class PageConfiguration(ViewConfiguration):
    """View configuration whose view is rendered through a template.

    `template` is handed to traverse() from the physical root when the
    view is built; `factoryName` is optional and DefaultFactory is used
    when it is empty.
    """

    __implements__ = IPageConfiguration

    def __init__(self,
                 forInterface, viewName, presentationType,
                 factoryName=None, template=None,
                 layer='default'):
        base = super(PageConfiguration, self)
        base.__init__(forInterface, viewName, presentationType,
                      factoryName, layer)
        self.template = template

    def getView(self, object, request):
        """Return a BoundTemplate tying the traversed template to a view
        built for (object, request)."""
        service_manager = getServiceManager(self)

        if not self.factoryName:
            factory = DefaultFactory
        else:
            factory = service_manager.resolve(self.factoryName)
        view = factory(object, request)

        # Proxies are removed from the root because the template has to
        # be reached with an unrestricted traverse.
        unproxied_root = removeAllProxies(getPhysicalRoot(service_manager))
        return BoundTemplate(traverse(unproxied_root, self.template), view)

    getView = ContextMethod(getView)
class DefaultFactory:
    """Minimal view object that only remembers its context and request."""

    def __init__(self, context, request):
        self.context, self.request = context, request
class BoundTemplate:
    """Callable pairing a template with the view it is rendered for."""

    def __init__(self, template, view):
        self.view = view
        self.template = template

    def __call__(self, *args, **kw):
        # Forward everything to the template, with the view as the
        # first argument.
        render = self.template.render
        return render(self.view, *args, **kw)
=== Added File Zope3/src/zope/app/services/viewpackage.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""View package.
$Id: viewpackage.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
# Python 2 idiom: any class in this module that declares no base class
# is created as a new-style class.
__metaclass__ = type
from zope.app.container.btree import BTreeContainer
from zope.app.interfaces.services.interfaces import IZPTTemplate
from zope.publisher.interfaces.browser import IBrowserPresentation
from Zope.App.Traversing import getPhysicalPathString, traverse
from zope.proxy.context.context import getItem, getAttr
from zope.proxy.context import ContextMethod
from zope.app.interfaces.services.configuration import Active
from zope.app.services.configurationmanager \
import ConfigurationManager
from zope.app.services.configuration import ConfigurationStatusProperty
from zope.proxy.introspection import removeAllProxies
from zope.app.services.view import PageConfiguration
from zope.app.interfaces.services.servicemanager.interfaces import IViewPackage
class ViewPackage(BTreeContainer):
    """Container of ZPT templates that configures each one as a page.

    Every template added through setObject gets a PageConfiguration
    stored in the package's 'configure' ConfigurationManager and marked
    Active.  setObject reads ``self.forInterface`` and
    ``self.factoryName``, which are not defined on this class and must be
    set on the instance before templates are added.
    """

    __implements__ = IViewPackage

    presentationType = IBrowserPresentation
    layer = "default"
    description = ''
    title = ''

    def __init__(self):
        super(ViewPackage, self).__init__()
        # Use the inherited setObject: our own override only accepts
        # templates and would reject the configuration manager.
        super(ViewPackage, self).setObject('configure', ConfigurationManager())

    def setObject(self, name, object):
        """Add a template and register an active page configuration for it.

        Returns the name under which the template was stored.  Raises
        TypeError when `object` does not implement IZPTTemplate.
        """
        if not IZPTTemplate.isImplementedBy(object):
            raise TypeError("Can only add templates")

        # super() does not work on a context wrapped instance
        base = removeAllProxies(self)
        name = super(ViewPackage, base).setObject(name, object)

        # The configuration refers to the template by its physical path.
        template = getItem(self, name)
        template = getPhysicalPathString(template)
        config = PageConfiguration(self.forInterface, name,
                                   self.presentationType,
                                   self.factoryName, template,
                                   self.layer)

        configure = traverse(self, 'configure')
        # An empty name is passed; the name actually used is whatever
        # setObject returns.
        config_name = configure.setObject('', config)
        config = getItem(configure, config_name)
        config.status = Active
        return name

    setObject = ContextMethod(setObject)

    def activated(self):
        "See IConfiguration; nothing is done here."

    def deactivated(self):
        "See IConfiguration; nothing is done here."
=== Added File Zope3/src/zope/app/services/zpt.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: zpt.py,v 1.1.2.1 2002/12/23 19:32:22 jim Exp $
"""
import re
from zope.interface import Interface
from zope.interface.element import Attribute
import zope.schema
from persistence import Persistent
from zope.proxy.context import ContextMethod
from zope.proxy.context.context import getWrapperContainer
from zope.security.proxy import ProxyFactory
from zope.app.interfaces.content.file import IFileContent
from zope.pagetemplate.pagetemplate import PageTemplate
from zope.app.pagetemplate.engine import AppPT
from zope.app.interfaces.services.interfaces import IZPTTemplate
class ZPTTemplate(AppPT, PageTemplate, Persistent):
    """Persistent page template that is rendered on behalf of a view."""

    __implements__ = IZPTTemplate

    contentType = 'text/html'

    # Template text.  Reading returns self.read(); assigning encodes the
    # new text as UTF-8 and passes it to pt_edit together with the
    # current content type.
    source = property(
        lambda template: template.read(),
        lambda template, text: template.pt_edit(
            text.encode('utf-8'), template.contentType),
        )

    def pt_getContext(self, view, **_kw):
        """Return the inherited namespace extended with 'view', 'request'
        and 'context' entries taken from the given view component."""
        bindings = super(ZPTTemplate, self).pt_getContext(**_kw)
        bindings['view'] = view
        bindings['request'] = view.request
        bindings['context'] = view.context
        return bindings

    def render(self, view, *args, **keywords):
        """Render the template for `view`.

        Positional arguments (when present) and keyword arguments are
        wrapped with ProxyFactory and exposed to the template as 'args'
        and 'options'.
        """
        if args:
            args = ProxyFactory(args)
        options = ProxyFactory(keywords)
        return self.pt_render(
            self.pt_getContext(view, args=args, options=options))
# Adapter for ISearchableText
from zope.app.interfaces.index.text.interfaces import ISearchableText
# Matches one markup tag: '<', one or more characters other than '>', then
# '>'.  SearchableText uses it to strip tags from HTML page source.
tag = re.compile(r"<[^>]+>")
class SearchableText:
    """Adapter exposing the source of a ZPT template as searchable text."""

    __implements__ = ISearchableText
    __used_for__ = IZPTTemplate

    def __init__(self, page):
        self.page = page

    def getSearchableText(self):
        """Return a one-element list holding the page source.

        An 8-bit source string is decoded as UTF-8.  When the page's
        content type starts with 'text/html', markup tags are removed.
        """
        text = self.page.source
        if isinstance(text, str):
            # Decode the value already fetched rather than reading the
            # `source` property (and thus the template) a second time.
            text = unicode(text, 'utf-8')
        # Otherwise the text was already Unicode, which happens, but it
        # is unclear how it gets converted to Unicode since the ZPTPage
        # stores UTF-8 as an 8-bit string.

        if self.page.contentType.startswith('text/html'):
            text = tag.sub('', text)

        return [text]