[Zope3-checkins] CVS: Zope3/src/zope/app/services - event.py:1.8
Steve Alexander
steve@cat-box.net
Mon, 3 Feb 2003 10:59:48 -0500
Update of /cvs-repository/Zope3/src/zope/app/services
In directory cvs.zope.org:/tmp/cvs-serv23130/src/zope/app/services
Modified Files:
event.py
Log Message:
Large event service reimplementation.
=== Zope3/src/zope/app/services/event.py 1.7 => 1.8 ===
--- Zope3/src/zope/app/services/event.py:1.7 Tue Jan 28 06:30:55 2003
+++ Zope3/src/zope/app/services/event.py Mon Feb 3 10:59:16 2003
@@ -16,9 +16,11 @@
$Id$
"""
+from __future__ import generators
from zope.exceptions import NotFoundError
from zope.app.interfaces.event import ISubscribingAware, IPublisher, IEvent
+from zope.app.interfaces.event import ISubscriber
from zope.app.interfaces.traversing import ITraverser
from zope.app.interfaces.services.event import ISubscriptionService
from zope.app.interfaces.services.event import IEventChannel, IEventService
@@ -28,7 +30,7 @@
from zope.component import ComponentLookupError
from zope.app.component.nextservice import getNextService, queryNextService
-from zope.proxy.context import ContextMethod
+from zope.proxy.context import ContextMethod, ContextSuper
from zope.proxy.introspection import removeAllProxies
from zope.app.event.subs import Subscribable, SubscriptionTracker
@@ -38,31 +40,38 @@
return getService(context, "Subscription")
def subscribe(subscriber, event_type=IEvent, filter=None, context=None):
- if context is None:
+ if context is None and not isinstance(subscriber, (int, str, unicode)):
context = subscriber
return getSubscriptionService(context).subscribe(
subscriber, event_type, filter)
def subscribeMany(subscriber, event_types=(IEvent,),
filter=None, context=None):
- if context is None:
+ if context is None and not isinstance(subscriber, (int, str, unicode)):
context = subscriber
subscribe = getSubscriptionService(context).subscribe
for event_type in event_types:
subscribe(subscriber, event_type, filter)
-def unsubscribe(subscriber, event_type=None, filter=None, context=None):
- if context is None:
+def unsubscribe(subscriber, event_type, filter=None, context=None):
+ if context is None and not isinstance(subscriber, (int, str, unicode)):
context = subscriber
return getSubscriptionService(context).unsubscribe(
subscriber, event_type, filter)
-def listSubscriptions(subscriber, event_type=None, context=None):
- if context is None:
+def unsubscribeAll(subscriber, event_type=IEvent, context=None):
+ if context is None and not isinstance(subscriber, (int, str, unicode)):
context = subscriber
- return getSubscriptionService(context).listSubscriptions(
+ return getSubscriptionService(context).unsubscribeAll(
subscriber, event_type)
+def iterSubscriptions(subscriber=None, event_type=None, local_only=False,
+ context=None):
+ if context is None and not isinstance(subscriber, (int, str, unicode)):
+ context = subscriber
+ return getSubscriptionService(context).iterSubscriptions(
+ subscriber, event_type, local_only)
+
class EventChannel(Subscribable):
@@ -71,7 +80,7 @@
# needs __init__ from zope.app.event.subs.Subscribable
def _notify(clean_self, wrapped_self, event):
- subscriptionsForEvent = clean_self._registry.getAllForObject(event)
+ subscriptionsForEvent = clean_self._registry.getAllForObject(event)
hubGet = getService(wrapped_self, "HubIds").getObject
pathGet = getAdapter(wrapped_self, ITraverser).traverse
@@ -93,9 +102,17 @@
except NotFoundError:
badSubscribers[subscriber] = 1
continue
- obj.notify(event)
+ # Get an ISubscriber adapter in the context of the object
+ # This is probably the right context to use.
+ #
+ # Using getAdapter rather than queryAdapter because if there
+ # is no ISubscriber adapter available, that is an application
+ # error that should be fixed. So, failing is appropriate, and
+ # adding this subscriber to badSubscribers is inappropriate.
+ getAdapter(obj, ISubscriber).notify(event)
for subscriber in badSubscribers.keys():
+ # XXX this ought to be logged
clean_self.unsubscribe(subscriber)
def notify(wrapped_self, event):
@@ -133,6 +150,14 @@
_subscribeToServiceInterface = IEvent
_subscribeToServiceFilter = None
+ def subscribe(wrapped_self, reference, event_type=IEvent, filter=None):
+ if getattr(wrapped_self, "_v_ssecunbinding", None) is not None:
+ raise Exception(
+ 'Cannot subscribe to a subscriber that is unbinding.')
+ return ContextSuper(ServiceSubscriberEventChannel, wrapped_self
+ ).subscribe(reference, event_type, filter)
+ subscribe = ContextMethod(subscribe)
+
def bound(wrapped_self, name):
"See IBindingAware"
# Note: if a component is used for more than one service then
@@ -153,12 +178,27 @@
def unbound(wrapped_self, name):
"See IBindingAware"
# see comment in "bound" above
+
clean_self = removeAllProxies(wrapped_self)
- getPath = getAdapter(wrapped_self, ITraverser).traverse
- for subscription in clean_self._subscriptions:
- subscribable = getPath(subscription[0])
- subscribable.unsubscribe(wrapped_self)
- clean_self._subscriptions = ()
+
+ # unsubscribe all subscriptions
+ hubIds = clean_self._hubIds
+ unsubscribeAll = wrapped_self.unsubscribeAll
+ try:
+ clean_self._v_ssecunbinding = True
+ while hubIds:
+ hubId = iter(hubIds).next()
+ unsubscribeAll(hubId, local_only=True)
+
+ paths = clean_self._paths
+ while paths:
+ path = iter(paths).next()
+ unsubscribeAll(path, local_only=True)
+ finally:
+ del clean_self._v_ssecunbinding
+
+ assert len(paths) == len(hubIds) == len(clean_self._registry) == 0
+
clean_self._serviceName = None
unbound = ContextMethod(unbound)
@@ -169,6 +209,8 @@
* unsubscribe() asks the next higher service to unsubscribe if this
service cannot.
+ * unsubscribeAll() does the same.
+
* listSubscriptions() includes this service's subscriptions, and
those of the next higher service.
"""
@@ -178,137 +220,79 @@
_serviceName = None # should be replaced; usually done in "bound"
# method of a subclass that is IBindingAware
- # uses (and needs) __init__ from zope.app.event.subs.Subscribable
-
- def unsubscribe(wrapped_self, subscriber, event_type=None, filter=None):
- originalSubscriber = subscriber
- clean_self = removeAllProxies(wrapped_self)
- subscribers, clean_subObj, subObj = clean_self._getSubscribers(
- wrapped_self, subscriber)
+ # requires __init__ from zope.app.event.subs.Subscribable
+ def unsubscribe(wrapped_self, reference, event_type, filter=None):
+ # The point here is that if we can't unsubscribe here, we should
+ # allow the next event service to unsubscribe.
try:
- ev_sets = clean_self._getEventSets(subscribers)
+ ContextSuper(ServiceSubscribable, wrapped_self).unsubscribe(
+ reference, event_type, filter)
except NotFoundError:
next_service = queryNextService(wrapped_self,
- clean_self._serviceName)
+ wrapped_self._serviceName)
if next_service is not None:
- next_service.unsubscribe(originalSubscriber,
- event_type,
- filter)
- elif event_type is not None:
- raise NotFoundError(originalSubscriber,
- event_type,
- filter)
- return
-
- # XXX need to check if subObj is not None?
- subscribingaware = queryAdapter(subObj, ISubscribingAware)
-
- clean_self._p_changed = 1
-
- if event_type is not None:
- # we have to clean out one and only one subscription of this
- # subscriber for event_type, filter (there may be more, even for
- # this exact combination of subscriber, event_type, filter; we
- # only delete *one*)
- ev_type = event_type
-
- # *** handle optimization: a subscription to IEvent is a
- # subscription to all events; this is converted to 'None' so
- # that the _registry can shortcut some of its tests
- if event_type is IEvent:
- ev_type = None
- for (subscriber, subscriber_index), ev_set in ev_sets.items():
- if ev_type in ev_set:
- subscriptions = clean_self._registry.get(ev_type)
- if subscriptions:
- try:
- subscriptions.remove((subscriber, filter))
- except ValueError:
- pass
- else:
- if subscribingaware:
- subscribingaware.unsubscribedFrom(
- wrapped_self, event_type, filter)
- ev_set[ev_type] -= 1
- if ev_set[ev_type] < 1:
- for sub in subscriptions:
- if sub[0] == subscriber:
- break
- else:
- if len(ev_set) > 1:
- del ev_set[ev_type]
- else: # len(ev_set) == 1
- del clean_self._subscribers[
- subscriber_index]
- break
+ next_service.unsubscribe(reference, event_type, filter)
else:
- next_service = queryNextService(wrapped_self,
- clean_self._serviceName)
- if next_service is not None:
- next_service.unsubscribe(originalSubscriber,
- event_type,
- filter)
- else:
- raise NotFoundError(originalSubscriber, event_type, filter)
- else:
- # we have to clean all the event types out (ignoring filter)
- clean_self._cleanAllForSubscriber(wrapped_self,
- ev_sets,
- subscribingaware,
- subObj)
- next_service = queryNextService(wrapped_self,
- clean_self._serviceName)
- if next_service is not None:
- next_service.unsubscribe(originalSubscriber,
- event_type,
- filter)
+ raise
unsubscribe = ContextMethod(unsubscribe)
- def listSubscriptions(wrapped_self, subscriber, event_type=None):
- clean_self = removeAllProxies(wrapped_self)
- subscribers, clean_subObj, subObj = clean_self._getSubscribers(
- wrapped_self, subscriber)
+ def unsubscribeAll(wrapped_self, reference, event_type=IEvent,
+ local_only=False):
+ # unsubscribe all from here, and from the next service
+
+ # n is the number of subscriptions removed
+ n = ContextSuper(ServiceSubscribable, wrapped_self).unsubscribeAll(
+ reference, event_type)
+ if not local_only:
+ next_service = queryNextService(wrapped_self,
+ wrapped_self._serviceName)
+ if next_service is not None:
+ n += next_service.unsubscribeAll(reference, event_type)
+ return n
+ unsubscribeAll = ContextMethod(unsubscribeAll)
+
+ def resubscribeByHubId(wrapped_self, reference):
+ n = ContextSuper(ServiceSubscribable, wrapped_self
+ ).resubscribeByHubId(reference)
+ next_service = queryNextService(wrapped_self,
+ wrapped_self._serviceName)
+ if next_service is not None:
+ n += next_service.resubscribeByHubId(reference)
+ return n
- result=[]
- if event_type:
- ev_type = event_type
- if event_type is IEvent:
- ev_type = None # handle optimization
- subscriptions = clean_self._registry.get(ev_type)
- if subscriptions:
- for sub in subscriptions:
- for subscriber in subscribers:
- if sub[0] == subscriber:
- result.append((event_type, sub[1]))
- else:
- try:
- ev_sets = clean_self._getEventSets(subscribers)
- except NotFoundError:
- return result
- for (subscriber, subscriber_index), ev_set in ev_sets.items():
- for ev_type in ev_set:
- subscriptions = clean_self._registry.get(ev_type)
- if subscriptions:
- if ev_type is None:
- ev_type = IEvent
- for sub in subscriptions:
- if sub[0] == subscriber:
- result.append((ev_type, sub[1]))
- next_service = queryNextService(wrapped_self, clean_self._serviceName)
+ def resubscribeByPath(wrapped_self, reference):
+ n = ContextSuper(ServiceSubscribable, wrapped_self
+ ).resubscribeByPath(reference)
+ next_service = queryNextService(wrapped_self,
+ wrapped_self._serviceName)
if next_service is not None:
- result.extend(
- next_service.listSubscriptions(subscriber, event_type)
- )
- return result
- listSubscriptions = ContextMethod(listSubscriptions)
+ n += next_service.resubscribeByPath(reference)
+ return n
+ def iterSubscriptions(wrapped_self, reference=None, event_type=IEvent,
+ local_only=False):
+ 'See ISubscriptionService'
+ subs = ContextSuper(ServiceSubscribable, wrapped_self
+ ).iterSubscriptions(reference, event_type)
+ for subscription in subs:
+ yield subscription
+
+ if not local_only:
+ next_service = queryNextService(wrapped_self,
+ wrapped_self._serviceName)
+ if next_service is not None:
+ for subscription in next_service.iterSubscriptions(
+ reference, event_type):
+ yield subscription
+ iterSubscriptions = ContextMethod(iterSubscriptions)
class EventService(ServiceSubscriberEventChannel, ServiceSubscribable):
__implements__ = (
IEventService,
+ ISubscriptionService,
ServiceSubscribable.__implements__,
ServiceSubscriberEventChannel.__implements__
)
@@ -333,7 +317,7 @@
publishedEvents = getattr(clean_self, "_v_publishedEvents", None)
if publishedEvents is None:
- publishedEvents = clean_self._v_publishedEvents=[event]
+ publishedEvents = clean_self._v_publishedEvents = [event]
else:
publishedEvents.append(event)
if (clean_self.isPromotableEvent(event)):
@@ -351,6 +335,7 @@
def bound(wrapped_self, name):
"See IBindingAware"
+ ContextSuper(EventService, wrapped_self).bound(name)
if name == "Subscription":
clean_self = removeAllProxies(wrapped_self)
clean_self._serviceName = name # for LocalServiceSubscribable
@@ -363,13 +348,14 @@
es.subscribe(wrapped_self)
bound = ContextMethod(bound)
- # _unbound = ServiceSubscriberEventChannel.unbound # see comment below
-
def unbound(wrapped_self, name):
"See IBindingAware"
if name == "Subscription":
clean_self = removeAllProxies(wrapped_self)
clean_self._v_unbinding = True
+ try:
+ ContextSuper(EventService, wrapped_self).unbound(name)
+
# this flag is used by the unsubscribedFrom method (below) to
# determine that it doesn't need to further unsubscribe beyond
# what we're already doing.
@@ -385,36 +371,16 @@
# ServiceSubscriberEventChannel:
#
# start copy/paste
- getPath = getAdapter(wrapped_self, ITraverser).traverse
- for subscription in clean_self._subscriptions:
- subscribable = getPath(subscription[0])
- subscribable.unsubscribe(wrapped_self)
- clean_self._subscriptions = ()
- clean_self._serviceName = None
# end copy/paste
-
- for subscriber in clean_self._subscribers:
- clean_self.__unsubscribeAllFromSelf(
- wrapped_self, subscriber[0])
- # unset flag
- clean_self._v_unbinding = None
+ finally:
+ # unset flag
+ del clean_self._v_unbinding
unbound = ContextMethod(unbound)
- def __unsubscribeAllFromSelf(clean_self, wrapped_self, subscriber):
- subscribers, clean_subObj, subObj = clean_self._getSubscribers(
- wrapped_self, subscriber)
- ev_sets = clean_self._getEventSets(subscribers)
- # XXX need to check if subObj is not None?
- subscribingaware = queryAdapter(subObj, ISubscribingAware)
-
- clean_self._p_changed = 1 # trigger persistence before change
- clean_self._cleanAllForSubscriber(wrapped_self,
- ev_sets,
- subscribingaware,
- subObj)
-
def unsubscribedFrom(wrapped_self, subscribable, event_type, filter):
"See ISubscribingAware"
+ ContextSuper(EventService, wrapped_self).unsubscribedFrom(
+ subscribable, event_type, filter)
clean_self = removeAllProxies(wrapped_self)
if getattr(clean_self, "_v_unbinding", None) is None:
# we presumably have been unsubscribed from a higher-level
@@ -422,8 +388,6 @@
# itself: we need to remove the higher level event service
# from our subscriptions list and try to find another event
# service to which to attach
- ServiceSubscriberEventChannel.unsubscribedFrom(
- clean_self, subscribable, event_type, filter)
clean_subscribable = removeAllProxies(subscribable)
if ISubscriptionService.isImplementedBy(
removeAllProxies(clean_subscribable)):