[Zope3-checkins] CVS: Zope3/src/zope/app/services - event.py:1.6
Steve Alexander
steve@cat-box.net
Mon, 30 Dec 2002 09:04:45 -0500
Update of /cvs-repository/Zope3/src/zope/app/services
In directory cvs.zope.org:/tmp/cvs-serv24841/src/zope/app/services
Added Files:
event.py
Log Message:
Had to commit this file separately due to CVS strangeness.
=== Zope3/src/zope/app/services/event.py 1.5 => 1.6 ===
--- /dev/null Mon Dec 30 09:04:45 2002
+++ Zope3/src/zope/app/services/event.py Mon Dec 30 09:04:45 2002
@@ -0,0 +1,444 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Local Event Service and related classes.
+
+$Id$
+"""
+
+from zope.exceptions import NotFoundError
+
+from zope.app.interfaces.event import ISubscribingAware, IPublisher, IEvent
+from zope.app.interfaces.traversing import ITraverser
+from zope.app.interfaces.services.event import ISubscriptionService
+from zope.app.interfaces.services.event import IEventChannel, IEventService
+from zope.app.interfaces.services.service import IBindingAware
+
+from zope.component import getAdapter, getService, queryService
+from zope.component import ComponentLookupError
+from zope.app.component.nextservice import getNextService, queryNextService
+
+from zope.proxy.context import ContextMethod
+from zope.proxy.introspection import removeAllProxies
+
+from zope.app.event.subs import Subscribable, SubscriptionTracker
+
+
+def getSubscriptionService(context):
+ return getService(context, "Subscription")
+
+def subscribe(subscriber, event_type=IEvent, filter=None, context=None):
+ if context is None:
+ context = subscriber
+ return getSubscriptionService(context).subscribe(
+ subscriber, event_type, filter)
+
+def subscribeMany(subscriber, event_types=(IEvent,),
+ filter=None, context=None):
+ if context is None:
+ context = subscriber
+ subscribe = getSubscriptionService(context).subscribe
+ for event_type in event_types:
+ subscribe(subscriber, event_type, filter)
+
+def unsubscribe(subscriber, event_type=None, filter=None, context=None):
+ if context is None:
+ context = subscriber
+ return getSubscriptionService(context).unsubscribe(
+ subscriber, event_type, filter)
+
+def listSubscriptions(subscriber, event_type=None, context=None):
+ if context is None:
+ context = subscriber
+ return getSubscriptionService(context).listSubscriptions(
+ subscriber, event_type)
+
+
+class EventChannel(Subscribable):
+
+ __implements__ = IEventChannel
+
+ # needs __init__ from zope.app.event.subs.Subscribable
+
+ def _notify(clean_self, wrapped_self, event):
+ subscriptionsForEvent = clean_self._registry.getAllForObject(event)
+ hubGet = getService(wrapped_self, "HubIds").getObject
+ pathGet = getAdapter(wrapped_self, ITraverser).traverse
+
+ badSubscribers = {}
+
+ for subscriptions in subscriptionsForEvent:
+ for subscriber,filter in subscriptions:
+ if filter is not None and not filter(event):
+ continue
+ if isinstance(subscriber, int):
+ try:
+ obj = hubGet(subscriber)
+ except NotFoundError:
+ badSubscribers[subscriber] = 1
+ continue
+ else:
+ try:
+ obj = pathGet(subscriber)
+ except NotFoundError:
+ badSubscribers[subscriber] = 1
+ continue
+ obj.notify(event)
+
+ for subscriber in badSubscribers.keys():
+ clean_self.unsubscribe(subscriber)
+
+
+ def notify(wrapped_self, event):
+ clean_self = removeAllProxies(wrapped_self)
+ clean_self._notify(wrapped_self, event)
+ notify = ContextMethod(notify)
+
+
+class ServiceSubscriberEventChannel(SubscriptionTracker, EventChannel):
+ """An event channel that wants to subscribe to the nearest
+ event service when bound, and unsubscribe when unbound.
+ """
+
+ __implements__ = (
+ EventChannel.__implements__,
+ SubscriptionTracker.__implements__,
+ IBindingAware
+ )
+
+ def __init__(self):
+ SubscriptionTracker.__init__(self)
+ EventChannel.__init__(self)
+
+ subscribeOnBind = True
+ # if true, event service will subscribe
+ # to the parent event service on binding, unless the parent
+ # service is the global event service; see 'bound' method
+ # below
+
+ _serviceName = None
+ # the name of the service that this object is providing, or
+ # None if unbound
+
+ _subscribeToServiceName = "Subscriptions"
+ _subscribeToServiceInterface = IEvent
+ _subscribeToServiceFilter = None
+
+ def bound(wrapped_self, name):
+ "See IBindingAware"
+ # Note: if a component is used for more than one service then
+ # this and the unbound code must be conditional for the
+ # pertinent service that should trigger event subscription
+ clean_self = removeAllProxies(wrapped_self)
+ clean_self._serviceName = name # for LocalServiceSubscribable
+ if clean_self.subscribeOnBind:
+ es = queryService(wrapped_self, clean_self._subscribeToServiceName)
+ if es is not None:
+ es.subscribe(
+ wrapped_self,
+ clean_self._subscribeToServiceInterface,
+ clean_self._subscribeToServiceFilter
+ )
+ bound = ContextMethod(bound)
+
+ def unbound(wrapped_self, name):
+ "See IBindingAware"
+ # see comment in "bound" above
+ clean_self = removeAllProxies(wrapped_self)
+ getPath = getAdapter(wrapped_self, ITraverser).traverse
+ for subscription in clean_self._subscriptions:
+ subscribable = getPath(subscription[0])
+ subscribable.unsubscribe(wrapped_self)
+ clean_self._subscriptions = ()
+ clean_self._serviceName = None
+ unbound = ContextMethod(unbound)
+
+
+
+class ServiceSubscribable(Subscribable):
+ """A mix-in for local event services.
+
+ * unsubscribe() asks the next higher service to unsubscribe if this
+ service cannot.
+
+ * listSubscriptions() includes this service's subscriptions, and
+ those of the next higher service.
+ """
+
+ __implements__ = Subscribable.__implements__
+
+ _serviceName = None # should be replaced; usually done in "bound"
+ # method of a subclass that is IBindingAware
+
+ # uses (and needs) __init__ from zope.app.event.subs.Subscribable
+
+ def unsubscribe(wrapped_self, subscriber, event_type=None, filter=None):
+ originalSubscriber = subscriber
+ clean_self = removeAllProxies(wrapped_self)
+ subscribers, clean_subObj, subObj = clean_self._getSubscribers(
+ wrapped_self, subscriber)
+
+ try:
+ ev_sets = clean_self._getEventSets(subscribers)
+ except NotFoundError:
+ next_service = queryNextService(wrapped_self,
+ clean_self._serviceName)
+ if next_service is not None:
+ next_service.unsubscribe(originalSubscriber,
+ event_type,
+ filter)
+ elif event_type is not None:
+ raise NotFoundError(originalSubscriber,
+ event_type,
+ filter)
+ return
+
+ do_alert = (subObj is not None and
+ ISubscribingAware.isImplementedBy(clean_subObj))
+
+ clean_self._p_changed = 1
+
+ if event_type is not None:
+ # we have to clean out one and only one subscription of this
+ # subscriber for event_type, filter (there may be more, even for
+ # this exact combination of subscriber, event_type, filter; we
+ # only delete *one*)
+ ev_type = event_type
+
+ # *** handle optimization: a subscription to IEvent is a
+ # subscription to all events; this is converted to 'None' so
+ # that the _registry can shortcut some of its tests
+ if event_type is IEvent:
+ ev_type = None
+ for (subscriber, subscriber_index), ev_set in ev_sets.items():
+ if ev_type in ev_set:
+ subscriptions = clean_self._registry.get(ev_type)
+ if subscriptions:
+ try:
+ subscriptions.remove((subscriber, filter))
+ except ValueError:
+ pass
+ else:
+ if do_alert:
+ subObj.unsubscribedFrom(
+ wrapped_self, event_type, filter)
+ ev_set[ev_type] -= 1
+ if ev_set[ev_type] < 1:
+ for sub in subscriptions:
+ if sub[0] == subscriber:
+ break
+ else:
+ if len(ev_set) > 1:
+ del ev_set[ev_type]
+ else: # len(ev_set) == 1
+ del clean_self._subscribers[
+ subscriber_index]
+ break
+ else:
+ next_service = queryNextService(wrapped_self,
+ clean_self._serviceName)
+ if next_service is not None:
+ next_service.unsubscribe(originalSubscriber,
+ event_type,
+ filter)
+ else:
+ raise NotFoundError(originalSubscriber, event_type, filter)
+ else:
+ # we have to clean all the event types out (ignoring filter)
+ clean_self._cleanAllForSubscriber(wrapped_self,
+ ev_sets,
+ do_alert,
+ subObj)
+ next_service = queryNextService(wrapped_self,
+ clean_self._serviceName)
+ if next_service is not None:
+ next_service.unsubscribe(originalSubscriber,
+ event_type,
+ filter)
+ unsubscribe = ContextMethod(unsubscribe)
+
+ def listSubscriptions(wrapped_self, subscriber, event_type=None):
+ clean_self = removeAllProxies(wrapped_self)
+ subscribers, clean_subObj, subObj = clean_self._getSubscribers(
+ wrapped_self, subscriber)
+
+ result=[]
+ if event_type:
+ ev_type = event_type
+ if event_type is IEvent:
+ ev_type = None # handle optimization
+ subscriptions = clean_self._registry.get(ev_type)
+ if subscriptions:
+ for sub in subscriptions:
+ for subscriber in subscribers:
+ if sub[0] == subscriber:
+ result.append((event_type, sub[1]))
+ else:
+ try:
+ ev_sets = clean_self._getEventSets(subscribers)
+ except NotFoundError:
+ return result
+ for (subscriber, subscriber_index), ev_set in ev_sets.items():
+ for ev_type in ev_set:
+ subscriptions = clean_self._registry.get(ev_type)
+ if subscriptions:
+ if ev_type is None:
+ ev_type = IEvent
+ for sub in subscriptions:
+ if sub[0] == subscriber:
+ result.append((ev_type, sub[1]))
+ next_service = queryNextService(wrapped_self, clean_self._serviceName)
+ if next_service is not None:
+ result.extend(
+ next_service.listSubscriptions(subscriber, event_type)
+ )
+ return result
+ listSubscriptions = ContextMethod(listSubscriptions)
+
+
+
+class EventService(ServiceSubscriberEventChannel, ServiceSubscribable):
+
+ __implements__ = (
+ IEventService,
+ ServiceSubscribable.__implements__,
+ ServiceSubscriberEventChannel.__implements__
+ )
+
+ def __init__(self):
+ ServiceSubscriberEventChannel.__init__(self)
+ ServiceSubscribable.__init__(self)
+
+ def isPromotableEvent(self, event):
+ """A hook. Returns True if, when publishing an event, the event
+ should also be promoted to the next (higher) level of event service,
+ and False otherwise."""
+ # XXX A probably temporary appendage. Depending on the usage,
+ # this should be (a) kept as is, (b) made into a registry, or
+ # (c) removed.
+ return True
+
+ def publish(wrapped_self, event):
+ "see IEventPublisher"
+ clean_self = removeAllProxies(wrapped_self)
+ clean_self._notify(wrapped_self, event)
+
+ publishedEvents = getattr(clean_self, "_v_publishedEvents", None)
+ if publishedEvents is None:
+ publishedEvents = clean_self._v_publishedEvents=[event]
+ else:
+ publishedEvents.append(event)
+ if (clean_self.isPromotableEvent(event)):
+ getNextService(wrapped_self, 'Events').publish(event)
+ publishedEvents.remove(event)
+ publish = ContextMethod(publish)
+
+ def notify(wrapped_self, event):
+ "see ISubscriber"
+ clean_self = removeAllProxies(wrapped_self)
+ publishedEvents = getattr(clean_self, "_v_publishedEvents", None)
+ if publishedEvents is None or event not in publishedEvents:
+ clean_self._notify(wrapped_self, event)
+ notify = ContextMethod(notify)
+
+ def bound(wrapped_self, name):
+ "See IBindingAware"
+ if name == "Subscription":
+ clean_self = removeAllProxies(wrapped_self)
+ clean_self._serviceName = name # for LocalServiceSubscribable
+ if clean_self.subscribeOnBind:
+ try:
+ es = getNextService(wrapped_self, "Subscription")
+ except ComponentLookupError:
+ pass
+ else:
+ es.subscribe(wrapped_self)
+ bound = ContextMethod(bound)
+
+ # _unbound = ServiceSubscriberEventChannel.unbound # see comment below
+
+ def unbound(wrapped_self, name):
+ "See IBindingAware"
+ if name == "Subscription":
+ clean_self = removeAllProxies(wrapped_self)
+ clean_self._v_unbinding = True
+ # this flag is used by the unsubscribedFrom method (below) to
+ # determine that it doesn't need to further unsubscribe beyond
+ # what we're already doing.
+
+ # Both of the following approaches have wrapper/security
+ # problems:
+ #
+ # wrapped_self._unbound(name) # using _unbound above
+ # and
+ # ServiceSubscriberEventChannel.unbound(wrapped_self, name)
+ #
+ # so we're doing a copy and paste from
+ # ServiceSubscriberEventChannel:
+ #
+ # start copy/paste
+ getPath = getAdapter(wrapped_self, ITraverser).traverse
+ for subscription in clean_self._subscriptions:
+ subscribable = getPath(subscription[0])
+ subscribable.unsubscribe(wrapped_self)
+ clean_self._subscriptions = ()
+ clean_self._serviceName = None
+ # end copy/paste
+
+ for subscriber in clean_self._subscribers:
+ clean_self.__unsubscribeAllFromSelf(
+ wrapped_self, subscriber[0])
+ # unset flag
+ clean_self._v_unbinding = None
+ unbound = ContextMethod(unbound)
+
+ def __unsubscribeAllFromSelf(clean_self, wrapped_self, subscriber):
+ subscribers, clean_subObj, subObj = clean_self._getSubscribers(
+ wrapped_self, subscriber)
+ ev_sets = clean_self._getEventSets(subscribers)
+ do_alert = (subObj is not None and
+ ISubscribingAware.isImplementedBy(clean_subObj))
+ clean_self._p_changed = 1 # trigger persistence before change
+ clean_self._cleanAllForSubscriber(wrapped_self,
+ ev_sets,
+ do_alert,
+ subObj)
+
+ def unsubscribedFrom(wrapped_self, subscribable, event_type, filter):
+ "See ISubscribingAware"
+ clean_self = removeAllProxies(wrapped_self)
+ if getattr(clean_self, "_v_unbinding", None) is None:
+ # we presumably have been unsubscribed from a higher-level
+ # event service because that event service is unbinding
+ # itself: we need to remove the higher level event service
+ # from our subscriptions list and try to find another event
+ # service to which to attach
+ ServiceSubscriberEventChannel.unsubscribedFrom(
+ clean_self, subscribable, event_type, filter)
+ clean_subscribable = removeAllProxies(subscribable)
+ if ISubscriptionService.isImplementedBy(
+ removeAllProxies(clean_subscribable)):
+ try:
+ context = getService(wrapped_self, "Subscription")
+ # we do this instead of getNextService because the order
+ # of unbinding and notification of unbinding is not
+ # guaranteed
+ while removeAllProxies(context) in (
+ clean_subscribable, clean_self):
+ context = getNextService(context, "Subscription")
+ except ComponentLookupError:
+ pass
+ else:
+ context.subscribe(wrapped_self)
+ unsubscribedFrom = ContextMethod(unsubscribedFrom)
+