[Zope-CVS] CVS: Products/PluggableAuthService/plugins - ChallengeProtocolChooser.py:1.3.2.1 RequestTypeSniffer.py:1.3.2.1

Sidnei da Silva sidnei at enfoldsystems.com
Wed Aug 17 00:53:06 EDT 2005


Update of /cvs-repository/Products/PluggableAuthService/plugins
In directory cvs.zope.org:/tmp/cvs-serv27657/plugins

Added Files:
      Tag: sidnei-challenge-protocol-chooser
	ChallengeProtocolChooser.py RequestTypeSniffer.py 
Log Message:

- Had mistakenly added those files to HEAD instead of this branch


=== Added File Products/PluggableAuthService/plugins/ChallengeProtocolChooser.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Classes: ChallengeProtocolChooser

$Id: ChallengeProtocolChooser.py,v 1.3.2.1 2005/08/17 04:53:05 sidnei Exp $
"""

from Acquisition import aq_parent
from AccessControl import ClassSecurityInfo
from BTrees.OOBTree import OOBTree
from Globals import InitializeClass

from Products.PageTemplates.PageTemplateFile import PageTemplateFile

from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.interfaces.plugins \
    import IRequestTypeSniffer
from Products.PluggableAuthService.interfaces.plugins \
    import IChallengeProtocolChooser
from Products.PluggableAuthService.interfaces.plugins \
     import IChallengePlugin

from Products.PluggableAuthService.interfaces.request \
    import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest

# Registered request types as (label, interface) pairs, in registration
# order.  The tuple is never mutated; each registration rebinds the name.
_request_types = ()
# Reverse lookup: request type interface -> label.
_request_type_bmap = {}

def registerRequestType(label, iface):
    """ Register a request type under a human-readable label.

    o 'label' is the name the type is listed under.

    o 'iface' is the object identifying the request type; registering the
      same 'iface' again replaces its label in the reverse lookup.
    """
    global _request_types
    _request_types = _request_types + ((label, iface),)
    _request_type_bmap[iface] = label

def listRequestTypesLabels():
    """ Return the labels of all registered request types.

    o The result is always a fresh list, so callers may sort or otherwise
      modify it without touching the registry.
    """
    return list(_request_type_bmap.values())

# Page template loaded from 'www/cpcAdd'.  Presumably the add form that
# posts to addChallengeProtocolChooserPlugin -- confirm against the
# product's registration code.
manage_addChallengeProtocolChooserForm = PageTemplateFile(
    'www/cpcAdd', globals(), __name__='manage_addChallengeProtocolChooserForm' )

def addChallengeProtocolChooserPlugin( dispatcher, id, title=None,
                                       mapping=None, REQUEST=None ):
    """ Create a ChallengeProtocolChooser and store it in 'dispatcher'.

    o When a REQUEST is passed, redirect to the dispatcher's
      'manage_workspace' with a status message.
    """
    plugin = ChallengeProtocolChooser(id, title=title, mapping=mapping)
    dispatcher._setObject(plugin.getId(), plugin)

    # Called from code rather than through the web: nothing to redirect.
    if REQUEST is None:
        return

    redirect = REQUEST['RESPONSE'].redirect
    target = ( '%s/manage_workspace'
               '?manage_tabs_message='
               'ChallengeProtocolChooser+added.'
             % dispatcher.absolute_url() )
    redirect(target)


class ChallengeProtocolChooser( BasePlugin ):

    """ PAS plugin for choosing challenge protocols based on request type.

    o Holds a mapping from request type label to the protocols selected
      for that type; a label without an entry is shown as '(any)' in the
      mapping form.
    """
    meta_type = 'Challenge Protocol Chooser Plugin'

    security = ClassSecurityInfo()

    manage_options = (({'label': 'Mapping',
                        'action': 'manage_editProtocolMapping'
                        },
                       )
                      + BasePlugin.manage_options
                      )

    def __init__(self, id, title=None, mapping=None):
        self._id = self.id = id
        self.title = title
        # request type label -> protocols chosen for that request type
        self._map = OOBTree()
        if mapping is not None:
            self.manage_updateProtocolMapping(mapping=mapping)

    # The declared name has to match the method name exactly; the former
    # spelling 'chooseProtocol' did not refer to any method of this class.
    security.declarePrivate('chooseProtocols')
    def chooseProtocols(self, request):
        # Return the protocols mapped to the type of 'request', or None.
        #
        # o Each IRequestTypeSniffer plugin is asked in turn; the first
        #   one that reports a request type decides the result.
        #
        # o None is returned when no sniffer recognizes the request, or
        #   when the recognized type has no mapping.
        pas_instance = self._getPAS()
        plugins = pas_instance._getOb('plugins')

        sniffers = plugins.listPlugins( IRequestTypeSniffer )

        for sniffer_id, sniffer in sniffers:
            request_type = sniffer.sniffRequestType(request)
            if request_type is not None:
                return self._getProtocolsFor(request_type)

        return None

    def _getProtocolsFor(self, request_type):
        # Translate a request type interface into its registered label and
        # return the protocols stored under that label.  None when the
        # interface is not registered or nothing is stored for the label.
        label = _request_type_bmap.get(request_type, None)
        if label is None:
            return None
        return self._map.get(label, None)


    def _listProtocols(self):
        # Return the distinct protocols offered by the IChallengePlugin
        # plugins, in plugin order.  A challenger without a 'protocol'
        # attribute is listed under its plugin id.
        pas_instance = self._getPAS()
        plugins = pas_instance._getOb('plugins')

        challengers = plugins.listPlugins( IChallengePlugin )
        found = []

        for challenger_id, challenger in challengers:
            protocol = getattr(challenger, 'protocol', challenger_id)
            if protocol not in found:
                found.append(protocol)

        return found

    manage_editProtocolMappingForm = PageTemplateFile(
        'www/cpcEdit', globals(),
        __name__='manage_editProtocolMappingForm')

    # NOTE(review): no security declaration is made for this method in
    # this class -- confirm whether it should be protected by a management
    # permission.
    def manage_editProtocolMapping(self, REQUEST=None):
        """ Edit Protocol Mapping
        """
        info = []
        available_protocols = self._listProtocols()

        # Copy before sorting so the registry's own data is never touched,
        # whatever kind of sequence the helper hands back.
        request_types = list(listRequestTypesLabels())
        request_types.sort()

        for label in request_types:
            protocols = self._map.get(label, None)

            # '(any)' is always the first choice and is preselected when
            # nothing is stored for this request type.
            settings = [{'label': '(any)',
                         'selected': not protocols,
                         'value': '',
                         }]
            for protocol in available_protocols:
                settings.append({'label': protocol,
                                 'selected': bool(protocols
                                                  and protocol in protocols),
                                 'value': protocol,
                                 })
            info.append(
                {'label': label,
                 'settings': settings
                 })
        return self.manage_editProtocolMappingForm(info=info, REQUEST=REQUEST)

    # NOTE(review): no security declaration is made for this method in
    # this class -- confirm whether it should be protected by a management
    # permission.
    def manage_updateProtocolMapping(self, mapping, REQUEST=None):
        """ Update mapping of Request Type to Protocols
        """
        for key, value in mapping.items():
            # Drop empty entries, such as the '' value of the '(any)'
            # choice in the mapping form.
            value = filter(None, value)
            if not value:
                # Nothing selected: remove the entry so the request type
                # falls back to '(any)'.
                if self._map.has_key(key):
                    del self._map[key]
            else:
                self._map[key] = value

        if REQUEST is not None:
            REQUEST['RESPONSE'].redirect(
                '%s/manage_editProtocolMapping'
                '?manage_tabs_message='
                'Protocol+Mappings+Changed.'
                % self.absolute_url())

# Associate the plugin class with the IChallengeProtocolChooser interface.
classImplements(ChallengeProtocolChooser,
                IChallengeProtocolChooser)

InitializeClass(ChallengeProtocolChooser)

# Register the four request types this module knows about at import time;
# their labels are what listRequestTypesLabels() reports.
for label, iface in (('Browser', IBrowserRequest),
                     ('WebDAV', IWebDAVRequest),
                     ('FTP', IFTPRequest),
                     ('XML-RPC', IXMLRPCRequest)):
    registerRequestType(label, iface)


=== Added File Products/PluggableAuthService/plugins/RequestTypeSniffer.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Classes: RequestTypeSniffer

$Id: RequestTypeSniffer.py,v 1.3.2.1 2005/08/17 04:53:05 sidnei Exp $
"""

from Acquisition import aq_parent
from AccessControl import ClassSecurityInfo
from Globals import InitializeClass
from ZServer.FTPRequest import FTPRequest
from ZPublisher import xmlrpc

from Products.PageTemplates.PageTemplateFile import PageTemplateFile

from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.interfaces.plugins \
    import IRequestTypeSniffer

from Products.PluggableAuthService.interfaces.request \
    import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest

# Registered sniffers as (interface, callable) pairs, in registration
# order.  The tuple is replaced, never mutated, on each registration.
_sniffers = ()

def registerSniffer(iface, func):
    """ Add the pair (iface, func) to the end of the sniffer registry.
    """
    global _sniffers
    _sniffers = _sniffers + ((iface, func),)

# Page template loaded from 'www/rtsAdd'.  Presumably the add form that
# posts to addRequestTypeSnifferPlugin -- confirm against the product's
# registration code.
manage_addRequestTypeSnifferForm = PageTemplateFile(
    'www/rtsAdd', globals(), __name__='manage_addRequestTypeSnifferForm' )

def addRequestTypeSnifferPlugin( dispatcher, id, title=None, REQUEST=None ):
    """ Create a RequestTypeSniffer and store it in 'dispatcher'.

    o When a REQUEST is passed, redirect to the dispatcher's
      'manage_workspace' with a status message.
    """
    plugin = RequestTypeSniffer(id, title)
    dispatcher._setObject(plugin.getId(), plugin)

    # Called from code rather than through the web: nothing to redirect.
    if REQUEST is None:
        return

    redirect = REQUEST['RESPONSE'].redirect
    target = ( '%s/manage_workspace'
               '?manage_tabs_message='
               'RequestTypeSniffer+added.'
             % dispatcher.absolute_url() )
    redirect(target)


class RequestTypeSniffer( BasePlugin ):

    """ PAS plugin that works out what type of request it has been handed.
    """
    meta_type = 'Request Type Sniffer Plugin'

    security = ClassSecurityInfo()

    def __init__(self, id, title=None):

        self.title = title
        self._id = self.id = id

    security.declarePrivate('sniffRequestType')
    def sniffRequestType(self, request):
        # Run every registered sniffer against 'request'.  When several of
        # them match, the one registered last wins; None is returned when
        # none of them matches.
        matches = [iface for iface, func in _sniffers if func( request )]
        if matches:
            return matches[-1]
        return None

# Associate the plugin class with the IRequestTypeSniffer interface.
classImplements(RequestTypeSniffer,
                IRequestTypeSniffer)

InitializeClass(RequestTypeSniffer)

# Most of the sniffing code below has been inspired by
# similar tests found in BaseRequest, HTTPRequest and ZServer
def webdavSniffer(request):
    """ Return True if 'request' looks like a WebDAV request, else None.

    o A request counts as WebDAV when 'WEBDAV_SOURCE_PORT' is set, when
      its method is anything but GET or POST, or when it is a GET whose
      path ends in 'manage_DAVget'.
    """
    path = request.get('PATH_INFO', '')
    verb = request.get('REQUEST_METHOD', 'GET').upper()
    source_port = request.get('WEBDAV_SOURCE_PORT', None)

    if source_port or verb not in ('GET', 'POST'):
        return True

    # Only GET and POST reach this point.
    if verb == 'GET' and path.endswith('manage_DAVget'):
        return True

    return None

# Registration order matters: RequestTypeSniffer.sniffRequestType keeps
# the last sniffer that matches, so later registrations win over this one.
registerSniffer(IWebDAVRequest, webdavSniffer)

def xmlrpcSniffer(request):
    """ Return True if 'request' looks like an XML-RPC request, else None.

    o The request must be a GET or POST and its 'RESPONSE' must be an
      xmlrpc.Response instance.
    """
    response = request['RESPONSE']
    verb = request.get('REQUEST_METHOD', 'GET').upper()

    if verb not in ('GET', 'POST'):
        return None
    if not isinstance(response, xmlrpc.Response):
        return None
    return True

# Registered after the WebDAV sniffer, so it takes precedence over it when
# both match (sniffRequestType keeps the last match).
registerSniffer(IXMLRPCRequest, xmlrpcSniffer)

def ftpSniffer(request):
    """ Return True if 'request' is an FTPRequest instance, else None.
    """
    if not isinstance(request, FTPRequest):
        return None
    return True

# Registered after the WebDAV and XML-RPC sniffers, so it takes precedence
# over them when several match (sniffRequestType keeps the last match).
registerSniffer(IFTPRequest, ftpSniffer)

def browserSniffer(request):
    """ Return False if the WebDAV, FTP or XML-RPC sniffer claims
    'request', True otherwise.
    """
    # Whatever none of the more specific sniffers recognizes is very
    # likely a browser request.
    claimed = False
    for other in (webdavSniffer, ftpSniffer, xmlrpcSniffer):
        if other(request):
            claimed = True
            break
    return not claimed

# Registered last.  browserSniffer itself returns False whenever one of
# the other three sniffers matches, so it never overrides them.
registerSniffer(IBrowserRequest, browserSniffer)



More information about the Zope-CVS mailing list