[Zope3-checkins] SVN: ldapauth/trunk/source.py Changes in source.py
Implementing caching results in a recursion error.
Roger Ineichen
roger at projekt01.ch
Sun Jul 11 17:37:51 EDT 2004
Log message for revision 26416:
Changes in source.py: implementing caching results in a recursion error.
It's just a branch. The test will fail in ldapauth till we fix this.
-=-
Modified: ldapauth/trunk/source.py
===================================================================
--- ldapauth/trunk/source.py 2004-07-11 12:54:14 UTC (rev 26415)
+++ ldapauth/trunk/source.py 2004-07-11 21:37:51 UTC (rev 26416)
@@ -14,131 +14,177 @@
"""A plugable authentication module for LDAP.
$Id:
-"""
+"""
+
+import ldap
+from persistent import Persistent
+from zope.app.container.contained import Contained, setitem
+from zope.app.pluggableauth.interfaces import \
+ ILoginPasswordPrincipalSource, IContainerPrincipalSource
+from zope.app.location import locate
+from zope.app.pluggableauth import SimplePrincipal
+from zope.exceptions import NotFoundError
+from zope.interface import implements
+
+from zope.app.cache.ram import RAMCache
+from zope.app.cache.caching import getCacheForObject, getLocationForCache
+from zope.app.cache.annotationcacheable import AnnotationCacheable
+
+from interfaces import ILDAPBasedPrincipalSource
+
+class LDAPPrincipalSource(Contained, Persistent):
+ """A Principal source using LDAP"""
+ implements(ILoginPasswordPrincipalSource, ILDAPBasedPrincipalSource,
+ IContainerPrincipalSource)
+
+ def __init__(self, server=u'', port=389, basedn=u'',
+ login_attribute=u'',
+ manager_dn=u'', manager_passwd=u''):
+ self.host = server
+ self.port = port
+ self.basedn = basedn
+ self.login_attribute = login_attribute
+ self.manager_dn = manager_dn
+ self.manager_passwd = manager_passwd
+
+ ### IContainer-related methods
+
+ def __delitem__(self, login):
+ cache = getCacheForObject(self)
+ location = getLocationForCache(self)
+ if cache and location:
+ cache.invalidate(location, login)
+
+ ### This is the stuff needed to add the Principal to the cache and to
+ ### make it containment friendly.
+ ### TODO: add the principal to the ldap server if it is a new one.
+ def __setitem__(self, login, obj):
+ obj.id = login
+ setitem(self, self._setitem, login, obj)
+
+ def _setitem(self, key, obj):
+ cache = getCacheForObject(self)
+ location = getLocationForCache(self)
+ if cache and location:
+ principal = cache.query(location, key)
+ if principal is None:
+ cache.set(obj, location, key)
+
+ def keys(self):
+ logins = []
+ l = self.__connect()
+ l.simple_bind_s(self.manager_dn, self.manager_passwd)
+ lsearch = l.search_s(self.basedn, ldap.SCOPE_ONELEVEL, '(%s=*)' %
+ self.login_attribute)
+ for node in lsearch:
+ node_dn, node_dict = node
+ logins.append(node_dict[self.login_attribute][0])
+ return logins
+
+ def __iter__(self):
+ return self.keys()
+
+ def __getitem__(self, key):
+ cache = getCacheForObject(self)
+ location = getLocationForCache(self)
+ if cache and location:
+ principal = cache.query(location, key)
+ if principal is None:
+ principal = self.__findInLDAP(key)
+
+ #XXX
+ # RuntimeError: maximum recursion depth exceeded
+ # this is calling __setitem__ if we call authenticate --> __setitem__ --> __getitem__ --> ...
-import ldap
-from persistent import Persistent
-from zope.app.container.contained import Contained
-from zope.app.pluggableauth.interfaces import \
- ILoginPasswordPrincipalSource, IContainerPrincipalSource
-from zope.app.location import locate
-from zope.app.pluggableauth import SimplePrincipal
-from zope.exceptions import NotFoundError
-from zope.interface import implements
-
-from interfaces import ILDAPBasedPrincipalSource
-
-class LDAPPrincipalSource(Contained, Persistent):
- """A Principal source using LDAP"""
- implements(ILoginPasswordPrincipalSource, ILDAPBasedPrincipalSource,
- IContainerPrincipalSource)
-
- def __init__(self, server=u'', port=389, basedn=u'',
- login_attribute=u'',
- manager_dn=u'', manager_passwd=u''):
- self.host = server
- self.port = port
- self.basedn = basedn
- self.login_attribute = login_attribute
- self.manager_dn = manager_dn
- self.manager_passwd = manager_passwd
-
- ### IContainer-related methods
-
- def __delitem__(self, login):
- pass
-
- # XXX We should use setitem from zope.app.container.contained
- # Would allow events and so on. This is just a test.
- # This way of registering the principal is somehow stupid since we must
- # use it each time a new principal is created. This is UGLY.
- def __setitem__(self, login, obj):
- obj.id = login
- obj.__parent__ = self
-
- def keys(self):
- pass
-
- def __iter__(self):
- pass
-
- def __getitem__(self):
- pass
-
- def get(self, key, default=None):
- pass
-
- def values(self):
- pass
-
- def __len__(self):
- pass
-
- def items(self):
- pass
-
- def __contains__(self):
- pass
-
- ### IPrincipalSource methods
-
- def getPrincipal(self, id):
- uid = id.split('\t')[2]
- l = self.__connect()
- l.simple_bind_s(self.manager_dn, self.manager_passwd)
- lsearch = l.search_s(self.basedn, ldap.SCOPE_ONELEVEL,
- '(%s=%s)' % (self.login_attribute, uid))
- if lsearch:
- uid_dn, uid_dict = lsearch[0]
- principal = SimplePrincipal(
- login = uid_dict[self.login_attribute][0],
- password = uid_dict['userPassword'][0])
- self.__setitem__(principal.login, principal)
- return principal
- else:
- raise NotFoundError, id
-
- def getPrincipals(self, name):
- if name == '' :
- search = '(%s=*)' % self.login_attribute
- else:
- search = '(%s=*%s*)' % (self.login_attribute, name)
- l = self.__connect()
- l.simple_bind_s(self.manager_dn, self.manager_passwd)
- lsearch = l.search_s(self.basedn, ldap.SCOPE_ONELEVEL, search)
-
- principals = []
- for userinfo in lsearch:
- uid_dn, uid_dict = userinfo
- principal = SimplePrincipal(
- login = uid_dict[self.login_attribute][0],
- password = uid_dict['userPassword'][0])
- self.__setitem__(principal.login, principal)
- principals.append(principal)
-
- return principals
-
- def authenticate(self, uid, password):
- if password:
- l = self.__connect()
- dn = '%s=%s,' % (self.login_attribute, uid) + self.basedn
- try:
- l.simple_bind_s(dn, password)
- principal = SimplePrincipal(login = uid, password = password)
- self.__setitem__(uid, principal)
- return principal
- except ldap.INVALID_CREDENTIALS:
- return None
- else:
- return None
-
- def __connect(self):
- conn = getattr(self, '_v_conn', None)
- if not conn:
- connectstring = 'ldap://%s:%s' % (self.host, self.port)
- connection = ldap.initialize(connectstring)
- self._v_conn = connection
- return connection
- else:
- return conn
+ #self[principal.login] = principal
+ else:
+ principal = self.__findInLDAP(key)
+ #XXX
+ # RuntimeError: maximum recursion depth exceeded
+ # this is calling __setitem__ if we call authenticate --> __setitem__ --> __getitem__ --> ...
+
+ #self[principal.login] = principal
+ return principal
+
+ def __findInLDAP(self, login):
+ l = self.__connect()
+ l.simple_bind_s(self.manager_dn, self.manager_passwd)
+ lsearch = l.search_s(self.basedn, ldap.SCOPE_ONELEVEL,
+ '(%s=%s)' % (self.login_attribute, login))
+ if lsearch:
+ uid_dn, uid_dict = lsearch[0]
+ principal = SimplePrincipal(
+ login = uid_dict[self.login_attribute][0],
+ password = uid_dict['userPassword'][0])
+ return principal
+
+ def get(self, key, default=None):
+ return self[key]
+
+ def values(self):
+ pass
+
+ def __len__(self):
+ pass
+
+ def items(self):
+ pass
+
+ def __contains__(self, key):
+ pass
+
+ ### IPrincipalSource methods
+
+ def getPrincipal(self, id):
+ uid = id.split('\t')[2]
+ principal = self[uid]
+ if principal:
+ return principal
+ else:
+ raise NotFoundError, id
+
+ def getPrincipals(self, name):
+ if name == '' :
+ search = '(%s=*)' % self.login_attribute
+ else:
+ search = '(%s=*%s*)' % (self.login_attribute, name)
+ l = self.__connect()
+ l.simple_bind_s(self.manager_dn, self.manager_passwd)
+ lsearch = l.search_s(self.basedn, ldap.SCOPE_ONELEVEL, search)
+
+ principals = []
+ for node in lsearch:
+ node_dn, node_dict = node
+ principal = SimplePrincipal(
+ login = node_dict[self.login_attribute][0],
+ password = node_dict['userPassword'][0])
+ self[principal.login] = principal
+ principals.append(principal)
+
+ return principals
+
+ def authenticate(self, uid, password):
+ if password:
+ l = self.__connect()
+ dn = '%s=%s,' % (self.login_attribute, uid) + self.basedn
+ try:
+ l.simple_bind_s(dn, password)
+ principal = SimplePrincipal(login = uid, password = password)
+ self.__setitem__(uid, principal)
+ return principal
+ except ldap.INVALID_CREDENTIALS:
+ return None
+ else:
+ return None
+
+ def __connect(self):
+ conn = getattr(self, '_v_conn', None)
+ if not conn:
+ connectstring = 'ldap://%s:%s' % (self.host, self.port)
+ connection = ldap.initialize(connectstring)
+ self._v_conn = connection
+ return connection
+ else:
+ return conn
+
More information about the Zope3-Checkins
mailing list