[Zope-CVS] CVS: Products/PluggableAuthService/plugins -
ChallengeProtocolChooser.py:1.3.2.1 RequestTypeSniffer.py:1.3.2.1
Sidnei da Silva
sidnei at enfoldsystems.com
Wed Aug 17 00:53:06 EDT 2005
Update of /cvs-repository/Products/PluggableAuthService/plugins
In directory cvs.zope.org:/tmp/cvs-serv27657/plugins
Added Files:
Tag: sidnei-challenge-protocol-chooser
ChallengeProtocolChooser.py RequestTypeSniffer.py
Log Message:
- Had mistakenly added those files to HEAD instead of this branch
=== Added File Products/PluggableAuthService/plugins/ChallengeProtocolChooser.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Classes: ChallengeProtocolChooser
$Id: ChallengeProtocolChooser.py,v 1.3.2.1 2005/08/17 04:53:05 sidnei Exp $
"""
from Acquisition import aq_parent
from AccessControl import ClassSecurityInfo
from BTrees.OOBTree import OOBTree
from Globals import InitializeClass
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.interfaces.plugins \
import IRequestTypeSniffer
from Products.PluggableAuthService.interfaces.plugins \
import IChallengeProtocolChooser
from Products.PluggableAuthService.interfaces.plugins \
import IChallengePlugin
from Products.PluggableAuthService.interfaces.request \
import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest
_request_types = ()
_request_type_bmap = {}
def registerRequestType(label, iface):
global _request_types
registry = list(_request_types)
registry.append((label, iface))
_request_types = tuple(registry)
_request_type_bmap[iface] = label
def listRequestTypesLabels():
return _request_type_bmap.values()
manage_addChallengeProtocolChooserForm = PageTemplateFile(
'www/cpcAdd', globals(), __name__='manage_addChallengeProtocolChooserForm' )
def addChallengeProtocolChooserPlugin( dispatcher, id, title=None,
mapping=None, REQUEST=None ):
""" Add a ChallengeProtocolChooserPlugin to a Pluggable Auth Service. """
cpc = ChallengeProtocolChooser(id, title=title, mapping=mapping)
dispatcher._setObject(cpc.getId(), cpc)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ChallengeProtocolChooser+added.'
% dispatcher.absolute_url())
class ChallengeProtocolChooser( BasePlugin ):
""" PAS plugin for choosing challenger protocol based on request
"""
meta_type = 'Challenge Protocol Chooser Plugin'
security = ClassSecurityInfo()
manage_options = (({'label': 'Mapping',
'action': 'manage_editProtocolMapping'
},
)
+ BasePlugin.manage_options
)
def __init__(self, id, title=None, mapping=None):
self._id = self.id = id
self.title = title
self._map = OOBTree()
if mapping is not None:
self.manage_updateProtocolMapping(mapping=mapping)
security.declarePrivate('chooseProtocol')
def chooseProtocols(self, request):
pas_instance = self._getPAS()
plugins = pas_instance._getOb('plugins')
sniffers = plugins.listPlugins( IRequestTypeSniffer )
for sniffer_id, sniffer in sniffers:
request_type = sniffer.sniffRequestType(request)
if request_type is not None:
return self._getProtocolsFor(request_type)
def _getProtocolsFor(self, request_type):
label = _request_type_bmap.get(request_type, None)
if label is None:
return
return self._map.get(label, None)
def _listProtocols(self):
pas_instance = self._getPAS()
plugins = pas_instance._getOb('plugins')
challengers = plugins.listPlugins( IChallengePlugin )
found = []
for challenger_id, challenger in challengers:
protocol = getattr(challenger, 'protocol', challenger_id)
if protocol not in found:
found.append(protocol)
return found
manage_editProtocolMappingForm = PageTemplateFile(
'www/cpcEdit', globals(),
__name__='manage_editProtocolMappingForm')
def manage_editProtocolMapping(self, REQUEST=None):
""" Edit Protocol Mapping
"""
info = []
available_protocols = self._listProtocols()
request_types = listRequestTypesLabels()
request_types.sort()
for label in request_types:
settings = []
select_any = False
info.append(
{'label': label,
'settings': settings
})
protocols = self._map.get(label, None)
if not protocols:
select_any = True
for protocol in available_protocols:
selected = False
if protocols and protocol in protocols:
selected = True
settings.append({'label': protocol,
'selected': selected,
'value': protocol,
})
settings.insert(0, {'label': '(any)',
'selected': select_any,
'value': '',
})
return self.manage_editProtocolMappingForm(info=info, REQUEST=REQUEST)
def manage_updateProtocolMapping(self, mapping, REQUEST=None):
""" Update mapping of Request Type to Protocols
"""
for key, value in mapping.items():
value = filter(None, value)
if not value:
if self._map.has_key(key):
del self._map[key]
else:
self._map[key] = value
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_editProtocolMapping'
'?manage_tabs_message='
'Protocol+Mappings+Changed.'
% self.absolute_url())
classImplements(ChallengeProtocolChooser,
IChallengeProtocolChooser)
InitializeClass(ChallengeProtocolChooser)
for label, iface in (('Browser', IBrowserRequest),
('WebDAV', IWebDAVRequest),
('FTP', IFTPRequest),
('XML-RPC', IXMLRPCRequest)):
registerRequestType(label, iface)
=== Added File Products/PluggableAuthService/plugins/RequestTypeSniffer.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Classes: RequestTypeSniffer
$Id: RequestTypeSniffer.py,v 1.3.2.1 2005/08/17 04:53:05 sidnei Exp $
"""
from Acquisition import aq_parent
from AccessControl import ClassSecurityInfo
from Globals import InitializeClass
from ZServer.FTPRequest import FTPRequest
from ZPublisher import xmlrpc
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.interfaces.plugins \
import IRequestTypeSniffer
from Products.PluggableAuthService.interfaces.request \
import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest
_sniffers = ()
def registerSniffer(iface, func):
global _sniffers
registry = list(_sniffers)
registry.append((iface, func))
_sniffers = tuple(registry)
manage_addRequestTypeSnifferForm = PageTemplateFile(
'www/rtsAdd', globals(), __name__='manage_addRequestTypeSnifferForm' )
def addRequestTypeSnifferPlugin( dispatcher, id, title=None, REQUEST=None ):
""" Add a RequestTypeSnifferPlugin to a Pluggable Auth Service. """
rts = RequestTypeSniffer(id, title)
dispatcher._setObject(rts.getId(), rts)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'RequestTypeSniffer+added.'
% dispatcher.absolute_url())
class RequestTypeSniffer( BasePlugin ):
""" PAS plugin for detecting a Request's type
"""
meta_type = 'Request Type Sniffer Plugin'
security = ClassSecurityInfo()
def __init__(self, id, title=None):
self._id = self.id = id
self.title = title
security.declarePrivate('sniffRequestType')
def sniffRequestType(self, request):
found = None
for iface, func in _sniffers:
if func( request ):
found = iface
if found is not None:
return found
classImplements(RequestTypeSniffer,
IRequestTypeSniffer)
InitializeClass(RequestTypeSniffer)
# Most of the sniffing code below has been inspired by
# similar tests found in BaseRequest, HTTPRequest and ZServer
def webdavSniffer(request):
dav_src = request.get('WEBDAV_SOURCE_PORT', None)
method = request.get('REQUEST_METHOD', 'GET').upper()
path_info = request.get('PATH_INFO', '')
if dav_src:
return True
if method not in ('GET', 'POST'):
return True
if method in ('GET',) and path_info.endswith('manage_DAVget'):
return True
registerSniffer(IWebDAVRequest, webdavSniffer)
def xmlrpcSniffer(request):
response = request['RESPONSE']
method = request.get('REQUEST_METHOD', 'GET').upper()
if method in ('GET', 'POST') and isinstance(response, xmlrpc.Response):
return True
registerSniffer(IXMLRPCRequest, xmlrpcSniffer)
def ftpSniffer(request):
if isinstance(request, FTPRequest):
return True
registerSniffer(IFTPRequest, ftpSniffer)
def browserSniffer(request):
# If it's none of the above, it's very likely a browser request.
for sniffer in (webdavSniffer, ftpSniffer, xmlrpcSniffer):
if sniffer(request):
return False
return True
registerSniffer(IBrowserRequest, browserSniffer)
More information about the Zope-CVS
mailing list