[CMF-checkins] CVS: CMF/CMFSetup - workflow.py:1.7
Tres Seaver
tseaver at zope.com
Tue Jun 8 19:52:17 EDT 2004
Update of /cvs-repository/CMF/CMFSetup
In directory cvs.zope.org:/tmp/cvs-serv15327
Modified Files:
workflow.py
Log Message:
- workflow.py:
o Add SAX-based parsing of tool config XML.
o Add DOM-based parsing of Workflow Definition (currently incomplete,
parsing only top-level attributes and states).
- xml/wtcWorkflowExport.xml:
o Update template for improved parseability, and to fix bugs revealed
during parsing.
=== CMF/CMFSetup/workflow.py 1.6 => 1.7 ===
--- CMF/CMFSetup/workflow.py:1.6 Fri Jun 4 23:16:47 2004
+++ CMF/CMFSetup/workflow.py Tue Jun 8 19:51:45 2004
@@ -3,6 +3,7 @@
$Id$
"""
from xml.sax import parseString
+from xml.dom.minidom import parseString as domParseString
from AccessControl import ClassSecurityInfo
from Acquisition import Implicit
@@ -63,8 +64,8 @@
if text is not None:
- apc = WorkflowToolConfigurator( site ).__of__( site )
- apc.parseXML( text, encoding )
+ wtc = WorkflowToolConfigurator( site ).__of__( site )
+ wtc.parseXML( text, encoding )
return 'Workflows imported.'
@@ -196,6 +197,64 @@
"""
return self._workflowConfig( workflow_id=workflow_id )
security.declareProtected( ManagePortal, 'parseToolXML' )
def parseToolXML( self, xml, encoding=None ):

    """ Pseudo API:  parse the workflow tool's configuration XML.

    o 'xml' is fed to the SAX 'parseString', using a
      '_WorkflowToolParser' built from 'encoding' as the handler.

    o Return the pair '( workflows, bindings )' which the handler
      accumulated while parsing.
    """
    handler = _WorkflowToolParser( encoding )
    parseString( xml, handler )

    workflows = handler._workflows
    bindings = handler._bindings

    return workflows, bindings
+
security.declareProtected( ManagePortal, 'parseWorkflowXML' )
def parseWorkflowXML( self, xml, encoding=None ):

    """ Pseudo API:  parse a workflow definition from 'xml' using DOM.

    o The top-level attributes are read from the first 'dc-workflow'
      element;  an IndexError is raised if the document contains none.

    o If 'encoding' is not None, it is passed to the extraction helpers,
      which use it to encode the values they return.

    o Return a ten-tuple:  ( workflow_id, title, state_variable,
      initial_state, states, transitions, variables, worklists,
      permissions, scripts ).

    o Parsing is currently incomplete:  only the top-level attributes
      and the states are extracted;  the last five items are always
      empty lists.
    """
    dom = domParseString( xml )

    root = dom.getElementsByTagName( 'dc-workflow' )[ 0 ]

    workflow_id = _getNodeAttribute( root, 'workflow_id', encoding )
    title = _getNodeAttribute( root, 'title', encoding )
    state_variable = _getNodeAttribute( root, 'state_variable', encoding )
    initial_state = _getNodeAttribute( root, 'initial_state', encoding )

    # Hand 'encoding' on, so that state info is encoded in the same way
    # as the top-level attributes extracted above.
    states = _extractStateNodes( root, encoding )

    # Not parsed yet.
    transitions = []
    variables = []
    worklists = []
    permissions = []
    scripts = []

    return ( workflow_id
           , title
           , state_variable
           , initial_state
           , states
           , transitions
           , variables
           , worklists
           , permissions
           , scripts
           )
+
#
# Helper methods
#
@@ -563,5 +622,244 @@
return result
-
InitializeClass( WorkflowToolConfigurator )
+
class _WorkflowToolParser( HandlerBase ):

    # SAX handler which collects, from the workflow tool's config XML:
    #
    #  _workflows -- a list of ( workflow_id, meta_type, filename ) tuples
    #
    #  _bindings -- a mapping from a type ID (or None, for the default
    #               chain) to a list of bound workflow IDs
    #
    # NOTE(review): '_extract' is not defined in this class;  presumably
    # it comes from HandlerBase and takes an optional default as its
    # third argument -- confirm against the base class.

    security = ClassSecurityInfo()

    def __init__( self, encoding ):

        self._encoding = encoding
        self._workflows = []
        self._bindings = {}
        self._binding_type = None   # type whose bindings are being read

    security.declarePrivate( 'startElement' )
    def startElement( self, name, attrs ):

        # Pure container elements carry no information of their own.
        if name == 'workflow-tool' or name == 'bindings':
            return

        if name == 'workflow':

            wf_id = self._extract( attrs, 'workflow_id' )
            wf_type = self._extract( attrs, 'meta_type', wf_id )
            path = None

            # Only DCWorkflow definitions get a definition file.
            if wf_type == DCWorkflowDefinition.meta_type:

                path = self._extract( attrs, 'filename', wf_id )

                # A filename equal to the ID is replaced by the
                # conventional path for that ID.
                if path == wf_id:
                    path = _getWorkflowFilename( path )

            self._workflows.append( ( wf_id, wf_type, path ) )
            return

        if name == 'default':

            # Bindings which follow are stored under the key None.
            self._binding_type = None
            return

        if name == 'type':

            self._binding_type = self._extract( attrs, 'type_id' )
            return

        if name == 'bound-workflow':

            bound_id = self._extract( attrs, 'workflow_id' )
            chain = self._bindings.setdefault( self._binding_type, [] )
            chain.append( bound_id )
            return

        raise ValueError( 'Unknown element: %s' % name )

InitializeClass( _WorkflowToolParser )
+
class _WorkflowDefinitionParser( HandlerBase ):

    # SAX handler for a workflow definition.  It records the top-level
    # attributes of the 'dc-workflow' element and, for each 'state'
    # element, an info mapping holding its ID, title, description and
    # permission maps.  The other collections are initialized but never
    # filled in by this class.
    #
    # NOTE(review): '_extract' and '_extractBoolean' are not defined in
    # this class;  presumably they come from HandlerBase -- confirm.

    security = ClassSecurityInfo()

    def __init__( self, encoding ):

        self._encoding = encoding
        self._workflow_id = None
        self._title = None
        self._state_variable = None
        self._initial_state = None
        self._states = []
        self._transitions = []
        self._variables = []
        self._worklists = []
        self._permissions = []
        self._scripts = []
        self._current = None            # info of the open 'state', if any
        self._permission_map = None     # info of the open 'permission-map'
        self._permission_role = None    # text chunks of the open role

    security.declarePrivate( 'startElement' )
    def startElement( self, name, attrs ):

        if name == 'dc-workflow':

            self._workflow_id = self._extract( attrs, 'workflow_id' )
            self._title = self._extract( attrs, 'title' )
            self._state_variable = self._extract( attrs, 'state_variable' )
            self._initial_state = self._extract( attrs, 'initial_state' )

        elif name == 'state':

            # 'description' collects text chunks;  they are joined into
            # a single string when the state element is closed.
            info = { 'state_id' : self._extract( attrs, 'state_id' )
                   , 'title' : self._extract( attrs, 'title' )
                   , 'description' : []
                   }

            self._states.append( info )
            self._current = info

        elif name == 'permission-map':

            info = { 'name' : self._extract( attrs, 'name' )
                   , 'acquired' : self._extractBoolean( attrs
                                                      , 'acquired', True )
                   , 'roles' : []
                   }

            self._current.setdefault( 'permission_map', [] ).append( info )
            self._permission_map = info

        elif name == 'permission-role':

            self._permission_role = []

    security.declarePrivate( 'endElement' )
    def endElement( self, name ):

        if name == 'permission-role':

            self._permission_map[ 'roles' ].append(
                                    ''.join( self._permission_role ) )
            self._permission_role = None

        elif name == 'permission-map':

            # Close only the map;  the enclosing state stays open, so
            # that further children of the state can still be recorded.
            self._permission_map = None

        elif name == 'state' and self._current is not None:

            # Finalize the state only when the state element itself is
            # closed.  Doing so on the end tag of any nested element
            # would reset '_current' too early, and the next
            # 'permission-map' of the same state would fail on None.
            desc = ''.join( self._current[ 'description' ] )
            self._current[ 'description' ] = desc

            self._current = None
            self._permission_map = None

    security.declarePrivate( 'characters' )
    def characters( self, text ):

        # Text inside a 'permission-role' is a role name;  any other
        # text seen while a state is open goes to its description.
        if self._permission_role is not None:
            self._permission_role.append( text )
        elif self._current is not None and 'description' in self._current:
            self._current[ 'description' ].append( text )


InitializeClass( _WorkflowDefinitionParser )
+
def _getWorkflowFilename( workflow_id ):

    """ Return the name of the file which holds the given workflow's info.

    o Spaces in 'workflow_id' are replaced by underscores.
    """
    safe_id = '_'.join( workflow_id.split( ' ' ) )

    return 'workflows/%s/definition.xml' % safe_id
+
+
def _getNodeAttribute( node, attr_name, encoding=None ):

    """ Return the value of the 'attr_name' attribute of 'node'.

    o If 'encoding' is None, return the value as the DOM supplies it;
      otherwise, return the value encoded using 'encoding'.
    """
    raw = node.attributes[ attr_name ].nodeValue

    if encoding is None:
        return raw

    return raw.encode( encoding )
+
def _getNodeAttributeBoolean( node, attr_name ):

    """ Return the 'attr_name' attribute of 'node' as a boolean.

    o The result is true if the value, compared without regard to
      case, is one of 'true', 'yes' or '1';  otherwise it is false.
    """
    text = node.attributes[ attr_name ].nodeValue

    return text.lower() in ( 'true', 'yes', '1' )
+
def _coalesceTextNodeChildren( node, encoding=None ):

    """ Concatenate all child text nodes of 'node' into a single string.

    o Only direct children which are text nodes contribute;  text
      nested inside child elements is skipped.

    o 'node' is normalized in place before its children are examined.

    o If 'encoding' is not None, return the result encoded with it.
    """
    from xml.dom import Node

    node.normalize()

    pieces = [ kid.nodeValue
               for kid in node.childNodes
               if kid.nodeType == Node.TEXT_NODE ]

    text = ''.join( pieces )

    if encoding is None:
        return text

    return text.encode( encoding )
+
def _extractStateNodes( root, encoding=None ):

    """ Return a list of info mappings, one per 'state' element under root.

    o Each mapping has the keys:

      'state_id', 'title' -- attributes of the state element

      'description' -- the coalesced text children of the state element

      'transitions' -- list of the 'transition_id' attributes of the
        state's 'exit-transition' elements

      'permissions' -- mapping from permission name to its roles, taken
        from the 'permission-map' / 'permission-role' elements;  the
        roles are a list if the map's 'acquired' attribute is true, and
        a tuple otherwise

      'groups' -- list of ( name, roles-tuple ) pairs, taken from the
        'group-map' / 'group-role' elements

      'variables' -- mapping from the 'name' attribute of each
        'assignment' element to its text, always as a string

    o If 'encoding' is not None, it is passed to every extraction
      helper, so that all strings in the result are encoded alike.
    """
    result = []

    for s_node in root.getElementsByTagName( 'state' ):

        info = { 'state_id' : _getNodeAttribute( s_node, 'state_id', encoding )
               , 'title' : _getNodeAttribute( s_node, 'title', encoding )
               , 'description' : _coalesceTextNodeChildren( s_node, encoding )
               }

        info[ 'transitions' ] = [ _getNodeAttribute( x, 'transition_id'
                                                   , encoding )
                                  for x in s_node.getElementsByTagName(
                                                    'exit-transition' ) ]

        info[ 'permissions' ] = permission_map = {}

        for p_map in s_node.getElementsByTagName( 'permission-map' ):

            name = _getNodeAttribute( p_map, 'name', encoding )
            acquired = _getNodeAttributeBoolean( p_map, 'acquired' )

            roles = [ _coalesceTextNodeChildren( x, encoding )
                      for x in p_map.getElementsByTagName(
                                                    'permission-role' ) ]

            # The sequence type carries the 'acquired' flag:  a list
            # means acquired, a tuple means not acquired.
            if not acquired:
                roles = tuple( roles )

            permission_map[ name ] = roles

        info[ 'groups' ] = group_map = []

        for g_map in s_node.getElementsByTagName( 'group-map' ):

            name = _getNodeAttribute( g_map, 'name', encoding )

            roles = [ _coalesceTextNodeChildren( x, encoding )
                      for x in g_map.getElementsByTagName(
                                                    'group-role' ) ]

            group_map.append( ( name, tuple( roles ) ) )

        info[ 'variables' ] = var_map = {}

        for assignment in s_node.getElementsByTagName( 'assignment' ):

            name = _getNodeAttribute( assignment, 'name', encoding )
            value = _coalesceTextNodeChildren( assignment, encoding )
            var_map[ name ] = value # XXX: type lost, only know strings???

        result.append( info )

    return result
More information about the CMF-checkins
mailing list