[Zope3-checkins] CVS: Zope3/src/zope/fssync - passwd.py:1.1
Fred L. Drake, Jr.
fred at zope.com
Wed Aug 27 16:33:49 EDT 2003
Update of /cvs-repository/Zope3/src/zope/fssync
In directory cvs.zope.org:/tmp/cvs-serv14786
Added Files:
passwd.py
Log Message:
New password manager for zsync to allow checkouts to not contain everything
required to authenticate to the server. This implements a persistent cache
for these tokens stored in ~/.zsyncpass (similar to ~/.cvspass).
=== Added File Zope3/src/zope/fssync/passwd.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Password manager for fssync clients.
$Id: passwd.py,v 1.1 2003/08/27 19:33:49 fdrake Exp $
"""
import base64
import httplib
import os
from cStringIO import StringIO

from zope.fssync import fsutil
# Default token cache location: the file ".zsyncpass" under the expansion
# of "~" (the current user's home directory).
DEFAULT_FILENAME = os.path.expanduser(os.path.join("~", ".zsyncpass"))
class PasswordManager(object):
    """Manager for a cache of basic authentication tokens for zsync.

    This stores tokens in a file, and allows them to be retrieved by
    the zsync application.  The tokens are stored in their 'cooked'
    form, so while someone could easily decode them or use them to
    make requests, the casual reader won't be able to use them easily.

    The cache file is created with restricted permissions, so other
    users should not be able to read it unless the permissions are
    modified.

    Each record in the cache file is one line holding four
    whitespace-separated fields: scheme, host[:port], user and token.
    Blank lines and lines starting with '#' are ignored on lookup.
    """

    def __init__(self, filename=None):
        """Use 'filename' as the cache file; fall back to DEFAULT_FILENAME."""
        if not filename:
            filename = DEFAULT_FILENAME
        self.authfile = filename

    def getPassword(self, user, host_port):
        """Read a password from the user."""
        import getpass
        prompt = "Password for %s at %s: " % (user, host_port)
        return getpass.getpass(prompt)

    def createToken(self, user_passwd):
        """Generate a basic authentication token from 'user:password'.

        The token is returned as a native string containing no
        whitespace, so it always fits on a single cache-file line.
        """
        # b64encode() works on bytes; encode text input first.
        if not isinstance(user_passwd, bytes):
            user_passwd = user_passwd.encode("utf-8")
        # Unlike encodestring(), b64encode() never inserts line breaks,
        # so long credentials cannot produce a multi-line token.
        token = base64.b64encode(user_passwd)
        if not isinstance(token, str):
            token = token.decode("ascii")
        return token

    def getToken(self, scheme, host_port, user):
        """Get an authentication token for the user for a specific server.

        If a corresponding token exists in the cache, that is returned,
        otherwise the user is prompted for their password and a new
        token is generated.  A new token is not automatically stored
        in the cache.
        """
        host_port = _normalize_host(scheme, host_port)
        prefix = [scheme, host_port, user]
        if os.path.exists(self.authfile):
            f = open(self.authfile, "r")
            try:
                for line in f:
                    line = line.strip()
                    if line[:1] in ("#", ""):
                        continue
                    parts = line.split()
                    # A record truncated to three fields has no token;
                    # skip it instead of failing with IndexError.
                    if len(parts) >= 4 and parts[:3] == prefix:
                        return parts[3]
            finally:
                f.close()
        # not in ~/.zsyncpass
        pw = self.getPassword(user, host_port)
        return self.createToken("%s:%s" % (user, pw))

    def addToken(self, scheme, host_port, user, token):
        """Add a token to the persistent cache.

        If a corresponding token already exists in the cache, it is
        replaced.  Should the cache hold several records for the same
        (scheme, host, user), only one record is kept.
        """
        host_port = _normalize_host(scheme, host_port)
        record = "%s %s %s %s\n" % (scheme, host_port, user, token)
        if os.path.exists(self.authfile):
            prefix = [scheme, host_port, user]
            lines = []
            found = False
            f = open(self.authfile)
            try:
                for line in f:
                    if line.split()[:3] == prefix:
                        # Replace the first match, drop later duplicates.
                        if not found:
                            lines.append(record)
                            found = True
                    else:
                        # A final line without a newline must not be
                        # glued to whatever record follows it.
                        if not line.endswith("\n"):
                            line += "\n"
                        lines.append(line)
            finally:
                f.close()
            if not found:
                lines.append(record)
            text = "".join(lines)
        else:
            text = record
        f = self.createAuthFile()
        try:
            f.write(text)
        finally:
            f.close()

    def removeToken(self, scheme, host_port, user):
        """Remove a token from the authentication database.

        Returns True if a token was found and removed, or False if no
        matching token was found.

        If the resulting cache file contains only blank lines, it is
        removed.
        """
        if not os.path.exists(self.authfile):
            return False
        host_port = _normalize_host(scheme, host_port)
        prefix = [scheme, host_port, user]
        found = False
        nonblank = False
        kept = []
        f = open(self.authfile)
        try:
            for line in f:
                if line.split()[:3] == prefix:
                    found = True
                else:
                    if line.strip():
                        nonblank = True
                    kept.append(line)
        finally:
            f.close()
        if found:
            if nonblank:
                f = self.createAuthFile()
                try:
                    f.write("".join(kept))
                finally:
                    f.close()
            else:
                # nothing left in the file but blank lines; remove it
                os.unlink(self.authfile)
        return found

    def createAuthFile(self):
        """Create the token cache file with the right permissions.

        Returns a file object open for writing; any previous content is
        truncated.  If the file did not exist beforehand, the standard
        comment header is written first.  On POSIX systems a newly
        created file is readable and writable by its owner only; the
        permissions of an already existing file are left untouched.
        """
        import stat
        new = not os.path.exists(self.authfile)
        if os.name == "posix":
            # The permission bits must be given to os.open(); the third
            # argument of the builtin open() is a buffer size, not a mode.
            owner_rw = stat.S_IRUSR | stat.S_IWUSR
            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
            fd = os.open(self.authfile, flags, owner_rw)
            try:
                f = os.fdopen(fd, "w")
            except Exception:
                # Don't leak the descriptor if wrapping it fails.
                os.close(fd)
                raise
        else:
            f = open(self.authfile, "w")
        if new:
            f.write(_NEW_FILE_HEADER)
        return f
# Comment banner that createAuthFile() writes at the top of a token cache
# file it has just created.
_NEW_FILE_HEADER = (
    "#\n"
    "# Stored authentication tokens for zsync.\n"
    "# Manipulate this data using the 'zsync login' and 'zsync logout';\n"
    "# read the zsync documentation for more information.\n"
    "#\n"
)
def _normalize_host(scheme, host_port):
    """Return 'host_port' normalized for the given URL scheme.

    Only "http" and "https" are accepted; the scheme selects the default
    port that _normalize_port() compares against.  Any other scheme
    raises fsutil.Error.
    """
    if scheme not in ("http", "https"):
        raise fsutil.Error("unsupported URL scheme: %r" % scheme)
    if scheme == "http":
        default_port = httplib.HTTP_PORT
    else:
        default_port = httplib.HTTPS_PORT
    return _normalize_port(host_port, default_port)
def _normalize_port(host_port, default_port):
    """Return 'host_port' in a canonical form usable as a cache key.

    'host_port' is either "host" or "host:port".  The result is
    lower-cased; a port equal to 'default_port' is dropped, and any
    other port is rewritten from its parsed integer value so that
    spellings such as "host:08080" and "host:8080" yield the same key.

    Raises fsutil.Error if the port is not an integer or lies outside
    the range 1-65535.
    """
    if ":" in host_port:
        host, port_text = host_port.split(":", 1)
        try:
            port = int(port_text)
        except ValueError:
            raise fsutil.Error(
                "invalid port specification: %r" % port_text)
        if port <= 0 or port > 65535:
            raise fsutil.Error("invalid port: %d" % port)
        if port == default_port:
            host_port = host
        else:
            # Rebuild from the integer so the textual form is canonical.
            host_port = "%s:%d" % (host, port)
    return host_port.lower()
More information about the Zope3-Checkins
mailing list