[Zope3-checkins]
SVN: Zope3/branches/jim-simplifyevents/src/zope/app/event/
Got rid of the event services.
Jim Fulton
jim at zope.com
Wed May 26 07:30:37 EDT 2004
Log message for revision 25001:
Got rid of the event services.
-=-
Deleted: Zope3/branches/jim-simplifyevents/src/zope/app/event/globalservice.py
===================================================================
--- Zope3/branches/jim-simplifyevents/src/zope/app/event/globalservice.py 2004-05-26 11:28:36 UTC (rev 25000)
+++ Zope3/branches/jim-simplifyevents/src/zope/app/event/globalservice.py 2004-05-26 11:30:37 UTC (rev 25001)
@@ -1,237 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""
-$Id$
-"""
-
-__metaclass__ = type
-
-from zope.interface.type import TypeRegistry
-from zope.interface import implements
-from zope.exceptions import NotFoundError
-from zope.proxy import removeAllProxies
-
-from zope.app.event.interfaces import IEvent, ISubscriber, ISubscribingAware
-from zope.app.event.interfaces import IGlobalSubscribable, IPublisher
-
-import logging
-import pprint
-from StringIO import StringIO
-
-def checkEventType(event_type, allow_none=False):
- if not (
- (allow_none and event_type is None)
- or event_type.extends(IEvent, strict=False)
- ):
- raise TypeError('event_type must extend IEvent: %s' % repr(event_type))
-
-class Logger:
- """Class to log all events sent out by an event service.
-
- This is an event subscriber that you can add via ZCML to log all
- events sent out by Zope.
- """
-
- implements(ISubscriber)
-
- def __init__(self, severity=logging.INFO):
- self.severity = severity
- self.logger = logging.getLogger("Event.Logger")
-
- def notify(self, event):
- c = event.__class__
- detail = StringIO()
- data = event.__dict__.items()
- data.sort()
- # XXX The detail output could be improved. Not sure what is wanted.
- pprint.pprint(data, detail)
- self.logger.log(self.severity, "%s.%s: %s",
- c.__module__, c.__name__, detail.getvalue())
-
-
-class GlobalSubscribable:
- """A global mix-in"""
-
- implements(IGlobalSubscribable)
-
- def __init__(self):
- self._registry = TypeRegistry()
- self._subscribers = [] # use dict?
-
- _clear = __init__
-
- def globalSubscribe(self, subscriber, event_type=IEvent, filter=None):
- checkEventType(event_type)
- clean_subscriber = removeAllProxies(subscriber)
-
- subscribingaware = ISubscribingAware(subscriber, None)
- if subscribingaware is not None:
- subscribingaware.subscribedTo(self, event_type, filter)
-
- if event_type is IEvent:
- event_type = None # optimization
-
- subscribers = self._registry.setdefault(event_type, [])
- subscribers.append((clean_subscriber, filter))
-
- for sub in self._subscribers:
- if sub[0] == clean_subscriber:
- try:
- sub[1][event_type] += 1
- except KeyError:
- sub[1][event_type] = 1
- break
- else:
- self._subscribers.append((clean_subscriber, {event_type: 1}))
-
- # Trigger persistence, if pertinent
- # self._registry = self._registry
-
-
- def unsubscribe(self, subscriber, event_type=None, filter=None):
- checkEventType(event_type, allow_none=True)
- clean_subscriber = removeAllProxies(subscriber)
-
- for subscriber_index in range(len(self._subscribers)):
- sub = self._subscribers[subscriber_index]
- if sub[0] == clean_subscriber:
- ev_set = sub[1] # the dict of type:subscriptionCount
- break
- else:
- if event_type is not None:
- raise NotFoundError(subscriber)
- else:
- # this was a generic unsubscribe all request; work may have
- # been done by a local service
- return
-
- subscribingaware = ISubscribingAware(subscriber, None)
-
- if event_type:
- ev_type = event_type
- if event_type is IEvent:
- ev_type = None # handle optimization
- if ev_type not in ev_set:
- raise NotFoundError(subscriber, event_type, filter)
- subscriptions = self._registry.get(ev_type)
- if not subscriptions:
- raise NotFoundError(subscriber, event_type, filter)
- try:
- subscriptions.remove((clean_subscriber, filter))
- except ValueError:
- raise NotFoundError(subscriber, event_type, filter)
- if subscribingaware is not None:
- subscribingaware.unsubscribedFrom(self, event_type, filter)
- ev_set[ev_type] -= 1
- if ev_set[ev_type] < 1:
- for asubscriber, afilter in subscriptions:
- if asubscriber == clean_subscriber:
- break
- else:
- if len(ev_set) > 1:
- del ev_set[ev_type]
- else: # len(ev_set) == 1, and we just eliminated it
- del self._subscribers[subscriber_index]
- else:
- for ev_type in ev_set:
- subscriptions = self._registry.get(ev_type)
- if ev_type is None:
- ev_type = IEvent
- subs = subscriptions[:]
- subscriptions[:] = []
- for asubscriber, afilter in subs:
- if asubscriber == clean_subscriber:
- # deleted (not added back)
- if subscribingaware is not None:
- subscribingaware.unsubscribedFrom(
- self, ev_type, afilter)
- else:
- # kept (added back)
- subscriptions.append(sub)
- del self._subscribers[subscriber_index]
- # Trigger persistence, if pertinent
- # self._registry = self._registry
-
- def listSubscriptions(self, subscriber, event_type=None):
- checkEventType(event_type, allow_none=True)
- subscriber = removeAllProxies(subscriber)
-
- result = []
- if event_type:
- ev_type = event_type
- if event_type is IEvent:
- ev_type = None # handle optimization
- subscriptions = self._registry.get(ev_type)
- if subscriptions:
- for sub in subscriptions:
- if sub[0] == subscriber:
- result.append((event_type, sub[1]))
- else:
- for subscriber_index in range(len(self._subscribers)):
- sub = self._subscribers[subscriber_index]
- if sub[0] == subscriber:
- ev_set = sub[1]
- break
- else:
- return result
- for ev_type in ev_set:
- subscriptions = self._registry.get(ev_type)
- if subscriptions:
- if ev_type is None:
- ev_type = IEvent
- for sub in subscriptions:
- if sub[0] == subscriber:
- result.append((ev_type, sub[1]))
- return result
-
-
-def globalNotifyOrPublish(self, event):
- assert IEvent.providedBy(event)
- subscriptionsForEvent = self._registry.getAllForObject(event)
- for subscriptions in subscriptionsForEvent:
- for subscriber, filter in subscriptions:
- if filter is not None and not filter(event):
- continue
- ISubscriber(subscriber).notify(event)
-
-class GlobalEventChannel(GlobalSubscribable):
-
- implements(IGlobalSubscribable, ISubscriber)
-
- notify = globalNotifyOrPublish
-
-
-class GlobalEventPublisher(GlobalSubscribable):
-
- implements(IGlobalSubscribable, IPublisher)
-
- publish = globalNotifyOrPublish
-
-
-# Repeated here, and in zope/app/event/__init__.py to avoid circular import.
-def globalSubscribeMany(subscriber, event_types=(IEvent,), filter=None):
- subscribe_func = eventPublisher.globalSubscribe
- for event_type in event_types:
- subscribe_func(subscriber, event_type, filter)
-
-eventPublisher = GlobalEventPublisher()
-eventLogger = Logger()
-
-_clear = eventPublisher._clear
-
-# Register our cleanup with Testing.CleanUp to make writing unit tests simpler.
-from zope.testing.cleanup import addCleanUp
-addCleanUp(_clear)
-del addCleanUp
-
Deleted: Zope3/branches/jim-simplifyevents/src/zope/app/event/localservice.py
===================================================================
--- Zope3/branches/jim-simplifyevents/src/zope/app/event/localservice.py 2004-05-26 11:28:36 UTC (rev 25000)
+++ Zope3/branches/jim-simplifyevents/src/zope/app/event/localservice.py 2004-05-26 11:30:37 UTC (rev 25001)
@@ -1,428 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Local Event Service and related classes.
-
-$Id$
-"""
-import logging
-from zope.exceptions import NotFoundError
-
-from zope.app import zapi
-
-from zope.component import queryService
-from zope.app.event.interfaces import IEvent, ISubscriber, IEventChannel
-from zope.app.event.interfaces import ISubscriptionService, IEventService
-from zope.app.site.interfaces import IBindingAware
-
-from zope.component import ComponentLookupError
-from zope.app.servicenames import HubIds, EventPublication, EventSubscription
-from zope.app.component.localservice import getNextService, queryNextService
-
-from zope.proxy import removeAllProxies
-from zope.interface import implements
-
-from zope.app.event.subs import Subscribable, SubscriptionTracker
-
-from zope.security.proxy import trustedRemoveSecurityProxy
-from zope.app.container.contained import Contained
-
-
-def getSubscriptionService(context):
- return zapi.getService(context, EventSubscription)
-
-def subscribe(subscriber, event_type=IEvent, filter=None, context=None):
- if context is None and not isinstance(subscriber, (int, str, unicode)):
- context = subscriber
- return getSubscriptionService(context).subscribe(
- subscriber, event_type, filter)
-
-def subscribeMany(subscriber, event_types=(IEvent,),
- filter=None, context=None):
- if context is None and not isinstance(subscriber, (int, str, unicode)):
- context = subscriber
- subscribe = getSubscriptionService(context).subscribe
- for event_type in event_types:
- subscribe(subscriber, event_type, filter)
-
-def unsubscribe(subscriber, event_type, filter=None, context=None):
- if context is None and not isinstance(subscriber, (int, str, unicode)):
- context = subscriber
- return getSubscriptionService(context).unsubscribe(
- subscriber, event_type, filter)
-
-def unsubscribeAll(subscriber, event_type=IEvent, context=None,
- local_only=False):
- if context is None and not isinstance(subscriber, (int, str, unicode)):
- context = subscriber
- return getSubscriptionService(context).unsubscribeAll(
- subscriber, event_type, local_only=local_only)
-
-def iterSubscriptions(subscriber=None, event_type=None, local_only=False,
- context=None):
- if context is None and not isinstance(subscriber, (int, str, unicode)):
- context = subscriber
- return getSubscriptionService(context).iterSubscriptions(
- subscriber, event_type, local_only)
-
-
-class EventChannel(Subscribable):
-
- implements(IEventChannel)
-
- # needs __init__ from zope.app.event.subs.Subscribable
-
- def _notify(clean_self, wrapped_self, event):
- subscriptionsForEvent = clean_self._registry.getAllForObject(event)
- hubIdsService = queryService(wrapped_self, HubIds)
- if hubIdsService is None:
- # This will only happen if there is no HubIds service.
- # This is only true at start-up, so we don't bother testing
- # whether hubGet is None in the loop below.
- hubGet = None
- else:
- hubGet = hubIdsService.getObject
-
- root = removeAllProxies(zapi.getRoot(wrapped_self))
-
- badSubscribers = {} # using a dict as a set
- for subscriptions in subscriptionsForEvent:
- for subscriber,filter in subscriptions:
- if filter is not None and not filter(event):
- continue
- if isinstance(subscriber, int):
- try:
- obj = hubGet(subscriber)
-
- # XXX we need to figure out exactly how we want to
- # handle this. For now, we'll assume that all
- # subscriptions are trusted, so can always notify
- obj = trustedRemoveSecurityProxy(obj)
- except NotFoundError:
- badSubscribers[subscriber] = None
- continue
- else:
- try:
- obj = zapi.traverse(root, subscriber)
- except NotFoundError:
- badSubscribers[subscriber] = None
- continue
- # Get an ISubscriber adapter in the context of the object
- # This is probably the right context to use.
- #
- # Using getAdapter rather than queryAdapter because if there
- # is no ISubscriber adapter available, that is an application
- # error that should be fixed. So, failing is appropriate, and
- # adding this subscriber to badSubscribers is inappropriate.
- ISubscriber(obj).notify(event)
-
- for subscriber in badSubscribers:
- logging.getLogger('SiteError').warn(
- "Notifying a subscriber that does not exist."
- " Unsubscribing it: %s" % subscriber)
- # Also, is it right that we should sometimes have
- # "write caused by a read" semantics? I'm seeing notify() as
- # basically a read, and (un)subscribe as a write.
- wrapped_self.unsubscribeAll(subscriber)
-
- def notify(wrapped_self, event):
- clean_self = removeAllProxies(wrapped_self)
- clean_self._notify(wrapped_self, event)
-
-
-class ServiceSubscriberEventChannel(SubscriptionTracker, EventChannel):
- """An event channel that wants to subscribe to the nearest
- event service when bound, and unsubscribe when unbound.
- """
-
- implements(IBindingAware)
-
- def __init__(self):
- SubscriptionTracker.__init__(self)
- EventChannel.__init__(self)
-
- subscribeOnBind = True
- # if true, event service will subscribe
- # to the parent event service on binding, unless the parent
- # service is the global event service; see 'bound' method
- # below
-
- _serviceName = None
- # the name of the service that this object is providing, or
- # None if unbound
-
- _subscribeToServiceName = EventSubscription
- _subscribeToServiceInterface = IEvent
- _subscribeToServiceFilter = None
-
- def subscribe(self, reference, event_type=IEvent, filter=None):
- if getattr(self, "_v_ssecunbinding", None) is not None:
- raise Exception(
- 'Cannot subscribe to a subscriber that is unbinding.')
- return super(ServiceSubscriberEventChannel, self
- ).subscribe(reference, event_type, filter)
-
- def bound(wrapped_self, name):
- "See IBindingAware"
- # Note: if a component is used for more than one service then
- # this and the unbound code must be conditional for the
- # pertinent service that should trigger event subscription
- clean_self = removeAllProxies(wrapped_self)
- clean_self._serviceName = name # for ServiceSubscribable
- if clean_self.subscribeOnBind:
- es = queryService(
- wrapped_self, clean_self._subscribeToServiceName)
- if es is not None:
- if removeAllProxies(es) is clean_self:
- es = queryNextService(
- wrapped_self, clean_self._subscribeToServiceName)
- if es is None:
- subscribe_to = clean_self._subscribeToServiceName
- logging.getLogger('SiteError').warn(
- "Unable to subscribe %s service to the %s service "
- "while binding the %s service. This is because the "
- "%s service could not be found." %
- (name, subscribe_to, name, subscribe_to))
- else:
- es.subscribe(
- wrapped_self,
- clean_self._subscribeToServiceInterface,
- clean_self._subscribeToServiceFilter
- )
-
- def unbound(wrapped_self, name):
- "See IBindingAware"
- # Note: if a component is used for more than one service then
- # this and the unbound code must be conditional for the
- # pertinent service that should trigger event subscription
-
- clean_self = removeAllProxies(wrapped_self)
-
- # unsubscribe all subscriptions
- hubIds = clean_self._hubIds
- unsubscribeAll = wrapped_self.unsubscribeAll
-
- # XXX Temporary hack to make unsubscriptions local in scope when
- # this mix-in is used as part of a subscriptions service.
- # The dependences of these mixins need to be documented and
- # reevaluated.
- if ISubscriptionService.providedBy(wrapped_self):
- real_unsubscribeAll = unsubscribeAll
- unsubscribeAll = lambda x: real_unsubscribeAll(x, local_only=True)
-
- try:
- clean_self._v_ssecunbinding = True
- while hubIds:
- hubId = iter(hubIds).next()
- # XXX This code path needs a unit test!
- # This code is also wrong.
- # The call to unsubscribeAll assumes that whatever class
- # mixes this class in provides an unsubscribeAll method
- # that correctly uses the self._subscribeToServiceName
- # to decide what it should be unsubscribing from.
- # This could be any service that implements
- # ISubscriptionService
- unsubscribeAll(hubId)
-
- paths = clean_self._paths
- while paths:
- path = iter(paths).next()
- # XXX This code path needs a unit test!
- # Also, see comment above.
- unsubscribeAll(path)
- finally:
- del clean_self._v_ssecunbinding
-
- assert len(paths) == len(hubIds) == len(clean_self._registry) == 0
-
- clean_self._serviceName = None
-
-
-class ServiceSubscribable(Subscribable):
- """A mix-in for local event services.
-
- * unsubscribe() asks the next higher service to unsubscribe if this
- service cannot.
-
- * unsubscribeAll() does the same.
-
- * listSubscriptions() includes this service's subscriptions, and
- those of the next higher service.
- """
-
- _serviceName = None # should be replaced; usually done in "bound"
- # method of a subclass that is IBindingAware
-
- # requires __init__ from zope.app.event.subs.Subscribable
-
- def unsubscribe(self, reference, event_type, filter=None):
- # The point here is that if we can't unsubscribe here, we should
- # allow the next event service to unsubscribe.
- try:
- super(ServiceSubscribable, self
- ).unsubscribe(reference, event_type, filter)
- except NotFoundError:
- next_service = queryNextService(self,
- self._serviceName)
- if next_service is not None:
- next_service.unsubscribe(reference, event_type, filter)
- else:
- raise
-
- def unsubscribeAll(self, reference, event_type=IEvent,
- local_only=False):
- # unsubscribe all from here, and from the next service
-
- # n is the number of subscriptions removed
- n = super(ServiceSubscribable, self
- ).unsubscribeAll(reference, event_type)
- if not local_only:
- next_service = queryNextService(self, self._serviceName)
- if next_service is not None:
- n += next_service.unsubscribeAll(reference, event_type)
- return n
-
- def resubscribeByHubId(self, reference):
- n = super(ServiceSubscribable, self
- ).resubscribeByHubId(reference)
- next_service = queryNextService(self, self._serviceName)
- if next_service is not None:
- n += next_service.resubscribeByHubId(reference)
- return n
-
- def resubscribeByPath(self, reference):
- n = super(ServiceSubscribable, self
- ).resubscribeByPath(reference)
- next_service = queryNextService(self, self._serviceName)
- if next_service is not None:
- n += next_service.resubscribeByPath(reference)
- return n
-
- def iterSubscriptions(self, reference=None, event_type=IEvent,
- local_only=False):
- 'See ISubscriptionService'
- subs = super(ServiceSubscribable, self
- ).iterSubscriptions(reference, event_type)
- for subscription in subs:
- yield subscription
-
- if not local_only:
- next_service = queryNextService(self, self._serviceName)
- if next_service is not None:
- for subscription in next_service.iterSubscriptions(
- reference, event_type):
- yield subscription
-
-
-from zope.app.site.interfaces import ISimpleService
-
-class EventService(ServiceSubscriberEventChannel, ServiceSubscribable,
- Contained):
-
- implements(IEventService, ISubscriptionService, ISimpleService)
-
- def __init__(self):
- ServiceSubscriberEventChannel.__init__(self)
- ServiceSubscribable.__init__(self)
-
- def isPromotableEvent(self, event):
- """A hook. Returns True if, when publishing an event, the event
- should also be promoted to the next (higher) level of event service,
- and False otherwise."""
- # XXX A probably temporary appendage. Depending on the usage,
- # this should be (a) kept as is, (b) made into a registry, or
- # (c) removed.
- return True
-
- def publish(wrapped_self, event):
- "see IEventPublisher"
- clean_self = removeAllProxies(wrapped_self)
-
- publishedEvents = getattr(clean_self, "_v_publishedEvents", [])
- clean_self._v_publishedEvents = publishedEvents
- publishedEvents.append(event)
- try:
- clean_self._notify(wrapped_self, event)
- if clean_self.isPromotableEvent(event):
- getNextService(wrapped_self, EventPublication).publish(event)
- finally:
- publishedEvents.remove(event)
-
- def notify(wrapped_self, event):
- "see ISubscriber"
- clean_self = removeAllProxies(wrapped_self)
- publishedEvents = getattr(clean_self, "_v_publishedEvents", [])
- if event not in publishedEvents:
- clean_self._notify(wrapped_self, event)
-
- def bound(wrapped_self, name):
- "See IBindingAware"
- # An event service is bound as EventSubscription and EventPublication.
- # We only want to subscribe to the next event service when we're bound
- # as EventSubscription
- if name == EventSubscription:
- clean_self = removeAllProxies(wrapped_self)
- clean_self._serviceName = name # for ServiceSubscribable
- if clean_self.subscribeOnBind:
- try:
- es = getNextService(wrapped_self, EventSubscription)
- except ComponentLookupError:
- pass
- else:
- es.subscribe(wrapped_self)
-
- def unbound(self, name):
- "See IBindingAware"
- # An event service is bound as EventSubscription and EventPublication.
- # We only want to unsubscribe from the next event service when
- # we're unbound as EventSubscription
- if name == EventSubscription:
- clean_self = removeAllProxies(self)
-
- # This flag is used by the unsubscribedFrom method (below) to
- # determine that it doesn't need to further unsubscribe beyond
- # what we're already doing.
- clean_self._v_unbinding = True
- try:
- super(EventService, self).unbound(name)
- finally:
- # unset flag
- del clean_self._v_unbinding
-
- def unsubscribedFrom(self, subscribable, event_type, filter):
- "See ISubscribingAware"
- super(EventService, self
- ).unsubscribedFrom(subscribable, event_type, filter)
- clean_self = removeAllProxies(self)
- if getattr(clean_self, "_v_unbinding", None) is None:
- # we presumably have been unsubscribed from a higher-level
- # event service because that event service is unbinding
- # itself: we need to remove the higher level event service
- # from our subscriptions list and try to find another event
- # service to which to attach
- clean_subscribable = removeAllProxies(subscribable)
- if ISubscriptionService.providedBy(
- removeAllProxies(clean_subscribable)):
- try:
- context = zapi.getService(self, EventSubscription)
- # we do this instead of getNextService because the order
- # of unbinding and notification of unbinding is not
- # guaranteed
- while removeAllProxies(context) in (
- clean_subscribable, clean_self):
- context = getNextService(context, EventSubscription)
- except ComponentLookupError:
- pass
- else:
- context.subscribe(self)
-
Deleted: Zope3/branches/jim-simplifyevents/src/zope/app/event/subs.py
===================================================================
--- Zope3/branches/jim-simplifyevents/src/zope/app/event/subs.py 2004-05-26 11:28:36 UTC (rev 25000)
+++ Zope3/branches/jim-simplifyevents/src/zope/app/event/subs.py 2004-05-26 11:30:37 UTC (rev 25001)
@@ -1,520 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""
-Revision information:
-
-$Id$
-"""
-from zope.exceptions import NotFoundError
-from persistent import Persistent
-from BTrees.OOBTree import OOBTree
-from BTrees.IOBTree import IOBTree
-
-from zope.proxy import removeAllProxies
-
-from zope.app.traversing.api import getPath
-from zope.app.traversing.api import canonicalPath, traverse
-from zope.app.event.interfaces import IEvent, ISubscriber, ISubscribable
-from zope.app.event.interfaces import ISubscribingAware
-
-from zope.component import getService, queryService
-from zope.app.servicenames import HubIds
-from zope.app.interface.type import PersistentTypeRegistry
-from cPickle import dumps, PicklingError
-from zope.interface import implements
-from zope.app.container.contained import Contained
-
-import logging
-
-__metaclass__ = type
-
-class Subscribable(Persistent, Contained):
- """A local mix-in"""
-
- implements(ISubscribable)
-
- def __init__(self):
- # the type registry
- # key is an event interfaces
- # value is a list of tuples (subscriber_token, filter)
- self._registry = PersistentTypeRegistry()
-
- # hubId : { event_type : number of subscriptions }
- self._hubIds = IOBTree()
- # path : { event_type : number of subscriptions }
- self._paths = OOBTree()
-
- def subscribe(wrapped_self, reference, event_type=IEvent, filter=None):
- '''See ISubscribable'''
- if filter is not None:
- try:
- dumps(filter)
- except PicklingError:
- raise ValueError('The filter argument must be picklable',
- filter)
-
- if not event_type.extends(IEvent, strict=False):
- raise TypeError('event_type must be IEvent or extend IEvent',
- event_type)
- reftype, token, wrapped_object, clean_object = getWayToSubscribe(
- wrapped_self, reference)
-
- # If wrapped_object is None, the object can't be traversed to,
- # so raise an exception
- if wrapped_object is None:
- raise NotFoundError(reference)
-
- clean_self = removeAllProxies(wrapped_self)
-
- if clean_object is clean_self:
- raise RuntimeError("Cannot subscribe to self")
-
- # Check that ISubscriber adapter exists for the wrapped object.
- # This will raise an error if there is no such adapter.
- ISubscriber(wrapped_object)
-
- # Optimisation
- if event_type is IEvent:
- ev_type = None
- else:
- ev_type = event_type
-
- subscribers = clean_self._registry.get(ev_type)
- if subscribers is None:
- subscribers = []
- subscribers.append((token, filter))
- # Ensure that type registry is triggered for persistence
- clean_self._registry.register(ev_type, subscribers)
-
- # increment the subscription count for this subscriber
- if reftype is int:
- tokens = clean_self._hubIds
- elif reftype is unicode:
- tokens = clean_self._paths
- else:
- raise AssertionError('reftype must be int or unicode')
-
- # evtype_numsubs is a dict {ev_type : number of subscriptions}
- evtype_numsubs = tokens.get(token, {})
- evtype_numsubs.setdefault(ev_type, 0)
- evtype_numsubs[ev_type] += 1
- tokens[token] = evtype_numsubs
-
- subscribingaware = ISubscribingAware(wrapped_object, None)
- if subscribingaware is not None:
- subscribingaware.subscribedTo(wrapped_self, event_type, filter)
-
- return token
-
- def unsubscribe(wrapped_self, reference, event_type, filter=None):
- 'See ISubscribable. Remove just one subscription.'
- # Remove just one subscription.
- # If reference is an int, try removing by hubId, and if that fails
- # try by path.
- # If reference is a string, try removing by path, and if that fails
- # try by hubId.
- # If reference is an object, try removing by hubId and if that fails,
- # try by path.
-
- # check that filter is picklable
- if filter is not None:
- try:
- dumps(filter)
- except PicklingError:
- raise ValueError('The filter argument must be picklable',
- filter)
- # check that event_type is an IEvent
- if not event_type.extends(IEvent, strict=False):
- raise TypeError('event_type must extend IEvent', event_type)
-
- # Optimisation
- if event_type is IEvent:
- ev_type = None
- else:
- ev_type = event_type
-
- clean_self = removeAllProxies(wrapped_self)
- cleanobj, wrappedobj, path, hubId, reftype = getWaysToSubscribe(
- wrapped_self, reference)
-
- if path is None and hubId is None:
- raise ValueError('Can get neither path nor hubId for reference',
- reference)
-
- if cleanobj is None:
- logging.getLogger('SiteError').warn(
- "Unsubscribing an object that doesn't exist: %s" % reference)
-
- registry = clean_self._registry
-
- if reftype is unicode:
- where_to_search = ((path, clean_self._paths),
- (hubId, clean_self._hubIds))
- else:
- where_to_search = ((hubId, clean_self._hubIds),
- (path, clean_self._paths))
-
- for token, tokens in where_to_search:
-
- if token is not None:
- evtype_numsubs = tokens.get(token)
- if evtype_numsubs is not None:
- numsubs = evtype_numsubs.get(ev_type)
- if numsubs:
-
- # Go through the registrations in self._registry for
- # exactly ev_type, looking for the first occurence
- # of (path, filter)
- subscriptions = registry.get(ev_type)
- try:
- subscriptions.remove((token, filter))
- except ValueError:
- # The subscription (token, filter) was not present
- pass
- else:
- if subscriptions:
- registry.register(ev_type, subscriptions)
- else:
- registry.unregister(ev_type)
-
- if numsubs == 1:
- del evtype_numsubs[ev_type]
- else:
- evtype_numsubs[ev_type] = numsubs - 1
- if evtype_numsubs:
- tokens[token] = evtype_numsubs
- else:
- del tokens[token]
-
- break
- else:
- # No subscription was removed.
- raise NotFoundError(reference)
-
- subscribingaware = ISubscribingAware(wrappedobj, None)
- if subscribingaware is not None:
- subscribingaware.unsubscribedFrom(wrapped_self, event_type, filter)
-
- def unsubscribeAll(wrapped_self, reference, event_type=IEvent):
- 'See ISubscribable. Remove all matching subscriptions.'
- # check that event_type is an IEvent
- if not IEvent.isEqualOrExtendedBy(event_type):
- raise TypeError('event_type must extend IEvent')
-
- clean_self = removeAllProxies(wrapped_self)
- cleanobj, wrappedobj, path, hubId, reftype = getWaysToSubscribe(
- wrapped_self, reference)
-
- if path is None and hubId is None:
- raise ValueError('Can get neither path nor hubId for reference',
- reference)
-
- if cleanobj is None:
- logging.getLogger('SiteError').warn(
- "Unsubscribing all for an object that doesn't exist: %s" %
- reference)
-
- subscribingaware = ISubscribingAware(wrappedobj, None)
-
- registry = clean_self._registry
- if event_type is IEvent:
- ev_type = None
- else:
- ev_type = IEvent
- eventtypes = registry.getTypesMatching(ev_type)
-
- eventtypes_used = {} # used as a set, so values are all None
- num_registrations_removed = 0
- num_subscriptions_removed = 0
- for token, tokens in ((path, clean_self._paths),
- (hubId, clean_self._hubIds)):
-
- if token is not None:
- evtype_numsubs = tokens.get(token)
- if evtype_numsubs is not None:
- for et in eventtypes:
- numsubs = evtype_numsubs.get(et)
- if numsubs is not None:
- num_subscriptions_removed += numsubs
- del evtype_numsubs[et]
- eventtypes_used[et] = None # add key to the set
- if not evtype_numsubs:
- del tokens[token]
-
- for et in eventtypes_used:
- subscriptions = []
- for token, filter in registry.get(et):
- if token == path or token == hubId:
- num_registrations_removed += 1
- if subscribingaware is not None:
- subscribingaware.unsubscribedFrom(
- wrapped_self, et or IEvent, filter)
- else:
- subscriptions.append((token, filter))
- if subscriptions:
- registry.register(et, subscriptions)
- else:
- registry.unregister(et)
-
- assert num_registrations_removed == num_subscriptions_removed
- return num_registrations_removed
-
- def resubscribeByHubId(wrapped_self, reference):
- 'Where a subscriber has a hubId, resubscribe it by that hubid'
- clean_self = removeAllProxies(wrapped_self)
- cleanobj, wrappedobj, path, hubId, reftype = getWaysToSubscribe(
- wrapped_self, reference)
-
- if hubId is None:
- raise ValueError('Cannot get hubId for reference', reference)
-
- if path is None:
- raise ValueError('Cannot get path for reference', reference)
-
- if cleanobj is None:
- # Perhaps this should raise an exception?
- logging.getLogger('SiteError').warn(
- "resubscribeByHubId for an object that doesn't exist: %s" %
- reference)
-
- self._resubscribe(path, clean_self._paths, hubId, clean_self._hubIds)
-
- def resubscribeByPath(wrapped_self, reference):
- clean_self = removeAllProxies(wrapped_self)
- cleanobj, wrappedobj, path, hubId, reftype = getWaysToSubscribe(
- wrapped_self, reference)
-
- if path is None:
- raise ValueError('Cannot get path for reference', reference)
-
- if hubId is None:
- raise ValueError('Cannot get hubId for reference', reference)
-
- if cleanobj is None:
- # Perhaps this should raise an exception?
- logging.getLogger('SiteError').warn(
- "resubscribeByPath for an object that doesn't exist: %s" %
- reference)
-
- self._resubscribe(hubId, clean_self._hubIds, path, clean_self._paths)
-
- def iterSubscriptions(wrapped_self, reference=None, event_type=IEvent):
- '''See ISubscribable'''
- if reference is None:
- return wrapped_self._iterAllSubscriptions(wrapped_self, event_type)
- else:
- return wrapped_self._iterSomeSubscriptions(wrapped_self,
- reference,
- event_type)
-
- def _iterAllSubscriptions(self, wrapped_self, event_type):
- clean_self = removeAllProxies(wrapped_self)
- if event_type is IEvent:
- ev_type = None
- else:
- ev_type = event_type
- registry = clean_self._registry
- eventtypes = registry.getTypesMatching(ev_type)
- for et in eventtypes:
- subscribers = registry.get(et)
- if et is None:
- et = IEvent
- for token, filter in subscribers:
- yield token, et, filter
-
- def _iterSomeSubscriptions(self, wrapped_self, reference, event_type):
- clean_self = removeAllProxies(wrapped_self)
-
- cleanobj, wrappedobj, path, hubId, reftype = getWaysToSubscribe(
- wrapped_self, reference)
-
- if path is None and hubId is None:
- raise ValueError('Can get neither path nor hubId for reference',
- reference)
-
- if cleanobj is None:
- logging.getLogger('SiteError').warn(
- "iterSubscriptions for an object that doesn't exist: %s" %
- reference)
- if event_type is IEvent:
- ev_type = None
- else:
- ev_type = event_type
- registry = clean_self._registry
- eventtypes = registry.getTypesMatching(ev_type)
- eventtypes_used = {} # used as a set, so values are all None
-
- for token, tokens in ((path, clean_self._paths),
- (hubId, clean_self._hubIds)):
-
- if token is not None:
- evtype_numsubs = tokens.get(token)
- if evtype_numsubs is not None:
- for et in eventtypes:
- numsubs = evtype_numsubs.get(et)
- if numsubs is not None:
- eventtypes_used[et] = None # add key to the set
-
- for et in eventtypes_used:
- if et is None:
- et = IEvent
- for token, filter in registry.get(et):
- if token == path or token == hubId:
- yield token, et, filter
-
- def _resubscribe(self, fromtoken, fromsubs, totoken, tosubs):
- path_evtype_numsubs = fromsubs.get(fromtoken)
- if not path_evtype_numsubs:
- # nothing to do
- return 0
- del fromsubs[fromtoken]
- hubId_evtype_numsubs = tosubs.get(totoken, {})
- num_registrations_converted = 0
- num_subscriptions_converted = 0
- registry = self._registry
- for ev_type, num_subscriptions in path_evtype_numsubs.iteritems():
- num_subscriptions_converted += num_subscriptions
- hubId_evtype_numsubs[ev_type] = (
- hubId_evtype_numsubs.get(ev_type, 0) + num_subscriptions)
-
- subscriptions = registry.get(ev_type)
- if subscriptions:
- new_subscriptions = []
- for token, filter in registry.get(ev_type):
- if token == fromtoken:
- new_subscriptions.append((totoken, filter))
- num_registrations_converted += 1
- else:
- new_subscriptions.append((token, filter))
- registry[ev_type] = new_subscriptions
-
- # I'm not using this one-liner because I want to count the number
- # of registrations converted for sanity-checking.
- #
- # registry[ev_type] = [
- # (token==fromtoken and totoken or token, filter)
- # for token, filter in registry.get(ev_type)
- # ]
-
- if hubId_evtype_numsubs:
- tosubs[totoken] = hubId_evtype_numsubs
-
- assert num_registrations_converted == num_subscriptions_converted
- return num_subscriptions_converted
-
-num = 0
-class SubscriptionTracker:
- "Mix-in for subscribers that want to know to whom they are subscribed"
-
- implements(ISubscribingAware)
-
- def __init__(self):
- self._subscriptions = ()
- global num
- self.number = num
- num += 1
-
- def subscribedTo(self, subscribable, event_type, filter):
- # XXX insert super() call here
- # This raises an error for subscriptions to global event service.
- subscribable_path = getPath(subscribable)
- self._subscriptions += ((subscribable_path, event_type, filter),)
-
- def unsubscribedFrom(self, subscribable, event_type, filter):
- # XXX insert super() call here
- # This raises an error for subscriptions to global event service.
- subscribable_path = getPath(subscribable)
- sub = list(self._subscriptions)
- sub.remove((subscribable_path, event_type, filter))
- self._subscriptions = tuple(sub)
-
-def getWayToSubscribe(context, reference):
- '''Figure out the most appropriate means of subscribing the subscriber.
-
- Returns a tuple:
- (subscription_token, token-type, wrapped_subscriber_object)
- '''
- clean, wrapped, path, hubId, reftype = getWaysToSubscribe(
- context, reference, allways=False)
- if reftype is unicode or hubId is None:
- return unicode, path, wrapped, clean
- if path is None and hubId is None:
- raise Exception('Can get neither path nor hubId for reference',
- reference)
- return int, hubId, wrapped, clean
-
-def getWaysToSubscribe(context, reference, allways=True):
- '''Get the various means of subscription available for the given
- reference.
-
- Returns a tuple of:
- clean_object, wrapped_object, unicode path, hubId, reftype
-
- reftype is object, int or unicode.
- '''
- cleanobj = None
- wrappedobj = None
- hubId = None
- path = None
-
- clean_reference = removeAllProxies(reference)
-
- if isinstance(clean_reference, int):
- reftype = int
- hubId = clean_reference
- hub = getService(context, HubIds)
- try:
- wrappedobj = hub.getObject(hubId)
- except NotFoundError:
- wrappedobj = None
- else:
- if allways:
- try:
- # Hub can't resolve the object, but it might still know
- # the location the object is supposed to be at.
- path = hub.getLocation(hubId)
- # XXX remove this next line when objecthub is refactored
- path = canonicalPath(path)
- except NotFoundError:
- path = getPath(wrappedobj)
- cleanobj = removeAllProxies(wrappedobj)
- elif isinstance(clean_reference, basestring):
- reftype = unicode
- path = canonicalPath(clean_reference)
- try:
- wrappedobj = traverse(context, path)
- except NotFoundError:
- wrappedobj = None
- else:
- cleanobj = removeAllProxies(wrappedobj)
- if allways:
- hub = queryService(context, HubIds)
- if hub is not None:
- try:
- hubId = hub.getHubId(path)
- except NotFoundError:
- pass
- else:
- reftype = object
- wrappedobj = reference
- cleanobj = clean_reference
- path = getPath(wrappedobj)
- hub = queryService(context, HubIds)
- if hub is not None:
- try:
- hubId = hub.getHubId(path)
- except NotFoundError:
- pass
-
- return cleanobj, wrappedobj, path, hubId, reftype
Deleted: Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/eventsetup.py
===================================================================
--- Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/eventsetup.py 2004-05-26 11:28:36 UTC (rev 25000)
+++ Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/eventsetup.py 2004-05-26 11:30:37 UTC (rev 25001)
@@ -1,25 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Event Setup
-
-$Id$
-"""
-from zope.app.site.tests.placefulsetup import PlacefulSetup
-
-class EventSetup(PlacefulSetup):
-
- def setUp(self):
- super(EventSetup, self).setUp(site=True)
- self.createStandardServices()
-
Deleted: Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/subscriber.py
===================================================================
--- Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/subscriber.py 2004-05-26 11:28:36 UTC (rev 25000)
+++ Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/subscriber.py 2004-05-26 11:30:37 UTC (rev 25001)
@@ -1,44 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""
-This contains some dummy stuff to do with subscribing to event channels
-that's useful in several test modules.
-
-$Id$
-"""
-from zope.app.event.interfaces import IFilter, ISubscriber
-from zope.interface import implements
-
-class DummySubscriber:
-
- implements(ISubscriber)
-
- def __init__(self):
- self.notified = 0
-
- def notify(self, event):
- self.notified += 1
-
-subscriber = DummySubscriber()
-
-class DummyFilter:
- implements(IFilter)
-
- def __init__(self,value=1):
- self.value = value
-
- def __call__(self, event):
- return self.value
-
-filter = DummyFilter
Deleted: Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_eventpublisher.py
===================================================================
--- Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_eventpublisher.py 2004-05-26 11:28:36 UTC (rev 25000)
+++ Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_eventpublisher.py 2004-05-26 11:30:37 UTC (rev 25001)
@@ -1,283 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""
-$Id$
-"""
-import unittest
-
-from zope.app.event.interfaces import IObjectEvent
-from zope.app.container.interfaces import IObjectAddedEvent
-from zope.app.container.interfaces import IObjectRemovedEvent
-from zope.app.event.interfaces import IObjectModifiedEvent
-from zope.app.event.objectevent import ObjectModifiedEvent
-from zope.app.container.contained import ObjectAddedEvent
-from zope.app.event.globalservice import GlobalEventPublisher
-from zope.exceptions import NotFoundError
-from zope.app.event.interfaces import IEvent
-from zope.component.tests.placelesssetup import PlacelessSetup
-
-from zope.app.event.tests.subscriber import DummySubscriber, DummyFilter
-from zope.interface import implements
-
-class DummyEvent:
-
- implements(IObjectAddedEvent, IObjectRemovedEvent)
- object = None
-
-class ObjectEvent:
-
- implements(IObjectEvent)
- object = None
-
-class TestEventService(PlacelessSetup, unittest.TestCase):
-
- def setUp(self):
- super(TestEventService, self).setUp()
- self.service = GlobalEventPublisher()
- parent = object()
- self.event = ObjectAddedEvent(None, parent, 'foo')
- self.subscriber = DummySubscriber()
-
- def testSubscribe1(self):
- # Test subscribe method with one parameter
- self.service.globalSubscribe(self.subscriber)
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 1)
-
- def testSubscribe2(self):
- # Test subscribe method with two parameters
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent
- )
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 1)
-
- def testSubscribe3(self):
- # Test subscribe method with three parameters
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent,
- filter=DummyFilter()
- )
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 1)
-
- def testSubscribe4(self):
- # Test subscribe method with three parameters and an always failing
- # filter.
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent,
- filter=DummyFilter(0)
- )
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 0)
-
- def testSubscribe5(self):
- # Test subscribe method with three parameters and an irrelevent event
- # type.
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectModifiedEvent,
- filter=DummyFilter()
- )
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 0)
-
- def testSubscribe6(self):
- # Test subscribe method where the event type registered is a
- # generalised interface of the event passed to the 'publish' method.
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectEvent
- )
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 1)
-
- def testSubscribe7(self):
- # Test subscribe method where one of the event types registered is not
- # interested in the published event.
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectModifiedEvent
- )
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent
- )
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 1)
-
- def testSubscribe8(self):
- # Test subscribe method where the same subscriber subscribes multiple
- # times.
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent,
- filter=DummyFilter()
- )
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent,
- filter=DummyFilter()
- )
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent,
- filter=DummyFilter(0)
- )
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 2)
-
- def testUnsubscribe1(self):
- # Test unsubscribe method
- subscriber = self.subscriber
- self.service.globalSubscribe(subscriber)
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 1)
- self.service.unsubscribe(subscriber)
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 1)
-
- def testUnsubscribe2(self):
- # Test unsubscribe of something that hasn't been subscribed
- subscriber = self.subscriber
- self.assertRaises(NotFoundError,
- self.service.unsubscribe,
- subscriber, IObjectEvent)
- self.assertEqual(None,
- self.service.unsubscribe(subscriber))
-
- def testUnsubscribe3(self):
- # Test selective unsubscribe
- subscriber2=DummySubscriber()
- filter=DummyFilter()
- event2=ObjectModifiedEvent(None)
- self.service.globalSubscribe(
- self.subscriber)
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent,
- filter=filter
- )
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent
- )
- self.service.globalSubscribe(
- subscriber2,
- event_type=IObjectAddedEvent
- )
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 3)
- self.assertEqual(subscriber2.notified, 1)
- self.service.publish(event2)
- self.assertEqual(self.subscriber.notified, 4)
- self.assertEqual(subscriber2.notified, 1)
- self.service.unsubscribe(self.subscriber, IObjectAddedEvent)
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 6)
- self.assertEqual(subscriber2.notified, 2)
- self.service.unsubscribe(self.subscriber, IEvent)
- self.service.publish(event2)
- self.assertEqual(self.subscriber.notified, 6)
- self.assertEqual(subscriber2.notified, 2)
- self.assertRaises(NotFoundError, self.service.unsubscribe,
- self.subscriber, IObjectAddedEvent)
- self.service.unsubscribe(self.subscriber, IObjectAddedEvent, filter)
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 6)
- self.assertEqual(subscriber2.notified, 3)
- self.service.unsubscribe(subscriber2, IObjectAddedEvent)
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 6)
- self.assertEqual(subscriber2.notified, 3)
-
- def testpublish1(self):
- # Test publish method
- subscriber = self.subscriber
- self.service.globalSubscribe(subscriber)
- self.assertEqual(self.subscriber.notified, 0)
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 1)
-
- def testpublish2(self):
- # Test publish method where subscriber has been subscribed twice, with
- # a more generalised version of the initially subscribed interface in
- # the second subscription.
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectEvent,
- )
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent,
- )
- self.service.publish(self.event)
- self.assertEqual(self.subscriber.notified, 2)
-
- def testpublish3(self):
- # Test publish method where subscriber has been to two interfaces and
- # a single event implements both of those interfaces.
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectRemovedEvent
- )
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent
- )
- self.service.publish(DummyEvent())
- self.assertEqual(self.subscriber.notified, 2)
-
- def testpublish4(self):
- # Test publish method to make sure that we don't 'leak registrations
- # up' sez Jim.
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectEvent
- )
- self.service.globalSubscribe(
- self.subscriber,
- event_type=IObjectAddedEvent
- )
- self.service.publish(ObjectEvent())
- self.assertEqual(self.subscriber.notified, 1)
-
- def testListSubscriptions1(self):
- # a non-subscribed subscriber gets an empty array
- self.assertEqual([], self.service.listSubscriptions(self.subscriber))
-
- def testListSubscriptions2(self):
- # one subscription
- self.service.globalSubscribe(
- self.subscriber, event_type=IObjectAddedEvent)
- self.assertEqual([(IObjectAddedEvent, None)],
- self.service.listSubscriptions(self.subscriber))
-
- def testListSubscriptions3(self):
- # listing limited subscription
- self.service.globalSubscribe(
- self.subscriber, event_type=IObjectAddedEvent)
- L = self.service.listSubscriptions(self.subscriber,
- IObjectRemovedEvent)
- self.assertEqual([], L)
-
-
-def test_suite():
- return unittest.makeSuite(TestEventService)
-
-if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')
Deleted: Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_globaleventchannel.py
===================================================================
--- Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_globaleventchannel.py 2004-05-26 11:28:36 UTC (rev 25000)
+++ Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_globaleventchannel.py 2004-05-26 11:30:37 UTC (rev 25001)
@@ -1,207 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""A functional GlobalEventChannel test.
-
-$Id$
-"""
-from unittest import TestCase, TestSuite, main, makeSuite
-
-from zope.interface import Interface, implements
-from zope.component.tests.placelesssetup import PlacelessSetup
-from zope.component.tests.components import RecordingAdapter
-from zope.app.event.interfaces import IEvent, ISubscribingAware, ISubscriber
-from zope.app.tests import ztapi
-
-class ISomeEvent(IEvent):
- pass
-
-class ISomeSubEvent(ISomeEvent):
- pass
-
-class ISomeOtherEvent(IEvent):
- pass
-
-class ISubscriberStub(ISubscriber):
- pass
-
-class INonSubscriberStub(Interface):
- pass
-
-class SomeEvent:
- implements(ISomeEvent)
-
-class SomeSubEvent:
- implements(ISomeSubEvent)
-
-class SomeOtherEvent:
- implements(ISomeOtherEvent)
-
-class SubscriberStub:
- implements(ISubscriberStub)
- received = None
- def notify(self, event):
- self.received = event
-
-class NonSubscriberStub:
- implements(INonSubscriberStub)
-
-
-class Test(PlacelessSetup, TestCase):
-
- def test_notify(self):
- from zope.app.event.globalservice import GlobalEventChannel
-
- subscriber = SubscriberStub()
- ec = GlobalEventChannel()
- ec.globalSubscribe(subscriber, ISomeEvent)
-
- ev = SomeEvent()
- ec.notify(ev)
- self.assertEquals(subscriber.received, ev,
- "Did not get event registered for")
-
- ev = SomeSubEvent()
- ec.notify(ev)
- self.assertEquals(
- subscriber.received, ev, "Did not get subclassed event")
-
- ev = SomeOtherEvent()
- ec.notify(ev)
- self.assertNotEquals(subscriber.received, ev, "Got unrelated event")
-
- def test_notify_filter(self):
- from zope.app.event.globalservice import GlobalEventChannel
-
- true = lambda x: True
- false = lambda x: False
-
- subscriber = SubscriberStub()
- ec = GlobalEventChannel()
- ec.globalSubscribe(subscriber, ISomeEvent, true)
-
- ev = SomeEvent()
- ec.notify(ev)
- self.assertEquals(subscriber.received, ev,
- "Did not get event registered for")
-
- subscriber = SubscriberStub()
- ec = GlobalEventChannel()
- ec.globalSubscribe(subscriber, ISomeEvent, false)
-
- ev = SomeEvent()
- ec.notify(ev)
- self.assertEquals(subscriber.received, None,
- "Event was not filtered")
-
-class SubAware(RecordingAdapter):
- implements(ISubscribingAware)
-
- def subscribedTo(self, subscribable, event_type, filter):
- self.record.append(('subscribed', self.context, subscribable,
- event_type, filter))
-
- def unsubscribedFrom(self, subscribable, event_type, filter):
- self.record.append(('unsubscribed', self.context, subscribable,
- event_type, filter))
-
-
-class TestSubscribingAwareChannel(PlacelessSetup, TestCase):
-
- def setUpChannel(self):
- from zope.app.event.globalservice import GlobalEventChannel
- self.ec = GlobalEventChannel()
-
- def setUp(self):
- super(TestSubscribingAwareChannel, self).setUp()
- self.setUpChannel()
- self.subscriber = SubscriberStub()
- self.filter = lambda x: True
- self.subaware = SubAware()
- ztapi.provideAdapter(ISubscriberStub, ISubscribingAware, self.subaware)
-
- def test_subscribe(self):
- self.ec.globalSubscribe(self.subscriber, ISomeEvent, self.filter)
- self.subaware.check(
- ('subscribed', self.subscriber, self.ec, ISomeEvent, self.filter)
- )
-
- def test_unsubscribe(self):
- self.test_subscribe()
- self.ec.unsubscribe(self.subscriber, ISomeEvent, self.filter)
- self.subaware.check(
- ('subscribed', self.subscriber, self.ec, ISomeEvent, self.filter),
- ('unsubscribed', self.subscriber, self.ec, ISomeEvent, self.filter),
- )
-
-class TestSubscribingAwareGlobalPublisher(TestSubscribingAwareChannel):
-
- def setUpChannel(self):
- from zope.app.event.globalservice import GlobalEventPublisher
- self.ec = GlobalEventPublisher()
-
-class SubscriberAdapter(RecordingAdapter):
- implements(ISubscriber)
-
- def notify(self, event):
- self.record.append(('notified', self.context, event))
-
-class TestAdaptingToISubscriberBase(PlacelessSetup, TestCase):
-
- def setUpChannel(self):
- raise NotImplementedError('You need to write a setUpChannel method.')
-
- def setUp(self):
- super(TestAdaptingToISubscriberBase, self).setUp()
- self.setUpChannel()
- self.subscriber = NonSubscriberStub()
- self.event = SomeEvent()
- self.adapter = SubscriberAdapter()
- ztapi.provideAdapter(INonSubscriberStub, ISubscriber, self.adapter)
-
-class TestAdaptingToISubscriberOnNotify(TestAdaptingToISubscriberBase):
- def setUpChannel(self):
- from zope.app.event.globalservice import GlobalEventChannel
- self.ec = GlobalEventChannel()
-
- def test_notify(self):
- self.ec.globalSubscribe(self.subscriber, ISomeEvent)
- self.ec.notify(self.event)
- self.adapter.check(
- ('notified', self.subscriber, self.event)
- )
-
-class TestAdaptingToISubscriberOnPublish(TestAdaptingToISubscriberBase):
- def setUpChannel(self):
- from zope.app.event.globalservice import GlobalEventPublisher
- self.ec = GlobalEventPublisher()
-
- def test_notify(self):
- self.ec.globalSubscribe(self.subscriber, ISomeEvent)
- self.ec.publish(self.event)
- self.adapter.check(
- ('notified', self.subscriber, self.event)
- )
-
-
-def test_suite():
- return TestSuite((
- makeSuite(Test),
- makeSuite(TestSubscribingAwareChannel),
- makeSuite(TestSubscribingAwareGlobalPublisher),
- makeSuite(TestAdaptingToISubscriberOnNotify),
- makeSuite(TestAdaptingToISubscriberOnPublish),
- ))
-
-if __name__=='__main__':
- main(defaultTest='test_suite')
Deleted: Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_localservice.py
===================================================================
--- Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_localservice.py 2004-05-26 11:28:36 UTC (rev 25000)
+++ Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_localservice.py 2004-05-26 11:30:37 UTC (rev 25001)
@@ -1,979 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""
-$Id$
-"""
-from unittest import TestCase, TestLoader, TextTestRunner
-
-import zope.interface
-from zope.interface import Interface, implements
-from zope.exceptions import NotFoundError
-from zope.component.tests.components import RecordingAdapter
-
-from zope.app.servicenames import EventPublication, EventSubscription
-from zope.app.event.localservice import EventService
-from zope.app.event.localservice import subscribe, unsubscribe, unsubscribeAll
-from zope.app.event.localservice import getSubscriptionService
-
-from zope.app.event import publish
-from zope.app.event.tests.subscriber import DummySubscriber, DummyFilter
-from zope.app.event.interfaces import IObjectEvent, IObjectModifiedEvent
-from zope.app.event.objectevent import ObjectModifiedEvent
-from zope.app.event.interfaces import IEvent, ISubscriber, ISubscribingAware
-
-from zope.app.traversing.api import getPath, traverse
-from zope.app.traversing.interfaces import IContainmentRoot
-from zope.app.container.interfaces import IObjectAddedEvent, IObjectRemovedEvent
-from zope.app.container.contained import ObjectAddedEvent, Contained
-from zope.app.registration.interfaces import RegisteredStatus
-from zope.app.site.interfaces import ISimpleService
-from zope.app.tests import ztapi, setup
-from eventsetup import EventSetup
-
-class Folder(Contained):
- pass
-
-root = Folder()
-zope.interface.directlyProvides(root, IContainmentRoot)
-foo = Folder()
-foo.__parent__ = root
-foo.__name__ = 'foo'
-
-
-class UnpromotingEventService(EventService):
-
- def isPromotableEvent(self, event):
- "see EventService implementation"
- return False
-
-class DummyEvent:
-
- implements(IObjectAddedEvent, IObjectRemovedEvent)
- object = None
-
-class ObjectEvent:
-
- implements(IObjectEvent)
- object = None
-
-class IObjectHub(Interface):
- def getObject(hubid):
- "gets object"
-
- def getHubId(object):
- "gets hubid"
-
- def getLocation(hubId):
- "gets location"
-
-class DumbObjectHub:
- implements(IObjectHub, ISimpleService)
-
- __name__ = __parent__ = None
-
- def __init__(self):
- # (location, object)
- self.lib = []
-
- def getObject(self, hubid):
- try:
- return self.lib[hubid][1]
- except IndexError:
- raise NotFoundError
-
- def getHubId(self, object_or_path):
- for i in range(len(self.lib)):
- if self.lib[i][0] == object_or_path:
- return i
- if self.lib[i][1] is object_or_path:
- return i
- raise NotFoundError, object_or_path
-
- def getLocation(self, hubId):
- return self.lib[hubId][0]
-
-class IHasSubscribingAwareAdapter(Interface):
- pass
-
-class HasSubscribingAwareAdapter(DummySubscriber):
- implements(IHasSubscribingAwareAdapter, ISubscriber)
-
-
-class SubscribingAwareAdapter(RecordingAdapter):
-
- implements(ISubscribingAware)
-
- def subscribedTo(self, subscribable, event_type, filter):
- self.record.append(('subscribed', self.context, subscribable,
- event_type, filter))
-
- def unsubscribedFrom(self, subscribable, event_type, filter):
- self.record.append(('unsubscribed', self.context, subscribable,
- event_type, filter))
-
-
-class TestEventPublisher(EventSetup, TestCase):
-
- def setUp(self):
- EventSetup.setUp(self)
-
- def getObjectHub(self):
- self.objectHub = DumbObjectHub()
- return self.objectHub
-
- def _createNestedServices(self):
- for path in ('folder1', 'folder1/folder1_1',
- 'folder1/folder1_1/folder1_1_1'):
- sm = self.makeSite(path)
- events = EventService()
- setup.addService(sm, EventPublication, events)
- setup.addService(sm, EventSubscription, events, suffix='s')
-
- def _createSubscribers(self):
- self.rootFolder["rootFolderSubscriber"] = DummySubscriber()
- self.rootFolderSubscriber = self.rootFolder["rootFolderSubscriber"]
- self.folder1["folder1Subscriber"] = DummySubscriber()
- self.folder1Subscriber = self.folder1["folder1Subscriber"]
- self.folder1_1["folder1_1Subscriber"] = DummySubscriber()
- self.folder1_1Subscriber = self.folder1_1["folder1_1Subscriber"]
-
- def _createHubIdSubscribers(self):
- self._createSubscribers()
- self.objectHub.lib = [
- ('/rootFolderSubscriber', self.rootFolderSubscriber),
- ('/folder1/folder1Subscriber', self.folder1Subscriber),
- ('/folder1/folder1_1/folder1_1Subscriber', self.folder1_1Subscriber)
- ]
- self.rootSubscriberHubId = 0
- self.folder1SubscriberHubId = 1
- self.folder1_1SubscriberHubId = 2
-
- def testCreateNestedServices(self):
- self._createNestedServices()
-
- def testByPath(self):
- # test complex interaction, with no hubids available
- self._createSubscribers()
- root = subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- folder1 = subscribe(self.folder1Subscriber,
- event_type=IObjectAddedEvent)
- folder1_1 = subscribe(self.folder1_1Subscriber,
- event_type=IObjectAddedEvent)
- self.assertEqual(root, getPath(self.rootFolderSubscriber))
- self.assertEqual(folder1, getPath(self.folder1Subscriber))
- self.assertEqual(folder1_1, getPath(self.folder1_1Subscriber))
- publish(self.folder1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
- self.assertEqual(self.folder1Subscriber.notified, 1)
- self.assertEqual(self.folder1_1Subscriber.notified, 1)
- rootPath = getPath(self.rootFolderSubscriber)
- folder1Path = getPath(self.folder1Subscriber)
- folder1_1Path = getPath(self.folder1_1Subscriber)
- unsubscribeAll(rootPath, context=self.rootFolder)
- # curve ball:
- unsubscribeAll(self.folder1Subscriber, context=self.folder1_1)
- unsubscribe(folder1_1Path,
- event_type=IObjectAddedEvent,
- context=self.folder1_1)
- publish(self.folder1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
- self.assertEqual(self.folder1Subscriber.notified, 1)
- self.assertEqual(self.folder1_1Subscriber.notified, 1)
-
- def testByHubId(self):
- # test complex interaction, with hubids available
- self._createHubIdSubscribers()
- root = subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- folder1 = subscribe(self.folder1Subscriber,
- event_type=IObjectAddedEvent)
- folder1_1 = subscribe(self.folder1_1Subscriber,
- event_type=IObjectAddedEvent)
- self.assertEqual(
- self.objectHub.lib[self.rootSubscriberHubId],
- ('/rootFolderSubscriber', self.rootFolderSubscriber)
- )
- self.assertEqual(
- self.objectHub.lib[self.folder1SubscriberHubId],
- ('/folder1/folder1Subscriber', self.folder1Subscriber)
- )
- self.assertEqual(
- self.objectHub.lib[self.folder1_1SubscriberHubId],
- ('/folder1/folder1_1/folder1_1Subscriber',
- self.folder1_1Subscriber)
- )
- publish(self.folder1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
- self.assertEqual(self.folder1Subscriber.notified, 1)
- self.assertEqual(self.folder1_1Subscriber.notified, 1)
- unsubscribe(getPath(self.rootFolderSubscriber),
- event_type=IObjectAddedEvent,
- context=self.rootFolder)
- subscribe(self.rootFolderSubscriber,
- event_type=IObjectAddedEvent)
- unsubscribeAll(self.rootSubscriberHubId, context=self.rootFolder)
- # curve balls:
- unsubscribeAll(self.folder1Subscriber, context=self.folder1_1)
- unsubscribe(2,
- event_type=IObjectAddedEvent,
- context=self.folder1_1)
- publish(self.folder1, ObjectAddedEvent(None, 'fauxparent', 'foo'))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
- self.assertEqual(self.folder1Subscriber.notified, 1)
- self.assertEqual(self.folder1_1Subscriber.notified, 1)
-
- def testBadSubscriber(self):
- self._createSubscribers()
- root = subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- folder1 = subscribe(self.folder1Subscriber,
- event_type=IObjectAddedEvent)
- folder1_1 = subscribe(self.folder1_1Subscriber,
- event_type=IObjectAddedEvent)
- self.assertEqual(root, getPath(self.rootFolderSubscriber))
- self.assertEqual(folder1, getPath(self.folder1Subscriber))
- self.assertEqual(folder1_1, getPath(self.folder1_1Subscriber))
- # Remove folder1Subscriber, so that the event service will not
- # be able to notify it.
- folder1Subscriber = self.folder1['folder1Subscriber']
- del self.folder1['folder1Subscriber']
-
- publish(self.folder1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
- self.assertEqual(self.folder1Subscriber.notified, 0)
- self.assertEqual(self.folder1_1Subscriber.notified, 1)
-
- # Now, put folder1Subscriber back. This incidentally fires off a
- # ObjectAddedEvent, since self.folder1 is decorated with a context
- # decorator.
- self.folder1['folder1Subscriber'] = folder1Subscriber
- self.assertEqual(self.rootFolderSubscriber.notified, 2)
- self.assertEqual(self.folder1Subscriber.notified, 0)
- self.assertEqual(self.folder1_1Subscriber.notified, 2)
-
- # folder1Subscriber should not be notified now, because it was removed
- # as a bad subscriber.
- publish(self.folder1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 3)
- self.assertEqual(self.folder1Subscriber.notified, 0)
- self.assertEqual(self.folder1_1Subscriber.notified, 3)
-
- def testByPathExplicit(self):
- # test complex interaction, with hubids available but explicitly
- # using paths
- self._createHubIdSubscribers()
- rootPath = getPath(self.rootFolderSubscriber)
- folder1Path = getPath(self.folder1Subscriber)
- folder1_1Path = getPath(self.folder1_1Subscriber)
- self.assertEqual(
- rootPath,
- subscribe(
- rootPath,
- event_type=IObjectAddedEvent,
- context=self.rootFolder))
- self.assertEqual(
- folder1Path,
- subscribe(
- folder1Path,
- event_type=IObjectAddedEvent,
- context=self.folder1))
- self.assertEqual(
- folder1_1Path,
- subscribe(
- folder1_1Path,
- event_type=IObjectAddedEvent,
- context=self.folder1_1))
- publish(self.folder1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
- self.assertEqual(self.folder1Subscriber.notified, 1)
- self.assertEqual(self.folder1_1Subscriber.notified, 1)
- unsubscribeAll(rootPath, context=self.rootFolder)
- # curve balls:
- unsubscribeAll(self.folder1Subscriber, context=self.folder1_1)
- unsubscribe(2, event_type=IObjectAddedEvent, context=self.folder1_1)
- subscribe(2, event_type=IObjectAddedEvent, context=self.folder1_1)
-
- # this is supposed to unsubscribe '2'
- unsubscribe(folder1_1Path,
- event_type=IObjectAddedEvent,
- context=self.folder1_1)
- publish(self.folder1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
- self.assertEqual(self.folder1Subscriber.notified, 1)
- self.assertEqual(self.folder1_1Subscriber.notified, 1)
-
- def testByHubIdExplicit(self):
- # test complex interaction, with hubids available and explicitly
- # using them
- self._createHubIdSubscribers()
- root = subscribe(
- 0,
- event_type=IObjectAddedEvent,
- context = self.folder1_1
- )
- folder1 = subscribe(1,
- event_type=IObjectAddedEvent,
- context = self.folder1)
- folder1_1 = subscribe(2,
- event_type=IObjectAddedEvent,
- context = self.rootFolder)
- self.assertEqual(
- self.objectHub.lib[root],
- ('/rootFolderSubscriber', self.rootFolderSubscriber)
- )
- self.assertEqual(
- self.objectHub.lib[folder1],
- ('/folder1/folder1Subscriber', self.folder1Subscriber)
- )
- self.assertEqual(
- self.objectHub.lib[folder1_1],
- ('/folder1/folder1_1/folder1_1Subscriber',
- self.folder1_1Subscriber)
- )
- publish(self.folder1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
- self.assertEqual(self.folder1Subscriber.notified, 1)
- self.assertEqual(self.folder1_1Subscriber.notified, 1)
- unsubscribe(getPath(self.rootFolderSubscriber),
- event_type=IObjectAddedEvent,
- context=self.rootFolder)
- subscribe(self.rootFolderSubscriber, event_type=IObjectAddedEvent,
- context=self.rootFolder)
- unsubscribeAll(root, context=self.rootFolder)
- # curve ball:
- unsubscribeAll(self.folder1Subscriber, context=self.folder1_1)
- unsubscribe(2,
- event_type=IObjectAddedEvent,
- context=self.folder1_1)
- publish(self.folder1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
- self.assertEqual(self.folder1Subscriber.notified, 1)
- self.assertEqual(self.folder1_1Subscriber.notified, 1)
-
- def _testIterSubscriptions1(self):
- # a non-subscribed subscriber gets an empty array
- events = getSubscriptionService(self.rootFolder)
-
- self.assertEqual(
- [x for x in events.iterSubscriptions(self.rootFolderSubscriber)],
- [])
-
- def testPathIterSubscriptions1(self):
- self._createSubscribers()
- self._testIterSubscriptions1()
-
- def testHubIdIterSubscriptions1(self):
- self._createHubIdSubscribers()
- self._testIterSubscriptions1()
-
- def _testIterSubscriptions2(self, subscription_type):
- # one subscription
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- if subscription_type is int:
- reference = 0
- else:
- reference = u'/rootFolderSubscriber'
- events = getSubscriptionService(self.rootFolder)
- self.assertEqual(
- [x for x in events.iterSubscriptions(self.rootFolderSubscriber)],
- [(reference, IObjectAddedEvent, None)]
- )
-
- def testPathIterSubscriptions2(self):
- self._createSubscribers()
- self._testIterSubscriptions2(unicode)
-
- def testHubIdIterSubscriptions2(self):
- self._createHubIdSubscribers()
- self._testIterSubscriptions2(int)
-
- def _testIterSubscriptions3(self):
- # listing limited subscription
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- self.assertEqual(
- [],
- [x for x in getSubscriptionService(self.rootFolder)
- .iterSubscriptions(self.rootFolderSubscriber, IObjectRemovedEvent)]
- )
-
- def testPathIterSubscriptions3(self):
- self._createSubscribers()
- self._testIterSubscriptions3()
-
- def testHubIdIterSubscriptions3(self):
- self._createHubIdSubscribers()
- self._testIterSubscriptions3()
-
- def _testIterSubscriptions4(self):
- # a non-subscribed subscriber gets an empty array
- events = getSubscriptionService(self.rootFolder)
-
- self.assertEqual(
- [x for x in events.iterSubscriptions(self.rootFolderSubscriber)],
- [])
-
- def testPathIterSubscriptions4(self):
- self._createSubscribers()
- self._testIterSubscriptions4()
-
- def testHubIdIterSubscriptions4(self):
- self._createHubIdSubscribers()
- self._testIterSubscriptions4()
-
- def _testIterSubscriptions5(self, subscription_type):
- if subscription_type is int:
- reference = 0
- else:
- reference = u'/rootFolderSubscriber'
- # one subscription
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- self.assertEqual(
- [x for x in getSubscriptionService(self.rootFolder)
- .iterSubscriptions(self.rootFolderSubscriber)],
- [(reference, IObjectAddedEvent, None)]
- )
-
- def testPathIterSubscriptions5(self):
- self._createSubscribers()
- self._testIterSubscriptions5(unicode)
-
- def testHubIdIterSubscriptions5(self):
- self._createHubIdSubscribers()
- self._testIterSubscriptions5(int)
-
- def _testIterSubscriptions6(self):
- # listing limited subscription
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- self.assertEqual(
- [x for x in getSubscriptionService(self.rootFolder)
- .iterSubscriptions(self.rootFolderSubscriber, IObjectRemovedEvent)],
- []
- )
-
- def testPathIterSubscriptions6(self):
- self._createSubscribers()
- self._testIterSubscriptions6()
-
- def testHubIdIterSubscriptions6(self):
- self._createHubIdSubscribers()
- self._testIterSubscriptions6()
-
- def _testSubscribe1(self):
- # Test subscribe method with one parameter
- subscribe(self.rootFolderSubscriber)
- publish(self.rootFolder, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
-
- def testPathSubscribe1(self):
- self._createSubscribers()
- self._testSubscribe1()
-
- def testHubIdSubscribe1(self):
- self._createHubIdSubscribers()
- self._testSubscribe1()
-
- def _testSubscribe2(self):
- # Test subscribe method with two parameters
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
-
- def testPathSubscribe2(self):
- self._createSubscribers()
- self._testSubscribe2()
-
- def testHubIdSubscribe2(self):
- self._createHubIdSubscribers()
- self._testSubscribe2()
-
- def _testSubscribe3(self):
- # Test subscribe method with three parameters
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent,
- filter=DummyFilter()
- )
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
-
- def testPathSubscribe3(self):
- self._createSubscribers()
- self._testSubscribe3()
-
- def testHubIdSubscribe3(self):
- self._createHubIdSubscribers()
- self._testSubscribe3()
-
- def _testSubscribe4(self):
- # Test subscribe method with three parameters and an always failing
- # filter.
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent,
- filter=DummyFilter(0)
- )
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 0)
-
- def testPathSubscribe4(self):
- self._createSubscribers()
- self._testSubscribe4()
-
- def testHubIdSubscribe4(self):
- self._createHubIdSubscribers()
- self._testSubscribe4()
-
- def _testSubscribe5(self):
- # Test subscribe method with three parameters and an irrelevent event
- # type.
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectModifiedEvent,
- filter=DummyFilter()
- )
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 0)
-
- def testPathSubscribe5(self):
- self._createSubscribers()
- self._testSubscribe5()
-
- def testHubIdSubscribe5(self):
- self._createHubIdSubscribers()
- self._testSubscribe5()
-
- def _testSubscribe6(self):
- # Test subscribe method where the event type registered is a
- # generalised interface of the event passed to the 'publish' method.
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectEvent
- )
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
-
- def testPathSubscribe6(self):
- self._createSubscribers()
- self._testSubscribe6()
-
- def testHubIdSubscribe6(self):
- self._createHubIdSubscribers()
- self._testSubscribe6()
-
- def _testSubscribe7(self):
- # Test subscribe method where one of the event types registered is not
- # interested in the published event.
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectModifiedEvent
- )
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
-
- def testPathSubscribe7(self):
- self._createSubscribers()
- self._testSubscribe7()
-
- def testHubIdSubscribe7(self):
- self._createHubIdSubscribers()
- self._testSubscribe7()
-
- def _testSubscribe8(self):
- # Test subscribe method where the same subscriber subscribes multiple
- # times.
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent,
- filter=DummyFilter(0)
- )
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 2)
-
- def testPathSubscribe8(self):
- self._createSubscribers()
- self._testSubscribe8()
-
- def testHubIdSubscribe8(self):
- self._createHubIdSubscribers()
- self._testSubscribe8()
-
- def _testUnsubscribe1(self):
- # Test unsubscribe method
- subscribe(
- self.rootFolderSubscriber
- )
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
- unsubscribeAll(
- self.rootFolderSubscriber
- )
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
-
- def testPathUnsubscribe1(self):
- self._createSubscribers()
- self._testUnsubscribe1()
-
- def testHubIdUnsubscribe1(self):
- self._createHubIdSubscribers()
- self._testUnsubscribe1()
-
- def _testUnsubscribe2(self):
- # Test unsubscribe of something that hasn't been subscribed
- self.assertRaises(NotFoundError,
- unsubscribe,
- 69,
- IObjectEvent)
- unsubscribeAll(self.rootFolderSubscriber)
-
- def testPathUnsubscribe2(self):
- self._createSubscribers()
- self._testUnsubscribe2()
-
- def testHubIdUnsubscribe2(self):
- self._createHubIdSubscribers()
- self._testUnsubscribe2()
-
- def _testUnsubscribe3(self):
- # Test selective unsubscribe
- subscriber = self.rootFolderSubscriber
- subscriber2 = self.folder1Subscriber
- filter = DummyFilter()
- event = ObjectAddedEvent(foo)
- event2 = ObjectModifiedEvent(None)
- subscribe(
- subscriber)
- subscribe(
- subscriber,
- event_type=IObjectAddedEvent,
- filter=filter
- )
- subscribe(
- subscriber,
- event_type=IObjectAddedEvent
- )
- subscribe(
- subscriber2,
- event_type=IObjectAddedEvent
- )
- publish(self.folder1_1_1, event)
- self.assertEqual(subscriber.notified, 3)
- self.assertEqual(subscriber2.notified, 1)
- publish(self.folder1_1_1, event2)
- self.assertEqual(subscriber.notified, 4)
- self.assertEqual(subscriber2.notified, 1)
- unsubscribe(subscriber, IObjectAddedEvent)
- publish(self.folder1_1_1, event)
- self.assertEqual(subscriber.notified, 6)
- self.assertEqual(subscriber2.notified, 2)
- unsubscribe(subscriber, IEvent)
- publish(self.folder1_1_1, event2)
- self.assertEqual(subscriber.notified, 6)
- self.assertEqual(subscriber2.notified, 2)
- publish(self.folder1_1_1, event)
- self.assertEqual(subscriber.notified, 7)
- self.assertEqual(subscriber2.notified, 3)
- self.assertRaises(NotFoundError, unsubscribe, subscriber,
- IObjectAddedEvent)
- unsubscribe(subscriber, IObjectAddedEvent, filter)
- publish(self.folder1_1_1, event)
- self.assertEqual(subscriber.notified, 7)
- self.assertEqual(subscriber2.notified, 4)
- unsubscribe(subscriber2, IObjectAddedEvent)
- publish(self.folder1_1_1, event)
- self.assertEqual(subscriber.notified, 7)
- self.assertEqual(subscriber2.notified, 4)
-
- def testPathUnsubscribe3(self):
- self._createSubscribers()
- self._testUnsubscribe3()
-
- def testHubIdUnsubscribe3(self):
- self._createHubIdSubscribers()
- self._testUnsubscribe3()
-
- def testPathUnsubscribe4(self):
- self._createNestedServices()
- self._createSubscribers()
- self._testUnsubscribe3()
-
- def testHubIdUnsubscribe4(self):
- self._createNestedServices()
- self._createHubIdSubscribers()
- self._testUnsubscribe3()
-
- def _testpublish1(self):
- # Test publish method
- subscriber = self.rootFolderSubscriber
- subscribe(subscriber)
- self.assertEqual(subscriber.notified, 0)
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(subscriber.notified, 1)
-
- def testPathPublish1(self):
- self._createSubscribers()
- self._testpublish1()
-
- def testHubIdPublish1(self):
- self._createHubIdSubscribers()
- self._testpublish1()
-
- def _testpublish2(self):
- # Test publish method where subscriber has been subscribed twice, with
- # a more generalised version of the initially subscribed interface in
- # the second subscription.
- subscriber = self.rootFolderSubscriber
- subscribe(
- subscriber,
- event_type=IObjectEvent,
- )
- subscribe(
- subscriber,
- event_type=IObjectAddedEvent,
- )
- publish(self.folder1_1_1, ObjectAddedEvent(foo))
- self.assertEqual(subscriber.notified, 2)
-
- def testPathPublish2(self):
- self._createSubscribers()
- self._testpublish2()
-
- def testHubIdPublish2(self):
- self._createHubIdSubscribers()
- self._testpublish2()
-
- def _testpublish3(self):
- # Test publish method where subscriber has been to two interfaces and
- # a single event implements both of those interfaces.
- subscriber = self.rootFolderSubscriber
- subscribe(
- subscriber,
- event_type=IObjectRemovedEvent
- )
- subscribe(
- subscriber,
- event_type=IObjectAddedEvent
- )
- publish(self.folder1_1_1, DummyEvent())
- self.assertEqual(subscriber.notified, 2)
-
- def testPathPublish3(self):
- self._createSubscribers()
- self._testpublish3()
-
- def testHubIdPublish3(self):
- self._createHubIdSubscribers()
- self._testpublish3()
-
- def _testpublish4(self):
- # Test publish method to make sure that we don't 'leak registrations
- # up' sez Jim.
- subscriber = self.rootFolderSubscriber
- subscribe(
- subscriber,
- event_type=IObjectEvent
- )
- subscribe(
- subscriber,
- event_type=IObjectAddedEvent
- )
- publish(self.folder1_1_1, ObjectEvent())
- self.assertEqual(subscriber.notified, 1)
-
- def testPathPublish4(self):
- self._createSubscribers()
- self._testpublish4()
-
- def testHubIdPublish4(self):
- self._createHubIdSubscribers()
- self._testpublish4()
-
- def _createAlternateService(self, service):
- self.folder2["folder2Subscriber"] = DummySubscriber()
- self.folder2Subscriber = self.folder2["folder2Subscriber"]
-
- sm = self.makeSite('folder2')
- setup.addService(sm, EventPublication, service);
- setup.addService(sm, EventSubscription, service, suffix='s');
-
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- subscribe(
- self.folder2Subscriber,
- event_type=IObjectAddedEvent
- )
-
- def testNonPromotingEventPublisher(self):
- # test to see if events are not passed on to a parent event service
- # with the appropriate isPromotableEvent setting
- self._createHubIdSubscribers()
- self._createAlternateService(UnpromotingEventService())
- publish(self.folder2, ObjectAddedEvent(foo))
- self.assertEqual(self.folder2Subscriber.notified, 1)
- self.assertEqual(self.rootFolderSubscriber.notified, 0)
-
- def testPromotingEventPublisher1(self):
- # test to see if events are passed on to a parent event service with
- # the appropriate isPromotableEvent setting
- self._createHubIdSubscribers()
- self._createAlternateService(EventService())
- publish(self.folder2, ObjectAddedEvent(foo))
- self.assertEqual(self.folder2Subscriber.notified, 1)
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
-
- def testUnbindingResubscribing1(self):
- # an event service is unbound; a lower event service should then
- # rebind to upper event service
- self._createNestedServices()
- self._createHubIdSubscribers()
- subscribe(
- self.rootFolderSubscriber,
- event_type=IObjectAddedEvent
- )
- subscribe(
- self.folder1Subscriber,
- event_type=IObjectAddedEvent
- )
- subscribe(
- self.folder1_1Subscriber,
- event_type=IObjectAddedEvent
- )
-
- sm = traverse(self.rootFolder, "folder1/++etc++site")
- registration = sm.queryRegistrations(EventPublication).active()
- registration.status = RegisteredStatus
- publish(self.rootFolder, ObjectAddedEvent(foo))
- self.assertEqual(self.folder1Subscriber.notified, 1)
- self.assertEqual(self.folder1_1Subscriber.notified, 1)
-
- registration = sm.queryRegistrations(EventSubscription).active()
- registration.status = RegisteredStatus
-
- publish(self.rootFolder, ObjectAddedEvent(foo))
- self.assertEqual(self.folder1Subscriber.notified, 1)
- self.assertEqual(self.folder1_1Subscriber.notified, 2)
-
- def testNoSubscribeOnBind(self):
- # if subscribeOnBind is 0, service should not subscribe to parent
- sv = EventService()
- sv.subscribeOnBind = 0
- self._createHubIdSubscribers()
- self._createAlternateService(sv)
- publish(self.rootFolder, ObjectAddedEvent(foo))
- self.assertEqual(self.folder2Subscriber.notified, 0)
- self.assertEqual(self.rootFolderSubscriber.notified, 1)
-
- sm = traverse(self.rootFolder, "folder2/++etc++site")
- registration = sm.queryRegistrations(EventSubscription).active()
- # make sure it doesn't raise any errors
- registration.status = RegisteredStatus
- registration = sm.queryRegistrations(EventPublication).active()
- # make sure it doesn't raise any errors
- registration.status = RegisteredStatus
-
- def testSubscriptionAwareInteraction(self):
- adapter = SubscribingAwareAdapter()
- ztapi.provideAdapter(IHasSubscribingAwareAdapter,
- ISubscribingAware,
- adapter)
- self.rootFolder["mySubscriber"] = HasSubscribingAwareAdapter()
- self.mySubscriber = self.rootFolder["mySubscriber"]
- filter = DummyFilter()
- subscribe(
- self.mySubscriber,
- event_type=IObjectAddedEvent,
- filter=filter)
- adapter.check(
- ('subscribed',
- self.mySubscriber,
- getSubscriptionService(self.rootFolder),
- IObjectAddedEvent,
- filter
- )
- )
- #self.assertEqual(
- # self.mySubscriber.subscribable,
- # getEventService(self.rootFolder))
- #self.assertEqual(
- # self.mySubscriber.subscribable,
- # getSubscriptionService(self.rootFolder))
- #self.assertEqual(
- # self.mySubscriber.event_type,
- # IObjectAddedEvent)
- #self.assertEqual(
- # self.mySubscriber.filter,
- # filter)
- unsubscribe(
- self.mySubscriber,
- event_type=IObjectAddedEvent,
- filter=filter)
- adapter.check(
- ('subscribed',
- self.mySubscriber,
- getSubscriptionService(self.rootFolder),
- IObjectAddedEvent,
- filter
- ),
- ('unsubscribed',
- self.mySubscriber,
- getSubscriptionService(self.rootFolder),
- IObjectAddedEvent,
- filter
- )
- )
-
-
-
-def test_suite():
- loader = TestLoader()
- return loader.loadTestsFromTestCase(TestEventPublisher)
-
-if __name__=='__main__':
- TextTestRunner().run(test_suite())
Deleted: Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_logger.py
===================================================================
--- Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_logger.py 2004-05-26 11:28:36 UTC (rev 25000)
+++ Zope3/branches/jim-simplifyevents/src/zope/app/event/tests/test_logger.py 2004-05-26 11:30:37 UTC (rev 25001)
@@ -1,116 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""
-$Id$
-"""
-
-import unittest
-import logging
-
-from zope.component.tests.placelesssetup import PlacelessSetup
-from zope.component import getGlobalServices
-from zope.app.servicenames import EventPublication
-
-from zope.app.event import globalSubscribe, globalUnsubscribe, publish
-from zope.app.container.contained import ObjectAddedEvent
-from zope.app.event.globalservice import Logger, eventPublisher
-from zope.app.event.interfaces import IPublisher
-
-class DopeyHandler(logging.Handler):
-
- def __init__(self):
- logging.Handler.__init__(self)
- self.results = []
-
- def emit(self, record):
- self.results.append(record)
-
-class TestLogger1(PlacelessSetup,unittest.TestCase):
-
- eventlogger = Logger()
-
- def setUp(self):
- super(TestLogger1, self).setUp()
- getGlobalServices().defineService(EventPublication, IPublisher)
- getGlobalServices().provideService(EventPublication, eventPublisher)
- # futz a handler in for testing
- self.logger = logging.getLogger("Event.Logger")
- self.oldlevel = self.logger.level
- self.oldprop = self.logger.propagate
- self.logger.propagate = False
- self.logger.setLevel(logging.DEBUG)
- self.handler = DopeyHandler()
- self.logger.addHandler(self.handler)
- # register a logger
- globalSubscribe(self.eventlogger)
- # send an event
- publish(None, ObjectAddedEvent(None, "parent", 'foo'))
-
- def tearDown(self):
- globalUnsubscribe(self.eventlogger)
- self.logger.removeHandler(self.handler)
- self.logger.setLevel(self.oldlevel)
- self.logger.propagate = self.oldprop
- PlacelessSetup.tearDown(self)
-
- def testLogger(self):
- # Test the logger logs appropriately
- results = self.handler.results
- self.assertEqual(len(results), 1)
- result = results[0]
- self.assertEqual(result.name, "Event.Logger")
- self.assertEqual(result.levelno, logging.INFO)
- self.assertEqual(
- result.getMessage(),
- "zope.app.container.contained.ObjectAddedEvent: ["
- "('newName', 'foo'),\n "
- "('newParent', 'parent'),\n "
- "('object', None),\n "
- "('oldName', None),\n "
- "('oldParent', None)"
- "]\n"
- )
- self.assertEqual(result.exc_info, None)
-
-class TestLogger2(TestLogger1):
-
- eventlogger = Logger(logging.CRITICAL)
-
- def testLogger(self):
- # Test the logger logs appropriately
- results = self.handler.results
- self.assertEqual(len(results), 1)
- result = results[0]
- self.assertEqual(result.name, "Event.Logger")
- self.assertEqual(result.levelno, logging.CRITICAL)
- self.assertEqual(
- result.getMessage(),
- "zope.app.container.contained.ObjectAddedEvent: ["
- "('newName', 'foo'),\n "
- "('newParent', 'parent'),\n "
- "('object', None),\n "
- "('oldName', None),\n "
- "('oldParent', None)"
- "]\n"
- )
- self.assertEqual(result.exc_info, None)
-
-def test_suite():
- return unittest.TestSuite([
- unittest.makeSuite(TestLogger1),
- unittest.makeSuite(TestLogger2),
- ])
-
-if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')
More information about the Zope3-Checkins
mailing list