[Zope] UserDB plus crypt
Ed Colmar
edc@8days.com
Tue, 3 Oct 2000 14:24:42 -0600
--============_-1241517811==_============
Content-Type: text/plain; charset="us-ascii" ; format="flowed"
Since I couldn't get LoginManager to work, I started modifying
UserDb to handle encrypted passwords... I've changed it so the
encrypted password is stored in the DB, but I haven't been able to
get it to authenticate.
Can anyone offer any insight as to what I need to do?
My modified UserDb.py is attached.
Thanks!
-ed-
--============_-1241517811==_============
Content-Id: <a05001905b5ffefaf0f65@[204.144.132.38].0.0>
Content-Type: text/plain; name="UserDb.py"; charset="us-ascii" ; format="flowed"
Content-Disposition: attachment; filename="UserDb.py"
; modification-date="Tue, 3 Oct 2000 14:24:04 -0600"
"""User Db product
This product provides support for User Folder-like objects that
store data in back-end databases.
CREATE TABLE users (
username varchar(255) NOT NULL,
password varchar(255) NOT NULL,
domains varchar(255),
roles varchar(255)
);
"""
# Bare revision number taken from the RCS keyword string: the slice
# drops the leading '$Revision: ' and the trailing ' $'.
__version__='$Revision: 1.13 $'[11:-2]
import Globals, App.Undo, socket, regex
from Products.ZSQLMethods.SQL import SQL
from Globals import PersistentMapping
from Globals import Persistent
from Globals import HTMLFile, MessageDialog
from string import join,strip,split,lower
from App.Management import Navigation, Tabs
from Acquisition import Implicit
from OFS.SimpleItem import Item
from OFS.Document import Document
from base64 import decodestring, encodestring
from urllib import quote, unquote
from ImageFile import ImageFile
from AccessControl.Role import RoleManager
from AccessControl.User import BasicUser, BasicUserFolder
from string import split, join, upper, lower
## Crypt toolkit ##
import crypt, random
# Type object of a list.
# NOTE(review): nothing in this file refers to ListType -- confirm it is
# still needed by importers before removing it.
ListType=type([])
class User(BasicUser):
    # One account as read from the users table.
    #
    # Comments are used instead of docstrings on purpose: Zope only
    # publishes objects that carry a docstring, and the original class
    # and its methods had none.

    icon='misc_/UserDb/User_icon'

    def __init__(self, name, password, roles, domains):
        # name     -- the user name
        # password -- the password exactly as stored in the table
        # roles    -- comma-separated role names in one string
        # domains  -- comma-separated domain specs in one string
        #
        # Both comma-separated strings are broken into lists; every entry
        # is stripped of surrounding whitespace and empty entries are
        # dropped.
        role_list=[]
        for piece in split(roles, ','):
            piece=strip(piece)
            if piece:
                role_list.append(piece)

        domain_list=[]
        for piece in split(domains, ','):
            piece=strip(piece)
            if piece:
                domain_list.append(piece)

        self.name=name
        self.__=password
        self.roles=role_list
        self.domains=domain_list

    def getUserName(self):
        # The user name given to the constructor.
        return self.name

    def _getPassword(self):
        # The stored password given to the constructor.
        return self.__

    def getRoles(self):
        # The list of role names.
        return self.roles

    def getDomains(self):
        # The list of domain specs.
        return self.domains
class UserDb(BasicUserFolder):
""" """
meta_type='User Db'
id ='acl_users'
title ='User Db'
icon ='misc_/UserDb/UserDb_icon'
isPrincipiaFolderish=1
isAUserFolder=1
encryptedpasswords=1
manage_options=(
{'label':'Contents', 'action':'manage_main'},
{'label':'Properties', 'action':'manage_properties'},
{'label':'Security', 'action':'manage_access'},
{'label':'Undo', 'action':'manage_UndoForm'},
)
__ac_permissions__=(
('View management screens',
['manage','manage_menu','manage_main','manage_copyright', 'manage_tabs',
'manage_properties', 'manage_UndoForm']),
('Undo changes', ['manage_undo_transactions']),
('Change permissions', ['manage_access']),
('Manage users', ['manage_users']),
('Change User Dbs', ['manage_edit']),
)
manage_properties=HTMLFile('properties', globals())
def __init__(self, conn_id, cookie_mode=0):
self.conn_id=conn_id
self.cookie_mode=cookie_mode
self.docLogin =Document(_docLogin, __name__='docLogin')
self.docLogout=Document(_docLogout, __name__='docLogout')
self.docLogin.__roles__=None
self.docLogout.__roles__=None
self.sqlListQuery=SQL('sqlListQuery', '', conn_id,
'',
_sqlListQuery)
self.sqlUserQuery=SQL('sqlUserQuery', '', conn_id,
'username',
_sqlUserQuery)
self.sqlAddQuery=SQL('sqlAddQuery', '', conn_id,
'username password domains roles',
_sqlAddQuery)
self.sqlEditQuery=SQL('sqlEditQuery', '', conn_id,
'username password domains roles',
_sqlEditQuery)
self.sqlDelQuery=SQL('sqlDelQuery', '', conn_id,
'username',
_sqlDelQuery)
def getUserNames(self):
"""Returns a list of user names or [] if no users exist"""
data=[]
try: items=self.sqlListQuery()
except: return data
for ob in items:
data.append(sqlattr(ob, 'username'))
data.sort()
return data
def getUsers(self):
"""Return a list of user objects or [] if no users exist"""
data=[]
try: items=self.sqlListQuery()
except: return data
for ob in items:
user=User(sqlattr(ob, 'username'),
sqlattr(ob, 'password'),
sqlattr(ob, 'roles'),
sqlattr(ob, 'domains'))
data.append(user)
return data
def getUser(self, name):
"""Return the named user object or None if no such user exists"""
if name==self._super.getUserName():
return self._super
try: ob=self.sqlUserQuery(username=name)
except: return None
if not ob:
return None
ob=ob[0]
return User(sqlattr(ob, 'username'),
sqlattr(ob, 'password'),
sqlattr(ob, 'roles'),
sqlattr(ob, 'domains'))
def _doAddUser(self, name, password, roles, domains):
if encryptedpasswords:
encryptedpass = str(crypt.crypt(password,'ab'))
self.sqlAddQuery(username=name, password=encryptedpass,
roles=join(roles, ','),
domains=join(domains, ',')
)
else:
self.sqlAddQuery(username=name, password=password,
roles=join(roles, ','),
domains=join(domains, ',')
)
def _doChangeUser(self, name, password, roles, domains):
if encryptedpasswords:
encryptedpass = str(crypt.crypt(password,'ab'))
self.sqlEditQuery(username=name, password=encryptedpass,
roles=join(roles, ','),
domains=join(domains, ',')
)
else:
self.sqlEditQuery(username=name, password=password,
roles=join(roles, ','),
domains=join(domains, ',')
)
def _doDelUsers(self, names):
for name in names:
self.sqlDelQuery(username=name)
# ----
def old_std_validate(self,request,auth='',roles=None):
parent=request['PARENTS'][0]
# If no authorization, only a user with a
# domain spec and no passwd or nobody can
# match
if not auth:
for ob in self.getUsers():
domains=ob.getDomains()
if domains:
if ob.authenticate('', request):
if ob.allowed(parent, roles):
ob=ob.__of__(self)
return ob
return None
# Only do basic authentication
if lower(auth[:6])!='basic ':
return None
name,password=tuple(split(decodestring(split(auth)[-1]), ':'))
user=self.getUser(name)
if user is None:
return None
# Try to authenticate user
encryptedpass = str(crypt.crypt(str(password),'ab'))
#print encryptedpass
if not user.authenticate(encryptedpass, request):
return None
# We need the user to be able to acquire!
user=user.__of__(self)
# Try to authorize user
if user.allowed(parent, roles):
return user
return None
std_validate=BasicUserFolder.validate
def cookie_validate(self,request,auth='',roles=None):
parent=request['PARENTS'][0]
req_has=request.has_key
resp=request['RESPONSE']
if req_has('__ac'):
c=request['__ac']
c=unquote(c)
try: c=decodestring(c)
except:
resp.expireCookie('__ac', path='/')
raise 'LoginRequired', self.docLogin(self, request)
name,password=tuple(split(c, ':'))
encryptedpass = str(crypt.crypt(password,'ab'))
# Find user
user=self.getUser(name)
if user is not None:
if user.authenticate(encryptedpass, request):
user=user.__of__(self)
if user.allowed(parent, roles):
return user
resp.expireCookie('__ac', path='/')
raise 'LoginRequired', self.docLogin(self, request)
if req_has('__ac_name') and req_has('__ac_password'):
name=request['__ac_name']
password=request['__ac_password']
encryptedpass = str(crypt.crypt(password,'ab'))
# Find user
user=self.getUser(name)
if user is not None:
if user.authenticate(encryptedpass, request):
user=user.__of__(self)
if user.allowed(parent, roles):
token='%s:%s' % (name, password)
token=encodestring(token)
token=quote(token)
resp.setCookie('__ac', token, path='/')
request['__ac']=token
try:
del request['__ac_name']
del request['__ac_password']
except:
pass
return user
raise 'LoginRequired', self.docLogin(self, request)
if auth: return None
# Try domain matching
for ob in self.getUsers():
domains=ob.getDomains()
if ob.domains:
if ob.authenticate('', request):
if ob.allowed(parent, roles):
ob=ob.__of__(self)
return ob
# Allow anonymous access to things under UserDbs,
# if the object is public.
nobody=self._nobody
if nobody.allowed(parent, roles):
ob=nobody.__of__(self)
return ob
raise 'LoginRequired', self.docLogin(self, request)
def validate(self,request,auth='',roles=None):
if self.cookie_mode:
return self.cookie_validate(request, auth, roles)
return self.std_validate(request, auth, roles)
def __len__(self):
return len(self.sqlListQuery())
def enumQueries(self):
list=[]
type=SQL.meta_type
for ob in self.__dict__.values():
if hasattr(ob, 'meta_type') and ob.meta_type==type:
list.append(ob)
return list
def enumDocuments(self):
list=[]
type=Document.meta_type
for ob in self.__dict__.values():
if hasattr(ob, 'meta_type') and ob.meta_type==type:
list.append(ob)
return list
def manage_edit(self, connection_id, cookie_mode, REQUEST=None):
"""Change properties"""
if connection_id != self.conn_id:
# Connection changed - update sql methods!
self.conn_id=connection_id
for ob in self.enumQueries():
title=ob.title
args=ob.arguments_src
src=ob.src
ob.manage_edit(title, self.conn_id, args, src)
self.cookie_mode=cookie_mode
if REQUEST:
return MessageDialog(
title ='UserDb Changed',
message='UserDb properties have been updated',
action =REQUEST['URL1']+'/manage_properties',
target ='manage_main')
logout__roles__=None
def logout(self, REQUEST):
"""Logout"""
REQUEST['RESPONSE'].expireCookie('__ac', path='/')
return self.docLogout(self, REQUEST)
# Template object built from the 'addUserDb' file.
# NOTE(review): by its name this is the "add User Db" form whose
# submission reaches manage_addUserDb -- confirm against the product's
# registration code, which is not part of this file.
manage_addUserDbForm=HTMLFile('addUserDb', globals())
def manage_addUserDb(self, connection_id, cookie_mode=0, REQUEST=None):
    """Create a User Db named 'acl_users' inside the container *self*.

    connection_id and cookie_mode are passed straight to UserDb().
    If the container itself already has an 'acl_users' attribute, nothing
    is created and an 'Item Exists' MessageDialog is returned.  Otherwise
    the new folder is installed and, when REQUEST is given, the
    container's manage_main page is returned.
    """
    # NOTE(review): the "already exists" branch indexes REQUEST even
    # though it defaults to None -- programmatic callers would fail here.
    if not hasattr(self.aq_base, 'acl_users'):
        folder=UserDb(connection_id, cookie_mode)
        self._setObject('acl_users', folder)
        # Point the container's __allow_groups__ at the new user folder.
        self.__allow_groups__=self.acl_users
        if REQUEST:
            return self.manage_main(self,REQUEST,update_menu=1)
        return None

    return MessageDialog(
        title  ='Item Exists',
        message='This object already contains a User Folder',
        action ='%s/manage_main' % REQUEST['PARENT_URL'])
# Prefix matchers for domain specs, built with the old `regex` module.
# Callers compare the returned value with len(text) to require that the
# whole text matched.
#   addr_match -- digits, dots and '*'            (address patterns)
#   host_match -- letters, digits, dots and '*'   (host name patterns)
# NOTE(review): '-' is not in the host character class, so specs for
# hyphenated host names will not be accepted -- confirm this is intended.
addr_match=regex.compile('[0-9\.\*]*').match
host_match=regex.compile('[A-Za-z0-9\.\*]*').match
def domainSpecValidate(spec):
    """Return 1 if every entry of *spec* is a well-formed domain pattern.

    An entry is well formed when addr_match or host_match consumes it
    completely.  Returns 0 at the first entry that neither consumes in
    full; an empty *spec* yields 1.
    """
    for entry in spec:
        length=len(entry)
        if addr_match(entry) != length and host_match(entry) != length:
            return 0
    return 1
def _domainPartsMatch(pattern, actual):
    """Return 1 if each part of *pattern* is '*' or equals the part of
    *actual* at the same index, else 0.  *actual* must be at least as
    long as *pattern*."""
    for i in range(len(pattern)):
        if pattern[i] != '*' and pattern[i] != actual[i]:
            return 0
    return 1

def domainSpecMatch(spec, request):
    """Return 1 if the client of *request* matches an entry of *spec*.

    spec    -- sequence of dotted patterns; a '*' part matches anything.
    request -- mapping-like request; REMOTE_HOST and REMOTE_ADDR are read
               when present.

    Whichever of host and address is missing is looked up from the other.
    A failed lookup is not an error: the missing value stays empty and
    only the known one can match.  Returns 0 when neither is known or
    when no entry matches.
    """
    host=''
    if request.has_key('REMOTE_HOST'):
        host=request['REMOTE_HOST']
    addr=''
    if request.has_key('REMOTE_ADDR'):
        addr=request['REMOTE_ADDR']

    if not host and not addr:
        return 0

    # A resolver failure used to propagate out of the match and abort
    # the request; now the unresolved value is simply left empty.
    if not host:
        try: host=socket.gethostbyaddr(addr)[0]
        except socket.error: pass
    if not addr:
        try: addr=socket.gethostbyname(host)
        except socket.error: pass

    _host=split(host, '.')
    _addr=split(addr, '.')
    _hlen=len(_host)
    _alen=len(_addr)

    for ob in spec:
        sz=len(ob)
        _ob=split(ob, '.')
        _sz=len(_ob)

        if addr_match(ob)==sz:
            # Address pattern: needs the same number of parts as the
            # address.  An entry that reads as an address pattern is
            # never retried as a host pattern.
            if _sz == _alen and _domainPartsMatch(_ob, _addr):
                return 1
            continue

        if host_match(ob)==sz:
            # Host pattern: compared against the rightmost parts of the
            # host name, so 'example.com' matches 'www.example.com'.
            if _hlen < _sz:
                continue
            if _domainPartsMatch(_ob, _host[-_sz:]):
                return 1
    return 0
# A getattr-like helper that hopefully can compensate for the
# various oddities in data returned by different stupidbases.
import Missing
# Type of the Missing.Value marker, used by typeconv() to recognise it.
mt=type(Missing.Value)

def typeconv(val):
    """Return '' when *val* has the type of Missing.Value, else *val*."""
    if type(val)!=mt:
        return val
    return ''
def sqlattr(ob, attr):
    """Read column *attr* from result row *ob*, tolerating case differences.

    The name is tried as given, then upper-cased, then lower-cased; the
    first spelling the row has is read and passed through typeconv().
    Raises NameError carrying the name as given when none of them exists.
    """
    spelling=attr
    for recase in (None, upper, lower):
        if recase is not None:
            spelling=recase(spelling)
        if hasattr(ob, spelling):
            return typeconv(getattr(ob, spelling))
    raise NameError(attr)
def absattr(attr):
    """Return the result of calling *attr* if it is callable, else *attr*."""
    if not callable(attr):
        return attr
    return attr()
def reqattr(request, attr):
    """Return request[attr], or None if the lookup fails for any reason."""
    try:
        value=request[attr]
    except:
        # Deliberately broad: any failure of the lookup means "absent".
        value=None
    return value
# DTML source of the login page.  UserDb.__init__ wraps it in the
# 'docLogin' Document, which cookie_validate renders as the value of the
# 'LoginRequired' exception.  The form posts back to the requested URL
# with the fields __ac_name and __ac_password that cookie_validate reads.
_docLogin="""<HTML>
<HEAD>
<TITLE>Login</TITLE>
</HEAD>
<BODY BGCOLOR="#FFFFFF" LINK="#000099" VLINK="#555555">
<P>
<FORM ACTION="<!--#var SCRIPT_NAME--><!--#var PATH_INFO--><!--#if
QUERY_STRING--
>?<!--#var QUERY_STRING--><!--#/if-->" METHOD="POST">
<TABLE>
<TR>
<TD ALIGN="LEFT" VALIGN="TOP">
<STRONG>Name</STRONG>
</TD>
<TD ALIGN="LEFT" VALIGN="TOP">
<INPUT TYPE="TEXT" NAME="__ac_name" SIZE="20">
</TD>
</TR>
<TR>
<TD ALIGN="LEFT" VALIGN="TOP">
<STRONG>Password</STRONG>
</TD>
<TD ALIGN="LEFT" VALIGN="TOP">
<INPUT TYPE="PASSWORD" NAME="__ac_password" SIZE="20">
</TD>
</TR>
<TR>
<TD ALIGN="LEFT" VALIGN="TOP">
</TD>
<TD ALIGN="LEFT" VALIGN="TOP">
<INPUT TYPE="SUBMIT" NAME="submit" VALUE=" Ok ">
</TD>
</TR>
</TABLE>
</FORM>
</BODY>
</HTML>"""
# Source of the logout page.  UserDb.__init__ wraps it in the 'docLogout'
# Document, which UserDb.logout returns after expiring the '__ac' cookie.
_docLogout="""<HTML>
<HEAD>
<TITLE>Logout</TITLE>
</HEAD>
<BODY BGCOLOR="#FFFFFF" LINK="#000099" VLINK="#555555">
<P>
<CENTER>
You have been logged out of the system.
</CENTER>
</BODY>
</HTML>"""
# Template of the 'sqlListQuery' SQL method (no arguments): selects every
# row of the users table.  Used by getUserNames, getUsers and __len__.
_sqlListQuery="""<!--#comment-->
This query retrieves rows representing all users
schema: username password domains roles
<!--#/comment-->
SELECT * FROM users
"""
# Template of the 'sqlUserQuery' SQL method (argument: username): selects
# the rows whose username column equals the argument.  Used by getUser,
# which takes the first row returned.
_sqlUserQuery="""<!--#comment-->
This query retrieves a row representing a given user
schema: username password domains roles
<!--#/comment-->
SELECT * FROM users
<!--#sqlgroup required where-->
<!--#sqltest username column="username" type="string"-->
<!--#/sqlgroup-->
"""
# Template of the 'sqlAddQuery' SQL method (arguments: username password
# domains roles): inserts one row.  Used by _doAddUser.
# NOTE(review): the INSERT names no columns, so the value order must
# match the column order of the users table (username, password,
# domains, roles in the CREATE TABLE shown in the module docstring).
_sqlAddQuery="""<!--#comment-->
This query adds a new user
<!--#/comment-->
INSERT INTO users VALUES (
<!--#sqlvar username type="string"-->,
<!--#sqlvar password type="string"-->,
<!--#sqlvar domains type="string"-->,
<!--#sqlvar roles type="string"-->
)
"""
# Template of the 'sqlEditQuery' SQL method (arguments: username password
# domains roles): rewrites password, domains and roles of the rows whose
# username column equals the argument.  Used by _doChangeUser.
_sqlEditQuery="""<!--#comment-->
This query updates a users data
<!--#/comment-->
UPDATE users SET
password=<!--#sqlvar password type="string"-->,
domains=<!--#sqlvar domains type="string"-->,
roles=<!--#sqlvar roles type="string"-->
<!--#sqlgroup required where-->
<!--#sqltest username column="username" type="string"-->
<!--#/sqlgroup-->
"""
# Template of the 'sqlDelQuery' SQL method (argument: username): deletes
# the rows whose username column equals the argument.  Used by
# _doDelUsers, once per name.
_sqlDelQuery="""<!--#comment-->
This query removes a user
<!--#/comment-->
DELETE from users
<!--#sqlgroup required where-->
<!--#sqltest username column="username" type="string"-->
<!--#/sqlgroup-->
"""
# Set the need_license flag on the module named __init__ (presumably this
# product package's own __init__.py).
# NOTE(review): nothing in this file reads need_license -- confirm what
# consumes it before changing or removing these lines.
import __init__
__init__.need_license=1
--============_-1241517811==_============--