[Zope3-checkins] CVS: Zope3/src/zope/app/event/tests - test_globaleventchannel.py:1.2
Steve Alexander
steve@cat-box.net
Mon, 27 Jan 2003 13:17:00 -0500
Update of /cvs-repository/Zope3/src/zope/app/event/tests
In directory cvs.zope.org:/tmp/cvs-serv10963/src/zope/app/event/tests
Modified Files:
test_globaleventchannel.py
Log Message:
oops, forgot this file.
=== Zope3/src/zope/app/event/tests/test_globaleventchannel.py 1.1 => 1.2 ===
--- Zope3/src/zope/app/event/tests/test_globaleventchannel.py:1.1 Mon Dec 30 09:03:05 2002
+++ Zope3/src/zope/app/event/tests/test_globaleventchannel.py Mon Jan 27 13:16:57 2003
@@ -17,7 +17,11 @@
"""
from unittest import TestCase, TestSuite, main, makeSuite
-from zope.app.interfaces.event import IEvent
+from zope.app.interfaces.event import IEvent, ISubscribingAware, ISubscriber
+from zope.interface import Interface
+from zope.component.tests.placelesssetup import PlacelessSetup
+from zope.component.tests.components import RecordingAdapter
+from zope.component.adapter import provideAdapter
class ISomeEvent(IEvent):
pass
@@ -28,6 +32,12 @@
class ISomeOtherEvent(IEvent):
pass
+class ISubscriberStub(ISubscriber):
+ pass
+
+class INonSubscriberStub(Interface):
+ pass
+
class SomeEvent:
__implements__ = ISomeEvent
@@ -38,11 +48,16 @@
__implements__ = ISomeOtherEvent
class SubscriberStub:
+ __implements__ = ISubscriberStub
received = None
def notify(self, event):
self.received = event
-class Test(TestCase):
+class NonSubscriberStub:
+ __implements__ = INonSubscriberStub
+
+
+class Test(PlacelessSetup, TestCase):
def test_notify(self):
from zope.app.event.globalservice import GlobalEventChannel
@@ -89,10 +104,103 @@
self.assertEquals(subscriber.received, None,
"Event was not filtered")
+class SubAware(RecordingAdapter):
+ __implements__ = ISubscribingAware
+
+ def subscribedTo(self, subscribable, event_type, filter):
+ self.record.append(('subscribed', self.context, subscribable,
+ event_type, filter))
+
+ def unsubscribedFrom(self, subscribable, event_type, filter):
+ self.record.append(('unsubscribed', self.context, subscribable,
+ event_type, filter))
+
+
+class TestSubscribingAwareChannel(PlacelessSetup, TestCase):
+
+ def setUpChannel(self):
+ from zope.app.event.globalservice import GlobalEventChannel
+ self.ec = GlobalEventChannel()
+
+ def setUp(self):
+ PlacelessSetup.setUp(self)
+ self.setUpChannel()
+ self.subscriber = SubscriberStub()
+ self.filter = lambda x: True
+ self.subaware = SubAware()
+ provideAdapter(ISubscriberStub, ISubscribingAware, self.subaware)
+
+ def test_subscribe(self):
+ self.ec.globalSubscribe(self.subscriber, ISomeEvent, self.filter)
+ self.subaware.check(
+ ('subscribed', self.subscriber, self.ec, ISomeEvent, self.filter)
+ )
+
+ def test_unsubscribe(self):
+ self.test_subscribe()
+ self.ec.unsubscribe(self.subscriber, ISomeEvent, self.filter)
+ self.subaware.check(
+ ('subscribed', self.subscriber, self.ec, ISomeEvent, self.filter),
+ ('unsubscribed', self.subscriber, self.ec, ISomeEvent, self.filter),
+ )
+
+class TestSubscribingAwareGlobalPublisher(TestSubscribingAwareChannel):
+
+ def setUpChannel(self):
+ from zope.app.event.globalservice import GlobalEventPublisher
+ self.ec = GlobalEventPublisher()
+
+class SubscriberAdapter(RecordingAdapter):
+ __implements__ = ISubscriber
+
+ def notify(self, event):
+ self.record.append(('notified', self.context, event))
+
+class TestAdaptingToISubscriberBase(PlacelessSetup, TestCase):
+
+ def setUpChannel(self):
+ raise NotImplementedError('You need to write a setUpChannel method.')
+
+ def setUp(self):
+ PlacelessSetup.setUp(self)
+ self.setUpChannel()
+ self.subscriber = NonSubscriberStub()
+ self.event = SomeEvent()
+ self.adapter = SubscriberAdapter()
+ provideAdapter(INonSubscriberStub, ISubscriber, self.adapter)
+
+class TestAdaptingToISubscriberOnNotify(TestAdaptingToISubscriberBase):
+ def setUpChannel(self):
+ from zope.app.event.globalservice import GlobalEventChannel
+ self.ec = GlobalEventChannel()
+
+ def test_notify(self):
+ self.ec.globalSubscribe(self.subscriber, ISomeEvent)
+ self.ec.notify(self.event)
+ self.adapter.check(
+ ('notified', self.subscriber, self.event)
+ )
+
+class TestAdaptingToISubscriberOnPublish(TestAdaptingToISubscriberBase):
+ def setUpChannel(self):
+ from zope.app.event.globalservice import GlobalEventPublisher
+ self.ec = GlobalEventPublisher()
+
+ def test_notify(self):
+ self.ec.globalSubscribe(self.subscriber, ISomeEvent)
+ self.ec.publish(self.event)
+ self.adapter.check(
+ ('notified', self.subscriber, self.event)
+ )
+
def test_suite():
return TestSuite((
makeSuite(Test),
+ makeSuite(TestSubscribingAwareChannel),
+ makeSuite(TestSubscribingAwareGlobalPublisher),
+ makeSuite(TestAdaptingToISubscriberOnNotify),
+ makeSuite(TestAdaptingToISubscriberOnPublish),
))
if __name__=='__main__':