[Zope-CVS] CVS: Products/PluggableAuthService/plugins -
ChallengeProtocolChooser.py:1.1.2.1 RequestTypeSniffer.py:1.1.2.1
Sidnei da Silva
sidnei at enfoldsystems.com
Sat Aug 13 00:34:34 EDT 2005
Update of /cvs-repository/Products/PluggableAuthService/plugins
In directory cvs.zope.org:/tmp/cvs-serv7291/plugins
Added Files:
Tag: sidnei-challenge-protocol-chooser
ChallengeProtocolChooser.py RequestTypeSniffer.py
Log Message:
- Implement Challenge Protocol Chooser, with functional doctests
=== Added File Products/PluggableAuthService/plugins/ChallengeProtocolChooser.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Classes: ChallengeProtocolChooser
$Id: ChallengeProtocolChooser.py,v 1.1.2.1 2005/08/13 04:34:33 sidnei Exp $
"""
from Acquisition import aq_parent
from AccessControl import ClassSecurityInfo
from BTrees.OOBTree import OOBTree
from Globals import InitializeClass
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.interfaces.plugins \
import IRequestTypeSniffer
from Products.PluggableAuthService.interfaces.plugins \
import IChallengeProtocolChooser
from Products.PluggableAuthService.interfaces.plugins \
import IChallengePlugin
from Products.PluggableAuthService.interfaces.request \
import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest
_request_types = ()
_request_type_bmap = {}
def registerRequestType(label, iface):
    """ Register a request type under a human-readable label.

    Appends the (label, iface) pair to the module-level registry tuple
    and records the label for the interface in the reverse lookup table.
    """
    global _request_types
    # The registry is replaced by a new tuple rather than mutated in place.
    _request_types = tuple(_request_types) + ((label, iface),)
    _request_type_bmap[iface] = label
def listRequestTypesLabels():
    """ Return the labels of all registered request types.

    The result is always a freshly built list, so callers may sort or
    otherwise mutate it without touching the registry, and it supports
    list methods even where ``dict.values()`` yields a view object.
    """
    return list(_request_type_bmap.values())
# Page template used as the "add" form for the plugin (www/cpcAdd).
manage_addChallengeProtocolChooserForm = PageTemplateFile(
    'www/cpcAdd',
    globals(),
    __name__='manage_addChallengeProtocolChooserForm'
    )
def addChallengeProtocolChooserPlugin( dispatcher, id, title=None,
                                       mapping=None, REQUEST=None ):
    """ Add a ChallengeProtocolChooserPlugin to a Pluggable Auth Service. """
    plugin = ChallengeProtocolChooser(id, title=title, mapping=mapping)
    dispatcher._setObject(plugin.getId(), plugin)

    # Without a request there is nobody to redirect.
    if REQUEST is None:
        return

    response = REQUEST['RESPONSE']
    target = ('%s/manage_workspace'
              '?manage_tabs_message='
              'ChallengeProtocolChooser+added.'
              % dispatcher.absolute_url())
    response.redirect(target)
class ChallengeProtocolChooser( BasePlugin ):

    """ PAS plugin for choosing challenge protocols based on request type.

    Keeps, per request-type label, the challenge protocols selected for
    requests of that type.
    """

    __implements__ = ( IChallengeProtocolChooser, )

    meta_type = 'Challenge Protocol Chooser Plugin'

    security = ClassSecurityInfo()

    manage_options = (({'label': 'Mapping',
                        'action': 'manage_editProtocolMapping'
                        },
                       )
                      + BasePlugin.manage_options
                      )

    def __init__(self, id, title=None, mapping=None):
        self._id = self.id = id
        self.title = title
        # Request-type label -> protocols selected for that request type.
        self._map = OOBTree()
        if mapping is not None:
            self.manage_updateProtocolMapping(mapping=mapping)

    # The name given to declarePrivate() has to match the method name
    # exactly; a misspelled name leaves the method without a declaration.
    security.declarePrivate('chooseProtocols')
    def chooseProtocols(self, request):
        # Ask each IRequestTypeSniffer plugin in turn; the first one that
        # reports a request type decides which mapping entry is returned.
        # Returns None when no sniffer recognizes the request.
        pas_instance = self._getPAS()
        plugins = pas_instance._getOb('plugins')

        sniffers = plugins.listPlugins( IRequestTypeSniffer )

        for sniffer_id, sniffer in sniffers:
            request_type = sniffer.sniffRequestType(request)
            if request_type is not None:
                return self._getProtocolsFor(request_type)
        return None

    def _getProtocolsFor(self, request_type):
        # Translate the sniffed interface into its registered label, then
        # look up the protocols stored for that label.  Returns None when
        # the interface was never registered or nothing is stored for it.
        label = _request_type_bmap.get(request_type, None)
        if label is None:
            return None
        return self._map.get(label, None)

    def _listProtocols(self):
        # Collect the distinct protocol names of all IChallengePlugin
        # plugins, preserving the order in which they are listed.  A
        # plugin without a 'protocol' attribute is represented by its id.
        pas_instance = self._getPAS()
        plugins = pas_instance._getOb('plugins')

        challengers = plugins.listPlugins( IChallengePlugin )
        found = []

        for challenger_id, challenger in challengers:
            protocol = getattr(challenger, 'protocol', challenger_id)
            if protocol not in found:
                found.append(protocol)

        return found

    manage_editProtocolMappingForm = PageTemplateFile(
        'www/cpcEdit', globals(),
        __name__='manage_editProtocolMappingForm')

    def manage_editProtocolMapping(self, REQUEST=None):
        """ Edit Protocol Mapping

        Builds one entry per registered request-type label (sorted by
        label); each entry lists an '(any)' option followed by every
        available protocol, flagged with whether it is currently selected.
        """
        info = []
        available_protocols = self._listProtocols()

        # Sort a private list so the helper's return value is never
        # mutated and need not be a list itself.
        request_types = list(listRequestTypesLabels())
        request_types.sort()

        for label in request_types:
            protocols = self._map.get(label, None)

            # '(any)' is selected exactly when nothing is stored for the
            # label; it always comes first in the settings.
            settings = [{'label': '(any)',
                         'selected': not protocols,
                         'value': '',
                         }]
            for protocol in available_protocols:
                selected = bool(protocols and protocol in protocols)
                settings.append({'label': protocol,
                                 'selected': selected,
                                 'value': protocol,
                                 })

            info.append({'label': label,
                         'settings': settings
                         })

        return self.manage_editProtocolMappingForm(info=info, REQUEST=REQUEST)

    def manage_updateProtocolMapping(self, mapping, REQUEST=None):
        """ Update mapping of Request Type to Protocols

        'mapping' maps request-type labels to sequences of protocol
        names.  Empty names are discarded; a label left without any
        protocol has its stored entry removed.
        """
        for key, value in mapping.items():
            # Drop empty entries such as the '' value of the '(any)'
            # option; build a real list so the emptiness test is reliable.
            value = [protocol for protocol in value if protocol]
            if value:
                self._map[key] = value
            elif self._map.has_key(key):
                del self._map[key]

        if REQUEST is not None:
            REQUEST['RESPONSE'].redirect(
                '%s/manage_editProtocolMapping'
                '?manage_tabs_message='
                'Protocol+Mappings+Changed.'
                % self.absolute_url())
# Register the request types known to this module, pairing each label
# with its request interface.
for label, iface in zip(
        ('Browser', 'WebDAV', 'FTP', 'XML-RPC'),
        (IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest)):
    registerRequestType(label, iface)
=== Added File Products/PluggableAuthService/plugins/RequestTypeSniffer.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Classes: RequestTypeSniffer
$Id: RequestTypeSniffer.py,v 1.1.2.1 2005/08/13 04:34:33 sidnei Exp $
"""
from Acquisition import aq_parent
from AccessControl import ClassSecurityInfo
from Globals import InitializeClass
from ZServer.FTPRequest import FTPRequest
from ZPublisher import xmlrpc
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.interfaces.plugins \
import IRequestTypeSniffer
from Products.PluggableAuthService.interfaces.request \
import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest
_sniffers = ()
def registerSniffer(iface, func):
    """ Register 'func' as the sniffer for request interface 'iface'.

    The pair is appended to the module-level registry, which is replaced
    by a new tuple rather than mutated in place.
    """
    global _sniffers
    _sniffers = tuple(_sniffers) + ((iface, func),)
# Page template used as the "add" form for the plugin (www/rtsAdd).
manage_addRequestTypeSnifferForm = PageTemplateFile(
    'www/rtsAdd',
    globals(),
    __name__='manage_addRequestTypeSnifferForm'
    )
def addRequestTypeSnifferPlugin( dispatcher, id, title=None, REQUEST=None ):
    """ Add a RequestTypeSnifferPlugin to a Pluggable Auth Service. """
    plugin = RequestTypeSniffer(id, title)
    dispatcher._setObject(plugin.getId(), plugin)

    # Without a request there is nobody to redirect.
    if REQUEST is None:
        return

    response = REQUEST['RESPONSE']
    target = ('%s/manage_workspace'
              '?manage_tabs_message='
              'RequestTypeSniffer+added.'
              % dispatcher.absolute_url())
    response.redirect(target)
class RequestTypeSniffer( BasePlugin ):

    """ PAS plugin that determines the type of a request by running the
    registered sniffer functions against it.
    """

    __implements__ = ( IRequestTypeSniffer, )

    meta_type = 'Request Type Sniffer Plugin'

    security = ClassSecurityInfo()

    def __init__(self, id, title=None):
        self._id = id
        self.id = id
        self.title = title

    security.declarePrivate('sniffRequestType')
    def sniffRequestType(self, request):
        # Every registered sniffer is run; when several of them match,
        # the one registered last determines the result.  Returns None
        # when no sniffer matches.
        matches = [iface for iface, func in _sniffers if func( request )]
        if not matches:
            return None
        return matches[-1]
# Most of the sniffing code below has been inspired by
# similar tests found in BaseRequest, HTTPRequest and ZServer
def webdavSniffer(request):
    """ Return True when the request looks like a WebDAV request.

    A request matches when it carries a true 'WEBDAV_SOURCE_PORT' value,
    when its method is anything other than GET or POST, or when it is a
    GET whose path ends in 'manage_DAVget'.  Returns None otherwise.
    """
    source_port = request.get('WEBDAV_SOURCE_PORT', None)
    verb = request.get('REQUEST_METHOD', 'GET').upper()
    path = request.get('PATH_INFO', '')

    looks_like_dav = (
        bool(source_port)
        or verb not in ('GET', 'POST')
        or (verb == 'GET' and path.endswith('manage_DAVget'))
        )
    if looks_like_dav:
        return True
    return None
# Add the WebDAV check to the module-level sniffer registry.
registerSniffer(IWebDAVRequest, webdavSniffer)
def xmlrpcSniffer(request):
    """ Return True when the request looks like an XML-RPC request.

    A request matches when its method is GET or POST and its 'RESPONSE'
    object is an xmlrpc.Response instance.  Returns None otherwise.
    """
    # Looked up first, so a request without 'RESPONSE' raises KeyError.
    reply = request['RESPONSE']
    verb = request.get('REQUEST_METHOD', 'GET').upper()

    if verb not in ('GET', 'POST'):
        return None
    if not isinstance(reply, xmlrpc.Response):
        return None
    return True
# Add the XML-RPC check to the module-level sniffer registry.
registerSniffer(IXMLRPCRequest, xmlrpcSniffer)
def ftpSniffer(request):
    """ Return True when the request is an FTPRequest instance.

    Returns None for any other kind of request object.
    """
    if not isinstance(request, FTPRequest):
        return None
    return True
# Add the FTP check to the module-level sniffer registry.
registerSniffer(IFTPRequest, ftpSniffer)
def browserSniffer(request):
    """ Return True when none of the other sniffers claims the request.

    If it is neither WebDAV, FTP nor XML-RPC, it is very likely a
    browser request.  The other sniffers are consulted in that order
    and evaluation stops at the first one that matches.
    """
    claimed = (webdavSniffer(request)
               or ftpSniffer(request)
               or xmlrpcSniffer(request))
    return not claimed
# Add the browser fallback check to the module-level sniffer registry;
# this call comes after the WebDAV, XML-RPC and FTP registrations.
registerSniffer(IBrowserRequest, browserSniffer)
More information about the Zope-CVS mailing list