[Zope-CVS] CVS: Products/PluggableAuthService/plugins -
ChallengeProtocolChooser.py:1.4 RequestTypeSniffer.py:1.4
Sidnei da Silva
sidnei at enfoldsystems.com
Wed Aug 17 16:53:45 EDT 2005
Update of /cvs-repository/Products/PluggableAuthService/plugins
In directory cvs.zope.org:/tmp/cvs-serv13697/plugins
Added Files:
ChallengeProtocolChooser.py RequestTypeSniffer.py
Log Message:
Merge changes from sidnei-challenge-protocol-chooser:
- Added two new interfaces, IChallengeProtocolChooser and
IRequestTypeSniffer. Those are used to select the 'authorization
protocol' or 'challenger protocol' to be used for challenging
according to the incoming request type.
- Fixed a couple more places where Zope 2-style __implements__
where being used to standardize on using classImplements.
- Fixed fallback implementations of providedBy and
implementedBy to always return a tuple.
- Make sure challenge doesn't break if existing instances of the
PluginRegistry don't yet have IChallengeProtocolChooser as a
registered interface. (Would be nice to have some sort of
migration for the PluginRegistry between PAS releases)
- Don't assume that just because zope.interface can be imported
that Five is present.
=== Products/PluggableAuthService/plugins/ChallengeProtocolChooser.py 1.3 => 1.4 ===
--- /dev/null Wed Aug 17 16:53:44 2005
+++ Products/PluggableAuthService/plugins/ChallengeProtocolChooser.py Wed Aug 17 16:53:14 2005
@@ -0,0 +1,190 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
+# Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this
+# distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+""" Classes: ChallengeProtocolChooser
+
+$Id$
+"""
+
+from Acquisition import aq_parent
+from AccessControl import ClassSecurityInfo
+from BTrees.OOBTree import OOBTree
+from Globals import InitializeClass
+
+from Products.PageTemplates.PageTemplateFile import PageTemplateFile
+
+from Products.PluggableAuthService.utils import classImplements
+from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
+from Products.PluggableAuthService.interfaces.plugins \
+ import IRequestTypeSniffer
+from Products.PluggableAuthService.interfaces.plugins \
+ import IChallengeProtocolChooser
+from Products.PluggableAuthService.interfaces.plugins \
+ import IChallengePlugin
+
+from Products.PluggableAuthService.interfaces.request \
+ import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest
+
+_request_types = ()
+_request_type_bmap = {}
+
+def registerRequestType(label, iface):
+ global _request_types
+ registry = list(_request_types)
+ registry.append((label, iface))
+ _request_types = tuple(registry)
+ _request_type_bmap[iface] = label
+
+def listRequestTypesLabels():
+ return _request_type_bmap.values()
+
+manage_addChallengeProtocolChooserForm = PageTemplateFile(
+ 'www/cpcAdd', globals(), __name__='manage_addChallengeProtocolChooserForm' )
+
+def addChallengeProtocolChooserPlugin( dispatcher, id, title=None,
+ mapping=None, REQUEST=None ):
+ """ Add a ChallengeProtocolChooserPlugin to a Pluggable Auth Service. """
+
+ cpc = ChallengeProtocolChooser(id, title=title, mapping=mapping)
+ dispatcher._setObject(cpc.getId(), cpc)
+
+ if REQUEST is not None:
+ REQUEST['RESPONSE'].redirect(
+ '%s/manage_workspace'
+ '?manage_tabs_message='
+ 'ChallengeProtocolChooser+added.'
+ % dispatcher.absolute_url())
+
+
+class ChallengeProtocolChooser( BasePlugin ):
+
+ """ PAS plugin for choosing challenger protocol based on request
+ """
+ meta_type = 'Challenge Protocol Chooser Plugin'
+
+ security = ClassSecurityInfo()
+
+ manage_options = (({'label': 'Mapping',
+ 'action': 'manage_editProtocolMapping'
+ },
+ )
+ + BasePlugin.manage_options
+ )
+
+ def __init__(self, id, title=None, mapping=None):
+ self._id = self.id = id
+ self.title = title
+ self._map = OOBTree()
+ if mapping is not None:
+ self.manage_updateProtocolMapping(mapping=mapping)
+
+ security.declarePrivate('chooseProtocol')
+ def chooseProtocols(self, request):
+ pas_instance = self._getPAS()
+ plugins = pas_instance._getOb('plugins')
+
+ sniffers = plugins.listPlugins( IRequestTypeSniffer )
+
+ for sniffer_id, sniffer in sniffers:
+ request_type = sniffer.sniffRequestType(request)
+ if request_type is not None:
+ return self._getProtocolsFor(request_type)
+
+ def _getProtocolsFor(self, request_type):
+ label = _request_type_bmap.get(request_type, None)
+ if label is None:
+ return
+ return self._map.get(label, None)
+
+
+ def _listProtocols(self):
+ pas_instance = self._getPAS()
+ plugins = pas_instance._getOb('plugins')
+
+ challengers = plugins.listPlugins( IChallengePlugin )
+ found = []
+
+ for challenger_id, challenger in challengers:
+ protocol = getattr(challenger, 'protocol', challenger_id)
+ if protocol not in found:
+ found.append(protocol)
+
+ return found
+
+ manage_editProtocolMappingForm = PageTemplateFile(
+ 'www/cpcEdit', globals(),
+ __name__='manage_editProtocolMappingForm')
+
+ def manage_editProtocolMapping(self, REQUEST=None):
+ """ Edit Protocol Mapping
+ """
+ info = []
+ available_protocols = self._listProtocols()
+
+ request_types = listRequestTypesLabels()
+ request_types.sort()
+
+ for label in request_types:
+ settings = []
+ select_any = False
+ info.append(
+ {'label': label,
+ 'settings': settings
+ })
+ protocols = self._map.get(label, None)
+ if not protocols:
+ select_any = True
+ for protocol in available_protocols:
+ selected = False
+ if protocols and protocol in protocols:
+ selected = True
+ settings.append({'label': protocol,
+ 'selected': selected,
+ 'value': protocol,
+ })
+
+ settings.insert(0, {'label': '(any)',
+ 'selected': select_any,
+ 'value': '',
+ })
+ return self.manage_editProtocolMappingForm(info=info, REQUEST=REQUEST)
+
+ def manage_updateProtocolMapping(self, mapping, REQUEST=None):
+ """ Update mapping of Request Type to Protocols
+ """
+ for key, value in mapping.items():
+ value = filter(None, value)
+ if not value:
+ if self._map.has_key(key):
+ del self._map[key]
+ else:
+ self._map[key] = value
+
+ if REQUEST is not None:
+ REQUEST['RESPONSE'].redirect(
+ '%s/manage_editProtocolMapping'
+ '?manage_tabs_message='
+ 'Protocol+Mappings+Changed.'
+ % self.absolute_url())
+
+classImplements(ChallengeProtocolChooser,
+ IChallengeProtocolChooser)
+
+InitializeClass(ChallengeProtocolChooser)
+
+for label, iface in (('Browser', IBrowserRequest),
+ ('WebDAV', IWebDAVRequest),
+ ('FTP', IFTPRequest),
+ ('XML-RPC', IXMLRPCRequest)):
+ registerRequestType(label, iface)
=== Products/PluggableAuthService/plugins/RequestTypeSniffer.py 1.3 => 1.4 ===
--- /dev/null Wed Aug 17 16:53:44 2005
+++ Products/PluggableAuthService/plugins/RequestTypeSniffer.py Wed Aug 17 16:53:14 2005
@@ -0,0 +1,129 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
+# Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this
+# distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+""" Classes: RequestTypeSniffer
+
+$Id$
+"""
+
+from Acquisition import aq_parent
+from AccessControl import ClassSecurityInfo
+from Globals import InitializeClass
+from ZServer.FTPRequest import FTPRequest
+from ZPublisher import xmlrpc
+
+from Products.PageTemplates.PageTemplateFile import PageTemplateFile
+
+from Products.PluggableAuthService.utils import classImplements
+from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
+from Products.PluggableAuthService.interfaces.plugins \
+ import IRequestTypeSniffer
+
+from Products.PluggableAuthService.interfaces.request \
+ import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest
+
+_sniffers = ()
+
+def registerSniffer(iface, func):
+ global _sniffers
+ registry = list(_sniffers)
+ registry.append((iface, func))
+ _sniffers = tuple(registry)
+
+manage_addRequestTypeSnifferForm = PageTemplateFile(
+ 'www/rtsAdd', globals(), __name__='manage_addRequestTypeSnifferForm' )
+
+def addRequestTypeSnifferPlugin( dispatcher, id, title=None, REQUEST=None ):
+ """ Add a RequestTypeSnifferPlugin to a Pluggable Auth Service. """
+
+ rts = RequestTypeSniffer(id, title)
+ dispatcher._setObject(rts.getId(), rts)
+
+ if REQUEST is not None:
+ REQUEST['RESPONSE'].redirect(
+ '%s/manage_workspace'
+ '?manage_tabs_message='
+ 'RequestTypeSniffer+added.'
+ % dispatcher.absolute_url())
+
+
+class RequestTypeSniffer( BasePlugin ):
+
+ """ PAS plugin for detecting a Request's type
+ """
+ meta_type = 'Request Type Sniffer Plugin'
+
+ security = ClassSecurityInfo()
+
+ def __init__(self, id, title=None):
+
+ self._id = self.id = id
+ self.title = title
+
+ security.declarePrivate('sniffRequestType')
+ def sniffRequestType(self, request):
+ found = None
+ for iface, func in _sniffers:
+ if func( request ):
+ found = iface
+
+ if found is not None:
+ return found
+
+classImplements(RequestTypeSniffer,
+ IRequestTypeSniffer)
+
+InitializeClass(RequestTypeSniffer)
+
+# Most of the sniffing code below has been inspired by
+# similar tests found in BaseRequest, HTTPRequest and ZServer
+def webdavSniffer(request):
+ dav_src = request.get('WEBDAV_SOURCE_PORT', None)
+ method = request.get('REQUEST_METHOD', 'GET').upper()
+ path_info = request.get('PATH_INFO', '')
+
+ if dav_src:
+ return True
+
+ if method not in ('GET', 'POST'):
+ return True
+
+ if method in ('GET',) and path_info.endswith('manage_DAVget'):
+ return True
+
+registerSniffer(IWebDAVRequest, webdavSniffer)
+
+def xmlrpcSniffer(request):
+ response = request['RESPONSE']
+ method = request.get('REQUEST_METHOD', 'GET').upper()
+
+ if method in ('GET', 'POST') and isinstance(response, xmlrpc.Response):
+ return True
+
+registerSniffer(IXMLRPCRequest, xmlrpcSniffer)
+
+def ftpSniffer(request):
+ if isinstance(request, FTPRequest):
+ return True
+
+registerSniffer(IFTPRequest, ftpSniffer)
+
+def browserSniffer(request):
+ # If it's none of the above, it's very likely a browser request.
+ for sniffer in (webdavSniffer, ftpSniffer, xmlrpcSniffer):
+ if sniffer(request):
+ return False
+ return True
+
+registerSniffer(IBrowserRequest, browserSniffer)
More information about the Zope-CVS
mailing list