[Zope3-checkins] CVS: Zope3/src/zope/app/workflow/stateful/tests - test_contentworkflow.py:1.3 test_xmlimportexport.py:1.5
Stephan Richter
srichter@cosmos.phy.tufts.edu
Tue, 29 Jul 2003 20:00:33 -0400
Update of /cvs-repository/Zope3/src/zope/app/workflow/stateful/tests
In directory cvs.zope.org:/tmp/cvs-serv20793/src/zope/app/workflow/stateful/tests
Modified Files:
test_contentworkflow.py test_xmlimportexport.py
Log Message:
Workflow enhancements to make them more useful. (More to come)
- ContentWorkflowsUtility has been renamed to ContentWorkflowsManager
- Specific Process Definitions can now be mapped to interfaces (later
content types), so that a content type only receives the workflows that
were intended for it.
- Supplied a management screen for the above.
- Wrote a bunch of tests for ContentWorkflowsManager.
- Added a default screen to the Workflow Service, which lists all registered
process definitions.
=== Zope3/src/zope/app/workflow/stateful/tests/test_contentworkflow.py 1.2 => 1.3 ===
--- Zope3/src/zope/app/workflow/stateful/tests/test_contentworkflow.py:1.2 Tue Jun 3 18:46:23 2003
+++ Zope3/src/zope/app/workflow/stateful/tests/test_contentworkflow.py Tue Jul 29 20:00:28 2003
@@ -11,35 +11,179 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-
-"""Stateful content workflow utility.
+"""Stateful content workflow manager.
$Id$
"""
import unittest
+from zope.app import zapi
+from zope.app.event.objectevent import ObjectCreatedEvent
+from zope.app.event.tests.placelesssetup import \
+ eventPublisher, EventRecorder, events, clearEvents
+from zope.app.interfaces.annotation import IAnnotatable, IAttributeAnnotatable
+from zope.app.interfaces.event import IObjectCreatedEvent
+from zope.app.interfaces.services.event import ISubscriptionService
+from zope.app.interfaces.services.registration import ActiveStatus
+from zope.app.interfaces.workflow import IProcessInstanceContainerAdaptable
+from zope.app.interfaces.workflow import IProcessInstanceContainer
+from zope.app.interfaces.workflow.stateful import IContentWorkflowsManager
+from zope.app.services.event import EventService
+from zope.app.workflow.instance import ProcessInstanceContainerAdapter
+from zope.app.workflow.service import ProcessDefinitionRegistration
+from zope.app.services.servicenames import EventSubscription
+from zope.app.workflow.stateful.contentworkflow import ContentWorkflowsManager
+from zope.app.workflow.tests.workflowsetup import WorkflowSetup
+from zope.app.workflow.tests.test_service import DummyProcessDefinition
+from zope.component import getServiceManager
+from zope.component.adapter import provideAdapter
+from zope.interface import Interface, implements
from zope.interface.verify import verifyClass
-from zope.app.interfaces.workflow.stateful import IContentWorkflowsUtility
-from zope.app.workflow.stateful.contentworkflow import ContentWorkflowsUtility
+from zope.app.tests import setup
+class IFace1(Interface):
+ pass
-# XXX How to test this without fake a hole zope3-server ?????
-class ContentWorkflowsUtilityTests(unittest.TestCase):
+class IFace2(Interface):
+ pass
- def testInterface(self):
- verifyClass(IContentWorkflowsUtility,
- ContentWorkflowsUtility)
+class IFace3(Interface):
+ pass
+
+class TestObject1(object):
+ implements(IFace1, IProcessInstanceContainerAdaptable,
+ IAttributeAnnotatable)
+
+class TestObject2(object):
+ implements(IFace2, IProcessInstanceContainerAdaptable,
+ IAttributeAnnotatable)
+
+class TestObject3(object):
+ implements(IFace3, IProcessInstanceContainerAdaptable,
+ IAttributeAnnotatable)
+class ContentWorkflowsManagerTest(WorkflowSetup, unittest.TestCase):
+
+ def setUp(self):
+ WorkflowSetup.setUp(self)
+ sm = getServiceManager(None)
+ sm.defineService(EventSubscription, ISubscriptionService)
+ self.events = EventService()
+ setup.addService(self.sm, EventSubscription, self.events)
+ clearEvents()
+ eventPublisher.globalSubscribe(EventRecorder)
+ provideAdapter(IAnnotatable, IProcessInstanceContainer,
+ ProcessInstanceContainerAdapter)
+
+ def testInterface(self):
+ verifyClass(IContentWorkflowsManager, ContentWorkflowsManager)
+
+ def getManager(self):
+ manager = ContentWorkflowsManager()
+ manager._registry = {IFace1: ('default',), IFace2: ('default',)}
+ self.default.setObject('manager', manager)
+ return zapi.traverse(self.default, 'manager')
+
+ def test_subscribe(self):
+ manager = self.getManager()
+ self.assertEqual(manager.currentlySubscribed, False)
+ manager.subscribe()
+ self.assertEqual(manager.currentlySubscribed, True)
+ self.assertEqual(self.events._registry._reg.keys()[0],
+ IObjectCreatedEvent)
+ self.assertEqual(self.events._registry._reg.values()[0][0][0],
+ u'/++etc++site/default/manager')
+
+ def test_unsubscribe(self):
+ manager = self.getManager()
+ self.assertEqual(manager.currentlySubscribed, False)
+ manager.subscribe()
+ manager.unsubscribe()
+ self.assertEqual(manager.currentlySubscribed, False)
+ self.assertEqual(len(self.events._registry._reg.values()), 0)
+
+ def test_isSubscribed(self):
+ manager = self.getManager()
+ self.assertEqual(manager.isSubscribed(), False)
+ manager.subscribe()
+ self.assertEqual(manager.isSubscribed(), True)
+ manager.unsubscribe()
+ self.assertEqual(manager.isSubscribed(), False)
+
+ def test_getProcessDefinitionNamesForObject(self):
+ manager = self.getManager()
+ self.assertEqual(
+ manager.getProcessDefinitionNamesForObject(TestObject1()),
+ ('default',))
+ self.assertEqual(
+ manager.getProcessDefinitionNamesForObject(TestObject2()),
+ ('default',))
+ self.assertEqual(
+ manager.getProcessDefinitionNamesForObject(TestObject3()),
+ ())
+
+ def test_register(self):
+ manager = self.getManager()
+ manager._registry = {}
+ manager.register(IFace1, 'default')
+ self.assertEqual(manager._registry, {IFace1: ('default',)})
+
+ def test_unregister(self):
+ manager = self.getManager()
+ manager.unregister(IFace1, 'default')
+ self.assertEqual(manager._registry, {IFace2: ('default',)})
+
+ def test_getProcessNamesForInterface(self):
+ manager = self.getManager()
+ self.assertEqual(
+ manager.getProcessNamesForInterface(IFace1),
+ ('default',))
+ self.assertEqual(
+ manager.getProcessNamesForInterface(IFace2),
+ ('default',))
+ self.assertEqual(
+ manager.getProcessNamesForInterface(IFace3),
+ ())
+
+ def test_getInterfacesForProcessName(self):
+ manager = self.getManager()
+ self.assertEqual(
+ manager.getInterfacesForProcessName(u'default'),
+ (IFace2, IFace1))
+ self.assertEqual(
+ manager.getInterfacesForProcessName(u'foo'),
+ ())
+
+ def test_notify(self):
+ # setup ProcessDefinitions
+ self.default.setObject('pd1', DummyProcessDefinition(1))
+ self.default.setObject('pd2', DummyProcessDefinition(2))
+
+ id = self.cm.setObject('', ProcessDefinitionRegistration('definition1',
+ '/++etc++site/default/pd1'))
+ zapi.traverse(self.default.getRegistrationManager(),
+ id).status = ActiveStatus
+ id = self.cm.setObject('', ProcessDefinitionRegistration('definition2',
+ '/++etc++site/default/pd2'))
+ zapi.traverse(self.default.getRegistrationManager(),
+ id).status = ActiveStatus
+ manager = self.getManager()
+ manager._registry = {IFace1: ('definition1',),
+ IFace2: ('definition1', 'definition2')}
+
+ obj = TestObject2()
+ manager.notify(ObjectCreatedEvent(obj))
+ pi = obj.__annotations__['zope.app.worfklow.ProcessInstanceContainer']
+ self.assertEqual(pi.keys(), ['definition2', 'definition1'])
def test_suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(
- ContentWorkflowsUtilityTests))
- return suite
+ return unittest.TestSuite((
+ unittest.makeSuite(ContentWorkflowsManagerTest),
+ ))
if __name__ == '__main__':
unittest.main()
=== Zope3/src/zope/app/workflow/stateful/tests/test_xmlimportexport.py 1.4 => 1.5 ===
--- Zope3/src/zope/app/workflow/stateful/tests/test_xmlimportexport.py:1.4 Sat Jun 21 17:22:17 2003
+++ Zope3/src/zope/app/workflow/stateful/tests/test_xmlimportexport.py Tue Jul 29 20:00:28 2003
@@ -169,8 +169,9 @@
def test_suite():
- loader=unittest.TestLoader()
- return loader.loadTestsFromTestCase(Test)
+ return unittest.TestSuite((
+ unittest.makeSuite(Test),
+ ))
if __name__=='__main__':
unittest.TextTestRunner().run(test_suite())