[CMF-checkins] SVN: CMF/branches/yuppie-workflow_setup/ - moved
WorkflowTool setup to CMFCore
Yvo Schubbe
y.2005- at wcm-solutions.de
Tue Nov 22 11:12:02 EST 2005
Log message for revision 40321:
- moved WorkflowTool setup to CMFCore
- moved DCWorkflowDefinition setup to DCWorkflow
- use PythonScripts adapter from GenericSetup
Changed:
U CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/configure.zcml
A CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/tests/test_workflow.py
A CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/workflow.py
U CMF/branches/yuppie-workflow_setup/CMFSetup/DEPENDENCIES.txt
D CMF/branches/yuppie-workflow_setup/CMFSetup/Extensions/
D CMF/branches/yuppie-workflow_setup/CMFSetup/tests/test_workflow.py
U CMF/branches/yuppie-workflow_setup/CMFSetup/utils.py
U CMF/branches/yuppie-workflow_setup/CMFSetup/workflow.py
D CMF/branches/yuppie-workflow_setup/CMFSetup/xml/wtcToolExport.xml
D CMF/branches/yuppie-workflow_setup/CMFSetup/xml/wtcWorkflowExport.xml
U CMF/branches/yuppie-workflow_setup/DCWorkflow/DCWorkflow.py
A CMF/branches/yuppie-workflow_setup/DCWorkflow/Extensions/
U CMF/branches/yuppie-workflow_setup/DCWorkflow/configure.zcml
A CMF/branches/yuppie-workflow_setup/DCWorkflow/exportimport.py
A CMF/branches/yuppie-workflow_setup/DCWorkflow/interfaces.py
A CMF/branches/yuppie-workflow_setup/DCWorkflow/tests/test_exportimport.py
U CMF/branches/yuppie-workflow_setup/DCWorkflow/utils.py
A CMF/branches/yuppie-workflow_setup/DCWorkflow/xml/
A CMF/branches/yuppie-workflow_setup/DCWorkflow/xml/wtcWorkflowExport.xml
-=-
Modified: CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/configure.zcml
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/configure.zcml 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/configure.zcml 2005-11-22 16:12:02 UTC (rev 40321)
@@ -1,7 +1,5 @@
<configure
xmlns="http://namespaces.zope.org/zope"
- xmlns:five="http://namespaces.zope.org/five"
- i18n_domain="cmf"
>
<adapter
@@ -124,4 +122,11 @@
for="Products.CMFCore.interfaces.ITypeInformation"
/>
+ <adapter
+ factory=".workflow.WorkflowToolXMLAdapter"
+ provides="Products.GenericSetup.interfaces.IBody"
+ for="Products.CMFCore.interfaces.IWorkflowTool
+ Products.GenericSetup.interfaces.ISetupContext"
+ />
+
</configure>
Added: CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/tests/test_workflow.py
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/tests/test_workflow.py 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/tests/test_workflow.py 2005-11-22 16:12:02 UTC (rev 40321)
@@ -0,0 +1,396 @@
+##############################################################################
+#
+# Copyright (c) 2005 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Workflow tool xml adapter and setup handler unit tests.
+
+$Id$
+"""
+
+import unittest
+import Testing
+
+import Products
+from OFS.Folder import Folder
+from OFS.SimpleItem import SimpleItem
+from Products.Five import zcml
+from zope.interface import implements
+
+from Products.CMFCore.interfaces import IWorkflowDefinition
+from Products.CMFCore.interfaces import IWorkflowTool
+from Products.CMFCore.tests.base.testcase import PlacelessSetup
+from Products.GenericSetup.testing import BodyAdapterTestCase
+from Products.GenericSetup.tests.common import BaseRegistryTests
+from Products.GenericSetup.tests.common import DummyExportContext
+from Products.GenericSetup.tests.common import DummyImportContext
+
+
+_DUMMY_ZCML = """\
+<configure
+ xmlns="http://namespaces.zope.org/zope"
+ xmlns:five="http://namespaces.zope.org/five"
+ i18n_domain="dummy">
+ <permission id="dummy.add" title="Add Dummy Workflow"/>
+ <five:registerClass
+ class="Products.CMFCore.exportimport.tests.test_workflow.DummyWorkflow"
+ meta_type="Dummy Workflow"
+ permission="dummy.add"
+ addview="addDummyWorkflow.html"
+ global="false"
+ />
+</configure>
+"""
+
+_WORKFLOWTOOL_BODY = """\
+<?xml version="1.0"?>
+<object name="portal_workflow" meta_type="CMF Workflow Tool">
+ <property name="title"></property>
+ <object name="foo_workflow" meta_type="Dummy Workflow"/>
+ <bindings>
+ <default>
+ <bound-workflow workflow_id="foo_workflow"/>
+ </default>
+ <type type_id="Foo Type"/>
+ </bindings>
+</object>
+"""
+
+_EMPTY_TOOL_EXPORT = """\
+<?xml version="1.0"?>
+<object name="portal_workflow" meta_type="Dummy Workflow Tool">
+ <property name="title"></property>
+ <bindings>
+ <default/>
+ </bindings>
+</object>
+"""
+
+_BINDINGS_TOOL_EXPORT = """\
+<?xml version="1.0"?>
+<object name="portal_workflow" meta_type="Dummy Workflow Tool">
+ <bindings>
+ <default>
+ <bound-workflow workflow_id="non_dcworkflow_0"/>
+ <bound-workflow workflow_id="non_dcworkflow_1"/>
+ </default>
+ <type type_id="sometype">
+ <bound-workflow workflow_id="non_dcworkflow_2"/>
+ </type>
+ <type type_id="anothertype">
+ <bound-workflow workflow_id="non_dcworkflow_3"/>
+ </type>
+ </bindings>
+</object>
+"""
+
+_NORMAL_TOOL_EXPORT = """\
+<?xml version="1.0"?>
+<object name="portal_workflow" meta_type="Dummy Workflow Tool">
+ <property name="title"></property>
+ <object name="Non-DCWorkflow" meta_type="Dummy Workflow"/>
+ <bindings>
+ <default/>
+ </bindings>
+</object>
+"""
+
+_EMPTY_TOOL_EXPORT_V1 = """\
+<?xml version="1.0"?>
+<workflow-tool>
+ <bindings>
+ <default>
+ </default>
+ </bindings>
+</workflow-tool>
+"""
+
+_BINDINGS_TOOL_EXPORT_V1 = """\
+<?xml version="1.0"?>
+<workflow-tool>
+ <bindings>
+ <default>
+ <bound-workflow workflow_id="non_dcworkflow_0" />
+ <bound-workflow workflow_id="non_dcworkflow_1" />
+ </default>
+ <type type_id="sometype">
+ <bound-workflow workflow_id="non_dcworkflow_2" />
+ </type>
+ <type type_id="anothertype">
+ <bound-workflow workflow_id="non_dcworkflow_3" />
+ </type>
+ </bindings>
+</workflow-tool>
+"""
+
+
+class DummyWorkflowTool(Folder):
+
+ implements(IWorkflowTool)
+
+ meta_type = 'Dummy Workflow Tool'
+
+ def __init__(self, id='portal_workflow'):
+ Folder.__init__(self, id)
+ self._default_chain = ()
+ self._chains_by_type = {}
+
+ def getWorkflowIds(self):
+ return self.objectIds()
+
+ def getWorkflowById(self, workflow_id):
+ return self._getOb(workflow_id)
+
+ def setDefaultChain(self, chain):
+ chain = chain.replace(',', ' ')
+ self._default_chain = tuple(chain.split())
+
+ def setChainForPortalTypes(self, pt_names, chain, verify=True):
+ chain = chain.replace(',', ' ')
+ chain = tuple(chain.split())
+
+ if self._chains_by_type is None:
+ self._chains_by_type = {}
+
+ for pt_name in pt_names:
+ self._chains_by_type[pt_name] = chain
+
+
+class DummyWorkflow(SimpleItem):
+
+ implements(IWorkflowDefinition)
+
+ meta_type = 'Dummy Workflow'
+
+ def __init__(self, id):
+ self._id = id
+
+ def getId(self):
+ return self._id
+
+
+class WorkflowToolXMLAdapterTests(BodyAdapterTestCase):
+
+ def _getTargetClass(self):
+ from Products.CMFCore.exportimport.workflow \
+ import WorkflowToolXMLAdapter
+
+ return WorkflowToolXMLAdapter
+
+ def _populate(self, obj):
+ obj._setObject('foo_workflow', DummyWorkflow('foo_workflow'))
+ obj.setDefaultChain('foo_workflow')
+ obj.setChainForPortalTypes(('Foo Type',), '', verify=False)
+
+ def setUp(self):
+ import Products.CMFCore.exportimport
+ from Products.CMFCore.WorkflowTool import WorkflowTool
+
+ BodyAdapterTestCase.setUp(self)
+ zcml.load_config('configure.zcml', Products.CMFCore.exportimport)
+ zcml.load_string(_DUMMY_ZCML)
+
+ self._obj = WorkflowTool()
+ self._BODY = _WORKFLOWTOOL_BODY
+
+
+class _WorkflowSetup(PlacelessSetup, BaseRegistryTests):
+
+ def _initSite(self):
+ self.root.site = Folder(id='site')
+ site = self.root.site
+ self.root.site.portal_workflow = DummyWorkflowTool()
+ return site
+
+ def setUp(self):
+ PlacelessSetup.setUp(self)
+ BaseRegistryTests.setUp(self)
+ zcml.load_config('meta.zcml', Products.Five)
+ zcml.load_config('configure.zcml', Products.CMFCore.exportimport)
+
+ def tearDown(self):
+ BaseRegistryTests.tearDown(self)
+ PlacelessSetup.tearDown(self)
+
+class exportWorkflowToolTests(_WorkflowSetup):
+
+ def test_empty(self):
+ from Products.CMFCore.exportimport.workflow import exportWorkflowTool
+
+ site = self._initSite()
+ context = DummyExportContext(site)
+ exportWorkflowTool(context)
+
+ self.assertEqual(len(context._wrote), 1)
+ filename, text, content_type = context._wrote[0]
+ self.assertEqual(filename, 'workflows.xml')
+ self._compareDOM(text, _EMPTY_TOOL_EXPORT)
+ self.assertEqual(content_type, 'text/xml')
+
+ def test_normal(self):
+ from Products.CMFCore.exportimport.workflow import exportWorkflowTool
+
+ WF_ID_NON = 'non_dcworkflow'
+ WF_TITLE_NON = 'Non-DCWorkflow'
+
+ site = self._initSite()
+
+ wf_tool = site.portal_workflow
+ nondcworkflow = DummyWorkflow(WF_TITLE_NON)
+ nondcworkflow.title = WF_TITLE_NON
+ wf_tool._setObject(WF_ID_NON, nondcworkflow)
+
+ context = DummyExportContext(site)
+ exportWorkflowTool(context)
+
+ self.assertEqual(len(context._wrote), 1)
+
+ filename, text, content_type = context._wrote[0]
+ self.assertEqual(filename, 'workflows.xml')
+ self._compareDOM(text, _NORMAL_TOOL_EXPORT)
+ self.assertEqual(content_type, 'text/xml')
+
+
+class importWorkflowToolTests(_WorkflowSetup):
+
+ _BINDINGS_TOOL_EXPORT = _BINDINGS_TOOL_EXPORT
+ _EMPTY_TOOL_EXPORT = _EMPTY_TOOL_EXPORT
+
+ def test_empty_default_purge(self):
+ from Products.CMFCore.exportimport.workflow import importWorkflowTool
+
+ WF_ID_NON = 'non_dcworkflow_%s'
+ WF_TITLE_NON = 'Non-DCWorkflow #%s'
+
+ site = self._initSite()
+ wf_tool = site.portal_workflow
+
+ for i in range(4):
+ nondcworkflow = DummyWorkflow(WF_TITLE_NON % i)
+ nondcworkflow.title = WF_TITLE_NON % i
+ wf_tool._setObject(WF_ID_NON % i, nondcworkflow)
+
+ wf_tool._default_chain = (WF_ID_NON % 1,)
+ wf_tool._chains_by_type['sometype'] = (WF_ID_NON % 2,)
+ self.assertEqual(len(wf_tool.objectIds()), 4)
+
+ context = DummyImportContext(site)
+ context._files['workflows.xml'] = self._EMPTY_TOOL_EXPORT
+
+ importWorkflowTool(context)
+
+ self.assertEqual(len(wf_tool.objectIds()), 0)
+ self.assertEqual(len(wf_tool._default_chain), 0)
+ self.assertEqual(len(wf_tool._chains_by_type), 0)
+
+ def test_empty_explicit_purge(self):
+ from Products.CMFCore.exportimport.workflow import importWorkflowTool
+
+ WF_ID_NON = 'non_dcworkflow_%s'
+ WF_TITLE_NON = 'Non-DCWorkflow #%s'
+
+ site = self._initSite()
+ wf_tool = site.portal_workflow
+
+ for i in range(4):
+ nondcworkflow = DummyWorkflow(WF_TITLE_NON % i)
+ nondcworkflow.title = WF_TITLE_NON % i
+ wf_tool._setObject(WF_ID_NON % i, nondcworkflow)
+
+ wf_tool._default_chain = (WF_ID_NON % 1,)
+ wf_tool._chains_by_type['sometype'] = (WF_ID_NON % 2,)
+ self.assertEqual(len(wf_tool.objectIds()), 4)
+
+ context = DummyImportContext(site, True)
+ context._files['workflows.xml'] = self._EMPTY_TOOL_EXPORT
+ importWorkflowTool(context)
+
+ self.assertEqual(len(wf_tool.objectIds()), 0)
+ self.assertEqual(len(wf_tool._default_chain), 0)
+ self.assertEqual(len(wf_tool._chains_by_type), 0)
+
+ def test_empty_skip_purge(self):
+ from Products.CMFCore.exportimport.workflow import importWorkflowTool
+
+ WF_ID_NON = 'non_dcworkflow_%s'
+ WF_TITLE_NON = 'Non-DCWorkflow #%s'
+
+ site = self._initSite()
+ wf_tool = site.portal_workflow
+
+ for i in range(4):
+ nondcworkflow = DummyWorkflow(WF_TITLE_NON % i)
+ nondcworkflow.title = WF_TITLE_NON % i
+ wf_tool._setObject(WF_ID_NON % i, nondcworkflow)
+
+ wf_tool._default_chain = (WF_ID_NON % 1,)
+ wf_tool._chains_by_type['sometype'] = (WF_ID_NON % 2,)
+ self.assertEqual(len(wf_tool.objectIds()), 4)
+
+ context = DummyImportContext(site, False)
+ context._files['typestool.xml'] = self._EMPTY_TOOL_EXPORT
+ importWorkflowTool(context)
+
+ self.assertEqual(len(wf_tool.objectIds()), 4)
+ self.assertEqual(len(wf_tool._default_chain), 1)
+ self.assertEqual(wf_tool._default_chain[0], WF_ID_NON % 1)
+ self.assertEqual(len(wf_tool._chains_by_type), 1)
+ self.assertEqual(wf_tool._chains_by_type['sometype'],
+ (WF_ID_NON % 2,))
+
+ def test_bindings_skip_purge(self):
+ from Products.CMFCore.exportimport.workflow import importWorkflowTool
+
+ WF_ID_NON = 'non_dcworkflow_%s'
+ WF_TITLE_NON = 'Non-DCWorkflow #%s'
+
+ site = self._initSite()
+ wf_tool = site.portal_workflow
+
+ for i in range(4):
+ nondcworkflow = DummyWorkflow(WF_TITLE_NON % i)
+ nondcworkflow.title = WF_TITLE_NON % i
+ wf_tool._setObject(WF_ID_NON % i, nondcworkflow)
+
+ wf_tool._default_chain = (WF_ID_NON % 1,)
+ wf_tool._chains_by_type['sometype'] = (WF_ID_NON % 2,)
+ self.assertEqual(len(wf_tool.objectIds()), 4)
+
+ context = DummyImportContext(site, False)
+ context._files['workflows.xml'] = self._BINDINGS_TOOL_EXPORT
+ importWorkflowTool(context)
+
+ self.assertEqual(len(wf_tool.objectIds()), 4)
+ self.assertEqual(len(wf_tool._default_chain), 2)
+ self.assertEqual(wf_tool._default_chain[0], WF_ID_NON % 0)
+ self.assertEqual(wf_tool._default_chain[1], WF_ID_NON % 1)
+ self.assertEqual(len(wf_tool._chains_by_type), 2)
+ self.assertEqual(wf_tool._chains_by_type['sometype'],
+ (WF_ID_NON % 2,))
+ self.assertEqual(wf_tool._chains_by_type['anothertype'],
+ (WF_ID_NON % 3,))
+
+
+class importWorkflowToolV1Tests(importWorkflowToolTests):
+
+ _BINDINGS_TOOL_EXPORT = _BINDINGS_TOOL_EXPORT_V1
+ _EMPTY_TOOL_EXPORT = _EMPTY_TOOL_EXPORT_V1
+
+
+def test_suite():
+ return unittest.TestSuite((
+ unittest.makeSuite(WorkflowToolXMLAdapterTests),
+ unittest.makeSuite(exportWorkflowToolTests),
+ unittest.makeSuite(importWorkflowToolTests),
+ unittest.makeSuite(importWorkflowToolV1Tests),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
Property changes on: CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/tests/test_workflow.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/workflow.py
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/workflow.py 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/workflow.py 2005-11-22 16:12:02 UTC (rev 40321)
@@ -0,0 +1,185 @@
+##############################################################################
+#
+# Copyright (c) 2005 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Workflow tool xml adapters and setup handlers.
+
+$Id$
+"""
+
+import Products
+from zope.app import zapi
+
+from Products.GenericSetup.interfaces import IBody
+from Products.GenericSetup.interfaces import PURGE
+from Products.GenericSetup.utils import exportObjects
+from Products.GenericSetup.utils import importObjects
+from Products.GenericSetup.utils import ObjectManagerHelpers
+from Products.GenericSetup.utils import PropertyManagerHelpers
+from Products.GenericSetup.utils import XMLAdapterBase
+
+from Products.CMFCore.interfaces import IWorkflowTool
+from Products.CMFCore.utils import getToolByName
+
+
+_FILENAME = 'workflows.xml'
+
+
+class WorkflowToolXMLAdapter(XMLAdapterBase, ObjectManagerHelpers,
+ PropertyManagerHelpers):
+
+ """Node im- and exporter for WorkflowTool.
+ """
+
+ __used_for__ = IWorkflowTool
+
+ _LOGGER_ID = 'workflow'
+
+ def exportNode(self, doc):
+ """Export the object as a DOM node.
+ """
+ self._doc = doc
+ node = self._getObjectNode('object')
+ node.appendChild(self._extractProperties())
+ node.appendChild(self._extractObjects())
+ node.appendChild(self._extractChains())
+
+ self._logger.info('Workflow tool exported.')
+ return node
+
+ def importNode(self, node, mode=PURGE):
+ """Import the object from the DOM node.
+ """
+ if mode == PURGE:
+ self._purgeProperties()
+ self._purgeObjects()
+ self._purgeChains()
+
+ self._initProperties(node, mode)
+ self._initObjects(node, mode)
+ self._initBBBObjects(node, mode)
+ self._initChains(node, mode)
+
+ self._logger.info('Workflow tool imported.')
+
+ def _extractObjects(self):
+ fragment = self._doc.createDocumentFragment()
+ objects = self.context.objectValues()
+ objects.sort(lambda x,y: cmp(x.getId(), y.getId()))
+ for obj in objects:
+ node = self._doc.createElement('object')
+ node.setAttribute('name', obj.getId())
+ node.setAttribute('meta_type', obj.meta_type)
+ fragment.appendChild(node)
+ return fragment
+
+ def _initBBBObjects(self, node, mode):
+ for child in node.childNodes:
+ if child.nodeName != 'workflow':
+ continue
+ parent = self.context
+
+ obj_id = str(child.getAttribute('workflow_id'))
+ if obj_id not in parent.objectIds():
+ meta_type = str(child.getAttribute('meta_type'))
+ for mt_info in Products.meta_types:
+ if mt_info['name'] == meta_type:
+ parent._setObject(obj_id, mt_info['instance'](obj_id))
+ break
+
+ def _extractChains(self):
+ fragment = self._doc.createDocumentFragment()
+ node = self._doc.createElement('bindings')
+ child = self._doc.createElement('default')
+ chain = self.context._default_chain
+ for workflow_id in chain:
+ sub = self._doc.createElement('bound-workflow')
+ sub.setAttribute('workflow_id', workflow_id)
+ child.appendChild(sub)
+ node.appendChild(child)
+ cbt = self.context._chains_by_type
+ if cbt:
+ overrides = cbt.items()
+ overrides.sort()
+ for type_id, chain in overrides:
+ child = self._doc.createElement('type')
+ child.setAttribute('type_id', type_id)
+ for workflow_id in chain:
+ sub = self._doc.createElement('bound-workflow')
+ sub.setAttribute('workflow_id', workflow_id)
+ child.appendChild(sub)
+ node.appendChild(child)
+ fragment.appendChild(node)
+ return fragment
+
+ def _purgeChains(self):
+ self.context.setDefaultChain('')
+ if self.context._chains_by_type is not None:
+ self.context._chains_by_type.clear()
+
+ def _initChains(self, node, mode):
+ for child in node.childNodes:
+ if child.nodeName != 'bindings':
+ continue
+ for sub in child.childNodes:
+ if sub.nodeName == 'default':
+ self.context.setDefaultChain(self._getChain(sub))
+ if sub.nodeName == 'type':
+ type_id = str(sub.getAttribute('type_id'))
+ self.context.setChainForPortalTypes((type_id,),
+ self._getChain(sub), verify=False)
+
+ def _getChain(self, node):
+ workflow_ids = []
+ for child in node.childNodes:
+ if child.nodeName != 'bound-workflow':
+ continue
+ workflow_ids.append(str(child.getAttribute('workflow_id')))
+ return ','.join(workflow_ids)
+
+
+def importWorkflowTool(context):
+ """ Import worflow tool and contained workflow definitions from XML files.
+ """
+ site = context.getSite()
+ logger = context.getLogger('workflow')
+ tool = getToolByName(site, 'portal_workflow')
+
+ body = context.readDataFile(_FILENAME)
+ if body is None:
+ logger.info('Nothing to import.')
+ return
+
+ importer = zapi.queryMultiAdapter((tool, context), IBody)
+ if importer is None:
+ logger.warning('Import adapter misssing.')
+ return
+
+ importer.body = body
+ importObjects(tool, 'workflows', context)
+
+def exportWorkflowTool(context):
+ """ Export worflow tool and contained workflow definitions as XML files.
+ """
+ site = context.getSite()
+ logger = context.getLogger('workflow')
+ tool = getToolByName(site, 'portal_workflow')
+ if tool is None:
+ logger.info('Nothing to export.')
+ return
+
+ exporter = zapi.queryMultiAdapter((tool, context), IBody)
+ if exporter is None:
+ logger.warning('Export adapter misssing.')
+ return
+
+ context.writeDataFile(_FILENAME, exporter.body, exporter.mime_type)
+ exportObjects(tool, 'workflows', context)
Property changes on: CMF/branches/yuppie-workflow_setup/CMFCore/exportimport/workflow.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Modified: CMF/branches/yuppie-workflow_setup/CMFSetup/DEPENDENCIES.txt
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFSetup/DEPENDENCIES.txt 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/CMFSetup/DEPENDENCIES.txt 2005-11-22 16:12:02 UTC (rev 40321)
@@ -1,5 +1,4 @@
Zope >= 2.8.2
Five >= 1.2
CMFCore
-DCWorkflow
GenericSetup
Deleted: CMF/branches/yuppie-workflow_setup/CMFSetup/tests/test_workflow.py
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFSetup/tests/test_workflow.py 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/CMFSetup/tests/test_workflow.py 2005-11-22 16:12:02 UTC (rev 40321)
@@ -1,2674 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2004 Zope Corporation and Contributors. All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-""" Unit tests for export / import of DCWorkflows and bindings.
-
-$Id$
-"""
-
-import unittest
-import Testing
-import Zope2
-Zope2.startup()
-
-from OFS.Folder import Folder
-from Products.PythonScripts.PythonScript import PythonScript
-from Products.ExternalMethod.ExternalMethod import ExternalMethod
-
-from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
-from Products.DCWorkflow.Transitions import TRIGGER_USER_ACTION
-from Products.DCWorkflow.Transitions import TRIGGER_AUTOMATIC
-from Products.GenericSetup.tests.common import BaseRegistryTests
-from Products.GenericSetup.tests.common import DummyExportContext
-from Products.GenericSetup.tests.common import DummyImportContext
-
-
-class DummyWorkflowTool( Folder ):
-
- def __init__( self, id='portal_workflow' ):
- Folder.__init__( self, id )
- self._default_chain = ()
- self._chains_by_type = {}
-
- def getWorkflowIds( self ):
-
- return self.objectIds()
-
- def getWorkflowById( self, workflow_id ):
-
- return self._getOb( workflow_id )
-
- def setDefaultChain( self, chain ):
-
- chain = chain.replace( ',', ' ' )
- self._default_chain = tuple( chain.split() )
-
- def setChainForPortalTypes( self, pt_names, chain ):
-
- chain = chain.replace( ',', ' ' )
- chain = tuple( chain.split() )
-
- if self._chains_by_type is None:
- self._chains_by_type = {}
-
- for pt_name in pt_names:
- self._chains_by_type[ pt_name ] = chain
-
-class DummyWorkflow( Folder ):
-
- meta_type = 'Dummy Workflow'
-
-class _GuardChecker:
-
- def _genGuardProps( self, permissions, roles, groups, expr ):
-
- return { 'guard_permissions' : '; '.join( permissions )
- , 'guard_roles' : '; '.join( roles )
- , 'guard_groups' : '; '.join( groups )
- , 'guard_expr' : expr
- }
-
- def _assertGuard( self, info, permissions, roles, groups, expr ):
-
- self.assertEqual( len( info[ 'guard_permissions' ] )
- , len( permissions ) )
-
- for expected in permissions:
- self.failUnless( expected in info[ 'guard_permissions' ] )
-
- self.assertEqual( len( info[ 'guard_roles' ] )
- , len( roles ) )
-
- for expected in roles:
- self.failUnless( expected in info[ 'guard_roles' ] )
-
- self.assertEqual( len( info[ 'guard_groups' ] )
- , len( groups ) )
-
- for expected in groups:
- self.failUnless( expected in info[ 'guard_groups' ] )
-
- self.assertEqual( info[ 'guard_expr' ], expr )
-
-
-class _WorkflowSetup( BaseRegistryTests ):
-
- def _initSite( self ):
-
- self.root.site = Folder( id='site' )
- site = self.root.site
- self.root.site.portal_workflow = DummyWorkflowTool()
-
- return site
-
- def _initDCWorkflow( self, workflow_id ):
-
- wf_tool = self.root.site.portal_workflow
- wf_tool._setObject( workflow_id, DCWorkflowDefinition( workflow_id ) )
-
- return wf_tool._getOb( workflow_id )
-
- def _initVariables( self, dcworkflow ):
-
- for id, args in _WF_VARIABLES.items():
-
- dcworkflow.variables.addVariable( id )
- variable = dcworkflow.variables._getOb( id )
-
- ( descr, def_val, def_exp, for_cat, for_stat, upd_alw
- ) = args[ :-4 ]
-
- variable.setProperties( description=args[0]
- , default_value=args[1]
- , default_expr=args[2]
- , for_catalog=args[3]
- , for_status=args[4]
- , update_always=args[5]
- , props=self._genGuardProps( *args[ -4: ] )
- )
-
- def _initStates( self, dcworkflow ):
-
- dcworkflow.groups = _WF_GROUPS
-
- for k, v in _WF_STATES.items():
-
- dcworkflow.states.addState( k )
- state = dcworkflow.states._getOb( k )
-
- state.setProperties( title=v[ 0 ]
- , description=v[ 1 ]
- , transitions=v[ 2 ]
- )
- if not v[ 3 ]:
- state.permission_roles = {}
-
- for permission, roles in v[ 3 ].items():
- state.setPermission( permission
- , not isinstance( roles, tuple )
- , roles
- )
- faux_request = {}
-
- for group_id, roles in v[ 4 ]:
- for role in roles:
- faux_request[ '%s|%s' % ( group_id, role ) ] = True
-
- state.setGroups( REQUEST=faux_request )
-
- for k, v in v[ 5 ].items():
- state.addVariable( k, v )
-
- def _initTransitions( self, dcworkflow ):
-
- for k, v in _WF_TRANSITIONS.items():
-
- dcworkflow.transitions.addTransition( k )
- transition = dcworkflow.transitions._getOb( k )
-
- transition.setProperties( title=v[ 0 ]
- , description=v[ 1 ]
- , new_state_id=v[ 2 ]
- , trigger_type=v[ 3 ]
- , script_name=v[ 4 ]
- , after_script_name=v[ 5 ]
- , actbox_name=v[ 6 ]
- , actbox_url=v[ 7 ]
- , actbox_category=v[ 8 ]
- , props=self._genGuardProps( *v[ -4: ] )
- )
-
- for k, v in v[ 9 ].items():
- transition.addVariable( k, v )
-
- def _initWorklists( self, dcworkflow ):
-
- for k, v in _WF_WORKLISTS.items():
-
- dcworkflow.worklists.addWorklist( k )
- worklist = dcworkflow.worklists._getOb( k )
-
- worklist.title = v[ 0 ]
-
- props=self._genGuardProps( *v[ -4: ] )
-
- for var_id, matches in v[ 2 ].items():
- props[ 'var_match_%s' % var_id ] = ';'.join( matches )
-
- worklist.setProperties( description=v[ 1 ]
- , actbox_name=v[ 3 ]
- , actbox_url=v[ 4 ]
- , actbox_category=v[ 5 ]
- , props=props
- )
-
- def _initScripts( self, dcworkflow ):
-
- for k, v in _WF_SCRIPTS.items():
-
- if v[ 0 ] == PythonScript.meta_type:
- script = PythonScript( k )
- script.write( v[ 1 ] )
-
- elif v[ 0 ] == ExternalMethod.meta_type:
- script = ExternalMethod(k,'', v[3], v[4])
-
- else:
- raise ValueError, 'Unknown script type: %s' % v[ 0 ]
-
- dcworkflow.scripts._setObject( k, script )
-
-class WorkflowToolConfiguratorTests( _WorkflowSetup ):
-
- def _getTargetClass( self ):
-
- from Products.CMFSetup.workflow import WorkflowToolConfigurator
- return WorkflowToolConfigurator
-
- def test_listWorkflowInfo_empty( self ):
-
- site = self._initSite()
-
- configurator = self._makeOne( site ).__of__( site )
-
- self.assertEqual( len( configurator.listWorkflowInfo() ), 0 )
-
- def test_listWorkflowInfo_mixed( self ):
-
- from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
-
- site = self._initSite()
-
- WF_ID_NON = 'non_dcworkflow'
- WF_TITLE_NON = 'Non-DCWorkflow'
- WF_ID_DC = 'dcworkflow'
- WF_TITLE_DC = 'DCWorkflow'
-
- site = self._initSite()
-
- wf_tool = site.portal_workflow
- nondcworkflow = DummyWorkflow( WF_TITLE_NON )
- nondcworkflow.title = WF_TITLE_NON
- wf_tool._setObject( WF_ID_NON, nondcworkflow )
-
- dcworkflow = self._initDCWorkflow( WF_ID_DC )
- dcworkflow.title = WF_TITLE_DC
-
- configurator = self._makeOne( site ).__of__( site )
-
- info_list = configurator.listWorkflowInfo()
-
- self.assertEqual( len( info_list ), 2 )
-
- non_info = [ x for x in info_list if x[ 'id' ] == WF_ID_NON ][0]
- self.assertEqual( non_info[ 'title' ], WF_TITLE_NON )
- self.assertEqual( non_info[ 'meta_type' ], DummyWorkflow.meta_type )
-
- dc_info = [ x for x in info_list if x[ 'id' ] == WF_ID_DC ][0]
- self.assertEqual( dc_info[ 'title' ], WF_TITLE_DC )
- self.assertEqual( dc_info[ 'meta_type' ]
- , DCWorkflowDefinition.meta_type )
- self.assertEqual( dc_info[ 'filename' ]
- , 'workflows/%s/definition.xml' % WF_ID_DC )
-
- def test_listWorkflowChains_no_default( self ):
-
- site = self._initSite()
- configurator = self._makeOne( site ).__of__( site )
-
- chains = configurator.listWorkflowChains()
-
- default_chain = [ x[1] for x in chains if x[0] is None ][0]
- self.assertEqual( len( default_chain ), 0 )
-
- def test_listWorkflowChains_with_default( self ):
-
- site = self._initSite()
- site.portal_workflow._default_chain = ( 'foo', 'bar' )
- configurator = self._makeOne( site ).__of__( site )
-
- chains = configurator.listWorkflowChains()
-
- self.assertEqual( chains[ 0 ][ 0 ], None )
- default_chain = chains[ 0 ][ 1 ]
- self.assertEqual( len( default_chain ), 2 )
- self.assertEqual( default_chain[ 0 ], 'foo' )
- self.assertEqual( default_chain[ 1 ], 'bar' )
-
- def test_listWorkflowChains_no_overrides( self ):
-
- site = self._initSite()
- configurator = self._makeOne( site ).__of__( site )
-
- chains = configurator.listWorkflowChains()
-
- self.assertEqual( len( chains ), 1 )
-
- def test_listWorkflowChains_with_overrides( self ):
-
- site = self._initSite()
- site.portal_workflow._chains_by_type[ 'qux' ] = ( 'foo', 'bar' )
- configurator = self._makeOne( site ).__of__( site )
-
- chains = configurator.listWorkflowChains()
-
- self.assertEqual( len( chains ), 2 )
-
- self.assertEqual( chains[ 0 ][ 0 ], None )
- default_chain = chains[ 0 ][ 1 ]
- self.assertEqual( len( default_chain ), 0 )
-
- self.assertEqual( chains[ 1 ][ 0 ], 'qux' )
- qux_chain = chains[ 1 ][ 1 ]
- self.assertEqual( len( qux_chain ), 2 )
- self.assertEqual( qux_chain[ 0 ], 'foo' )
- self.assertEqual( qux_chain[ 1 ], 'bar' )
-
- def test_listWorkflowChains_default_chain_plus_overrides( self ):
-
- site = self._initSite()
- site.portal_workflow._default_chain = ( 'foo', 'bar' )
- site.portal_workflow._chains_by_type[ 'qux' ] = ( 'baz', )
- configurator = self._makeOne( site ).__of__( site )
-
- chains = configurator.listWorkflowChains()
-
- self.assertEqual( chains[ 0 ][ 0 ], None )
- default_chain = chains[ 0 ][ 1 ]
- self.assertEqual( len( default_chain ), 2 )
- self.assertEqual( default_chain[ 0 ], 'foo' )
- self.assertEqual( default_chain[ 1 ], 'bar' )
-
- self.assertEqual( chains[ 1 ][ 0 ], 'qux' )
- qux_chain = chains[ 1 ][ 1 ]
- self.assertEqual( len( qux_chain ), 1 )
- self.assertEqual( qux_chain[ 0 ], 'baz' )
-
- def test_generateXML_empty( self ):
-
- site = self._initSite()
- configurator = self._makeOne( site ).__of__( site )
- self._compareDOM( configurator.generateXML(), _EMPTY_TOOL_EXPORT )
-
- def test_generateXML_default_chain_plus_overrides( self ):
-
- site = self._initSite()
- site.portal_workflow._default_chain = ( 'foo', 'bar' )
- site.portal_workflow._chains_by_type[ 'qux' ] = ( 'baz', )
-
- configurator = self._makeOne( site ).__of__( site )
-
- self._compareDOM( configurator.generateXML()
- , _OVERRIDE_TOOL_EXPORT )
-
- def test_generateXML_mixed( self ):
-
- WF_ID_NON = 'non_dcworkflow'
- WF_TITLE_NON = 'Non-DCWorkflow'
- WF_ID_DC = 'dcworkflow'
- WF_TITLE_DC = 'DCWorkflow'
-
- site = self._initSite()
-
- wf_tool = site.portal_workflow
- nondcworkflow = DummyWorkflow( WF_TITLE_NON )
- nondcworkflow.title = WF_TITLE_NON
- wf_tool._setObject( WF_ID_NON, nondcworkflow )
-
- dcworkflow = self._initDCWorkflow( WF_ID_DC )
- dcworkflow.title = WF_TITLE_DC
-
- configurator = self._makeOne( site ).__of__( site )
-
- self._compareDOM( configurator.generateXML(), _NORMAL_TOOL_EXPORT )
-
- def test_parseXML_empty( self ):
-
- site = self._initSite()
- configurator = self._makeOne( site ).__of__( site )
-
- tool_info = configurator.parseXML( _EMPTY_TOOL_EXPORT )
-
- self.assertEqual( len( tool_info[ 'workflows' ] ), 0 )
- self.assertEqual( len( tool_info[ 'bindings' ] ), 1 )
-
- def test_parseXML_default_chain_plus_overrides( self ):
-
- site = self._initSite()
- configurator = self._makeOne( site ).__of__( site )
-
- tool_info = configurator.parseXML( _OVERRIDE_TOOL_EXPORT )
-
- self.assertEqual( len( tool_info[ 'workflows' ] ), 0 )
- self.assertEqual( len( tool_info[ 'bindings' ] ), 2 )
-
- default = tool_info[ 'bindings' ][ None ]
- self.assertEqual( len( default ), 2 )
- self.assertEqual( default[ 0 ], 'foo' )
- self.assertEqual( default[ 1 ], 'bar' )
-
- override = tool_info[ 'bindings' ][ 'qux' ]
- self.assertEqual( len( override ), 1 )
- self.assertEqual( override[ 0 ], 'baz' )
-
- def test_parseXML_normal( self ):
-
- site = self._initSite()
- configurator = self._makeOne( site ).__of__( site )
-
- tool_info = configurator.parseXML( _NORMAL_TOOL_EXPORT )
-
- self.assertEqual( len( tool_info[ 'workflows' ] ), 2 )
-
- info = tool_info[ 'workflows' ][ 0 ]
- self.assertEqual( info[ 'workflow_id' ], 'non_dcworkflow' )
- self.assertEqual( info[ 'meta_type' ], DummyWorkflow.meta_type )
- self.assertEqual( info[ 'filename' ], None )
-
- info = tool_info[ 'workflows' ][ 1 ]
- self.assertEqual( info[ 'workflow_id' ], 'dcworkflow' )
- self.assertEqual( info[ 'meta_type' ], DCWorkflowDefinition.meta_type )
- self.assertEqual( info[ 'filename' ],
- 'workflows/dcworkflow/definition.xml' )
-
- self.assertEqual( len( tool_info[ 'bindings' ] ), 1 )
-
-
-class WorkflowDefinitionConfiguratorTests( _WorkflowSetup, _GuardChecker ):
-
- def _getTargetClass( self ):
-
- from Products.CMFSetup.workflow import WorkflowDefinitionConfigurator
- return WorkflowDefinitionConfigurator
-
- def test_getWorkflowInfo_non_dcworkflow( self ):
-
- WF_ID = 'dummy'
- WF_TITLE = 'Dummy'
-
- site = self._initSite()
- wf_tool = site.portal_workflow
- dummy = DummyWorkflow( WF_TITLE )
- wf_tool._setObject( WF_ID, dummy )
-
- dummy.title = WF_TITLE
-
- configurator = self._makeOne( site ).__of__( site )
- info = configurator.getWorkflowInfo( WF_ID )
-
- self.assertEqual( info[ 'id' ], WF_ID )
- self.assertEqual( info[ 'meta_type' ], DummyWorkflow.meta_type )
- self.assertEqual( info[ 'title' ], WF_TITLE )
-
- def test_getWorkflowInfo_dcworkflow_defaults( self ):
-
- WF_ID = 'dcworkflow_defaults'
-
- site = self._initSite()
- dcworkflow = self._initDCWorkflow( WF_ID )
-
- configurator = self._makeOne( site ).__of__( site )
- info = configurator.getWorkflowInfo( WF_ID )
-
- self.assertEqual( info[ 'id' ], WF_ID )
- self.assertEqual( info[ 'meta_type' ], DCWorkflowDefinition.meta_type )
- self.assertEqual( info[ 'title' ], dcworkflow.title )
-
- self.assertEqual( info[ 'state_variable' ], dcworkflow.state_var )
-
- self.assertEqual( len( info[ 'permissions' ] ), 0 )
- self.assertEqual( len( info[ 'variable_info' ] ), 0 )
- self.assertEqual( len( info[ 'state_info' ] ), 0 )
- self.assertEqual( len( info[ 'transition_info' ] ), 0 )
-
- def test_getWorkflowInfo_dcworkflow_permissions( self ):
-
- WF_ID = 'dcworkflow_permissions'
-
- site = self._initSite()
- dcworkflow = self._initDCWorkflow( WF_ID )
- dcworkflow.permissions = _WF_PERMISSIONS
-
- configurator = self._makeOne( site ).__of__( site )
- info = configurator.getWorkflowInfo( WF_ID )
-
- permissions = info[ 'permissions' ]
- self.assertEqual( len( permissions ), len( _WF_PERMISSIONS ) )
-
- for permission in _WF_PERMISSIONS:
- self.failUnless( permission in permissions )
-
- def test_getWorkflowInfo_dcworkflow_variables( self ):
-
- WF_ID = 'dcworkflow_variables'
-
- site = self._initSite()
- dcworkflow = self._initDCWorkflow( WF_ID )
- self._initVariables( dcworkflow )
-
- configurator = self._makeOne( site ).__of__( site )
- info = configurator.getWorkflowInfo( WF_ID )
-
- variable_info = info[ 'variable_info' ]
- self.assertEqual( len( variable_info ), len( _WF_VARIABLES ) )
-
- ids = [ x[ 'id' ] for x in variable_info ]
-
- for k in _WF_VARIABLES.keys():
- self.failUnless( k in ids )
-
- for info in variable_info:
-
- expected = _WF_VARIABLES[ info[ 'id' ] ]
-
- self.assertEqual( info[ 'description' ], expected[ 0 ] )
- self.assertEqual( info[ 'default_value' ], expected[ 1 ] )
- self.assertEqual( info[ 'default_expr' ], expected[ 2 ] )
- self.assertEqual( info[ 'for_catalog' ], expected[ 3 ] )
- self.assertEqual( info[ 'for_status' ], expected[ 4 ] )
- self.assertEqual( info[ 'update_always' ], expected[ 5 ] )
-
- self._assertGuard( info, *expected[ -4: ] )
-
- def test_getWorkflowInfo_dcworkflow_states( self ):
-
- WF_ID = 'dcworkflow_states'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
- dcworkflow = self._initDCWorkflow( WF_ID )
- dcworkflow.initial_state = WF_INITIAL_STATE
- self._initStates( dcworkflow )
-
- configurator = self._makeOne( site ).__of__( site )
- info = configurator.getWorkflowInfo( WF_ID )
-
- self.assertEqual( info[ 'state_variable' ], dcworkflow.state_var )
- self.assertEqual( info[ 'initial_state' ], dcworkflow.initial_state )
-
- state_info = info[ 'state_info' ]
- self.assertEqual( len( state_info ), len( _WF_STATES ) )
-
- ids = [ x[ 'id' ] for x in state_info ]
-
- for k in _WF_STATES.keys():
- self.failUnless( k in ids )
-
- for info in state_info:
-
- expected = _WF_STATES[ info[ 'id' ] ]
-
- self.assertEqual( info[ 'title' ], expected[ 0 ] )
- self.assertEqual( info[ 'description' ], expected[ 1 ] )
- self.assertEqual( info[ 'transitions' ], expected[ 2 ] )
-
- permissions = info[ 'permissions' ]
-
- self.assertEqual( len( permissions ), len( expected[ 3 ] ) )
-
- for ep_id, ep_roles in expected[ 3 ].items():
-
- fp = [ x for x in permissions if x[ 'name' ] == ep_id ][ 0 ]
-
- self.assertEqual( fp[ 'acquired' ]
- , not isinstance( ep_roles, tuple ) )
-
- self.assertEqual( len( fp[ 'roles' ] ), len( ep_roles ) )
-
- for ep_role in ep_roles:
- self.failUnless( ep_role in fp[ 'roles' ] )
-
- groups = info[ 'groups' ]
- self.assertEqual( len( groups ), len( expected[ 4 ] ) )
-
- for i in range( len( groups ) ):
- self.assertEqual( groups[ i ], expected[ 4 ][ i ] )
-
- variables = info[ 'variables' ]
- self.assertEqual( len( variables ), len( expected[ 5 ] ) )
-
- for v_info in variables:
-
- name, type, value = ( v_info[ 'name' ]
- , v_info[ 'type' ], v_info[ 'value' ] )
-
- self.assertEqual( value, expected[ 5 ][ name ] )
-
- if isinstance( value, bool ):
- self.assertEqual( type, 'bool' )
- elif isinstance( value, int ):
- self.assertEqual( type, 'int' )
- elif isinstance( value, float ):
- self.assertEqual( type, 'float' )
- elif isinstance( value, basestring ):
- self.assertEqual( type, 'string' )
-
- def test_getWorkflowInfo_dcworkflow_transitions( self ):
-
- from Products.CMFSetup.workflow import TRIGGER_TYPES
-
- WF_ID = 'dcworkflow_transitions'
-
- site = self._initSite()
- dcworkflow = self._initDCWorkflow( WF_ID )
- self._initTransitions( dcworkflow )
-
- configurator = self._makeOne( site ).__of__( site )
- info = configurator.getWorkflowInfo( WF_ID )
-
- transition_info = info[ 'transition_info' ]
- self.assertEqual( len( transition_info ), len( _WF_TRANSITIONS ) )
-
- ids = [ x[ 'id' ] for x in transition_info ]
-
- for k in _WF_TRANSITIONS.keys():
- self.failUnless( k in ids )
-
- for info in transition_info:
-
- expected = _WF_TRANSITIONS[ info[ 'id' ] ]
-
- self.assertEqual( info[ 'title' ], expected[ 0 ] )
- self.assertEqual( info[ 'description' ], expected[ 1 ] )
- self.assertEqual( info[ 'new_state_id' ], expected[ 2 ] )
- self.assertEqual( info[ 'trigger_type' ]
- , TRIGGER_TYPES[ expected[ 3 ] ] )
- self.assertEqual( info[ 'script_name' ], expected[ 4 ] )
- self.assertEqual( info[ 'after_script_name' ], expected[ 5 ] )
- self.assertEqual( info[ 'actbox_name' ], expected[ 6 ] )
- self.assertEqual( info[ 'actbox_url' ], expected[ 7 ] )
- self.assertEqual( info[ 'actbox_category' ], expected[ 8 ] )
-
- variables = info[ 'variables' ]
- self.assertEqual( len( variables ), len( expected[ 9 ] ) )
-
- for v_info in variables:
- self.assertEqual( v_info[ 'expr' ]
- , expected[ 9 ][ v_info[ 'name' ] ] )
-
- self._assertGuard( info, *expected[ -4: ] )
-
- def test_getWorkflowInfo_dcworkflow_worklists( self ):
-
- WF_ID = 'dcworkflow_worklists'
-
- site = self._initSite()
- dcworkflow = self._initDCWorkflow( WF_ID )
- self._initWorklists( dcworkflow )
-
- configurator = self._makeOne( site ).__of__( site )
- info = configurator.getWorkflowInfo( WF_ID )
-
- worklist_info = info[ 'worklist_info' ]
- self.assertEqual( len( worklist_info ), len( _WF_WORKLISTS ) )
-
- ids = [ x[ 'id' ] for x in worklist_info ]
-
- for k in _WF_WORKLISTS.keys():
- self.failUnless( k in ids )
-
- for info in worklist_info:
-
- expected = _WF_WORKLISTS[ info[ 'id' ] ]
-
- self.assertEqual( info[ 'title' ], expected[ 0 ] )
- self.assertEqual( info[ 'description' ], expected[ 1 ] )
- self.assertEqual( info[ 'actbox_name' ], expected[ 3 ] )
- self.assertEqual( info[ 'actbox_url' ], expected[ 4 ] )
- self.assertEqual( info[ 'actbox_category' ], expected[ 5 ] )
-
- var_match = info[ 'var_match' ]
- self.assertEqual( len( var_match ), len( expected[ 2 ] ) )
-
- for var_id, values_txt in var_match:
-
- values = [ x.strip() for x in values_txt.split( ';' ) ]
- e_values = expected[ 2 ][ var_id ]
- self.assertEqual( len( values ), len( e_values ) )
-
- for e_value in e_values:
- self.failUnless( e_value in values )
-
- self._assertGuard( info, *expected[ -4: ] )
-
- def test_getWorkflowInfo_dcworkflow_scripts( self ):
-
- WF_ID = 'dcworkflow_scripts'
-
- site = self._initSite()
- dcworkflow = self._initDCWorkflow( WF_ID )
- self._initScripts( dcworkflow )
-
- configurator = self._makeOne( site ).__of__( site )
- info = configurator.getWorkflowInfo( WF_ID )
-
- script_info = info[ 'script_info' ]
- self.assertEqual( len( script_info ), len( _WF_SCRIPTS ) )
-
- ids = [ x[ 'id' ] for x in script_info ]
-
- for k in _WF_SCRIPTS.keys():
- self.failUnless( k in ids )
-
- for info in script_info:
-
- expected = _WF_SCRIPTS[ info[ 'id' ] ]
-
- self.assertEqual( info[ 'meta_type' ], expected[ 0 ] )
- self.assertEqual( info[ 'body' ], expected[ 1 ] )
-
- if info[ 'meta_type' ] == PythonScript.meta_type:
- self.assertEqual( info[ 'filename' ]
- , expected[ 2 ] % WF_ID )
- else:
- self.assertEqual( info[ 'filename' ], expected[ 2 ] )
-
- def test_generateXML_empty( self ):
-
- WF_ID = 'empty'
- WF_TITLE = 'Empty DCWorkflow'
- WF_INITIAL_STATE = 'initial'
-
- site = self._initSite()
- dcworkflow = self._initDCWorkflow( WF_ID )
- dcworkflow.title = WF_TITLE
- dcworkflow.initial_state = WF_INITIAL_STATE
-
- configurator = self._makeOne( site ).__of__( site )
-
- self._compareDOM( configurator.generateWorkflowXML( WF_ID )
- , _EMPTY_WORKFLOW_EXPORT % ( WF_ID
- , WF_TITLE
- , WF_INITIAL_STATE
- ) )
-
- def test_generateWorkflowXML_nondc( self ):
-
- WF_ID_NON = 'non_dcworkflow'
- WF_TITLE_NON = 'Non-DCWorkflow'
-
- site = self._initSite()
-
- wf_tool = site.portal_workflow
- nondcworkflow = DummyWorkflow( WF_TITLE_NON )
- nondcworkflow.title = WF_TITLE_NON
- wf_tool._setObject( WF_ID_NON, nondcworkflow )
-
- configurator = self._makeOne( site ).__of__( site )
-
- self.assertEqual( configurator.generateWorkflowXML( WF_ID_NON ), None )
-
- def test_generateWorkflowXML_normal( self ):
-
- WF_ID = 'normal'
- WF_TITLE = 'Normal DCWorkflow'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
- dcworkflow = self._initDCWorkflow( WF_ID )
- dcworkflow.title = WF_TITLE
- dcworkflow.initial_state = WF_INITIAL_STATE
- dcworkflow.permissions = _WF_PERMISSIONS
- self._initVariables( dcworkflow )
- self._initStates( dcworkflow )
- self._initTransitions( dcworkflow )
- self._initWorklists( dcworkflow )
- self._initScripts( dcworkflow )
-
- configurator = self._makeOne( site ).__of__( site )
-
- self._compareDOM( configurator.generateWorkflowXML( WF_ID )
- , _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID
- , 'title' : WF_TITLE
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID.replace(' ', '_')
- } )
-
- def test_generateWorkflowXML_multiple( self ):
-
- WF_ID_1 = 'dc1'
- WF_TITLE_1 = 'Normal DCWorkflow #1'
- WF_ID_2 = 'dc2'
- WF_TITLE_2 = 'Normal DCWorkflow #2'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
-
- dcworkflow_1 = self._initDCWorkflow( WF_ID_1 )
- dcworkflow_1.title = WF_TITLE_1
- dcworkflow_1.initial_state = WF_INITIAL_STATE
- dcworkflow_1.permissions = _WF_PERMISSIONS
- self._initVariables( dcworkflow_1 )
- self._initStates( dcworkflow_1 )
- self._initTransitions( dcworkflow_1 )
- self._initWorklists( dcworkflow_1 )
- self._initScripts( dcworkflow_1 )
-
- dcworkflow_2 = self._initDCWorkflow( WF_ID_2 )
- dcworkflow_2.title = WF_TITLE_2
- dcworkflow_2.initial_state = WF_INITIAL_STATE
- dcworkflow_2.permissions = _WF_PERMISSIONS
- self._initVariables( dcworkflow_2 )
- self._initStates( dcworkflow_2 )
- self._initTransitions( dcworkflow_2 )
- self._initWorklists( dcworkflow_2 )
- self._initScripts( dcworkflow_2 )
-
- configurator = self._makeOne( site ).__of__( site )
-
- self._compareDOM( configurator.generateWorkflowXML( WF_ID_1 )
- , _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID_1
- , 'title' : WF_TITLE_1
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID_1.replace(' ', '_')
- } )
-
- self._compareDOM( configurator.generateWorkflowXML( WF_ID_2 )
- , _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID_2
- , 'title' : WF_TITLE_2
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID_2.replace(' ', '_')
- } )
-
- def test_parseWorkflowXML_empty( self ):
-
- WF_ID = 'empty'
- WF_TITLE = 'Empty DCWorkflow'
- WF_INITIAL_STATE = 'initial'
-
- site = self._initSite()
-
- configurator = self._makeOne( site ).__of__( site )
-
- ( workflow_id
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- ) = configurator.parseWorkflowXML( _EMPTY_WORKFLOW_EXPORT
- % ( WF_ID
- , WF_TITLE
- , WF_INITIAL_STATE
- ) )
-
- self.assertEqual( len( states ), 0 )
- self.assertEqual( len( transitions ), 0 )
- self.assertEqual( len( variables ), 0 )
- self.assertEqual( len( worklists ), 0 )
- self.assertEqual( len( permissions ), 0 )
- self.assertEqual( len( scripts ), 0 )
-
- def test_parseWorkflowXML_normal_attribs( self ):
-
- WF_ID = 'normal'
- WF_TITLE = 'Normal DCWorkflow'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
-
- configurator = self._makeOne( site ).__of__( site )
-
- ( workflow_id
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- ) = configurator.parseWorkflowXML(
- _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID
- , 'title' : WF_TITLE
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID.replace(' ', '_')
- } )
-
- self.assertEqual( workflow_id, WF_ID )
- self.assertEqual( title, WF_TITLE )
- self.assertEqual( state_variable, 'state' )
- self.assertEqual( initial_state, WF_INITIAL_STATE )
-
- def test_parseWorkflowXML_normal_states( self ):
-
- WF_ID = 'normal'
- WF_TITLE = 'Normal DCWorkflow'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
-
- configurator = self._makeOne( site ).__of__( site )
-
- ( workflow_id
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- ) = configurator.parseWorkflowXML(
- _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID
- , 'title' : WF_TITLE
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID.replace(' ', '_')
- } )
-
- self.assertEqual( len( states ), len( _WF_STATES ) )
-
- for state in states:
-
- state_id = state[ 'state_id' ]
- self.failUnless( state_id in _WF_STATES )
-
- expected = _WF_STATES[ state_id ]
-
- self.assertEqual( state[ 'title' ], expected[ 0 ] )
-
- description = ''.join( state[ 'description' ] )
- self.failUnless( expected[ 1 ] in description )
-
- self.assertEqual( tuple( state[ 'transitions' ] ), expected[ 2 ] )
- self.assertEqual( state[ 'permissions' ], expected[ 3 ] )
- self.assertEqual( tuple( state[ 'groups' ] )
- , tuple( expected[ 4 ] ) )
-
- for k, v_info in state[ 'variables' ].items():
-
- exp_value = expected[ 5 ][ k ]
- self.assertEqual( v_info[ 'value' ], str( exp_value ) )
-
- if isinstance( exp_value, bool ):
- self.assertEqual( v_info[ 'type' ], 'bool' )
- elif isinstance( exp_value, int ):
- self.assertEqual( v_info[ 'type' ], 'int' )
- elif isinstance( exp_value, float ):
- self.assertEqual( v_info[ 'type' ], 'float' )
- elif isinstance( exp_value, basestring ):
- self.assertEqual( v_info[ 'type' ], 'string' )
-
- def test_parseWorkflowXML_normal_transitions( self ):
-
- from Products.CMFSetup.workflow import TRIGGER_TYPES
-
- WF_ID = 'normal'
- WF_TITLE = 'Normal DCWorkflow'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
-
- configurator = self._makeOne( site ).__of__( site )
-
- ( workflow_id
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- ) = configurator.parseWorkflowXML(
- _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID
- , 'title' : WF_TITLE
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID.replace(' ', '_')
- } )
-
- self.assertEqual( len( transitions ), len( _WF_TRANSITIONS ) )
-
- for transition in transitions:
-
- transition_id = transition[ 'transition_id' ]
- self.failUnless( transition_id in _WF_TRANSITIONS )
-
- expected = _WF_TRANSITIONS[ transition_id ]
-
- self.assertEqual( transition[ 'title' ], expected[ 0 ] )
-
- description = ''.join( transition[ 'description' ] )
- self.failUnless( expected[ 1 ] in description )
-
- self.assertEqual( transition[ 'new_state' ], expected[ 2 ] )
- self.assertEqual( transition[ 'trigger' ]
- , TRIGGER_TYPES[ expected[ 3 ] ] )
- self.assertEqual( transition[ 'before_script' ], expected[ 4 ] )
- self.assertEqual( transition[ 'after_script' ]
- , expected[ 5 ] )
-
- action = transition[ 'action' ]
- self.assertEqual( action.get( 'name', '' ), expected[ 6 ] )
- self.assertEqual( action.get( 'url', '' ), expected[ 7 ] )
- self.assertEqual( action.get( 'category', '' ), expected[ 8 ] )
-
- self.assertEqual( transition[ 'variables' ], expected[ 9 ] )
-
- guard = transition[ 'guard' ]
- self.assertEqual( tuple( guard.get( 'permissions', () ) )
- , expected[ 10 ] )
- self.assertEqual( tuple( guard.get( 'roles', () ) )
- , expected[ 11 ] )
- self.assertEqual( tuple( guard.get( 'groups', () ) )
- , expected[ 12 ] )
- self.assertEqual( guard.get( 'expression', '' ), expected[ 13 ] )
-
- def test_parseWorkflowXML_normal_variables( self ):
-
- from Products.CMFSetup.workflow import TRIGGER_TYPES
-
- WF_ID = 'normal'
- WF_TITLE = 'Normal DCWorkflow'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
-
- configurator = self._makeOne( site ).__of__( site )
-
- ( workflow_id
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- ) = configurator.parseWorkflowXML(
- _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID
- , 'title' : WF_TITLE
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID.replace(' ', '_')
- } )
-
- self.assertEqual( len( variables ), len( _WF_VARIABLES ) )
-
- for variable in variables:
-
- variable_id = variable[ 'variable_id' ]
- self.failUnless( variable_id in _WF_VARIABLES )
-
- expected = _WF_VARIABLES[ variable_id ]
-
- description = ''.join( variable[ 'description' ] )
- self.failUnless( expected[ 0 ] in description )
-
- default = variable[ 'default' ]
- self.assertEqual( default[ 'value' ], expected[ 1 ] )
-
- exp_type = 'n/a'
-
- if expected[ 1 ]:
- exp_value = expected[ 1 ]
-
- if isinstance( exp_value, bool ):
- exp_type = 'bool'
- elif isinstance( exp_value, int ):
- exp_type = 'int'
- elif isinstance( exp_value, float ):
- exp_type = 'float'
- elif isinstance( exp_value, basestring ):
- exp_type = 'string'
- else:
- exp_type = 'XXX'
-
- self.assertEqual( default[ 'type' ], exp_type )
- self.assertEqual( default[ 'expression' ], expected[ 2 ] )
-
- self.assertEqual( variable[ 'for_catalog' ], expected[ 3 ] )
- self.assertEqual( variable[ 'for_status' ], expected[ 4 ] )
- self.assertEqual( variable[ 'update_always' ], expected[ 5 ] )
-
- guard = variable[ 'guard' ]
- self.assertEqual( tuple( guard.get( 'permissions', () ) )
- , expected[ 6 ] )
- self.assertEqual( tuple( guard.get( 'roles', () ) )
- , expected[ 7 ] )
- self.assertEqual( tuple( guard.get( 'groups', () ) )
- , expected[ 8 ] )
- self.assertEqual( guard.get( 'expression', '' ), expected[ 9 ] )
-
- def test_parseWorkflowXML_normal_worklists( self ):
-
- from Products.CMFSetup.workflow import TRIGGER_TYPES
-
- WF_ID = 'normal'
- WF_TITLE = 'Normal DCWorkflow'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
-
- configurator = self._makeOne( site ).__of__( site )
-
- ( workflow_id
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- ) = configurator.parseWorkflowXML(
- _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID
- , 'title' : WF_TITLE
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID.replace(' ', '_')
- } )
-
- self.assertEqual( len( worklists ), len( _WF_WORKLISTS ) )
-
- for worklist in worklists:
-
- worklist_id = worklist[ 'worklist_id' ]
- self.failUnless( worklist_id in _WF_WORKLISTS )
-
- expected = _WF_WORKLISTS[ worklist_id ]
-
- self.assertEqual( worklist[ 'title' ], expected[ 0 ] )
-
- description = ''.join( worklist[ 'description' ] )
- self.failUnless( expected[ 1 ] in description )
-
- self.assertEqual( tuple( worklist[ 'match' ] )
- , tuple( expected[ 2 ] ) )
-
- action = worklist[ 'action' ]
- self.assertEqual( action.get( 'name', '' ), expected[ 3 ] )
- self.assertEqual( action.get( 'url', '' ), expected[ 4 ] )
- self.assertEqual( action.get( 'category', '' ), expected[ 5 ] )
-
- guard = worklist[ 'guard' ]
- self.assertEqual( tuple( guard.get( 'permissions', () ) )
- , expected[ 6 ] )
- self.assertEqual( tuple( guard.get( 'roles', () ) )
- , expected[ 7 ] )
- self.assertEqual( tuple( guard.get( 'groups', () ) )
- , expected[ 8 ] )
- self.assertEqual( guard.get( 'expression', '' ), expected[ 9 ] )
-
- def test_parseWorkflowXML_normal_permissions( self ):
-
- from Products.CMFSetup.workflow import TRIGGER_TYPES
-
- WF_ID = 'normal'
- WF_TITLE = 'Normal DCWorkflow'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
-
- configurator = self._makeOne( site ).__of__( site )
-
- ( workflow_id
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- ) = configurator.parseWorkflowXML(
- _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID
- , 'title' : WF_TITLE
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID.replace(' ', '_')
- } )
-
- self.assertEqual( len( permissions ), len( _WF_PERMISSIONS ) )
-
- for permission in permissions:
-
- self.failUnless( permission in _WF_PERMISSIONS )
-
- def test_parseWorkflowXML_normal_scripts( self ):
-
- from Products.CMFSetup.workflow import TRIGGER_TYPES
-
- WF_ID = 'normal'
- WF_TITLE = 'Normal DCWorkflow'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
-
- configurator = self._makeOne( site ).__of__( site )
-
- ( workflow_id
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- ) = configurator.parseWorkflowXML(
- _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID
- , 'title' : WF_TITLE
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID.replace(' ', '_')
- } )
-
- self.assertEqual( len( scripts ), len( _WF_SCRIPTS ) )
-
- for script in scripts:
-
- script_id = script[ 'script_id' ]
- self.failUnless( script_id in _WF_SCRIPTS )
-
- expected = _WF_SCRIPTS[ script_id ]
-
- self.assertEqual( script[ 'meta_type' ], expected[ 0 ] )
-
- # Body is not kept as part of the workflow XML
-
- if script[ 'meta_type' ] == PythonScript.meta_type:
- self.assertEqual( script[ 'filename' ]
- , expected[ 2 ] % workflow_id )
- else:
- self.assertEqual( script[ 'filename' ], expected[ 2 ] )
-
-
-_WF_PERMISSIONS = \
-( 'Open content for modifications'
-, 'Modify content'
-, 'Query history'
-, 'Restore expired content'
-)
-
-_WF_GROUPS = \
-( 'Content_owners'
-, 'Content_assassins'
-)
-
-_WF_VARIABLES = \
-{ 'when_opened': ( 'Opened when'
- , ''
- , "python:None"
- , True
- , False
- , True
- , ( 'Query history', 'Open content for modifications' )
- , ()
- , ()
- , ""
- )
-, 'when_expired': ( 'Expired when'
- , ''
- , "nothing"
- , True
- , False
- , True
- , ( 'Query history', 'Open content for modifications' )
- , ()
- , ()
- , ""
- )
-, 'killed_by': ( 'Killed by'
- , 'n/a'
- , ""
- , True
- , False
- , True
- , ()
- , ( 'Hangman', 'Sherrif' )
- , ()
- , ""
- )
-}
-
-_WF_STATES = \
-{ 'closed': ( 'Closed'
- , 'Closed for modifications'
- , ( 'open', 'kill', 'expire' )
- , { 'Modify content': () }
- , ()
- , { 'is_opened': False, 'is_closed': True }
- )
-, 'opened': ( 'Opened'
- , 'Open for modifications'
- , ( 'close', 'kill', 'expire' )
- , { 'Modify content': [ 'Owner', 'Manager' ] }
- , [ ( 'Content_owners', ( 'Owner', ) ) ]
- , { 'is_opened': True, 'is_closed': False }
- )
-, 'killed': ( 'Killed'
- , 'Permanently unavailable'
- , ()
- , {}
- , ()
- , {}
- )
-, 'expired': ( 'Expired'
- , 'Expiration date has passed'
- , ( 'open', )
- , { 'Modify content': [ 'Owner', 'Manager' ] }
- , ()
- , { 'is_opened': False, 'is_closed': False }
- )
-}
-
-_WF_TRANSITIONS = \
-{ 'open': ( 'Open'
- , 'Open the object for modifications'
- , 'opened'
- , TRIGGER_USER_ACTION
- , 'before_open'
- , ''
- , 'Open'
- , 'string:${object_url}/open_for_modifications'
- , 'workflow'
- , { 'when_opened' : 'object/ZopeTime' }
- , ( 'Open content for modifications', )
- , ()
- , ()
- , ""
- )
-, 'close': ( 'Close'
- , 'Close the object for modifications'
- , 'closed'
- , TRIGGER_USER_ACTION
- , ''
- , 'after_close'
- , 'Close'
- , 'string:${object_url}/close_for_modifications'
- , 'workflow'
- , {}
- , ()
- , ( 'Owner', 'Manager' )
- , ()
- , ""
- )
-, 'kill': ( 'Kill'
- , 'Make the object permanently unavailable.'
- , 'killed'
- , TRIGGER_USER_ACTION
- , ''
- , 'after_kill'
- , 'Kill'
- , 'string:${object_url}/kill_object'
- , 'workflow'
- , { 'killed_by' : 'string:${user/getId}' }
- , ()
- , ()
- , ( 'Content_assassins', )
- , ""
- )
-, 'expire': ( 'Expire'
- , 'Retire objects whose expiration is past.'
- , 'expired'
- , TRIGGER_AUTOMATIC
- , 'before_expire'
- , ''
- , ''
- , ''
- , ''
- , { 'when_expired' : 'object/ZopeTime' }
- , ()
- , ()
- , ()
- , "python: object.expiration() <= object.ZopeTime()"
- )
-}
-
-_WF_WORKLISTS = \
-{ 'expired_list': ( 'Expired'
- , 'Worklist for expired content'
- , { 'state' : ( 'expired', ) }
- , 'Expired items'
- , 'string:${portal_url}/expired_items'
- , 'workflow'
- , ( 'Restore expired content', )
- , ()
- , ()
- , ""
- )
-, 'alive_list': ( 'Alive'
- , 'Worklist for content not yet expired / killed'
- , { 'state' : ( 'open', 'closed' ) }
- , 'Expired items'
- , 'string:${portal_url}/expired_items'
- , 'workflow'
- , ( 'Restore expired content', )
- , ()
- , ()
- , ""
- )
-}
-
-_BEFORE_OPEN_SCRIPT = """\
-## Script (Python) "before_open"
-##bind container=container
-##bind context=context
-##bind namespace=
-##bind script=script
-##bind subpath=traverse_subpath
-##parameters=
-##title=
-##
-return 'before_open'
-"""
-
-_AFTER_CLOSE_SCRIPT = """\
-## Script (Python) "after_close"
-##bind container=container
-##bind context=context
-##bind namespace=
-##bind script=script
-##bind subpath=traverse_subpath
-##parameters=
-##title=
-##
-return 'after_close'
-"""
-
-_AFTER_KILL_SCRIPT = """\
-## Script (Python) "after_kill"
-##bind container=container
-##bind context=context
-##bind namespace=
-##bind script=script
-##bind subpath=traverse_subpath
-##parameters=
-##title=
-##
-return 'after_kill'
-"""
-
-_WF_SCRIPTS = \
-{ 'before_open': ( PythonScript.meta_type
- , _BEFORE_OPEN_SCRIPT
- , 'workflows/%s/scripts/before_open.py'
- , None
- , None
- )
-, 'after_close': ( PythonScript.meta_type
- , _AFTER_CLOSE_SCRIPT
- , 'workflows/%s/scripts/after_close.py'
- , None
- , None
- )
-, 'after_kill': ( PythonScript.meta_type
- , _AFTER_KILL_SCRIPT
- , 'workflows/%s/scripts/after_kill.py'
- , None
- , None
- )
-, 'before_expire': ( ExternalMethod.meta_type
- , ''
- , ''
- , 'CMFSetup.test_method'
- , 'test'
- )
-}
-
-_EMPTY_TOOL_EXPORT = """\
-<?xml version="1.0"?>
-<workflow-tool>
- <bindings>
- <default>
- </default>
- </bindings>
-</workflow-tool>
-"""
-
-_BINDINGS_TOOL_EXPORT = """\
-<?xml version="1.0"?>
-<workflow-tool>
- <bindings>
- <default>
- <bound-workflow workflow_id="non_dcworkflow_0" />
- <bound-workflow workflow_id="non_dcworkflow_1" />
- </default>
- <type type_id="sometype">
- <bound-workflow workflow_id="non_dcworkflow_2" />
- </type>
- <type type_id="anothertype">
- <bound-workflow workflow_id="non_dcworkflow_3" />
- </type>
- </bindings>
-</workflow-tool>
-"""
-
-_OVERRIDE_TOOL_EXPORT = """\
-<?xml version="1.0"?>
-<workflow-tool>
- <bindings>
- <default>
- <bound-workflow workflow_id="foo" />
- <bound-workflow workflow_id="bar" />
- </default>
- <type type_id="qux">
- <bound-workflow workflow_id="baz" />
- </type>
- </bindings>
-</workflow-tool>
-"""
-
-_NORMAL_TOOL_EXPORT = """\
-<?xml version="1.0"?>
-<workflow-tool>
- <workflow
- workflow_id="non_dcworkflow"
- meta_type="Dummy Workflow"
- />
- <workflow
- workflow_id="dcworkflow"
- filename="workflows/dcworkflow/definition.xml"
- meta_type="Workflow"
- />
- <bindings>
- <default>
- </default>
- </bindings>
-</workflow-tool>
-"""
-
-_NORMAL_TOOL_EXPORT_WITH_FILENAME = """\
-<?xml version="1.0"?>
-<workflow-tool>
- <workflow
- workflow_id="non_dcworkflow"
- meta_type="Dummy Workflow"
- />
- <workflow
- workflow_id="dcworkflow"
- filename="workflows/%s/definition.xml"
- meta_type="Workflow"
- />
- <bindings>
- <default>
- </default>
- </bindings>
-</workflow-tool>
-"""
-
-_FILENAME_TOOL_EXPORT = """\
-<?xml version="1.0"?>
-<workflow-tool>
- <workflow
- workflow_id="name with spaces"
- filename="workflows/name_with_spaces/definition.xml"
- meta_type="Workflow"
- />
- <bindings>
- <default>
- </default>
- </bindings>
-</workflow-tool>
-"""
-
-_EMPTY_WORKFLOW_EXPORT = """\
-<?xml version="1.0"?>
-<dc-workflow
- workflow_id="%s"
- title="%s"
- state_variable="state"
- initial_state="%s">
-</dc-workflow>
-"""
-
-# Make sure old exports are still imported well. Changes:
-# - scripts are now in in a 'scripts' subdirectory
-_OLD_WORKFLOW_EXPORT = """\
-<?xml version="1.0"?>
-<dc-workflow
- workflow_id="%(workflow_id)s"
- title="%(title)s"
- state_variable="state"
- initial_state="%(initial_state)s">
- <permission>Open content for modifications</permission>
- <permission>Modify content</permission>
- <permission>Query history</permission>
- <permission>Restore expired content</permission>
- <state
- state_id="closed"
- title="Closed">
- <description>Closed for modifications</description>
- <exit-transition
- transition_id="open"/>
- <exit-transition
- transition_id="kill"/>
- <exit-transition
- transition_id="expire"/>
- <permission-map
- acquired="False"
- name="Modify content">
- </permission-map>
- <assignment
- name="is_closed"
- type="bool">True</assignment>
- <assignment
- name="is_opened"
- type="bool">False</assignment>
- </state>
- <state
- state_id="expired"
- title="Expired">
- <description>Expiration date has passed</description>
- <exit-transition
- transition_id="open"/>
- <permission-map
- acquired="True"
- name="Modify content">
- <permission-role>Owner</permission-role>
- <permission-role>Manager</permission-role>
- </permission-map>
- <assignment
- name="is_closed"
- type="bool">False</assignment>
- <assignment
- name="is_opened"
- type="bool">False</assignment>
- </state>
- <state
- state_id="killed"
- title="Killed">
- <description>Permanently unavailable</description>
- </state>
- <state
- state_id="opened"
- title="Opened">
- <description>Open for modifications</description>
- <exit-transition
- transition_id="close"/>
- <exit-transition
- transition_id="kill"/>
- <exit-transition
- transition_id="expire"/>
- <permission-map
- acquired="True"
- name="Modify content">
- <permission-role>Owner</permission-role>
- <permission-role>Manager</permission-role>
- </permission-map>
- <group-map name="Content_owners">
- <group-role>Owner</group-role>
- </group-map>
- <assignment
- name="is_closed"
- type="bool">False</assignment>
- <assignment
- name="is_opened"
- type="bool">True</assignment>
- </state>
- <transition
- transition_id="close"
- title="Close"
- trigger="USER"
- new_state="closed"
- before_script=""
- after_script="after_close">
- <description>Close the object for modifications</description>
- <action
- category="workflow"
- url="string:${object_url}/close_for_modifications">Close</action>
- <guard>
- <guard-role>Owner</guard-role>
- <guard-role>Manager</guard-role>
- </guard>
- </transition>
- <transition
- transition_id="expire"
- title="Expire"
- trigger="AUTOMATIC"
- new_state="expired"
- before_script="before_expire"
- after_script="">
- <description>Retire objects whose expiration is past.</description>
- <guard>
- <guard-expression>python: object.expiration() <= object.ZopeTime()</guard-expression>
- </guard>
- <assignment
- name="when_expired">object/ZopeTime</assignment>
- </transition>
- <transition
- transition_id="kill"
- title="Kill"
- trigger="USER"
- new_state="killed"
- before_script=""
- after_script="after_kill">
- <description>Make the object permanently unavailable.</description>
- <action
- category="workflow"
- url="string:${object_url}/kill_object">Kill</action>
- <guard>
- <guard-group>Content_assassins</guard-group>
- </guard>
- <assignment
- name="killed_by">string:${user/getId}</assignment>
- </transition>
- <transition
- transition_id="open"
- title="Open"
- trigger="USER"
- new_state="opened"
- before_script="before_open"
- after_script="">
- <description>Open the object for modifications</description>
- <action
- category="workflow"
- url="string:${object_url}/open_for_modifications">Open</action>
- <guard>
- <guard-permission>Open content for modifications</guard-permission>
- </guard>
- <assignment
- name="when_opened">object/ZopeTime</assignment>
- </transition>
- <worklist
- worklist_id="alive_list"
- title="Alive">
- <description>Worklist for content not yet expired / killed</description>
- <action
- category="workflow"
- url="string:${portal_url}/expired_items">Expired items</action>
- <guard>
- <guard-permission>Restore expired content</guard-permission>
- </guard>
- <match name="state" values="open; closed"/>
- </worklist>
- <worklist
- worklist_id="expired_list"
- title="Expired">
- <description>Worklist for expired content</description>
- <action
- category="workflow"
- url="string:${portal_url}/expired_items">Expired items</action>
- <guard>
- <guard-permission>Restore expired content</guard-permission>
- </guard>
- <match name="state" values="expired"/>
- </worklist>
- <variable
- variable_id="killed_by"
- for_catalog="True"
- for_status="False"
- update_always="True">
- <description>Killed by</description>
- <default>
- <value type="string">n/a</value>
- </default>
- <guard>
- <guard-role>Hangman</guard-role>
- <guard-role>Sherrif</guard-role>
- </guard>
- </variable>
- <variable
- variable_id="when_expired"
- for_catalog="True"
- for_status="False"
- update_always="True">
- <description>Expired when</description>
- <default>
- <expression>nothing</expression>
- </default>
- <guard>
- <guard-permission>Query history</guard-permission>
- <guard-permission>Open content for modifications</guard-permission>
- </guard>
- </variable>
- <variable
- variable_id="when_opened"
- for_catalog="True"
- for_status="False"
- update_always="True">
- <description>Opened when</description>
- <default>
- <expression>python:None</expression>
- </default>
- <guard>
- <guard-permission>Query history</guard-permission>
- <guard-permission>Open content for modifications</guard-permission>
- </guard>
- </variable>
- <script
- script_id="after_close"
- type="Script (Python)"
- filename="workflows/%(workflow_filename)s/after_close.py"
- module=""
- function=""
- />
- <script
- script_id="after_kill"
- type="Script (Python)"
- filename="workflows/%(workflow_filename)s/after_kill.py"
- module=""
- function=""
- />
- <script
- script_id="before_expire"
- type="External Method"
- filename=""
- module="CMFSetup.test_method"
- function="test"
- />
- <script
- script_id="before_open"
- type="Script (Python)"
- filename="workflows/%(workflow_filename)s/before_open.py"
- module=""
- function=""
- />
-</dc-workflow>
-"""
-
-_NORMAL_WORKFLOW_EXPORT = """\
-<?xml version="1.0"?>
-<dc-workflow
- workflow_id="%(workflow_id)s"
- title="%(title)s"
- state_variable="state"
- initial_state="%(initial_state)s">
- <permission>Open content for modifications</permission>
- <permission>Modify content</permission>
- <permission>Query history</permission>
- <permission>Restore expired content</permission>
- <state
- state_id="closed"
- title="Closed">
- <description>Closed for modifications</description>
- <exit-transition
- transition_id="open"/>
- <exit-transition
- transition_id="kill"/>
- <exit-transition
- transition_id="expire"/>
- <permission-map
- acquired="False"
- name="Modify content">
- </permission-map>
- <assignment
- name="is_closed"
- type="bool">True</assignment>
- <assignment
- name="is_opened"
- type="bool">False</assignment>
- </state>
- <state
- state_id="expired"
- title="Expired">
- <description>Expiration date has passed</description>
- <exit-transition
- transition_id="open"/>
- <permission-map
- acquired="True"
- name="Modify content">
- <permission-role>Owner</permission-role>
- <permission-role>Manager</permission-role>
- </permission-map>
- <assignment
- name="is_closed"
- type="bool">False</assignment>
- <assignment
- name="is_opened"
- type="bool">False</assignment>
- </state>
- <state
- state_id="killed"
- title="Killed">
- <description>Permanently unavailable</description>
- </state>
- <state
- state_id="opened"
- title="Opened">
- <description>Open for modifications</description>
- <exit-transition
- transition_id="close"/>
- <exit-transition
- transition_id="kill"/>
- <exit-transition
- transition_id="expire"/>
- <permission-map
- acquired="True"
- name="Modify content">
- <permission-role>Owner</permission-role>
- <permission-role>Manager</permission-role>
- </permission-map>
- <group-map name="Content_owners">
- <group-role>Owner</group-role>
- </group-map>
- <assignment
- name="is_closed"
- type="bool">False</assignment>
- <assignment
- name="is_opened"
- type="bool">True</assignment>
- </state>
- <transition
- transition_id="close"
- title="Close"
- trigger="USER"
- new_state="closed"
- before_script=""
- after_script="after_close">
- <description>Close the object for modifications</description>
- <action
- category="workflow"
- url="string:${object_url}/close_for_modifications">Close</action>
- <guard>
- <guard-role>Owner</guard-role>
- <guard-role>Manager</guard-role>
- </guard>
- </transition>
- <transition
- transition_id="expire"
- title="Expire"
- trigger="AUTOMATIC"
- new_state="expired"
- before_script="before_expire"
- after_script="">
- <description>Retire objects whose expiration is past.</description>
- <guard>
- <guard-expression>python: object.expiration() <= object.ZopeTime()</guard-expression>
- </guard>
- <assignment
- name="when_expired">object/ZopeTime</assignment>
- </transition>
- <transition
- transition_id="kill"
- title="Kill"
- trigger="USER"
- new_state="killed"
- before_script=""
- after_script="after_kill">
- <description>Make the object permanently unavailable.</description>
- <action
- category="workflow"
- url="string:${object_url}/kill_object">Kill</action>
- <guard>
- <guard-group>Content_assassins</guard-group>
- </guard>
- <assignment
- name="killed_by">string:${user/getId}</assignment>
- </transition>
- <transition
- transition_id="open"
- title="Open"
- trigger="USER"
- new_state="opened"
- before_script="before_open"
- after_script="">
- <description>Open the object for modifications</description>
- <action
- category="workflow"
- url="string:${object_url}/open_for_modifications">Open</action>
- <guard>
- <guard-permission>Open content for modifications</guard-permission>
- </guard>
- <assignment
- name="when_opened">object/ZopeTime</assignment>
- </transition>
- <worklist
- worklist_id="alive_list"
- title="Alive">
- <description>Worklist for content not yet expired / killed</description>
- <action
- category="workflow"
- url="string:${portal_url}/expired_items">Expired items</action>
- <guard>
- <guard-permission>Restore expired content</guard-permission>
- </guard>
- <match name="state" values="open; closed"/>
- </worklist>
- <worklist
- worklist_id="expired_list"
- title="Expired">
- <description>Worklist for expired content</description>
- <action
- category="workflow"
- url="string:${portal_url}/expired_items">Expired items</action>
- <guard>
- <guard-permission>Restore expired content</guard-permission>
- </guard>
- <match name="state" values="expired"/>
- </worklist>
- <variable
- variable_id="killed_by"
- for_catalog="True"
- for_status="False"
- update_always="True">
- <description>Killed by</description>
- <default>
- <value type="string">n/a</value>
- </default>
- <guard>
- <guard-role>Hangman</guard-role>
- <guard-role>Sherrif</guard-role>
- </guard>
- </variable>
- <variable
- variable_id="when_expired"
- for_catalog="True"
- for_status="False"
- update_always="True">
- <description>Expired when</description>
- <default>
- <expression>nothing</expression>
- </default>
- <guard>
- <guard-permission>Query history</guard-permission>
- <guard-permission>Open content for modifications</guard-permission>
- </guard>
- </variable>
- <variable
- variable_id="when_opened"
- for_catalog="True"
- for_status="False"
- update_always="True">
- <description>Opened when</description>
- <default>
- <expression>python:None</expression>
- </default>
- <guard>
- <guard-permission>Query history</guard-permission>
- <guard-permission>Open content for modifications</guard-permission>
- </guard>
- </variable>
- <script
- script_id="after_close"
- type="Script (Python)"
- filename="workflows/%(workflow_filename)s/scripts/after_close.py"
- module=""
- function=""
- />
- <script
- script_id="after_kill"
- type="Script (Python)"
- filename="workflows/%(workflow_filename)s/scripts/after_kill.py"
- module=""
- function=""
- />
- <script
- script_id="before_expire"
- type="External Method"
- filename=""
- module="CMFSetup.test_method"
- function="test"
- />
- <script
- script_id="before_open"
- type="Script (Python)"
- filename="workflows/%(workflow_filename)s/scripts/before_open.py"
- module=""
- function=""
- />
-</dc-workflow>
-"""
-
-class Test_exportWorkflow( _WorkflowSetup
- , _GuardChecker
- ):
-
- def test_empty( self ):
-
- site = self._initSite()
- context = DummyExportContext( site )
-
- from Products.CMFSetup.workflow import exportWorkflowTool
- exportWorkflowTool( context )
-
- self.assertEqual( len( context._wrote ), 1 )
- filename, text, content_type = context._wrote[ 0 ]
- self.assertEqual( filename, 'workflows.xml' )
- self._compareDOM( text, _EMPTY_TOOL_EXPORT )
- self.assertEqual( content_type, 'text/xml' )
-
- def test_normal( self ):
-
- WF_ID_NON = 'non_dcworkflow'
- WF_TITLE_NON = 'Non-DCWorkflow'
- WF_ID_DC = 'dcworkflow'
- WF_TITLE_DC = 'DCWorkflow'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
-
- wf_tool = site.portal_workflow
- nondcworkflow = DummyWorkflow( WF_TITLE_NON )
- nondcworkflow.title = WF_TITLE_NON
- wf_tool._setObject( WF_ID_NON, nondcworkflow )
-
- dcworkflow = self._initDCWorkflow( WF_ID_DC )
- dcworkflow.title = WF_TITLE_DC
- dcworkflow.initial_state = WF_INITIAL_STATE
- dcworkflow.permissions = _WF_PERMISSIONS
- self._initVariables( dcworkflow )
- self._initStates( dcworkflow )
- self._initTransitions( dcworkflow )
- self._initWorklists( dcworkflow )
- self._initScripts( dcworkflow )
-
- context = DummyExportContext( site )
-
- from Products.CMFSetup.workflow import exportWorkflowTool
- exportWorkflowTool( context )
-
- # workflows list, wf defintion and 3 scripts
- self.assertEqual( len( context._wrote ), 5 )
-
- filename, text, content_type = context._wrote[ 0 ]
- self.assertEqual( filename, 'workflows.xml' )
- self._compareDOM( text, _NORMAL_TOOL_EXPORT )
- self.assertEqual( content_type, 'text/xml' )
-
- filename, text, content_type = context._wrote[ 1 ]
- self.assertEqual( filename, 'workflows/%s/definition.xml' % WF_ID_DC )
- self._compareDOM( text
- , _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID_DC
- , 'title' : WF_TITLE_DC
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID_DC.replace(' ', '_')
- } )
- self.assertEqual( content_type, 'text/xml' )
-
- # just testing first script
- filename, text, content_type = context._wrote[ 2 ]
- self.assertEqual( filename, 'workflows/%s/scripts/after_close.py' % WF_ID_DC )
- self.assertEqual( text, _AFTER_CLOSE_SCRIPT)
- self.assertEqual( content_type, 'text/plain' )
-
- def test_with_filenames( self ):
-
- WF_ID_DC = 'name with spaces'
- WF_TITLE_DC = 'DCWorkflow with spaces'
- WF_INITIAL_STATE = 'closed'
-
- site = self._initSite()
-
- dcworkflow = self._initDCWorkflow( WF_ID_DC )
- dcworkflow.title = WF_TITLE_DC
- dcworkflow.initial_state = WF_INITIAL_STATE
- dcworkflow.permissions = _WF_PERMISSIONS
- self._initVariables( dcworkflow )
- self._initStates( dcworkflow )
- self._initTransitions( dcworkflow )
- self._initWorklists( dcworkflow )
- self._initScripts( dcworkflow )
-
- context = DummyExportContext( site )
-
- from Products.CMFSetup.workflow import exportWorkflowTool
- exportWorkflowTool( context )
-
- # workflows list, wf defintion and 3 scripts
- self.assertEqual( len( context._wrote ), 5 )
-
- filename, text, content_type = context._wrote[ 0 ]
- self.assertEqual( filename, 'workflows.xml' )
- self._compareDOM( text, _FILENAME_TOOL_EXPORT )
- self.assertEqual( content_type, 'text/xml' )
-
- filename, text, content_type = context._wrote[ 1 ]
- self.assertEqual( filename
- , 'workflows/name_with_spaces/definition.xml' )
- self._compareDOM( text
- , _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : WF_ID_DC
- , 'title' : WF_TITLE_DC
- , 'initial_state' : WF_INITIAL_STATE
- , 'workflow_filename' : WF_ID_DC.replace(' ', '_')
- } )
- self.assertEqual( content_type, 'text/xml' )
-
- # just testing first script
- filename, text, content_type = context._wrote[ 2 ]
- self.assertEqual( filename, 'workflows/%s/scripts/after_close.py' %
- WF_ID_DC.replace(' ', '_'))
- self.assertEqual( text, _AFTER_CLOSE_SCRIPT)
- self.assertEqual( content_type, 'text/plain' )
-
-class Test_importWorkflow( _WorkflowSetup
- , _GuardChecker
- ):
-
- def _importNormalWorkflow( self, wf_id, wf_title, wf_initial_state ):
-
- site = self._initSite()
- wf_tool = site.portal_workflow
- workflow_filename = wf_id.replace(' ', '_')
-
- context = DummyImportContext( site )
- context._files[ 'workflows.xml'
- ] = _NORMAL_TOOL_EXPORT_WITH_FILENAME % workflow_filename
-
- context._files[ 'workflows/%s/definition.xml' % wf_id
- ] = ( _NORMAL_WORKFLOW_EXPORT
- % { 'workflow_id' : wf_id
- , 'title' : wf_title
- , 'initial_state' : wf_initial_state
- , 'workflow_filename' : workflow_filename
- }
- )
-
- context._files[ 'workflows/%s/scripts/after_close.py' % workflow_filename
- ] = _AFTER_CLOSE_SCRIPT
-
- context._files[ 'workflows/%s/scripts/after_kill.py' % workflow_filename
- ] = _AFTER_KILL_SCRIPT
-
- context._files[ 'workflows/%s/scripts/before_open.py' % workflow_filename
- ] = _BEFORE_OPEN_SCRIPT
-
- from Products.CMFSetup.workflow import importWorkflowTool
- importWorkflowTool( context )
-
- return wf_tool
-
- def _importOldWorkflow( self, wf_id, wf_title, wf_initial_state ):
-
- site = self._initSite()
- wf_tool = site.portal_workflow
- workflow_filename = wf_id.replace(' ', '_')
-
- context = DummyImportContext( site )
- context._files[ 'workflows.xml'
- ] = _NORMAL_TOOL_EXPORT_WITH_FILENAME % workflow_filename
-
- context._files[ 'workflows/%s/definition.xml' % wf_id
- ] = ( _OLD_WORKFLOW_EXPORT
- % { 'workflow_id' : wf_id
- , 'title' : wf_title
- , 'initial_state' : wf_initial_state
- , 'workflow_filename' : workflow_filename
- }
- )
-
- context._files[ 'workflows/%s/after_close.py' % workflow_filename
- ] = _AFTER_CLOSE_SCRIPT
-
- context._files[ 'workflows/%s/after_kill.py' % workflow_filename
- ] = _AFTER_KILL_SCRIPT
-
- context._files[ 'workflows/%s/before_open.py' % workflow_filename
- ] = _BEFORE_OPEN_SCRIPT
-
- from Products.CMFSetup.workflow import importWorkflowTool
- importWorkflowTool( context )
-
- return wf_tool
-
- def test_empty_default_purge( self ):
-
- WF_ID_NON = 'non_dcworkflow_%s'
- WF_TITLE_NON = 'Non-DCWorkflow #%s'
-
- site = self._initSite()
- wf_tool = site.portal_workflow
-
- for i in range( 4 ):
- nondcworkflow = DummyWorkflow( WF_TITLE_NON % i )
- nondcworkflow.title = WF_TITLE_NON % i
- wf_tool._setObject( WF_ID_NON % i, nondcworkflow )
-
- wf_tool._default_chain = ( WF_ID_NON % 1, )
- wf_tool._chains_by_type[ 'sometype' ] = ( WF_ID_NON % 2, )
- self.assertEqual( len( wf_tool.objectIds() ), 4 )
-
- context = DummyImportContext( site )
- context._files[ 'workflows.xml' ] = _EMPTY_TOOL_EXPORT
-
- from Products.CMFSetup.workflow import importWorkflowTool
- importWorkflowTool( context )
-
- self.assertEqual( len( wf_tool.objectIds() ), 0 )
- self.assertEqual( len( wf_tool._default_chain ), 0 )
- self.assertEqual( len( wf_tool._chains_by_type ), 0 )
-
- def test_empty_explicit_purge( self ):
-
- WF_ID_NON = 'non_dcworkflow_%s'
- WF_TITLE_NON = 'Non-DCWorkflow #%s'
-
- site = self._initSite()
- wf_tool = site.portal_workflow
-
- for i in range( 4 ):
- nondcworkflow = DummyWorkflow( WF_TITLE_NON % i )
- nondcworkflow.title = WF_TITLE_NON % i
- wf_tool._setObject( WF_ID_NON % i, nondcworkflow )
-
- wf_tool._default_chain = ( WF_ID_NON % 1, )
- wf_tool._chains_by_type[ 'sometype' ] = ( WF_ID_NON % 2, )
- self.assertEqual( len( wf_tool.objectIds() ), 4 )
-
- context = DummyImportContext( site, True )
- context._files[ 'workflows.xml' ] = _EMPTY_TOOL_EXPORT
-
- from Products.CMFSetup.workflow import importWorkflowTool
- importWorkflowTool( context )
-
- self.assertEqual( len( wf_tool.objectIds() ), 0 )
- self.assertEqual( len( wf_tool._default_chain ), 0 )
- self.assertEqual( len( wf_tool._chains_by_type ), 0 )
-
- def test_empty_skip_purge( self ):
-
- WF_ID_NON = 'non_dcworkflow_%s'
- WF_TITLE_NON = 'Non-DCWorkflow #%s'
-
- site = self._initSite()
- wf_tool = site.portal_workflow
-
- for i in range( 4 ):
- nondcworkflow = DummyWorkflow( WF_TITLE_NON % i )
- nondcworkflow.title = WF_TITLE_NON % i
- wf_tool._setObject( WF_ID_NON % i, nondcworkflow )
-
- wf_tool._default_chain = ( WF_ID_NON % 1, )
- wf_tool._chains_by_type[ 'sometype' ] = ( WF_ID_NON % 2, )
- self.assertEqual( len( wf_tool.objectIds() ), 4 )
-
- context = DummyImportContext( site, False )
- context._files[ 'typestool.xml' ] = _EMPTY_TOOL_EXPORT
-
- from Products.CMFSetup.workflow import importWorkflowTool
- importWorkflowTool( context )
-
- self.assertEqual( len( wf_tool.objectIds() ), 4 )
- self.assertEqual( len( wf_tool._default_chain ), 1 )
- self.assertEqual( wf_tool._default_chain[ 0 ], WF_ID_NON % 1 )
- self.assertEqual( len( wf_tool._chains_by_type ), 1 )
- self.assertEqual( wf_tool._chains_by_type[ 'sometype' ]
- , ( WF_ID_NON % 2, )
- )
-
- def test_bindings_skip_purge( self ):
-
- WF_ID_NON = 'non_dcworkflow_%s'
- WF_TITLE_NON = 'Non-DCWorkflow #%s'
-
- site = self._initSite()
- wf_tool = site.portal_workflow
-
- for i in range( 4 ):
- nondcworkflow = DummyWorkflow( WF_TITLE_NON % i )
- nondcworkflow.title = WF_TITLE_NON % i
- wf_tool._setObject( WF_ID_NON % i, nondcworkflow )
-
- wf_tool._default_chain = ( WF_ID_NON % 1, )
- wf_tool._chains_by_type[ 'sometype' ] = ( WF_ID_NON % 2, )
- self.assertEqual( len( wf_tool.objectIds() ), 4 )
-
- context = DummyImportContext( site, False )
- context._files[ 'workflows.xml' ] = _BINDINGS_TOOL_EXPORT
-
- from Products.CMFSetup.workflow import importWorkflowTool
- importWorkflowTool( context )
-
- self.assertEqual( len( wf_tool.objectIds() ), 4 )
- self.assertEqual( len( wf_tool._default_chain ), 2 )
- self.assertEqual( wf_tool._default_chain[ 0 ], WF_ID_NON % 0 )
- self.assertEqual( wf_tool._default_chain[ 1 ], WF_ID_NON % 1 )
- self.assertEqual( len( wf_tool._chains_by_type ), 2 )
- self.assertEqual( wf_tool._chains_by_type[ 'sometype' ]
- , ( WF_ID_NON % 2, )
- )
- self.assertEqual( wf_tool._chains_by_type[ 'anothertype' ]
- , ( WF_ID_NON % 3, )
- )
-
- def test_from_empty_dcworkflow_top_level( self ):
-
- WF_ID = 'dcworkflow_tool'
- WF_TITLE = 'DC Workflow testing tool'
- WF_INITIAL_STATE = 'closed'
-
- tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
-
- self.assertEqual( len( tool.objectIds() ), 1 )
- self.assertEqual( tool.objectIds()[ 0 ], WF_ID )
-
- def test_from_empty_dcworkflow_workflow_attrs( self ):
-
- WF_ID = 'dcworkflow_attrs'
- WF_TITLE = 'DC Workflow testing attrs'
- WF_INITIAL_STATE = 'closed'
-
- tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
-
- workflow = tool.objectValues()[ 0 ]
- self.assertEqual( workflow.meta_type, DCWorkflowDefinition.meta_type )
- self.assertEqual( workflow.title, WF_TITLE )
- self.assertEqual( workflow.state_var, 'state' )
- self.assertEqual( workflow.initial_state, WF_INITIAL_STATE )
-
- def test_from_empty_dcworkflow_workflow_permissions( self ):
-
- WF_ID = 'dcworkflow_permissions'
- WF_TITLE = 'DC Workflow testing permissions'
- WF_INITIAL_STATE = 'closed'
-
- tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
-
- workflow = tool.objectValues()[ 0 ]
-
- permissions = workflow.permissions
- self.assertEqual( len( permissions ), len( _WF_PERMISSIONS ) )
-
- for permission in permissions:
- self.failUnless( permission in _WF_PERMISSIONS )
-
- def test_from_empty_dcworkflow_workflow_variables( self ):
-
- WF_ID = 'dcworkflow_variables'
- WF_TITLE = 'DC Workflow testing variables'
- WF_INITIAL_STATE = 'closed'
-
- tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
-
- workflow = tool.objectValues()[ 0 ]
-
- variables = workflow.variables
-
- self.assertEqual( len( variables.objectItems() )
- , len( _WF_VARIABLES ) )
-
- for id, variable in variables.objectItems():
-
- expected = _WF_VARIABLES[ variable.getId() ]
- self.failUnless( expected[ 0 ] in variable.description )
- self.assertEqual( variable.default_value, expected[ 1 ] )
- self.assertEqual( variable.getDefaultExprText(), expected[ 2 ] )
- self.assertEqual( variable.for_catalog, expected[ 3 ] )
- self.assertEqual( variable.for_status, expected[ 4 ] )
- self.assertEqual( variable.update_always, expected[ 5 ] )
-
- guard = variable.getInfoGuard()
-
- self.assertEqual( guard.permissions, expected[ 6 ] )
- self.assertEqual( guard.roles, expected[ 7 ] )
- self.assertEqual( guard.groups, expected[ 8 ] )
- self.assertEqual( guard.getExprText(), expected[ 9 ] )
-
- def test_from_empty_dcworkflow_workflow_states( self ):
-
- WF_ID = 'dcworkflow_states'
- WF_TITLE = 'DC Workflow testing states'
- WF_INITIAL_STATE = 'closed'
-
- tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
-
- workflow = tool.objectValues()[ 0 ]
-
- states = workflow.states
-
- self.assertEqual( len( states.objectItems() )
- , len( _WF_STATES ) )
-
- for id, state in states.objectItems():
-
- expected = _WF_STATES[ state.getId() ]
- self.assertEqual( state.title, expected[ 0 ] )
- self.failUnless( expected[ 1 ] in state.description )
-
- self.assertEqual( len( state.transitions ), len( expected[ 2 ] ) )
-
- for transition_id in state.transitions:
- self.failUnless( transition_id in expected[ 2 ] )
-
- for permission in state.getManagedPermissions():
-
- p_info = state.getPermissionInfo( permission )
- p_expected = expected[ 3 ].get( permission, [] )
-
- self.assertEqual( bool( p_info[ 'acquired' ] )
- , isinstance(p_expected, list) )
-
- self.assertEqual( len( p_info[ 'roles' ] ), len( p_expected ) )
-
- for role in p_info[ 'roles' ]:
- self.failIf( role not in p_expected )
-
- group_roles = state.group_roles or {}
- self.assertEqual( len( group_roles ), len( expected[ 4 ] ) )
-
- for group_id, exp_roles in expected[ 4 ]:
-
- self.assertEqual( len( state.getGroupInfo( group_id ) )
- , len( exp_roles ) )
-
- for role in state.getGroupInfo( group_id ):
- self.failUnless( role in exp_roles )
-
- self.assertEqual( len( state.getVariableValues() )
- , len( expected[ 5 ] ) )
-
- for var_id, value in state.getVariableValues():
-
- self.assertEqual( value, expected[ 5 ][ var_id ] )
-
- def test_from_empty_dcworkflow_workflow_transitions( self ):
-
- WF_ID = 'dcworkflow_transitions'
- WF_TITLE = 'DC Workflow testing transitions'
- WF_INITIAL_STATE = 'closed'
-
- tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
-
- workflow = tool.objectValues()[ 0 ]
-
- transitions = workflow.transitions
-
- self.assertEqual( len( transitions.objectItems() )
- , len( _WF_TRANSITIONS ) )
-
- for id, transition in transitions.objectItems():
-
- expected = _WF_TRANSITIONS[ transition.getId() ]
- self.assertEqual( transition.title, expected[ 0 ] )
- self.failUnless( expected[ 1 ] in transition.description )
- self.assertEqual( transition.new_state_id, expected[ 2 ] )
- self.assertEqual( transition.trigger_type, expected[ 3 ] )
- self.assertEqual( transition.script_name, expected[ 4 ] )
- self.assertEqual( transition.after_script_name, expected[ 5 ] )
- self.assertEqual( transition.actbox_name, expected[ 6 ] )
- self.assertEqual( transition.actbox_url, expected[ 7 ] )
- self.assertEqual( transition.actbox_category, expected[ 8 ] )
-
- var_exprs = transition.var_exprs
-
- self.assertEqual( len( var_exprs ), len( expected[ 9 ] ) )
-
- for var_id, expr in var_exprs.items():
- self.assertEqual( expr, expected[ 9 ][ var_id ] )
-
- guard = transition.getGuard()
-
- self.assertEqual( guard.permissions, expected[ 10 ] )
- self.assertEqual( guard.roles, expected[ 11 ] )
- self.assertEqual( guard.groups, expected[ 12 ] )
- self.assertEqual( guard.getExprText(), expected[ 13 ] )
-
- def test_from_empty_dcworkflow_workflow_worklists( self ):
-
- WF_ID = 'dcworkflow_worklists'
- WF_TITLE = 'DC Workflow testing worklists'
- WF_INITIAL_STATE = 'closed'
-
- tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
-
- workflow = tool.objectValues()[ 0 ]
-
- worklists = workflow.worklists
-
- self.assertEqual( len( worklists.objectItems() )
- , len( _WF_WORKLISTS ) )
-
- for id, worklist in worklists.objectItems():
-
- expected = _WF_WORKLISTS[ worklist.getId() ]
- self.failUnless( expected[ 1 ] in worklist.description )
-
- var_matches = worklist.var_matches
-
- self.assertEqual( len( var_matches ), len( expected[ 2 ] ) )
-
- for var_id, values in var_matches.items():
- exp_values = expected[ 2 ][ var_id ]
- self.assertEqual( len( values ), len( exp_values ) )
-
- for value in values:
- self.failUnless( value in exp_values, values )
-
- self.assertEqual( worklist.actbox_name, expected[ 3 ] )
- self.assertEqual( worklist.actbox_url, expected[ 4 ] )
- self.assertEqual( worklist.actbox_category, expected[ 5 ] )
-
- guard = worklist.getGuard()
-
- self.assertEqual( guard.permissions, expected[ 6 ] )
- self.assertEqual( guard.roles, expected[ 7 ] )
- self.assertEqual( guard.groups, expected[ 8 ] )
- self.assertEqual( guard.getExprText(), expected[ 9 ] )
-
- def test_from_old_dcworkflow_workflow_scripts( self ):
-
- WF_ID = 'old_dcworkflow_scripts'
- WF_TITLE = 'Old DC Workflow testing scripts'
- WF_INITIAL_STATE = 'closed'
-
- tool = self._importOldWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
-
- workflow = tool.objectValues()[ 0 ]
-
- scripts = workflow.scripts
-
- self.assertEqual( len( scripts.objectItems() )
- , len( _WF_SCRIPTS ) )
-
- for script_id, script in scripts.objectItems():
-
- expected = _WF_SCRIPTS[ script_id ]
-
- self.assertEqual( script.meta_type, expected[ 0 ] )
-
- if script.meta_type == PythonScript.meta_type:
- self.assertEqual( script.manage_FTPget(), expected[ 1 ] )
-
- def test_from_empty_dcworkflow_workflow_scripts( self ):
-
- WF_ID = 'dcworkflow_scripts'
- WF_TITLE = 'DC Workflow testing scripts'
- WF_INITIAL_STATE = 'closed'
-
- tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
-
- workflow = tool.objectValues()[ 0 ]
-
- scripts = workflow.scripts
-
- self.assertEqual( len( scripts.objectItems() )
- , len( _WF_SCRIPTS ) )
-
- for script_id, script in scripts.objectItems():
-
- expected = _WF_SCRIPTS[ script_id ]
-
- self.assertEqual( script.meta_type, expected[ 0 ] )
-
- if script.meta_type == PythonScript.meta_type:
- self.assertEqual( script.manage_FTPget(), expected[ 1 ] )
-
-def test_suite():
- return unittest.TestSuite((
- unittest.makeSuite( WorkflowToolConfiguratorTests ),
- unittest.makeSuite( WorkflowDefinitionConfiguratorTests ),
- unittest.makeSuite( Test_exportWorkflow ),
- unittest.makeSuite( Test_importWorkflow ),
- ))
-
-if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')
Modified: CMF/branches/yuppie-workflow_setup/CMFSetup/utils.py
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFSetup/utils.py 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/CMFSetup/utils.py 2005-11-22 16:12:02 UTC (rev 40321)
@@ -352,92 +352,3 @@
ExportConfiguratorBase.__init__(self, site, encoding)
InitializeClass(ConfiguratorBase)
-
-
-#
-# deprecated DOM parsing utilities
-#
-_marker = object()
-
-def _queryNodeAttribute( node, attr_name, default, encoding=None ):
-
- """ Extract a string-valued attribute from node.
-
- o Return 'default' if the attribute is not present.
- """
- attr_node = node.attributes.get( attr_name, _marker )
-
- if attr_node is _marker:
- return default
-
- value = attr_node.nodeValue
-
- if encoding is not None:
- value = value.encode( encoding )
-
- return value
-
-def _getNodeAttribute( node, attr_name, encoding=None ):
-
- """ Extract a string-valued attribute from node.
- """
- value = _queryNodeAttribute( node, attr_name, _marker, encoding )
-
- if value is _marker:
- raise ValueError, 'Invalid attribute: %s' % attr_name
-
- return value
-
-def _queryNodeAttributeBoolean( node, attr_name, default ):
-
- """ Extract a string-valued attribute from node.
-
- o Return 'default' if the attribute is not present.
- """
- attr_node = node.attributes.get( attr_name, _marker )
-
- if attr_node is _marker:
- return default
-
- value = node.attributes[ attr_name ].nodeValue.lower()
-
- return value in ( 'true', 'yes', '1' )
-
-def _getNodeAttributeBoolean( node, attr_name ):
-
- """ Extract a string-valued attribute from node.
- """
- value = node.attributes[ attr_name ].nodeValue.lower()
-
- return value in ( 'true', 'yes', '1' )
-
-def _coalesceTextNodeChildren( node, encoding=None ):
-
- """ Concatenate all childe text nodes into a single string.
- """
- from xml.dom import Node
- fragments = []
- node.normalize()
- child = node.firstChild
-
- while child is not None:
-
- if child.nodeType == Node.TEXT_NODE:
- fragments.append( child.nodeValue )
-
- child = child.nextSibling
-
- joined = ''.join( fragments )
-
- if encoding is not None:
- joined = joined.encode( encoding )
-
- return ''.join( [ line.lstrip() for line in joined.splitlines(True) ] )
-
-def _extractDescriptionNode(parent, encoding=None):
-
- d_nodes = parent.getElementsByTagName('description')
- if d_nodes:
- return _coalesceTextNodeChildren(d_nodes[0], encoding)
- else:
- return ''
Modified: CMF/branches/yuppie-workflow_setup/CMFSetup/workflow.py
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFSetup/workflow.py 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/CMFSetup/workflow.py 2005-11-22 16:12:02 UTC (rev 40321)
@@ -10,1386 +10,10 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-""" Classes: WorkflowConfigurator
+"""Workflow tool setup handlers.
$Id$
"""
-import re
-from xml.dom.minidom import parseString as domParseString
-
-from AccessControl import ClassSecurityInfo
-from Acquisition import Implicit
-from Globals import InitializeClass
-from Products.PageTemplates.PageTemplateFile import PageTemplateFile
-
-from Products.CMFCore.utils import getToolByName
-from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
-
-from permissions import ManagePortal
-from utils import _coalesceTextNodeChildren
-from utils import _extractDescriptionNode
-from utils import _getNodeAttribute
-from utils import _getNodeAttributeBoolean
-from utils import _queryNodeAttribute
-from utils import _xmldir
-from utils import ConfiguratorBase
-from utils import CONVERTER, DEFAULT, KEY
-
-
-TRIGGER_TYPES = ( 'AUTOMATIC', 'USER' )
-
-#
-# Configurator entry points
-#
-_FILENAME = 'workflows.xml'
-
-def importWorkflowTool( context ):
-
- """ Import worflow tool and contained workflow definitions.
-
- o 'context' must implement IImportContext.
-
- o Register via Python:
-
- registry = site.portal_setup.getImportStepRegistry()
- registry.registerStep( 'importWorkflowTool'
- , '20040602-01'
- , Products.CMFSetup.workflow.importWorkflowTool
- , ()
- , 'Workflow import'
- , 'Import worflow tool and contained workflow '
- 'definitions.'
- )
-
- o Register via XML:
-
- <setup-step id="importWorkflowTool"
- version="20040602-01"
- handler="Products.CMFSetup.workflow.importWorkflowTool"
- title="Workflow import"
- >Import worflow tool and contained workflow definitions.</setup-step>
-
- """
- site = context.getSite()
- encoding = context.getEncoding()
- tool = getToolByName( site, 'portal_workflow' )
-
- if context.shouldPurge():
-
- tool.setDefaultChain( '' )
- if tool._chains_by_type is not None:
- tool._chains_by_type.clear()
-
- for workflow_id in tool.getWorkflowIds():
- tool._delObject( workflow_id )
-
- text = context.readDataFile( _FILENAME )
-
- if text is not None:
-
- wftc = WorkflowToolConfigurator( site, encoding )
- tool_info = wftc.parseXML( text )
-
- wfdc = WorkflowDefinitionConfigurator( site )
-
- for info in tool_info[ 'workflows' ]:
-
- if info[ 'meta_type' ] == DCWorkflowDefinition.meta_type:
-
- filename = info[ 'filename' ]
- sep = filename.rfind( '/' )
- if sep == -1:
- wf_text = context.readDataFile( filename )
- else:
- wf_text = context.readDataFile( filename[sep+1:],
- filename[:sep] )
-
- ( workflow_id
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- ) = wfdc.parseWorkflowXML( wf_text, encoding )
-
- workflow_id = str( workflow_id ) # No unicode!
-
- tool._setObject( workflow_id
- , DCWorkflowDefinition( workflow_id ) )
-
- workflow = tool._getOb( workflow_id )
-
- _initDCWorkflow( workflow
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- , context
- )
- else:
- pass # TODO: handle non-DCWorkflows
-
- for type_id, workflow_ids in tool_info[ 'bindings' ].items():
-
- chain = ','.join( workflow_ids )
- if type_id is None:
- tool.setDefaultChain( chain )
- else:
- tool.setChainForPortalTypes( ( type_id, ), chain )
-
- return 'Workflows imported.'
-
-
-def exportWorkflowTool( context ):
-
- """ Export worflow tool and contained workflow definitions as an XML file.
-
- o 'context' must implement IExportContext.
-
- o Register via Python:
-
- registry = site.portal_setup.getExportStepRegistry()
- registry.registerStep( 'exportWorkflowTool'
- , Products.CMFSetup.workflow.exportWorkflowTool
- , 'Workflow export'
- , 'Export worflow tool and contained workflow '
- 'definitions.'
- )
-
- o Register via XML:
-
- <export-script id="exportWorkflowTool"
- version="20040518-01"
- handler="Products.CMFSetup.workflow.exportWorkflowTool"
- title="Workflow export"
- >Export worflow tool and contained workflow definitions.</export-script>
-
- """
- site = context.getSite()
- wftc = WorkflowToolConfigurator( site ).__of__( site )
- wfdc = WorkflowDefinitionConfigurator( site ).__of__( site )
- wf_tool = getToolByName( site, 'portal_workflow' )
- text = wftc.generateXML()
-
- context.writeDataFile( _FILENAME, text, 'text/xml' )
-
- for wf_id in wf_tool.getWorkflowIds():
-
- wf_dirname = wf_id.replace( ' ', '_' )
- wf_xml = wfdc.generateWorkflowXML( wf_id )
- wf_scripts = wfdc.getWorkflowScripts(wf_id)
-
- if wf_xml is not None:
- context.writeDataFile( 'definition.xml'
- , wf_xml
- , 'text/xml'
- , 'workflows/%s' % wf_dirname
- )
- for script_info in wf_scripts:
- if script_info['filename']:
- context.writeDataFile(script_info['filename'],
- script_info['body'],
- 'text/plain')
-
- return 'Workflows exported.'
-
-
-class WorkflowToolConfigurator(ConfiguratorBase):
- """ Synthesize XML description of site's workflow tool.
- """
- security = ClassSecurityInfo()
-
- security.declareProtected(ManagePortal, 'getWorkflowInfo')
- def getWorkflowInfo(self, workflow_id):
- """ Return a mapping describing a given workflow.
- """
- workflow_tool = getToolByName( self._site, 'portal_workflow' )
- workflow = workflow_tool.getWorkflowById( workflow_id )
-
- workflow_info = { 'id' : workflow_id
- , 'meta_type' : workflow.meta_type
- , 'title' : workflow.title_or_id()
- }
-
- if workflow.meta_type == DCWorkflowDefinition.meta_type:
- workflow_info['filename'] = _getWorkflowFilename(workflow_id)
-
- return workflow_info
-
- security.declareProtected( ManagePortal, 'listWorkflowInfo' )
- def listWorkflowInfo( self ):
-
- """ Return a sequence of mappings for each workflow in the tool.
-
- o See 'getWorkflowInfo' for definition of the mappings.
- """
- workflow_tool = getToolByName( self._site, 'portal_workflow' )
- return [ self.getWorkflowInfo( workflow_id )
- for workflow_id in workflow_tool.getWorkflowIds() ]
-
- security.declareProtected( ManagePortal, 'listWorkflowChains' )
- def listWorkflowChains( self ):
-
- """ Return a sequence of tuples binding workflows to each content type.
-
- o Tuples are in the format, '( type_id, [ workflow_id ] )'.
-
- o The default chain will be first in the list, with None for the
- 'type_id'.
-
- o The list will only include type-specific chains for types which
- do not use the default chain.
- """
- workflow_tool = getToolByName( self._site, 'portal_workflow' )
-
- result = [ ( None, workflow_tool._default_chain ) ]
- if workflow_tool._chains_by_type is None:
- overrides = []
- else:
- overrides = workflow_tool._chains_by_type.items()
- overrides.sort()
-
- result.extend( overrides )
-
- return result
-
- def _getExportTemplate(self):
-
- return PageTemplateFile('wtcToolExport.xml', _xmldir)
-
- def _getImportMapping(self):
-
- return {
- 'workflow-tool':
- { 'workflow': {KEY: 'workflows', DEFAULT: (),
- CONVERTER: self._convertWorkflows},
- 'bindings': {CONVERTER: self._convertBindings} },
- 'workflow':
- { 'workflow_id': {},
- 'meta_type': {DEFAULT: '%(workflow_id)s'},
- 'filename': {DEFAULT: '%(workflow_id)s'} },
- 'bindings':
- { 'default': {KEY: 'bindings'},
- 'type': {KEY: 'bindings'} },
- 'default':
- { 'type_id': {DEFAULT: None},
- 'bound-workflow': {KEY: 'bound_workflows', DEFAULT: ()} },
- 'type':
- { 'type_id': {DEFAULT: None},
- 'bound-workflow': {KEY: 'bound_workflows', DEFAULT: ()} },
- 'bound-workflow':
- { 'workflow_id': {KEY: None} } }
-
- def _convertWorkflows(self, val):
-
- for wf in val:
- if wf['meta_type'] == DCWorkflowDefinition.meta_type:
- if wf['filename'] == wf['workflow_id']:
- wf['filename'] = _getWorkflowFilename( wf['filename'] )
- else:
- wf['filename'] = None
-
- return val
-
- def _convertBindings(self, val):
- if not val[0]:
- return {}
-
- result = {}
-
- for binding in val[0]['bindings']:
- result[ binding['type_id'] ] = binding['bound_workflows']
-
- return result
-
-InitializeClass(WorkflowToolConfigurator)
-
-
-class WorkflowDefinitionConfigurator( Implicit ):
- """ Synthesize XML description of site's workflows.
- """
- security = ClassSecurityInfo()
-
- def __init__( self, site ):
- self._site = site
-
- security.declareProtected( ManagePortal, 'getWorkflowInfo' )
- def getWorkflowInfo( self, workflow_id ):
-
- """ Return a mapping describing a given workflow.
-
- o Keys in the mappings:
-
- 'id' -- the ID of the workflow within the tool
-
- 'meta_type' -- the workflow's meta_type
-
- 'title' -- the workflow's title property
-
- o See '_extractDCWorkflowInfo' below for keys present only for
- DCWorkflow definitions.
-
- """
- workflow_tool = getToolByName( self._site, 'portal_workflow' )
- workflow = workflow_tool.getWorkflowById( workflow_id )
-
- workflow_info = { 'id' : workflow_id
- , 'meta_type' : workflow.meta_type
- , 'title' : workflow.title_or_id()
- }
-
- if workflow.meta_type == DCWorkflowDefinition.meta_type:
- self._extractDCWorkflowInfo( workflow, workflow_info )
-
- return workflow_info
-
-
- security.declareProtected( ManagePortal, 'generateWorkflowXML' )
- def generateWorkflowXML( self, workflow_id ):
-
- """ Pseudo API.
- """
- info = self.getWorkflowInfo( workflow_id )
-
- if info[ 'meta_type' ] != DCWorkflowDefinition.meta_type:
- return None
-
- return self._workflowConfig( workflow_id=workflow_id )
-
- security.declareProtected( ManagePortal, 'generateWorkflowScripts' )
- def getWorkflowScripts( self, workflow_id ):
- """ Get workflow scripts information
- """
- workflow_tool = getToolByName( self._site, 'portal_workflow' )
- workflow = workflow_tool.getWorkflowById( workflow_id )
-
- if workflow.meta_type != DCWorkflowDefinition.meta_type:
- return []
-
- scripts = self._extractScripts(workflow)
- return scripts
-
-
- security.declareProtected( ManagePortal, 'parseWorkflowXML' )
- def parseWorkflowXML( self, xml, encoding=None ):
-
- """ Pseudo API.
- """
- dom = domParseString( xml )
-
- root = dom.getElementsByTagName( 'dc-workflow' )[ 0 ]
-
- workflow_id = _getNodeAttribute( root, 'workflow_id', encoding )
- title = _getNodeAttribute( root, 'title', encoding )
- state_variable = _getNodeAttribute( root, 'state_variable', encoding )
- initial_state = _getNodeAttribute( root, 'initial_state', encoding )
-
- states = _extractStateNodes( root, encoding )
- transitions = _extractTransitionNodes( root, encoding )
- variables = _extractVariableNodes( root, encoding )
- worklists = _extractWorklistNodes( root, encoding )
- permissions = _extractPermissionNodes( root, encoding )
- scripts = _extractScriptNodes( root, encoding )
-
- return ( workflow_id
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- )
-
- security.declarePrivate( '_workflowConfig' )
- _workflowConfig = PageTemplateFile( 'wtcWorkflowExport.xml'
- , _xmldir
- , __name__='workflowConfig'
- )
-
- security.declarePrivate( '_extractDCWorkflowInfo' )
- def _extractDCWorkflowInfo( self, workflow, workflow_info ):
-
- """ Append the information for a 'workflow' into 'workflow_info'
-
- o 'workflow' must be a DCWorkflowDefinition instance.
-
- o 'workflow_info' must be a dictionary.
-
- o The following keys will be added to 'workflow_info':
-
- 'permissions' -- a list of names of permissions managed
- by the workflow
-
- 'state_variable' -- the name of the workflow's "main"
- state variable
-
- 'initial_state' -- the name of the state in the workflow
- in which objects start their lifecycle.
-
- 'variable_info' -- a list of mappings describing the
- variables tracked by the workflow (see '_extractVariables').
-
- 'state_info' -- a list of mappings describing the
- states tracked by the workflow (see '_extractStates').
-
- 'transition_info' -- a list of mappings describing the
- transitions tracked by the workflow (see '_extractTransitions').
-
- 'worklist_info' -- a list of mappings describing the
- worklists tracked by the workflow (see '_extractWorklists').
-
- 'script_info' -- a list of mappings describing the scripts which
- provide added business logic (see '_extractScripts').
- """
- workflow_info[ 'filename' ] = _getWorkflowFilename( workflow.getId() )
- workflow_info[ 'state_variable' ] = workflow.state_var
- workflow_info[ 'initial_state' ] = workflow.initial_state
- workflow_info[ 'permissions' ] = workflow.permissions
- workflow_info[ 'variable_info' ] = self._extractVariables( workflow )
- workflow_info[ 'state_info' ] = self._extractStates( workflow )
- workflow_info[ 'transition_info' ] = self._extractTransitions(
- workflow )
- workflow_info[ 'worklist_info' ] = self._extractWorklists( workflow )
- workflow_info[ 'script_info' ] = self._extractScripts( workflow )
-
- security.declarePrivate( '_extractVariables' )
- def _extractVariables( self, workflow ):
-
- """ Return a sequence of mappings describing DCWorkflow variables.
-
- o Keys for each mapping will include:
-
- 'id' -- the variable's ID
-
- 'description' -- a textual description of the variable
-
- 'for_catalog' -- whether to catalog this variable
-
- 'for_status' -- whether to ??? this variable (XXX)
-
- 'update_always' -- whether to update this variable whenever
- executing a transition (xxX)
-
- 'default_value' -- a default value for the variable (XXX)
-
- 'default_expression' -- a TALES expression for the default value
-
- 'guard_permissions' -- a list of permissions guarding access
- to the variable
-
- 'guard_roles' -- a list of roles guarding access
- to the variable
-
- 'guard_groups' -- a list of groups guarding the transition
-
- 'guard_expr' -- an expression guarding access to the variable
- """
- result = []
-
- items = workflow.variables.objectItems()
- items.sort()
-
- for k, v in items:
-
- guard = v.getInfoGuard()
-
- default_type = _guessVariableType( v.default_value )
-
- info = { 'id' : k
- , 'description' : v.description
- , 'for_catalog' : bool( v.for_catalog )
- , 'for_status' : bool( v.for_status )
- , 'update_always' : bool( v.update_always )
- , 'default_value' : v.default_value
- , 'default_type' : default_type
- , 'default_expr' : v.getDefaultExprText()
- , 'guard_permissions' : guard.permissions
- , 'guard_roles' : guard.roles
- , 'guard_groups' : guard.groups
- , 'guard_expr' : guard.getExprText()
- }
-
- result.append( info )
-
- return result
-
- security.declarePrivate( '_extractStates' )
- def _extractStates( self, workflow ):
-
- """ Return a sequence of mappings describing DCWorkflow states.
-
- o Within the workflow mapping, each 'state_info' mapping has keys:
-
- 'id' -- the state's ID
-
- 'title' -- the state's title
-
- 'description' -- the state's description
-
- 'transitions' -- a list of IDs of transitions out of the state
-
- 'permissions' -- a list of mappings describing the permission
- map for the state
-
- 'groups' -- a list of ( group_id, (roles,) ) tuples describing the
- group-role assignments for the state
-
- 'variables' -- a list of mapping for the variables
- to be set when entering the state.
-
- o Within the state_info mappings, each 'permissions' mapping
- has the keys:
-
- 'name' -- the name of the permission
-
- 'roles' -- a sequence of role IDs which have the permission
-
- 'acquired' -- whether roles are acquired for the permission
-
- o Within the state_info mappings, each 'variable' mapping
- has the keys:
-
- 'name' -- the name of the variable
-
- 'type' -- the type of the value (allowed values are:
- 'string', 'datetime', 'bool', 'int')
-
- 'value' -- the value to be set
- """
- result = []
-
- items = workflow.states.objectItems()
- items.sort()
-
- for k, v in items:
-
- groups = v.group_roles and list( v.group_roles.items() ) or []
- groups = [ x for x in groups if x[1] ]
- groups.sort()
-
- variables = list( v.getVariableValues() )
- variables.sort()
-
- v_info = []
-
- for v_name, value in variables:
- v_info.append( { 'name' : v_name
- , 'type' :_guessVariableType( value )
- , 'value' : value
- } )
-
- info = { 'id' : k
- , 'title' : v.title
- , 'description' : v.description
- , 'transitions' : v.transitions
- , 'permissions' : self._extractStatePermissions( v )
- , 'groups' : groups
- , 'variables' : v_info
- }
-
- result.append( info )
-
- return result
-
- security.declarePrivate( '_extractStatePermissions' )
- def _extractStatePermissions( self, state ):
-
- """ Return a sequence of mappings for the permissions in a state.
-
- o Each mapping has the keys:
-
- 'name' -- the name of the permission
-
- 'roles' -- a sequence of role IDs which have the permission
-
- 'acquired' -- whether roles are acquired for the permission
- """
- result = []
-
- items = state.permission_roles.items()
- items.sort()
-
- for k, v in items:
-
- result.append( { 'name' : k
- , 'roles' : v
- , 'acquired' : not isinstance( v, tuple )
- } )
-
- return result
-
-
- security.declarePrivate( '_extractTransitions' )
- def _extractTransitions( self, workflow ):
-
- """ Return a sequence of mappings describing DCWorkflow transitions.
-
- o Each mapping has the keys:
-
- 'id' -- the transition's ID
-
- 'title' -- the transition's ID
-
- 'description' -- the transition's description
-
- 'new_state_id' -- the ID of the state into which the transition
- moves an object
-
- 'trigger_type' -- one of the following values, indicating how the
- transition is fired:
-
- - "AUTOMATIC" -> fired opportunistically whenever the workflow
- notices that its guard conditions permit
-
- - "USER" -> fired in response to user request
-
- 'script_name' -- the ID of a script to be executed before
- the transition
-
- 'after_script_name' -- the ID of a script to be executed after
- the transition
-
- 'actbox_name' -- the name of the action by which the user
- triggers the transition
-
- 'actbox_url' -- the URL of the action by which the user
- triggers the transition
-
- 'actbox_category' -- the category of the action by which the user
- triggers the transition
-
- 'variables' -- a list of ( id, expr ) tuples defining how variables
- are to be set during the transition
-
- 'guard_permissions' -- a list of permissions guarding the transition
-
- 'guard_roles' -- a list of roles guarding the transition
-
- 'guard_groups' -- a list of groups guarding the transition
-
- 'guard_expr' -- an expression guarding the transition
-
- """
- result = []
-
- items = workflow.transitions.objectItems()
- items.sort()
-
- for k, v in items:
-
- guard = v.getGuard()
-
- v_info = []
-
- for v_name, expr in v.getVariableExprs():
- v_info.append( { 'name' : v_name, 'expr' : expr } )
-
- info = { 'id' : k
- , 'title' : v.title
- , 'description' : v.description
- , 'new_state_id' : v.new_state_id
- , 'trigger_type' : TRIGGER_TYPES[ v.trigger_type ]
- , 'script_name' : v.script_name
- , 'after_script_name' : v.after_script_name
- , 'actbox_name' : v.actbox_name
- , 'actbox_url' : v.actbox_url
- , 'actbox_category' : v.actbox_category
- , 'variables' : v_info
- , 'guard_permissions' : guard.permissions
- , 'guard_roles' : guard.roles
- , 'guard_groups' : guard.groups
- , 'guard_expr' : guard.getExprText()
- }
-
- result.append( info )
-
- return result
-
- security.declarePrivate( '_extractWorklists' )
- def _extractWorklists( self, workflow ):
-
- """ Return a sequence of mappings describing DCWorkflow transitions.
-
- o Each mapping has the keys:
-
- 'id' -- the ID of the worklist
-
- 'title' -- the title of the worklist
-
- 'description' -- a textual description of the worklist
-
- 'var_match' -- a list of ( key, value ) tuples defining
- the variables used to "activate" the worklist.
-
- 'actbox_name' -- the name of the "action" corresponding to the
- worklist
-
- 'actbox_url' -- the URL of the "action" corresponding to the
- worklist
-
- 'actbox_category' -- the category of the "action" corresponding
- to the worklist
-
- 'guard_permissions' -- a list of permissions guarding access
- to the worklist
-
- 'guard_roles' -- a list of roles guarding access
- to the worklist
-
- 'guard_expr' -- an expression guarding access to the worklist
-
- """
- result = []
-
- items = workflow.worklists.objectItems()
- items.sort()
-
- for k, v in items:
-
- guard = v.getGuard()
-
- var_match = [ ( id, v.getVarMatchText( id ) )
- for id in v.getVarMatchKeys() ]
-
- info = { 'id' : k
- , 'title' : v.title
- , 'description' : v.description
- , 'var_match' : var_match
- , 'actbox_name' : v.actbox_name
- , 'actbox_url' : v.actbox_url
- , 'actbox_category' : v.actbox_category
- , 'guard_permissions' : guard.permissions
- , 'guard_roles' : guard.roles
- , 'guard_groups' : guard.groups
- , 'guard_expr' : guard.getExprText()
- }
-
- result.append( info )
-
- return result
-
- security.declarePrivate( '_extractScripts' )
- def _extractScripts( self, workflow ):
-
- """ Return a sequence of mappings describing DCWorkflow scripts.
-
- o Each mapping has the keys:
-
- 'id' -- the ID of the script
-
- 'meta_type' -- the title of the worklist
-
- 'body' -- the text of the script (only applicable to scripts
- of type Script (Python))
-
- 'module' -- The module from where to load the function (only
- applicable to External Method scripts)
-
- 'function' -- The function to load from the 'module' given
- (Only applicable to External Method scripts)
-
- 'filename' -- the name of the file to / from which the script
- is stored / loaded (Script (Python) only)
- """
- result = []
-
- items = workflow.scripts.objectItems()
- items.sort()
-
- for k, v in items:
-
- filename = _getScriptFilename( workflow.getId(), k, v.meta_type )
- body = ''
- module = ''
- function = ''
-
- if v.meta_type == 'Script (Python)':
- body = v.read()
-
- if v.meta_type == 'External Method':
- module = v.module()
- function = v.function()
-
- info = { 'id' : k
- , 'meta_type' : v.meta_type
- , 'body' : body
- , 'module' : module
- , 'function' : function
- , 'filename' : filename
- }
-
- result.append( info )
-
- return result
-
-InitializeClass( WorkflowDefinitionConfigurator )
-
-
-def _getWorkflowFilename( workflow_id ):
-
- """ Return the name of the file which holds info for a given workflow.
- """
- return 'workflows/%s/definition.xml' % workflow_id.replace( ' ', '_' )
-
-def _getScriptFilename( workflow_id, script_id, meta_type ):
-
- """ Return the name of the file which holds the script.
- """
- wf_dir = workflow_id.replace( ' ', '_' )
- suffix = _METATYPE_SUFFIXES.get(meta_type, None)
-
- if suffix is None:
- return ''
-
- return 'workflows/%s/scripts/%s.%s' % ( wf_dir, script_id, suffix )
-
-def _extractStateNodes( root, encoding=None ):
-
- result = []
-
- for s_node in root.getElementsByTagName( 'state' ):
-
- info = { 'state_id' : _getNodeAttribute( s_node, 'state_id', encoding )
- , 'title' : _getNodeAttribute( s_node, 'title', encoding )
- , 'description' : _extractDescriptionNode( s_node, encoding )
- }
-
- info[ 'transitions' ] = [ _getNodeAttribute( x, 'transition_id'
- , encoding )
- for x in s_node.getElementsByTagName(
- 'exit-transition' ) ]
-
- info[ 'permissions' ] = permission_map = {}
-
- for p_map in s_node.getElementsByTagName( 'permission-map' ):
-
- name = _getNodeAttribute( p_map, 'name', encoding )
- acquired = _getNodeAttributeBoolean( p_map, 'acquired' )
-
- roles = [ _coalesceTextNodeChildren( x, encoding )
- for x in p_map.getElementsByTagName(
- 'permission-role' ) ]
-
- if not acquired:
- roles = tuple( roles )
-
- permission_map[ name ] = roles
-
- info[ 'groups' ] = group_map = []
-
- for g_map in s_node.getElementsByTagName( 'group-map' ):
-
- name = _getNodeAttribute( g_map, 'name', encoding )
-
- roles = [ _coalesceTextNodeChildren( x, encoding )
- for x in g_map.getElementsByTagName(
- 'group-role' ) ]
-
- group_map.append( ( name, tuple( roles ) ) )
-
- info[ 'variables' ] = var_map = {}
-
- for assignment in s_node.getElementsByTagName( 'assignment' ):
-
- name = _getNodeAttribute( assignment, 'name', encoding )
- type_id = _getNodeAttribute( assignment, 'type', encoding )
- value = _coalesceTextNodeChildren( assignment, encoding )
-
- var_map[ name ] = { 'name' : name
- , 'type' : type_id
- , 'value' : value
- }
-
- result.append( info )
-
- return result
-
-def _extractTransitionNodes( root, encoding=None ):
-
- result = []
-
- for t_node in root.getElementsByTagName( 'transition' ):
-
- info = { 'transition_id' : _getNodeAttribute( t_node, 'transition_id'
- , encoding )
- , 'title' : _getNodeAttribute( t_node, 'title', encoding )
- , 'description' : _extractDescriptionNode( t_node, encoding )
- , 'new_state' : _getNodeAttribute( t_node, 'new_state'
- , encoding )
- , 'trigger' : _getNodeAttribute( t_node, 'trigger', encoding )
- , 'before_script' : _getNodeAttribute( t_node, 'before_script'
- , encoding )
- , 'after_script' : _getNodeAttribute( t_node, 'after_script'
- , encoding )
- , 'action' : _extractActionNode( t_node, encoding )
- , 'guard' : _extractGuardNode( t_node, encoding )
- }
-
- info[ 'variables' ] = var_map = {}
-
- for assignment in t_node.getElementsByTagName( 'assignment' ):
-
- name = _getNodeAttribute( assignment, 'name', encoding )
- expr = _coalesceTextNodeChildren( assignment, encoding )
- var_map[ name ] = expr
-
- result.append( info )
-
- return result
-
-def _extractVariableNodes( root, encoding=None ):
-
- result = []
-
- for v_node in root.getElementsByTagName( 'variable' ):
-
- info = { 'variable_id' : _getNodeAttribute( v_node, 'variable_id'
- , encoding )
- , 'description' : _extractDescriptionNode( v_node, encoding )
- , 'for_catalog' : _getNodeAttributeBoolean( v_node
- , 'for_catalog'
- )
- , 'for_status' : _getNodeAttributeBoolean( v_node
- , 'for_status'
- )
- , 'update_always' : _getNodeAttributeBoolean( v_node
- , 'update_always'
- )
- , 'default' : _extractDefaultNode( v_node, encoding )
- , 'guard' : _extractGuardNode( v_node, encoding )
- }
-
- result.append( info )
-
- return result
-
-def _extractWorklistNodes( root, encoding=None ):
-
- result = []
-
- for w_node in root.getElementsByTagName( 'worklist' ):
-
- info = { 'worklist_id' : _getNodeAttribute( w_node, 'worklist_id'
- , encoding )
- , 'title' : _getNodeAttribute( w_node, 'title' , encoding )
- , 'description' : _extractDescriptionNode( w_node, encoding )
- , 'match' : _extractMatchNode( w_node, encoding )
- , 'action' : _extractActionNode( w_node, encoding )
- , 'guard' : _extractGuardNode( w_node, encoding )
- }
-
- result.append( info )
-
- return result
-
-def _extractScriptNodes( root, encoding=None ):
-
- result = []
-
- for s_node in root.getElementsByTagName( 'script' ):
-
- try:
- function = _getNodeAttribute( s_node, 'function' )
- except ValueError:
- function = ''
-
- try:
- module = _getNodeAttribute( s_node, 'module' )
- except ValueError:
- module = ''
-
- info = { 'script_id' : _getNodeAttribute( s_node, 'script_id' )
- , 'meta_type' : _getNodeAttribute( s_node, 'type' , encoding )
- , 'function' : function
- , 'module' : module
- }
-
- filename = _queryNodeAttribute( s_node, 'filename' , None, encoding )
-
- if filename is not None:
- info[ 'filename' ] = filename
-
- result.append( info )
-
- return result
-
-def _extractPermissionNodes( root, encoding=None ):
-
- result = []
-
- for p_node in root.getElementsByTagName( 'permission' ):
-
- result.append( _coalesceTextNodeChildren( p_node, encoding ) )
-
- return result
-
-def _extractActionNode( parent, encoding=None ):
-
- nodes = parent.getElementsByTagName( 'action' )
- assert len( nodes ) <= 1, nodes
-
- if len( nodes ) < 1:
- return { 'name' : '', 'url' : '', 'category' : '' }
-
- node = nodes[ 0 ]
-
- return { 'name' : _coalesceTextNodeChildren( node, encoding )
- , 'url' : _getNodeAttribute( node, 'url', encoding )
- , 'category' : _getNodeAttribute( node, 'category', encoding )
- }
-
-def _extractGuardNode( parent, encoding=None ):
-
- nodes = parent.getElementsByTagName( 'guard' )
- assert len( nodes ) <= 1, nodes
-
- if len( nodes ) < 1:
- return { 'permissions' : (), 'roles' : (), 'groups' : (), 'expr' : '' }
-
- node = nodes[ 0 ]
-
- expr_nodes = node.getElementsByTagName( 'guard-expression' )
- assert( len( expr_nodes ) <= 1 )
-
- expr_text = expr_nodes and _coalesceTextNodeChildren( expr_nodes[ 0 ]
- , encoding
- ) or ''
-
- return { 'permissions' : [ _coalesceTextNodeChildren( x, encoding )
- for x in node.getElementsByTagName(
- 'guard-permission' ) ]
- , 'roles' : [ _coalesceTextNodeChildren( x, encoding )
- for x in node.getElementsByTagName( 'guard-role' ) ]
- , 'groups' : [ _coalesceTextNodeChildren( x, encoding )
- for x in node.getElementsByTagName( 'guard-group' ) ]
- , 'expression' : expr_text
- }
-
-def _extractDefaultNode( parent, encoding=None ):
-
- nodes = parent.getElementsByTagName( 'default' )
- assert len( nodes ) <= 1, nodes
-
- if len( nodes ) < 1:
- return { 'value' : '', 'expression' : '', 'type' : 'n/a' }
-
- node = nodes[ 0 ]
-
- value_nodes = node.getElementsByTagName( 'value' )
- assert( len( value_nodes ) <= 1 )
-
- value_type = 'n/a'
- if value_nodes:
- value_type = value_nodes[ 0 ].getAttribute( 'type' ) or 'n/a'
-
- value_text = value_nodes and _coalesceTextNodeChildren( value_nodes[ 0 ]
- , encoding
- ) or ''
-
- expr_nodes = node.getElementsByTagName( 'expression' )
- assert( len( expr_nodes ) <= 1 )
-
- expr_text = expr_nodes and _coalesceTextNodeChildren( expr_nodes[ 0 ]
- , encoding
- ) or ''
-
- return { 'value' : value_text
- , 'type' : value_type
- , 'expression' : expr_text
- }
-
-_SEMICOLON_LIST_SPLITTER = re.compile( r';[ ]*' )
-
-def _extractMatchNode( parent, encoding=None ):
-
- nodes = parent.getElementsByTagName( 'match' )
-
- result = {}
-
- for node in nodes:
-
- name = _getNodeAttribute( node, 'name', encoding )
- values = _getNodeAttribute( node, 'values', encoding )
- result[ name ] = _SEMICOLON_LIST_SPLITTER.split( values )
-
- return result
-
-def _guessVariableType( value ):
-
- from DateTime.DateTime import DateTime
-
- if value is None:
- return 'none'
-
- if isinstance( value, DateTime ):
- return 'datetime'
-
- if isinstance( value, bool ):
- return 'bool'
-
- if isinstance( value, int ):
- return 'int'
-
- if isinstance( value, float ):
- return 'float'
-
- if isinstance( value, basestring ):
- return 'string'
-
- return 'unknown'
-
-def _convertVariableValue( value, type_id ):
-
- from DateTime.DateTime import DateTime
-
- if type_id == 'none':
- return None
-
- if type_id == 'datetime':
-
- return DateTime( value )
-
- if type_id == 'bool':
-
- if isinstance( value, basestring ):
-
- value = str( value ).lower()
-
- return value in ( 'true', 'yes', '1' )
-
- else:
- return bool( value )
-
- if type_id == 'int':
- return int( value )
-
- if type_id == 'float':
- return float( value )
-
- return value
-
-from Products.PythonScripts.PythonScript import PythonScript
-from Products.ExternalMethod.ExternalMethod import ExternalMethod
-from OFS.DTMLMethod import DTMLMethod
-
-_METATYPE_SUFFIXES = \
-{ PythonScript.meta_type : 'py'
-, DTMLMethod.meta_type : 'dtml'
-}
-
-def _initDCWorkflow( workflow
- , title
- , state_variable
- , initial_state
- , states
- , transitions
- , variables
- , worklists
- , permissions
- , scripts
- , context
- ):
- """ Initialize a DC Workflow using values parsed from XML.
- """
- workflow.title = title
- workflow.state_var = state_variable
- workflow.initial_state = initial_state
-
- permissions = permissions[:]
- permissions.sort()
- workflow.permissions = tuple(permissions)
-
- _initDCWorkflowVariables( workflow, variables )
- _initDCWorkflowStates( workflow, states )
- _initDCWorkflowTransitions( workflow, transitions )
- _initDCWorkflowWorklists( workflow, worklists )
- _initDCWorkflowScripts( workflow, scripts, context )
-
-
-def _initDCWorkflowVariables( workflow, variables ):
-
- """ Initialize DCWorkflow variables
- """
- from Products.DCWorkflow.Variables import VariableDefinition
-
- for v_info in variables:
-
- id = str( v_info[ 'variable_id' ] ) # no unicode!
- v = VariableDefinition( id )
- workflow.variables._setObject( id, v )
- v = workflow.variables._getOb( id )
-
- guard = v_info[ 'guard' ]
- props = { 'guard_roles' : ';'.join( guard[ 'roles' ] )
- , 'guard_permissions' : ';'.join( guard[ 'permissions' ] )
- , 'guard_groups' : ';'.join( guard[ 'groups' ] )
- , 'guard_expr' : guard[ 'expression' ]
- }
-
- default = v_info[ 'default' ]
- default_value = _convertVariableValue( default[ 'value' ]
- , default[ 'type' ] )
-
- v.setProperties( description = v_info[ 'description' ]
- , default_value = default_value
- , default_expr = default[ 'expression' ]
- , for_catalog = v_info[ 'for_catalog' ]
- , for_status = v_info[ 'for_status' ]
- , update_always = v_info[ 'update_always' ]
- , props = props
- )
-
-
-def _initDCWorkflowStates( workflow, states ):
-
- """ Initialize DCWorkflow states
- """
- from Globals import PersistentMapping
- from Products.DCWorkflow.States import StateDefinition
-
- for s_info in states:
-
- id = str( s_info[ 'state_id' ] ) # no unicode!
- s = StateDefinition( id )
- workflow.states._setObject( id, s )
- s = workflow.states._getOb( id )
-
- s.setProperties( title = s_info[ 'title' ]
- , description = s_info[ 'description' ]
- , transitions = s_info[ 'transitions' ]
- )
-
- for k, v in s_info[ 'permissions' ].items():
- s.setPermission( k, isinstance(v, list), v )
-
- gmap = s.group_roles = PersistentMapping()
-
- for group_id, roles in s_info[ 'groups' ]:
- gmap[ group_id ] = roles
-
- vmap = s.var_values = PersistentMapping()
-
- for name, v_info in s_info[ 'variables' ].items():
-
- value = _convertVariableValue( v_info[ 'value' ]
- , v_info[ 'type' ] )
-
- vmap[ name ] = value
-
-
-def _initDCWorkflowTransitions( workflow, transitions ):
-
- """ Initialize DCWorkflow transitions
- """
- from Globals import PersistentMapping
- from Products.DCWorkflow.Transitions import TransitionDefinition
-
- for t_info in transitions:
-
- id = str( t_info[ 'transition_id' ] ) # no unicode!
- t = TransitionDefinition( id )
- workflow.transitions._setObject( id, t )
- t = workflow.transitions._getOb( id )
-
- trigger_type = list( TRIGGER_TYPES ).index( t_info[ 'trigger' ] )
-
- action = t_info[ 'action' ]
-
- guard = t_info[ 'guard' ]
- props = { 'guard_roles' : ';'.join( guard[ 'roles' ] )
- , 'guard_permissions' : ';'.join( guard[ 'permissions' ] )
- , 'guard_groups' : ';'.join( guard[ 'groups' ] )
- , 'guard_expr' : guard[ 'expression' ]
- }
-
- t.setProperties( title = t_info[ 'title' ]
- , description = t_info[ 'description' ]
- , new_state_id = t_info[ 'new_state' ]
- , trigger_type = trigger_type
- , script_name = t_info[ 'before_script' ]
- , after_script_name = t_info[ 'after_script' ]
- , actbox_name = action[ 'name' ]
- , actbox_url = action[ 'url' ]
- , actbox_category = action[ 'category' ]
- , props = props
- )
-
- t.var_exprs = PersistentMapping( t_info[ 'variables' ].items() )
-
-def _initDCWorkflowWorklists( workflow, worklists ):
-
- """ Initialize DCWorkflow worklists
- """
- from Globals import PersistentMapping
- from Products.DCWorkflow.Worklists import WorklistDefinition
-
- for w_info in worklists:
-
- id = str( w_info[ 'worklist_id' ] ) # no unicode!
- w = WorklistDefinition( id )
- workflow.worklists._setObject( id, w )
-
- w = workflow.worklists._getOb( id )
-
- action = w_info[ 'action' ]
-
- guard = w_info[ 'guard' ]
- props = { 'guard_roles' : ';'.join( guard[ 'roles' ] )
- , 'guard_permissions' : ';'.join( guard[ 'permissions' ] )
- , 'guard_groups' : ';'.join( guard[ 'groups' ] )
- , 'guard_expr' : guard[ 'expression' ]
- }
-
- w.setProperties( description = w_info[ 'description' ]
- , actbox_name = action[ 'name' ]
- , actbox_url = action[ 'url' ]
- , actbox_category = action[ 'category' ]
- , props = props
- )
-
- w.var_matches = PersistentMapping()
- for k, v in w_info[ 'match' ].items():
- w.var_matches[ str( k ) ] = tuple( [ str(x) for x in v ] )
-
-def _initDCWorkflowScripts( workflow, scripts, context ):
-
- """ Initialize DCWorkflow scripts
- """
- for s_info in scripts:
-
- id = str( s_info[ 'script_id' ] ) # no unicode!
- meta_type = s_info[ 'meta_type' ]
- filename = s_info[ 'filename' ]
- file = ''
-
- if filename:
- file = context.readDataFile( filename )
-
- if meta_type == PythonScript.meta_type:
- script = PythonScript( id )
- script.write( file )
-
- elif meta_type == ExternalMethod.meta_type:
- script = ExternalMethod( id
- , ''
- , s_info['module']
- , s_info['function']
- )
-
- elif meta_type == DTMLMethod.meta_type:
- script = DTMLMethod( file, __name__=id )
-
- workflow.scripts._setObject( id, script )
+from Products.CMFCore.exportimport.workflow import exportWorkflowTool
+from Products.CMFCore.exportimport.workflow import importWorkflowTool
Deleted: CMF/branches/yuppie-workflow_setup/CMFSetup/xml/wtcToolExport.xml
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFSetup/xml/wtcToolExport.xml 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/CMFSetup/xml/wtcToolExport.xml 2005-11-22 16:12:02 UTC (rev 40321)
@@ -1,33 +0,0 @@
-<?xml version="1.0"?>
-<workflow-tool
- xmlns:tal="http://xml.zope.org/namespaces/tal">
- <workflow
- workflow_id="WORKFLOW_ID"
- meta_type="META_TYPE"
- tal:repeat="workflow here/listWorkflowInfo"
- tal:attributes="workflow_id workflow/id;
- meta_type workflow/meta_type;
- filename workflow/filename | default;
- " />
- <bindings
- tal:define="chains here/listWorkflowChains;
- default_chain python: chains[ 0 ][ 1 ];
- overrides python: chains[ 1: ];
- ">
- <default>
- <bound-workflow
- workflow_id="WORKFLOW_ID"
- tal:repeat="bound default_chain"
- tal:attributes="workflow_id bound" />
- </default>
- <type
- type_id="TYPE_ID"
- tal:repeat="binding overrides"
- tal:attributes="type_id python: binding[ 0 ]">
- <bound-workflow
- workflow_id="WORKFLOW_ID"
- tal:repeat="bound python: binding[ 1 ]"
- tal:attributes="workflow_id bound" />
- </type>
- </bindings>
-</workflow-tool>
Deleted: CMF/branches/yuppie-workflow_setup/CMFSetup/xml/wtcWorkflowExport.xml
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFSetup/xml/wtcWorkflowExport.xml 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/CMFSetup/xml/wtcWorkflowExport.xml 2005-11-22 16:12:02 UTC (rev 40321)
@@ -1,213 +0,0 @@
-<?xml version="1.0"?>
-<dc-workflow
- xmlns:tal="http://xml.zope.org/namespaces/tal"
- workflow_id="dcworkflow"
- title="Some DCWorkflow"
- state_variable="review_state"
- initial_state="visible"
- tal:define="info python: here.getWorkflowInfo(
- options[ 'workflow_id' ] )"
- tal:attributes="workflow_id info/id;
- title info/title;
- state_variable info/state_variable;
- initial_state info/initial_state">
- <permission
- tal:repeat="permission info/permissions"
- tal:content="permission">PERMISSION</permission>
- <state
- state_id="pending"
- title="Waiting for reviewer"
- tal:repeat="state info/state_info"
- tal:attributes="state_id state/id;
- title state/title;
- "
- ><tal:case tal:condition="state/description">
- <description
- tal:content="state/description">DESCRIPTION</description></tal:case>
- <exit-transition
- transition_id="TRANSITION_ID"
- tal:repeat="exit state/transitions"
- tal:attributes="transition_id exit"
- />
- <permission-map
- name=""
- acquired="True"
- tal:repeat="perm state/permissions"
- tal:attributes="name perm/name;
- acquired perm/acquired;
- ">
- <permission-role
- tal:repeat="role perm/roles"
- tal:content="role">ROLE</permission-role>
- </permission-map>
- <group-map
- name=""
- tal:repeat="group state/groups"
- tal:attributes="name python: group[ 0 ];
- ">
- <group-role
- tal:repeat="role python: group[ 1 ]"
- tal:content="role">ROLE</group-role>
- </group-map>
- <assignment
- name="VAR_NAME"
- type="VAR_TYPE"
- tal:repeat="var state/variables"
- tal:attributes="name var/name;
- type var/type;
- "
- tal:content="var/value">VALUE</assignment>
- </state>
- <transition
- transition_id="Publish"
- title="Reviewer publishes content"
- new_state="published"
- trigger="USER"
- before_script=""
- after_script=""
- tal:repeat="transition info/transition_info"
- tal:attributes="transition_id transition/id;
- title transition/title;
- new_state transition/new_state_id;
- trigger transition/trigger_type;
- before_script transition/script_name;
- after_script transition/after_script_name;
- "
- ><tal:case tal:condition="transition/description">
- <description
- tal:content="transition/description">DESCRIPTION</description></tal:case>
- <action
- url="URL"
- category="CATEGORY"
- tal:condition="transition/actbox_name"
- tal:attributes="url transition/actbox_url;
- category transition/actbox_category;
- "
- tal:content="transition/actbox_name">ACTION NAME</action>
- <guard
- ><tal:case tal:condition="transition/guard_permissions">
- <guard-permission
- tal:repeat="permission transition/guard_permissions"
- tal:content="permission">PERMISSION</guard-permission></tal:case
- ><tal:case tal:condition="transition/guard_roles">
- <guard-role
- tal:repeat="role transition/guard_roles"
- tal:content="role">ROLE</guard-role></tal:case
- ><tal:case tal:condition="transition/guard_groups">
- <guard-group
- tal:repeat="group transition/guard_groups"
- tal:content="group">GROUP</guard-group></tal:case
- ><tal:case tal:condition="transition/guard_expr">
- <guard-expression
- tal:content="transition/guard_expr">EXPRESSION</guard-expression
- ></tal:case>
- </guard>
- <assignment
- name="VAR_NAME"
- tal:repeat="var transition/variables"
- tal:attributes="name var/name"
- tal:content="var/expr">EXPRESSION</assignment>
- </transition>
- <worklist
- worklist_id="reviewer_queue"
- title="For Review"
- tal:repeat="worklist info/worklist_info"
- tal:attributes="worklist_id worklist/id;
- title worklist/title;
- "
- ><tal:case tal:condition="worklist/description">
- <description
- tal:content="worklist/description">DESCRIPTION</description></tal:case>
- <action
- url="URL"
- category="CATEGORY"
- tal:condition="worklist/actbox_name"
- tal:attributes="url worklist/actbox_url;
- category worklist/actbox_category;
- "
- tal:content="worklist/actbox_name">ACTION NAME</action>
- <guard
- ><tal:case tal:condition="worklist/guard_permissions">
- <guard-permission
- tal:repeat="permission worklist/guard_permissions"
- tal:content="permission">PERMISSION</guard-permission></tal:case
- ><tal:case tal:condition="worklist/guard_roles">
- <guard-role
- tal:repeat="role worklist/guard_roles"
- tal:content="role">ROLE</guard-role></tal:case
- ><tal:case tal:condition="worklist/guard_groups">
- <guard-group
- tal:repeat="group worklist/guard_groups"
- tal:content="group">GROUP</guard-group></tal:case
- ><tal:case tal:condition="worklist/guard_expr">
- <guard-expression
- tal:content="worklist/guard_expr">EXPRESSION</guard-expression
- ></tal:case>
- </guard>
- <match
- name="review_state"
- values="pending"
- tal:repeat="match worklist/var_match"
- tal:attributes="name python: match[ 0 ];
- values python: match[ 1 ];
- "
- />
- </worklist>
- <variable
- variable_id="action"
- for_catalog="True"
- for_status="True"
- update_always="True"
- tal:repeat="variable info/variable_info"
- tal:attributes="variable_id variable/id;
- for_catalog variable/for_catalog;
- for_status variable/for_status;
- update_always variable/update_always;
- "
- ><tal:case tal:condition="variable/description">
- <description
- tal:content="variable/description">DESCRIPTION</description></tal:case>
- <default>
- <value
- type="VAR_TYPE"
- tal:attributes="type variable/default_type"
- tal:condition="variable/default_value"
- tal:content="variable/default_value">VALUE</value>
- <expression
- tal:condition="variable/default_expr"
- tal:content="variable/default_expr">EXPRESSION</expression>
- </default>
- <guard
- ><tal:case tal:condition="variable/guard_permissions">
- <guard-permission
- tal:repeat="permission variable/guard_permissions"
- tal:content="permission">PERMISSION</guard-permission></tal:case
- ><tal:case tal:condition="variable/guard_roles">
- <guard-role
- tal:repeat="role variable/guard_roles"
- tal:content="role">ROLE</guard-role></tal:case
- ><tal:case tal:condition="variable/guard_groups">
- <guard-group
- tal:repeat="group variable/guard_groups"
- tal:content="group">GROUP</guard-group></tal:case
- ><tal:case tal:condition="variable/guard_expr">
- <guard-expression
- tal:content="variable/guard_expr">EXPRESSION</guard-expression
- ></tal:case>
- </guard>
- </variable>
- <script
- script_id="SCRIPT_ID"
- type="Script (Python)"
- filename="/path/to/SCRIPT_ID.py"
- module=""
- function=""
- tal:repeat="script info/script_info"
- tal:attributes="script_id script/id;
- type script/meta_type;
- filename script/filename;
- module script/module;
- function script/function
- "
- />
-</dc-workflow>
Modified: CMF/branches/yuppie-workflow_setup/DCWorkflow/DCWorkflow.py
===================================================================
--- CMF/branches/yuppie-workflow_setup/DCWorkflow/DCWorkflow.py 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/DCWorkflow/DCWorkflow.py 2005-11-22 16:12:02 UTC (rev 40321)
@@ -37,6 +37,7 @@
from Products.CMFCore.WorkflowCore import WorkflowException
# DCWorkflow
+from interfaces import IDCWorkflowDefinition
from permissions import ManagePortal
from utils import modifyRolesForPermission
from utils import modifyRolesForGroup
@@ -62,7 +63,7 @@
UI methods are in WorkflowUIMixin.
'''
- implements(IWorkflowDefinition)
+ implements(IDCWorkflowDefinition, IWorkflowDefinition)
__implements__ = z2IWorkflowDefinition
title = 'DC Workflow Definition'
Copied: CMF/branches/yuppie-workflow_setup/DCWorkflow/Extensions (from rev 40305, CMF/branches/yuppie-workflow_setup/CMFSetup/Extensions)
Modified: CMF/branches/yuppie-workflow_setup/DCWorkflow/configure.zcml
===================================================================
--- CMF/branches/yuppie-workflow_setup/DCWorkflow/configure.zcml 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/DCWorkflow/configure.zcml 2005-11-22 16:12:02 UTC (rev 40321)
@@ -12,4 +12,11 @@
global="False"
/>
+ <adapter
+ factory=".exportimport.DCWorkflowDefinitionBodyAdapter"
+ provides="Products.GenericSetup.interfaces.IBody"
+ for=".interfaces.IDCWorkflowDefinition
+ Products.GenericSetup.interfaces.ISetupContext"
+ />
+
</configure>
Copied: CMF/branches/yuppie-workflow_setup/DCWorkflow/exportimport.py (from rev 40305, CMF/branches/yuppie-workflow_setup/CMFSetup/workflow.py)
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFSetup/workflow.py 2005-11-21 19:08:48 UTC (rev 40305)
+++ CMF/branches/yuppie-workflow_setup/DCWorkflow/exportimport.py 2005-11-22 16:12:02 UTC (rev 40321)
@@ -0,0 +1,1225 @@
+##############################################################################
+#
+# Copyright (c) 2005 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""DCWorkflow export / import support.
+
+$Id$
+"""
+
+import re
+from xml.dom.minidom import parseString
+
+from AccessControl import ClassSecurityInfo
+from Acquisition import Implicit
+from Globals import InitializeClass
+from Products.PageTemplates.PageTemplateFile import PageTemplateFile
+
+from Products.GenericSetup.utils import BodyAdapterBase
+
+from utils import _xmldir
+from DCWorkflow import DCWorkflowDefinition
+from interfaces import IDCWorkflowDefinition
+from permissions import ManagePortal
+
+
+TRIGGER_TYPES = ( 'AUTOMATIC', 'USER' )
+_FILENAME = 'workflows.xml'
+
+
+class DCWorkflowDefinitionBodyAdapter(BodyAdapterBase):
+
+ """Body im- and exporter for DCWorkflowDefinition.
+ """
+
+ __used_for__ = IDCWorkflowDefinition
+
+ def _exportBody(self):
+ """Export the object as a file body.
+ """
+ wfdc = WorkflowDefinitionConfigurator(self.context)
+ return wfdc.__of__(self.context).generateWorkflowXML()
+
+ def _importBody(self, body):
+ """Import the object from the file body.
+ """
+ encoding = 'utf-8'
+ wfdc = WorkflowDefinitionConfigurator(self.context)
+
+ ( workflow_id
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ ) = wfdc.parseWorkflowXML(body, encoding)
+
+ _initDCWorkflow( self.context
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ , self.environ
+ )
+
+ body = property(_exportBody, _importBody)
+
+ mime_type = 'text/xml'
+
+ suffix = '/definition.xml'
+
+
+class WorkflowDefinitionConfigurator( Implicit ):
+ """ Synthesize XML description of site's workflows.
+ """
+ security = ClassSecurityInfo()
+
+ def __init__(self, obj):
+ self._obj = obj
+
+ security.declareProtected( ManagePortal, 'getWorkflowInfo' )
+ def getWorkflowInfo( self, workflow_id ):
+
+ """ Return a mapping describing a given workflow.
+
+ o Keys in the mappings:
+
+ 'id' -- the ID of the workflow within the tool
+
+ 'meta_type' -- the workflow's meta_type
+
+ 'title' -- the workflow's title property
+
+ o See '_extractDCWorkflowInfo' below for keys present only for
+ DCWorkflow definitions.
+
+ """
+ workflow = self._obj
+
+ workflow_info = { 'id' : workflow_id
+ , 'meta_type' : workflow.meta_type
+ , 'title' : workflow.title_or_id()
+ }
+
+ if workflow.meta_type == DCWorkflowDefinition.meta_type:
+ self._extractDCWorkflowInfo( workflow, workflow_info )
+
+ return workflow_info
+
+ security.declareProtected( ManagePortal, 'generateWorkflowXML' )
+ def generateWorkflowXML(self):
+ """ Pseudo API.
+ """
+ return self._workflowConfig(workflow_id=self._obj.getId())
+
+ security.declareProtected( ManagePortal, 'generateWorkflowScripts' )
+ def getWorkflowScripts(self):
+ """ Get workflow scripts information
+ """
+ return self._extractScripts(self._obj)
+
+ security.declareProtected( ManagePortal, 'parseWorkflowXML' )
+ def parseWorkflowXML( self, xml, encoding=None ):
+ """ Pseudo API.
+ """
+ dom = parseString( xml )
+
+ root = dom.getElementsByTagName( 'dc-workflow' )[ 0 ]
+
+ workflow_id = _getNodeAttribute( root, 'workflow_id', encoding )
+ title = _getNodeAttribute( root, 'title', encoding )
+ state_variable = _getNodeAttribute( root, 'state_variable', encoding )
+ initial_state = _getNodeAttribute( root, 'initial_state', encoding )
+
+ states = _extractStateNodes( root, encoding )
+ transitions = _extractTransitionNodes( root, encoding )
+ variables = _extractVariableNodes( root, encoding )
+ worklists = _extractWorklistNodes( root, encoding )
+ permissions = _extractPermissionNodes( root, encoding )
+ scripts = _extractScriptNodes( root, encoding )
+
+ return ( workflow_id
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ )
+
+ security.declarePrivate( '_workflowConfig' )
+ _workflowConfig = PageTemplateFile( 'wtcWorkflowExport.xml'
+ , _xmldir
+ , __name__='workflowConfig'
+ )
+
+ security.declarePrivate( '_extractDCWorkflowInfo' )
+ def _extractDCWorkflowInfo( self, workflow, workflow_info ):
+
+ """ Append the information for a 'workflow' into 'workflow_info'
+
+ o 'workflow' must be a DCWorkflowDefinition instance.
+
+ o 'workflow_info' must be a dictionary.
+
+ o The following keys will be added to 'workflow_info':
+
+ 'permissions' -- a list of names of permissions managed
+ by the workflow
+
+ 'state_variable' -- the name of the workflow's "main"
+ state variable
+
+ 'initial_state' -- the name of the state in the workflow
+ in which objects start their lifecycle.
+
+ 'variable_info' -- a list of mappings describing the
+ variables tracked by the workflow (see '_extractVariables').
+
+ 'state_info' -- a list of mappings describing the
+ states tracked by the workflow (see '_extractStates').
+
+ 'transition_info' -- a list of mappings describing the
+ transitions tracked by the workflow (see '_extractTransitions').
+
+ 'worklist_info' -- a list of mappings describing the
+ worklists tracked by the workflow (see '_extractWorklists').
+
+ 'script_info' -- a list of mappings describing the scripts which
+ provide added business logic (see '_extractScripts').
+ """
+ workflow_info[ 'state_variable' ] = workflow.state_var
+ workflow_info[ 'initial_state' ] = workflow.initial_state
+ workflow_info[ 'permissions' ] = workflow.permissions
+ workflow_info[ 'variable_info' ] = self._extractVariables( workflow )
+ workflow_info[ 'state_info' ] = self._extractStates( workflow )
+ workflow_info[ 'transition_info' ] = self._extractTransitions(
+ workflow )
+ workflow_info[ 'worklist_info' ] = self._extractWorklists( workflow )
+ workflow_info[ 'script_info' ] = self._extractScripts( workflow )
+
+ security.declarePrivate( '_extractVariables' )
+ def _extractVariables( self, workflow ):
+
+ """ Return a sequence of mappings describing DCWorkflow variables.
+
+ o Keys for each mapping will include:
+
+ 'id' -- the variable's ID
+
+ 'description' -- a textual description of the variable
+
+ 'for_catalog' -- whether to catalog this variable
+
+ 'for_status' -- whether to ??? this variable (XXX)
+
+ 'update_always' -- whether to update this variable whenever
+ executing a transition (xxX)
+
+ 'default_value' -- a default value for the variable (XXX)
+
+ 'default_expression' -- a TALES expression for the default value
+
+ 'guard_permissions' -- a list of permissions guarding access
+ to the variable
+
+ 'guard_roles' -- a list of roles guarding access
+ to the variable
+
+ 'guard_groups' -- a list of groups guarding the transition
+
+ 'guard_expr' -- an expression guarding access to the variable
+ """
+ result = []
+
+ items = workflow.variables.objectItems()
+ items.sort()
+
+ for k, v in items:
+
+ guard = v.getInfoGuard()
+
+ default_type = _guessVariableType( v.default_value )
+
+ info = { 'id' : k
+ , 'description' : v.description
+ , 'for_catalog' : bool( v.for_catalog )
+ , 'for_status' : bool( v.for_status )
+ , 'update_always' : bool( v.update_always )
+ , 'default_value' : v.default_value
+ , 'default_type' : default_type
+ , 'default_expr' : v.getDefaultExprText()
+ , 'guard_permissions' : guard.permissions
+ , 'guard_roles' : guard.roles
+ , 'guard_groups' : guard.groups
+ , 'guard_expr' : guard.getExprText()
+ }
+
+ result.append( info )
+
+ return result
+
+ security.declarePrivate( '_extractStates' )
+ def _extractStates( self, workflow ):
+
+ """ Return a sequence of mappings describing DCWorkflow states.
+
+ o Within the workflow mapping, each 'state_info' mapping has keys:
+
+ 'id' -- the state's ID
+
+ 'title' -- the state's title
+
+ 'description' -- the state's description
+
+ 'transitions' -- a list of IDs of transitions out of the state
+
+ 'permissions' -- a list of mappings describing the permission
+ map for the state
+
+ 'groups' -- a list of ( group_id, (roles,) ) tuples describing the
+ group-role assignments for the state
+
+ 'variables' -- a list of mapping for the variables
+ to be set when entering the state.
+
+ o Within the state_info mappings, each 'permissions' mapping
+ has the keys:
+
+ 'name' -- the name of the permission
+
+ 'roles' -- a sequence of role IDs which have the permission
+
+ 'acquired' -- whether roles are acquired for the permission
+
+ o Within the state_info mappings, each 'variable' mapping
+ has the keys:
+
+ 'name' -- the name of the variable
+
+ 'type' -- the type of the value (allowed values are:
+ 'string', 'datetime', 'bool', 'int')
+
+ 'value' -- the value to be set
+ """
+ result = []
+
+ items = workflow.states.objectItems()
+ items.sort()
+
+ for k, v in items:
+
+ groups = v.group_roles and list( v.group_roles.items() ) or []
+ groups = [ x for x in groups if x[1] ]
+ groups.sort()
+
+ variables = list( v.getVariableValues() )
+ variables.sort()
+
+ v_info = []
+
+ for v_name, value in variables:
+ v_info.append( { 'name' : v_name
+ , 'type' :_guessVariableType( value )
+ , 'value' : value
+ } )
+
+ info = { 'id' : k
+ , 'title' : v.title
+ , 'description' : v.description
+ , 'transitions' : v.transitions
+ , 'permissions' : self._extractStatePermissions( v )
+ , 'groups' : groups
+ , 'variables' : v_info
+ }
+
+ result.append( info )
+
+ return result
+
+ security.declarePrivate( '_extractStatePermissions' )
+ def _extractStatePermissions( self, state ):
+
+ """ Return a sequence of mappings for the permissions in a state.
+
+ o Each mapping has the keys:
+
+ 'name' -- the name of the permission
+
+ 'roles' -- a sequence of role IDs which have the permission
+
+ 'acquired' -- whether roles are acquired for the permission
+ """
+ result = []
+
+ items = state.permission_roles.items()
+ items.sort()
+
+ for k, v in items:
+
+ result.append( { 'name' : k
+ , 'roles' : v
+ , 'acquired' : not isinstance( v, tuple )
+ } )
+
+ return result
+
+
+ security.declarePrivate( '_extractTransitions' )
+ def _extractTransitions( self, workflow ):
+
+ """ Return a sequence of mappings describing DCWorkflow transitions.
+
+ o Each mapping has the keys:
+
+ 'id' -- the transition's ID
+
+ 'title' -- the transition's ID
+
+ 'description' -- the transition's description
+
+ 'new_state_id' -- the ID of the state into which the transition
+ moves an object
+
+ 'trigger_type' -- one of the following values, indicating how the
+ transition is fired:
+
+ - "AUTOMATIC" -> fired opportunistically whenever the workflow
+ notices that its guard conditions permit
+
+ - "USER" -> fired in response to user request
+
+ 'script_name' -- the ID of a script to be executed before
+ the transition
+
+ 'after_script_name' -- the ID of a script to be executed after
+ the transition
+
+ 'actbox_name' -- the name of the action by which the user
+ triggers the transition
+
+ 'actbox_url' -- the URL of the action by which the user
+ triggers the transition
+
+ 'actbox_category' -- the category of the action by which the user
+ triggers the transition
+
+ 'variables' -- a list of ( id, expr ) tuples defining how variables
+ are to be set during the transition
+
+ 'guard_permissions' -- a list of permissions guarding the transition
+
+ 'guard_roles' -- a list of roles guarding the transition
+
+ 'guard_groups' -- a list of groups guarding the transition
+
+ 'guard_expr' -- an expression guarding the transition
+
+ """
+ result = []
+
+ items = workflow.transitions.objectItems()
+ items.sort()
+
+ for k, v in items:
+
+ guard = v.getGuard()
+
+ v_info = []
+
+ for v_name, expr in v.getVariableExprs():
+ v_info.append( { 'name' : v_name, 'expr' : expr } )
+
+ info = { 'id' : k
+ , 'title' : v.title
+ , 'description' : v.description
+ , 'new_state_id' : v.new_state_id
+ , 'trigger_type' : TRIGGER_TYPES[ v.trigger_type ]
+ , 'script_name' : v.script_name
+ , 'after_script_name' : v.after_script_name
+ , 'actbox_name' : v.actbox_name
+ , 'actbox_url' : v.actbox_url
+ , 'actbox_category' : v.actbox_category
+ , 'variables' : v_info
+ , 'guard_permissions' : guard.permissions
+ , 'guard_roles' : guard.roles
+ , 'guard_groups' : guard.groups
+ , 'guard_expr' : guard.getExprText()
+ }
+
+ result.append( info )
+
+ return result
+
+ security.declarePrivate( '_extractWorklists' )
+ def _extractWorklists( self, workflow ):
+
+ """ Return a sequence of mappings describing DCWorkflow transitions.
+
+ o Each mapping has the keys:
+
+ 'id' -- the ID of the worklist
+
+ 'title' -- the title of the worklist
+
+ 'description' -- a textual description of the worklist
+
+ 'var_match' -- a list of ( key, value ) tuples defining
+ the variables used to "activate" the worklist.
+
+ 'actbox_name' -- the name of the "action" corresponding to the
+ worklist
+
+ 'actbox_url' -- the URL of the "action" corresponding to the
+ worklist
+
+ 'actbox_category' -- the category of the "action" corresponding
+ to the worklist
+
+ 'guard_permissions' -- a list of permissions guarding access
+ to the worklist
+
+ 'guard_roles' -- a list of roles guarding access
+ to the worklist
+
+ 'guard_expr' -- an expression guarding access to the worklist
+
+ """
+ result = []
+
+ items = workflow.worklists.objectItems()
+ items.sort()
+
+ for k, v in items:
+
+ guard = v.getGuard()
+
+ var_match = [ ( id, v.getVarMatchText( id ) )
+ for id in v.getVarMatchKeys() ]
+
+ info = { 'id' : k
+ , 'title' : v.title
+ , 'description' : v.description
+ , 'var_match' : var_match
+ , 'actbox_name' : v.actbox_name
+ , 'actbox_url' : v.actbox_url
+ , 'actbox_category' : v.actbox_category
+ , 'guard_permissions' : guard.permissions
+ , 'guard_roles' : guard.roles
+ , 'guard_groups' : guard.groups
+ , 'guard_expr' : guard.getExprText()
+ }
+
+ result.append( info )
+
+ return result
+
+ security.declarePrivate( '_extractScripts' )
+ def _extractScripts( self, workflow ):
+
+ """ Return a sequence of mappings describing DCWorkflow scripts.
+
+ o Each mapping has the keys:
+
+ 'id' -- the ID of the script
+
+ 'meta_type' -- the title of the worklist
+
+ 'body' -- the text of the script (only applicable to scripts
+ of type Script (Python))
+
+ 'module' -- The module from where to load the function (only
+ applicable to External Method scripts)
+
+ 'function' -- The function to load from the 'module' given
+ (Only applicable to External Method scripts)
+
+ 'filename' -- the name of the file to / from which the script
+ is stored / loaded (Script (Python) only)
+ """
+ result = []
+
+ items = workflow.scripts.objectItems()
+ items.sort()
+
+ for k, v in items:
+
+ filename = _getScriptFilename( workflow.getId(), k, v.meta_type )
+ module = ''
+ function = ''
+
+ if v.meta_type == 'External Method':
+ module = v.module()
+ function = v.function()
+
+ info = { 'id' : k
+ , 'meta_type' : v.meta_type
+ , 'module' : module
+ , 'function' : function
+ , 'filename' : filename
+ }
+
+ result.append( info )
+
+ return result
+
+InitializeClass( WorkflowDefinitionConfigurator )
+
+
+def _getScriptFilename( workflow_id, script_id, meta_type ):
+
+ """ Return the name of the file which holds the script.
+ """
+ wf_dir = workflow_id.replace( ' ', '_' )
+ suffix = _METATYPE_SUFFIXES.get(meta_type, None)
+
+ if suffix is None:
+ return ''
+
+ return 'workflows/%s/scripts/%s.%s' % ( wf_dir, script_id, suffix )
+
+def _extractStateNodes( root, encoding=None ):
+
+ result = []
+
+ for s_node in root.getElementsByTagName( 'state' ):
+
+ info = { 'state_id' : _getNodeAttribute( s_node, 'state_id', encoding )
+ , 'title' : _getNodeAttribute( s_node, 'title', encoding )
+ , 'description' : _extractDescriptionNode( s_node, encoding )
+ }
+
+ info[ 'transitions' ] = [ _getNodeAttribute( x, 'transition_id'
+ , encoding )
+ for x in s_node.getElementsByTagName(
+ 'exit-transition' ) ]
+
+ info[ 'permissions' ] = permission_map = {}
+
+ for p_map in s_node.getElementsByTagName( 'permission-map' ):
+
+ name = _getNodeAttribute( p_map, 'name', encoding )
+ acquired = _getNodeAttributeBoolean( p_map, 'acquired' )
+
+ roles = [ _coalesceTextNodeChildren( x, encoding )
+ for x in p_map.getElementsByTagName(
+ 'permission-role' ) ]
+
+ if not acquired:
+ roles = tuple( roles )
+
+ permission_map[ name ] = roles
+
+ info[ 'groups' ] = group_map = []
+
+ for g_map in s_node.getElementsByTagName( 'group-map' ):
+
+ name = _getNodeAttribute( g_map, 'name', encoding )
+
+ roles = [ _coalesceTextNodeChildren( x, encoding )
+ for x in g_map.getElementsByTagName(
+ 'group-role' ) ]
+
+ group_map.append( ( name, tuple( roles ) ) )
+
+ info[ 'variables' ] = var_map = {}
+
+ for assignment in s_node.getElementsByTagName( 'assignment' ):
+
+ name = _getNodeAttribute( assignment, 'name', encoding )
+ type_id = _getNodeAttribute( assignment, 'type', encoding )
+ value = _coalesceTextNodeChildren( assignment, encoding )
+
+ var_map[ name ] = { 'name' : name
+ , 'type' : type_id
+ , 'value' : value
+ }
+
+ result.append( info )
+
+ return result
+
+def _extractTransitionNodes( root, encoding=None ):
+
+ result = []
+
+ for t_node in root.getElementsByTagName( 'transition' ):
+
+ info = { 'transition_id' : _getNodeAttribute( t_node, 'transition_id'
+ , encoding )
+ , 'title' : _getNodeAttribute( t_node, 'title', encoding )
+ , 'description' : _extractDescriptionNode( t_node, encoding )
+ , 'new_state' : _getNodeAttribute( t_node, 'new_state'
+ , encoding )
+ , 'trigger' : _getNodeAttribute( t_node, 'trigger', encoding )
+ , 'before_script' : _getNodeAttribute( t_node, 'before_script'
+ , encoding )
+ , 'after_script' : _getNodeAttribute( t_node, 'after_script'
+ , encoding )
+ , 'action' : _extractActionNode( t_node, encoding )
+ , 'guard' : _extractGuardNode( t_node, encoding )
+ }
+
+ info[ 'variables' ] = var_map = {}
+
+ for assignment in t_node.getElementsByTagName( 'assignment' ):
+
+ name = _getNodeAttribute( assignment, 'name', encoding )
+ expr = _coalesceTextNodeChildren( assignment, encoding )
+ var_map[ name ] = expr
+
+ result.append( info )
+
+ return result
+
+def _extractVariableNodes( root, encoding=None ):
+
+ result = []
+
+ for v_node in root.getElementsByTagName( 'variable' ):
+
+ info = { 'variable_id' : _getNodeAttribute( v_node, 'variable_id'
+ , encoding )
+ , 'description' : _extractDescriptionNode( v_node, encoding )
+ , 'for_catalog' : _getNodeAttributeBoolean( v_node
+ , 'for_catalog'
+ )
+ , 'for_status' : _getNodeAttributeBoolean( v_node
+ , 'for_status'
+ )
+ , 'update_always' : _getNodeAttributeBoolean( v_node
+ , 'update_always'
+ )
+ , 'default' : _extractDefaultNode( v_node, encoding )
+ , 'guard' : _extractGuardNode( v_node, encoding )
+ }
+
+ result.append( info )
+
+ return result
+
+def _extractWorklistNodes( root, encoding=None ):
+
+ result = []
+
+ for w_node in root.getElementsByTagName( 'worklist' ):
+
+ info = { 'worklist_id' : _getNodeAttribute( w_node, 'worklist_id'
+ , encoding )
+ , 'title' : _getNodeAttribute( w_node, 'title' , encoding )
+ , 'description' : _extractDescriptionNode( w_node, encoding )
+ , 'match' : _extractMatchNode( w_node, encoding )
+ , 'action' : _extractActionNode( w_node, encoding )
+ , 'guard' : _extractGuardNode( w_node, encoding )
+ }
+
+ result.append( info )
+
+ return result
+
+def _extractScriptNodes( root, encoding=None ):
+
+ result = []
+
+ for s_node in root.getElementsByTagName( 'script' ):
+
+ try:
+ function = _getNodeAttribute( s_node, 'function' )
+ except ValueError:
+ function = ''
+
+ try:
+ module = _getNodeAttribute( s_node, 'module' )
+ except ValueError:
+ module = ''
+
+ info = { 'script_id' : _getNodeAttribute( s_node, 'script_id' )
+ , 'meta_type' : _getNodeAttribute( s_node, 'type' , encoding )
+ , 'function' : function
+ , 'module' : module
+ }
+
+ filename = _queryNodeAttribute( s_node, 'filename' , None, encoding )
+
+ if filename is not None:
+ info[ 'filename' ] = filename
+
+ result.append( info )
+
+ return result
+
+def _extractPermissionNodes( root, encoding=None ):
+
+ result = []
+
+ for p_node in root.getElementsByTagName( 'permission' ):
+
+ result.append( _coalesceTextNodeChildren( p_node, encoding ) )
+
+ return result
+
+def _extractActionNode( parent, encoding=None ):
+
+ nodes = parent.getElementsByTagName( 'action' )
+ assert len( nodes ) <= 1, nodes
+
+ if len( nodes ) < 1:
+ return { 'name' : '', 'url' : '', 'category' : '' }
+
+ node = nodes[ 0 ]
+
+ return { 'name' : _coalesceTextNodeChildren( node, encoding )
+ , 'url' : _getNodeAttribute( node, 'url', encoding )
+ , 'category' : _getNodeAttribute( node, 'category', encoding )
+ }
+
+def _extractGuardNode( parent, encoding=None ):
+
+ nodes = parent.getElementsByTagName( 'guard' )
+ assert len( nodes ) <= 1, nodes
+
+ if len( nodes ) < 1:
+ return { 'permissions' : (), 'roles' : (), 'groups' : (), 'expr' : '' }
+
+ node = nodes[ 0 ]
+
+ expr_nodes = node.getElementsByTagName( 'guard-expression' )
+ assert( len( expr_nodes ) <= 1 )
+
+ expr_text = expr_nodes and _coalesceTextNodeChildren( expr_nodes[ 0 ]
+ , encoding
+ ) or ''
+
+ return { 'permissions' : [ _coalesceTextNodeChildren( x, encoding )
+ for x in node.getElementsByTagName(
+ 'guard-permission' ) ]
+ , 'roles' : [ _coalesceTextNodeChildren( x, encoding )
+ for x in node.getElementsByTagName( 'guard-role' ) ]
+ , 'groups' : [ _coalesceTextNodeChildren( x, encoding )
+ for x in node.getElementsByTagName( 'guard-group' ) ]
+ , 'expression' : expr_text
+ }
+
+def _extractDefaultNode( parent, encoding=None ):
+
+ nodes = parent.getElementsByTagName( 'default' )
+ assert len( nodes ) <= 1, nodes
+
+ if len( nodes ) < 1:
+ return { 'value' : '', 'expression' : '', 'type' : 'n/a' }
+
+ node = nodes[ 0 ]
+
+ value_nodes = node.getElementsByTagName( 'value' )
+ assert( len( value_nodes ) <= 1 )
+
+ value_type = 'n/a'
+ if value_nodes:
+ value_type = value_nodes[ 0 ].getAttribute( 'type' ) or 'n/a'
+
+ value_text = value_nodes and _coalesceTextNodeChildren( value_nodes[ 0 ]
+ , encoding
+ ) or ''
+
+ expr_nodes = node.getElementsByTagName( 'expression' )
+ assert( len( expr_nodes ) <= 1 )
+
+ expr_text = expr_nodes and _coalesceTextNodeChildren( expr_nodes[ 0 ]
+ , encoding
+ ) or ''
+
+ return { 'value' : value_text
+ , 'type' : value_type
+ , 'expression' : expr_text
+ }
+
+_SEMICOLON_LIST_SPLITTER = re.compile( r';[ ]*' )
+
+def _extractMatchNode( parent, encoding=None ):
+
+ nodes = parent.getElementsByTagName( 'match' )
+
+ result = {}
+
+ for node in nodes:
+
+ name = _getNodeAttribute( node, 'name', encoding )
+ values = _getNodeAttribute( node, 'values', encoding )
+ result[ name ] = _SEMICOLON_LIST_SPLITTER.split( values )
+
+ return result
+
+def _guessVariableType( value ):
+
+ from DateTime.DateTime import DateTime
+
+ if value is None:
+ return 'none'
+
+ if isinstance( value, DateTime ):
+ return 'datetime'
+
+ if isinstance( value, bool ):
+ return 'bool'
+
+ if isinstance( value, int ):
+ return 'int'
+
+ if isinstance( value, float ):
+ return 'float'
+
+ if isinstance( value, basestring ):
+ return 'string'
+
+ return 'unknown'
+
+def _convertVariableValue( value, type_id ):
+
+ from DateTime.DateTime import DateTime
+
+ if type_id == 'none':
+ return None
+
+ if type_id == 'datetime':
+
+ return DateTime( value )
+
+ if type_id == 'bool':
+
+ if isinstance( value, basestring ):
+
+ value = str( value ).lower()
+
+ return value in ( 'true', 'yes', '1' )
+
+ else:
+ return bool( value )
+
+ if type_id == 'int':
+ return int( value )
+
+ if type_id == 'float':
+ return float( value )
+
+ return value
+
+from Products.PythonScripts.PythonScript import PythonScript
+from Products.ExternalMethod.ExternalMethod import ExternalMethod
+from OFS.DTMLMethod import DTMLMethod
+
+_METATYPE_SUFFIXES = \
+{ PythonScript.meta_type : 'py'
+, DTMLMethod.meta_type : 'dtml'
+}
+
+def _initDCWorkflow( workflow
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ , context
+ ):
+ """ Initialize a DC Workflow using values parsed from XML.
+ """
+ workflow.title = title
+ workflow.state_var = state_variable
+ workflow.initial_state = initial_state
+
+ permissions = permissions[:]
+ permissions.sort()
+ workflow.permissions = tuple(permissions)
+
+ _initDCWorkflowVariables( workflow, variables )
+ _initDCWorkflowStates( workflow, states )
+ _initDCWorkflowTransitions( workflow, transitions )
+ _initDCWorkflowWorklists( workflow, worklists )
+ _initDCWorkflowScripts( workflow, scripts, context )
+
+
+def _initDCWorkflowVariables( workflow, variables ):
+
+ """ Initialize DCWorkflow variables
+ """
+ from Products.DCWorkflow.Variables import VariableDefinition
+
+ for v_info in variables:
+
+ id = str( v_info[ 'variable_id' ] ) # no unicode!
+ v = VariableDefinition( id )
+ workflow.variables._setObject( id, v )
+ v = workflow.variables._getOb( id )
+
+ guard = v_info[ 'guard' ]
+ props = { 'guard_roles' : ';'.join( guard[ 'roles' ] )
+ , 'guard_permissions' : ';'.join( guard[ 'permissions' ] )
+ , 'guard_groups' : ';'.join( guard[ 'groups' ] )
+ , 'guard_expr' : guard[ 'expression' ]
+ }
+
+ default = v_info[ 'default' ]
+ default_value = _convertVariableValue( default[ 'value' ]
+ , default[ 'type' ] )
+
+ v.setProperties( description = v_info[ 'description' ]
+ , default_value = default_value
+ , default_expr = default[ 'expression' ]
+ , for_catalog = v_info[ 'for_catalog' ]
+ , for_status = v_info[ 'for_status' ]
+ , update_always = v_info[ 'update_always' ]
+ , props = props
+ )
+
+
+def _initDCWorkflowStates( workflow, states ):
+
+ """ Initialize DCWorkflow states
+ """
+ from Globals import PersistentMapping
+ from Products.DCWorkflow.States import StateDefinition
+
+ for s_info in states:
+
+ id = str( s_info[ 'state_id' ] ) # no unicode!
+ s = StateDefinition( id )
+ workflow.states._setObject( id, s )
+ s = workflow.states._getOb( id )
+
+ s.setProperties( title = s_info[ 'title' ]
+ , description = s_info[ 'description' ]
+ , transitions = s_info[ 'transitions' ]
+ )
+
+ for k, v in s_info[ 'permissions' ].items():
+ s.setPermission( k, isinstance(v, list), v )
+
+ gmap = s.group_roles = PersistentMapping()
+
+ for group_id, roles in s_info[ 'groups' ]:
+ gmap[ group_id ] = roles
+
+ vmap = s.var_values = PersistentMapping()
+
+ for name, v_info in s_info[ 'variables' ].items():
+
+ value = _convertVariableValue( v_info[ 'value' ]
+ , v_info[ 'type' ] )
+
+ vmap[ name ] = value
+
+
+def _initDCWorkflowTransitions( workflow, transitions ):
+
+ """ Initialize DCWorkflow transitions
+ """
+ from Globals import PersistentMapping
+ from Products.DCWorkflow.Transitions import TransitionDefinition
+
+ for t_info in transitions:
+
+ id = str( t_info[ 'transition_id' ] ) # no unicode!
+ t = TransitionDefinition( id )
+ workflow.transitions._setObject( id, t )
+ t = workflow.transitions._getOb( id )
+
+ trigger_type = list( TRIGGER_TYPES ).index( t_info[ 'trigger' ] )
+
+ action = t_info[ 'action' ]
+
+ guard = t_info[ 'guard' ]
+ props = { 'guard_roles' : ';'.join( guard[ 'roles' ] )
+ , 'guard_permissions' : ';'.join( guard[ 'permissions' ] )
+ , 'guard_groups' : ';'.join( guard[ 'groups' ] )
+ , 'guard_expr' : guard[ 'expression' ]
+ }
+
+ t.setProperties( title = t_info[ 'title' ]
+ , description = t_info[ 'description' ]
+ , new_state_id = t_info[ 'new_state' ]
+ , trigger_type = trigger_type
+ , script_name = t_info[ 'before_script' ]
+ , after_script_name = t_info[ 'after_script' ]
+ , actbox_name = action[ 'name' ]
+ , actbox_url = action[ 'url' ]
+ , actbox_category = action[ 'category' ]
+ , props = props
+ )
+
+ t.var_exprs = PersistentMapping( t_info[ 'variables' ].items() )
+
+def _initDCWorkflowWorklists( workflow, worklists ):
+
+ """ Initialize DCWorkflow worklists
+ """
+ from Globals import PersistentMapping
+ from Products.DCWorkflow.Worklists import WorklistDefinition
+
+ for w_info in worklists:
+
+ id = str( w_info[ 'worklist_id' ] ) # no unicode!
+ w = WorklistDefinition( id )
+ workflow.worklists._setObject( id, w )
+
+ w = workflow.worklists._getOb( id )
+
+ action = w_info[ 'action' ]
+
+ guard = w_info[ 'guard' ]
+ props = { 'guard_roles' : ';'.join( guard[ 'roles' ] )
+ , 'guard_permissions' : ';'.join( guard[ 'permissions' ] )
+ , 'guard_groups' : ';'.join( guard[ 'groups' ] )
+ , 'guard_expr' : guard[ 'expression' ]
+ }
+
+ w.setProperties( description = w_info[ 'description' ]
+ , actbox_name = action[ 'name' ]
+ , actbox_url = action[ 'url' ]
+ , actbox_category = action[ 'category' ]
+ , props = props
+ )
+
+ w.var_matches = PersistentMapping()
+ for k, v in w_info[ 'match' ].items():
+ w.var_matches[ str( k ) ] = tuple( [ str(x) for x in v ] )
+
+def _initDCWorkflowScripts( workflow, scripts, context ):
+
+ """ Initialize DCWorkflow scripts
+ """
+ for s_info in scripts:
+
+ id = str( s_info[ 'script_id' ] ) # no unicode!
+ meta_type = s_info[ 'meta_type' ]
+ filename = s_info[ 'filename' ]
+ file = ''
+
+ if filename:
+ file = context.readDataFile( filename )
+
+ if meta_type == PythonScript.meta_type:
+ script = PythonScript( id )
+ script.write( file )
+
+ elif meta_type == ExternalMethod.meta_type:
+ script = ExternalMethod( id
+ , ''
+ , s_info['module']
+ , s_info['function']
+ )
+
+ elif meta_type == DTMLMethod.meta_type:
+ script = DTMLMethod( file, __name__=id )
+
+ workflow.scripts._setObject( id, script )
+
+#
+# deprecated DOM parsing utilities
+#
+_marker = object()
+
+def _queryNodeAttribute( node, attr_name, default, encoding=None ):
+
+ """ Extract a string-valued attribute from node.
+
+ o Return 'default' if the attribute is not present.
+ """
+ attr_node = node.attributes.get( attr_name, _marker )
+
+ if attr_node is _marker:
+ return default
+
+ value = attr_node.nodeValue
+
+ if encoding is not None:
+ value = value.encode( encoding )
+
+ return value
+
+def _getNodeAttribute( node, attr_name, encoding=None ):
+
+ """ Extract a string-valued attribute from node.
+ """
+ value = _queryNodeAttribute( node, attr_name, _marker, encoding )
+
+ if value is _marker:
+ raise ValueError, 'Invalid attribute: %s' % attr_name
+
+ return value
+
+def _queryNodeAttributeBoolean( node, attr_name, default ):
+
+ """ Extract a string-valued attribute from node.
+
+ o Return 'default' if the attribute is not present.
+ """
+ attr_node = node.attributes.get( attr_name, _marker )
+
+ if attr_node is _marker:
+ return default
+
+ value = node.attributes[ attr_name ].nodeValue.lower()
+
+ return value in ( 'true', 'yes', '1' )
+
+def _getNodeAttributeBoolean( node, attr_name ):
+
+ """ Extract a string-valued attribute from node.
+ """
+ value = node.attributes[ attr_name ].nodeValue.lower()
+
+ return value in ( 'true', 'yes', '1' )
+
+def _coalesceTextNodeChildren( node, encoding=None ):
+
+ """ Concatenate all childe text nodes into a single string.
+ """
+ from xml.dom import Node
+ fragments = []
+ node.normalize()
+ child = node.firstChild
+
+ while child is not None:
+
+ if child.nodeType == Node.TEXT_NODE:
+ fragments.append( child.nodeValue )
+
+ child = child.nextSibling
+
+ joined = ''.join( fragments )
+
+ if encoding is not None:
+ joined = joined.encode( encoding )
+
+ return ''.join( [ line.lstrip() for line in joined.splitlines(True) ] )
+
+def _extractDescriptionNode(parent, encoding=None):
+
+ d_nodes = parent.getElementsByTagName('description')
+ if d_nodes:
+ return _coalesceTextNodeChildren(d_nodes[0], encoding)
+ else:
+ return ''
Added: CMF/branches/yuppie-workflow_setup/DCWorkflow/interfaces.py
===================================================================
--- CMF/branches/yuppie-workflow_setup/DCWorkflow/interfaces.py 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/DCWorkflow/interfaces.py 2005-11-22 16:12:02 UTC (rev 40321)
@@ -0,0 +1,24 @@
+##############################################################################
+#
+# Copyright (c) 2005 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""DCWorkflow interfaces.
+
+$Id$
+"""
+
+from zope.interface import Interface
+
+
+class IDCWorkflowDefinition(Interface):
+
+ """Web-configurable workflow definition.
+ """
Property changes on: CMF/branches/yuppie-workflow_setup/DCWorkflow/interfaces.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Copied: CMF/branches/yuppie-workflow_setup/DCWorkflow/tests/test_exportimport.py (from rev 40305, CMF/branches/yuppie-workflow_setup/CMFSetup/tests/test_workflow.py)
===================================================================
--- CMF/branches/yuppie-workflow_setup/CMFSetup/tests/test_workflow.py 2005-11-21 19:08:48 UTC (rev 40305)
+++ CMF/branches/yuppie-workflow_setup/DCWorkflow/tests/test_exportimport.py 2005-11-22 16:12:02 UTC (rev 40321)
@@ -0,0 +1,2337 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""DCWorkflow export / import unit tests.
+
+$Id$
+"""
+
+import unittest
+import Testing
+
+import Products
+from Products.PythonScripts.PythonScript import PythonScript
+from Products.ExternalMethod.ExternalMethod import ExternalMethod
+from Products.Five import zcml
+
+import Products.GenericSetup.PythonScripts
+from Products.CMFCore.exportimport.tests.test_workflow \
+ import _BINDINGS_TOOL_EXPORT
+from Products.CMFCore.exportimport.tests.test_workflow import _DUMMY_ZCML
+from Products.CMFCore.exportimport.tests.test_workflow \
+ import _EMPTY_TOOL_EXPORT
+from Products.CMFCore.exportimport.tests.test_workflow \
+ import _WorkflowSetup as WorkflowSetupBase
+from Products.CMFCore.exportimport.tests.test_workflow import DummyWorkflow
+from Products.CMFCore.exportimport.tests.test_workflow \
+ import DummyWorkflowTool
+from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
+from Products.DCWorkflow.Transitions import TRIGGER_USER_ACTION
+from Products.DCWorkflow.Transitions import TRIGGER_AUTOMATIC
+from Products.GenericSetup.tests.common import DummyExportContext
+from Products.GenericSetup.tests.common import DummyImportContext
+
+
+class _GuardChecker:
+
+ def _genGuardProps( self, permissions, roles, groups, expr ):
+
+ return { 'guard_permissions' : '; '.join( permissions )
+ , 'guard_roles' : '; '.join( roles )
+ , 'guard_groups' : '; '.join( groups )
+ , 'guard_expr' : expr
+ }
+
+ def _assertGuard( self, info, permissions, roles, groups, expr ):
+
+ self.assertEqual( len( info[ 'guard_permissions' ] )
+ , len( permissions ) )
+
+ for expected in permissions:
+ self.failUnless( expected in info[ 'guard_permissions' ] )
+
+ self.assertEqual( len( info[ 'guard_roles' ] )
+ , len( roles ) )
+
+ for expected in roles:
+ self.failUnless( expected in info[ 'guard_roles' ] )
+
+ self.assertEqual( len( info[ 'guard_groups' ] )
+ , len( groups ) )
+
+ for expected in groups:
+ self.failUnless( expected in info[ 'guard_groups' ] )
+
+ self.assertEqual( info[ 'guard_expr' ], expr )
+
+
+class _WorkflowSetup(WorkflowSetupBase):
+
+ def setUp(self):
+ WorkflowSetupBase.setUp(self)
+ zcml.load_config('permissions.zcml', Products.Five)
+ zcml.load_config('configure.zcml', Products.DCWorkflow)
+ zcml.load_config('configure.zcml', Products.GenericSetup.PythonScripts)
+ zcml.load_string(_DUMMY_ZCML)
+
+ def _initDCWorkflow( self, workflow_id ):
+
+ wf_tool = self.root.site.portal_workflow
+ wf_tool._setObject( workflow_id, DCWorkflowDefinition( workflow_id ) )
+
+ return wf_tool._getOb( workflow_id )
+
+ def _initVariables( self, dcworkflow ):
+
+ for id, args in _WF_VARIABLES.items():
+
+ dcworkflow.variables.addVariable( id )
+ variable = dcworkflow.variables._getOb( id )
+
+ ( descr, def_val, def_exp, for_cat, for_stat, upd_alw
+ ) = args[ :-4 ]
+
+ variable.setProperties( description=args[0]
+ , default_value=args[1]
+ , default_expr=args[2]
+ , for_catalog=args[3]
+ , for_status=args[4]
+ , update_always=args[5]
+ , props=self._genGuardProps( *args[ -4: ] )
+ )
+
+ def _initStates( self, dcworkflow ):
+
+ dcworkflow.groups = _WF_GROUPS
+
+ for k, v in _WF_STATES.items():
+
+ dcworkflow.states.addState( k )
+ state = dcworkflow.states._getOb( k )
+
+ state.setProperties( title=v[ 0 ]
+ , description=v[ 1 ]
+ , transitions=v[ 2 ]
+ )
+ if not v[ 3 ]:
+ state.permission_roles = {}
+
+ for permission, roles in v[ 3 ].items():
+ state.setPermission( permission
+ , not isinstance( roles, tuple )
+ , roles
+ )
+ faux_request = {}
+
+ for group_id, roles in v[ 4 ]:
+ for role in roles:
+ faux_request[ '%s|%s' % ( group_id, role ) ] = True
+
+ state.setGroups( REQUEST=faux_request )
+
+ for k, v in v[ 5 ].items():
+ state.addVariable( k, v )
+
+ def _initTransitions( self, dcworkflow ):
+
+ for k, v in _WF_TRANSITIONS.items():
+
+ dcworkflow.transitions.addTransition( k )
+ transition = dcworkflow.transitions._getOb( k )
+
+ transition.setProperties( title=v[ 0 ]
+ , description=v[ 1 ]
+ , new_state_id=v[ 2 ]
+ , trigger_type=v[ 3 ]
+ , script_name=v[ 4 ]
+ , after_script_name=v[ 5 ]
+ , actbox_name=v[ 6 ]
+ , actbox_url=v[ 7 ]
+ , actbox_category=v[ 8 ]
+ , props=self._genGuardProps( *v[ -4: ] )
+ )
+
+ for k, v in v[ 9 ].items():
+ transition.addVariable( k, v )
+
+ def _initWorklists( self, dcworkflow ):
+
+ for k, v in _WF_WORKLISTS.items():
+
+ dcworkflow.worklists.addWorklist( k )
+ worklist = dcworkflow.worklists._getOb( k )
+
+ worklist.title = v[ 0 ]
+
+ props=self._genGuardProps( *v[ -4: ] )
+
+ for var_id, matches in v[ 2 ].items():
+ props[ 'var_match_%s' % var_id ] = ';'.join( matches )
+
+ worklist.setProperties( description=v[ 1 ]
+ , actbox_name=v[ 3 ]
+ , actbox_url=v[ 4 ]
+ , actbox_category=v[ 5 ]
+ , props=props
+ )
+
+ def _initScripts( self, dcworkflow ):
+
+ for k, v in _WF_SCRIPTS.items():
+
+ if v[ 0 ] == PythonScript.meta_type:
+ script = PythonScript( k )
+ script.write( v[ 1 ] )
+
+ elif v[ 0 ] == ExternalMethod.meta_type:
+ script = ExternalMethod(k,'', v[3], v[4])
+
+ else:
+ raise ValueError, 'Unknown script type: %s' % v[ 0 ]
+
+ dcworkflow.scripts._setObject( k, script )
+
+
+class WorkflowDefinitionConfiguratorTests( _WorkflowSetup, _GuardChecker ):
+
+ def _getTargetClass( self ):
+ from Products.DCWorkflow.exportimport \
+ import WorkflowDefinitionConfigurator
+
+ return WorkflowDefinitionConfigurator
+
+ def test_getWorkflowInfo_dcworkflow_defaults( self ):
+
+ WF_ID = 'dcworkflow_defaults'
+
+ site = self._initSite()
+ dcworkflow = self._initDCWorkflow( WF_ID )
+
+ configurator = self._makeOne(dcworkflow).__of__(site)
+ info = configurator.getWorkflowInfo( WF_ID )
+
+ self.assertEqual( info[ 'id' ], WF_ID )
+ self.assertEqual( info[ 'meta_type' ], DCWorkflowDefinition.meta_type )
+ self.assertEqual( info[ 'title' ], dcworkflow.title )
+
+ self.assertEqual( info[ 'state_variable' ], dcworkflow.state_var )
+
+ self.assertEqual( len( info[ 'permissions' ] ), 0 )
+ self.assertEqual( len( info[ 'variable_info' ] ), 0 )
+ self.assertEqual( len( info[ 'state_info' ] ), 0 )
+ self.assertEqual( len( info[ 'transition_info' ] ), 0 )
+
+ def test_getWorkflowInfo_dcworkflow_permissions( self ):
+
+ WF_ID = 'dcworkflow_permissions'
+
+ site = self._initSite()
+ dcworkflow = self._initDCWorkflow( WF_ID )
+ dcworkflow.permissions = _WF_PERMISSIONS
+
+ configurator = self._makeOne(dcworkflow).__of__(site)
+ info = configurator.getWorkflowInfo( WF_ID )
+
+ permissions = info[ 'permissions' ]
+ self.assertEqual( len( permissions ), len( _WF_PERMISSIONS ) )
+
+ for permission in _WF_PERMISSIONS:
+ self.failUnless( permission in permissions )
+
+ def test_getWorkflowInfo_dcworkflow_variables( self ):
+
+ WF_ID = 'dcworkflow_variables'
+
+ site = self._initSite()
+ dcworkflow = self._initDCWorkflow( WF_ID )
+ self._initVariables( dcworkflow )
+
+ configurator = self._makeOne(dcworkflow).__of__(site)
+ info = configurator.getWorkflowInfo( WF_ID )
+
+ variable_info = info[ 'variable_info' ]
+ self.assertEqual( len( variable_info ), len( _WF_VARIABLES ) )
+
+ ids = [ x[ 'id' ] for x in variable_info ]
+
+ for k in _WF_VARIABLES.keys():
+ self.failUnless( k in ids )
+
+ for info in variable_info:
+
+ expected = _WF_VARIABLES[ info[ 'id' ] ]
+
+ self.assertEqual( info[ 'description' ], expected[ 0 ] )
+ self.assertEqual( info[ 'default_value' ], expected[ 1 ] )
+ self.assertEqual( info[ 'default_expr' ], expected[ 2 ] )
+ self.assertEqual( info[ 'for_catalog' ], expected[ 3 ] )
+ self.assertEqual( info[ 'for_status' ], expected[ 4 ] )
+ self.assertEqual( info[ 'update_always' ], expected[ 5 ] )
+
+ self._assertGuard( info, *expected[ -4: ] )
+
+ def test_getWorkflowInfo_dcworkflow_states( self ):
+
+ WF_ID = 'dcworkflow_states'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+ dcworkflow = self._initDCWorkflow( WF_ID )
+ dcworkflow.initial_state = WF_INITIAL_STATE
+ self._initStates( dcworkflow )
+
+ configurator = self._makeOne(dcworkflow).__of__(site)
+ info = configurator.getWorkflowInfo( WF_ID )
+
+ self.assertEqual( info[ 'state_variable' ], dcworkflow.state_var )
+ self.assertEqual( info[ 'initial_state' ], dcworkflow.initial_state )
+
+ state_info = info[ 'state_info' ]
+ self.assertEqual( len( state_info ), len( _WF_STATES ) )
+
+ ids = [ x[ 'id' ] for x in state_info ]
+
+ for k in _WF_STATES.keys():
+ self.failUnless( k in ids )
+
+ for info in state_info:
+
+ expected = _WF_STATES[ info[ 'id' ] ]
+
+ self.assertEqual( info[ 'title' ], expected[ 0 ] )
+ self.assertEqual( info[ 'description' ], expected[ 1 ] )
+ self.assertEqual( info[ 'transitions' ], expected[ 2 ] )
+
+ permissions = info[ 'permissions' ]
+
+ self.assertEqual( len( permissions ), len( expected[ 3 ] ) )
+
+ for ep_id, ep_roles in expected[ 3 ].items():
+
+ fp = [ x for x in permissions if x[ 'name' ] == ep_id ][ 0 ]
+
+ self.assertEqual( fp[ 'acquired' ]
+ , not isinstance( ep_roles, tuple ) )
+
+ self.assertEqual( len( fp[ 'roles' ] ), len( ep_roles ) )
+
+ for ep_role in ep_roles:
+ self.failUnless( ep_role in fp[ 'roles' ] )
+
+ groups = info[ 'groups' ]
+ self.assertEqual( len( groups ), len( expected[ 4 ] ) )
+
+ for i in range( len( groups ) ):
+ self.assertEqual( groups[ i ], expected[ 4 ][ i ] )
+
+ variables = info[ 'variables' ]
+ self.assertEqual( len( variables ), len( expected[ 5 ] ) )
+
+ for v_info in variables:
+
+ name, type, value = ( v_info[ 'name' ]
+ , v_info[ 'type' ], v_info[ 'value' ] )
+
+ self.assertEqual( value, expected[ 5 ][ name ] )
+
+ if isinstance( value, bool ):
+ self.assertEqual( type, 'bool' )
+ elif isinstance( value, int ):
+ self.assertEqual( type, 'int' )
+ elif isinstance( value, float ):
+ self.assertEqual( type, 'float' )
+ elif isinstance( value, basestring ):
+ self.assertEqual( type, 'string' )
+
+ def test_getWorkflowInfo_dcworkflow_transitions( self ):
+
+ from Products.DCWorkflow.exportimport import TRIGGER_TYPES
+
+ WF_ID = 'dcworkflow_transitions'
+
+ site = self._initSite()
+ dcworkflow = self._initDCWorkflow( WF_ID )
+ self._initTransitions( dcworkflow )
+
+ configurator = self._makeOne(dcworkflow).__of__(site)
+ info = configurator.getWorkflowInfo( WF_ID )
+
+ transition_info = info[ 'transition_info' ]
+ self.assertEqual( len( transition_info ), len( _WF_TRANSITIONS ) )
+
+ ids = [ x[ 'id' ] for x in transition_info ]
+
+ for k in _WF_TRANSITIONS.keys():
+ self.failUnless( k in ids )
+
+ for info in transition_info:
+
+ expected = _WF_TRANSITIONS[ info[ 'id' ] ]
+
+ self.assertEqual( info[ 'title' ], expected[ 0 ] )
+ self.assertEqual( info[ 'description' ], expected[ 1 ] )
+ self.assertEqual( info[ 'new_state_id' ], expected[ 2 ] )
+ self.assertEqual( info[ 'trigger_type' ]
+ , TRIGGER_TYPES[ expected[ 3 ] ] )
+ self.assertEqual( info[ 'script_name' ], expected[ 4 ] )
+ self.assertEqual( info[ 'after_script_name' ], expected[ 5 ] )
+ self.assertEqual( info[ 'actbox_name' ], expected[ 6 ] )
+ self.assertEqual( info[ 'actbox_url' ], expected[ 7 ] )
+ self.assertEqual( info[ 'actbox_category' ], expected[ 8 ] )
+
+ variables = info[ 'variables' ]
+ self.assertEqual( len( variables ), len( expected[ 9 ] ) )
+
+ for v_info in variables:
+ self.assertEqual( v_info[ 'expr' ]
+ , expected[ 9 ][ v_info[ 'name' ] ] )
+
+ self._assertGuard( info, *expected[ -4: ] )
+
+ def test_getWorkflowInfo_dcworkflow_worklists( self ):
+
+ WF_ID = 'dcworkflow_worklists'
+
+ site = self._initSite()
+ dcworkflow = self._initDCWorkflow( WF_ID )
+ self._initWorklists( dcworkflow )
+
+ configurator = self._makeOne(dcworkflow).__of__(site)
+ info = configurator.getWorkflowInfo( WF_ID )
+
+ worklist_info = info[ 'worklist_info' ]
+ self.assertEqual( len( worklist_info ), len( _WF_WORKLISTS ) )
+
+ ids = [ x[ 'id' ] for x in worklist_info ]
+
+ for k in _WF_WORKLISTS.keys():
+ self.failUnless( k in ids )
+
+ for info in worklist_info:
+
+ expected = _WF_WORKLISTS[ info[ 'id' ] ]
+
+ self.assertEqual( info[ 'title' ], expected[ 0 ] )
+ self.assertEqual( info[ 'description' ], expected[ 1 ] )
+ self.assertEqual( info[ 'actbox_name' ], expected[ 3 ] )
+ self.assertEqual( info[ 'actbox_url' ], expected[ 4 ] )
+ self.assertEqual( info[ 'actbox_category' ], expected[ 5 ] )
+
+ var_match = info[ 'var_match' ]
+ self.assertEqual( len( var_match ), len( expected[ 2 ] ) )
+
+ for var_id, values_txt in var_match:
+
+ values = [ x.strip() for x in values_txt.split( ';' ) ]
+ e_values = expected[ 2 ][ var_id ]
+ self.assertEqual( len( values ), len( e_values ) )
+
+ for e_value in e_values:
+ self.failUnless( e_value in values )
+
+ self._assertGuard( info, *expected[ -4: ] )
+
+ def test_getWorkflowInfo_dcworkflow_scripts( self ):
+
+ WF_ID = 'dcworkflow_scripts'
+
+ site = self._initSite()
+ dcworkflow = self._initDCWorkflow( WF_ID )
+ self._initScripts( dcworkflow )
+
+ configurator = self._makeOne(dcworkflow).__of__(site)
+ info = configurator.getWorkflowInfo( WF_ID )
+
+ script_info = info[ 'script_info' ]
+ self.assertEqual( len( script_info ), len( _WF_SCRIPTS ) )
+
+ ids = [ x[ 'id' ] for x in script_info ]
+
+ for k in _WF_SCRIPTS.keys():
+ self.failUnless( k in ids )
+
+ for info in script_info:
+
+ expected = _WF_SCRIPTS[ info[ 'id' ] ]
+
+ self.assertEqual( info[ 'meta_type' ], expected[ 0 ] )
+
+ if info[ 'meta_type' ] == PythonScript.meta_type:
+ self.assertEqual( info[ 'filename' ]
+ , expected[ 2 ] % WF_ID )
+ else:
+ self.assertEqual( info[ 'filename' ], expected[ 2 ] )
+
+ def test_generateXML_empty( self ):
+
+ WF_ID = 'empty'
+ WF_TITLE = 'Empty DCWorkflow'
+ WF_INITIAL_STATE = 'initial'
+
+ site = self._initSite()
+ dcworkflow = self._initDCWorkflow( WF_ID )
+ dcworkflow.title = WF_TITLE
+ dcworkflow.initial_state = WF_INITIAL_STATE
+
+ configurator = self._makeOne(dcworkflow).__of__(site)
+
+ self._compareDOM( configurator.generateWorkflowXML()
+ , _EMPTY_WORKFLOW_EXPORT % ( WF_ID
+ , WF_TITLE
+ , WF_INITIAL_STATE
+ ) )
+
+ def test_generateWorkflowXML_normal( self ):
+
+ WF_ID = 'normal'
+ WF_TITLE = 'Normal DCWorkflow'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+ dcworkflow = self._initDCWorkflow( WF_ID )
+ dcworkflow.title = WF_TITLE
+ dcworkflow.initial_state = WF_INITIAL_STATE
+ dcworkflow.permissions = _WF_PERMISSIONS
+ self._initVariables( dcworkflow )
+ self._initStates( dcworkflow )
+ self._initTransitions( dcworkflow )
+ self._initWorklists( dcworkflow )
+ self._initScripts( dcworkflow )
+
+ configurator = self._makeOne(dcworkflow).__of__(site)
+
+ self._compareDOM( configurator.generateWorkflowXML()
+ , _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID
+ , 'title' : WF_TITLE
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID.replace(' ', '_')
+ } )
+
+ def test_generateWorkflowXML_multiple( self ):
+
+ WF_ID_1 = 'dc1'
+ WF_TITLE_1 = 'Normal DCWorkflow #1'
+ WF_ID_2 = 'dc2'
+ WF_TITLE_2 = 'Normal DCWorkflow #2'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+
+ dcworkflow_1 = self._initDCWorkflow( WF_ID_1 )
+ dcworkflow_1.title = WF_TITLE_1
+ dcworkflow_1.initial_state = WF_INITIAL_STATE
+ dcworkflow_1.permissions = _WF_PERMISSIONS
+ self._initVariables( dcworkflow_1 )
+ self._initStates( dcworkflow_1 )
+ self._initTransitions( dcworkflow_1 )
+ self._initWorklists( dcworkflow_1 )
+ self._initScripts( dcworkflow_1 )
+
+ dcworkflow_2 = self._initDCWorkflow( WF_ID_2 )
+ dcworkflow_2.title = WF_TITLE_2
+ dcworkflow_2.initial_state = WF_INITIAL_STATE
+ dcworkflow_2.permissions = _WF_PERMISSIONS
+ self._initVariables( dcworkflow_2 )
+ self._initStates( dcworkflow_2 )
+ self._initTransitions( dcworkflow_2 )
+ self._initWorklists( dcworkflow_2 )
+ self._initScripts( dcworkflow_2 )
+
+ configurator = self._makeOne(dcworkflow_1).__of__(site)
+
+ self._compareDOM( configurator.generateWorkflowXML()
+ , _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID_1
+ , 'title' : WF_TITLE_1
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID_1.replace(' ', '_')
+ } )
+
+ configurator = self._makeOne(dcworkflow_2).__of__(site)
+
+ self._compareDOM( configurator.generateWorkflowXML()
+ , _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID_2
+ , 'title' : WF_TITLE_2
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID_2.replace(' ', '_')
+ } )
+
+ def test_parseWorkflowXML_empty( self ):
+
+ WF_ID = 'empty'
+ WF_TITLE = 'Empty DCWorkflow'
+ WF_INITIAL_STATE = 'initial'
+
+ site = self._initSite()
+
+ configurator = self._makeOne( site ).__of__( site )
+
+ ( workflow_id
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ ) = configurator.parseWorkflowXML( _EMPTY_WORKFLOW_EXPORT
+ % ( WF_ID
+ , WF_TITLE
+ , WF_INITIAL_STATE
+ ) )
+
+ self.assertEqual( len( states ), 0 )
+ self.assertEqual( len( transitions ), 0 )
+ self.assertEqual( len( variables ), 0 )
+ self.assertEqual( len( worklists ), 0 )
+ self.assertEqual( len( permissions ), 0 )
+ self.assertEqual( len( scripts ), 0 )
+
+ def test_parseWorkflowXML_normal_attribs( self ):
+
+ WF_ID = 'normal'
+ WF_TITLE = 'Normal DCWorkflow'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+
+ configurator = self._makeOne( site ).__of__( site )
+
+ ( workflow_id
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ ) = configurator.parseWorkflowXML(
+ _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID
+ , 'title' : WF_TITLE
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID.replace(' ', '_')
+ } )
+
+ self.assertEqual( workflow_id, WF_ID )
+ self.assertEqual( title, WF_TITLE )
+ self.assertEqual( state_variable, 'state' )
+ self.assertEqual( initial_state, WF_INITIAL_STATE )
+
+ def test_parseWorkflowXML_normal_states( self ):
+
+ WF_ID = 'normal'
+ WF_TITLE = 'Normal DCWorkflow'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+
+ configurator = self._makeOne( site ).__of__( site )
+
+ ( workflow_id
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ ) = configurator.parseWorkflowXML(
+ _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID
+ , 'title' : WF_TITLE
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID.replace(' ', '_')
+ } )
+
+ self.assertEqual( len( states ), len( _WF_STATES ) )
+
+ for state in states:
+
+ state_id = state[ 'state_id' ]
+ self.failUnless( state_id in _WF_STATES )
+
+ expected = _WF_STATES[ state_id ]
+
+ self.assertEqual( state[ 'title' ], expected[ 0 ] )
+
+ description = ''.join( state[ 'description' ] )
+ self.failUnless( expected[ 1 ] in description )
+
+ self.assertEqual( tuple( state[ 'transitions' ] ), expected[ 2 ] )
+ self.assertEqual( state[ 'permissions' ], expected[ 3 ] )
+ self.assertEqual( tuple( state[ 'groups' ] )
+ , tuple( expected[ 4 ] ) )
+
+ for k, v_info in state[ 'variables' ].items():
+
+ exp_value = expected[ 5 ][ k ]
+ self.assertEqual( v_info[ 'value' ], str( exp_value ) )
+
+ if isinstance( exp_value, bool ):
+ self.assertEqual( v_info[ 'type' ], 'bool' )
+ elif isinstance( exp_value, int ):
+ self.assertEqual( v_info[ 'type' ], 'int' )
+ elif isinstance( exp_value, float ):
+ self.assertEqual( v_info[ 'type' ], 'float' )
+ elif isinstance( exp_value, basestring ):
+ self.assertEqual( v_info[ 'type' ], 'string' )
+
+ def test_parseWorkflowXML_normal_transitions( self ):
+
+ from Products.DCWorkflow.exportimport import TRIGGER_TYPES
+
+ WF_ID = 'normal'
+ WF_TITLE = 'Normal DCWorkflow'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+
+ configurator = self._makeOne( site ).__of__( site )
+
+ ( workflow_id
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ ) = configurator.parseWorkflowXML(
+ _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID
+ , 'title' : WF_TITLE
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID.replace(' ', '_')
+ } )
+
+ self.assertEqual( len( transitions ), len( _WF_TRANSITIONS ) )
+
+ for transition in transitions:
+
+ transition_id = transition[ 'transition_id' ]
+ self.failUnless( transition_id in _WF_TRANSITIONS )
+
+ expected = _WF_TRANSITIONS[ transition_id ]
+
+ self.assertEqual( transition[ 'title' ], expected[ 0 ] )
+
+ description = ''.join( transition[ 'description' ] )
+ self.failUnless( expected[ 1 ] in description )
+
+ self.assertEqual( transition[ 'new_state' ], expected[ 2 ] )
+ self.assertEqual( transition[ 'trigger' ]
+ , TRIGGER_TYPES[ expected[ 3 ] ] )
+ self.assertEqual( transition[ 'before_script' ], expected[ 4 ] )
+ self.assertEqual( transition[ 'after_script' ]
+ , expected[ 5 ] )
+
+ action = transition[ 'action' ]
+ self.assertEqual( action.get( 'name', '' ), expected[ 6 ] )
+ self.assertEqual( action.get( 'url', '' ), expected[ 7 ] )
+ self.assertEqual( action.get( 'category', '' ), expected[ 8 ] )
+
+ self.assertEqual( transition[ 'variables' ], expected[ 9 ] )
+
+ guard = transition[ 'guard' ]
+ self.assertEqual( tuple( guard.get( 'permissions', () ) )
+ , expected[ 10 ] )
+ self.assertEqual( tuple( guard.get( 'roles', () ) )
+ , expected[ 11 ] )
+ self.assertEqual( tuple( guard.get( 'groups', () ) )
+ , expected[ 12 ] )
+ self.assertEqual( guard.get( 'expression', '' ), expected[ 13 ] )
+
+ def test_parseWorkflowXML_normal_variables( self ):
+
+ from Products.DCWorkflow.exportimport import TRIGGER_TYPES
+
+ WF_ID = 'normal'
+ WF_TITLE = 'Normal DCWorkflow'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+
+ configurator = self._makeOne( site ).__of__( site )
+
+ ( workflow_id
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ ) = configurator.parseWorkflowXML(
+ _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID
+ , 'title' : WF_TITLE
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID.replace(' ', '_')
+ } )
+
+ self.assertEqual( len( variables ), len( _WF_VARIABLES ) )
+
+ for variable in variables:
+
+ variable_id = variable[ 'variable_id' ]
+ self.failUnless( variable_id in _WF_VARIABLES )
+
+ expected = _WF_VARIABLES[ variable_id ]
+
+ description = ''.join( variable[ 'description' ] )
+ self.failUnless( expected[ 0 ] in description )
+
+ default = variable[ 'default' ]
+ self.assertEqual( default[ 'value' ], expected[ 1 ] )
+
+ exp_type = 'n/a'
+
+ if expected[ 1 ]:
+ exp_value = expected[ 1 ]
+
+ if isinstance( exp_value, bool ):
+ exp_type = 'bool'
+ elif isinstance( exp_value, int ):
+ exp_type = 'int'
+ elif isinstance( exp_value, float ):
+ exp_type = 'float'
+ elif isinstance( exp_value, basestring ):
+ exp_type = 'string'
+ else:
+ exp_type = 'XXX'
+
+ self.assertEqual( default[ 'type' ], exp_type )
+ self.assertEqual( default[ 'expression' ], expected[ 2 ] )
+
+ self.assertEqual( variable[ 'for_catalog' ], expected[ 3 ] )
+ self.assertEqual( variable[ 'for_status' ], expected[ 4 ] )
+ self.assertEqual( variable[ 'update_always' ], expected[ 5 ] )
+
+ guard = variable[ 'guard' ]
+ self.assertEqual( tuple( guard.get( 'permissions', () ) )
+ , expected[ 6 ] )
+ self.assertEqual( tuple( guard.get( 'roles', () ) )
+ , expected[ 7 ] )
+ self.assertEqual( tuple( guard.get( 'groups', () ) )
+ , expected[ 8 ] )
+ self.assertEqual( guard.get( 'expression', '' ), expected[ 9 ] )
+
+ def test_parseWorkflowXML_normal_worklists( self ):
+
+ from Products.DCWorkflow.exportimport import TRIGGER_TYPES
+
+ WF_ID = 'normal'
+ WF_TITLE = 'Normal DCWorkflow'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+
+ configurator = self._makeOne( site ).__of__( site )
+
+ ( workflow_id
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ ) = configurator.parseWorkflowXML(
+ _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID
+ , 'title' : WF_TITLE
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID.replace(' ', '_')
+ } )
+
+ self.assertEqual( len( worklists ), len( _WF_WORKLISTS ) )
+
+ for worklist in worklists:
+
+ worklist_id = worklist[ 'worklist_id' ]
+ self.failUnless( worklist_id in _WF_WORKLISTS )
+
+ expected = _WF_WORKLISTS[ worklist_id ]
+
+ self.assertEqual( worklist[ 'title' ], expected[ 0 ] )
+
+ description = ''.join( worklist[ 'description' ] )
+ self.failUnless( expected[ 1 ] in description )
+
+ self.assertEqual( tuple( worklist[ 'match' ] )
+ , tuple( expected[ 2 ] ) )
+
+ action = worklist[ 'action' ]
+ self.assertEqual( action.get( 'name', '' ), expected[ 3 ] )
+ self.assertEqual( action.get( 'url', '' ), expected[ 4 ] )
+ self.assertEqual( action.get( 'category', '' ), expected[ 5 ] )
+
+ guard = worklist[ 'guard' ]
+ self.assertEqual( tuple( guard.get( 'permissions', () ) )
+ , expected[ 6 ] )
+ self.assertEqual( tuple( guard.get( 'roles', () ) )
+ , expected[ 7 ] )
+ self.assertEqual( tuple( guard.get( 'groups', () ) )
+ , expected[ 8 ] )
+ self.assertEqual( guard.get( 'expression', '' ), expected[ 9 ] )
+
+ def test_parseWorkflowXML_normal_permissions( self ):
+
+ from Products.DCWorkflow.exportimport import TRIGGER_TYPES
+
+ WF_ID = 'normal'
+ WF_TITLE = 'Normal DCWorkflow'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+
+ configurator = self._makeOne( site ).__of__( site )
+
+ ( workflow_id
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ ) = configurator.parseWorkflowXML(
+ _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID
+ , 'title' : WF_TITLE
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID.replace(' ', '_')
+ } )
+
+ self.assertEqual( len( permissions ), len( _WF_PERMISSIONS ) )
+
+ for permission in permissions:
+
+ self.failUnless( permission in _WF_PERMISSIONS )
+
+ def test_parseWorkflowXML_normal_scripts( self ):
+
+ from Products.DCWorkflow.exportimport import TRIGGER_TYPES
+
+ WF_ID = 'normal'
+ WF_TITLE = 'Normal DCWorkflow'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+
+ configurator = self._makeOne( site ).__of__( site )
+
+ ( workflow_id
+ , title
+ , state_variable
+ , initial_state
+ , states
+ , transitions
+ , variables
+ , worklists
+ , permissions
+ , scripts
+ ) = configurator.parseWorkflowXML(
+ _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID
+ , 'title' : WF_TITLE
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID.replace(' ', '_')
+ } )
+
+ self.assertEqual( len( scripts ), len( _WF_SCRIPTS ) )
+
+ for script in scripts:
+
+ script_id = script[ 'script_id' ]
+ self.failUnless( script_id in _WF_SCRIPTS )
+
+ expected = _WF_SCRIPTS[ script_id ]
+
+ self.assertEqual( script[ 'meta_type' ], expected[ 0 ] )
+
+ # Body is not kept as part of the workflow XML
+
+ if script[ 'meta_type' ] == PythonScript.meta_type:
+ self.assertEqual( script[ 'filename' ]
+ , expected[ 2 ] % workflow_id )
+ else:
+ self.assertEqual( script[ 'filename' ], expected[ 2 ] )
+
+
+_WF_PERMISSIONS = \
+( 'Open content for modifications'
+, 'Modify content'
+, 'Query history'
+, 'Restore expired content'
+)
+
+_WF_GROUPS = \
+( 'Content_owners'
+, 'Content_assassins'
+)
+
+_WF_VARIABLES = \
+{ 'when_opened': ( 'Opened when'
+ , ''
+ , "python:None"
+ , True
+ , False
+ , True
+ , ( 'Query history', 'Open content for modifications' )
+ , ()
+ , ()
+ , ""
+ )
+, 'when_expired': ( 'Expired when'
+ , ''
+ , "nothing"
+ , True
+ , False
+ , True
+ , ( 'Query history', 'Open content for modifications' )
+ , ()
+ , ()
+ , ""
+ )
+, 'killed_by': ( 'Killed by'
+ , 'n/a'
+ , ""
+ , True
+ , False
+ , True
+ , ()
+ , ( 'Hangman', 'Sherrif' )
+ , ()
+ , ""
+ )
+}
+
+_WF_STATES = \
+{ 'closed': ( 'Closed'
+ , 'Closed for modifications'
+ , ( 'open', 'kill', 'expire' )
+ , { 'Modify content': () }
+ , ()
+ , { 'is_opened': False, 'is_closed': True }
+ )
+, 'opened': ( 'Opened'
+ , 'Open for modifications'
+ , ( 'close', 'kill', 'expire' )
+ , { 'Modify content': [ 'Owner', 'Manager' ] }
+ , [ ( 'Content_owners', ( 'Owner', ) ) ]
+ , { 'is_opened': True, 'is_closed': False }
+ )
+, 'killed': ( 'Killed'
+ , 'Permanently unavailable'
+ , ()
+ , {}
+ , ()
+ , {}
+ )
+, 'expired': ( 'Expired'
+ , 'Expiration date has passed'
+ , ( 'open', )
+ , { 'Modify content': [ 'Owner', 'Manager' ] }
+ , ()
+ , { 'is_opened': False, 'is_closed': False }
+ )
+}
+
+_WF_TRANSITIONS = \
+{ 'open': ( 'Open'
+ , 'Open the object for modifications'
+ , 'opened'
+ , TRIGGER_USER_ACTION
+ , 'before_open'
+ , ''
+ , 'Open'
+ , 'string:${object_url}/open_for_modifications'
+ , 'workflow'
+ , { 'when_opened' : 'object/ZopeTime' }
+ , ( 'Open content for modifications', )
+ , ()
+ , ()
+ , ""
+ )
+, 'close': ( 'Close'
+ , 'Close the object for modifications'
+ , 'closed'
+ , TRIGGER_USER_ACTION
+ , ''
+ , 'after_close'
+ , 'Close'
+ , 'string:${object_url}/close_for_modifications'
+ , 'workflow'
+ , {}
+ , ()
+ , ( 'Owner', 'Manager' )
+ , ()
+ , ""
+ )
+, 'kill': ( 'Kill'
+ , 'Make the object permanently unavailable.'
+ , 'killed'
+ , TRIGGER_USER_ACTION
+ , ''
+ , 'after_kill'
+ , 'Kill'
+ , 'string:${object_url}/kill_object'
+ , 'workflow'
+ , { 'killed_by' : 'string:${user/getId}' }
+ , ()
+ , ()
+ , ( 'Content_assassins', )
+ , ""
+ )
+, 'expire': ( 'Expire'
+ , 'Retire objects whose expiration is past.'
+ , 'expired'
+ , TRIGGER_AUTOMATIC
+ , 'before_expire'
+ , ''
+ , ''
+ , ''
+ , ''
+ , { 'when_expired' : 'object/ZopeTime' }
+ , ()
+ , ()
+ , ()
+ , "python: object.expiration() <= object.ZopeTime()"
+ )
+}
+
+_WF_WORKLISTS = \
+{ 'expired_list': ( 'Expired'
+ , 'Worklist for expired content'
+ , { 'state' : ( 'expired', ) }
+ , 'Expired items'
+ , 'string:${portal_url}/expired_items'
+ , 'workflow'
+ , ( 'Restore expired content', )
+ , ()
+ , ()
+ , ""
+ )
+, 'alive_list': ( 'Alive'
+ , 'Worklist for content not yet expired / killed'
+ , { 'state' : ( 'open', 'closed' ) }
+ , 'Expired items'
+ , 'string:${portal_url}/expired_items'
+ , 'workflow'
+ , ( 'Restore expired content', )
+ , ()
+ , ()
+ , ""
+ )
+}
+
+_BEFORE_OPEN_SCRIPT = """\
+## Script (Python) "before_open"
+##bind container=container
+##bind context=context
+##bind namespace=
+##bind script=script
+##bind subpath=traverse_subpath
+##parameters=
+##title=
+##
+return 'before_open'
+"""
+
+_AFTER_CLOSE_SCRIPT = """\
+## Script (Python) "after_close"
+##bind container=container
+##bind context=context
+##bind namespace=
+##bind script=script
+##bind subpath=traverse_subpath
+##parameters=
+##title=
+##
+return 'after_close'
+"""
+
+_AFTER_KILL_SCRIPT = """\
+## Script (Python) "after_kill"
+##bind container=container
+##bind context=context
+##bind namespace=
+##bind script=script
+##bind subpath=traverse_subpath
+##parameters=
+##title=
+##
+return 'after_kill'
+"""
+
+_WF_SCRIPTS = \
+{ 'before_open': ( PythonScript.meta_type
+ , _BEFORE_OPEN_SCRIPT
+ , 'workflows/%s/scripts/before_open.py'
+ , None
+ , None
+ )
+, 'after_close': ( PythonScript.meta_type
+ , _AFTER_CLOSE_SCRIPT
+ , 'workflows/%s/scripts/after_close.py'
+ , None
+ , None
+ )
+, 'after_kill': ( PythonScript.meta_type
+ , _AFTER_KILL_SCRIPT
+ , 'workflows/%s/scripts/after_kill.py'
+ , None
+ , None
+ )
+, 'before_expire': ( ExternalMethod.meta_type
+ , ''
+ , ''
+ , 'DCWorkflow.test_method'
+ , 'test'
+ )
+}
+
+_NORMAL_TOOL_EXPORT = """\
+<?xml version="1.0"?>
+<object name="portal_workflow" meta_type="Dummy Workflow Tool">
+ <property name="title"></property>
+ <object name="Non-DCWorkflow" meta_type="Dummy Workflow"/>
+ <object name="dcworkflow" meta_type="Workflow"/>
+ <bindings>
+ <default/>
+ </bindings>
+</object>
+"""
+
+_NORMAL_TOOL_EXPORT_WITH_FILENAME = """\
+<?xml version="1.0"?>
+<object name="portal_workflow" meta_type="Dummy Workflow Tool">
+ <property name="title"></property>
+ <object name="Non-DCWorkflow" meta_type="Dummy Workflow"/>
+ <object name="%(workflow_id)s" meta_type="Workflow"/>
+ <bindings>
+ <default/>
+ </bindings>
+</object>
+"""
+
+_FILENAME_TOOL_EXPORT = """\
+<?xml version="1.0"?>
+<object name="portal_workflow" meta_type="Dummy Workflow Tool">
+ <property name="title"></property>
+ <object name="name with spaces" meta_type="Workflow"/>
+ <bindings>
+ <default/>
+ </bindings>
+</object>
+"""
+
+_EMPTY_WORKFLOW_EXPORT = """\
+<?xml version="1.0"?>
+<dc-workflow
+ workflow_id="%s"
+ title="%s"
+ state_variable="state"
+ initial_state="%s">
+</dc-workflow>
+"""
+
+# Make sure old exports are still imported well. Changes:
+# - scripts are now in in a 'scripts' subdirectory
+_OLD_WORKFLOW_EXPORT = """\
+<?xml version="1.0"?>
+<dc-workflow
+ workflow_id="%(workflow_id)s"
+ title="%(title)s"
+ state_variable="state"
+ initial_state="%(initial_state)s">
+ <permission>Open content for modifications</permission>
+ <permission>Modify content</permission>
+ <permission>Query history</permission>
+ <permission>Restore expired content</permission>
+ <state
+ state_id="closed"
+ title="Closed">
+ <description>Closed for modifications</description>
+ <exit-transition
+ transition_id="open"/>
+ <exit-transition
+ transition_id="kill"/>
+ <exit-transition
+ transition_id="expire"/>
+ <permission-map
+ acquired="False"
+ name="Modify content">
+ </permission-map>
+ <assignment
+ name="is_closed"
+ type="bool">True</assignment>
+ <assignment
+ name="is_opened"
+ type="bool">False</assignment>
+ </state>
+ <state
+ state_id="expired"
+ title="Expired">
+ <description>Expiration date has passed</description>
+ <exit-transition
+ transition_id="open"/>
+ <permission-map
+ acquired="True"
+ name="Modify content">
+ <permission-role>Owner</permission-role>
+ <permission-role>Manager</permission-role>
+ </permission-map>
+ <assignment
+ name="is_closed"
+ type="bool">False</assignment>
+ <assignment
+ name="is_opened"
+ type="bool">False</assignment>
+ </state>
+ <state
+ state_id="killed"
+ title="Killed">
+ <description>Permanently unavailable</description>
+ </state>
+ <state
+ state_id="opened"
+ title="Opened">
+ <description>Open for modifications</description>
+ <exit-transition
+ transition_id="close"/>
+ <exit-transition
+ transition_id="kill"/>
+ <exit-transition
+ transition_id="expire"/>
+ <permission-map
+ acquired="True"
+ name="Modify content">
+ <permission-role>Owner</permission-role>
+ <permission-role>Manager</permission-role>
+ </permission-map>
+ <group-map name="Content_owners">
+ <group-role>Owner</group-role>
+ </group-map>
+ <assignment
+ name="is_closed"
+ type="bool">False</assignment>
+ <assignment
+ name="is_opened"
+ type="bool">True</assignment>
+ </state>
+ <transition
+ transition_id="close"
+ title="Close"
+ trigger="USER"
+ new_state="closed"
+ before_script=""
+ after_script="after_close">
+ <description>Close the object for modifications</description>
+ <action
+ category="workflow"
+ url="string:${object_url}/close_for_modifications">Close</action>
+ <guard>
+ <guard-role>Owner</guard-role>
+ <guard-role>Manager</guard-role>
+ </guard>
+ </transition>
+ <transition
+ transition_id="expire"
+ title="Expire"
+ trigger="AUTOMATIC"
+ new_state="expired"
+ before_script="before_expire"
+ after_script="">
+ <description>Retire objects whose expiration is past.</description>
+ <guard>
+ <guard-expression>python: object.expiration() <= object.ZopeTime()</guard-expression>
+ </guard>
+ <assignment
+ name="when_expired">object/ZopeTime</assignment>
+ </transition>
+ <transition
+ transition_id="kill"
+ title="Kill"
+ trigger="USER"
+ new_state="killed"
+ before_script=""
+ after_script="after_kill">
+ <description>Make the object permanently unavailable.</description>
+ <action
+ category="workflow"
+ url="string:${object_url}/kill_object">Kill</action>
+ <guard>
+ <guard-group>Content_assassins</guard-group>
+ </guard>
+ <assignment
+ name="killed_by">string:${user/getId}</assignment>
+ </transition>
+ <transition
+ transition_id="open"
+ title="Open"
+ trigger="USER"
+ new_state="opened"
+ before_script="before_open"
+ after_script="">
+ <description>Open the object for modifications</description>
+ <action
+ category="workflow"
+ url="string:${object_url}/open_for_modifications">Open</action>
+ <guard>
+ <guard-permission>Open content for modifications</guard-permission>
+ </guard>
+ <assignment
+ name="when_opened">object/ZopeTime</assignment>
+ </transition>
+ <worklist
+ worklist_id="alive_list"
+ title="Alive">
+ <description>Worklist for content not yet expired / killed</description>
+ <action
+ category="workflow"
+ url="string:${portal_url}/expired_items">Expired items</action>
+ <guard>
+ <guard-permission>Restore expired content</guard-permission>
+ </guard>
+ <match name="state" values="open; closed"/>
+ </worklist>
+ <worklist
+ worklist_id="expired_list"
+ title="Expired">
+ <description>Worklist for expired content</description>
+ <action
+ category="workflow"
+ url="string:${portal_url}/expired_items">Expired items</action>
+ <guard>
+ <guard-permission>Restore expired content</guard-permission>
+ </guard>
+ <match name="state" values="expired"/>
+ </worklist>
+ <variable
+ variable_id="killed_by"
+ for_catalog="True"
+ for_status="False"
+ update_always="True">
+ <description>Killed by</description>
+ <default>
+ <value type="string">n/a</value>
+ </default>
+ <guard>
+ <guard-role>Hangman</guard-role>
+ <guard-role>Sherrif</guard-role>
+ </guard>
+ </variable>
+ <variable
+ variable_id="when_expired"
+ for_catalog="True"
+ for_status="False"
+ update_always="True">
+ <description>Expired when</description>
+ <default>
+ <expression>nothing</expression>
+ </default>
+ <guard>
+ <guard-permission>Query history</guard-permission>
+ <guard-permission>Open content for modifications</guard-permission>
+ </guard>
+ </variable>
+ <variable
+ variable_id="when_opened"
+ for_catalog="True"
+ for_status="False"
+ update_always="True">
+ <description>Opened when</description>
+ <default>
+ <expression>python:None</expression>
+ </default>
+ <guard>
+ <guard-permission>Query history</guard-permission>
+ <guard-permission>Open content for modifications</guard-permission>
+ </guard>
+ </variable>
+ <script
+ script_id="after_close"
+ type="Script (Python)"
+ filename="workflows/%(workflow_filename)s/after_close.py"
+ module=""
+ function=""
+ />
+ <script
+ script_id="after_kill"
+ type="Script (Python)"
+ filename="workflows/%(workflow_filename)s/after_kill.py"
+ module=""
+ function=""
+ />
+ <script
+ script_id="before_expire"
+ type="External Method"
+ filename=""
+ module="DCWorkflow.test_method"
+ function="test"
+ />
+ <script
+ script_id="before_open"
+ type="Script (Python)"
+ filename="workflows/%(workflow_filename)s/before_open.py"
+ module=""
+ function=""
+ />
+</dc-workflow>
+"""
+
+_NORMAL_WORKFLOW_EXPORT = """\
+<?xml version="1.0"?>
+<dc-workflow
+ workflow_id="%(workflow_id)s"
+ title="%(title)s"
+ state_variable="state"
+ initial_state="%(initial_state)s">
+ <permission>Open content for modifications</permission>
+ <permission>Modify content</permission>
+ <permission>Query history</permission>
+ <permission>Restore expired content</permission>
+ <state
+ state_id="closed"
+ title="Closed">
+ <description>Closed for modifications</description>
+ <exit-transition
+ transition_id="open"/>
+ <exit-transition
+ transition_id="kill"/>
+ <exit-transition
+ transition_id="expire"/>
+ <permission-map
+ acquired="False"
+ name="Modify content">
+ </permission-map>
+ <assignment
+ name="is_closed"
+ type="bool">True</assignment>
+ <assignment
+ name="is_opened"
+ type="bool">False</assignment>
+ </state>
+ <state
+ state_id="expired"
+ title="Expired">
+ <description>Expiration date has passed</description>
+ <exit-transition
+ transition_id="open"/>
+ <permission-map
+ acquired="True"
+ name="Modify content">
+ <permission-role>Owner</permission-role>
+ <permission-role>Manager</permission-role>
+ </permission-map>
+ <assignment
+ name="is_closed"
+ type="bool">False</assignment>
+ <assignment
+ name="is_opened"
+ type="bool">False</assignment>
+ </state>
+ <state
+ state_id="killed"
+ title="Killed">
+ <description>Permanently unavailable</description>
+ </state>
+ <state
+ state_id="opened"
+ title="Opened">
+ <description>Open for modifications</description>
+ <exit-transition
+ transition_id="close"/>
+ <exit-transition
+ transition_id="kill"/>
+ <exit-transition
+ transition_id="expire"/>
+ <permission-map
+ acquired="True"
+ name="Modify content">
+ <permission-role>Owner</permission-role>
+ <permission-role>Manager</permission-role>
+ </permission-map>
+ <group-map name="Content_owners">
+ <group-role>Owner</group-role>
+ </group-map>
+ <assignment
+ name="is_closed"
+ type="bool">False</assignment>
+ <assignment
+ name="is_opened"
+ type="bool">True</assignment>
+ </state>
+ <transition
+ transition_id="close"
+ title="Close"
+ trigger="USER"
+ new_state="closed"
+ before_script=""
+ after_script="after_close">
+ <description>Close the object for modifications</description>
+ <action
+ category="workflow"
+ url="string:${object_url}/close_for_modifications">Close</action>
+ <guard>
+ <guard-role>Owner</guard-role>
+ <guard-role>Manager</guard-role>
+ </guard>
+ </transition>
+ <transition
+ transition_id="expire"
+ title="Expire"
+ trigger="AUTOMATIC"
+ new_state="expired"
+ before_script="before_expire"
+ after_script="">
+ <description>Retire objects whose expiration is past.</description>
+ <guard>
+ <guard-expression>python: object.expiration() <= object.ZopeTime()</guard-expression>
+ </guard>
+ <assignment
+ name="when_expired">object/ZopeTime</assignment>
+ </transition>
+ <transition
+ transition_id="kill"
+ title="Kill"
+ trigger="USER"
+ new_state="killed"
+ before_script=""
+ after_script="after_kill">
+ <description>Make the object permanently unavailable.</description>
+ <action
+ category="workflow"
+ url="string:${object_url}/kill_object">Kill</action>
+ <guard>
+ <guard-group>Content_assassins</guard-group>
+ </guard>
+ <assignment
+ name="killed_by">string:${user/getId}</assignment>
+ </transition>
+ <transition
+ transition_id="open"
+ title="Open"
+ trigger="USER"
+ new_state="opened"
+ before_script="before_open"
+ after_script="">
+ <description>Open the object for modifications</description>
+ <action
+ category="workflow"
+ url="string:${object_url}/open_for_modifications">Open</action>
+ <guard>
+ <guard-permission>Open content for modifications</guard-permission>
+ </guard>
+ <assignment
+ name="when_opened">object/ZopeTime</assignment>
+ </transition>
+ <worklist
+ worklist_id="alive_list"
+ title="Alive">
+ <description>Worklist for content not yet expired / killed</description>
+ <action
+ category="workflow"
+ url="string:${portal_url}/expired_items">Expired items</action>
+ <guard>
+ <guard-permission>Restore expired content</guard-permission>
+ </guard>
+ <match name="state" values="open; closed"/>
+ </worklist>
+ <worklist
+ worklist_id="expired_list"
+ title="Expired">
+ <description>Worklist for expired content</description>
+ <action
+ category="workflow"
+ url="string:${portal_url}/expired_items">Expired items</action>
+ <guard>
+ <guard-permission>Restore expired content</guard-permission>
+ </guard>
+ <match name="state" values="expired"/>
+ </worklist>
+ <variable
+ variable_id="killed_by"
+ for_catalog="True"
+ for_status="False"
+ update_always="True">
+ <description>Killed by</description>
+ <default>
+ <value type="string">n/a</value>
+ </default>
+ <guard>
+ <guard-role>Hangman</guard-role>
+ <guard-role>Sherrif</guard-role>
+ </guard>
+ </variable>
+ <variable
+ variable_id="when_expired"
+ for_catalog="True"
+ for_status="False"
+ update_always="True">
+ <description>Expired when</description>
+ <default>
+ <expression>nothing</expression>
+ </default>
+ <guard>
+ <guard-permission>Query history</guard-permission>
+ <guard-permission>Open content for modifications</guard-permission>
+ </guard>
+ </variable>
+ <variable
+ variable_id="when_opened"
+ for_catalog="True"
+ for_status="False"
+ update_always="True">
+ <description>Opened when</description>
+ <default>
+ <expression>python:None</expression>
+ </default>
+ <guard>
+ <guard-permission>Query history</guard-permission>
+ <guard-permission>Open content for modifications</guard-permission>
+ </guard>
+ </variable>
+ <script
+ script_id="after_close"
+ type="Script (Python)"
+ filename="workflows/%(workflow_filename)s/scripts/after_close.py"
+ module=""
+ function=""
+ />
+ <script
+ script_id="after_kill"
+ type="Script (Python)"
+ filename="workflows/%(workflow_filename)s/scripts/after_kill.py"
+ module=""
+ function=""
+ />
+ <script
+ script_id="before_expire"
+ type="External Method"
+ filename=""
+ module="DCWorkflow.test_method"
+ function="test"
+ />
+ <script
+ script_id="before_open"
+ type="Script (Python)"
+ filename="workflows/%(workflow_filename)s/scripts/before_open.py"
+ module=""
+ function=""
+ />
+</dc-workflow>
+"""
+
+class Test_exportWorkflow( _WorkflowSetup
+ , _GuardChecker
+ ):
+
+ def test_empty( self ):
+ from Products.CMFCore.exportimport.workflow import exportWorkflowTool
+
+ site = self._initSite()
+ context = DummyExportContext( site )
+ exportWorkflowTool( context )
+
+ self.assertEqual( len( context._wrote ), 1 )
+ filename, text, content_type = context._wrote[ 0 ]
+ self.assertEqual( filename, 'workflows.xml' )
+ self._compareDOM( text, _EMPTY_TOOL_EXPORT )
+ self.assertEqual( content_type, 'text/xml' )
+
+ def test_normal( self ):
+ from Products.CMFCore.exportimport.workflow import exportWorkflowTool
+
+ WF_ID_NON = 'non_dcworkflow'
+ WF_TITLE_NON = 'Non-DCWorkflow'
+ WF_ID_DC = 'dcworkflow'
+ WF_TITLE_DC = 'DCWorkflow'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+
+ wf_tool = site.portal_workflow
+ nondcworkflow = DummyWorkflow( WF_TITLE_NON )
+ nondcworkflow.title = WF_TITLE_NON
+ wf_tool._setObject( WF_ID_NON, nondcworkflow )
+
+ dcworkflow = self._initDCWorkflow( WF_ID_DC )
+ dcworkflow.title = WF_TITLE_DC
+ dcworkflow.initial_state = WF_INITIAL_STATE
+ dcworkflow.permissions = _WF_PERMISSIONS
+ self._initVariables( dcworkflow )
+ self._initStates( dcworkflow )
+ self._initTransitions( dcworkflow )
+ self._initWorklists( dcworkflow )
+ self._initScripts( dcworkflow )
+
+ context = DummyExportContext( site )
+ exportWorkflowTool( context )
+
+ # workflows list, wf defintion and 3 scripts
+ self.assertEqual( len( context._wrote ), 5 )
+
+ filename, text, content_type = context._wrote[ 0 ]
+ self.assertEqual( filename, 'workflows.xml' )
+ self._compareDOM( text, _NORMAL_TOOL_EXPORT )
+ self.assertEqual( content_type, 'text/xml' )
+
+ filename, text, content_type = context._wrote[ 1 ]
+ self.assertEqual( filename, 'workflows/%s/definition.xml' % WF_ID_DC )
+ self._compareDOM( text
+ , _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID_DC
+ , 'title' : WF_TITLE_DC
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID_DC.replace(' ', '_')
+ } )
+ self.assertEqual( content_type, 'text/xml' )
+
+ # just testing first script
+ filename, text, content_type = context._wrote[ 2 ]
+ self.assertEqual( filename, 'workflows/%s/scripts/after_close.py' % WF_ID_DC )
+ self.assertEqual( text, _AFTER_CLOSE_SCRIPT)
+ self.assertEqual( content_type, 'text/plain' )
+
+ def test_with_filenames( self ):
+ from Products.CMFCore.exportimport.workflow import exportWorkflowTool
+
+ WF_ID_DC = 'name with spaces'
+ WF_TITLE_DC = 'DCWorkflow with spaces'
+ WF_INITIAL_STATE = 'closed'
+
+ site = self._initSite()
+
+ dcworkflow = self._initDCWorkflow( WF_ID_DC )
+ dcworkflow.title = WF_TITLE_DC
+ dcworkflow.initial_state = WF_INITIAL_STATE
+ dcworkflow.permissions = _WF_PERMISSIONS
+ self._initVariables( dcworkflow )
+ self._initStates( dcworkflow )
+ self._initTransitions( dcworkflow )
+ self._initWorklists( dcworkflow )
+ self._initScripts( dcworkflow )
+
+ context = DummyExportContext( site )
+ exportWorkflowTool( context )
+
+ # workflows list, wf defintion and 3 scripts
+ self.assertEqual( len( context._wrote ), 5 )
+
+ filename, text, content_type = context._wrote[ 0 ]
+ self.assertEqual( filename, 'workflows.xml' )
+ self._compareDOM( text, _FILENAME_TOOL_EXPORT )
+ self.assertEqual( content_type, 'text/xml' )
+
+ filename, text, content_type = context._wrote[ 1 ]
+ self.assertEqual( filename
+ , 'workflows/name_with_spaces/definition.xml' )
+ self._compareDOM( text
+ , _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : WF_ID_DC
+ , 'title' : WF_TITLE_DC
+ , 'initial_state' : WF_INITIAL_STATE
+ , 'workflow_filename' : WF_ID_DC.replace(' ', '_')
+ } )
+ self.assertEqual( content_type, 'text/xml' )
+
+ # just testing first script
+ filename, text, content_type = context._wrote[ 2 ]
+ self.assertEqual( filename, 'workflows/%s/scripts/after_close.py' %
+ WF_ID_DC.replace(' ', '_'))
+ self.assertEqual( text, _AFTER_CLOSE_SCRIPT)
+ self.assertEqual( content_type, 'text/plain' )
+
+class Test_importWorkflow( _WorkflowSetup
+ , _GuardChecker
+ ):
+
+ def _importNormalWorkflow( self, wf_id, wf_title, wf_initial_state ):
+ from Products.CMFCore.exportimport.workflow import importWorkflowTool
+
+ site = self._initSite()
+ wf_tool = site.portal_workflow
+ workflow_filename = wf_id.replace(' ', '_')
+
+ context = DummyImportContext( site )
+ context._files[ 'workflows.xml'
+ ] = (_NORMAL_TOOL_EXPORT_WITH_FILENAME
+ % { 'workflow_id' : wf_id
+ }
+ )
+
+ context._files[ 'workflows/%s/definition.xml' % workflow_filename
+ ] = ( _NORMAL_WORKFLOW_EXPORT
+ % { 'workflow_id' : wf_id
+ , 'title' : wf_title
+ , 'initial_state' : wf_initial_state
+ , 'workflow_filename' : workflow_filename
+ }
+ )
+
+ context._files[ 'workflows/%s/scripts/after_close.py' % workflow_filename
+ ] = _AFTER_CLOSE_SCRIPT
+
+ context._files[ 'workflows/%s/scripts/after_kill.py' % workflow_filename
+ ] = _AFTER_KILL_SCRIPT
+
+ context._files[ 'workflows/%s/scripts/before_open.py' % workflow_filename
+ ] = _BEFORE_OPEN_SCRIPT
+
+ importWorkflowTool( context )
+
+ return wf_tool
+
+ def _importOldWorkflow( self, wf_id, wf_title, wf_initial_state ):
+ from Products.CMFCore.exportimport.workflow import importWorkflowTool
+
+ site = self._initSite()
+ wf_tool = site.portal_workflow
+ workflow_filename = wf_id.replace(' ', '_')
+
+ context = DummyImportContext( site )
+ context._files[ 'workflows.xml'
+ ] = (_NORMAL_TOOL_EXPORT_WITH_FILENAME
+ % { 'workflow_id' : wf_id
+ }
+ )
+
+ context._files[ 'workflows/%s/definition.xml' % workflow_filename
+ ] = ( _OLD_WORKFLOW_EXPORT
+ % { 'workflow_id' : wf_id
+ , 'title' : wf_title
+ , 'initial_state' : wf_initial_state
+ , 'workflow_filename' : workflow_filename
+ }
+ )
+
+ context._files[ 'workflows/%s/after_close.py' % workflow_filename
+ ] = _AFTER_CLOSE_SCRIPT
+
+ context._files[ 'workflows/%s/after_kill.py' % workflow_filename
+ ] = _AFTER_KILL_SCRIPT
+
+ context._files[ 'workflows/%s/before_open.py' % workflow_filename
+ ] = _BEFORE_OPEN_SCRIPT
+
+ importWorkflowTool( context )
+
+ return wf_tool
+
+ def test_empty_default_purge( self ):
+ from Products.CMFCore.exportimport.workflow import importWorkflowTool
+
+ WF_ID_NON = 'non_dcworkflow_%s'
+ WF_TITLE_NON = 'Non-DCWorkflow #%s'
+
+ site = self._initSite()
+ wf_tool = site.portal_workflow
+
+ for i in range( 4 ):
+ nondcworkflow = DummyWorkflow( WF_TITLE_NON % i )
+ nondcworkflow.title = WF_TITLE_NON % i
+ wf_tool._setObject( WF_ID_NON % i, nondcworkflow )
+
+ wf_tool._default_chain = ( WF_ID_NON % 1, )
+ wf_tool._chains_by_type[ 'sometype' ] = ( WF_ID_NON % 2, )
+ self.assertEqual( len( wf_tool.objectIds() ), 4 )
+
+ context = DummyImportContext( site )
+ context._files[ 'workflows.xml' ] = _EMPTY_TOOL_EXPORT
+ importWorkflowTool( context )
+
+ self.assertEqual( len( wf_tool.objectIds() ), 0 )
+ self.assertEqual( len( wf_tool._default_chain ), 0 )
+ self.assertEqual( len( wf_tool._chains_by_type ), 0 )
+
+ def test_empty_explicit_purge( self ):
+ from Products.CMFCore.exportimport.workflow import importWorkflowTool
+
+ WF_ID_NON = 'non_dcworkflow_%s'
+ WF_TITLE_NON = 'Non-DCWorkflow #%s'
+
+ site = self._initSite()
+ wf_tool = site.portal_workflow
+
+ for i in range( 4 ):
+ nondcworkflow = DummyWorkflow( WF_TITLE_NON % i )
+ nondcworkflow.title = WF_TITLE_NON % i
+ wf_tool._setObject( WF_ID_NON % i, nondcworkflow )
+
+ wf_tool._default_chain = ( WF_ID_NON % 1, )
+ wf_tool._chains_by_type[ 'sometype' ] = ( WF_ID_NON % 2, )
+ self.assertEqual( len( wf_tool.objectIds() ), 4 )
+
+ context = DummyImportContext( site, True )
+ context._files[ 'workflows.xml' ] = _EMPTY_TOOL_EXPORT
+ importWorkflowTool( context )
+
+ self.assertEqual( len( wf_tool.objectIds() ), 0 )
+ self.assertEqual( len( wf_tool._default_chain ), 0 )
+ self.assertEqual( len( wf_tool._chains_by_type ), 0 )
+
+ def test_empty_skip_purge( self ):
+ from Products.CMFCore.exportimport.workflow import importWorkflowTool
+
+ WF_ID_NON = 'non_dcworkflow_%s'
+ WF_TITLE_NON = 'Non-DCWorkflow #%s'
+
+ site = self._initSite()
+ wf_tool = site.portal_workflow
+
+ for i in range( 4 ):
+ nondcworkflow = DummyWorkflow( WF_TITLE_NON % i )
+ nondcworkflow.title = WF_TITLE_NON % i
+ wf_tool._setObject( WF_ID_NON % i, nondcworkflow )
+
+ wf_tool._default_chain = ( WF_ID_NON % 1, )
+ wf_tool._chains_by_type[ 'sometype' ] = ( WF_ID_NON % 2, )
+ self.assertEqual( len( wf_tool.objectIds() ), 4 )
+
+ context = DummyImportContext( site, False )
+ context._files[ 'typestool.xml' ] = _EMPTY_TOOL_EXPORT
+ importWorkflowTool( context )
+
+ self.assertEqual( len( wf_tool.objectIds() ), 4 )
+ self.assertEqual( len( wf_tool._default_chain ), 1 )
+ self.assertEqual( wf_tool._default_chain[ 0 ], WF_ID_NON % 1 )
+ self.assertEqual( len( wf_tool._chains_by_type ), 1 )
+ self.assertEqual( wf_tool._chains_by_type[ 'sometype' ]
+ , ( WF_ID_NON % 2, )
+ )
+
+ def test_bindings_skip_purge( self ):
+ from Products.CMFCore.exportimport.workflow import importWorkflowTool
+
+ WF_ID_NON = 'non_dcworkflow_%s'
+ WF_TITLE_NON = 'Non-DCWorkflow #%s'
+
+ site = self._initSite()
+ wf_tool = site.portal_workflow
+
+ for i in range( 4 ):
+ nondcworkflow = DummyWorkflow( WF_TITLE_NON % i )
+ nondcworkflow.title = WF_TITLE_NON % i
+ wf_tool._setObject( WF_ID_NON % i, nondcworkflow )
+
+ wf_tool._default_chain = ( WF_ID_NON % 1, )
+ wf_tool._chains_by_type[ 'sometype' ] = ( WF_ID_NON % 2, )
+ self.assertEqual( len( wf_tool.objectIds() ), 4 )
+
+ context = DummyImportContext( site, False )
+ context._files[ 'workflows.xml' ] = _BINDINGS_TOOL_EXPORT
+ importWorkflowTool( context )
+
+ self.assertEqual( len( wf_tool.objectIds() ), 4 )
+ self.assertEqual( len( wf_tool._default_chain ), 2 )
+ self.assertEqual( wf_tool._default_chain[ 0 ], WF_ID_NON % 0 )
+ self.assertEqual( wf_tool._default_chain[ 1 ], WF_ID_NON % 1 )
+ self.assertEqual( len( wf_tool._chains_by_type ), 2 )
+ self.assertEqual( wf_tool._chains_by_type[ 'sometype' ]
+ , ( WF_ID_NON % 2, )
+ )
+ self.assertEqual( wf_tool._chains_by_type[ 'anothertype' ]
+ , ( WF_ID_NON % 3, )
+ )
+
+ def test_from_empty_dcworkflow_top_level( self ):
+
+ WF_ID = 'dcworkflow_tool'
+ WF_TITLE = 'DC Workflow testing tool'
+ WF_INITIAL_STATE = 'closed'
+
+ tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
+
+ self.assertEqual( len( tool.objectIds() ), 2 )
+ self.assertEqual( tool.objectIds()[ 1 ], WF_ID )
+
+ def test_from_empty_dcworkflow_workflow_attrs( self ):
+
+ WF_ID = 'dcworkflow_attrs'
+ WF_TITLE = 'DC Workflow testing attrs'
+ WF_INITIAL_STATE = 'closed'
+
+ tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
+
+ workflow = tool.objectValues()[ 1 ]
+ self.assertEqual( workflow.meta_type, DCWorkflowDefinition.meta_type )
+ self.assertEqual( workflow.title, WF_TITLE )
+ self.assertEqual( workflow.state_var, 'state' )
+ self.assertEqual( workflow.initial_state, WF_INITIAL_STATE )
+
+ def test_from_empty_dcworkflow_workflow_permissions( self ):
+
+ WF_ID = 'dcworkflow_permissions'
+ WF_TITLE = 'DC Workflow testing permissions'
+ WF_INITIAL_STATE = 'closed'
+
+ tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
+
+ workflow = tool.objectValues()[ 1 ]
+
+ permissions = workflow.permissions
+ self.assertEqual( len( permissions ), len( _WF_PERMISSIONS ) )
+
+ for permission in permissions:
+ self.failUnless( permission in _WF_PERMISSIONS )
+
+ def test_from_empty_dcworkflow_workflow_variables( self ):
+
+ WF_ID = 'dcworkflow_variables'
+ WF_TITLE = 'DC Workflow testing variables'
+ WF_INITIAL_STATE = 'closed'
+
+ tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
+
+ workflow = tool.objectValues()[ 1 ]
+
+ variables = workflow.variables
+
+ self.assertEqual( len( variables.objectItems() )
+ , len( _WF_VARIABLES ) )
+
+ for id, variable in variables.objectItems():
+
+ expected = _WF_VARIABLES[ variable.getId() ]
+ self.failUnless( expected[ 0 ] in variable.description )
+ self.assertEqual( variable.default_value, expected[ 1 ] )
+ self.assertEqual( variable.getDefaultExprText(), expected[ 2 ] )
+ self.assertEqual( variable.for_catalog, expected[ 3 ] )
+ self.assertEqual( variable.for_status, expected[ 4 ] )
+ self.assertEqual( variable.update_always, expected[ 5 ] )
+
+ guard = variable.getInfoGuard()
+
+ self.assertEqual( guard.permissions, expected[ 6 ] )
+ self.assertEqual( guard.roles, expected[ 7 ] )
+ self.assertEqual( guard.groups, expected[ 8 ] )
+ self.assertEqual( guard.getExprText(), expected[ 9 ] )
+
+ def test_from_empty_dcworkflow_workflow_states( self ):
+
+ WF_ID = 'dcworkflow_states'
+ WF_TITLE = 'DC Workflow testing states'
+ WF_INITIAL_STATE = 'closed'
+
+ tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
+
+ workflow = tool.objectValues()[ 1 ]
+
+ states = workflow.states
+
+ self.assertEqual( len( states.objectItems() )
+ , len( _WF_STATES ) )
+
+ for id, state in states.objectItems():
+
+ expected = _WF_STATES[ state.getId() ]
+ self.assertEqual( state.title, expected[ 0 ] )
+ self.failUnless( expected[ 1 ] in state.description )
+
+ self.assertEqual( len( state.transitions ), len( expected[ 2 ] ) )
+
+ for transition_id in state.transitions:
+ self.failUnless( transition_id in expected[ 2 ] )
+
+ for permission in state.getManagedPermissions():
+
+ p_info = state.getPermissionInfo( permission )
+ p_expected = expected[ 3 ].get( permission, [] )
+
+ self.assertEqual( bool( p_info[ 'acquired' ] )
+ , isinstance(p_expected, list) )
+
+ self.assertEqual( len( p_info[ 'roles' ] ), len( p_expected ) )
+
+ for role in p_info[ 'roles' ]:
+ self.failIf( role not in p_expected )
+
+ group_roles = state.group_roles or {}
+ self.assertEqual( len( group_roles ), len( expected[ 4 ] ) )
+
+ for group_id, exp_roles in expected[ 4 ]:
+
+ self.assertEqual( len( state.getGroupInfo( group_id ) )
+ , len( exp_roles ) )
+
+ for role in state.getGroupInfo( group_id ):
+ self.failUnless( role in exp_roles )
+
+ self.assertEqual( len( state.getVariableValues() )
+ , len( expected[ 5 ] ) )
+
+ for var_id, value in state.getVariableValues():
+
+ self.assertEqual( value, expected[ 5 ][ var_id ] )
+
+ def test_from_empty_dcworkflow_workflow_transitions( self ):
+
+ WF_ID = 'dcworkflow_transitions'
+ WF_TITLE = 'DC Workflow testing transitions'
+ WF_INITIAL_STATE = 'closed'
+
+ tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
+
+ workflow = tool.objectValues()[ 1 ]
+
+ transitions = workflow.transitions
+
+ self.assertEqual( len( transitions.objectItems() )
+ , len( _WF_TRANSITIONS ) )
+
+ for id, transition in transitions.objectItems():
+
+ expected = _WF_TRANSITIONS[ transition.getId() ]
+ self.assertEqual( transition.title, expected[ 0 ] )
+ self.failUnless( expected[ 1 ] in transition.description )
+ self.assertEqual( transition.new_state_id, expected[ 2 ] )
+ self.assertEqual( transition.trigger_type, expected[ 3 ] )
+ self.assertEqual( transition.script_name, expected[ 4 ] )
+ self.assertEqual( transition.after_script_name, expected[ 5 ] )
+ self.assertEqual( transition.actbox_name, expected[ 6 ] )
+ self.assertEqual( transition.actbox_url, expected[ 7 ] )
+ self.assertEqual( transition.actbox_category, expected[ 8 ] )
+
+ var_exprs = transition.var_exprs
+
+ self.assertEqual( len( var_exprs ), len( expected[ 9 ] ) )
+
+ for var_id, expr in var_exprs.items():
+ self.assertEqual( expr, expected[ 9 ][ var_id ] )
+
+ guard = transition.getGuard()
+
+ self.assertEqual( guard.permissions, expected[ 10 ] )
+ self.assertEqual( guard.roles, expected[ 11 ] )
+ self.assertEqual( guard.groups, expected[ 12 ] )
+ self.assertEqual( guard.getExprText(), expected[ 13 ] )
+
+ def test_from_empty_dcworkflow_workflow_worklists( self ):
+
+ WF_ID = 'dcworkflow_worklists'
+ WF_TITLE = 'DC Workflow testing worklists'
+ WF_INITIAL_STATE = 'closed'
+
+ tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
+
+ workflow = tool.objectValues()[ 1 ]
+
+ worklists = workflow.worklists
+
+ self.assertEqual( len( worklists.objectItems() )
+ , len( _WF_WORKLISTS ) )
+
+ for id, worklist in worklists.objectItems():
+
+ expected = _WF_WORKLISTS[ worklist.getId() ]
+ self.failUnless( expected[ 1 ] in worklist.description )
+
+ var_matches = worklist.var_matches
+
+ self.assertEqual( len( var_matches ), len( expected[ 2 ] ) )
+
+ for var_id, values in var_matches.items():
+ exp_values = expected[ 2 ][ var_id ]
+ self.assertEqual( len( values ), len( exp_values ) )
+
+ for value in values:
+ self.failUnless( value in exp_values, values )
+
+ self.assertEqual( worklist.actbox_name, expected[ 3 ] )
+ self.assertEqual( worklist.actbox_url, expected[ 4 ] )
+ self.assertEqual( worklist.actbox_category, expected[ 5 ] )
+
+ guard = worklist.getGuard()
+
+ self.assertEqual( guard.permissions, expected[ 6 ] )
+ self.assertEqual( guard.roles, expected[ 7 ] )
+ self.assertEqual( guard.groups, expected[ 8 ] )
+ self.assertEqual( guard.getExprText(), expected[ 9 ] )
+
+ def test_from_old_dcworkflow_workflow_scripts( self ):
+
+ WF_ID = 'old_dcworkflow_scripts'
+ WF_TITLE = 'Old DC Workflow testing scripts'
+ WF_INITIAL_STATE = 'closed'
+
+ tool = self._importOldWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
+
+ workflow = tool.objectValues()[ 1 ]
+
+ scripts = workflow.scripts
+
+ self.assertEqual( len( scripts.objectItems() )
+ , len( _WF_SCRIPTS ) )
+
+ for script_id, script in scripts.objectItems():
+
+ expected = _WF_SCRIPTS[ script_id ]
+
+ self.assertEqual( script.meta_type, expected[ 0 ] )
+
+ if script.meta_type == PythonScript.meta_type:
+ self.assertEqual( script.manage_FTPget(), expected[ 1 ] )
+
+ def test_from_empty_dcworkflow_workflow_scripts( self ):
+
+ WF_ID = 'dcworkflow_scripts'
+ WF_TITLE = 'DC Workflow testing scripts'
+ WF_INITIAL_STATE = 'closed'
+
+ tool = self._importNormalWorkflow( WF_ID, WF_TITLE, WF_INITIAL_STATE )
+
+ workflow = tool.objectValues()[ 1 ]
+
+ scripts = workflow.scripts
+
+ self.assertEqual( len( scripts.objectItems() )
+ , len( _WF_SCRIPTS ) )
+
+ for script_id, script in scripts.objectItems():
+
+ expected = _WF_SCRIPTS[ script_id ]
+
+ self.assertEqual( script.meta_type, expected[ 0 ] )
+
+ if script.meta_type == PythonScript.meta_type:
+ self.assertEqual( script.manage_FTPget(), expected[ 1 ] )
+
+
+def test_suite():
+ return unittest.TestSuite((
+ unittest.makeSuite( WorkflowDefinitionConfiguratorTests ),
+ unittest.makeSuite( Test_exportWorkflow ),
+ unittest.makeSuite( Test_importWorkflow ),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
Modified: CMF/branches/yuppie-workflow_setup/DCWorkflow/utils.py
===================================================================
--- CMF/branches/yuppie-workflow_setup/DCWorkflow/utils.py 2005-11-22 16:08:56 UTC (rev 40320)
+++ CMF/branches/yuppie-workflow_setup/DCWorkflow/utils.py 2005-11-22 16:12:02 UTC (rev 40321)
@@ -16,15 +16,16 @@
"""
import os
+
+from AccessControl.Permission import Permission
+from AccessControl.Role import gather_permissions
+from Acquisition import aq_base
from App.Common import package_home
_dtmldir = os.path.join( package_home( globals() ), 'dtml' )
+_xmldir = os.path.join( package_home( globals() ), 'xml' )
-from AccessControl.Role import gather_permissions
-from AccessControl.Permission import Permission
-from Acquisition import aq_base
-
def ac_inherited_permissions(ob, all=0):
# Get all permissions not defined in ourself that are inherited
# This will be a sequence of tuples with a name as the first item and
Copied: CMF/branches/yuppie-workflow_setup/DCWorkflow/xml/wtcWorkflowExport.xml (from rev 40305, CMF/branches/yuppie-workflow_setup/CMFSetup/xml/wtcWorkflowExport.xml)
More information about the CMF-checkins
mailing list