[CMF-checkins] CVS: CMF/CMFSetup - workflow.py:1.7

Tres Seaver tseaver at zope.com
Tue Jun 8 19:52:17 EDT 2004


Update of /cvs-repository/CMF/CMFSetup
In directory cvs.zope.org:/tmp/cvs-serv15327

Modified Files:
	workflow.py 
Log Message:


  - workflow.py:

    o Add SAX-based parsing of tool config XML.

    o Add DOM-based parsing of Workflow Definition (currently incomplete,
      parsing only top-level attributes and states).

  - xml/wtcWorkflowExport.xml:

    o Update template for improved parseability, and to fix bugs revealed
      during parsing.


=== CMF/CMFSetup/workflow.py 1.6 => 1.7 ===
--- CMF/CMFSetup/workflow.py:1.6	Fri Jun  4 23:16:47 2004
+++ CMF/CMFSetup/workflow.py	Tue Jun  8 19:51:45 2004
@@ -3,6 +3,7 @@
 $Id$
 """
 from xml.sax import parseString
+from xml.dom.minidom import parseString as domParseString
 
 from AccessControl import ClassSecurityInfo
 from Acquisition import Implicit
@@ -63,8 +64,8 @@
 
     if text is not None:
 
-        apc = WorkflowToolConfigurator( site ).__of__( site )
-        apc.parseXML( text, encoding )
+        wtc = WorkflowToolConfigurator( site ).__of__( site )
+        wtc.parseXML( text, encoding )
 
     return 'Workflows imported.'
 
@@ -196,6 +197,64 @@
         """
         return self._workflowConfig( workflow_id=workflow_id )
 
+    security.declareProtected( ManagePortal, 'parseToolXML' )
+    def parseToolXML( self, xml, encoding=None ):
+
+        """ Pseudo API.
+        """
+        parser = _WorkflowToolParser( encoding )
+        parseString( xml, parser )
+
+        return parser._workflows, parser._bindings
+
+    security.declareProtected( ManagePortal, 'parseWorkflowXML' )
+    def parseWorkflowXML( self, xml, encoding=None ):
+
+        """ Pseudo API.
+        """
+        dom = domParseString( xml )
+
+        root = dom.getElementsByTagName( 'dc-workflow' )[ 0 ]
+
+        workflow_id = _getNodeAttribute( root, 'workflow_id', encoding )
+        title = _getNodeAttribute( root, 'title', encoding )
+        state_variable = _getNodeAttribute( root, 'state_variable', encoding )
+        initial_state = _getNodeAttribute( root, 'initial_state', encoding )
+
+        states = _extractStateNodes( root )
+        transitions = []
+        variables = []
+        worklists = []
+        permissions = []
+        scripts = []
+
+        return ( workflow_id
+               , title
+               , state_variable
+               , initial_state
+               , states
+               , transitions
+               , variables
+               , worklists
+               , permissions
+               , scripts
+               )
+
+        parser = _WorkflowDefinitionParser( encoding )
+        parseString( xml, parser )
+
+        return ( parser._workflow_id
+               , parser._title
+               , parser._state_variable
+               , parser._initial_state
+               , parser._states
+               , parser._transitions
+               , parser._variables
+               , parser._worklists
+               , parser._permissions
+               , parser._scripts
+               )
+
     #
     #   Helper methods
     #
@@ -563,5 +622,244 @@
 
         return result
 
-
 InitializeClass( WorkflowToolConfigurator )
+
+class _WorkflowToolParser( HandlerBase ):
+
+    security = ClassSecurityInfo()
+
+    def __init__( self, encoding ):
+
+        self._encoding = encoding
+        self._workflows = []
+        self._bindings = {}
+        self._binding_type = None
+
+    security.declarePrivate( 'startElement' )
+    def startElement( self, name, attrs ):
+
+        if name in ( 'workflow-tool', 'bindings' ):
+            pass
+
+        elif name == 'workflow':
+
+            workflow_id = self._extract( attrs, 'workflow_id' )
+            meta_type = self._extract( attrs, 'meta_type', workflow_id )
+
+            if meta_type == DCWorkflowDefinition.meta_type:
+
+                filename = self._extract( attrs, 'filename', workflow_id )
+            
+                if filename == workflow_id:
+                    filename = _getWorkflowFilename( filename )
+
+            else:
+                filename = None
+
+            self._workflows.append( ( workflow_id, meta_type, filename ) )
+
+        elif name == 'default':
+
+            self._binding_type = None
+
+        elif name == 'type':
+
+            self._binding_type = self._extract( attrs, 'type_id' )
+
+        elif name == 'bound-workflow':
+
+            workflow_id = self._extract( attrs, 'workflow_id' )
+
+            bindings = self._bindings.setdefault( self._binding_type, [] )
+            bindings.append( workflow_id )
+
+        else:
+            raise ValueError, 'Unknown element: %s' % name
+
+InitializeClass( _WorkflowToolParser )
+
+class _WorkflowDefinitionParser( HandlerBase ):
+
+    security = ClassSecurityInfo()
+
+    def __init__( self, encoding ):
+
+        self._encoding = encoding
+        self._workflow_id = None
+        self._title = None
+        self._state_variable = None
+        self._initial_state = None
+        self._states = []
+        self._transitions = []
+        self._variables = []
+        self._worklists = []
+        self._permissions = []
+        self._scripts = []
+        self._current = None
+        self._permission_map = None
+        self._permission_role = None
+
+    security.declarePrivate( 'startElement' )
+    def startElement( self, name, attrs ):
+
+        if name == 'dc-workflow':
+
+            self._workflow_id = self._extract( attrs, 'workflow_id' )
+            self._title = self._extract( attrs, 'title' )
+            self._state_variable = self._extract( attrs, 'state_variable' )
+            self._initial_state = self._extract( attrs, 'initial_state' )
+
+        elif name == 'state':
+
+            info = { 'state_id' : self._extract( attrs, 'state_id' )
+                   , 'title' : self._extract( attrs, 'title' )
+                   , 'description' : []
+                   }
+
+            self._states.append( info )
+            self._current = info
+
+        elif name == 'permission-map':
+
+            info = { 'name' : self._extract( attrs, 'name' )
+                   , 'acquired' : self._extractBoolean( attrs
+                                                      , 'acquired', True )
+                   , 'roles' : []
+                   }
+
+            self._current.setdefault( 'permission_map', [] ).append( info )
+            self._permission_map = info
+
+        elif name == 'permission-role':
+
+            self._permission_role = []
+
+    security.declarePrivate( 'endElement' )
+    def endElement( self, name ):
+
+        if name == 'permission-role':
+            self._permission_map[ 'roles' ].append(
+                                            ''.join( self._permission_role ) )
+            self._permission_role = None
+
+        elif self._current is not None:
+            desc = ''.join( self._current[ 'description' ] )
+            self._current[ 'description' ] = desc
+
+        self._current = None
+        self._permission_map = None
+
+    security.declarePrivate( 'characters' )
+    def characters( self, text ):
+
+        if self._permission_role is not None:
+            self._permission_role.append( text )
+        elif self._current is not None and 'description' in self._current:
+            self._current[ 'description' ].append( text )
+                   
+
+InitializeClass( _WorkflowDefinitionParser )
+
+def _getWorkflowFilename( workflow_id ):
+
+    """ Return the name of the file which holds info for a given type.
+    """
+    return 'workflows/%s/definition.xml' % workflow_id.replace( ' ', '_' )
+
+
+def _getNodeAttribute( node, attr_name, encoding=None ):
+
+    """ Extract a string-valued attribute from node.
+    """
+    value = node.attributes[ attr_name ].nodeValue
+
+    if encoding is not None:
+        value = value.encode( encoding )
+
+    return value
+
+def _getNodeAttributeBoolean( node, attr_name ):
+
+    """ Extract a string-valued attribute from node.
+    """
+    value = node.attributes[ attr_name ].nodeValue.lower()
+
+    return value in ( 'true', 'yes', '1' )
+
+def _coalesceTextNodeChildren( node, encoding=None ):
+
+    """ Concatenate all childe text nodes into a single string.
+    """
+    from xml.dom import Node
+    fragments = []
+    node.normalize()
+    child = node.firstChild
+
+    while child is not None:
+
+        if child.nodeType == Node.TEXT_NODE:
+            fragments.append( child.nodeValue )
+
+        child = child.nextSibling
+
+    joined = ''.join( fragments )
+
+    if encoding is not None:
+        joined = joined.encode( encoding )
+
+    return joined
+
+def _extractStateNodes( root, encoding=None ):
+
+    result = []
+
+    for s_node in root.getElementsByTagName( 'state' ):
+
+        info = { 'state_id' : _getNodeAttribute( s_node, 'state_id', encoding )
+               , 'title' : _getNodeAttribute( s_node, 'title', encoding )
+               , 'description' : _coalesceTextNodeChildren( s_node, encoding )
+               }
+
+        info[ 'transitions' ] = [ _getNodeAttribute( x, 'transition_id' )
+                                  for x in s_node.getElementsByTagName(
+                                                        'exit-transition' ) ]
+
+        info[ 'permissions' ] = permission_map = {}
+
+        for p_map in s_node.getElementsByTagName( 'permission-map' ):
+
+            name = _getNodeAttribute( p_map, 'name', encoding )
+            acquired = _getNodeAttributeBoolean( p_map, 'acquired' )
+
+            roles = [ _coalesceTextNodeChildren( x )
+                        for x in p_map.getElementsByTagName(
+                                            'permission-role' ) ]
+
+            if not acquired:
+                roles = tuple( roles )
+
+            permission_map[ name ] = roles
+
+        info[ 'groups' ] = group_map = []
+
+        for g_map in s_node.getElementsByTagName( 'group-map' ):
+
+            name = _getNodeAttribute( g_map, 'name', encoding )
+
+            roles = [ _coalesceTextNodeChildren( x )
+                        for x in g_map.getElementsByTagName(
+                                            'group-role' ) ]
+
+            group_map.append( ( name, tuple( roles ) ) )
+
+        info[ 'variables' ] = var_map = {}
+
+        for assignment in s_node.getElementsByTagName( 'assignment' ):
+
+            name = _getNodeAttribute( assignment, 'name' )
+            value = _coalesceTextNodeChildren( assignment )
+            var_map[ name ] = value # XXX: type lost, only know strings???
+
+        result.append( info )
+
+    return result




More information about the CMF-checkins mailing list